一、JMS是什麼java
「The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.」。翻譯成中文就是:JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,容許應用程序組件基於JavaEE平臺建立、發送、接收和讀取消息。它使分佈式通訊耦合度更低,消息服務更加可靠以及異步性。spring
二、JMS的消息模式:apache
1)隊列模式:消費者一個一個消費(按次序分配)安全
2)主題模式:每一個訂閱者都能消費任意消息(訂閱後)session
3)隊列模式與主題模式最大的區別是:app
(1)隊列模式:平均接收消息生產者產生的消息負載均衡
(2)主題模式:所有接收生產者產生的全部消息異步
三、JMS編碼接口之間的關係async
對上圖解析:maven
1)又鏈接工廠(ConnectionFactory)建立鏈接(Connection);
2)由鏈接(Connection)建立會話(Session)(鏈接能夠建立多個會話,鏈接可共多個線程同時使用,而會話是單線程的,它只能在單個線程裏面有效。會話可作一些事務上的處理。一個會話是一個線程上下文);
3)由會話(Session)建立生產者(MessageProducer)和消費者(MessageConsumer);
4)會話(Session)建立新的消息(Message);
5)生產者(MessageProducer)將新消息發送到指定目的地,消費者(MessageConsumer)到指定目的地接收新消息。
四、Spring集成jms鏈接ActiveMQ
1)ConnectionFactory用於管理鏈接功工廠(Spring提供的鏈接池,spring提供了SingleConnectionFactory和CachingConnectionFactory)
2)JmsTemplate用於發送和接收消息的模板類
(1)每次發送消息都會從新建立鏈接、會話和productor。
(2)是spring提供的,只須要向spring容器內註冊這個類就可使用JmsTemplate方便的操做Jms。
(3)JmsTemplate類是線程安全的,能夠在整個應用範圍使用。
3)MessageListerner消息監聽器
(1)實現一個onMessage方法,該方法只接收一個Message參數,就能夠處理消息了。
五、ActiveMQ集羣配置
1)對消息中間件集羣緣由:
(1)實現高可用,以排除單點故障引發的服務中斷
(2)實現負載均衡,以提高效率爲更多客戶提供服務
2)ActiveMQ集羣的方式:
(1)客戶端集羣:讓多個消費者消費同一個隊列
(2)Broker cluster:多個Broker之間同步消息
(3)Master Slave:實現高可用
3)ActiveMQ的隊列模式實例:
(1)引用ActiveMQ的maven依賴
<!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
(2)AppProducer.java
import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Connection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 隊列模式實例 - 生產者 */ public class AppProducer { // 默認端口 private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-test"; public static void main(String[] args) throws JMSException { // 一、建立鏈接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 二、建立鏈接 Connection connection = connectionFactory.createConnection(); // 三、啓動鏈接 connection.start(); // 四、建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、建立一個目標 Destination destination = session.createQueue(queueName); // 六、建立生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 七、建立消息 TextMessage textMessage = session.createTextMessage("test queue" + i); // 八、發佈消息 producer.send(textMessage); } // 九、關閉鏈接 connection.close(); } }
(3)AppConsumer.java
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 隊列模式實例 -消費者 */ public class AppConsumer { // 默認端口 private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-test"; public static void main(String[] args) throws JMSException { // 一、建立鏈接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 二、建立鏈接 Connection connection = connectionFactory.createConnection(); // 三、啓動鏈接 connection.start(); // 四、建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、建立一個目標 Destination destination = session.createQueue(queueName); // 六、建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 七、建立一個監聽器(注意消息的監聽是個異步的過程) consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("隊列模式消費消息 : " + textMessage.getText()); } catch (JMSException e) { } } }); // 八、關閉鏈接(注意消息的監聽是個異步的過程,因此在此不能先關閉鏈接) // connection.close(); } }
(4)運行:先運行提供者類,再運行多個消費者類,發現隊列中的消費者中不重複消費掉了全部消息。
4)ActiveMQ的主題模式實例:
(1)一樣,先引用ActiveMQ的maven依賴
<!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
(2)AppProducer.java
import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Connection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 主題模式實例 - 生產者 */ public class AppProducer { // 默認端口 private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { // 一、建立鏈接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 二、建立鏈接 Connection connection = connectionFactory.createConnection(); // 三、啓動鏈接 connection.start(); // 四、建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、建立一個目標,(注意是建立主題模式目標session.createTopic();) Destination destination = session.createTopic(topicName); // 六、建立生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 七、建立消息 TextMessage textMessage = session.createTextMessage("test queue" + i); // 八、發佈消息 producer.send(textMessage); } // 九、關閉鏈接 connection.close(); } }
(3)AppConsumer.java
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 主題模式實例 - 消費者 */ public class AppConsumer { // 默認端口 private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test"; public static void main(String[] args) throws JMSException { // 一、建立鏈接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 二、建立鏈接 Connection connection = connectionFactory.createConnection(); // 三、啓動鏈接 connection.start(); // 四、建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、建立一個目標(注意是建立隊列模式目標session.createQueue()) Destination destination = session.createTopic(topicName); // 六、建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 七、建立一個監聽器(注意消息的監聽是個異步的過程) consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("主題模式消費消息 : " + textMessage.getText()); } catch (JMSException e) { } } }); // 八、關閉鏈接(注意消息的監聽是個異步的過程,因此在此不能先關閉鏈接) // connection.close(); } }
(4)運行:先運行提供者類,再運行多個消費者類,發現隊列中的每一個消費者到能夠消費掉全部的消息。