隨着微服務概念發展,大應用逐步拆分爲小應用,提升開發效率,專門的人作專門的事情,逐漸的流行起來。git
在微服務上實現通訊的方式大部分是採用rpc方式,也有升級版本的grpc。github
還有另一種實現就是使用mq來進行解耦。spring
今天初識mq,快速入門先,準備一個環境實現案例,該文涉及如下內容:docker
rabbitmq的安裝咱們採用docker的方式,docker方便咱們快速的實現rabbitmq的安裝,不須要再對安裝mq進行頭疼。微信
docker 的兩種方式併發
//拉取mq鏡像
docker pull rabbitmq
//啓動mq
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
複製代碼
說明:app
version: "3"
services:
rabbit:
image: docker.infervision.com/library/rabbitmq:3-management
ports:
- "4369:4369"
- "5671:5671"
- "5672:5672"
- "15671:15671"
- "15672:15672"
restart: always
environment:
- RABBITMQ_DEFAULT_USER=test
- RABBITMQ_DEFAULT_PASS=test
volumes:
- /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
container_name: rabbitmq
在該文件目錄下執行:docker-compose up -d
複製代碼
下載的rabbitmq內置管理界面,ip:15672 用戶名與密碼是咱們在啓動是寫入的。 異步
通俗的來講,主要使用MQ來解決如下三個問題。ide
在業務中,常常會遇到同時發送郵件,短信或者其餘通知內容服務。業務初期,採用同步或者異步處理方式都須要等發送完畢後再返回給客戶端。中間有必定的延遲spring-boot
業務增加後,此方式系統性能就會形成很大的浪費。採用消息隊列,將這幾個服務進行解耦,只需將消息內容發送到消息隊列中,下降用戶的等待時間,體驗效果比原先好不少。
同一個服務中可能須要其餘服務的配合才能完成一項業務操做.仍是拿常見的購物案例來講明。
在京東下單支付後,消息要通知到商家,郵件通知用戶已經購買某商品。
若是這兩種操做都採用同步執行,用戶等待時間會變長。
採用mq方式以後,訂單系統將消息持久化到mq上,返回給用戶下單成功。
mq保證消息的可靠投遞,不會致使消息丟失,保證消息的高可靠性。若是庫存出現失敗也不會致使用戶下單失敗的狀況,能夠從新進行投遞。
流量削峯,通常是同一時間涌進來不少請求,後臺處理不過來。那麼須要採用削峯方式來處理。
簡單來講是經過一個隊列承接瞬時過來流量洪峯,在消費端平滑的將消息推送出去,若是消費者消費不及時能夠將消息內容持久化在隊列中,消息不存在丟失。
本文是按照Java語言進行,使用Spring boot搭建,包管理工具Gradle。
compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")
複製代碼
yaml 文件配置
spring:
rabbitmq:
host: 192.168.110.5
port: 5672
username: tuixiang
password: tuixiang
複製代碼
準備好模板類,供後面直接使用
package com.infervision.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** * @author: fruiqi * @date: 19-2-18 下午2:42 * @version:1.0 rabbit配置 **/
@Configuration
public class RabbitConfig {
/** * 日誌 **/
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.username}")
String userName;
@Value("${spring.rabbitmq.password}")
String userPassword;
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
Integer port;
/** * 注入 * * @param * @return com.rabbitmq.client.Connection * @author fruiqi * @date 19-1-22 下午5:41 **/
@Bean
public ConnectionFactory getConnection() throws Exception {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(userName);
factory.setPassword(userPassword);
factory.setHost(host);
factory.setPort(port);
return factory;
}
/** * 建立制定的 監聽容器 * * @param queueName 監聽的隊列名字 * @param listenerChannel 設置是否將監聽的頻道 公開給已註冊的 * @param PrefetchCount 告訴代理一次請求多少條消息過來 * @param ConcurrentConsumers 制定建立多少個併發的消費者數量 * @param acknowledgeMode 消息確認模式 * @param listener 監聽器 * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer **/
public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel, int PrefetchCount, int ConcurrentConsumers, AcknowledgeMode acknowledgeMode, ChannelAwareMessageListener listener) throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
container.setQueueNames(queueName);
container.setExposeListenerChannel(listenerChannel);
container.setPrefetchCount(PrefetchCount);
container.setConcurrentConsumers(ConcurrentConsumers);
container.setAcknowledgeMode(acknowledgeMode);
container.setMessageListener(listener);
return container;
}
}
package com.infervision.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** * @author: fruiqi * @date: 19-2-18 下午2:51 * @version:1.0 **/
@Component
public class MsgSender {
private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/** * @param exchange 交換機名稱 * @param routingKey 路由名稱 * @param message 消息內容 * @return void * @description //TODO 發送消息到消息隊列中 **/
public void sendMsg(String exchange, String routingKey, Object message) {
try {
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}catch (Exception e){
logger.error("[ERROR] send statistic message error ",e);
}
}
}
複製代碼
在使用rabbitmq 有的時候須要本身客戶端建立queue,但有的時候並非本身建立,在rabbitmq頁面上進行建立queue,其餘消費者直接引用。
//初始化隊列,若是隊列已存在,則不做任何處理 若是有權限控制以下操做並不能實現
@Bean
public Queue dicomQueue() {
return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
}
//初始化交換機
@Bean
public Exchange topicExchange() {
return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();
}
// 將隊列與交換機按照路由規則進行綁定
@Bean
Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
}
複製代碼
隊列的使用:一個是發送,屬於生產者;一個是監聽,屬於消費者.
在mq配置模板類中,專門實現了一個發送類,發送文件內容,直接調用發送接口便可。
@Autowired
RabbitService rabbitService;
/** * 練習 發送數據到 mq中 * 1. 發送的數據會到 mq中 * 2. 咱們配置的 listener 是用來消費消息的 * 3. 客戶端配置 能夠參考 RabbitClientConfig * @param name 名字編號 * @param vo 實體內容 * @return: com.infervision.model.NameVo */
@ApiOperation(value = "增長name信息", notes = "實體信息")
@PostMapping(value = "/{name}")
@ApiImplicitParam(paramType = "query", name = "name", value = "用戶名字", required = true, dataType = "string")
public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
return vo;
}
@Service
public class RabbitServiceImpl implements RabbitService {
@Autowired
MsgSender msgSender;
/** * 嘗試發送 message 到mq中 * @param message * @return: void */
@Override
public void sendMessage(String exchange, String routingKey,String message) {
msgSender.sendMsg(exchange, routingKey, message);
}
}
複製代碼
消費者實現有兩種方式,一種經過註解的方式監聽,一種是實現ChannelAwareMessageListener類來實現消費。
//在方法上進行注入。配置工廠幫助提升單個消費者一次性消費的消息數量,設置多少個消費者,用來提升程序的性能
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")
public void processDicomMessage(Message message, Channel channel) {
logger.info(message);
}
// 工廠能夠在配置模板類中中配置好。
@Bean("multipleConsumerContainerFactory")
public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(50);
factory.setConcurrentConsumers(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
複製代碼
/** * 建立監聽器。 * @author fruiqi * @date 19-2-11 下午4:18 * @param labelStatisticsListener 監聽器 * 調用咱們公用的方法 **/
@Bean
public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {
SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer(「queue_name」,
true, rabbitProperties.getMaximumDelivery(),
rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
return container;
}
@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);
/** * 處理傳輸過來的數據 * @param message 傳送的消息內容 * @param channel 實現通道 * @return: void */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String mes = new String(message.getBody());
logger.info("[INFO] message is {}",mes);
// 手動應答 消息已消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
複製代碼
以上內容就完成了rabbitmq 從搭建到使用所有的流程。固然裏面還有更多的可讓咱們去探討,好比mq的隊列模式,一個系統配置多個mq等等內容。敬請期待咱們下一篇mq系列內容。
你們在系統中使用過mq嗎?大家使用的mq是什麼樣的?能夠在留言區咱們一塊兒探討哦。
代碼存放在:github中
·END·
路雖遠,行則必至
本文原發於 同名微信公衆號「胖琪的升級之路」,回覆「1024」你懂得,給個讚唄。
微信ID:YoungRUIQ