消息中間件

消息中間件的定義:

沒有標準定義,通常認爲,採用消息傳送機制/消息隊列 的中間件技術,進行數據交流,用在分佈式系統的集成前端

 

爲何要用消息中間件?

解決分佈式系統之間消息的傳遞。java

電商場景:用戶下單減庫存,調用物流系統,系統擴充後服務化和業務拆分。系統交互,通常用RPC(遠程過程調用)。若是系統擴充到有幾十個接口,消息中間件來解決問題。node

 

和RPC有何區別?

使用區分標準:一、系統之間的依賴程度  二、量(業務量,數據量,訪問量)linux

 

消息中間件有些什麼使用場景?

一、 異步處理正則表達式

用戶註冊(50ms),還需發送郵件(50ms)和手機信息(50ms)spring

串行(150ms)用戶註冊—》發送郵件----》發送手機信息數據庫

並行(100ms):用戶註冊—》發送郵件|----》發送手機信息apache

消息中間件(56ms):用戶註冊(50ms)—》(6ms)消息中間件《-----發送郵件《-----發送手機信息centos

二、 應用的解耦api

訂單系統---》庫存系統(強耦合)

消息中間件:訂單系統---》消息中間件《----庫存系統(解耦)

三、 流量的削峯

用戶請求-----》秒殺應用

應用的前端加入消息隊列

用戶請求-----》消息隊列《----秒殺應用

四、 日誌處理

錯誤日誌---》消息隊列《----日誌處理

用戶行爲日誌--》消息隊列(kafka)《-----日誌的存儲或流式處理

五、純粹的消息通訊

 

常見消息中間件比較

kafka和RabbitMQ的比較

一、  RabbitMq比kafka成熟,在可用性上,穩定性上,可靠性上,RabbitMq超過kafka

二、  Kafka設計的初衷就是處理日誌的,能夠看作是一個日誌系統,針對性很強,因此它並無具有一個成熟MQ應該具有的特性

三、  Kafka的性能(吞吐量、tps)比RabbitMq要強

 

1、JMS與ActiveMQ

什麼是JMS規範

本質是API,Java平臺消息中間件的規範,java應用程序之間進行消息交換。而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。

 

 

JMS對象模型包含以下幾個要素:

1)鏈接工廠:建立一個JMs鏈接

2)JMS鏈接:客戶端和服務器之間的一個鏈接。

3)JMS會話:客戶和服務器會話的狀態,創建在鏈接之上的

4)JMS目的:消息隊列

5)JMS生產者:消息的生成

6)JMS消費者:接收消息

7)Broker:消息中間件的實例(ActiveMq)

 

JMS規範中的點對點模式:

特色:有隊列,生產者的一個消息只發送給一個消費(接受)者(即便有多個接受者監聽隊列),消費者是要向隊列應答成功

 

JMS規範中的主題模式(發佈訂閱):

特色:發佈到Topic的消息會被當前主題全部的訂閱者消費(接受)

 

Request-Response模式

在前面的兩種模式中都是一方負責發送消息而另一方負責處理。而咱們實際中的不少應用至關於一種一應一答的過程,須要雙方都能給對方發送消息。因而請求-應答的這種通訊方式也很重要。它也應用的很廣泛。

請求-應答方式並非JMS規範系統默認提供的一種通訊方式

 

JMS規範中的消息類型

TextMessage

MapMessage

ObjectMessage

BytesMessage

StreamMessage

 

實現代碼

原生客戶端的代碼

/**原生的生產者*/
public class JmsProducer {
    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //發送的消息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer messageProducer;

        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            /* createSession參數取值
            * 一、爲true表示啓用事務
            * 二、消息的確認模式
            * AUTO_ACKNOWLEDGE  自動簽收
            * CLIENT_ACKNOWLEDGE 客戶端自行調用acknowledge方法簽收
            * DUPS_OK_ACKNOWLEDGE 不是必須簽收,消費可能會重複發送
            * 在第二次從新傳送消息的時候,消息頭的JmsDelivered會被置爲true表示當前消息已經傳送過一次,客戶端須要進行消息的重複處理控制。
            * */
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("HelloWAM");
            messageProducer = session.createProducer(destination);
            for(int i=0;i<SENDNUM;i++){
                String msg = "發送消息"+i+" "+System.currentTimeMillis();
                TextMessage message = session.createTextMessage(msg);
                System.out.println("發送消息:"+msg);
                messageProducer.send(message);
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
/*原生的消費者*/
public class JmsConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認鏈接用戶名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認鏈接密碼
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認鏈接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//鏈接工廠
        Connection connection = null;//鏈接
        Session session;//會話 接受或者發送消息的線程
        Destination destination;//消息的目的地
        MessageConsumer messageConsumer;//消息的消費者

        //實例化鏈接工廠
        connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);
        try {
            //經過鏈接工廠獲取鏈接
            connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個鏈接HelloWorld的消息隊列
            destination = session.createQueue("HelloWAM");
            //建立消息消費者
            messageConsumer = session.createConsumer(destination);
            //讀取消息
            System.out.println("啓動了");
            while(true){
                TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("Accept msg : "+textMessage.getText());
                }else{
                    break;
                }
            }
            System.out.println("結束了");

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

 

整合spring的代碼

1)Spring的配置文件中要增長命名空間

xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=http://www.springframework.org/schema/jms

http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"

2)引入

<!-- ActiveMQ 鏈接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory"
             brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />

	<!-- Spring Caching鏈接工廠 -->
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>

	<!-- Spring JmsTemplate 的消息生產者 start-->
	<!-- 定義JmsTemplate的Queue類型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 隊列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>
    	<!-- 定義JmsTemplate的Topic類型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 發佈訂閱模式-->
        <property name="pubSubDomain" value="true"></property>
    </bean>
    
	<!-- 定義Queue監聽器 -->
	<jms:listener-container destination-type="queue" container-type="default"
							connection-factory="connectionFactory" acknowledge="auto">
		<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
		<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
	</jms:listener-container>
	<!-- 定義Topic監聽器 -->
	<jms:listener-container destination-type="topic" container-type="default"
							connection-factory="connectionFactory" acknowledge="auto">
		<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
		<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
	</jms:listener-container>
	
	<!--接收消費者應答的監聽器-->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
    </jms:listener-container>

3)發送類-點對點隊列模式(包含應答)

/**
 * 點對點隊列發送模式
 * @author Administrator
 */
