概覽spring
本文主要介紹如何使用RabbitMQ消息代理來實現分佈式系統之間的通訊,從而促進微服務的鬆耦合。服務器
RabbitMQ,也被稱爲開源消息代理,它支持多種消息協議,而且能夠部署在分佈式系統上。它輕量級,便於部署應用程序。它主要充當一個隊列,其中輸入的消息能夠首先被操做。RabbitMQ能夠在許多操做系統和雲環境中運行,併爲大多數流行語言提供了普遍的開發工具。它是生產者-消費者模式,生產者發出信息,消費者消費信息。RabbitMQ的主要特色以下:架構
異步消息app
分佈式部署框架
管理和監控異步
企業和雲計算分佈式
安裝函數
對於RabbitMQ,首先須要在系統中安裝ErLang,由於RabbitMQ是用ErLang語言編寫的。安裝Erlang以後,你能夠經過下面的介紹從它的官網下載最新版本的 RabbitMQ 。微服務
在微服務中使用RabbitMQ工具
在您的微服務體系結構中,RabbitMQ是實現消息隊列的最簡單的免費的可用選項之一。這些隊列模式有助於解耦各個微服務之間的通訊來增長應用程序的彈性。咱們能夠將這些隊列用於各類目的,好比核心微服務之間的交互、微服務的解耦、實現故障轉移機制,以及經過消息代理髮送電子郵件通知。
不管在哪裏,只要有兩個或兩個以上的核心模塊須要相互通訊,咱們就不該該進行直接的HTTP調用,由於它們會使核心層產生緊耦合,而且當每一個核心模塊有更多實例時將很難管理。並且每當服務宕機時,HTTP調用模式就會失敗,由於在服務重啓以後,咱們將沒法跟蹤舊的HTTP請求調用。這就產生了對RabbitMQ的需求。
在微服務中設置RabbitMQ
在微服務架構中,爲了演示,咱們將使用一個能夠經過任何核心微服務發送電子郵件通知的示例模式。在這種模式下,咱們將有一個能夠存在任何核心微服務的生產者,它將生成電子郵件內容並將其發送到隊列。而後,這個電子郵件內容由老是在等待隊列中新消息的消費者來處理。
請注意,因爲正在使用Spring Boot構建微服務,所以咱們將爲Spring提供配置。
1)生產者:這一層負責生成電子郵件內容,並將此內容發送給RabbitMQ中的消息代理。
a)在properties文件中,咱們須要配置隊列名和交換類型,以及安裝RabbitMQ服務器的主機和端口。
queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b)咱們須要建立一個配置類,它將使用隊列名和交換類型將隊列綁定到微服務模塊。
@Configuration
public class RabbitConfiguration {
@Value("${fanout.exchange}")
private String fanoutExchange;
@Value("${queue.name}")
private String queueName;
Queue queue() {
return new Queue(queueName, true);
}
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
c)最後,咱們須要一個工具類,它將使用Spring框架提供的RabbitTemplate將實際的電子郵件內容發送到隊列中。
@Component
public class QueueProducer {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Value("${fanout.exchange}")
private String fanoutExchange;
private final RabbitTemplate rabbitTemplate;
@Autowired
public QueueProducer(RabbitTemplate rabbitTemplate) {
super();
this.rabbitTemplate = rabbitTemplate;
}
public void produce(NotificationRequestDTO notificationDTO) throws Exception {
logger.info("Storing notification...");
rabbitTemplate.setExchange(fanoutExchange);
rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
logger.info("Notification stored in queue sucessfully");
}
}
d)而後,您能夠在模塊的任何地方調用這個produce方法。
{
queueProducer.produce(notificationDTO);
}
2) 消費者:這一層負責使用FIFO方法從RabbitMQ消息代理中消費消息,而後執行與電子郵件相關的操做。
a)在這個properties文件中,咱們須要配置隊列名和交換類型,以及安裝RabbitMQ服務器的主機和端口。
queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b)咱們須要建立一個配置類,它將使用隊列名和交換類型將隊列綁定到微服務模塊。此外,在消費者的RabbitMQ配置中,咱們須要建立一個充當消費者的MessageListenerAdapter bean,它始終偵遵從隊列中傳入的消息。這個MessageListenerAdapter將有一個帶有消費者工具類和defaultListenerMethod的有參構造函數,在這裏咱們能夠指定與電子郵件相關的操做。
@Configuration
public class RabbitConfiguration {
private static final String LISTENER_METHOD = "receiveMessage";
@Value("${queue.name}")
private String queueName;
@Value("${fanout.exchange}")
private String fanoutExchange;
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
return new MessageListenerAdapter(consumer, LISTENER_METHOD);
}
}
c)而後,須要建立具備特定消息偵聽器方法的 QueueConsumer類,在該類中咱們能夠進行實際發送電子郵件的操做。
@Component
public class QueueConsumer {
@Autowired
MailServiceImpl mailServiceImpl;
protected Logger logger = LoggerFactory.getLogger(getClass());
public void receiveMessage(String message) {
logger.info("Received (String) " + message);
processMessage(message);
}
public void receiveMessage(byte[] message) {
String strMessage = new String(message);
logger.info("Received (No String) " + strMessage);
processMessage(strMessage);
}
private void processMessage(String message) {
try {
MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
ValidationUtil.validateMailDTO(mailDTO);
mailServiceImpl.sendMail(mailDTO, null);
} catch (JsonParseException e) {
logger.warn("Bad JSON in message: " + message);
} catch (JsonMappingException e) {
logger.warn("cannot map JSON to NotificationRequest: " + message);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
總結
經過使用RabbitMQ,您能夠避免服務之間直接的HTTP調用,並消除核心微服務的緊密耦合。這將幫助您在更高級別上實現微服務的可伸縮性,並在微服務之間添加故障轉移機制。