您当前的位置: 首页 >  dubbo

庄小焱

暂无认证

  • 3浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Dubbo——服务发现(consume)原理与源码解析

庄小焱 发布时间:2021-04-02 08:37:01 ,浏览量:3

摘要

带着大家再来看看 Dubbo 服务引入全流程。同时对dubbo消费者的相关的源码分析。

服务发现大致原理

Provider将注册到注册中心, Consumer操作从注册中心得知 Provider 的信息,然后自己封装一个调用类和Provider 进行深入地交流。而之前已经提到在 Dubbo中一个可执行体就是 Invoker,所有调用都要向 Invoker 靠拢,因此可以推断出应该要先生成一个 Invoker,然后又因为框架需要往不侵入业务代码的方向发展,那我们的 Consumer 需要无感知的调用远程接口,因此需要搞个代理类,包装一下屏蔽底层的细节。整体大致流程如下:

服务发现开端

服务的引入和服务的暴露一样,也是通过 spring 自定义标签机制解析生成对应的 Bean,Provider Service 对应解析的是 ServiceBean 而 Consumer Reference 对应的是 ReferenceBean。

 在 Spring 容器刷新完成之后开始暴露,而服务的引入时机有两种,第一种是饿汉式,第二种是懒汉式。饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet 方法,容器通过调用 ReferenceBean afterPropertiesSet 方法时引入服务。懒汉式是只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。默认情况下,Dubbo 使用懒汉式引入服务,如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。

BeanFactory 、FactoryBean、ObjectFactory

BeanFactory 其实就是 IOC 容器,有多种实现类我就不分析了,简单的说就是 Spring 里面的 Bean 都归它管,而FactoryBean也是 Bean 所以说也是归 BeanFactory 管理的。

FactoryBean 到底是个什么 Bean 呢?它其实就是把你真实想要的 Bean 封装了一层,在真正要获取这个 Bean 的时候容器会调用 FactoryBean#getObject() 方法,而在这个方法里面你可以进行一些复杂的组装操作。这个方法就封装了真实想要的对象复杂的创建过程。使用 FactoryBean 封装了一层,屏蔽了底层创建的细节,便于 Bean 的使用。

ObjectFactory 这个是用于延迟查找的场景,它就是一个普通工厂,当得到 ObjectFactory 对象时,相当于 Bean 没有被创建,只有当 getObject() 方法时,才会触发 Bean 实例化等生命周期。

主要用于暂时性地获取某个 Bean Holder 对象,如果过早的加载,可能会引起一些意外的情况,比如当 Bean A 依赖 Bean B 时,如果过早地初始化 A,那么 B 里面的状态可能是中间状态,这时候使用 A 容易导致一些错误。(循环依赖的错误)。

总结的说 BeanFactory 就是 IOC 容器,FactoryBean 是特殊的 Bean, 用来封装创建比较复杂的对象,而 ObjectFactory 主要用于延迟查找的场景,延迟实例化对象。

服务发现的三种方式

服务的引入又分为了三种,第一种是本地引入、第二种是直接连接引入远程服务、第三种是通过注册中心引入远程服务。

本地服务发现

本地引入不知道大家是否还有印象,之前服务暴露的流程每个服务都会通过搞一个本地暴露,走 injvm 协议(当然你要是 scope = remote 就没本地引用了),因为存在一个服务端既是 Provider 又是 Consumer 的情况,然后有可能自己会调用自己的服务,因此就弄了一个本地引入,这样就避免了远程网络调用的开销。所以服务引入会先去本地缓存找找看有没有本地服务。

远程服务发现

这个其实就是平日测试的情况下用用,不需要启动注册中心,由 Consumer 直接配置写死Provider 的地址,然后直连即可。

注册中心发现

Consumer 通过注册中心得知 Provider 的相关信息,然后进行服务的引入,这里还包括多注册中心,同一个服务多个提供者的情况,如何抉择如何封装,如何进行负载均衡、容错并且让使用者无感知。

服务发现源码解析

