Eureka源码分析
2026/1/15 · 大约 5 分钟 · Eureka · 源码分析 · 原理
Eureka 源码分析
一、核心类结构
1.1 类图概览
1.2 核心组件
| 组件 | 说明 |
|---|---|
| DiscoveryClient | 客户端核心类,负责注册、续约、获取注册表 |
| EurekaServerContext | 服务端上下文 |
| PeerAwareInstanceRegistry | 集群感知的注册表 |
| PeerEurekaNodes | 集群节点管理 |
二、客户端启动流程
2.1 自动配置
// EurekaClientAutoConfiguration: auto-configuration excerpt that exposes the EurekaClient bean.
@Configuration
@EnableConfigurationProperties
// Applied only when EurekaClientConfig is present on the classpath.
@ConditionalOnClass(EurekaClientConfig.class)
public class EurekaClientAutoConfiguration {
// Bean factory method; skipped when an EurekaClient bean is already defined.
@Bean
@ConditionalOnMissingBean(EurekaClient.class)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
// "..." stands for the remaining constructor arguments omitted in this excerpt.
return new CloudEurekaClient(manager, config, ...);
}
}2.2 DiscoveryClient 初始化
// DiscoveryClient constructor (pseudo-code).
// Stores the configuration, creates the executors, optionally fetches the
// registry and registers this instance, then starts the scheduled tasks.
public DiscoveryClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
// 1. Store the configuration and the instance-info manager
this.clientConfig = config;
this.applicationInfoManager = manager;
// 2. Create the thread pools ("..." = arguments omitted in this excerpt)
scheduler = Executors.newScheduledThreadPool(2);
heartbeatExecutor = new ThreadPoolExecutor(...);
cacheRefreshExecutor = new ThreadPoolExecutor(...);
// 3. Fetch the registry (false = do not force a full fetch)
if (clientConfig.shouldFetchRegistry()) {
fetchRegistry(false);
}
// 4. Register this service instance
if (clientConfig.shouldRegisterWithEureka()) {
register();
}
// 5. Start the scheduled tasks
initScheduledTasks();
}2.3 定时任务初始化
// initScheduledTasks (pseudo-code)
/**
 * Starts the client's background work: registry refresh, heartbeat renewal,
 * and instance-info replication.
 *
 * <p>Refresh and heartbeat are scheduled with a fixed delay so that they keep
 * running; a plain {@code schedule(...)} call would execute each task only once.
 * NOTE(review): upstream Eureka is understood to get the same effect with a
 * self-rescheduling supervisor task — confirm against the version discussed.
 */
private void initScheduledTasks() {
    // 1. Registry refresh task (only when this client fetches the registry)
    if (clientConfig.shouldFetchRegistry()) {
        scheduler.scheduleWithFixedDelay(
                new CacheRefreshThread(),
                registryFetchIntervalSeconds,   // initial delay
                registryFetchIntervalSeconds,   // delay between runs
                TimeUnit.SECONDS);
    }
    // 2. Heartbeat (lease renewal) task (only when this client registers itself)
    if (clientConfig.shouldRegisterWithEureka()) {
        scheduler.scheduleWithFixedDelay(
                new HeartbeatThread(),
                renewalIntervalInSecs,          // initial delay
                renewalIntervalInSecs,          // delay between runs
                TimeUnit.SECONDS);
    }
    // 3. Instance-info replication task
    instanceInfoReplicator = new InstanceInfoReplicator(this);
    instanceInfoReplicator.start(initialInstanceInfoReplicationIntervalSeconds);
}
三、服务注册源码
3.1 客户端注册
// DiscoveryClient.register()
/**
 * Registers this instance with the Eureka server.
 *
 * <p>Any exception raised by the transport propagates to the caller unchanged;
 * the former catch block only rethrew it and has been removed.
 *
 * @return {@code true} when the server answers with HTTP 204 (No Content)
 */
