Dubbo and ZooKeeper heartbeat parameters: analysis and testing

Heartbeat mechanism between the Dubbo consumer and provider

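The Dubbo client and the Dubbo server exchange heartbeats to keep the long-lived connection between provider and consumer alive. The heartbeat is sent actively from the Dubbo side (see HeartBeatTask in the Dubbo source). The default heartbeat interval (heartbeat) is 60 s: when a channel has had no data read or written for longer than heartbeat, a heartbeat message is sent (provider and consumer alike). If nothing has been read from the channel for heartbeatTimeout (heartbeat * 3 by default, i.e. roughly three missed heartbeats), the provider closes the channel, while the consumer reconnects. On both the provider and the consumer side, heartbeat detection is implemented by starting a scheduled timer task.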

  • Entry points for provider binding and consumer connection:
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

  

  • The provider starts heartbeat detection
public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        // the heartbeat timeout defaults to 3x the heartbeat interval
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        // throw if the heartbeat timeout is less than twice the heartbeat interval
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }
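To make the arithmetic in this constructor concrete, here is a minimal Java sketch, assuming a hypothetical `HeartbeatSettings` class that is not part of Dubbo. It only restates the two rules above: an unset timeout defaults to three heartbeat intervals, and a timeout shorter than two intervals is rejected.

```java
/**
 * Hypothetical helper (not part of Dubbo) that mirrors the checks in the
 * HeaderExchangeServer constructor. All values are in milliseconds.
 */
final class HeartbeatSettings {
    final int heartbeat;
    final int heartbeatTimeout;

    HeartbeatSettings(int heartbeat, Integer configuredTimeout) {
        this.heartbeat = heartbeat;
        // An unset timeout defaults to three heartbeat intervals
        this.heartbeatTimeout = (configuredTimeout != null) ? configuredTimeout : heartbeat * 3;
        // Reject a timeout shorter than two heartbeat intervals
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
    }

    public static void main(String[] args) {
        HeartbeatSettings defaults = new HeartbeatSettings(60_000, null);
        System.out.println(defaults.heartbeatTimeout); // 180000
        try {
            new HeartbeatSettings(60_000, 100_000);
        } catch (IllegalStateException e) {
            // prints "rejected: heartbeatTimeout < heartbeatInterval * 2"
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```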

  

  • Implementation of startHeatbeatTimer
    • Stop any existing timer task first, then start a new one
private void startHeatbeatTimer() {
        // stop the existing timer task
        stopHeartbeatTimer();
        // schedule a new timer task
        if (heartbeat > 0) {
            heatbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }
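The stop-then-reschedule pattern can be sketched with a plain `ScheduledExecutorService`. The `HeartbeatTimer` class and its methods are hypothetical; only `scheduleWithFixedDelay`, `ScheduledFuture.cancel` and `isCancelled` are standard JDK calls. As in the excerpt, the first run happens one interval after scheduling, and each later run starts one interval after the previous run finishes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the stop-then-reschedule pattern used by startHeatbeatTimer. */
final class HeartbeatTimer {
    private final ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> timer;

    synchronized void start(Runnable task, long heartbeatMillis) {
        stop(); // cancel any previously scheduled task first
        if (heartbeatMillis > 0) {
            // initial delay = one interval; then one interval between the end of a run and the next start
            timer = scheduled.scheduleWithFixedDelay(task, heartbeatMillis, heartbeatMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (timer != null && !timer.isCancelled()) {
            timer.cancel(true);
        }
        timer = null;
    }

    synchronized boolean isRunning() {
        return timer != null && !timer.isCancelled();
    }

    void shutdown() {
        stop();
        scheduled.shutdownNow(); // release the scheduler thread so the JVM can exit
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger ticks = new AtomicInteger();
        HeartbeatTimer t = new HeartbeatTimer();
        t.start(ticks::incrementAndGet, 50);
        Thread.sleep(300);
        t.shutdown();
        System.out.println("ticks: " + ticks.get());
    }
}
```

Fixed delay (rather than fixed rate) means that a slow pass over many channels pushes the next pass back instead of letting runs pile up.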

 

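  • Implementation of HeartBeatTask
    • Iterate over all channels and check the heartbeat interval: if a channel has had no read or no write for longer than the heartbeat interval, send a heartbeat message that requires a reply. Finally, check whether the heartbeat has timed out (heartbeatTimeout); on timeout, the provider closes the channel and the consumer reconnects.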
public void run() {
        try {
            long now = System.currentTimeMillis();
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // if either the last read or the last write is older than the heartbeat interval, send a heartbeat
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true); // heartbeat event that requires a response
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // the last read is older than the heartbeat timeout
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // client side: reconnect to the server
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        // server side: close the client connection
                        } else {
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }
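The decision logic in run() can be separated from the I/O and checked on its own. The sketch below is a hypothetical, side-effect-free restatement (the `HeartbeatCheck` class and `Action` enum are illustrative, not Dubbo API): given the last read/write timestamps of one channel, it returns what the task would do with it.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, side-effect-free restatement of the checks in HeartBeatTask.run(). */
final class HeartbeatCheck {
    enum Action { SEND_HEARTBEAT, RECONNECT, CLOSE }

    static Set<Action> decide(long now, Long lastRead, Long lastWrite,
                              long heartbeat, long heartbeatTimeout, boolean isClient) {
        Set<Action> actions = EnumSet.noneOf(Action.class);
        // Read or write idle for longer than one heartbeat: send a two-way heartbeat request
        if ((lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat)) {
            actions.add(Action.SEND_HEARTBEAT);
        }
        // Nothing read for longer than heartbeatTimeout: the client reconnects, the server closes the channel
        if (lastRead != null && now - lastRead > heartbeatTimeout) {
            actions.add(isClient ? Action.RECONNECT : Action.CLOSE);
        }
        return actions;
    }

    public static void main(String[] args) {
        long hb = 60_000, timeout = 180_000, now = 1_000_000;
        System.out.println(decide(now, now - 10_000, now - 10_000, hb, timeout, true));   // []
        System.out.println(decide(now, now - 70_000, now - 10_000, hb, timeout, true));   // [SEND_HEARTBEAT]
        System.out.println(decide(now, now - 200_000, now - 10_000, hb, timeout, false)); // [SEND_HEARTBEAT, CLOSE]
    }
}
```

With the default 60 s heartbeat and 180 s timeout, a channel that has read nothing for 70 s only gets a heartbeat; after 200 s without reads, a provider-side channel is also closed.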

 

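  • Consumer-side implementation
    • Heartbeat detection is enabled by default (HeaderExchanger.connect passes needHeartbeat = true)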
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // create the HeaderExchangeChannel object
        this.channel = new HeaderExchangeChannel(client);
        // read the heartbeat configuration
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) { // avoid an interval that is too short
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        // start the heartbeat timer
        if (needHeartbeat) {
            startHeatbeatTimer();
        }
    }
 
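Note that this constructor falls back to 0 (no heartbeat) unless the peer advertises a 1.0.x Dubbo version. If we read the 2.6.x source correctly, the 60 s default mentioned at the top comes from `DubboProtocol`, which adds `heartbeat=60000` to the URL when it is absent before the client or server is created. The hypothetical helper below only restates the fallback in this constructor; `DEFAULT_HEARTBEAT` is assumed to be 60 000 ms, matching the default described earlier.

```java
/** Hypothetical restatement of how HeaderExchangeClient picks its heartbeat when none is configured. */
final class ClientHeartbeatDefault {
    // Assumption: mirrors Dubbo's 60 s default heartbeat
    static final int DEFAULT_HEARTBEAT = 60 * 1000;

    /**
     * @param configured   heartbeat value from the URL, or null if absent
     * @param dubboVersion dubbo version parameter from the URL, or null if absent
     */
    static int resolve(Integer configured, String dubboVersion) {
        if (configured != null) {
            return configured; // an explicit URL parameter always wins
        }
        // Only a 1.0.x peer gets a non-zero fallback here
        return (dubboVersion != null && dubboVersion.startsWith("1.0.")) ? DEFAULT_HEARTBEAT : 0;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "2.0.2"));   // 0
        System.out.println(resolve(null, "1.0.5"));   // 60000
        System.out.println(resolve(30_000, "2.0.2")); // 30000
    }
}
```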
  

Heartbeat between the Dubbo client/server and the registry (ZK)

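This heartbeat is initiated by the Dubbo client/server and relies on the heartbeat mechanism between the ZK cluster and ZK clients. Its interval is governed by the ZK parameter tickTime (the time interval ZooKeeper uses to maintain heartbeats between servers, and between clients and servers: a heartbeat is sent every tickTime, and the minimum session timeout is twice tickTime). In practice, however, we observed the heartbeat interval to be half of tickTime, as the following log shows: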

[] 2019-08-07 14:51:46 [5189311] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (27053)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
[] 2019-08-07 14:51:46 [5190082] [WARN] Curator-Framework-0-SendThread(0:0:0:0:0:0:0:1:2181) org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1102) Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_211]
        at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:1.8.0_211]
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [zookeeper-3.4.6.jar!/:3.4.6-1569965]
[] 2019-08-07 14:51:47 [5190312] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (28054)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
[] 2019-08-07 14:51:47 [5191182] [INFO] Curator-Framework-0-SendThread(k3ctest.yidooo.com:2181) org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:975) Opening socket connection to server k3ctest.yidooo.com/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[] 2019-08-07 14:51:48 [5191314] [ERROR] Curator-Framework-0 org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:200) Connection timed out for connection string (localhost:2181) and timeout (5000) / elapsed (29056)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:116) [curator-client-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) [curator-framework-2.10.0.jar!/:?]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) [curator-framework-2.10.0.jar!/:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_211]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_211]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_211]
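The tickTime paragraph above mentions that the minimum session timeout is twice tickTime. As a hedged sketch of how a ZooKeeper 3.4-style server bounds the session timeout a client asks for: when `minSessionTimeout` and `maxSessionTimeout` are not set in zoo.cfg, they default to 2 × tickTime and 20 × tickTime, and the requested value is clamped into that range. The `ZkSessionTimeout` class is hypothetical and is not ZooKeeper API.

```java
/** Hypothetical sketch: how a ZooKeeper server bounds a requested session timeout by tickTime. */
final class ZkSessionTimeout {
    /** Pass -1 for min/max to mean "not configured in zoo.cfg". All values in milliseconds. */
    static int negotiate(int requestedMs, int tickTimeMs, int minSessionTimeoutMs, int maxSessionTimeoutMs) {
        // Unconfigured bounds default to 2 x tickTime and 20 x tickTime
        int min = (minSessionTimeoutMs == -1) ? tickTimeMs * 2 : minSessionTimeoutMs;
        int max = (maxSessionTimeoutMs == -1) ? tickTimeMs * 20 : maxSessionTimeoutMs;
        // Clamp the client's request into [min, max]
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000;
        System.out.println(negotiate(1_000, tickTime, -1, -1));  // 4000  (raised to 2 x tickTime)
        System.out.println(negotiate(60_000, tickTime, -1, -1)); // 40000 (capped at 20 x tickTime)
        System.out.println(negotiate(30_000, tickTime, -1, -1)); // 30000 (within range, unchanged)
    }
}
```

So with tickTime = 2000 ms and no explicit bounds, a client that requests a 60 s session would actually be granted 40 s.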

  

Synchronization time limits between cluster nodes

(1) initLimit

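This setting is the time allowed for a follower (the "client" from the Leader's point of view) to connect to the Leader and sync with it during initialization, measured in units of tickTime. If initialization takes longer than this, the connection is considered failed.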

(2) syncLimit

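This setting is the maximum time, also in units of tickTime, allowed for a request/response exchange between the Leader and a Follower. If a follower cannot communicate with the leader within this time, the follower is dropped.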
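Because both limits are expressed in ticks, the wall-clock budget is simply the limit multiplied by tickTime. A tiny hypothetical helper, using the illustrative values tickTime=2000, initLimit=10, syncLimit=5 (example values, not taken from the setup tested here):

```java
/** Hypothetical helper: converts tick-based zoo.cfg limits into milliseconds. */
final class ZkQuorumLimits {
    static long toMillis(int ticks, int tickTimeMs) {
        return (long) ticks * tickTimeMs; // widen before multiplying to avoid int overflow
    }

    public static void main(String[] args) {
        int tickTime = 2000; // example zoo.cfg values
        int initLimit = 10;
        int syncLimit = 5;
        System.out.println("initLimit budget ms: " + toMillis(initLimit, tickTime)); // 20000
        System.out.println("syncLimit budget ms: " + toMillis(syncLimit, tickTime)); // 10000
    }
}
```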

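Finally, keep in mind that the Leader is a single point responsible for coordinating all transactions. If the leader goes down, you need to know how a new leader is elected; see https://blog.51cto.com/14214194/2376270.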
