消息中間件-activeMQ

JMS簡介


  全稱:Java Message Service 中文:Java消息服務。
  JMS是Java的一套API標準,最初的目的是爲了使應用程序可以訪問現有的MOM系統(MOM是Message Oriented Middleware的英文縮寫,指的是利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。);後來被許多現有的MOM供應商採用,並實現爲MOM系統。
  基於JMS實現的MOM,又被稱爲JMS Provider。
 html

 

經常使用的消息中間件有哪些
1ActiveMQ
  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。java

2RabbitMQ
  RabbitMQ是一個在AMQP基礎上完成的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。開發語言爲Erlang。spring

3RocketMQ
  由阿里巴巴定義開發的一套消息隊列應用服務。
 apache

在用activeMQ獲取對象(objectmessage)數據消息時 ,若是顯示以下錯誤(這裏在activeMQ裏取到了消息,可是打印不出來):緩存

Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.liy.pojo.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
    at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:208)
    at com.liy.comsumer.ObjectConsumer.main(ObjectConsumer.java:39)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.liy.pojo.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
    at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
    at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
    at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)
    ... 1 more
 session

說找不到能夠信任的實體類(pojo類)什麼的     app

那麼讓他信任全部包下的類框架

    //設置全部包下的類都是能夠信任的
      ((ActiveMQConnectionFactory) factory).setTrustAllPackages(true);maven

 

 

activeMQ和spring的使用tcp

建立兩個maven的jar工程  一個用來發送消息 ,一個用來接收消息

Activemq-02-spring-producer工程裏步驟

1.引入依賴(另一個工程裏依賴是同樣的 ,   這生產者和消費者徹底能夠寫在一個工程了)

<dependencies>
	<!-- ActiveMQ客戶端完整jar包依賴 -->
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.9.0</version>
	</dependency>
	<!-- ActiveMQ和Spring整合配置文件標籤處理jar包依賴 -->
	<dependency>
		<groupId>org.apache.xbean</groupId>
		<artifactId>xbean-spring</artifactId>
		<version>4.5</version>
	</dependency>
	<!-- Spring-JMS插件相關jar包依賴 -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>4.1.6.RELEASE</version>
	</dependency>
	<!-- Spring框架上下文jar包依賴 -->
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>4.1.6.RELEASE</version>
	</dependency>
</dependencies>

 

2.隨便寫個pojo類,用來傳輸的數據消息(注意必須實現序列化,以及寫上這個序列UID)

public class User implements Serializable{
 
	private static final long serialVersionUID = 1L;
	
	private String name;
	
	private String pwd;

 

3.寫生產類

package com.liy.producer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.liy.pojo.User;

public class Producers {
	
	private JmsTemplate template;
	
	public JmsTemplate getTemplate() {
		return template;
	}

	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}

	/**
	 * 發送消息
	 * @param destinationName 
	 */
	public void send(String destinationName,User user){
		template.send(destinationName, new MessageCreator() {
			
			@Override
			public Message createMessage(Session arg0) throws JMSException {
				 ObjectMessage message = arg0.createObjectMessage(user);
				return message;
			}
		});
		
	}
	
}

 

