初識rabbitMQ(一)

19/5/29  對於rabbitMQ ,我已經研究了幾天。 以前徹底的沒有接觸過,因此有不少的概念,不少的坑要踩html

首先是安裝 rabbitmq 這個就不記錄了。java

一、引入 Mavenweb

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>



二、配置 ,寫配置文件
<!--步驟一、配置連接工廠-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="${mq.address}"/>
<property name="port" value="${mq.port}"/>
<property name="password" value="${mq.pwd}"/>
<property name="username" value="${mq.user}"/>
<property name="publisherConfirms" value="true"/>
<property name="publisherReturns" value="true"/>
<property name="virtualHost" value="${mq.vhost}"/>
<property name="requestedHeartBeat" value="50"/>
</bean>
<!--步驟二、建立rabbitTemplate 消息模板-->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<!--構造方法須要連接信息-->
<constructor-arg ref="connectionFactory"/>
<!--配置交換機-->
<property name="exchange" value="${mq.exchange}"/>
<!--配置路由鍵-->
<property name="routingKey" value="${mq.routingKey}"/>
<!--配置隊列-->
<property name="queue" value="${mq.queue}"/>
<!--配置消息轉換-->
<property name="messageConverter" ref="serializerMessageConverter"/>
<property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
</bean>
<bean id="rabbitTemplateConfig" class="mq.RabbitTemplateConfig"/>
<!--注入消息轉換器-->
<bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
<!--引入元素文件-->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<bean class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:conf/value.properties</value>
</list>
</property>
<property name="fileEncoding" value="UTF-8"/>
</bean>
</property>
</bean>
<!--申明消費者-->
<bean id="rmqConsumer" class="mq.RmqConsumer" />
<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<property name="defaultListenerMethod" value="rmqConsumeMessage"/>
<property name="messageConverter" ref="serializerMessageConverter"/>
</bean>
<!--註冊監聽-->
<bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
<property name="messageListener" ref="messageListenerAdapter"/>
<property name="acknowledgeMode" value="MANUAL"/>
</bean>

這個是我關於rabbitMQ 所用的配置,下面記錄一下具體的做用。
(一、)配置連接
  經過配置連接工廠從而連接到rabbitMQ
<!--步驟一、配置連接工廠-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="${mq.address}"/>//連接的地址 127.0.0.1
<property name="port" value="${mq.port}"/>//端口號 5627
<property name="password" value="${mq.pwd}"/> //密碼
<property name="username" value="${mq.user}"/> //用戶名
<property name="publisherConfirms" value="true"/> //是否開啓提交到交換機的回調
<property name="publisherReturns" value="true"/> //是否開啓發送到隊列的錯誤回調
<property name="virtualHost" value="${mq.vhost}"/>// 虛擬機
<property name="requestedHeartBeat" value="50"/>//心跳時間(這個可刪除,我不知道有什麼用,之後有領悟再記錄)
</bean>

屬性文件中的內容
mq.address=127.0.0.1
mq.exchange=ceshi
mq.routingKey=ceshiRouting
mq.queue=ceshiQueues
mq.port=5672
mq.user=***
mq.pwd=t**an****
mq.timeout=5000
mq.vhost=testMQ

關於開啓 ConfirmReturn 的回調 還須要在模板 rabbitTemplate 中進行設置

<!--步驟二、建立rabbitTemplate 消息模板-->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<!--構造方法須要連接信息-->
<constructor-arg ref="connectionFactory"/>
<!--配置交換機-->
<property name="exchange" value="${mq.exchange}"/>
<!--配置路由鍵-->
<property name="routingKey" value="${mq.routingKey}"/>
<!--配置隊列-->
<property name="queue" value="${mq.queue}"/>
<!--配置消息轉換-->
<property name="messageConverter" ref="serializerMessageConverter"/>
<property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
</bean>
註冊模板類的bean 類 org.springframework.amqp.rabbit.core.RabbitTemplate 
在其構造方法中傳入連接工廠的引用, 如上 代碼 重點看 下面這幾行配置 
 <property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
這個就是上面提到的 回調,<property name="mandatory" value="true" /> 這個是必定要的 ,刪除了會致使 returnCallback 不起效 ,下面貼上實現類代碼 

package mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/**
* @author tia
* @date 2019/5/2910:45
*/
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