@Controller
public class QueueSendController {

	@Autowired
	@Qualifier("jmsQueueTemplate")
	private JmsTemplate jmsTemplate;
	@Autowired
	private GetResponse getResponse;
	
	
	@ResponseBody
	@RequestMapping("tosendQueueMsg")
	public String tosendQueueMsg(ModelMap modelMap,String msg,String queueName){
		queueName="test.queue";
		if (msg!=null&&queueName!=null) {
			jmsTemplate.send(queueName,new MessageCreator() {
				@Override
				public Message createMessage(Session session) throws JMSException {
					Message message = session.createTextMessage(msg);
					
					//配置消費者應答相關內容
					Destination tempDest = session.createTemporaryQueue();
					MessageConsumer responseConsumer = session.createConsumer(tempDest);
					responseConsumer.setMessageListener(getResponse);
					message.setJMSReplyTo(tempDest);
					//消費者應答的id,發送出的消息和應答消息進行匹配
					String uid = System.currentTimeMillis()+"";
					message.setJMSCorrelationID(uid);
					System.out.println("JMSCorrelationID="+uid);
					return message;
				}
			});
			modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.SUCCESS);
		}else {
			modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.PARAMERROR);
		}
		
		return JsonUtil.toJson(modelMap);
	}
	
}

4)應答響應類

@Component
public class GetResponse implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("GetResponse accept response : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

5)接受類-點對點隊列模式

/**
 * 隊列消費者1
 * @author Administrator
 *
 */
@Component
public class QueueReceiver1 implements MessageListener {

	@Autowired
	@Qualifier("jmsQueueTemplate")
	private JmsTemplate jmsTemplate;
	
	@Override
	public void onMessage(Message message) {
		try {
			String textMsg = ((TextMessage)message).getText();
			System.out.println("QueueReceiver1 accept msg : "+textMsg);
			
			//應答
			System.out.println("message.getJMSReplyTo()="+message.getJMSReplyTo());
			System.out.println("message.getJMSCorrelationID()="+message.getJMSCorrelationID());
			jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {
	            public Message createMessage(Session session) throws JMSException {
	                Message msg = session.createTextMessage("QueueReceiver1 accept msg"+textMsg);
	                return msg;
	            }
	        });
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
/**
 * 隊列消費者2
 * @author Administrator
 *
 */
@Component
public class QueueReceiver2 implements MessageListener {

	@Override
	public void onMessage(Message message) {
		try {
			String textMsg = ((TextMessage)message).getText();
			System.out.println("QueueReceiver2 accept msg : "+textMsg);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

6)發送類-主題訂閱發送模式

/**
 * 主題訂閱發送模式
 * @author Administrator
 */
@Controller
public class TopicSenderController {
	
	@Autowired
	@Qualifier("jmsTopicTemplate")
	private JmsTemplate jmsTemplate;
	
	@ResponseBody
	@RequestMapping("tosendQTopicMsg")
	public String tosendQTopicMsg(ModelMap modelMap,String msg,String queueName){
		queueName="test.topic";
		if (msg!=null&&queueName!=null) {
			jmsTemplate.send(queueName,new MessageCreator() {
				@Override
				public Message createMessage(Session session) throws JMSException {
					Message message = session.createTextMessage(msg);
					return message;
				}
			});
			modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.SUCCESS);
		}else {
			modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.PARAMERROR);
		}
		return JsonUtil.toJson(modelMap);
	}

}

7)接受類-主題訂閱模式

/**
 * 主題訂閱消費者1
 * @author Administrator
 *
 */
@Component
public class TopicReceiver1  implements MessageListener{

