消息中間件

一、什麼是消息中間件?php

      關注於數據的發送和接收,利用高效可靠地異步消息傳遞機制集成分佈式系統。java

二、什麼是JMS? java - apilinux

      Java 消息服務(Java Message Service)即JMS, 是一個Java 平臺(不能跨語言)中關於面向消息中間件的API,用戶兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。c++

三、AMQP協議(爲了跨語言) Wire-protocaol, 只支持 byte[] 二進制的消息類型c#

   AMQP(advanced message queuing protocol) 是一個提供統一消息服務的應用層標準協議,給予此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣開發語言等條件的限制。api

四、經常使用的消息中間件服務器

      <1> ActiveMQ 徹底支持JMS1.1與J2EE 1.4規範的JMS Provider實現。session

             支持多種語言 Java c c++ c# Ruby Perl Python PHP,支持的應用協議:OpenWire, Stomp REST, WS Notification, Xmpp, AMQP  (沒有JMS 由於不是應用協議,只是開發規範)。負載均衡

      <2> RabbitMQ  是一個開源的AMQP實現,服務器端使用Erlang語言編寫,用於在分佈式系統中存儲轉發消息,在易用性和擴展性 高可用等方便變現不俗dom

             支持多種客戶端, 如 Python Ruby .Net  java jms c php等

      <3> kafka  是一種高吞吐量的分佈式發佈訂閱消息系統,是一個分佈式的,分區的,可靠地分佈式日誌存儲服務,它經過一種獨一無二的設計提供了一個消息系統的功能。不是一個嚴格的消息中間件,

     主要是用來作日誌儲存。即便是很是普通的硬件kafka也能夠支持每秒數百萬的消息。

     比對:

     

五、消息模式

   主題和隊列兩種模式。

      隊列模型: 客戶端包含生產者和消費者

                         隊列中的消息只能被一個消費者消費

                         消費者能夠隨時消費隊列中的消息

 

     主題模型:  客戶端包括髮布者和訂閱者

         主題中的消息被全部的訂閱者消費

         消費者不能消費訂閱以前就發送到主題中的消息

 

六、JMS 編碼接口之間的關係

七、隊列模式

在linux上安裝activemq部分此處省略

安裝完後打開activemq平臺,訪問地址http://192.168.37.128:8161,登陸帳號和密碼都是admin

下面咱們來建立一個生產者:

public class AppProducer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立鏈接工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 創建鏈接
		Connection connection = connectionFactory.createConnection();
		// 啓動鏈接
		connection.start();
		// 建立會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一個目標
		Destination destination = session.createQueue(QUEUE_NAME);
		// 建立一個生產者
		MessageProducer producer = session.createProducer(destination);
		// 發送消息
		for(int i=0; i < 100; i++){
			// 建立消息
			TextMessage textMessage = session.createTextMessage("test" + i);
			// 發送消息
			producer.send(textMessage);
            System.out.println("send" + textMessage.getText());					
		}
		// 關閉鏈接
		connection.close();
	
	}
}

 運行成功後,咱們查看activemq平臺,發現隊列中有100個待消費的消息:

ok, 下面咱們來建立一個消費者:

public class AppConsumer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立鏈接工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 創建鏈接
		Connection connection = connectionFactory.createConnection();
		// 啓動鏈接
		connection.start();
		// 建立會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一個目標
		Destination destination = session.createQueue(QUEUE_NAME);
		// 建立一個消費者
		MessageConsumer consumer = session.createConsumer(destination);
		// 消費消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage)arg0;
				System.out.println("receive" + textMessage);
				
			}
		});
	
	}
}

運行成功後,咱們查看activemq平臺,發現隊列中100個待消費的消息所有被消費:

8 主題模式  主題模式下,消費者必需要比訂閱者先啓動,不然不會收到消息。

   主題模式和隊列模式,代碼機會相同,只須要在建立目標的時候改爲建立主題。