/**
* 是否成功發送到交換器
* 成功、失敗都會回調
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息惟一標識:"+correlationData);
System.out.println("確認結果:"+b);
System.out.println("失敗緣由:"+s);
}

/**
* 是否成功發送到隊列(須要設置mandatory 爲true)
* 失敗回調
* @param message
* @param i
* @param s
* @param s1
* @param s2
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息主體:"+message);
System.out.println("消息主體:"+i);
System.out.println("描述:"+s);
System.out.println("交換器:"+s1);
System.out.println("路由鍵:"+s2);
}
}
偷了個懶,把兩個回調放在了一塊兒 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback 這兩個實現是必定要的。
這兩個方法的做用,就是對消息進行從新發送,或是記錄沒有發送出去的消息,等等,看我的安排了。

在個人配置中是沒有關於 隊列的建立,交換器的建立,虛擬機的建立、綁定等的內容, 這些都在RabbitMQ 的後臺完成了 圖個簡單。

到這裏,就能夠向mq發送消息了。我寫的一個例子:
package mq;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import po.Message;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.Date;

@RestController
@RequestMapping(value = "/mq",produces = "text/html;charset=UTF-8")
public class RmqProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(RmqProducer.class);
@Resource
private RabbitTemplate rabbitTemplate;

/**
*
發送信息
*/
public void sendMessage(String queueKey,Object msg) {
try {
// 發送信息
rabbitTemplate.convertAndSend(queueKey,"1");
} catch (Exception e) {
LOGGER.error("rmq消費者任務處理出現異常", e);
}
}
@RequestMapping("/sendMessage")
public void sendActiveCount(String activeMap) throws UnsupportedEncodingException {
Message message=new Message();
message.setFrom(1234566l);
message.setTo(754964641l);
message.setText("你妹妹好漂亮");
message.setDate(new Date());
message.setFromName("你妹妹");
String s = JSON.toJSONString(message);
for (int i = 0; i <100 ; i++) {
rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i));
}

}
}
主要的內容就是這個方法 rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); 哪兒都能發送。

再看看 消費者 怎麼弄,但是花了我大量的時間 去弄這個。

<!--申明消費者-->
<bean id="rmqConsumer" class="mq.RmqConsumer" />
<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<property name="defaultListenerMethod" value="rmqConsumeMessage"/>
<property name="messageConverter" ref="serializerMessageConverter"/>
</bean>
<!--註冊監聽-->
<bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
<property name="messageListener" ref="messageListenerAdapter"/>
<property name="acknowledgeMode" value="MANUAL"/>
</bean>
這個監聽是必定要有的,或許你能夠使用註解來幹掉他。
看到這個了嗎? <property name="defaultListenerMethod" value="rmqConsumeMessage"/> 這個東西就是說默認去執行你 <constructor-arg ref="rmqConsumer" /> 這個類的 這個 方法的。不過也有其餘的弊端就是 通道的問題
還有就是 若是實現了 implements ChannelAwareMessageListener 就不起效了。
看代碼:
package mq;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;;


public class RmqConsumer implements ChannelAwareMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RmqConsumer.class);
int i=0;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
Object ddd=null;
JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody(),"utf-8"));
po.Message message1 = JSON.toJavaObject(jsonObject, po.Message.class);
System.out.println(message1.toString());
if(i++%10==0)
System.out.println(ddd.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody());
System.out.println(e.getMessage());
}
}
}

這個裏面沒有配置裏提到的方法,他被我吃了。由於他不生效了。
再說這個通道的問題 channel ,我這兒 消費者方法是不能拋出錯誤的,會停掉,因此只能處理, <property name="acknowledgeMode" value="MANUAL"/> 者個配置是在配置是否手動確認的。
MANUAL 手動確認 AUTO 自動確認(默認值) 若是開啓自動確認,那麼 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
將會 報錯,也就是說,他不須要手動確認的代碼存在。它會默認全部的方法都進行 成功確認,這個真的很無奈。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 成功確認 他有兩個參數 ,消息Tag 與是否批量確認。若是true 則批量確認 tag值小於該值的全部信息將被成功確認。
message.getMessageProperties().getDeliveryTag() 消息的Tag
若是你開啓了手動確認,但並無確認,那麼你的消息就會處於未確認狀態,就像這樣 Unacked 100 ,Total 100, 發送100條消息,都沒有確認。那rabbitMQ不會把它刪除,一直堆積在內存中,後果,就看你怎麼處理了.....
 
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 失敗確認 這個方法有三個參數。 消息Tag 、是否批量確認、和 是否從新回到隊列。前兩個參數跟 成功確認相同, 最後一個若是爲true 將從新回到隊列頂端
注意 是隊列頂端,下一次消費者就會調用返回隊列的消息。若是這條消息有錯誤,那就意味着,程序會一直進行 失敗確認 返回隊列 ,死循環 。
因此 看這個 channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); 發送消息,它會把消息發送到隊列的末尾,這樣最後執行,就能夠避免不消費其餘正確的消息了。
相關文章
相關標籤/搜索