	public void onMessage(Message message) {
		try {
			String textMsg = ((TextMessage)message).getText();
			System.out.println("TopicReceiver1 accept msg : "+textMsg);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
}
/**
 * 主題訂閱消費者2
 * @author Administrator
 *
 */
@Component
public class TopicReceiver2  implements MessageListener{

	public void onMessage(Message message) {
		try {
			String textMsg = ((TextMessage)message).getText();
			System.out.println("TopicReceiver2 accept msg : "+textMsg);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
}

 

2、AMQP與RabbitMQ

Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議是一個標準協議,支持不一樣語言和不一樣的產品

 

生產者

消息的建立者,發送到amqp的消息中間件

 

消費者

鏈接到amqp的消息中間件,訂閱到隊列上,進行消息的消費。分爲持續訂閱(basicConsumer)和單條訂閱(basicGet)

 

消息

包括有效載荷和標籤。有效載荷就是要傳輸的數據。標籤描述有效載荷的屬性,rabbitmq用標籤來決定誰得到當前消息。消費者只能拿到有效載荷。

 

信道

虛擬的鏈接,創建在真實的tcp鏈接之上的。信道的建立沒有限制的。

交換器、隊列、綁定、路由鍵

隊列經過路由鍵(routing key,某種肯定的規則)綁定到交換器,生產者把消息發送到了交換器,交換器根據綁定的路由鍵將消息路由到特定的隊列,訂閱了隊列的消費者進行接收。

若是消息達到無人訂閱的隊列會怎麼辦

 消息會一直在隊列中等待,rabbitmq會默認隊列是無限長度的。

多個消費者訂閱到同一隊列怎麼辦? 

消息會輪詢的方式發送給消費者,每一個消息只會發送給一個消費者

消息路由到了不存在的隊列怎麼辦?

會忽略,當消息不存在,消息丟失了。

 

消息的確認機制

消費者收到的每一條消息都必須進行確認。

(分爲自動確認和消費者自行確認)

消費者在聲明隊列時,指定autoAck參數,true自動確認,false時rabbitmq會等到消費者顯示的發回一個ack信號纔會刪除消息。

autoAck=false,有足夠時間讓消費者處理消息,直到消費者顯示調用basicAck爲止。Rabbitmq中消息分爲了兩部分:一、等待投遞的消息;二、已經投遞,可是尚未收到ack信號的。若是消費者斷連了,服務器會把消息從新入隊,投遞給下一個消費者。

未ack的消息是沒有超時時間的,

 

如何明確拒絕消息?

 一、消費者斷連,二、消費者使用reject命令(requeue=true,從新分發消息,false移除消息),三、nack命令(RabbitMQ纔有的批量的拒絕)

 

建立隊列

(生產/消費)declareQueue。消費者訂閱了隊列,不能再聲明隊列了。

相關參數(exclusive 隊列爲應用程序私有,auto-delete 最後一個消費者取消訂閱時,隊列會自動刪除,durable 隊列持久化)

 

檢測隊列是否存在

 Declare 時的passive參數

 

交換器

direct,fanout,topic,headers

direct: 路由鍵徹底匹配時,消息投放到對應隊列。Amqp實現都必須有一個direct交換器(默認交換器),名稱爲空白字符。隊列不聲明交換器,會自動綁定到默認交換器,隊列的名稱做爲路由鍵。

Fanout:能夠理解爲廣播

Topic:主題,使來自不一樣源頭的消息到達同一個隊列

Headers: 匹配消息頭,其他與direct同樣,實用性不大

日誌處理場景:

一、 有交換器(topic)log_exchange,日誌級別有 error,info,warning,應用模塊有 user,order,email,路由鍵的規則是 日誌級別+「.」+應用模塊名(例如info.user)

二、 發送郵件失敗,報告一個email的error,basicPublic(message,’log-exchange’,’error.email’)

隊列的綁定:queueBind(「email-error-queue」,’log-exchange’,’error.email’)

要監聽email全部的日誌怎麼辦?

queueBind(「email-log-queue」,’log-exchange’,’*.email’)

監聽全部模塊全部級別日誌?

queuebind(「all-log-queue」,’log-exchange’,’#’)

「.」會把路由鍵分爲好幾個標識符,「*」匹配一個標識符,「#」匹配一個或者多個(xxx.yyy.zzzz 能夠: xxx.*. zzzz , xxx.# , #.zzzz)。

虛擬主機

Vhost,真實rabbitmq服務器上的mini型虛擬的mq服務器。有本身的權限機制。Vhost提供了一個邏輯上的分離,能夠區分客戶端,避免隊列和交換器的名稱衝突。RabbitMq包含了一個缺省的vhost :「/」,用戶名guest,口令 guest(guest用戶只能在本機訪問)。

消息持久化

一、 隊列是必須持久化

二、 交換器也必須是持久化

三、 消息的投遞模式必須(int型) 2

以上條件所有知足,消息才能持久化

問題:性能(降低10倍)

AMQP和JMS區別:

      

JMS

AMQP

定義

Java api

協議

Model

P2P

Pub/Sub

Direct

Fanout

Topic

headers

支持消息類型

5種

Byte[]

自行消息序列化,Json化

綜合評價

Java系統,模型知足要求,跨平臺較差

協議,自然跨平臺,跨語言

 

RabbitMq在Windows下安裝和運行

一、下載Erlang: http://www.erlang.org/downloads/19.2

二、下載Windows版RabbitMq:

http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6.exe

三、安裝並配置環境變量:

ERLANG_HOME    C:\Program Files\erl8.2

path下添加   %ERLANG_HOME%\bin

RABBITMQ_BASE  C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6      

path下添加  %RABBITMQ_BASE%\sbin;%RABBITMQ_BASE%\ebin

四、在開始菜單中啓動服務

五、在安裝目錄的sbin下運行rabbitmqctl.bat status

 

客戶端Jar包和源碼包下載地址:

http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar

還須要slf4j-api-1.6.1.jar

若是是Maven工程加入:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>

注意:5系列的版本最好使用JDK8及以上, 低於JDK8可使用4.x(具體的版本號到Maven的中央倉庫查)的版本

實踐代碼

/**
 * direct交換器生產者
 * @author Administrator
 *
 */
public class DirectProducer {
	
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "direct_logs";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**鏈接到本機能夠省略 guest用戶只能在本機上使用
		        factory.setUsername(..);
		        factory.setPort();
		        factory.setVirtualHost();*/
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**路由鍵數組*/
		        String[] routingKeys = {"error","info","warning"};
		        for(int i=0;i<3;i++){
		            String routingKey = routingKeys[i];
		            String message = "Hello world_"+(i+1);
			        /**發送消息                   (交換器 ,路由鍵 ,BasicProperties=null,  內容)*/
		            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
		            System.out.println("Sent "+routingKey+":"+message);
		        }
		        /**關閉信道和鏈接*/
		        channel.close();
		        connection.close();
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}

 

/**
 * fanout交換器生產者
 * @author Administrator
 *
 */
public class FanoutProducer {
	
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "fanout_logs";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明fanout交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
		        /**路由鍵數組*/
		        String[] routingKeys = {"error","info","warning"};
		        for(int i=0;i<3;i++){
		            String routingKey = routingKeys[i];
		            String message = "Hello world_"+(i+1);
			        /**發送消息                   (交換器 ,路由鍵 ,BasicProperties=null,  內容)*/
		            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
		            System.out.println("Sent "+routingKey+":"+message);
		        }
		        /**關閉信道和鏈接*/
		        channel.close();
		        connection.close();
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}
/***
 * 消費者  接受全部消息
 * @author Administrator
 *
 */
public class ConsumerAll {
	
	/**交換器名稱*/
	//private final static String EXCHANGE_NAME = "direct_logs";
	private final static String EXCHANGE_NAME = "fanout_logs";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**聲明fanout交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
		        /**聲明隨機隊列*/
		        String queueName = channel.queueDeclare().getQueue();
		        /**路由鍵數組*/
		        String[] routingKeys = {"error","info","warning"};
		        /**一個隊列綁定多個路由鍵*/
		        for(String routingKey:routingKeys){
		            /**隊列綁定交換器和路由鍵*/
		            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
		        }
		        System.out.println("Waiting message.......");
		        
		        /**建立Consumer消息監聽器*/
		        Consumer callbackConsumer=new DefaultConsumer(channel){
		        	@Override
		        	public void handleDelivery(String consumerTag,Envelope envelope,
		        			BasicProperties properties,byte[] body) throws IOException {
		        		String message=new String(body,"UTF-8");
		        		System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
		        	}
		        };
		        /**信道啓用監聽器	隊列名稱 ,是否自動確認,callback監聽器 */
		        channel.basicConsume(queueName,true,callbackConsumer);
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
	   }
}
/***
 * 消費者  接受erroe消息
 * @author Administrator
 *
 */
public class ConsumerError {
	
	/**交換器名稱*/
	//private final static String EXCHANGE_NAME = "direct_logs";
	private final static String EXCHANGE_NAME = "fanout_logs";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**聲明fanout交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
		        /**聲明隨機隊列*/
		        String queueName = channel.queueDeclare().getQueue();
		        /**路由鍵*/
		        String routingKey = "error";
		        /**隊列綁定交換器和路由鍵*/
	            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
		        System.out.println("Waiting message.......");
		        
		        /**建立Consumer消息監聽器*/
		        Consumer callbackConsumer=new DefaultConsumer(channel){
		        	@Override
		        	public void handleDelivery(String consumerTag,Envelope envelope,
		        			BasicProperties properties,byte[] body) throws IOException {
		        		String message=new String(body,"UTF-8");
		        		System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
		        	}
		        };
		        /**信道啓用監聽器	隊列名稱 ,是否自動確認,callback監聽器 */
		        channel.basicConsume(queueName,true,callbackConsumer);
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}

 

實踐代碼 消費者確認

/**
 * 消費者確認模式
 * @author Administrator
 *
 */
public class ConsumerConfirmProducer {
	
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "direct_confirm";
	/**路由鍵*/
	private final static String RoutingKey="error";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/ 
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		        for(int i=0;i<10;i++){
		            String message = "Hello world_"+(i+1);
			        /**發送消息*/
		            channel.basicPublish(EXCHANGE_NAME,RoutingKey,null,message.getBytes());
		            System.out.println("Sent "+RoutingKey+":"+message);
		        }
		        /**關閉信道和鏈接*/
		        channel.close();
		        connection.close();
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}
/***
 * 消費者本身響應
 * @author Administrator
 *
 */
public class ClientConsumerAck {
	
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "direct_confirm";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**隊列名稱*/
		        String queueName = "consumer_confirm";
		        /**聲明隊列				名稱,持久化,私有化,自動刪除,arguments	*/
		        channel.queueDeclare(queueName, false, false, false, null);
		        /**路由鍵*/
		        String routingKey = "error";
		        /**隊列綁定交換器和路由鍵*/
	            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
		        System.out.println("Waiting message.......");
		        
		        /**建立Consumer消息監聽器*/
		        Consumer callbackConsumer=new DefaultConsumer(channel){
		        	@Override
		        	public void handleDelivery(String consumerTag,Envelope envelope,
		        			BasicProperties properties,byte[] body) throws IOException {
		        		String message=new String(body,"UTF-8");
		        		System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
		        		/**手動消息確認				deliveryTag標誌符,	multiple 是否批量回復*/
		        		this.getChannel().basicAck(envelope.getDeliveryTag(), false);
		        	}
		        };
		        /**信道啓用監聽器	隊列名稱 ,是否自動確認 false,callback監聽器 */
		        channel.basicConsume(queueName,false,callbackConsumer);
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}
/**
 * 消費者響應緩慢
 * @author Administrator
 *
 */
public class ClientConsumerSlowAck {
	
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "direct_confirm";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**隊列名稱*/
		        String queueName = "consumer_confirm";
		        /**聲明隊列				名稱,持久化,私有化,自動刪除,arguments	*/
		        channel.queueDeclare(queueName, false, false, false, null);
		        /**路由鍵*/
		        String routingKey = "error";
		        /**隊列綁定交換器和路由鍵*/
	            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
		        System.out.println("Waiting message.......");
		        
		        /**建立Consumer消息監聽器*/
		        Consumer callbackConsumer=new DefaultConsumer(channel){
		        	@Override
		        	public void handleDelivery(String consumerTag,Envelope envelope,
		        			BasicProperties properties,byte[] body) throws IOException {
		        		/**接收前休眠25秒*/
		        		try {
							Thread.sleep(25000);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
		        		String message=new String(body,"UTF-8");
		        		System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
		        		/**手動消息確認註釋				deliveryTag標誌符,	multiple 是否批量回復*/
		        		//this.getChannel().basicAck(envelope.getDeliveryTag(), false);
		        	}
		        };
		        /**信道啓用監聽器	隊列名稱 ,是否自動確認 false,callback監聽器 */
		        channel.basicConsume(queueName,false,callbackConsumer);
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}
/**
 * 消費者拒絕
 * @author Administrator
 *
 */
public class ClientConsumerReject {
	/**交換器名稱*/
	private final static String EXCHANGE_NAME = "direct_confirm";
	
	public static void main(String[] args) throws IOException{
			try {
				ConnectionFactory factory = new ConnectionFactory();
		        factory.setHost("127.0.0.1");
				/**新建鏈接*/
				Connection connection = factory.newConnection();
				/**建立信道*/
				Channel channel = connection.createChannel();
				/**聲明direct交換器*/
		        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
		        /**隊列名稱*/
		        String queueName = "consumer_confirm";
		        /**聲明隊列				名稱,持久化,私有化,自動刪除,arguments	*/
		        channel.queueDeclare(queueName, false, false, false, null);
		        /**路由鍵*/
		        String routingKey = "error";
		        /**隊列綁定交換器和路由鍵*/
	            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
		        System.out.println("Waiting message.......");
		        
		        /**建立Consumer消息監聽器*/
		        Consumer callbackConsumer=new DefaultConsumer(channel){
		        	@Override
		        	public void handleDelivery(String consumerTag,Envelope envelope,
		        			BasicProperties properties,byte[] body) throws IOException {
		        		/**手動消息確認				deliveryTag標誌符,	requeue 是否重發到隊列*/
		        		this.getChannel().basicReject(envelope.getDeliveryTag(),true);
		        		System.out.println("Reject:"+envelope.getRoutingKey()+":"+new String(body,"UTF-8"));
		        	}
		        };
		        /**信道啓用監聽器	隊列名稱 ,是否自動確認 false,callback監聽器 */
		        channel.basicConsume(queueName,false,callbackConsumer);
			} catch (TimeoutException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

	   }
}

同步和異步、監聽器的使用

/***
 * 發送方確認  同步模式  生產者
 * @author Administrator
 *
 */
public class ProducerConfirm {
    private final static String EXCHANGE_NAME = "producer_confirm";
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        /**	
         * 建立鏈接鏈接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        /**設置MabbitMQ所在主機ip或者主機名*/
        factory.setHost("127.0.0.1");
        /**建立一個鏈接*/
        Connection connection = factory.newConnection();
       /**建立一個信道*/
        Channel channel = connection.createChannel();
        /**將信道設置爲發送方確認*/
        channel.confirmSelect();
        for(int i=0;i<2;i++){
            String msg = "Hello "+(i+1);
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            /**阻塞    等待確認*/
            if (channel.waitForConfirms()){
                System.out.println(ROUTE_KEY+":"+msg);
            }
        }
        /**關閉頻道和鏈接*/
        channel.close();
        connection.close();
    }

}
/***
 * 發送方確認  異步模式  生產者
 */
public class ProducerConfirmAsync {

    private final static String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        /**
         * 建立鏈接鏈接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        /** 設置MabbitMQ所在主機ip或者主機名*/
        factory.setHost("127.0.0.1");
        /** 建立一個鏈接*/
        Connection connection = factory.newConnection();
        
        /**建立一個信道*/
        Channel channel = connection.createChannel();
        /** 監聽器*/
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**將信道設置爲發送方確認*/
        channel.confirmSelect();

        /**鏈接被關閉時觸發
        connection.addShutdownListener();*/
        /**信道被關閉時觸發
        channel.addShutdownListener();*/
        /**確認監聽(消息成功投遞時觸發)      deliveryTag表明了(channel)惟一的投遞  multiple:false*/
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
            /**內部錯誤時*/
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
        });
        /** 返回監聽(消息投遞失敗時觸發)*/
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("msg:"+msg);
            }
        });
        /**標誌位 mandatory參數
     	爲true,投遞消息時沒法找到一個合適的隊列   消息返回給生產者
    	爲false 丟棄消息(缺省) */
        boolean mandatory=true; 
        String[] severities={"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = severities[i%3];
            // 發送的消息
            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
            channel.basicPublish(EXCHANGE_NAME, severity, mandatory,null, message.getBytes());
            //channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            System.out.println("----------------------------------------------------");
            System.out.println(" Sent Message: [" + severity +"]:'"+ message + "'");
            Thread.sleep(200);
        }

