Dubbo源碼分析(五)服務暴露的具體流程(下)

1、服務暴露

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	String key = getCacheKey(originInvoker);
	//首先嚐試從緩存中獲取
	ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
	if (exporter == null) {
		synchronized (bounds) {
			exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
			if (exporter == null) {
				//從export中拿到以前的url 即爲dubbo協議的url
				//建立 Invoker 爲委託類對象
				final Invoker<?> invokerDelegete = new InvokerDelegete<T>(
											originInvoker, getProviderUrl(originInvoker));
				exporter = new ExporterChangeableWrapper<T>((Exporter<T>) 
										protocol.export(invokerDelegete), originInvoker);
				//寫入緩存
				bounds.put(key, exporter);
			}
		}
	}
	return exporter;
}
複製代碼

如上代碼,它先嚐試從緩存中獲取,若是沒有則調用protocol.export去暴露。apache

在這裏的protocol對象實際上是一個自適應擴展類對象Protocol$Adaptive,咱們調用它的export方法,它會根據協議名稱獲取對應的擴展實現類,在這裏它是DubboProtocolbootstrap

不知諸位是否還有印象,咱們在第二章節已經說過。經過ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);這句代碼獲取到的實際上是Wrapper包裝類的對象,ProtocolListenerWrapper緩存

一、服務暴露監聽

ProtocolListenerWrapper.export方法主要是獲取服務暴露監聽器,在服務暴露和取消服務暴露時能夠得到通知。bash

public class ProtocolListenerWrapper implements Protocol {

	public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
		if ("registry".equals(invoker.getUrl().getProtocol())) {
			return protocol.export(invoker);
		}
		//獲取ExporterListener類型的擴展點加載器
		ExtensionLoader<ExporterListener> extensionLoader =
                    ExtensionLoader.getExtensionLoader(ExporterListener.class);		
		//獲取監聽器
		List<ExporterListener> activateExtension = extensionLoader.
                    getActivateExtension(invoker.getUrl(), "exporter.listener");
					
		//調用ProtocolFilterWrapper.export繼續暴露
		Exporter<T> export = protocol.export(invoker);
        List<ExporterListener> exporterListeners = 
						Collections.unmodifiableList(activateExtension);
		
		//循環監聽器 通知方法。返回ListenerExporterWrapper對象
        ListenerExporterWrapper<T> listenerExporterWrapper = 
					new ListenerExporterWrapper<>(export, exporterListeners);
        return listenerExporterWrapper;
	}	
}
複製代碼

好比,咱們能夠建立一個自定義的監聽器。服務器

public class MyExporterListener1 implements ExporterListener {
    public void exported(Exporter<?> exporter) throws RpcException {
        System.out.println("111111111111111-------服務暴露");
    }
    public void unexported(Exporter<?> exporter) {
        System.out.println("111111111111111-------取消服務暴露");
    }
}
複製代碼

而後建立擴展點配置文件,文件名稱爲: org.apache.dubbo.rpc.ExporterListener 內容爲: listener1=org.apache.dubbo.demo.provider.MyExporterListener1網絡

而後在Dubbo配置文件中,這樣定義: <dubbo:provider listener="listener1" />app

那麼,當服務暴露完成後,你將會得到通知。異步

二、構建調用鏈

上一步在ProtocolListenerWrapper.export方法中,返回以前還調用了ProtocolFilterWrapper.export。它主要是爲了建立包含各類Filter的調用鏈。ide

public class ProtocolFilterWrapper implements Protocol {	
	public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
		//建立Filter 過濾鏈的 Invoker
        Invoker<T> tInvoker = buildInvokerChain(invoker, "service.filter","provider");
		//調用DubboProtocol繼續暴露
        Exporter<T> export = protocol.export(tInvoker);
		//返回
        return export;
    }
}
複製代碼

這裏的重點是buildInvokerChain方法,它來建立調用鏈攔截器。每次遠程方法執行,該攔截都會被執行,在Dubbo中已知的Filter有函數

org.apache.dubbo.rpc.filter.EchoFilter
org.apache.dubbo.rpc.filter.GenericFilter
org.apache.dubbo.rpc.filter.GenericImplFilter
org.apache.dubbo.rpc.filter.TokenFilter
org.apache.dubbo.rpc.filter.AccessLogFilter
org.apache.dubbo.rpc.filter.CountFilter
org.apache.dubbo.rpc.filter.ActiveLimitFilter
org.apache.dubbo.rpc.filter.ClassLoaderFilter
org.apache.dubbo.rpc.filter.ContextFilter
org.apache.dubbo.rpc.filter.ConsumerContextFilter
org.apache.dubbo.rpc.filter.ExceptionFilter
org.apache.dubbo.rpc.filter.ExecuteLimitFilter
org.apache.dubbo.rpc.filter.DeprecatedFilter
複製代碼

