微服务框架

【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】

微服务面试篇

文章目录

微服务框架微服务面试篇54 微服务篇54.6 Nacos与Eureka的区别有哪些?【服务发现】54.6.1 Nacos 的服务拉取和订阅 机制

54 微服务篇

54.6 Nacos与Eureka的区别有哪些?【服务发现】

54.6.1 Nacos 的服务拉取和订阅 机制

服务发现:Nacos支持定时拉取和订阅推送两种模式;Eureka只支持定时拉取模式

【Nacos 的服务拉取 和订阅机制】

先看看Nacos 官方API 文档【查询实例 列表】

【描述】

查询服务下的实例列表

【请求类型】

GET

【请求路径】

/nacos/v1/ns/instance/list

【请求参数】

名称类型是否必选描述serviceName字符串是服务名groupName字符串否分组名namespaceId字符串否命名空间IDclusters字符串,多个集群用逗号分隔否集群名称healthyOnlyboolean否,默认为false是否只返回健康实例

现在 Nacos的 服务端肯定需要有一个 controller 来接收这个请求并 返回实例列表

作为消费者【客户端】就应该向 这个服务端 发送这个请求来获取 实例

【问题】

我们的微服务( 比如order-service ),它是什么时候来做这个拉取的 呢?

因为我们的微服务 都是基于Ribbon 来做的远程 调用和负载均衡,所以,服务拉取的动作也是Ribbon 去做的

进到一个 DynamicServerListLoadBalancer类, 动态 列表服务均衡器

它在它的构造函数 中就会执行 一系列的初始化动作

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList serverList, ServerListFilter filter, ServerListUpdater serverListUpdater) {

super(clientConfig, rule, ping);

this.isSecure = false;

this.useTunnel = false;

this.serverListUpdateInProgress = new AtomicBoolean(false);

this.updateAction = new NamelessClass_1();

this.serverListImpl = serverList;

this.filter = filter;

this.serverListUpdater = serverListUpdater;

if (filter instanceof AbstractServerListFilter) {

((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());

}

this.restOfInit(clientConfig); // 注意这个【基于rest 请求的初始化】【即基于rest 请求拉取服务列表】

}

跟进这个方法

再跟入 updateListOfServers 这个方法

@VisibleForTesting

public void updateListOfServers() {

List servers = new ArrayList();

if (this.serverListImpl != null) {

servers = this.serverListImpl.getUpdatedListOfServers(); //[核心]

LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);

if (this.filter != null) {

servers = this.filter.getFilteredListOfServers((List)servers);

LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);

}

}

this.updateAllServerList((List)servers);

}

跟入 getUpdatedListOfServers 方法

@Override

public List getUpdatedListOfServers() {

return getServers();

}

getServers 方法就在下面

【是真的复杂,笔者看到这儿,完全不知道怎么做笔记了 …】

直接回到 服务端 的list 接口 了

*/

@GetMapping("/list")

@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)

public ObjectNode list(HttpServletRequest request) throws Exception {

// 解析request 中的namespaceId、serviceName

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

NamingUtils.checkServiceNameFormat(serviceName);

String agent = WebUtils.getUserAgent(request);

// 获取集群信息

String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);

// 客户端【消费者】的IP 和udp 端口

String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);

int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));

String env = WebUtils.optional(request, "env", StringUtils.EMPTY);

boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,

healthyOnly);

}

再进到 doSrvIpxt 方法

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,

int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

ClientInfo clientInfo = new ClientInfo(agent);

// 创建空的json 对象,作为result【将来的 】

ObjectNode result = JacksonUtils.createEmptyJsonNode();

// 从注册表 尝试获取服务

Service service = serviceManager.getService(namespaceId, serviceName);

long cacheMillis = switchDomain.getDefaultCacheMillis();

// now try to enable the push

try {

if (udpPort > 0 && pushService.canEnablePush(agent)) {

// UDP服务端,记录客户端的IP、端口、要监听的服务信息

pushService

.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),

pushDataSource, tid, app);

cacheMillis = switchDomain.getPushCacheMillis(serviceName);

}

} catch (Exception e) {

Loggers.SRV_LOG

.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);

cacheMillis = switchDomain.getDefaultCacheMillis();

}

// 【封装最终结果】

if (service == null) {

if (Loggers.SRV_LOG.isDebugEnabled()) {

Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);

}

result.put("name", serviceName);

result.put("clusters", clusters);

result.put("cacheMillis", cacheMillis);

result.replace("hosts", JacksonUtils.createEmptyArrayNode());

return result;

}

checkIfDisabled(service);

List srvedIPs;

srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

// filter ips using selector:

if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {

srvedIPs = service.getSelector().select(clientIP, srvedIPs);

}

if (CollectionUtils.isEmpty(srvedIPs)) {

if (Loggers.SRV_LOG.isDebugEnabled()) {

Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);

}

if (clientInfo.type == ClientInfo.ClientType.JAVA

&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {

result.put("dom", serviceName);

} else {

result.put("dom", NamingUtils.getServiceName(serviceName));

}

result.put("name", serviceName);

result.put("cacheMillis", cacheMillis);

result.put("lastRefTime", System.currentTimeMillis());

result.put("checksum", service.getChecksum());

result.put("useSpecifiedURL", false);

result.put("clusters", clusters);

result.put("env", env);

result.set("hosts", JacksonUtils.createEmptyArrayNode());

result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));

return result;

}

