
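Preface

I have recently been writing a scheduling framework and wanted to add a scheduled task of type "remote service call", used to invoke both plain remote services and microservices. The task has two parts. The first reads the schedule configuration, obtains the remote request details, and issues the request. The second calls a sequence service to obtain a distributed sequence number, which uniquely identifies the scheduling result, and then stores that result. Because the system is built on WebFlux, I chose Reactive Feign together with WebClient for service calls.

Overview

Given the purpose of this task, Reactive Feign and WebClient were the natural choice. In the code I configured Reactive Feign, a plain WebClient.Builder, and a WebClient.Builder annotated with @LoadBalanced. The @LoadBalanced bean was also annotated with @Primary.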

    @Bean
    @Primary
    @LoadBalanced
    public WebClient.Builder microServiceWebClient() {
        return WebClient.builder();
    }

    @Bean
    public WebClient.Builder normalWebClient() {
        return WebClient.builder();
    }

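After starting the project, as soon as the scheduler dispatched a task that called a remote service through the microservice path, disaster struck and the request failed with the following error.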

reactor.core.Exceptions$ErrorCallbackNotImplemented: feign.FeignException$ServiceUnavailable: [503 Service Unavailable] during [GET] to [http://192.168.0.101:10001/sequence/distributeSequenceController/id] [SequenceFeign#distributeSequence()]: [LoadBalancer does not contain an instance for the service 192.168.0.101]

Caused by: feign.FeignException$ServiceUnavailable: [503 Service Unavailable] during [GET] to [http://192.168.0.101:10001/sequence/distributeSequenceController/id] [SequenceFeign#distributeSequence()]: [LoadBalancer does not contain an instance for the service 192.168.0.101]

at feign.FeignException.serverErrorStatus(FeignException.java:256) ~[feign-core-11.8.jar:na]

at feign.FeignException.errorStatus(FeignException.java:197) ~[feign-core-11.8.jar:na]

at feign.FeignException.errorStatus(FeignException.java:185) ~[feign-core-11.8.jar:na]

at feign.codec.ErrorDecoder$Default.decode(ErrorDecoder.java:92) ~[feign-core-11.8.jar:na]

at reactivefeign.client.statushandler.ReactiveStatusHandlers$1.lambda$decode$0(ReactiveStatusHandlers.java:49) ~[feign-reactor-core-3.2.6.jar:na]

at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoCollect$CollectSubscriber.onSubscribe(MonoCollect.java:104) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:98) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onComplete(FluxDematerialize.java:121) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:91) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:340) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:227) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.request(FluxDematerialize.java:127) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onSubscribe(FluxDematerialize.java:77) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Mono.subscribe(Mono.java:4455) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:132) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:187) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxMaterialize$MaterializeSubscriber.onComplete(FluxMaterialize.java:141) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxTake$TakeSubscriber.onComplete(FluxTake.java:153) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxTake$TakeSubscriber.onNext(FluxTake.java:133) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:180) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129) ~[reactor-core-3.4.24.jar:3.4.24]

at org.springframework.cloud.commons.publisher.FluxFirstNonEmptyEmitting$FirstNonEmptyEmittingSubscriber.onComplete(FluxFirstNonEmptyEmitting.java:325) ~[spring-cloud-commons-3.1.4.jar:3.1.4]

at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onComplete(FluxSubscribeOn.java:166) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Flux.subscribe(Flux.java:8526) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:200) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.Flux.subscribe(Flux.java:8512) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:200) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:76) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.24.jar:3.4.24]

at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.24.jar:3.4.24]

at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]

at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]

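This post works through this exception as a way to uncover how Reactive Feign makes calls under the hood.

2. Root Cause Analysis

The exception above was thrown while calling the service through the Feign client below. The service name is "distribute-sequence", not "192.168.0.101"; the latter is plainly the address where a service instance is deployed.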

    @ReactiveFeignClient("distribute-sequence")
    public interface DistributeSequenceFeignClient {

        // "..." marks the response type argument, which was lost from this listing.
        @GetMapping("sequence/distributeSequenceController/id/{number}")
        Mono<...> createSequenceId(@PathVariable("number") Integer number);
    }

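This raises a question: what happens inside the Feign call such that the service name is first converted into a real address, and that address is then used for another microservice lookup? After some careful thought, the following hypothesis takes shape.

During execution, Feign goes through load balancing twice. The first pass resolves the real request address, turning distribute-sequence into the instance address 192.168.0.101:8081. The second pass then tries to look up a service instance using 192.168.0.101 as if it were a service name.

Why does load balancing, which should only need to run once to yield an instance address, run twice here? Reading the code turned up the following fragments (to keep the analysis on track, how I found them is left for a later post).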

    // From LoadBalancerPublisherClient.java
    private final ReactiveLoadBalancer reactiveLoadBalancer;

    @Override
    public Publisher executeRequest(ReactiveHttpRequest request) {
        return Mono.from(reactiveLoadBalancer.choose())
                .flatMapMany(serviceInstanceResponse -> {
                    // Swap the service name in the URI for the chosen instance's address
                    URI lbUrl = reconstructURI(serviceInstanceResponse.getServer(), request.uri());
                    ReactiveHttpRequest lbRequest = new ReactiveHttpRequest(request, lbUrl);
                    return publisherClient.executeRequest(lbRequest);
                });
    }

    // From MonoPublisherHttpClient.java
    @Override
    public Mono executeRequest(ReactiveHttpRequest request) {
        Mono response = Mono.defer(() -> reactiveHttpClient.executeRequest(request));
        return response.flatMap(resp -> Mono.from(resp.body()));
    }

    // From WebReactiveHttpClient.java
    private final WebClient webClient;

    @Override
    public Mono<ReactiveHttpResponse<P>> executeRequest(ReactiveHttpRequest request) {
        return webClient.method(HttpMethod.valueOf(request.method()))
                .uri(request.uri())
                .headers(httpHeaders -> setUpHeaders(request, httpHeaders))
                .body(provideBody(request))
                .exchange()
                .onErrorMap(ex -> {
                    Throwable errorMapped = errorMapper.apply(request, ex);
                    if (errorMapped != null) {
                        return errorMapped;
                    } else {
                        return new ReactiveFeignException(ex, request);
                    }
                })
                .map(response -> toReactiveHttpResponse(request, response));
    }

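The source excerpts above jump around a bit, so here is a brief walkthrough.

Feign, by way of reflection, issues the request through LoadBalancerPublisherClient.executeRequest. Internally, it first uses ReactiveLoadBalancer to pick a service instance. It then sends the request through WebReactiveHttpClient, and MonoPublisherHttpClient wraps the response before returning it.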
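To make the URI rewriting step concrete, here is a minimal sketch in plain Java of what a helper like reconstructURI presumably does: it keeps the path of the logical URI and swaps in the host and port of the chosen instance. The class name UriRewriteSketch and the method rewrite are made up for illustration and are not part of Reactive Feign; the request path is taken from the Feign client above with 1 as a sample number.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: UriRewriteSketch and rewrite(...) are hypothetical names,
// not the library's implementation of reconstructURI.
final class UriRewriteSketch {

    /** Replaces the host and port of the original URI with those of the chosen instance. */
    static URI rewrite(URI original, String instanceHost, int instancePort) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(),
                    instanceHost, instancePort,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // The logical URI uses the service name as its host.
        URI logical = URI.create(
                "http://distribute-sequence/sequence/distributeSequenceController/id/1");
        // After load balancing, the host becomes a concrete instance address.
        System.out.println(rewrite(logical, "192.168.0.101", 8081));
    }
}
```

The point to notice is that the rewritten URI no longer carries a service name, so anything downstream that treats its host as a service name will look up the wrong thing.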

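WebReactiveHttpClient makes the actual request through a WebClient. With that, the question above can be answered.

Reactive Feign first runs load balancing to obtain a service instance, then sends the request through WebClient. In the configuration above, the WebClient.Builder annotated with @LoadBalanced is also marked @Primary, so the WebClient used here runs its own load-balancing pass before sending the request. A call that needs only one load-balancing pass now gets two, which produces the error shown earlier.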
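The double lookup can be reproduced without Spring at all. The sketch below is a simplified model, not the real load balancer: REGISTRY is a hypothetical in-memory stand-in for the service registry, and loadBalance treats the URI host as a service name, which is the behavior the analysis attributes to both passes.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;

// Simplified model of the failure; all names here are illustrative assumptions.
final class DoubleLoadBalanceSketch {

    /** Hypothetical registry: service name -> instance addresses. */
    static final Map<String, List<String>> REGISTRY =
            Map.of("distribute-sequence", List.of("192.168.0.101:8081"));

    /** Treats the URI host as a service name and swaps it for a registered instance. */
    static URI loadBalance(URI uri) {
        List<String> instances = REGISTRY.get(uri.getHost());
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException(
                    "LoadBalancer does not contain an instance for the service " + uri.getHost());
        }
        return URI.create(uri.getScheme() + "://" + instances.get(0) + uri.getPath());
    }

    public static void main(String[] args) {
        URI logical = URI.create(
                "http://distribute-sequence/sequence/distributeSequenceController/id/1");

        // First pass: Feign's own load-balancing step resolves the service name.
        URI first = loadBalance(logical);
        System.out.println("after first pass: " + first);

        // Second pass: a load-balanced WebClient repeats the lookup with the IP as the name.
        try {
            loadBalance(first);
        } catch (IllegalStateException e) {
            System.out.println("second pass failed: " + e.getMessage());
        }
    }
}
```

The second lookup fails because no service is registered under the name 192.168.0.101, which mirrors the message in the stack trace.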

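3. Fix

From the analysis above, the first pass, reactiveLoadBalancer.choose(), cannot be changed. What can be changed is the second step, the WebClient call: if it no longer load-balances, the problem goes away. The only change needed is a small one to the WebClient.Builder configuration: mark the plain WebClient.Builder as @Primary instead.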

    @Bean
    @LoadBalanced
    public WebClient.Builder microServiceWebClient() {
        return WebClient.builder();
    }

    @Bean
    @Primary
    public WebClient.Builder normalWebClient() {
        return WebClient.builder();
    }

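Summary

When Reactive Feign makes a call, it first selects a service instance through ReactiveLoadBalancer and then sends the request through WebClient. When the target address is only known at runtime, defining the request interface with Feign is inconvenient, and WebClient has to be used directly. In that case, either do not configure a WebClient.Builder annotated with @LoadBalanced, or mark the plain WebClient.Builder bean as @Primary.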
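With both builders registered, code that calls WebClient directly can still choose between them explicitly. The following is a rough sketch rather than code from the project: RemoteCallService and its two methods are hypothetical names, the bean names are assumed to be the default ones derived from the @Bean methods normalWebClient and microServiceWebClient, and the response is read as a String only to keep the example short.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Hypothetical service; it needs Spring WebFlux and the two builder beans on the classpath.
@Service
public class RemoteCallService {

    private final WebClient plainClient;
    private final WebClient lbClient;

    public RemoteCallService(
            @Qualifier("normalWebClient") WebClient.Builder plainBuilder,
            @Qualifier("microServiceWebClient") WebClient.Builder lbBuilder) {
        this.plainClient = plainBuilder.build();
        this.lbClient = lbBuilder.build();
    }

    /** Calls a concrete address that is only known at runtime, with no load balancing. */
    public Mono<String> callByAddress(String url) {
        return plainClient.get().uri(url).retrieve().bodyToMono(String.class);
    }

    /** Calls a registered service by its logical name, for example http://distribute-sequence/... */
    public Mono<String> callByServiceName(String serviceUrl) {
        return lbClient.get().uri(serviceUrl).retrieve().bodyToMono(String.class);
    }
}
```

Selecting each builder by name keeps the choice visible at the injection point, so a later change to which bean is @Primary does not silently alter how this class resolves addresses.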
