上一篇介紹了rabbitmq的安裝以及監控,基礎是資料網上有不少。這裏仍是補充一點概念,我也沒看官方文檔因此說的不對還請留言指正。java
消息中間件最大的做用就是解耦,消峯,這個你們應該都知道。可是這個解耦就意味着解除依賴關係,因此有時候這種解耦會給業務帶來必定的缺陷,好比:你但願消費者消費完消息之後及時通知生成者,這時候是作不到的,消費狀況和推送狀況只能通知到隊列,消費者和生產者是不會直接通訊的,若是你想消費完後可以通知到生產者這樣就違背了消息中間件設計的原理,也就說明你不適合選擇消息中間件來解決這個業務問題。spring
具體操做:apache
一、引入rabbitmq依賴,具體座標以下:json
<!-- 配置 rabbitMq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、application配置緩存
#配置rabbitmq spring.application.name=springboot-rabbitmq spring.rabbitmq.host=192.168.19.8 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.publisher-confirms=true#手動確認 spring.rabbitmq.virtual-host=/ spring.rabbitmq.listener.prefetch=5#每次處理5條消息 spring.rabbitmq.listener.acknowledge-mode=MANUAL #手動確認
三、生產者springboot
package com.mymall.crawlerTaskCenter.mq; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; import javax.annotation.PostConstruct; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.mymall.crawlerTaskCenter.domain.CrawlerJob; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.springframework.amqp.rabbit.support.CorrelationData; @Component public class RabbitMqService implements RabbitTemplate.ConfirmCallback{ @Autowired RabbitTemplate rabbitTemplate; public void sendCrawlerJob(CrawlerJob crawlerJob) { String sendMsg = JSONObject.toJSONString(crawlerJob); System.out.println("Sender1 : " + sendMsg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(this.rabbitTemplate.getExchange(),"crawlerJobs", sendMsg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回調id:" + correlationData); if (ack) { System.out.println("消息成功消費"); } else { System.out.println("消息消費失敗:" + cause); } } }
四、消費者app
package com.crawlerTaskCenter.rabbitMq; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.apache.log4j.Logger; import org.reflections.Reflections; import org.reflections.util.ConfigurationBuilder; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.crawlerTaskCenter.AppInit; import com.crawlerTaskCenter.Application; import com.crawlerTaskCenter.annotation.CrawlerAnnotation; import com.crawlerTaskCenter.crawler.CrawlerBase; import com.crawlerTaskCenter.model.CrawlerJob; import com.crawlerTaskCenter.util.InetAddressUtil; import com.crawlerTaskCenter.util.ThreadPoolUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; @Configuration public class Receiver { private static Logger log = Logger.getLogger(Application.class); private static Map<String, Class<?>> crawlerClassMap=new HashMap<String, Class<?>>(); private static String returnPath; private static ThreadPoolUtil threadPool ; @Value("${server.port}") private String serverport; public Receiver(){ System.out.println("init Receiver"); } @PostConstruct public void init(){ //初始化returnPath returnPath="http://" + InetAddressUtil.getHostIP() + ":"+serverport+"/task/getMessage"; log.info("returnPath:"+returnPath); initCrawlerClassMap(); threadPool = new ThreadPoolUtil("crawlerTaskPool"); threadPool.setPoolsize(30); threadPool.setMaximumPoolSize(50); threadPool.initPool(); } @Autowired CachingConnectionFactory connectionFactory; private void initCrawlerClassMap(){ Reflections reflections = new Reflections( ConfigurationBuilder.build("com.crawlerTaskCenter.crawler")); Set<Class<?>> set = reflections.getTypesAnnotatedWith(CrawlerAnnotation.class); for (Class<?> object : set) { try { CrawlerAnnotation geccoClass = (CrawlerAnnotation) object .getAnnotation(CrawlerAnnotation.class); String key = geccoClass.key(); String type=geccoClass.type(); //緩存全部爬蟲class 方便在啓動線程時直接實例化 crawlerClassMap.put(key+type, object); } catch (Exception e) { e.printStackTrace(); } } } @Bean public SimpleMessageListenerContainer messageContainer() { connectionFactory.setPublisherConfirms(true); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("crawlerJobs");; container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setPrefetchCount(5); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); try{ JSONObject json=JSON.parseObject(new String(body)); System.out.println("receive msg : " + new String(body)); // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 new tt(channel, message.getMessageProperties().getDeliveryTag(),json).start(); }catch(Exception ex){ ex.printStackTrace(); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //確認消息成功消費 } } }); return container; } class tt extends Thread{ long tag=0l; Channel channel=null; JSONObject jsonObject=null; tt(Channel channel,long tag,JSONObject jsonObject){ this.channel=channel; this.tag=tag; this.jsonObject=jsonObject; } @Override public void run() { try { Class[] paraTypes = { CrawlerJob.class }; // 從緩存中獲取爬蟲class Class<?> object = crawlerClassMap.get(jsonObject .getString("key") + jsonObject.getString("type")); if (object != null) { try { // 實例化 CrawlerJob CrawlerJob crawler = JSON.parseObject( jsonObject.toJSONString(), CrawlerJob.class); Object[] paras = { crawler }; Constructor<?> cons = object.getConstructor(paraTypes); CrawlerBase crawlerInstace = (CrawlerBase) cons .newInstance(paras); crawlerInstace.returnPath = returnPath; threadPool.executeCrawlerTask(crawlerInstace); } catch (Exception e) { e.printStackTrace(); } } else { log.error(jsonObject.getString("jobName") + "爬蟲不存在"); } } catch (Exception e) { e.printStackTrace(); try { channel.basicNack(tag,false, false); } catch (IOException e1) { e1.printStackTrace(); } } try { channel.basicAck(tag, false); } catch (IOException e) { e.printStackTrace(); } } } }
核心代碼是messageContainer方法,其餘的是我真實項目中的一些邏輯(消息隊列中存的是爬蟲任務,根據任務啓動爬蟲線程),你們能夠忽略。dom
總結:ide
我只用了最簡單消息發送、接收、手動確認,其實rabbitmq最比較強大的消息分發策略、響應策略、事務、重試機制等。spring-boot
目前生產環境尚未上,由於生產環境中間件必須是高可用,還要進行集羣部署才能發佈到生產環境。
下一篇:rabbitmq集羣搭建