        // 關閉頻道和鏈接
        channel.close();
        connection.close();
    }


}
/***
 * 發送方確認  消費者
 * @author Administrator
 *
 */
public class ProducerConfirmConsumer {
	
	   private static final String EXCHANGE_NAME = "producer_confirm";

	    public static void main(String[] argv) throws IOException, TimeoutException {
	        ConnectionFactory factory = new ConnectionFactory();
	        factory.setHost("127.0.0.1");
	        /**打開鏈接和建立頻道,與發送端同樣*/
	        Connection connection = factory.newConnection();
	        final Channel channel = connection.createChannel();

	        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

	        String queueName = "producer_confirm";
	        channel.queueDeclare(queueName,false,false,
	                false,null);
	        /**只關注error級別的日誌*/
	        String severity="error";
	        channel.queueBind(queueName, EXCHANGE_NAME, severity);

	        System.out.println(" [*] Waiting for messages......");

	        /**建立隊列消費者*/
	        final Consumer consumerB = new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                String message = new String(body, "UTF-8");
	                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
	            }
	        };
	        channel.basicConsume(queueName, true, consumerB);
	    }
}

 

與Spring集成

1)Spring的配置文件中要增長命名空間

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd

2)其餘配置

<!-- rabbitMQ配置 -->
	<bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"></property>
    </bean>
    
    <!--Spring的rabbitmq admin-->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
    <!-- 生產者配置 -->
    <!--生產者建立隊列  不使用的隊列   持久化durable=false -->
    <rabbit:queue name="p_create_queue" durable="false"/>
    
    <!--fanout交換器-->
    <rabbit:fanout-exchange name="fanout-exchange"
        xmlns="http://www.springframework.org/schema/rabbit" durable="false">
        <!-- 綁定隊列 -->
        <rabbit:bindings>
            <rabbit:binding queue="p_create_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    
    <!--topic交換器-->
    <rabbit:topic-exchange name="topic-exchange"
    	 xmlns="http://www.springframework.org/schema/rabbit" durable="false">
    </rabbit:topic-exchange>
    
	<!-- rabbitTemplate 消息模板類 -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
    </bean>
    
    
    <!-- 消費者配置 -->
    <!-- fanout交換器 begin-->
    <!-- 消費者定義隊列 -->
	<rabbit:queue name="h1_queue" durable="false"/>
	<rabbit:queue name="h2_queue" durable="false"/>
	<rabbit:queue name="h3_queue" durable="false"/>
	<!-- 把須要數據的隊列與交換器綁定一塊兒 -->
	<rabbit:fanout-exchange name="fanout-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="false">
		<rabbit:bindings>
			<rabbit:binding queue="h1_queue"></rabbit:binding>
			<rabbit:binding queue="h2_queue"></rabbit:binding>
			<rabbit:binding queue="h3_queue"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:fanout-exchange>
    <!-- fanout交換器 end-->
    
    <!-- topic交換器 begin-->
    <!-- 定義隊列 -->
    <rabbit:queue name="all_log_queue" durable="false"/>
    <rabbit:queue name="email_all_queue" durable="false"/>
    <rabbit:queue name="email_error_queue" durable="false"/>
    <rabbit:queue name="all_error_queue" durable="false"/>
    <!--隊列經過路由鍵與交換器綁定一塊兒 -->
    <rabbit:topic-exchange name="topic-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="all_log_queue" pattern="#"></rabbit:binding>
            <rabbit:binding queue="email_all_queue" pattern="*.email"></rabbit:binding>
            <rabbit:binding queue="email_error_queue"  pattern="error.email"></rabbit:binding>
            <rabbit:binding queue="all_error_queue"  pattern="error.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- topic交換器 end-->
    
    <!--fanout消費者定義-->
    <bean id="h1_Consumer" class="com.xjt.astudy.rabbitmq.spring.H1_Service"></bean>
    <bean id="h2_Consumer" class="com.xjt.astudy.rabbitmq.spring.H2_Service"></bean>
    <bean id="h3_Consumer" class="com.xjt.astudy.rabbitmq.spring.H3_Service"></bean>

    <!--監聽容器    消費者監聽隊列-->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory">
        <rabbit:listener ref="h1_Consumer" queues="h1_queue" method="onMessage" />
        <rabbit:listener ref="h2_Consumer" queues="h2_queue" method="onMessage" />
        <rabbit:listener ref="h3_Consumer" queues="h3_queue" method="onMessage" />
        <rabbit:listener ref="allLogTopicService" queues="all_log_queue" method="onMessage" />
        <rabbit:listener ref="emailAllTopicService" queues="email_all_queue" method="onMessage" />
        <rabbit:listener ref="emailErrorTopicService" queues="email_error_queue" method="onMessage" />
        <rabbit:listener ref="allErrorTopicService" queues="all_error_queue" method="onMessage" />
    </rabbit:listener-container>

