首先介紹 JMShtml
JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持(百度百科給出的概述)。咱們能夠簡單的理解:兩個應用程序之間須要進行通訊,咱們使用一個JMS服務,進行中間的轉發,經過JMS 的使用,咱們能夠解除兩個程序之間的耦合。JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它相似於JDBC(Java Database Connectivity)。java
概念
JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
JMS生產者(Message Producer)
JMS消費者(Message Consumer)
JMS消息
JMS隊列
JMS主題spring
JMS消息一般有兩種類型:點對點(Point-to-Point)、發佈/訂閱(Publish/Subscribe)apache
接着介紹 activeMQ瀏覽器
ActiveMQ 是Apache出品,最流行的. 功能強大的即時通信和集成模式的開源服務器springboot
特色:
1)支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各類跨語言客戶端和協議
2)支持許多高級功能,如消息組,虛擬目標,通配符和複合目標
3) 徹底支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息
4) Spring支持,ActiveMQ能夠輕鬆嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置
5) 支持在流行的J2EE服務器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
6) 使用JDBC和高性能日誌支持很是快速的持久化
...
服務器
下載地址:http://activemq.apache.org/activemq-5153-release.htmlapp
下載解壓,進入 bin 文件夾 ,若是咱們是32位的機器,就雙擊win32目錄下的activemq.bat,若是是64位機器,則雙擊win64目錄下的activemq.bat異步
若是沒有異常,表明啓動成功,常見啓動失敗緣由是端口占用的問題tcp
咱們能夠關閉佔用的端口,或者修改 activeMQ 的端口。打開 conf/activemq.xml,修改被佔用的端口。
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
瀏覽器打開 http://localhost:8161 進入 控制檯,輸入 默認的用戶名和密碼 admin/admin
打開 queue 頁面
Name:隊列名稱。
Number Of Pending Messages:等待消費的消息個數。
Number Of Consumers:當前鏈接的消費者數目
Messages Enqueued:進入隊列的消息總個數,包括出隊列的和待消費的,這個數量只增不減。
Messages Dequeued:已經消費的消息數量
建立 隊列,輸入隊列名稱,點擊 create 按鈕
-----------------------------------------分割線-------------------------------------------------------------------------
如今咱們整合 springboot 和 activeMQ
第一步、引入 pom 依賴
<!-- 整合消息隊列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 若是配置線程池則加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
第二步、配置文件 application.properties 修改
#整合jms測試,安裝在別的機器,防火牆和端口號記得開放 spring.activemq.broker-url=tcp://127.0.0.1:61616 #集羣配置 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) spring.activemq.user=admin spring.activemq.password=admin #下列配置要增長依賴 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100
第三步、啓動類
加入註解
@EnableJms
加入 bean
@Bean ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(); } @Bean JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setPriority(999); return jmsTemplate; } @Bean(value="jmsMessagingTemplate") JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) { JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(jmsTemplate); return messagingTemplate; }
第四步、消息發佈者
public interface ProducerService { /** * 功能描述:指定消息隊列,還有消息 * @param destination * @param message */ public void sendMessage(Destination destination, final String message); }
@Service public class ProducerServiceImpl implements ProducerService{ @Autowired private JmsMessagingTemplate jmsTemplate; //用來發送消息到broker的對象 //發送消息,destination是發送到的隊列,message是待發送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } }
第五步、消息消費者
@Component public class OrderConsumer { // 接收 order.queue 隊列的消息 @JmsListener(destination="order.queue") public void receiveQueue(String text){ System.out.println("OrderConsumer收到的報文爲:"+text); } }
第六步、測試類進行測試
@RestController @RequestMapping("/queue") public class QueueController { @Autowired private ProducerService producerService; /** * 點對點消息發送 ,指定 隊列 * @param msg * @return */ @RequestMapping("/order") public Object order(String msg){ Destination destination = new ActiveMQQueue("order.queue"); producerService.sendMessage(destination, msg); return "ok"; } }
打開 activeMQ 控制檯,能夠看到 隊列中的消息數量 發生了 變化
以上是 activeMQ 的點對點消息隊列。點對點並非只A發送的消息只能指定B接收,而是隻A發送的任意一條消息只能由一我的接收處理,也就是每條消息只能被消費一次。
JMS 的另外一種模式 發佈/訂閱模式。A發送的消息能夠被全部監聽A的對象的接收,就比如學校的廣播,全部的學生均可以收聽校園廣播信息。
正常狀況下,activeMQ 只支持一種消息模式,這裏作出配置修改,讓其可以同時支持 兩種 模式
啓動類中 加入 bean
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }
@Bean public Topic topic(){ return new ActiveMQTopic("topic.queue"); }
Topic 發佈/訂閱 使用 topic.queue 隊列
新建訂閱者,@JmsListener若是不指定獨立的containerFactory的話是隻能消費queue消息
@Component public class TopicSub { @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("topic.queue 消費者:receive1="+text); } @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("topic.queue 消費者:receive2="+text); } @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic") public void receive3(String text){ System.out.println("topic.queue 消費者:receive3="+text); } }
修改 信息發送者 ProducerServiceImpl.java,增長代碼
@Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); }
修改測試類 QueueController.java ,增長代碼
/** * 發佈、訂閱消息 * @param msg * @return */ @RequestMapping("/publish") public Object publish(String msg){ producerService.publish(msg); return "ok"; }
啓動項目,訪問 /queue/order ,打印出一條數據
OrderConsumer收到的報文爲:訂單信息
訪問 /queue/publish ,打印出三條數據
topic.queue 消費者:receive3=發佈、訂閱 topic.queue 消費者:receive1=發佈、訂閱 topic.queue 消費者:receive2=發佈、訂閱