準備:php
1.RabbitMQ安裝(我是在window環境下安裝的)。java
安裝完成以後進入登陸頁面配置,默認地址:http://localhost:15672json
2.建立一個SpringBoot項目。學習
配置文件:ui
#rabbitmq rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 123456 virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: none concurrency: 1 max-concurrency: 1 retry: enabled: false
java代碼spa
RabbitMQ配置:debug
/** * 消息列隊配置 * * @author My */ @Component public class RabbitmqConfig { private Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class); @Autowired private RabbitTemplate rabbitTemplate; // 消息交換機 public final static String DIRECT_EXCHANGE = "directExchange"; // 日誌收集 public final static String ROUTING_KEY_LOGGER = "directLoggerQueue"; }
配置交換機日誌
@Bean public DirectExchange directExchange() { // return new DirectExchange(DIRECT_EXCHANGE, true, false); logger.debug("RabbitMQ交換機初始化"); return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build(); }
配置logger消息列隊code
@Bean public Queue directLoggerQueue() { return QueueBuilder.durable(ROUTING_KEY_LOGGER).build(); }
將消息列隊和交換機綁定blog
@Bean public Binding bindingDirectLoggerQueue(Queue directLoggerQueue, DirectExchange directExchange) { return BindingBuilder.bind(directLoggerQueue).to(directExchange).with(ROUTING_KEY_LOGGER); }
生成者:
/** * 日誌收集 * 生產者 * @author My */ @Component public class LoggerProducer { private Logger logger = LoggerFactory.getLogger(LoggerProducer.class); @Autowired private AmqpTemplate amqpTemplate; /** * 添加消息 * @param object */ public void send(Object object) { String jsonStr = JSONObject.toJSONString(object); amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY_LOGGER, jsonStr); logger.info("日誌消息已經發送++++++++++++++++++"); } }
我這裏傳輸的時候是對數據進行了json格式化,也能夠根據本身的業務進行修改。
RabbitmqConfig.DIRECT_EXCHANGE 交換機的名字
RabbitmqConfig.ROUTING_KEY_LOGGER Queue列隊的名字
消費者:
/** * 日誌收集處理 * 消費者 * @author My */ @Component public class LoggerConsumer { private Logger logger = LoggerFactory.getLogger(LoggerConsumer.class); @Autowired private ILogService iLogService; @RabbitHandler @RabbitListener(queues = RabbitmqConfig.ROUTING_KEY_LOGGER) public void process(String json, Message amqpMessage, Channel channel) throws Exception { logger.debug("logger接收到消息:{}", json); LogModel log = JSONObject.parseObject(json, LogModel.class); iLogService.insert(log); } }
這裏@RabbitListener註解中的queues須要和生產者中的名字一致。
調用的地方注入以後使用就能夠了。
/** 日誌收集通道 */ @Autowired private LoggerProducer loggerProducer;
整合完成。
做者:Se7end
聲明:本博客文章爲原創,只表明本人在工做學習中某一時間內總結的觀點或結論。轉載時請在文章頁面明顯位置給出原文連接。