3)fanout消費者

/**
 * fanout消費者 H1
 */
public class H1_Service implements MessageListener{

    public void onMessage(Message message) {
    	System.out.println("H1_Service Get message:"+new String(message.getBody()));
    }
}
/**
 * fanout消費者 H2
 */
public class H2_Service implements MessageListener{
    public void onMessage(Message message) {
    	System.out.println("H2_Service Get message:"+new String(message.getBody()));
    }
}
/**
 * fanout消費者 H3
 */
public class H3_Service implements MessageListener{

    public void onMessage(Message message) {
    	System.out.println("H3_Service Get message:"+new String(message.getBody()));
    }
}

4)Topic消費者

/**
 * Topic消費者 AllError
 */
@Component
public class AllErrorTopicService implements MessageListener{
	
    public void onMessage(Message message) {
    	System.out.println("AllErrorTopicService Get message:"+new String(message.getBody()));
    }
}
/**
 * Topic消費者 AllLog
 */
@Component
public class AllLogTopicService implements MessageListener{
	
    public void onMessage(Message message) {
    	System.out.println("AllLogTopicService Get message:"+new String(message.getBody()));
    }
}
/**
 * Topic消費者 EmailAll
 */
@Component
public class EmailAllTopicService implements MessageListener{
	
    public void onMessage(Message message) {
    	System.out.println("EmailAllTopicService Get message:"+new String(message.getBody()));
    }
}
/**
 * Topic消費者 EmailError
 */