默认是懒汉式的,所以服务引入的入口就是 ReferenceBean 的 getObject 方法。

    @Override
    public Object getObject() {
        //懒汉式的
        return get();
    }
    public synchronized T get() {
        //懒汉式的
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
init()源码
 /**
     * @description  大部分就是检查配置然后将配置构建成 map  主要的方式的是 ref = createProxy(map);
      * @param:
     * @date: 2021/12/11 17:34
     * @return: void
     * @author: xjl
    */
    public synchronized void init() {
        ******** 省略代码

        serviceMetadata.getAttachments().putAll(map);
        /**从名字可以得到就是要创建的一个代理 */
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        checkInvokerAvailable();

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }
createProxy()源码分析
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map map) {
        if (shouldJvmRefer(map)) {
            //如果是走本地的话,那么直接构建个走本地协议的 URL 然后进行服务的引入
            //  LOCAL_PROTOCOL: injvm , LOCALHOST_VALUE: 127.0.0.1
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            //如果配置里面设置了url ,那要么是点对点直连,要么是配置中心地址,都经过处理加入到urls中
            urls.clear();
            if (url != null && url.length() > 0) {
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        //得到配置的url,进行循环
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (UrlUtils.isRegistry(url)) {
                            //如果是注册中心地址将 map转换为查询字符串,并作为 refer 参数的值添加到 url中
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            //如果是点对点会合并url,移除服务提供者的一些配置
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                // 然后就是没配置 url 的情况  如果配置了url 那么不是直连的地址,到这里肯定走的就是注册中心引入远程服务了。
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry();
                    List us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException(
                                "No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
                                        " use dubbo version " + Version.getVersion() +
                                        ", please config  to your spring config.");
                    }
                }
            }

            if (urls.size() == 1) {
                //如果只有一个URL直接转换成invoker
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                List>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 多个url挨个转换成invoker
                    Invoker referInvoker = REF_PROTOCOL.refer(interfaceClass, url);
                    if (shouldCheck()) {
                        if (referInvoker.isAvailable()) {
                            invokers.add(referInvoker);
                        } else {
                            referInvoker.destroy();
                        }
                    } else {
                        invokers.add(referInvoker);
                    }

                    if (UrlUtils.isRegistry(url)) {
                        //用最后一个注册中心的地址
                        registryURL = url; // use last registry url
                    }
                }

                if (shouldCheck() && invokers.size() == 0) {
                    throw new IllegalStateException("Failed to check the status of the service "
                            + interfaceName
                            + ". No provider available for the service "
                            + (group == null ? "" : group + "/")
                            + interfaceName +
                            (version == null ? "" : ":" + version)
                            + " from the multi registry cluster"
                            + " use dubbo version " + Version.getVersion());
                }

                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    // //创建StaticDirectory实例,并由 Cluster对多个Invoker 进行合并,只暴露出一个 invoker便于调用
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else { // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ?
                            (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
                                    Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        MetadataUtils.publishServiceDefinition(consumerURL);

        // create service proxy
        // 通过代理封装invoker返回代理
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

  • 这其实就是整个流程了,简述一下就是先检查配置,通过配置构建一个 map ,然后利用 map 来构建 URL再通过 URL 上的协议利用自适应扩展机制调用对应的 protocol.refer 得到相应的 invoker 。
  • 在有多个 URL 的时候,先遍历构建出 invoker 然后再由 StaticDirectory 封装一下,然后通过 cluster 进行合并,只暴露出一个 invoker 。
  • 然后再构建代理,封装 invoker 返回服务引用,之后 Comsumer 调用的就是这个代理类。
RegistryProtocol.refer()源码
 @Override
    @SuppressWarnings("unchecked")
    public  Invoker refer(Class type, URL url) throws RpcException {
        //取registry参数值,并将其设置为协议头
        url = getRegistryUrl(url);
        // 获取中心实例
        Registry registry = getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"  将参数转为map 
        Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            // 如果分group 的话
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
            }
        }

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url, qs);
    }
RegistryProtocol.doCreateInvoker()源码

主要就是获取注册中心实例,然后调用doCreateInvoker()进行真正的 refer。

 protected  ClusterInvoker doCreateInvoker(DynamicDirectory directory, Cluster cluster, Registry registry, Class type) {
        // 设置注册中心实例
        directory.setRegistry(registry);
        // 设置动态生成的protocol $Adaptive
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map parameters = new HashMap(directory.getConsumerUrl().getParameters());
        // 生成服务者消费链接
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(urlToRegistry);
            // 向注册中心 注册消费者 在consumer 目录下创建新节点
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(urlToRegistry);
        //再订阅注册中心的 providers目录、 configurators目录和routers目录,订阅好了之后就会触发 DubboProtocol的refer方法.
        directory.subscribe(toSubscribeUrl(urlToRegistry));
        //利用cluser封装direcotry其实就是封装多个invoker
        return (ClusterInvoker) cluster.join(directory);
    }

这个方法很关键,可以看到生成了 RegistryDirectory 这个 directory 塞了注册中心实例,它自身也实现了 NotifyListener 接口,因此注册中心的监听其实是靠这家伙来处理的。

然后向注册中心注册自身的信息,并且向注册中心订阅了 providers 节点、 configurators 节点 和 routers 节点,订阅了之后 RegistryDirectory 会收到这几个节点下的信息,就会触发 DubboInvoker 的生成了,即用于远程调用的 Invoker。

然后通过 cluster 再包装一下得到 Invoker,因此一个服务可能有多个提供者,最终在 ProviderConsumerRegTable 中记录这些信息,然后返回 Invoker。

    protected  Invoker interceptInvoker(ClusterInvoker invoker, URL url, URL consumerUrl) {
        List listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, invoker, consumerUrl);
        }
        return invoker;
    }

终于我们从注册中心拿到远程 Provider 的信息了,然后进行服务的引入。

 private ExchangeClient[] getClients(URL url) {
        // 是否共享连接
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        // 如果没有配置,连接是共享的,否则,一个服务一个连接
        if (connections == 0) {
            /*
             * xml 配置应该比属性具有更高的优先级。
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            return getSharedClient(url, connections).toArray(new ExchangeClient[0]);
        } else {
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                //得到初始化的新的客户端
                clients[i] = initClient(url);
            }
            return clients;
        }

    }

这里的重点在 getClients,因为终究是要跟远程服务进行网络调用的,而 getClients 就是用于获取客户端实例,实例类型为 ExchangeClient,底层依赖 Netty 来进行网络通信,并且可以看到默认是共享连接。

然后最终得到的 Invoker 就是这个样子,可以看到记录的很多信息,基本上该有的都有了,我这里走的是对应的服务只有一个 url 的情况,多个url 无非也是利用 directorycluster封装一层。

相信分析下来整个流程不难的,总结地说无非就是通过配置组成 URL ,然后通过自适应得到对于的实现类进行服务引入,如果是注册中心那么会向注册中心注册自己的信息,然后订阅注册中心相关信息,得到远程 provider 的 ip 等信息,再通过netty客户端进行连接。

并且通过directorycluster 进行底层多个服务提供者的屏蔽、容错和负载均衡等,最终得到封装好的 invoker 再通过动态代理封装得到代理类,让接口调用者无感知的调用方法。

博文参考

Dubbo源码解读与实战-完

《Dubbo系列》-Dubbo服务引入过程 - 掘金

关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.1758s