ActiveMQ支持多種通信協議TCP/UDP等,咱們選取最經常使用的TCP來分析ActiveMQ的通信機制。首先咱們來明確一個概念:
客戶(Client):消息的生產者、消費者對ActiveMQ來講都叫做客戶。
消息中介(Message broker):接收消息並進行相關處理後分發給消息的消費者.網絡
爲了能清楚的描述出ActiveMQ的核心通信機制,咱們選擇3個部分來進行說明,它們分別是創建連接、關閉連接、心跳。 併發
1、Client跟activeMQ的TCP通信的初始化過程分析以下:
(1) ActiveMQ初始化時,經過TcpTransportServer類根據配置打開TCP偵聽端口,客戶端經過該端口發起創建連接的動做。
(2) 把接收到的socket放入阻塞隊列。
(3) 另一個線程Socket handler阻塞着,監聽是否有新的socket,若是有則取出來。
(4) 生成一個TransportConnection的實例。TransportConnection類的主要做用是處理鏈路的狀態信息,並實現CommandVisitor接口來完成各種消息的處理。
(5) TransportConnection會使用一個由多個TransportFilter實例組成的消息處理鏈條,負責對接收到的各種消息進行處理併發送相應的應答。這個鏈條的典型組成順序: socket
MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在這條鏈條中最後的一環是TcpTransport類,它是實際和Client獲取和發送數據的地方,該類的重要方法有run()和oneway(),一個負責讀取,一個負責發送。
(6) 建鏈完成,能夠進行通信操做。 spa
2、關閉連接
ActiveMQ發現TCP連接的關閉,最關鍵的代碼在TcpBufferedInputStream類中的 線程
int n = in.read(buffer, position, buffer.length - position); 3d
3、心跳
爲了更好的維護TCP鏈路的使用,ActiveMQ採用了心跳機制做爲判斷雙方鏈路的健康狀況。ActiveMQ使用的是雙向心跳,也就是ActiveMQ的Broker和Client雙方都進行相互心跳,但無論是Broker或Client心跳的具體處理狀況是徹底同樣的,都在InactivityMonitor類中實現,下面具體介紹。
心跳會產生兩個線程「InactivityMonitor ReadCheck」和「InactivityMonitor WriteCheck」,它們都是Timer類型,都會隔一段固定時間被調用一次。ReadCheck線程主要調用的方法是readCheck(),當在等待時間內,有消息接收到,則該方法會返回true。WriteCheck線程主要調用的方法是writeCheck()。orm
這有個小技巧,你們能夠參考一下,那就是當WriteCheck線程休眠時,有任何數據發送成功,則該線程被喚醒後,不用經過TCP向對方真的發送心跳消息,這樣能夠從必定程度上減小網絡傳輸的數據量。blog