Rabbitmq學習

RabbitMQ簡介

目前RabbitMQ是AMQP 0-9-1(高級消息隊列協議)的一個實現,使用Erlang語言編寫,利用了Erlang的分佈式特性。java

 

代碼地址spring

http://download.csdn.net/download/qq_27116489/10131642


概念介紹:

  1. Broker:簡單來講就是消息隊列服務器實體。
  2. Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  3. Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
  4. Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  5. Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  6. vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  7. producer:消息生產者,就是投遞消息的程序。
  8. consumer:消息消費者,就是接受消息的程序。
  9. channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。

 

使用流程

AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概以下:apache

  1. 客戶端鏈接到消息隊列服務器,打開一個channel。
  2. 客戶端聲明一個exchange,並設置相關屬性。
  3. 客戶端聲明一個queue,並設置相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  5. 客戶端投遞消息到exchange。

exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。 exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。服務器

Exchange類型

Exchange路由消息的集中類型:app

名稱框架

默認的預先定義exchange名字maven

做用描述分佈式

Direct exchangeide

(Empty string) and amq.direct測試

根據Binding指定的Routing Key,將符合Key的消息發送到Binding的Queue

Fanout exchange

amq.fanout

將同一個message發送到全部同該Exchange bingding的queue

Topic exchange

amq.topic

根據Binding指定的Routing Key,Exchange對key進行模式匹配後路由到相應的Queue,模式匹配時符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。

Headers exchange

amq.match (and amq.headers in RabbitMQ)

同direct exchange相似,不一樣之處是再也不使用Routing Key路由,而是使用headers(message attributes)進行匹配路由到指定Queue。

Hello,Rabbitmq搭建

本文基於Spring rabbitmq搭建基礎框架,

須要的依賴pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.mq</groupId>
	<artifactId>rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>

		<!-- <dependency> <groupId>com.rabbitmq</groupId> <artifactId>rabbitmq-client</artifactId> 
			<version>1.3.0</version> </dependency> -->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>1.4.5.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-amqp</artifactId>
			<version>1.4.5.RELEASE</version>
		</dependency>       
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>
		<!-- Junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>

	</dependencies>
</project>

applicationcontext.xml

1,鏈接rabbitmq

<rabbit:connection-factory id="connectionFactory"
		host="127.0.0.1" username="admin" password="admin" port="5672"
		virtual-host="/" />
	<!-- 鏈接配置 -->
	<rabbit:admin connection-factory="connectionFactory" />

2,聲明

<!-- spring template聲明 -->
	<rabbit:template exchange="test-mq-exchange" id="amqpTemplate"
		connection-factory="connectionFactory" message-converter="messageConverter" />
	<bean id="messageConverter"
		class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

3,隊列

<!-- 標準的創建Queue的參數 -->
	<rabbit:queue-arguments id="amqpQueueArguments">
		<!-- 暫時沒有 -->
	</rabbit:queue-arguments>
	
	<!-- queue -->
	<rabbit:queue id="test_queue_key" name="test_queue_key"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />

	<rabbit:queue id="test_queue_key2" name="test_queue_key2"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />
	<rabbit:queue id="test_queue_key3" name="test_queue_key3"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />
	<!-- durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列 -->

 

4,交換器

<!-- exchange -->
	<rabbit:direct-exchange name="test-mq-exchange"
		durable="true" auto-delete="false" id="test-mq-exchange" auto-declare="true">
		<rabbit:bindings>
			<rabbit:binding queue="test_queue_key2" key="test_queue_key2" />
			<rabbit:binding queue="test_queue_key" key="test_queue_key" />
			<rabbit:binding queue="test_queue_key3" key="test_queue_key3" />
		</rabbit:bindings>
	</rabbit:direct-exchange>

5,監聽消費者

<bean name="Consummer" class="com.mq.rabbitmq.Consummer" />
	<bean name="Consummer2" class="com.mq.rabbitmq.Consummer" />
	<bean name="Consummer3" class="com.mq.rabbitmq.Consummer" />
	<!-- 配置監聽 消費者 -->
	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="auto">
		<!-- queues 監聽隊列,多個用逗號分隔 ref 監聽器 -->
		<rabbit:listener queues="test_queue_key2" ref="Consummer2" />
		<rabbit:listener queues="test_queue_key" ref="Consummer" />
		<rabbit:listener queues="test_queue_key3" ref="Consummer3" />
	</rabbit:listener-container>