4.寫spring的配置文件(applicationContext.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd     
        http://www.springframework.org/schema/context     
        http://www.springframework.org/schema/context/spring-context-4.0.xsd  
        http://www.springframework.org/schema/jms  
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  
        http://activemq.apache.org/schema/core  
        http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
        
    <!-- 添加掃描 -->
	<!--<context:component-scan base-package="com.liy.*"></context:component-scan>  -->
	
	<!-- ActiveMQ 鏈接工廠 -->
	<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<!-- 需提供訪問路徑tcp://ip:61616;以及用戶名,密碼 -->
	<amq:connectionFactory id="amqConnectionFactory"
		brokerURL="tcp://192.168.72.114:61616" userName="admin" password="admin" />

	<!-- Spring Caching鏈接工廠 -->
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
		<!-- Session緩存數量 -->
		<property name="sessionCacheSize" value="100" />
	</bean>

	<!-- 消息生產者 start -->

	<!-- 定義JmsTemplate對象. 此類型由Spring框架JMS組件提供. 用於訪問ActiveMQ使用. -->
	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
		<constructor-arg ref="connectionFactory" />
		<!-- 非pub/sub模型(發佈/訂閱),即隊列模式, 默認數據可省略配置 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>
	
	<!-- 定義生成者對象 -->
	<bean id="Producer" class="com.liy.producer.Producers">
		<!-- 爲屬性賦值 -->
		<property name="template" ref="jmsQueueTemplate"></property>
	</bean>
	
	 
</beans>

 

5.寫生產工程的測試類

public class Test {
	public static void main(String[] args) {
		ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
		Producers pro = ac.getBean(Producers.class);
	
		User user = new User("liy","666");
		
		pro.send("act-spring", user);
		System.out.println("消息發送完成....");
	}
}

 

以下即爲啓動成功,消息發送完了 , 這裏就算關掉也不要緊

 

 

 

消費者工程步驟

1.依賴和生產者依賴同樣

2.實體類也是同樣的,複製下來便可

3.接下來寫消費者類

package com.liy.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

public class Consumers implements MessageListener{

	@Override
	public void onMessage(Message message) {
		 ObjectMessage obj = (ObjectMessage) message;
		try {
			System.out.println(obj.getObject());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	
}

 

4.寫spring的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd     
        http://www.springframework.org/schema/context     
        http://www.springframework.org/schema/context/spring-context-4.0.xsd  
        http://www.springframework.org/schema/jms  
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  
        http://activemq.apache.org/schema/core  
        http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
    <!-- 添加掃描 -->
	<!-- <context:component-scan base-package="com.liy.*"></context:component-scan>-->
	
	<!-- ActiveMQ 鏈接工廠 -->
	<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<!-- 需提供訪問路徑tcp://ip:61616;以及用戶名,密碼 -->
	<amq:connectionFactory id="amqConnectionFactory"
		brokerURL="tcp://192.168.72.114:61616" userName="admin" password="admin" />

	<!-- Spring Caching鏈接工廠 -->
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
		<!-- Session緩存數量 -->
		<property name="sessionCacheSize" value="100" />
	</bean>

	 

	<!-- 消息消費者 start -->

	<!-- 定義消息監聽器, 此組件爲spring-jms組件定義. 能夠一次註冊若干消息監聽器.
		屬性解釋:
			destination-type - 目的地類型, queue表明消息隊列
				可選值: queue | topic | durableTopic
				queue - 默認值. 表明消息隊列
				topic - 表明消息隊列集合
				durableTopic - 持久化的消息隊列集合. ActiveMQ會保證消息的消費者必定接收到此消息.
			container-type - 容器類型
				可選值: default | simple
				default - 默認值. 默認容器類型, 對應DefaultMessageListenerContainer
				simple - 簡單容器類型, 對應SimpleMessageListenerContainer
			connection-factory - 連接工廠, 注入的是Spring-JMS組件提供的連接工廠對象.
			acknowledge - 確認方式
				可選值: auto | client | dups-ok | transacted
				auto - 默認值, 即自動確認消息
				client - 客戶端確認消息
				dups-ok - 可以使用副本的客戶端確認消息
				transacted - 有事務的持久化消息確認機制. 需開啓對ActiveMQ的事務控制纔可應用. 
	 -->
	<jms:listener-container destination-type="queue"
		container-type="default" connection-factory="connectionFactory"
		acknowledge="auto">
		<!-- 註冊消息監聽器. 若是須要註冊多個, 重複定義下述標籤. -->
		<jms:listener destination="act-spring" ref="orderReciver" />
	</jms:listener-container>
	
	<!-- 容器管理消息監聽器實現類對象 -->
	<bean id="orderReciver" class="com.liy.consumer.Consumers"/>

	<!-- 消息消費者 end -->
</beans>

 

5.消費工程的測試類(由於spring配置文件裏有設置監聽器 ,只要生產者工程發佈了消息,監聽器就會自動獲取消息, 因此只要把spring容易開啓便可)

public class Test {
	public static void main(String[] args) {
		ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
	 
	}
}

 

如圖

相關文章
相關標籤/搜索