Map> ipMap = new HashMap<>(2);

ipMap.put(Boolean.TRUE, new ArrayList<>());

ipMap.put(Boolean.FALSE, new ArrayList<>());

for (Instance ip : srvedIPs) {

ipMap.get(ip.isHealthy()).add(ip);

}

if (isCheck) {

result.put("reachProtectThreshold", false);

}

double threshold = service.getProtectThreshold();

if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);

if (isCheck) {

result.put("reachProtectThreshold", true);

}

ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));

ipMap.get(Boolean.FALSE).clear();

}

if (isCheck) {

result.put("protectThreshold", service.getProtectThreshold());

result.put("reachLocalSiteCallThreshold", false);

return JacksonUtils.createEmptyJsonNode();

}

ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

for (Map.Entry> entry : ipMap.entrySet()) {

List ips = entry.getValue();

if (healthyOnly && !entry.getKey()) {

continue;

}

for (Instance instance : ips) {

// remove disabled instance:

if (!instance.isEnabled()) {

continue;

}

ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

ipObj.put("ip", instance.getIp());

ipObj.put("port", instance.getPort());

// deprecated since nacos 1.0.0:

ipObj.put("valid", entry.getKey());

ipObj.put("healthy", entry.getKey());

ipObj.put("marked", instance.isMarked());

ipObj.put("instanceId", instance.getInstanceId());

ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));

ipObj.put("enabled", instance.isEnabled());

ipObj.put("weight", instance.getWeight());

ipObj.put("clusterName", instance.getClusterName());

if (clientInfo.type == ClientInfo.ClientType.JAVA

&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {

ipObj.put("serviceName", instance.getServiceName());

} else {

ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));

}

ipObj.put("ephemeral", instance.isEphemeral());

hosts.add(ipObj);

}

}

result.replace("hosts", hosts);

if (clientInfo.type == ClientInfo.ClientType.JAVA

&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {

result.put("dom", serviceName);

} else {

result.put("dom", NamingUtils.getServiceName(serviceName));

}

result.put("name", serviceName);

result.put("cacheMillis", cacheMillis);

result.put("lastRefTime", System.currentTimeMillis());

result.put("checksum", service.getChecksum());

result.put("useSpecifiedURL", false);

result.put("clusters", clusters);

result.put("env", env);

result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));

return result;

}

再进到,PushService

里面有一段静态代码块

static {

try {

udpSocket = new DatagramSocket();

Receiver receiver = new Receiver();

Thread inThread = new Thread(receiver);

inThread.setDaemon(true);

inThread.setName("com.alibaba.nacos.naming.push.receiver");

inThread.start();

GlobalExecutor.scheduleRetransmitter(() -> {

try {

// 定时移除已经 断开的客户端

removeClientIfZombie();

} catch (Throwable e) {

Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");

}

}, 0, 20, TimeUnit.SECONDS);

} catch (SocketException e) {

Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");

}

}

再看到 “监听自己” 的 onApplicationEvent方法

@Override

public void onApplicationEvent(ServiceChangeEvent event) {

// 得到发生变化 的服务

Service service = event.getService();

//得到 服务名称

String serviceName = service.getName();

// 得到 namespaceId

String namespaceId = service.getNamespaceId();

// 用线程池 异步执行任务、发送服务最新数据给 所有的监听者

Future future = GlobalExecutor.scheduleUdpSender(() -> {

try {

Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");

// 得到监听当前服务 【namespaceId, serviceName】 的所有的消费者的 客户端 的PushClient

ConcurrentMap clients = clientMap

.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));

if (MapUtils.isEmpty(clients)) {

return;

}

Map cache = new HashMap<>(16);

long lastRefTime = System.nanoTime();

for (PushClient client : clients.values()) {

// 遍历所有的 PushClient

if (client.zombie()) {

// 挂掉 的PushClient,直接移除

Loggers.PUSH.debug("client is zombie: " + client.toString());

clients.remove(client.toString());

Loggers.PUSH.debug("client is zombie: " + client.toString());

continue;

}

// 准备消息

Receiver.AckEntry ackEntry;

Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());

String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());

byte[] compressData = null;

Map data = null;

if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {

org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);

compressData = (byte[]) (pair.getValue0());

data = (Map) pair.getValue1();

Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());

}

if (compressData != null) {

ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);

} else {

ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);

if (ackEntry != null) {

cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));

}

}

Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",

client.getServiceName(), client.getAddrStr(), client.getAgent(),

(ackEntry == null ? null : ackEntry.key));

// 利用UDP 套接字,发送 服务信息

udpPush(ackEntry);

}

} catch (Exception e) {

Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

} finally {

futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));

}

}, 1000, TimeUnit.MILLISECONDS);

futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}

OK,再次回答 我们 的问题

问题说明:考察对Nacos、Eureka的底层实现的掌握情况

难易程度:难

参考话术:

Nacos与Eureka有相同点,也有不同之处,可以从以下几点来描述:

接口方式:Nacos与Eureka都对外暴露了Rest风格的API接口,用来实现服务注册、发现等功能实例类型:Nacos的实例有永久和临时实例之分;而Eureka只支持临时实例健康检测:Nacos对临时实例采用心跳模式检测,对永久实例采用主动请求来检测;Eureka只支持心跳模式服务发现:Nacos支持定时拉取和订阅推送两种模式;Eureka只支持定时拉取模式【这次说了 这个】

Nacos的服务发现分为两种模式:

模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。

与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。

好文阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。