文章目录

1 客户端心跳任务2 服务端处理2.1 服务注册时开启客户端心跳检查2.2 客户端发送心跳任务续约2.3 服务实例移除2.4 心跳任务闭环

结语

1 客户端心跳任务

在上一篇文章==0201服务注册源码解析-nacos2.x-微服务架构==分析客户端服务注册的时候,流程在NacosNamingService#registerInstance()的方法中,调用registerService()方法之前先执行了客户端发送心跳任务。源代码如下1-1所示:

@Override

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

if (instance.isEphemeral()) {

BeatInfo beatInfo = new BeatInfo();

beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));

beatInfo.setIp(instance.getIp());

beatInfo.setPort(instance.getPort());

beatInfo.setCluster(instance.getClusterName());

beatInfo.setWeight(instance.getWeight());

beatInfo.setMetadata(instance.getMetadata());

beatInfo.setScheduled(false);

long instanceInterval = instance.getInstanceHeartBeatInterval();

beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);

beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);

}

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);

}

判断实例为临时实例,执行发送定时心跳任务封装心跳对象-BeatInfo类型,设置服务名称、Ip、端口、集群名称等等信息BeatReactor对象添加心跳任务

我们来看下BeatReactor是干嘛的?源代码如下1-2所示:

package com.alibaba.nacos.client.naming.beat;

import com.alibaba.nacos.api.common.Constants;

import com.alibaba.nacos.client.monitor.MetricsMonitor;

import com.alibaba.nacos.client.naming.net.NamingProxy;

import com.alibaba.nacos.client.naming.utils.UtilAndComs;

import java.util.Map;

import java.util.concurrent.*;

import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;

/**

* @author harold

*/

public class BeatReactor {

private ScheduledExecutorService executorService;

private NamingProxy serverProxy;

public final Map dom2Beat = new ConcurrentHashMap();

public BeatReactor(NamingProxy serverProxy) {

this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);

}

public BeatReactor(NamingProxy serverProxy, int threadCount) {

this.serverProxy = serverProxy;

executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setDaemon(true);

thread.setName("com.alibaba.nacos.naming.beat.sender");

return thread;

}

});

}

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {

NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);

dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);

executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);

MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());

}

public void removeBeatInfo(String serviceName, String ip, int port) {

NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);

BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));

if (beatInfo == null) {

return;

}

beatInfo.setStopped(true);

MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());

}

private String buildKey(String serviceName, String ip, int port) {

return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER

+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;

}

class BeatTask implements Runnable {

BeatInfo beatInfo;

public BeatTask(BeatInfo beatInfo) {

this.beatInfo = beatInfo;

}

@Override

public void run() {

if (beatInfo.isStopped()) {

return;

}

long result = serverProxy.sendBeat(beatInfo);

long nextTime = result > 0 ? result : beatInfo.getPeriod();

executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);

}

}

}

ScheduledExecutorService executorService:定时任务线程池 NamingProxy serverProxy:简单理解提供远程调用 Map dom2Beat ConcurrentHashMap类型:心跳任务缓存 BeatTask:心跳线程,线程run方法通过serverProxy发起远程调用,把心跳信息发送给nacos服务端。根据返回时间重新,通过定时任务线程池执行新的心跳任务。该线程被设置为守护线程。

客户端心跳任务执行核心逻辑:

心跳管理BeatReactor缓存心跳对象 ScheduledExecutorService 定时任务线程池执行心跳任务(线程) NamingProxy发起远程调用,根据结果,重新执行步骤2来维持心跳

通过开启守护线程,定时发送心跳更新时间这种机制,有没有很熟悉的感觉?回想下redis 分布式锁或者红锁算法。

2 服务端处理

我们来到服务端这边看下,做了那些关于客户端心跳相关的处理呢?

2.1 服务注册时开启客户端心跳检查

示意图2.1-1如下所示:

首先,在之前我们讲解服务端服务注册的时候,提到创建Client的时候,代码2.1-1如下

private void createIpPortClientIfAbsent(String clientId) {

if (!clientManager.contains(clientId)) {

// 忽略客户端创建

clientManager.clientConnected(clientId, clientAttributes);

}

}

