在传统模式下,Dubbo消费端需要调用某一远程服务器端的方法时,消费端也需要额外导入服务类接口API,Dubbo也由此实现了面向接口代理的高性能RPC调用。


但是当服务消费端没有服务接口或方法参数类型时,无法使用上述的方式进行服务调用,针对该场景,Dubbo使用泛化调用方法进行服务调用。


Dubbo在进行泛化调用时,将相关信息封装到Map对象中,并利用GenericService接口处理。


举个例子

服务器端配置:


<bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>


<dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>

服务器端服务具体实现类:


public class HelloServiceImpl implements HelloService {

    @Override

    public CompletableFuture<String> sayHelloAsync(String name) {

        CompletableFuture<String> future = new CompletableFuture<>();

        new Thread(() -> {

            try {

                Thread.sleep(5000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            future.complete("sayHelloAsync: " + name);

        }).start();

        return future;

    }

消费端配置:


public class GenericCallConsumer {

    private static GenericService genericService;

    public static void main(String[] args) throws Exception {

        ApplicationConfig applicationConfig = new ApplicationConfig();

        applicationConfig.setName("generic-call-consumer");

        RegistryConfig registryConfig = new RegistryConfig();

        registryConfig.setAddress("zookeeper://127.0.0.1:2181");

        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();

    referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");

        applicationConfig.setRegistry(registryConfig);

        referenceConfig.setApplication(applicationConfig);

        // 开启泛化

        referenceConfig.setGeneric(true);

        referenceConfig.setAsync(true);

        referenceConfig.setTimeout(7000);

        genericService = referenceConfig.get();

        invokeAsyncSayHelloAsync();

public static void invokeAsyncSayHelloAsync() throws Exception {

        CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",

                new String[]{"java.lang.String"}, new Object[]{"world"});

        CountDownLatch latch = new CountDownLatch(1);

        future.whenComplete((value, t) -> {

           System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);

            latch.countDown();

        });

        latch.await();

}

原理分析

消费端通过一个代理对象进行服务调用,


1)执行InvokerInvocationHandler#invoke方法,


2)调用MockClusterInvoker#invoke方法


3)调用AbstractCluster$InterceptorInvokerNode#invoke方法


code1.png

这里新增了一个ClusterInterceptor,与Filter不同,它在一个特定的地址或invoker被选择之前的较外层执行逻辑,在服务发现之前拦截请求。


3.1)调用ConsumerContextClusterInterceptor#before方法。

从RpcContext设置invocation,并设置localAddress和invoker(默认FailoverClusterInvoker),清除RpcContext内部的SERVER_LOCAL上下文内容。


3.2)调用ClusterInterceptor#intercept方法


code2.png

默认调用FailoverClusterInvoker#doinvoke方法。


在该方法中从RegistryDirectory中获取invoker列表,然后获取负载均衡LoadBalance(默认RandomLoadBalance), 选择一个invoker,进行服务调用。


4)调用InvokerWrapper#invoke方法,之后执行一个Filter链


4.1)调用ConsumerContextFilter#invoke方法,


设置RpcContext中的invoker变量问当前invoker(默认是ProtocolFilterWrapper),设置invocation。


从RpcContext中获取timeout-countdown变量,如果存在,则转化为TimeCountDown对象,判断该调用是否超时,如果超时,则返回一个AsyncRpcResult对象,记录一个异常。


4.2)调用FutureFilter#invoke方法和MonitorFilter#invoke方法。


4.3)调用GenericImplFilter#invoke方法,这里是泛化调用在客户端的主要核心步骤。


4.3.1)从url中获取generic字段,调用方法不是$invoke,也不是$invokeAysnc时:


4.3.1.1)重新创建一个RpcInvocation,在attributes变量中添加GENERIC_IMPL_MARKER值,设置为true,其中:attributes变量参数类型为Map<Object,Object>,并且该变量只在调用者端,不会出现在线路上。


4.3.1.2)获取调用的方法名,调用的参数类型和参数值,对参数类型进行解析修改,效果如下:


java.lang.Object[][].class => "java.lang.Object[][]"

4.3.1.3)如果泛化调用方式为bean方式,遍历参数值,并序列化为JavaBeanDescriptor类型数据;


如果是其他调用方式,深入对象,将复杂类型转化为简单类型。


4.3.1.4)如果方法返回类型是CompletableFuture,则设置方法名为$invokeAysnc;其他情况设置方法名为$invoke


4.3.1.5)将参数类型设置为new Class<?>[]{String.class, String[].class, Object[].class};,这样转化为传统的泛化调用方式,并将参数值设置为类似new Object[]{methodName, types, args}的格式。


