RabbitMQ使用及原理解析

技術由螞蟻課堂教學 www.itmayiedu.com 高端IT人才培訓授予java

能夠報名學習螞蟻課堂高端學習,咱們等着你的到來。web

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。MQ消息隊列 是企業級互聯網架構核心產品,具有低延遲,高併發,高可用,高可靠,支撐萬億級數據洪峯的分佈式消息中間件。spring

RabbitMQ擁有兩種模式,點對點和發佈訂閱,它和ActiveMQ不一樣的地方就在於RabbitMQ在消費時能夠返回一個消費的標記。我的理解在實際生產中使用RabbitMQ的便捷性略高於ActiveMQ方法,由於RabbitMQ在發送時有發送確認和消費者消費返回。能夠很好的知道消費者是否已經正確消費。json

使用RabbitMQ最好在Linux系統上安裝RabbitMQ服務,具體安裝過程不作詳細介紹,百度一大堆。架構

安裝完成事後登陸管理頁面地址http://192.168.10.10:15672/,使用帳號和密碼登陸事後就能夠看到管理頁面。併發

manager

在界面上能夠點擊Queue隊列按鈕,建立咱們兩個本身的隊列,這裏建立hello.queue1和hello.queue2兩個隊列。app

queue

下面咱們就能夠建立SpringBoot工程來使用RabbitMQ消息隊列了。dom

下面是個人項目結構圖 proj分佈式

首先在父工程項目中的pom文件引入jar包:ide

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.4.0.RELEASE</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.40</version>
		</dependency>
	</dependencies>

在父工程下面建立兩個Maven Module子工程分別如圖命名。

在生產中中建立一個RabbitMQ配置文件RabbitConfig.java

@Configuration
public class RabbitConfig {
	// 聲明隊列
	@Bean
	public Queue queue1() {
		return new Queue("hello.queue1", true); // true表示持久化該隊列
	}
	@Bean
	public Queue queue2() {
		return new Queue("hello.queue2", true);
	}
	// 聲明交互器
	@Bean
	TopicExchange topicExchange() {
		return new TopicExchange("topicExchange");
	}
	// 綁定
	@Bean
	public Binding binding1() {
		return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
	}
	@Bean
	public Binding binding2() {
		return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
	}
}

而後在生產者中建立生產者代碼:

@Component
@EnableScheduling
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@PostConstruct
	public void init() {
		rabbitTemplate.setConfirmCallback(this);
		rabbitTemplate.setReturnCallback(this);
	}
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {
			System.out.println("消息發送成功:" + correlationData);
		} else {
			System.out.println("消息發送失敗:" + cause);
		}

	}
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		System.out.println(message.getMessageProperties().getCorrelationIdString() + " 發送失敗");

	}
	// 發送消息,不須要實現任何接口,供外部調用。
	public void send(String msg) {
		CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
		System.out.println("開始發送消息 : " + msg.toLowerCase());
		String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
		System.out.println("結束髮送消息 : " + msg.toLowerCase());
		System.out.println("消費者響應 : " + response + "---消息處理完成");
	}

	// 每隔五秒就發送消息進行測試
	@Scheduled(fixedDelay = 5000)
	public void sendsmg() {
		JSONObject obj = new JSONObject();
		obj.put("time", System.currentTimeMillis());
		obj.put("name", "LLL丶禾羊");
		obj.put("address", "www.itmayiedu.com");
		obj.put("orderId", "20135111111123");
		send(obj.toJSONString());
	}
}

建立配置文件application.propreties

spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

到這裏生產者就能夠直接啓動了,建立一個App.java進行啓動,默認配置的定時器5秒鐘觸發一次消息 能夠看到控制檯輸出。

-----------------------------------------www.itmayiedu.com----高端IT培訓------------------------------------------------------------

下面消費者更加的簡單,因爲咱們在生產者中綁定了兩個隊列,咱們在接收者中就須要兩個監聽器。

咱們在消費者代碼中建立消費者代碼Receive.java

@Component
public class Receiver {
	@RabbitListener(queues = "hello.queue1")
	public String processMessage1(String msg) {
		System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue1隊列的消息:" + msg);
		return msg.toUpperCase();
	}
	@RabbitListener(queues = "hello.queue2")
	public void processMessage2(String msg) {
		System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue2隊列的消息:" + msg);
	}
}

配置文件application.propreties

server.port=8081
spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.concurrency=2
spring.rabbitmq.listener.max-concurrency=2

而後建立啓動類直接啓動就好了

最後分別啓動生產者和消費者,能夠直接在控制檯看到輸出

生產者: sender

消費者: receiver

在被消費事後咱們能夠很清楚的發如今生產者這邊會打印出消費者返回的信息,代表了消費成功。

到這裏RabbitMQ的示例就完了

總結:  

須要知道MQ的模式      

 點對點        一對一     例子    私聊

  發佈訂閱      一對多   例子   一我的和多我的聊天

步驟

1.須要配置  指定隊列queue   是否持久化配置

2.生產者發送消息到隊列中

3.消費者去隊列中去消費消息,若是消費了會給生產者返回一個成功失敗標識,此時隊列中不存在消息

相關文章
相關標籤/搜索