boolean register() {
    // Sends a POST request to /eureka/apps/{appName}
    EurekaHttpResponse<Void> httpResponse =
            eurekaTransport.registrationClient.register(instanceInfo);
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
3.2 服务端处理
// ApplicationResource.addInstance()
/**
 * Handles an instance registration request.
 *
 * @param info          the instance being registered
 * @param isReplication replication header value; the literal "true" marks a
 *                      request forwarded by a peer node
 * @return 400 when the instance id is blank, otherwise 204
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // Step 1: validate the request
    boolean idMissing = isBlank(info.getId());
    if (idMissing) {
        return Response.status(400).entity("Missing instanceId").build();
    }

    // Step 2: store the instance in the local registry
    boolean fromPeer = "true".equals(isReplication);
    registry.register(info, fromPeer);

    // Step 3: acknowledge with 204
    return Response.status(204).build();
}
3.3 注册表写入
// PeerAwareInstanceRegistryImpl.register()
/**
 * Registers an instance in the local registry and, for requests that did not
 * originate from a peer, replicates the registration to the other nodes.
 *
 * @param info          the instance to register
 * @param isReplication {@code true} when the request was forwarded by a peer
 */
public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // Honor a client-supplied duration only when it is positive; a zero or
    // negative value would otherwise replace the default lease duration.
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 1. Delegate to the parent registry
    super.register(info, leaseDuration, isReplication);
    // 2. Replicate to the other nodes (only for non-replication requests)
    if (!isReplication) {
        replicateToPeers(Action.Register, info.getAppName(),
                info.getId(), info, null, isReplication);
    }
}
// AbstractInstanceRegistry.register()
public void register(InstanceInfo registrant, int leaseDuration,
boolean isReplication) {
// 1. 获取或创建应用的实例 Map
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
if (gMap == null) {
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
new ConcurrentHashMap<>();
gMap = registry.putIfAbsent(appName, gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 2. 创建租约
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
// 3. 存入注册表
gMap.put(registrant.getId(), lease);
// 4. 加入最近变更队列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 5. 使缓存失效
invalidateCache(appName, registrant.getVIPAddress(),
registrant.getSecureVipAddress());
}四、心跳续约源码
4.1 客户端心跳
// HeartbeatThread
/** Runnable that performs one lease renewal and records when it succeeds. */
private class HeartbeatThread implements Runnable {
    @Override
    public void run() {
        boolean renewed = renew();
        if (!renewed) {
            return;
        }
        lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    }
}
// DiscoveryClient.renew()
/**
 * Sends one heartbeat for this instance.
 *
 * @return {@code true} when the server answers 200, or when the server answers
 *         404 and the subsequent re-registration succeeds; {@code false}
 *         otherwise, including when any exception is thrown
 */
boolean renew() {
    try {
        // Sends a PUT request to /eureka/apps/{appName}/{instanceId}
        EurekaHttpResponse<InstanceInfo> httpResponse = eurekaTransport.registrationClient
                .sendHeartBeat(instanceInfo.getAppName(),
                        instanceInfo.getId(),
                        instanceInfo,
                        null);
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            // The server does not know this instance: register again and report
            // the real outcome instead of unconditionally claiming success.
            return register();
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        // Best-effort: a failed heartbeat is reported as "not renewed".
        return false;
    }
}
4.2 服务端处理
// InstanceResource.renewLease()
/**
 * Handles a heartbeat (lease renewal) request for one instance.
 *
 * @param isReplication replication header value; the literal "true" marks a
 *                      request forwarded by a peer node
 * @param status        status query parameter (not used in this excerpt)
 * @return 200 when the lease was renewed, 404 when the registry reports failure
 */
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("status") String status) {
    // Ask the registry to renew the lease
    boolean renewed = registry.renew(app.getName(), id, "true".equals(isReplication));
    // A failed renewal means the instance is unknown to the registry
    return renewed
            ? Response.ok().build()
            : Response.status(Status.NOT_FOUND).build();
}
// AbstractInstanceRegistry.renew()
/**
 * Renews the lease of one instance.
 *
 * @param appName       application name used as the outer registry key
 * @param id            instance id used as the inner key
 * @param isReplication whether the request came from a peer (unused in this excerpt)
 * @return {@code false} when no lease exists for the given app/id, otherwise {@code true}
 */
public boolean renew(String appName, String id, boolean isReplication) {
    Map<String, Lease<InstanceInfo>> leasesOfApp = registry.get(appName);
    if (leasesOfApp == null) {
        return false;
    }
    Lease<InstanceInfo> lease = leasesOfApp.get(id);
    if (lease == null) {
        return false;
    }
    // Refresh the lease's last-update time
    lease.renew();
    return true;
}
五、服务发现源码
5.1 获取注册表
// DiscoveryClient.fetchRegistry()
/**
 * Refreshes the locally cached registry, using a full fetch or a delta fetch.
 *
 * @param forceFullRegistryFetch {@code true} to skip the delta path
 * @return {@code true} when the local cache was updated, {@code false} when
 *         any exception was thrown
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // Snapshot the current cache once; the delta path merges into it.
        // (Previously an uninitialized local was passed to getAndUpdateDelta.)
        Applications current = localRegionApps.get();
        Applications applications;
        if (forceFullRegistryFetch
                || clientConfig.shouldDisableDelta()
                || current == null) {
            // Full fetch
            applications = getAndStoreFullRegistry();
        } else {
            // Delta fetch
            applications = getAndUpdateDelta(current);
        }
        // Update the local cache
        localRegionApps.set(applications);
        return true;
    } catch (Throwable e) {
        // Best-effort: a failed fetch is reported as "not refreshed".
        return false;
    }
}
5.2 全量获取
// DiscoveryClient.getAndStoreFullRegistry()
/**
 * Fetches the complete registry from the server.
 *
 * @return the fetched applications, shuffled when present; {@code null} when
 *         the response carries no entity
 */
private Applications getAndStoreFullRegistry() {
    // GET /eureka/apps
    Applications fetched = eurekaTransport.queryClient.getApplications().getEntity();
    if (fetched == null) {
        return null;
    }
    // Shuffle the instance order to spread load across instances
    fetched.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    return fetched;
}
5.3 增量获取
// DiscoveryClient.getAndUpdateDelta()
/**
 * Fetches the registry delta and merges it into the local copy, falling back
 * to a full fetch when the delta is missing or the hash codes disagree.
 *
 * @param applications the current local registry
 * @return {@code applications} after the merge, or the result of a full fetch
 */
private Applications getAndUpdateDelta(Applications applications) {
    // GET /eureka/apps/delta
    Applications delta = eurekaTransport.queryClient.getDelta().getEntity();
    if (delta == null) {
        // No delta available: fall back to a full fetch
        return getAndStoreFullRegistry();
    }

    // Merge the delta into the local registry
    updateDelta(delta);

    // Verify the merge by comparing hash codes with the server's value
    String localHash = getReconcileHashCode(applications);
    boolean consistent = localHash.equals(delta.getAppsHashCode());
    // On mismatch, fall back to a full fetch
    return consistent ? applications : getAndStoreFullRegistry();
}
六、服务剔除源码
6.1 剔除定时任务
// AbstractInstanceRegistry.EvictionTask
/** Timer task that runs one eviction pass and logs any failure. */
class EvictionTask extends TimerTask {
    @Override
    public void run() {
        try {
            // Compute the compensation time and run the eviction with it
            evict(getCompensationTimeMs());
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
}
6.2 剔除逻辑
// AbstractInstanceRegistry.evict()
/**
 * Evicts expired leases, capped so that roughly 85% of the registry is kept
 * in a single pass.
 *
 * <p>The cap is computed as {@code size - (int) (size * 0.85)} rather than
 * {@code (int) (size * 0.15)}: the latter truncates to 0 for registries with
 * fewer than 7 instances, so expired instances there would never be removed.
 *
 * @param additionalLeaseMs extra time passed to {@code Lease.isExpired}
 */
public void evict(long additionalLeaseMs) {
    // 1. Check whether eviction is allowed (self-preservation)
    if (!isLeaseExpirationEnabled()) {
        return;
    }
    // 2. Collect the expired leases
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Map<String, Lease<InstanceInfo>> leaseMap : registry.values()) {
        for (Lease<InstanceInfo> lease : leaseMap.values()) {
            if (lease.isExpired(additionalLeaseMs)) {
                expiredLeases.add(lease);
            }
        }
    }
    // 3. Compute how many may be evicted in this pass
    int registrySize = getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * 0.85);
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    // 4. Evict randomly chosen expired leases
    Random random = new Random(System.currentTimeMillis());
    for (int i = 0; i < toEvict; i++) {
        int next = random.nextInt(expiredLeases.size());
        // remove(int) returns the removed lease, so it cannot be picked twice
        Lease<InstanceInfo> lease = expiredLeases.remove(next);
        internalCancel(lease.getHolder().getAppName(),
                lease.getHolder().getId(), false);
    }
}
七、三级缓存源码
7.1 缓存结构
// ResponseCacheImpl: field layout of the response cache (excerpt; initialization
// of the final fields below is not shown here).
public class ResponseCacheImpl implements ResponseCache {
// Read-only cache
private final ConcurrentMap<Key, Value> readOnlyCacheMap =
new ConcurrentHashMap<>();
// Read-write cache (Guava Cache)
private final LoadingCache<Key, Value> readWriteCacheMap;
// Executor for the periodic sync task
private final ScheduledExecutorService timer;
}7.2 缓存读取
// ResponseCacheImpl.get()
/**
 * Returns the cached payload for a key, reading the read-only cache first and
 * falling back to the read-write cache.
 *
 * <p>Uses {@code getUnchecked} because {@code LoadingCache.get} declares the
 * checked {@code ExecutionException}, which this method neither handles nor
 * declares; load failures now surface as unchecked exceptions.
 *
 * @param key the cache key
 * @return the payload string stored for the key
 */
public String get(Key key) {
    // 1. Try the read-only cache first
    Value payload = readOnlyCacheMap.get(key);
    if (payload == null) {
        // 2. Fall back to the read-write cache (loads the value if absent)
        payload = readWriteCacheMap.getUnchecked(key);
        // 3. Populate the read-only cache
        readOnlyCacheMap.put(key, payload);
    }
    return payload.getPayload();
}
7.3 缓存同步
// ResponseCacheImpl: periodic sync from the read-write cache to the read-only cache
/**
 * Schedules the recurring task that copies changed values from the read-write
 * cache into the read-only cache.
 *
 * <p>Scheduled with a fixed delay; a plain {@code schedule(...)} call would run
 * the sync only once. Each key is handled in its own try block because an
 * exception escaping a periodic task suppresses all later executions.
 */
private void scheduleCacheSync() {
    timer.scheduleWithFixedDelay(
            () -> {
                for (Key key : readOnlyCacheMap.keySet()) {
                    try {
                        // getUnchecked: LoadingCache.get throws a checked
                        // exception, which a Runnable lambda cannot propagate.
                        Value cacheValue = readWriteCacheMap.getUnchecked(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        // Identity comparison: replace only when the read-write
                        // cache holds a different Value object.
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (RuntimeException e) {
                        // Skip this key so the remaining keys and later runs proceed.
                        // NOTE(review): report this through the class logger if one is in scope.
                    }
                }
            },
            responseCacheUpdateIntervalMs,   // initial delay
            responseCacheUpdateIntervalMs,   // delay between runs
            TimeUnit.MILLISECONDS);
}
八、总结
核心源码要点:
- 客户端:DiscoveryClient 负责注册、心跳、获取注册表
- 服务端:PeerAwareInstanceRegistry 处理请求并同步到集群
- 注册表:ConcurrentHashMap 存储,支持高并发
- 缓存:三级缓存提高读取性能
- 剔除:定时任务检查过期实例