4.3.2)当调用方法为$invoke或者$invokeAysnc,并且方法参数变量数量为3个时,首先获取泛化参数,然后判断泛化调用方式:


4.3.2.1)如果是nativejava方式,判断参数是否为byte[]类型;如果不是,则说明参数传递异常。


4.3.2.2)如果是bean方式,则判断参数是否为JavaBeanDescriptor类型;如果不是,则说明参数传递异常。


4.3.3)在RpcInvocation中的attachment中设置是否泛化调用。


4.4) 调用invoker#invoke方法


当远程调用返回结果时,会触发onResponse方法。


从url中获取generic参数值,从invocation中获取方法名,方法参数类型,参数值GENERIC_IMPL是否存在。


如果参数值GENERIC_IMPL存在,并且为true:

从invoker中获取接口类型,当方法不是$invoke也不是$invokeAysnc,并且接口父类型为GenericService时,从invoker中的interface参数中获取真实的interface,并转化为Class类型。


之后,是所有不同调用方式的统一处理。


获取调用的方法Method,如果调用方式是Bean方式:

判断appReponse的value是否为JavaBeanDescriptor类型,如果是,将该value进行反序列化,重新赋值;如果不是,则抛出异常。


如果是其他调用方式,则使用PojoUtils工具类进行反序列化。


5)调用AsyncToSyncInvoker#invoke方法


6)调用DubboInvoker#invoke方法,发起远程调用。


服务端收到请求后,在最终调用AbstractProxyInvoker#invoke方法之前,会先执行一个过滤器链,和上述的消费端的类似,其中会经过一个GenericFilter,该类是服务端实现泛化调用功能的重要步骤。


在GenericFilter中,首先判断方法名是否为$invoke或者$invokeAsync,由此来判定是否为泛化调用,如果方法名不是这两个,则直接调用下一个invoker;如果是,则执行下面的逻辑。


获取参数名称、参数类型、参数值,通过反射获取调用方法,根据不同的调用方式进行反序列化,获取实际调用方法的相关信息,然后将RpcInvocation中的相关信息进行替换:


code3.png

Dubbo2.7下的超时机制

在上述中的ConsumerContextFilter#invoke中涉及到了超时情况的处理,使用了TimeoutCountDown类,是2020.5.1日提交的信息。


Dubbo2.6版本中,在HeaderExchangeChannel中进行远程调用前,会创建一个DefaultFuture对象,里面有一个静态代码块,创建一个线程,执行RemotingInvocationTimeoutScan任务,轮询FUTURES集合,通过DefaultFuture记录的开始时间与当前时间进行计算,判断是否超时,如果超时,则直接创建一个超时的Response,并将该DefaultFuture从FUTURES集合中移除。


当服务端在一定时间内执行完逻辑后,会发送给客户端,在此之前,客户端通过定时任务已经将相关信息从FUTURES集合中移除,所以这次服务端发送过来的信息在FUTURES集合中查找不到,所以不做处理,服务端的这次发送显得有些多余,对于客户端来说是无用的。


所以在2020.5.1日,提交了上述代码来解决这一问题。


code4.png

在DubboInvoker#doInvoke方法中进行远程远程调用前,会计算timeout。


code5.png

首先从RpcContext中获取timeout-countdown变量值


如果为空:

----》 从url中通过timeout参数获取超时时间,默认是1000,

----》从url中获取enable-timeout-countdown参数值,默认是false,通过该参数开启新的超时机制(使用了上述的TimeoutCountDown)

----》如果开启了,在attachments变量里添加_TO变量,值为计算后的timeout


如果不为空:

将其转化为 TimeoutCountDown对象,计算剩余的有效时间,将其设置为新的timeout,并将其添加到_TO变量,值为计算后的timeout。


code6.png

那TimeoutCountDown对象在什么时候被创建的呢?


在服务端的ContextFilter。


从RpcInvocation中获取“_TO”变量的值,如果不为-1,则在RpcContext中创建一个TimeoutCountDown。


code7.png

在后续的TimeoutFilter中,从RpcContext中获取TimeoutCountDown,如果超时了,则清空处理的结果。


code8.png

在消费端的ConsumerContextFilter中,在进行远程调用前,同样从RpcContext中获取TimeOutCountDown,当过期时,直接返回一个异常,而不再进行远程调用。


code10.png

疑问点

但是最后有一个疑问,即当超时后,服务端仍然会发送给客户端,虽然结果已经被清空,(可能自己的理解问题)。


code11.png

下面的这个建议感觉挺好的,但是在Dubbo在没有发现类似的机制。


链接:https://www.jianshu.com/p/9a3ea4ffb4e1