@Component
public class EmailErrorTopicService implements MessageListener{

    public void onMessage(Message message) {
    	System.out.println("EmailErrorTopicService Get message:"+new String(message.getBody()));
    }
}

5)生產者  消費發送控制層

/**
 * RabbitMqController
 */
@Controller
public class RabbitMqController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 發送信息到fanout交換器上
     * @param message
     * @return
     */
    @ResponseBody
    @RequestMapping("fanoutSender")
    public String fanoutSender(ModelMap modelMap,String message){
        try {
            for(int i=0;i<3;i++){
                String str = "Fanout,the message_"+i+" is : "+message;
                System.out.println("**************************Send Message:["+str+"]");
                /**模版發送消息                  交換器 ,路由鍵,消息*/
                rabbitTemplate.send("fanout-exchange","",new Message(str.getBytes(),new MessageProperties()));

            }
            modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.PARAMERROR);
        }
        return JsonUtil.toJson(modelMap);
    }

    /**
     * 發送信息到topic交換器上
     * @param message
     * @return
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(ModelMap modelMap,String message){
        try {
            String[] routeKeys={"error","info","warning"};
            String[] modules={"email","order","user"};
            for(int i=0;i<routeKeys.length;i++){
                for(int j=0;j<modules.length;j++){
                    String routeKey = routeKeys[i]+"."+modules[j];
                    String str = "the message is [rk:"+routeKey+"]["+message+"]";
                    /**模版發送消息                  交換器 ,路由鍵,消息*/
                    rabbitTemplate.send("topic-exchange",routeKey,new Message(str.getBytes(),new MessageProperties()));
                }
            }
            modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.SUCCESS);
        } catch (Exception e) {
        	e.printStackTrace();
            modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.PARAMERROR);
        }
        return  JsonUtil.toJson(modelMap);
    }

}

 

異步處理

場景:

用戶註冊,寫入數據庫成功之後,發送郵件和短信。

串行模式  ************spend time : 251ms

並行模式  ************spend time : 153ms

消息隊列模式:************spend time : 66ms

 

應用解耦

場景:

用戶下訂單買商品,下單成功了,扣庫存。有庫存直接扣,沒有庫存或低於某個閥值,看下單成功,但要通知其餘系統(採購系統)進行調貨。

RPC實現:庫存系統失敗,訂單系統也沒法成功,訂單系統和庫存系統耦合了。

使用消息中間件進行解耦:寫有一個扣減消息,保證消息的必須處理。

三個問題須要解決:

1. 訂單發送給MQ的消息必需要被MQ服務器接收到。(事務、發送者確認)

2. MQ服務器拿到了消息後,消息被正常處理之前必須保持住。(持久化)

3. 某個庫存服出了異常,消息要可以被其餘的庫存服務處理。(消費者確認,消息監聽類要實現ChannelAwareMessageLListener)

若是訂單系統必定要知道庫存系統是否成功怎麼辦?庫存系統和訂單系統之間建一個消息通道,庫存系統去通知訂單系統。

<!-- rabbitMQ鏈接工廠配置 -->
	<bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"></property>
        <!--發佈者確認開啓 -->
        <property name="publisherConfirms" value="true"></property>
    </bean>
    
    <!--Spring的rabbitmq admin-->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- 建立rabbitTemplate 消息模板類 -->
 	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
		<constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
		<!-- 消息確認回調 -->
		<property name="confirmCallback" ref="confirmCallback"/>
		<property name="returnCallback" ref="returnCallback"/>
		<!-- mandatory必須設置true,return callback才生效 -->  
		<property name="mandatory" value="false"/>
	</bean>

	<!-- 隊列持久化 durable=ture -->
	<rabbit:queue name="depot_queue" durable="true"/>
	<!-- 交換器持久化 -->
	<rabbit:direct-exchange name="depot-amount-exchange"
          xmlns="http://www.springframework.org/schema/rabbit" durable="true">
		<rabbit:bindings>
			<rabbit:binding queue="depot_queue" key="amount.depot" ></rabbit:binding>
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
		<!-- 消費監聽  手動確認 acknowledge="manual"-->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
    	<!--                      監聽隊列                  監聽實現類                 onMessage方法 -->
        <rabbit:listener queues="depot_queue" ref="processDepot" method="onMessage" />
    </rabbit:listener-container>
/**
 * goods類
 */
public class GoodTransferVo  implements Serializable {

    private String goodsId;
    private int changeAmount;
    private boolean inOrOut;

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    public int getChangeAmount() {
        return changeAmount;
    }

    public void setChangeAmount(int changeAmount) {
        this.changeAmount = changeAmount;
    }

    public boolean isInOrOut() {
        return inOrOut;
    }

    public void setInOrOut(boolean inOrOut) {
        this.inOrOut = inOrOut;
    }
}

 

/**
 * 消息確認回調
 * @author Administrator
 *
 */