此時的invoker通過各類Filter的包裝,就變成了下面這個樣子:

帶有Filter的調用鏈

固然了,咱們也能夠自定義Filter。好比像下面這樣:

public class MyFilter1 implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        System.out.println("調用以前:"+invoker.getUrl().toFullString());
        Result result = invoker.invoke(invocation);
        System.out.println("調用以後:"+invoker.getUrl().toFullString());
        return result;
    }
}
複製代碼

而後建立擴展點配置文件,文件名稱爲: resources\META-INF\dubbo\com.alibaba.dubbo.rpc.Filter 內容爲: myfilter1=org.apache.dubbo.demo.provider.MyFilter1

而後在Dubbo配置文件中,這樣定義: <dubbo:provider filter="myfilter1"/>

須要注意的是,這樣配置以後,myfilter1會在默認的Filter以後。若是你但願在默認的Filter前面,那麼你能夠這樣配置<dubbo:provider filter="myfilter1,default"/>

三、DubboProtocol

通過上面各類的搞來搞去,終於能夠真正的暴露服務了。調用DubboProtocol.export,咱們重點兩部分:建立DubboExporter和啓動服務器。

public class DubboProtocol extends AbstractProtocol {
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        //服務標識
		//例如:com.viewscenes.netsupervisor.service.InfoUserService:20880
        String key = serviceKey(url);
		//建立 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
		//將 <key, exporter> 鍵值對放入緩存中
        exporterMap.put(key, exporter);
		//省略無關代碼...
		// 啓動通訊服務器
        openServer(url);
		//優化序列化
        optimizeSerialization(url);
        return exporter;
    }
}
複製代碼

3.一、建立DubboExporter

事實上,建立DubboExporter的過程很是簡單,就是調用構造函數賦值而已。

public class DubboExporter<T> extends AbstractExporter<T> {
	public DubboExporter(Invoker<T> invoker, String key, 
				Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }
}
複製代碼

3.二、啓動服務器

private void openServer(URL url) {
	//獲取IP:端口 ,並將它做爲服務器實例的key
	String key = url.getAddress();
	boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
	if (isServer) {
		//先從緩存中獲取
		ExchangeServer server = serverMap.get(key);
		if (server == null) {
			//建立服務器實例
			serverMap.put(key, createServer(url));
		} else {
			//重置服務器
			server.reset(url);
		}
	}
}
複製代碼

如上代碼,Dubbo先從緩存中獲取已啓動的服務器實例,未命中的話就去建立。若是已經存在服務器實例,就根據url的內容重置服務器。咱們重點分析建立的過程。

private ExchangeServer createServer(URL url) {
	
	//服務器關閉時 發送readonly事件
	url = url.addParameterIfAbsent("channel.readonly.sent","true");
	//設置心跳檢測
	url = url.addParameterIfAbsent("heartbeat", "60000");
	//獲取服務器參數 默認爲netty
	String str = url.getParameter("server","netty");
	//經過 SPI 檢測是否存在 server 參數所表明的 Transporter 拓展,不存在則拋出異常
	if (str != null && str.length() > 0 && 
			!ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){
		throw new RpcException("Unsupported server type: " + str + ", url: " + url);
	}
	//設置服務器編解碼器爲dubbo
	url = url.addParameter("codec", "dubbo");
	ExchangeServer server;
	try {
		//建立ExchangeServer
		server = Exchangers.bind(url, requestHandler);
	} catch (RemotingException e) {
		throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
	}
	str = url.getParameter(Constants.CLIENT_KEY);
	if (str != null && str.length() > 0) {
		Set<String> supportedTypes = ExtensionLoader.
						getExtensionLoader(Transporter.class).getSupportedExtensions();
		if (!supportedTypes.contains(str)) {
			throw new RpcException("Unsupported client type: " + str);
		}
	}
	return server;
}
複製代碼

上面的代碼主要分爲兩部分:設置默認參數和建立服務器實例。設置參數沒什麼好說的,下面調用到HeaderExchanger.bind方法,它只是設置封裝Handler處理器。

public class HeaderExchanger implements Exchanger {

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //封裝Handler處理器
		HeaderExchangeHandler headerExchangeHandler = new HeaderExchangeHandler(handler);
        DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler);
		//建立服務器
        Server bind = Transporters.bind(url, decodeHandler);
		//封裝爲HeaderExchangeServer對象返回
        HeaderExchangeServer server = new HeaderExchangeServer(bind);
        return server;
    }
}
複製代碼

咱們只需關注Transporters.bind,它負責啓動服務器。

public class Transporters {
	public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
		//獲取自適應 Transporter 實例
		Transporter adaptiveExtension = ExtensionLoader.
				getExtensionLoader(Transporter.class).getAdaptiveExtension();
		//調用NettyServer.bind
        return adaptiveExtension.bind(url, handler);
    }
}
複製代碼