默认时临时的,我们继续查看EphemeralIpPortClientManager#()方法,代码2.1-2如下:

@Override

public boolean clientConnected(final Client client) {

clients.computeIfAbsent(client.getClientId(), s -> {

Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());

IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;

ipPortBasedClient.init();

return ipPortBasedClient;

});

return true;

}

ipPortBasedClient.init()继续查看初始化方法,代码2.1-3:

public void init() {

if (ephemeral) {

beatCheckTask = new ClientBeatCheckTaskV2(this);

HealthCheckReactor.scheduleCheck(beatCheckTask);

} else {

healthCheckTaskV2 = new HealthCheckTaskV2(this);

HealthCheckReactor.scheduleCheck(healthCheckTaskV2);

}

}

ephemeral默认为true:

创建客户端心跳检查任务定时任务线程池执行该任务

定时任务代码2.1-4:

public static void scheduleCheck(BeatCheckTask task) {

Runnable wrapperTask =

task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task)

: task;

futureMap.computeIfAbsent(task.taskKey(),

k -> GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS));

}

即定时任务延时5s后开始执行定时任务,间隔5s。我们来看下执行的ClientBeatCheckTaskV2具体执行了什么任务?

@Override

public void doHealthCheck() {

try {

Collection services = client.getAllPublishedService();

for (Service each : services) {

HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client

.getInstancePublishInfo(each);

interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));

}

} catch (Exception e) {

Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);

}

}

@Override

public void run() {

doHealthCheck();

}

继续追踪下InstanceBeatCheckTask任务做了什么呢?

public class InstanceBeatCheckTask implements Interceptable {

static {

CHECKERS.add(new UnhealthyInstanceChecker());

CHECKERS.add(new ExpiredInstanceChecker());

CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));

}

@Override

public void passIntercept() {

for (InstanceBeatChecker each : CHECKERS) {

each.doCheck(client, service, instancePublishInfo);

}

}

@Override

public void afterIntercept() {

}

}

该实例健康检查任务添加2项检查:不健康实例检查和过期实例健康检查

看下不健康实例检查做了什么?

public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {

if (instance.isHealthy() && isUnhealthy(service, instance)) {

changeHealthyStatus(client, service, instance);

}

}

private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {

long beatTimeout = getTimeout(service, instance);

return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;

}

private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {

instance.setHealthy(false);

Loggers.EVT_LOG

.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),

instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,

instance.getLastHeartBeatTime());

NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));

NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));

NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),

service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(), instance.getPort(),

false, "client_beat"));

}

如果服务实例健康状态之前为true,检测当前是否健康通过判断(当前时间-实例最后一次心跳时间)是否大于心跳超时时间(默认15s)如果超心跳超时时间,设置服务实例为不健康

看下过期时间检测任务具体做类什么?

public class ExpiredInstanceChecker implements InstanceBeatChecker {

@Override

public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {

boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();

if (expireInstance && isExpireInstance(service, instance)) {

deleteIp(client, service, instance);

}

}

private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {

long deleteTimeout = getTimeout(service, instance);

// deleteTimeout默认30s

return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;

}

private void deleteIp(Client client, Service service, InstancePublishInfo instance) {

// 省略日志记录

// 移除该服务实例

client.removeServiceInstance(service);

// 省略事件发布

}

}

判断实例算法过去算法:(当前时间-最后一次实例心跳时间)> 删除超时时间(默认30s);如果判断实例过期,会移除该服务实例。

2.2 客户端发送心跳任务续约

示意图2.2-2如下所示:

Url: /v1/ns/instance ,匹配服务端InstanceController#beat()代码2.2-1如下所示:

@CanDistro

@PutMapping("/beat")

@Secured(action = ActionTypes.WRITE)

public ObjectNode beat(HttpServletRequest request) throws Exception {

ObjectNode result = JacksonUtils.createEmptyJsonNode();

result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);

RsInfo clientBeat = null;

if (StringUtils.isNotBlank(beat)) {

clientBeat = JacksonUtils.toObj(beat, RsInfo.class);

}