@Service
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

	 @Override
	 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
	        if (ack) {
	            System.out.println("消息確認發送給mq成功");
	        } else {
	            //處理失敗的消息
	        	System.out.println("消息發送給mq失敗,考慮重發:"+cause);
	        }
	    }

}
/**
 * 消息返回回調
 */
@Service
public class ReturnCallback  implements RabbitTemplate.ReturnCallback  {

	@Override
    public void returnedMessage(Message message, int replyCode,String replyText, String exchange,
            String routingKey) {
		System.out.println("Returned replyText:"+replyText);
		System.out.println("Returned exchange:"+exchange);
		System.out.println("Returned routingKey:"+routingKey);
		String msgJson  = new String(message.getBody());
		System.out.println("Returned Message:"+msgJson);
		/**也須要考慮重發機制*/
	}

}
/**
 * 消息生產者
 * @author Administrator
 *
 */
@Controller
public class MqMode {
	
	/**路由鍵*/
    private final static String DEPOT_RK = "amount.depot";
    /**交換器*/
    private final static String DEPOT_EXCHANGE = "depot-amount-exchange";
	
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @ResponseBody
    @RequestMapping("processDepot")
    public String processDepot(ModelMap modelMap,String goodsId, Integer amount) {
    	if (goodsId!=null&&amount!=null) {
    		GoodTransferVo goodTransferVo = new GoodTransferVo();
            goodTransferVo.setGoodsId(goodsId);
            goodTransferVo.setChangeAmount(amount);
            goodTransferVo.setInOrOut(false);
            String goods = JsonUtil.toJson(goodTransferVo);
            MessageProperties messageProperties = new MessageProperties();
            /**消息持久化*/
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            /**發送持久化的消息*/
            rabbitTemplate.send(DEPOT_EXCHANGE, DEPOT_RK,new Message(goods.getBytes(), messageProperties));
            modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.SUCCESS);
		}else {
			modelMap.put(CommonInfoUtil.JSONMSG, CommonInfoUtil.PARAMERROR);
		}
        return JsonUtil.toJson(modelMap);
    }
}
/**
 * 消費者
 */
@Service
public class ProcessDepot  implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String msg = new String(message.getBody());
            System.out.println(">>>>>>>>>>>>>>接收到消息:"+msg);
            GoodTransferVo goodTransferVo = JsonUtil.fromJson(msg,GoodTransferVo.class);
            try {
            	/**處理業務*/
            	processGoods(goodTransferVo);
                /**確認返回ack*/
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                System.out.println(">>>>>>>>>>>>>>庫存處理完成,應答Mq服務");
            } catch (Exception e) {
            	/**拒絕消息*/
            	System.err.println(e.getMessage());
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
                System.err.println(">>>>>>>>>>>>>>庫存處理失敗,拒絕消息,要求Mq從新派發");
                throw e;
            }

        } catch (Exception e) {
        	System.err.println(e.getMessage());
        }
    }
    
    //處理業務
    private void processGoods(GoodTransferVo goodTransferVo){
    	if(goodTransferVo.isInOrOut()){
            inDepot(goodTransferVo.getGoodsId(),goodTransferVo.getChangeAmount());
        }else{
            outDepot(goodTransferVo.getGoodsId(),goodTransferVo.getChangeAmount());
        }
    }
    
    //增長庫存
    public void inDepot(String goodsId,int addAmout){
    	 System.out.println("增長庫存");
    }
    
    //減小庫存
    public void outDepot(String goodsId,int reduceAmout){
    	 System.out.println("減小庫存");
    }
}

 

SpringBoot整合RabbitMQ

 

在Linux下(以CentOS7爲例)安裝RabbitMQ

一、wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

二、rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

三、yum install epel-release

四、yum install erlang

五、wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm

六、yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

安裝完成後 一、service rabbitmq-server start 二、service rabbitmq-server 

Rabbitmq經常使用端口

一、client端通訊端口: 5672

二、管理端口 : 15672

三、server間內部通訊端口: 25672

如端口出現不能訪問,使用形如如下命令開啓: firewall-cmd --add-port=15672/tcp --permanent

運行rabbitmqctl status出現Error: unable to connect to node rabbit@controller: nodedown之類問題考慮以下幾種解決辦法:

一、重啓服務 service rabbitmq-server stop service rabbitmq-server start

二、檢查/var/lib/rabbitmq中是否存在.erlang.cookie,沒有則新建一個,裏面隨便輸入一段字符串

三、從新安裝服務

四、百度或者Google一下

日誌通常放在:
/var/log/rabbitmq/rabbit@centosvm.log
/var/log/rabbitmq/rabbit@centosvm-sasl.log

管理虛擬主機 
rabbitmqctl add_vhost [vhost_name] 
rabbitmqctl list_vhosts 

啓動和關閉rabbitmq
rabbitmq-server會啓動Erlang節點和Rabbitmq應用
rabbitmqctl stop會關閉Erlang節點和Rabbitmq應用
rabbitmqctl stop_app關閉Rabbitmq應用
rabbitmqctl start_app啓動Rabbitmq應用

Rabbitmq配置文件放在 /etc/rabbitmq 下,名爲rabbitmq.config,沒有且須要使用則能夠本身新建一個。

用戶管理
rabbitmqctl add_user [username] [pwd]
rabbitmqctl delete_user [username]

用戶權限控制
rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

查看隊列
rabbitmqctl list_queues

查看交換器
rabbitmqctl list_exchanges

查看綁定
rabbitmqctl list_bindings

RabbitMq的Web控制檯

運行rabbitmq-plugins enable rabbitmq_management

重啓RabbitMq 在瀏覽中打開http://localhost:15672

 

RabbitMq集羣

內建集羣的設計目標: 

客戶端在節點崩潰的狀況下能夠運行,線性擴展來擴充消息的吞吐量

能夠保證消息的萬無一失嗎? 

當一個節點崩潰了之後,節點全部隊列上的消息都會丟失。默認不會將隊列的消息在集羣中複製。(冗餘)

集羣中的隊列

在集羣中不會複製,其餘節點只會保存隊列所處的節點和元數據,消息的傳遞給隊列的全部者節點。

集羣中的交換器

會進行復制。本質就是一個相似於hashmap的映射表。

集羣中的節點

兩種:內存節點,磁盤節點。單機狀況下,必定是個磁盤節點。集羣裏面,要求每一個集羣必須有至少以一個磁盤節點,出於高可用考慮,建議配兩個。

 

本機集羣(不建議安裝,有條件應在多個服務器上安裝):

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 

RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit_1 rabbitmq-server -detached 

RABBITMQ_NODE_PORT=5674 RABBITMQ_NODENAME=rabbit_2 rabbitmq-server -detached 

rabbitmqctl -n rabbit_1@centosvm stop_app

rabbitmqctl -n rabbit_1@centosvm reset

rabbitmqctl -n rabbit_1@centosvm join_cluster rabbit@centosvm

rabbitmqctl -n rabbit_1@centosvm start_app

rabbitmqctl cluster_status

rabbitmqctl -n rabbit_2@centosvm stop_app

rabbitmqctl -n rabbit_2@centosvm reset

rabbitmqctl -n rabbit_2@centosvm join_cluster rabbit@centosvm --ram (內存節點)

rabbitmqctl -n rabbit_2@centosvm start_app

rabbitmqctl cluster_status

從外部要訪問虛擬機中的mq記得在防火牆中打開端口

firewall-cmd --add-port=5673/tcp --permanent  

firewall-cmd --add-port=5674/tcp --permanent  

 

rabbitmqctl add_user mq mq

rabbitmqctl set_permissions mq ".*" ".*" ".*"

rabbitmqctl set_user_tags mq administrator

rabbitmq-plugins -n rabbit_1@centosvm enable rabbitmq_management

 

多機下的集羣

一、 修改 /etc/hosts

    192.168.1.1 node1

    192.168.1.2 node2

    192.168.1.3 node3

二、Erlang Cookie 文件

/var/lib/rabbitmq/.erlang.cookie。將 node1 的該文件複製到 node二、node3,因爲這個文件權限是 400,因此須要先修改 node二、node3 中的該文件權限爲 777,而後將 node1 中的該文件拷貝到 node二、node3,最後將權限和所屬用戶/組修改回來。

三、運行各節點

四、在node二、node3上分別運行

[root@node2 ~]# rabbitmqctl stop_app

[root@node2 ~]./rabbitmqctl reset

[root@node2 ~]# rabbitmqctl join_cluster rabbit@node1

[root@node2 ~]# rabbitmqctl start_app

內存節點則是rabbitmqctl join_cluster rabbit@node1 –ram

 

移除集羣中的節點

[root@node2 ~]# rabbitmqctl stop_app

[root@node2 ~]./rabbitmqctl reset

[root@node2 ~]# rabbitmqctl start_app

 

鏡像隊列

什麼是鏡像隊列

若是RabbitMQ集羣是由多個broker節點構成的,那麼從服務的總體可用性上來說,該集羣對於單點失效是有彈性的,可是同時也須要注意:儘管exchange和binding可以在單點失效問題上倖免於難,可是queue和其上持有的message卻不行,這是由於queue及其內容僅僅存儲於單個節點之上,因此一個節點的失效表現爲其對應的queue不可用。

引入RabbitMQ的鏡像隊列機制,將queue鏡像到cluster中其餘的節點之上。在該實現下,若是集羣中的一個節點失效了,queue能自動地切換到鏡像中的另外一個節點以保證服務的可用性。在一般的用法中,針對每個鏡像隊列都包含一個master和多個slave,分別對應於不一樣的節點。slave會準確地按照master執行命令的順序進行命令執行,故slave與master上維護的狀態應該是相同的。除了publish外全部動做都只會向master發送,而後由master將命令執行的結果廣播給slave們,故看似從鏡像隊列中的消費操做其實是在master上執行的。

RabbitMQ的鏡像隊列同時支持publisher confirm和事務兩種機制。在事務機制中,只有當前事務在所有鏡像queue中執行以後,客戶端纔會收到Tx.CommitOk的消息。一樣的,在publisher confirm機制中,向publisher進行當前message確認的前提是該message被所有鏡像所接受了。

鏡像隊列的使用

添加policy

Rabbitmqctl set_policy Name Pattern Definition

Name:策略的名字

Pattern:隊列匹配模式(正則表達式)

Definition:鏡像的定義:ha-mode,ha-params,ha-sycn-mode

ha-mode: all/exactly/nodes

ha-params: n表示幾個節點上覆制/節點名稱

ha-sycn-mode:automatic manual

對隊列名稱以「queue_」隊列進行鏡像,只在兩個節點上完成複製

Rabbitmqctl set_policy ha_queue_two 「^queue_」 ‘{「ha-mode」:」exactly」,」ha-params」:2,」ha-sycn-mode「:「atuomatic」}’

在代碼中也要進行編寫

 

使用HAProxy(如下步驟僅供參考)

做用:將RabbitMQ請求均勻分發到集羣上、負載均衡、心跳檢測

安裝配置:

1.下載最新haproxy安裝包,官網:http://www.haproxy.org

2.上傳到linux的haproxy用戶根目錄下,並解壓:

     tar -zxvf haproxy-1.5.8.tar.gz

    建立目錄/home/haproxy/haproxy

3.安裝

cd haproxy-1.5.8

make  TARGET=linux26 ARCH=x86_64 PREFIX=/home/haproxy/haproxy   #將haproxy安裝到/home/haproxy/haproxy ,TARGET是指定內核版本

make install PREFIX=/home/haproxy/haproxy  

進入/home/haproxy/haproxy  目錄,建立/home/haproxy/haproxy/conf目錄,複製配置examples

cp  /home/haproxy/haproxy-1.5.8/examples/haproxy.cfg  /home/haproxy/haproxy/conf/

四、配置修改(以haproxy rabbitmq 配置爲關鍵字搜索)

 

互聯網時代的消息中間件

消息發送一致性

Void busi{

     //業務操做

    //寫庫

    //發送消息

}

業務成功,消息發送也要成功

業務失敗,消息不該該發送

消息的重複

一、 讓處理消息的服務具備冪等性

    Update a set zz = 12; (冪等性)

    Update a set zz = zz+12;(無冪等性)

二、 db或者緩存保存消息的處理情況,消息id做爲惟一性索引

消息中間件與RPC的關係

二者並非水火不容的關係,二者能夠很好的進行融合,結合起來使用。rpc客戶端調用rpc服務,或者rpc服務返回處理結果,就徹底能夠經過消息中間件進行。

使用消息中間件作rpc有何好處:自動將消息路由到合適的地方,經過消息中間件能夠在rpc服務集羣中作到負載均衡,甚至當rpc服務中某臺服務掛了,能夠作到自動重發。

消息的數據量不能太大。

相關文章
相關標籤/搜索