如上代碼,它首先獲取自適應 Transporter 實例,即TransporterAdaptive。而後根據傳入的url參數來加載哪一個Transporter,在Dubbo中默認是NettyTransporter。須要注意的是,根據Dubbo版本的不一樣,有可能使用Netty的版本也不同。

好比,筆者在Dubbo2.7快照版本中(還未發行),看到的Netty配置文件是這樣,說明它默認使用的就是Netty4:

netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter
netty= org.apache.dubbo.remoting.transport.netty4.NettyTransporter
複製代碼

在Dubbo2.6版本中,看到的Netty配置文件是這樣,說明你只要不指定Netty4,那就使用Netty3

netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
複製代碼

不過這些都無傷大雅,咱們以Netty3接着看....

public class NettyTransporter implements Transporter {
    public Server bind(URL url, ChannelHandler listener){
		//建立 NettyServer
        return new NettyServer(url, listener);
    }
}
public class NettyServer extends AbstractServer implements Server {
	public NettyServer(URL url, ChannelHandler handler) {
        super(url, ChannelHandlers.wrap(handler, 
			ExecutorUtil.setThreadName(url, "DubboServerHandler")));
    }
}
複製代碼

咱們看到, 在NettyTransporter.bind方法裏,它調用的是NettyServer構造函數,緊接着又調用父類的構造函數。

public abstract class AbstractServer extends AbstractEndpoint implements Server {

	public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
	
		//獲取 ip 和端口
        String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
        int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
        if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
			// 設置 ip 爲 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter("accepts", 0);
        this.idleTimeout = url.getParameter("idle.timeout", 600000);
        try {
			//調用子類方法 開啓服務器
            doOpen();
        }
    }
}
複製代碼

如上代碼,在父類的構造函數裏面主要是設置了一些參數,無需多說。接着咱們再看子類的doOpen實現。

protected void doOpen() throws Throwable {
	NettyHelper.setNettyLoggerFactory();
	// 建立 boss 和 worker 線程池
	// 設置線程的名稱
	ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
	ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
	ChannelFactory channelFactory = new NioServerSocketChannelFactory(
		boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
	
	//建立 ServerBootstrap
	bootstrap = new ServerBootstrap(channelFactory);

	final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
	channels = nettyHandler.getChannels();
	// 設置 PipelineFactory
	bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
		@Override
		public ChannelPipeline getPipeline() {
			NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
			ChannelPipeline pipeline = Channels.pipeline();
			/*int idleTimeout = getIdleTimeout();
			if (idleTimeout > 10000) {
				pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
			}*/
			pipeline.addLast("decoder", adapter.getDecoder());
			pipeline.addLast("encoder", adapter.getEncoder());
			pipeline.addLast("handler", nettyHandler);
			return pipeline;
		}
	});
	// 綁定到指定的 ip 和端口上
	channel = bootstrap.bind(getBindAddress());
}
複製代碼

以上方法就經過Netty啓動了通訊服務器。熟悉Netty的朋友對這段代碼必定不陌生,若是想了解更多,咱們須要關注一下它的處理器。

處理器

ChannelHandler是Netty中的核心組件之一。在這裏,Dubbo使用NettyHandler做爲消息處理器。它繼承自SimpleChannelHandler,這說明Netty接收到的事件都會由此類來處理。好比:客戶端鏈接、客戶端斷開鏈接、數據讀取、網絡異常...咱們重點來看數據讀取方法。

@Sharable
public class NettyHandler extends SimpleChannelHandler {

    public NettyHandler(URL url, ChannelHandler handler) {
        this.url = url;
        this.handler = handler;
    }
    //接收到消息
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}
複製代碼

當Netty的Selector輪詢到數據讀取事件後,將調用messageReceived方法。在這裏,它調用的是handler.received,由構造函數可得知,此處的handler對象實際上是NettyServer對象的實例。

中間它會通過AllChannelHandler,在這裏會在線程池中分配一個線程去處理。

public class AllChannelHandler extends WrappedChannelHandler {
	public void received(Channel channel, Object message) throws RemotingException {
		ExecutorService cexecutor = getExecutorService();
		cexecutor.execute(new ChannelEventRunnable(channel, 
			                      handler, ChannelState.RECEIVED, message));
	}
}	
複製代碼

ChannelEventRunnable實現Runnable接口,咱們看它的run方法。其實也很簡單,就是根據事件狀態,繼續往下調用。

public class ChannelEventRunnable implements Runnable {
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
複製代碼

再深刻的過程我想沒必要再深究了,無非是業務邏輯處理。不過還有另一個問題,這個線程池是什麼樣的?大小多少呢? 經過跟蹤,咱們發現它是在其父類中被初始化的。它也是經過ExtensionLoader加載的

public class WrappedChannelHandler implements ChannelHandlerDelegate {

	protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;
	
	public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        ExtensionLoader<ThreadPool> extensionLoader = 
							ExtensionLoader.getExtensionLoader(ThreadPool.class);
        ThreadPool adaptiveExtension = extensionLoader.getAdaptiveExtension();
        executor = (ExecutorService) adaptiveExtension.getExecutor(url);
    }
}
複製代碼

而後咱們看ThreadPool接口標註了默認實現@SPI("fixed") ,它是一個固定數量的線程池。

public class FixedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
		//設置線程池參數
        String name = url.getParameter("threadname", "Dubbo");
        int threads = url.getParameter("threads", 200);
        int queues = url.getParameter("queues",0);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
複製代碼

由此咱們能夠回答上面的問題了,Dubbo中的線程池是固定線程數量大小爲200的線程池。若是線程池滿了怎麼辦?咱們再看下它的拒絕策略。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	String msg = String.format("Thread pool is EXHAUSTED!" +
					" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
					" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
			threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
			e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
			url.getProtocol(), url.getIp(), url.getPort());
	logger.warn(msg);
	dumpJStack();
	throw new RejectedExecutionException(msg);
}
複製代碼

學到了嗎?

  • 打印錯誤信息
  • 導出線程棧信息
  • 拋出異常

到此,關於服務暴露的過程就分析完了。整個過程比較複雜,你們在分析的過程當中耐心一些。而且多寫 Demo 進行斷點調試,以便可以更好的理解代碼邏輯。

2、服務註冊

服務註冊就是把已經暴露的服務信息註冊到第三方平臺,以供消費者使用。咱們把目光回到RegistryProtocol.export方法,咱們以zookeeper註冊中心爲例。

一、建立註冊中心

首先,須要根據配置文件的信息獲取到註冊中心的url,好比以zookeeper爲例: zookeeper://192.168.139.131:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_producer1&client=zkclient&dubbo=2.6.2......

咱們直接來到ZookeeperRegistry,這裏的重點是調用connect方法建立Zookeeper 客戶端。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
	//省略部分代碼...
	
	//建立zookeeper客戶端
	zkClient = zookeeperTransporter.connect(url);
	zkClient.addStateListener(new StateListener() {
		public void stateChanged(int state) {
			if (state == RECONNECTED) {
				try {
					//從新鏈接事件
					recover();
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
		}
	});
}
複製代碼

在這裏有一點須要注意,Dubbo官網說,鏈接zookeeper缺省使用zkclient。

從 2.2.0 版本開始缺省爲 zkclient 實現,以提高 zookeeper 客戶端的健狀性。

但從代碼上看,它默認使用的是curator客戶端。@SPI("curator") 這一點比較費解,因此若是想使用zkclient,要在配置文件中指定: <dubbo:registry address="zookeeper://192.168.139.131:2181?client=zkclient"/>

而後咱們接着往下繼續看,最終調用zkclient的方法完成zookeeper客戶端的建立。

public ZkclientZookeeperClient(URL url) {
	
	//異步調用ZkClient建立客戶端
	client = new ZkClientWrapper(url.getBackupAddress(), 30000);
	//監聽zookeeper狀態
	client.addListener(new IZkStateListener() {
		@Override
		public void handleStateChanged(KeeperState state) throws Exception {
			ZkclientZookeeperClient.this.state = state;
			if (state == KeeperState.Disconnected) {
				stateChanged(StateListener.DISCONNECTED);
			} else if (state == KeeperState.SyncConnected) {
				stateChanged(StateListener.CONNECTED);
			}
		}
		@Override
		public void handleNewSession() throws Exception {
			stateChanged(StateListener.RECONNECTED);
		}
	});
	client.start();
}
複製代碼

二、建立節點

建立節點很簡單,就是將服務配置數據寫入到 Zookeeper 的某個路徑的節點下。

protected void doRegister(URL url) {
	try {
		zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
	} catch (Throwable e) {
		throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
	}
}
複製代碼

咱們看下zookeeper中已經建立好的節點信息:

服務配置信息

3、總結

至此,Dubbo中的服務暴露全過程咱們已經分析完了。因爲篇幅問題,筆者將它們分爲了上下兩篇。字數比較多,邏輯也較爲複雜,若是文章有不妥錯誤之處,但願你們提出寶貴意見。

咱們再回憶一下整個流程:

  • 經過Spring接口調用初始化方法
  • 配置信息檢查以及缺省值設置
  • 建立服務類ref Invoker
  • 服務暴露監聽、構建調用鏈
  • 本地暴露
  • 遠程暴露
  • 啓動Netty通訊服務器,監聽端口
  • 鏈接zookeeper建立節點,將已暴露的服務信息寫入註冊中心
相關文章
相關標籤/搜索