public class AppTopicProducer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立鏈接工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 創建鏈接
		Connection connection = connectionFactory.createConnection();
		// 啓動鏈接
		connection.start();
		// 建立會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一個目標
		Destination destination = session.createTopic(QUEUE_NAME);
		// 建立一個生產者
		MessageProducer producer = session.createProducer(destination);
		// 發送消息
		for(int i=0; i < 100; i++){
			// 建立消息
			TextMessage textMessage = session.createTextMessage("test" + i);
			// 發送消息
			producer.send(textMessage);
            System.out.println("send" + textMessage.getText());					
		}
		// 關閉鏈接
		connection.close();
	
	}
}

  

public class AppTopicConsumer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立鏈接工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 創建鏈接
		Connection connection = connectionFactory.createConnection();
		// 啓動鏈接
		connection.start();
		// 建立會話
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一個目標
		Destination destination = session.createTopic(QUEUE_NAME);
		// 建立一個消費者
		MessageConsumer consumer = session.createConsumer(destination);
		// 消費消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage)arg0;
				System.out.println("receive" + textMessage);
				
			}
		});
	
	}
}

之因此 會是 200個消息,可是隻消費了 100個, 是由於在 消費者訂閱以前, 就已經產生了100個消息,因此這100個不會被消費。

 9  ActiveMQ集羣配置

爲何要對消息中間件集羣?

  實現高可用,以排除單點故障引發的服務中斷

  實現負載均衡,以提高效率爲更多的客戶提供服務

集羣方式:

       客戶端集羣:讓多個消費者消費同一隊列

    Broker clusters:多個Broker之間同步消息

       Master Slave:實現高可用

客戶端配置

      ActiveMQ 失效轉移(failover)

      容許當其中一臺消息服務器宕機時,客戶端在傳輸層上從新鏈接到其餘消息服務器。

      語法:failover:(url1,...,urln)?transportOptions

      transportOptions 說明:

            randomize 默認爲true, 表示在URL列表中選擇URL連接時 是否採用隨機策略

            initialReconnectDelay 默認爲10,單位毫秒,表示第一次嘗試重連之間等待時間

            maxReconnectDelay 默認30000, 單位毫秒,最長重連的時間間隔

Broker Cluster 集羣配置

           原理: 

           節點A的消息 能夠同步到 節點B, 節點B的消息也能夠同步到節點A上, 節點A產生的消息能夠被節點B的消費者消費, 節點B產生的消息能夠被節點A的消費者消費。

Master/Slave集羣配置

集羣方案

      Share nothing storage master/slave (已過期,5.8+後移除)

      Shared storage master/slave 共享存儲

           Replicated LevelDB Store 基於複製的LevelDB Store   === > 基於zookeeper 的 master 選擇方案

           共享存儲集羣的原理

            

            當前 節點A得到鎖資源,成爲master , 節點B就沒法得到鎖資源,一直在等待。若是某個時刻,節點A掛掉,那麼節點B會得到鎖資源,成爲master, 節點A 成爲slave

                 

            下面是 基於複製的LevelDB Store 的原理

            

            基於zk, 此時節點A被zk選舉爲Master,  此時節點A做爲與外界溝通的口子,當A接收到新的信息,則會在本地進行存儲,而且經過zk 傳輸到節點B和節點C上進行本地存儲。

兩種集羣方式對比:

           master/slave 方式 只支持高可用,可是不支持負載均衡。  

           Broker Cluster 支持 負載均衡,可是不支持高可用。

下面來看一種 便可以高可用,又能夠負載均衡的方式。

      咱們使用三臺服務器的完美集羣方案:

節點B和節點C實現高可用, 節點A爲 節點B 或者C 的負載均衡。 可是此方案,一旦節點B 和 節點C 所有掛掉,那麼整個系統也就掛掉了,因此咱們要使用更多太服務器來防止多臺服務器宕機的場景。

相關文章
相關標籤/搜索