大衆點評Cat源碼閱讀(七)——客戶端選擇server的機制

1、概要思路

客戶端跟服務端鏈接創建,分兩步:服務器

  1. 初始ChannelMananger的時候 ;
  2. ChannelManager異步線程,每隔10秒作一次檢查。

1.1 初始ChannelMananger的時候

實例化ChannelManager的時候,根據配置的第一個server,從遠程服務器讀取服務器列表,若是能讀取到,則順序創建鏈接,直到創建成功爲止;若是不能 讀到,則根據本地配置的列表,逐個創建鏈接,直到成功爲止。負載均衡

1.2 ChannelManager異步線程,每隔10秒作一次檢查

1.2.1 檢查server列表是否變動

每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執行檢查。檢查的邏輯是:比較本地server列表跟遠程服務提供的列表是否相等,不相等則根據遠程服務提供的server列表順序的從新創建第一個能用的ChannelFuture異步

1.2.2 查看當前客戶端是否有積壓,或者channelFuture是否被關閉

若是有積壓,或者關閉 掉了,則關閉當前鏈接,將activeIndex=-1,表示當前鏈接不可用。tcp

1.2.3 重連默認server

從0到activeIndex中找一個能鏈接的server,中心創建一個鏈接。若是activeIndex爲-1,則從整個的server列表中順序的找一個可用的鏈接創建鏈接。ide

2、ChannelManager 實例化,創建netty鏈接邏輯

客戶端實例化DefaultTransportManager對象時,回按照以下流程先實例化m_tcpSocketSender,接着實例化ChannelManager。ChannelManager管理對服務端的netty鏈接。 實例化流程以下: 輸入圖片說明線程

ChannelManager經過ChannelHolder把netty的ChannnelFuture封裝起來。ChannnelFuture結構以下:netty

public static class ChannelHolder {
		/**
		 * 當前活躍的channelFuture
		 */
		private ChannelFuture m_activeFuture;

		/**
		 * 當前server在m_serverAddresses中的第幾個
		 */
		private int m_activeIndex = -1;

		/**
		 * 當前活躍的ChannelFuture對應的配置
		 */
		private String m_activeServerConfig;

		/**
		 * 從配置文件中讀取的服務端列表
		 */
		private List<InetSocketAddress> m_serverAddresses;

		/**
		 * 當前活躍的ChannelFutre對應的ip
		 */
		private String m_ip;

		/**
		 * 鏈接從第一次初始化開始,是否發生過變動
		 */
		private boolean m_connectChanged;
                
                //省略其它的代碼
}

3、ChannelManager內部異步線程,動態切換netty鏈接邏輯

ChannelManager內部每隔10秒鐘,檢查netty鏈接。這部分代碼以下:code

@Override
	public void run() {
		while (m_active) {
			/*
			 * make save message id index asyc
			 * 本地存儲index,和 時間戳,防止重啓,致使本地的消息id重了
			 */
			m_idfactory.saveMark();
			
			/**
			 * 檢查本地初始化的服務列表跟遠程的服務列表是否有差別,若是有差別,則取遠程第一個能創建鏈接的server,創建一個新的鏈接,
			 * 關閉舊的鏈接
			 */
			checkServerChanged();

			ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
			List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

			/**
			 * 檢查當前channelFuture是否有消息積壓(本地隊列長度超過4990),或者 channelFuture不是開的
			 * @param activeFuture
			 */
			doubleCheckActiveServer(activeFuture);
			/**
			 * 從serverAddresses列表裏面,重新順序選一個,從新鏈接
			 */
			reconnectDefaultServer(activeFuture, serverAddresses);

			try {
				Thread.sleep(10 * 1000L); // check every 10 seconds
			} catch (InterruptedException e) {
				// ignore
			}
		}
	}

總結:服務端沒有作到負載均衡,鏈接會慢慢鏈接到server列表裏面第一個可用的server上。server

相關文章
相關標籤/搜索