1、什麼是消息中間件(MQ)html
1.1 爲何會須要消息隊列(MQ)?java
主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達MySQL,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。數據庫
2.2 什麼是消息中間件macos
消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見的角色大體也就有Producer(生產者)、Consumer(消費者)
常見的消息中間件產品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。咱們在本次課程中介紹 ActiveMQ的使用。
(2)RabbitMQ
AMQP協議的領導實現,支持多種場景。淘寶的MySQL集羣內部有使用它進行通信,OpenStack開源雲平臺的通訊組件,最早在金融行業獲得運用。
(3)ZeroMQ
史上最快的消息隊列系統
(4)Kafka
Apache下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據。apache
2、JMS簡介服務器
2.1 什麼是JMS session
JMS(Java Messaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。併發
JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您可以經過消息收發服務(有時稱爲消息中介程序或路由器)從一個 JMS 客戶機向另外一個 JML 客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。
JMS 定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。異步
· TextMessage--一個字符串對象
· MapMessage--一套名稱-值對
· ObjectMessage--一個序列化的 Java 對象
· BytesMessage--一個字節的數據流
· StreamMessage -- Java 原始值的數據流tcp
2.2 JMS 消息傳遞類型
對於消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者一一對應:
另外一種是發佈/ 訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收:
3、ActiveMQ下載與安裝
3.1 下載
官方下載地址:http://activemq.apache.org/activemq-5153-release.html
3.2 安裝(OSX,Linux類同)
解壓下載文件,apache-activemq-5.12.0-bin.tar.gz
tar zxvf apache-activemq-5.12.0-bin.tar.gz
爲apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0
3.3 啓動、訪問與關閉
#切換至安裝目錄macosx下
macosx Mac$ pwd
/Users/Mac/JavaUtils/apache-activemq-5.15.3/bin/macosx
#啓動activemq服務
macosx Mac$ ./activemq start
Starting ActiveMQ Broker...
#關閉activemq服務
:macosx Mac$ ./activemq stop
Stopping ActiveMQ Broker...
Stopped ActiveMQ Broker.
:macosx Mac$
4、JMS入門
4.1 點對點模式
一個生成者產生一個消息 只能被被一個消費者消費,消費完,消息就沒有了。
4.1.1 消息生產者
(1)建立工程,引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
(2)建立生產者
1 public class QueueProducer { 2 3 public static void main(String[] args) throws JMSException { 4 //1.建立鏈接工廠 5 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 6 //2.獲取鏈接 7 Connection connection = connectionFactory.createConnection(); 8 //3.啓動鏈接 9 connection.start(); 10 /*4.獲取session (參數1:是否啓動事務, 11 參數2:消息確認模式[ 12 AUTO_ACKNOWLEDGE = 1 自動確認 13 CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 14 DUPS_OK_ACKNOWLEDGE = 3 自動批量確認 15 SESSION_TRANSACTED = 0 事務提交併確認 16 ])*/ 17 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 18 //5.建立隊列對象 19 Queue queue = session.createQueue("test-queue"); 20 //6.建立消息生產者 21 MessageProducer producer = session.createProducer(queue); 22 //7.建立消息 23 TextMessage textMessage = session.createTextMessage("歡迎來到MQ世界"); 24 //8.發送消息 25 producer.send(textMessage); 26 //9.關閉資源 27 producer.close(); 28 session.close(); 29 connection.close(); 30 } 31 32 }
(3)運行經過界面查看
4.1.2 消息消費者
(1)建立消息消費者
1 public class QueueConsumer { 2 public static void main(String[] args) throws JMSException, IOException { 3 //1.建立鏈接工廠 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 5 //2.獲取鏈接 6 Connection connection = connectionFactory.createConnection(); 7 //3.啓動鏈接 8 connection.start(); 9 //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) 10 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 11 //5.建立隊列對象 12 Queue queue = session.createQueue("test-queue"); 13 //6.建立消息消費者 14 MessageConsumer consumer = session.createConsumer(queue); 15 //7.監聽消息 16 consumer.setMessageListener(new MessageListener() { 17 @Override 18 public void onMessage(Message message) { 19 TextMessage textMessage = (TextMessage) message; 20 try { 21 System.out.println("接收到消息:"+textMessage.getText()); 22 } catch (JMSException e) { 23 e.printStackTrace(); 24 } 25 } 26 }); 27 //8.等待鍵盤輸入 28 System.in.read(); 29 //9.關閉資源 30 consumer.close(); 31 session.close(); 32 connection.close(); 33 } 34 35 }
(2)運行查看控制檯輸出與經過界面查看
4.2 發佈/訂閱模式
4.2.1 消息生產者
1 public class TopicProducer { 2 3 public static void main(String[] args) throws JMSException { 4 //1.建立鏈接工廠 5 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 6 //2.獲取鏈接 7 Connection connection = connectionFactory.createConnection(); 8 //3.啓動鏈接 9 connection.start(); 10 /*4.獲取session (參數1:是否啓動事務, 11 參數2:消息確認模式)*/ 12 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 13 //5.建立主題對象 14 Topic topic = session.createTopic("test-topic"); 15 //6.建立消息生產者 16 MessageProducer producer = session.createProducer(topic); 17 //7.建立消息 18 TextMessage textMessage = session.createTextMessage("歡迎來到MQ世界!"); 19 //8.發送消息 20 producer.send(textMessage); 21 //9.關閉資源 22 producer.close(); 23 session.close(); 24 connection.close(); 25 } 26 27 }
4.2.2 消息2個消費者(消費者2代碼同消費者1)
1 public class TopicConsumer1 { 2 public static void main(String[] args) throws JMSException, IOException { 3 //1.建立鏈接工廠 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 5 //2.獲取鏈接 6 Connection connection = connectionFactory.createConnection(); 7 //3.啓動鏈接 8 connection.start(); 9 //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) 10 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 11 //5.建立主題對象 12 Topic topic = session.createTopic("test-topic"); 13 //6.建立消息消費者 14 MessageConsumer consumer = session.createConsumer(topic); 15 //7.監聽消息 16 consumer.setMessageListener(new MessageListener() { 17 @Override 18 public void onMessage(Message message) { 19 TextMessage textMessage = (TextMessage) message; 20 try { 21 System.out.println("消費1--接收到消息:"+textMessage.getText()); 22 } catch (JMSException e) { 23 e.printStackTrace(); 24 } 25 } 26 }); 27 //8.等待鍵盤輸入 28 System.in.read(); 29 //9.關閉資源 30 consumer.close(); 31 session.close(); 32 connection.close(); 33 } 34 35 }
4.2.3 運行查看測試結果
同時開啓2個以上的消費者,再次運行生產者,觀察每一個消費者控制檯的輸出,會發現每一個消費者會接收到消息。
4.3 總結
發佈訂閱的模式 默認的請狀況下:消息的內容不存在服務器,當生產者發送了一個消息,若是消費者以前沒有訂閱,就沒了。 點對點的方式:默認的請狀況下:將消息存儲在服務器上,消費者隨時來取,可是一旦一個消費者獲取到了消息,這個消息就沒有了。