客戶端跟服務端鏈接創建,分兩步:服務器
實例化ChannelManager的時候,根據配置的第一個server,從遠程服務器讀取服務器列表,若是能讀取到,則順序創建鏈接,直到創建成功爲止;若是不能 讀到,則根據本地配置的列表,逐個創建鏈接,直到成功爲止。負載均衡
每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執行檢查。檢查的邏輯是:比較本地server列表跟遠程服務提供的列表是否相等,不相等則根據遠程服務提供的server列表順序的從新創建第一個能用的ChannelFuture異步
若是有積壓,或者關閉 掉了,則關閉當前鏈接,將activeIndex=-1,表示當前鏈接不可用。tcp
從0到activeIndex中找一個能鏈接的server,中心創建一個鏈接。若是activeIndex爲-1,則從整個的server列表中順序的找一個可用的鏈接創建鏈接。ide
客戶端實例化DefaultTransportManager對象時,回按照以下流程先實例化m_tcpSocketSender,接着實例化ChannelManager。ChannelManager管理對服務端的netty鏈接。 實例化流程以下: 線程
ChannelManager經過ChannelHolder把netty的ChannnelFuture封裝起來。ChannnelFuture結構以下:netty
public static class ChannelHolder { /** * 當前活躍的channelFuture */ private ChannelFuture m_activeFuture; /** * 當前server在m_serverAddresses中的第幾個 */ private int m_activeIndex = -1; /** * 當前活躍的ChannelFuture對應的配置 */ private String m_activeServerConfig; /** * 從配置文件中讀取的服務端列表 */ private List<InetSocketAddress> m_serverAddresses; /** * 當前活躍的ChannelFutre對應的ip */ private String m_ip; /** * 鏈接從第一次初始化開始,是否發生過變動 */ private boolean m_connectChanged; //省略其它的代碼 }
ChannelManager內部每隔10秒鐘,檢查netty鏈接。這部分代碼以下:code
@Override public void run() { while (m_active) { /* * make save message id index asyc * 本地存儲index,和 時間戳,防止重啓,致使本地的消息id重了 */ m_idfactory.saveMark(); /** * 檢查本地初始化的服務列表跟遠程的服務列表是否有差別,若是有差別,則取遠程第一個能創建鏈接的server,創建一個新的鏈接, * 關閉舊的鏈接 */ checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 檢查當前channelFuture是否有消息積壓(本地隊列長度超過4990),或者 channelFuture不是開的 * @param activeFuture */ doubleCheckActiveServer(activeFuture); /** * 從serverAddresses列表裏面,重新順序選一個,從新鏈接 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }
總結:服務端沒有作到負載均衡,鏈接會慢慢鏈接到server列表裏面第一個可用的server上。server