一、什麼是消息中間件?php
關注於數據的發送和接收,利用高效可靠地異步消息傳遞機制集成分佈式系統。java
二、什麼是JMS? java - apilinux
Java 消息服務(Java Message Service)即JMS, 是一個Java 平臺(不能跨語言)中關於面向消息中間件的API,用戶兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。c++
三、AMQP協議(爲了跨語言) Wire-protocaol, 只支持 byte[] 二進制的消息類型c#
AMQP(advanced message queuing protocol) 是一個提供統一消息服務的應用層標準協議,給予此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣開發語言等條件的限制。api
四、經常使用的消息中間件服務器
<1> ActiveMQ 徹底支持JMS1.1與J2EE 1.4規範的JMS Provider實現。session
支持多種語言 Java c c++ c# Ruby Perl Python PHP,支持的應用協議:OpenWire, Stomp REST, WS Notification, Xmpp, AMQP (沒有JMS 由於不是應用協議,只是開發規範)。負載均衡
<2> RabbitMQ 是一個開源的AMQP實現,服務器端使用Erlang語言編寫,用於在分佈式系統中存儲轉發消息,在易用性和擴展性 高可用等方便變現不俗dom
支持多種客戶端, 如 Python Ruby .Net java jms c php等
<3> kafka 是一種高吞吐量的分佈式發佈訂閱消息系統,是一個分佈式的,分區的,可靠地分佈式日誌存儲服務,它經過一種獨一無二的設計提供了一個消息系統的功能。不是一個嚴格的消息中間件,
主要是用來作日誌儲存。即便是很是普通的硬件kafka也能夠支持每秒數百萬的消息。
比對:
五、消息模式
主題和隊列兩種模式。
隊列模型: 客戶端包含生產者和消費者
隊列中的消息只能被一個消費者消費
消費者能夠隨時消費隊列中的消息
主題模型: 客戶端包括髮布者和訂閱者
主題中的消息被全部的訂閱者消費
消費者不能消費訂閱以前就發送到主題中的消息
六、JMS 編碼接口之間的關係
七、隊列模式
在linux上安裝activemq部分此處省略
安裝完後打開activemq平臺,訪問地址http://192.168.37.128:8161,登陸帳號和密碼都是admin
下面咱們來建立一個生產者:
public class AppProducer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 創建鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 建立一個生產者 MessageProducer producer = session.createProducer(destination); // 發送消息 for(int i=0; i < 100; i++){ // 建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 發送消息 producer.send(textMessage); System.out.println("send" + textMessage.getText()); } // 關閉鏈接 connection.close(); } }
運行成功後,咱們查看activemq平臺,發現隊列中有100個待消費的消息:
ok, 下面咱們來建立一個消費者:
public class AppConsumer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 創建鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 消費消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage)arg0; System.out.println("receive" + textMessage); } }); } }
運行成功後,咱們查看activemq平臺,發現隊列中100個待消費的消息所有被消費:
8 主題模式 主題模式下,消費者必需要比訂閱者先啓動,不然不會收到消息。
主題模式和隊列模式,代碼機會相同,只須要在建立目標的時候改爲建立主題。
public class AppTopicProducer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 創建鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個目標 Destination destination = session.createTopic(QUEUE_NAME); // 建立一個生產者 MessageProducer producer = session.createProducer(destination); // 發送消息 for(int i=0; i < 100; i++){ // 建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 發送消息 producer.send(textMessage); System.out.println("send" + textMessage.getText()); } // 關閉鏈接 connection.close(); } }
public class AppTopicConsumer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 創建鏈接 Connection connection = connectionFactory.createConnection(); // 啓動鏈接 connection.start(); // 建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個目標 Destination destination = session.createTopic(QUEUE_NAME); // 建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 消費消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage)arg0; System.out.println("receive" + textMessage); } }); } }
之因此 會是 200個消息,可是隻消費了 100個, 是由於在 消費者訂閱以前, 就已經產生了100個消息,因此這100個不會被消費。
9 ActiveMQ集羣配置
爲何要對消息中間件集羣?
實現高可用,以排除單點故障引發的服務中斷
實現負載均衡,以提高效率爲更多的客戶提供服務
集羣方式:
客戶端集羣:讓多個消費者消費同一隊列
Broker clusters:多個Broker之間同步消息
Master Slave:實現高可用
客戶端配置
ActiveMQ 失效轉移(failover)
容許當其中一臺消息服務器宕機時,客戶端在傳輸層上從新鏈接到其餘消息服務器。
語法:failover:(url1,...,urln)?transportOptions
transportOptions 說明:
randomize 默認爲true, 表示在URL列表中選擇URL連接時 是否採用隨機策略
initialReconnectDelay 默認爲10,單位毫秒,表示第一次嘗試重連之間等待時間
maxReconnectDelay 默認30000, 單位毫秒,最長重連的時間間隔
Broker Cluster 集羣配置
原理:
節點A的消息 能夠同步到 節點B, 節點B的消息也能夠同步到節點A上, 節點A產生的消息能夠被節點B的消費者消費, 節點B產生的消息能夠被節點A的消費者消費。
Master/Slave集羣配置
集羣方案
Share nothing storage master/slave (已過期,5.8+後移除)
Shared storage master/slave 共享存儲
Replicated LevelDB Store 基於複製的LevelDB Store === > 基於zookeeper 的 master 選擇方案
共享存儲集羣的原理
當前 節點A得到鎖資源,成爲master , 節點B就沒法得到鎖資源,一直在等待。若是某個時刻,節點A掛掉,那麼節點B會得到鎖資源,成爲master, 節點A 成爲slave
下面是 基於複製的LevelDB Store 的原理
基於zk, 此時節點A被zk選舉爲Master, 此時節點A做爲與外界溝通的口子,當A接收到新的信息,則會在本地進行存儲,而且經過zk 傳輸到節點B和節點C上進行本地存儲。
兩種集羣方式對比:
master/slave 方式 只支持高可用,可是不支持負載均衡。
Broker Cluster 支持 負載均衡,可是不支持高可用。
下面來看一種 便可以高可用,又能夠負載均衡的方式。
咱們使用三臺服務器的完美集羣方案:
節點B和節點C實現高可用, 節點A爲 節點B 或者C 的負載均衡。 可是此方案,一旦節點B 和 節點C 所有掛掉,那麼整個系統也就掛掉了,因此咱們要使用更多太服務器來防止多臺服務器宕機的場景。