ActiveMQ之 TCP通信機制

  

  ActiveMQ支持多種通信協議TCP/UDP等,咱們選取最經常使用的TCP來分析ActiveMQ的通信機制。首先咱們來明確一個概念
  客戶(Client):消息的生產者、消費者對ActiveMQ來講都叫做客戶。 
  消息中介(Message broker)接收消息並進行相關處理後分發給消息的消費者.網絡

  

     

 

  爲了能清楚的描述出ActiveMQ的核心通信機制,咱們選擇3個部分來進行說明,它們分別是創建連接、關閉連接、心跳。 併發

1、ClientactiveMQTCP通信的初始化過程分析以下: 
  (1) ActiveMQ初始化時,經過TcpTransportServer類根據配置打開TCP偵聽端口,客戶端經過該端口發起創建連接的動做。 
  (2) 把接收到的socket放入阻塞隊列。 
  (3) 另一個線程Socket handler阻塞着,監聽是否有新的socket,若是有則取出來。 
  (4) 生成一個TransportConnection的實例。TransportConnection類的主要做用是處理鏈路的狀態信息,並實現CommandVisitor接口來完成各種消息的處理。 
  (5) TransportConnection會使用一個由多個TransportFilter實例組成的消息處理鏈條,負責對接收到的各種消息進行處理併發送相應的應答。這個鏈條的典型組成順序: socket

    MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在這條鏈條中最後的一環是TcpTransport類,它是實際和Client獲取和發送數據的地方,該類的重要方法有run()oneway(),一個負責讀取,一個負責發送。 
  (6) 建鏈完成,能夠進行通信操做。 spa


2、關閉連接 
  ActiveMQ發現TCP連接的關閉,最關鍵的代碼在TcpBufferedInputStream類中的 線程

  int n = in.read(buffer, position, buffer.length - position); 3d


3、心跳 
  爲了更好的維護TCP鏈路的使用,ActiveMQ採用了心跳機制做爲判斷雙方鏈路的健康狀況。ActiveMQ使用的是雙向心跳,也就是ActiveMQBrokerClient雙方都進行相互心跳,但無論是BrokerClient心跳的具體處理狀況是徹底同樣的,都在InactivityMonitor類中實現,下面具體介紹。 
  心跳會產生兩個線程InactivityMonitor ReadCheck」InactivityMonitor WriteCheck」,它們都是Timer類型,都會隔一段固定時間被調用一次。ReadCheck線程主要調用的方法是readCheck(),當在等待時間內,有消息接收到,則該方法會返回trueWriteCheck線程主要調用的方法是writeCheck()orm

  這有個小技巧,你們能夠參考一下,那就是當WriteCheck線程休眠時,有任何數據發送成功,則該線程被喚醒後,不用經過TCP向對方真的發送心跳消息,這樣能夠從必定程度上減小網絡傳輸的數據量。blog

相關文章
相關標籤/搜索