// 省略。。。获取信息

BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();

builder.setRequest(request);

int resultCode = getInstanceOperator()

.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);

result.put(CommonParams.CODE, resultCode);

result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,

getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));

result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());

return result;

}

我查看handleBeat() 方法,实际执行InstanceOperatorClientImpl#handleBeat()方法,代码如下:

@Override

public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,

RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {

Service service = getService(namespaceId, serviceName, true);

String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);

IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);

if (null == client || !client.getAllPublishedService().contains(service)) {

if (null == clientBeat) {

return NamingResponseCode.RESOURCE_NOT_FOUND;

}

Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();

registerInstance(namespaceId, serviceName, instance);

client = (IpPortBasedClient) clientManager.getClient(clientId);

}

if (!ServiceManager.getInstance().containSingleton(service)) {

throw new NacosException(NacosException.SERVER_ERROR,

"service not found: " + serviceName + "@" + namespaceId);

}

if (null == clientBeat) {

clientBeat = new RsInfo();

clientBeat.setIp(ip);

clientBeat.setPort(port);

clientBeat.setCluster(cluster);

clientBeat.setServiceName(serviceName);

}

ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);

HealthCheckReactor.scheduleNow(beatProcessor);

client.setLastUpdatedTime();

return NamingResponseCode.OK;

}

初始第一次client==null,会创建客户端实例并注册HealthCheckReactor.scheduleNow(beatProcessor);会通过定时任务线程池执行ClientBeatProcessorV2类型的任务

下面我们来看下ClientBeatProcessorV2线程类型里面具体做了什么?

public void run() {

if (Loggers.EVT_LOG.isDebugEnabled()) {

Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());

}

String ip = rsInfo.getIp();

int port = rsInfo.getPort();

String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());

String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());

Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());

HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);

// 获取服务实例的IP端口与心跳传递的IP端口比较

if (instance.getIp().equals(ip) && instance.getPort() == port) {

if (Loggers.EVT_LOG.isDebugEnabled()) {

Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);

}

// 这里完成服务实例续约,即通过设置最后心跳时间

instance.setLastHeartBeatTime(System.currentTimeMillis());

if (!instance.isHealthy()) {

instance.setHealthy(true);

// 省略事件发布

}

}

}

通过心跳传递的IP和端口与当前nacos以发布的对应服务实例IP和端口比对,确定是哪个服务实例发送的心跳。上面学习中,我们知道心跳检查通过(当前时间-服务实例最后心跳时间与设置的时间比对)完成的,这里把最好心跳时间更新为当前时间,完成了服务实例的续约;如果之前因为网络延时等原因造成实例被设置为不健康,这里重新设置实例为健康状态。

2.3 服务实例移除

在#2.1中我们知道当检测任务检测到服务实例过期后,会移除该实例 ,看看具体做了什么,继续追踪下AbstractClient#removeServiceInstance()方法:

@Override

public InstancePublishInfo removeServiceInstance(Service service) {

InstancePublishInfo result = publishers.remove(service);

if (null != result) {

if (result instanceof BatchInstancePublishInfo) {

MetricsMonitor.decrementIpCountWithBatchRegister(result);

} else {

MetricsMonitor.decrementInstanceCount();

}

NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));

}

Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());

return result;

}

protected final ConcurrentHashMap publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

publishers:nacos维护的缓存key为服务名,value为服务发布实例的缓存,类型为ConcurrentHashMap;服务实例移除就是冲当前服务实例缓存中移除该服务对应的服务实例。

2.4 心跳任务闭环

客户端根据服务的返回的心跳时间,执行新的定时任务。

public void run() {

if (beatInfo.isStopped()) {

return;

}

long result = serverProxy.sendBeat(beatInfo);

long nextTime = result > 0 ? result : beatInfo.getPeriod();

executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);

}

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

⭐️源代码仓库地址:https://gitee.com/gaogzhen/spring-cloud-study.git

参考地址:

[1]Nacos官网

[2]Nacos-服务端心跳机制

[3]Nacos客户端心跳续约

精彩内容

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。