6,Spring包掃描

<context:component-scan base-package="com.mq.rabbitmq"></context:component-scan>

完整的applicationcontext.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false" xmlns:p="http://www.springframework.org/schema/p"
	xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:task="http://www.springframework.org/schema/task" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd 
		 http://www.springframework.org/schema/util  http://www.springframework.org/schema/util/spring-util-3.1.xsd
		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd  ">

	<rabbit:connection-factory id="connectionFactory"
		host="127.0.0.1" username="admin" password="admin" port="5672"
		virtual-host="/" />
	<!-- 鏈接配置 -->
	<rabbit:admin connection-factory="connectionFactory" />

	<!-- spring template聲明 -->
	<rabbit:template exchange="test-mq-exchange" id="amqpTemplate"
		connection-factory="connectionFactory" message-converter="messageConverter" />
	<bean id="messageConverter"
		class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

	<!-- 標準的創建Queue的參數 -->
	<rabbit:queue-arguments id="amqpQueueArguments">
		<!-- 暫時沒有 -->
	</rabbit:queue-arguments>
	
	<!-- queue -->
	<rabbit:queue id="test_queue_key" name="test_queue_key"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />

	<rabbit:queue id="test_queue_key2" name="test_queue_key2"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />
	<rabbit:queue id="test_queue_key3" name="test_queue_key3"  queue-arguments="amqpQueueArguments"
		durable="true" auto-delete="false" exclusive="false" />
	<!-- durable:是否持久化 exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除 auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列 -->
	<!-- exchange -->
	<rabbit:direct-exchange name="test-mq-exchange"
		durable="true" auto-delete="false" id="test-mq-exchange" auto-declare="true">
		<rabbit:bindings>
			<rabbit:binding queue="test_queue_key2" key="test_queue_key2" />
			<rabbit:binding queue="test_queue_key" key="test_queue_key" />
			<rabbit:binding queue="test_queue_key3" key="test_queue_key3" />
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<bean name="Consummer" class="com.mq.rabbitmq.Consummer" />
	<bean name="Consummer2" class="com.mq.rabbitmq.Consummer" />
	<bean name="Consummer3" class="com.mq.rabbitmq.Consummer" />
	<!-- 配置監聽 消費者 -->
	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="auto">
		<!-- queues 監聽隊列,多個用逗號分隔 ref 監聽器 -->
		<rabbit:listener queues="test_queue_key2" ref="Consummer2" />
		<rabbit:listener queues="test_queue_key" ref="Consummer" />
		<rabbit:listener queues="test_queue_key3" ref="Consummer3" />
	</rabbit:listener-container>
	
	
	
	
	<context:component-scan base-package="com.mq.rabbitmq"></context:component-scan>
</beans>

生產者 

發送消息接口

package com.mq.rabbitmq;

public interface MqInt {
	
	
	    public void sendDataToQueue(String queueKey, Object object);
}

接口實現類

package com.mq.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqImpl implements MqInt {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	@Override
	public void sendDataToQueue(String queueKey, Object object) {
		amqpTemplate.convertAndSend(queueKey, object);
		
	}

}

監聽 ------消費者

package com.mq.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class Consummer implements MessageListener{

	
	@Autowired
	private AmqpTemplate amqpTemplate;
	public int count =1;
	@Override
	public void onMessage(Message message) {
		System.out.println(count+"-----------");
		count++;
		System.out.println(message);
	}

}

 

測試類

package test;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.mq.rabbitmq.MqImpl;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationcontext.xml" })
public class TestQueue {
	@Autowired
	MqImpl mqImpl;

	final String queue_key = "test_queue_key";
	final String queue_key2 = "test_queue_key2";
	final String queue_key3 = "test_queue_key3";

	@Test
	public void send() {
		Map<String, String> msg = new HashMap<String, String>();
		msg.put("data", "hello,rabbmitmq!");
		System.out.println("--+amqpTemplate");

		for (int i = 0; i < 200; i++) {
			
			mqImpl.sendDataToQueue(queue_key, msg);
		}

		System.out.println("------------------");
	}
}

 

能夠登陸localhost:5672/看交換器和queue的效果

相關文章
相關標籤/搜索