上篇博客講解了服務器集羣部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集羣html
這篇在上篇搭建好的基礎上,將SpringBoot整合RocketMQ實現生產消費。git
GitHub地址
: https://github.com/yudiandemingzi/spring-boot-studygithub
先說下技術大體架構spring
SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)
<!--注意: 這裏的版本,要和部署在服務器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
鏈接RocketMQ服務器配置類,這裏爲了方便直接寫成常量。apache
/** * @Description: 安裝實際開發這裏的信息 都是應該寫在配置裏,來讀取,這裏爲了方便因此寫成常量 */ public class JmsConfig { /** * Name Server 地址,由於是集羣部署 因此有多個用 分號 隔開 */ public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877"; /** * 主題名稱 主題通常是服務器設置好 而不能在代碼裏去新建topic( 若是沒有建立好,生產者往該主題發送消息 會報找不到topic錯誤) */ public static final String TOPIC = "topic_family"; }
@Slf4j @Component public class Producer { private String producerGroup = "test_producer"; private DefaultMQProducer producer; public Producer(){ //示例生產者 producer = new DefaultMQProducer(producerGroup); //不開啓vip通道 開通口端口會減2 producer.setVipChannelEnabled(false); //綁定name server producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } /** * 對象在使用以前必需要調用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer(){ return this.producer; } /** * 通常在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown(){ this.producer.shutdown(); } }
@Slf4j @Component public class Consumer { /** * 消費者實體對象 */ private DefaultMQPushConsumer consumer; /** * 消費者組 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 經過構造函數 實例化對象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消費模式:一個新的訂閱組第一次啓動從隊列的最後位置開始消費 後續再啓動接着上次消費的進度開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題和 標籤( * 表明全部標籤)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); // //註冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一個topic,同一個tag,而且key相同的message // 會把不一樣的消息分別放置到不一樣的隊列中 try { for (Message msg : msgs) { //消費者獲取消息 這裏只輸出 不作後面邏輯處理 String body = new String(msg.getBody(), "utf-8"); log.info("Consumer-獲取消息-主題topic爲={}, 消費消息爲={}", msg.getTopic(), body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消費者 啓動成功======="); } }
大體就是這邊簡單,下面就是測試。服務器
先寫個測試接口進行測試。架構
@Slf4j @RestController public class Controller { @Autowired private Producer producer; private List<String> mesList; /** * 初始化消息 */ public Controller() { mesList = new ArrayList<>(); mesList.add("小小"); mesList.add("爸爸"); mesList.add("媽媽"); mesList.add("爺爺"); mesList.add("奶奶"); } @RequestMapping("/text/rocketmq") public Object callback() throws Exception { //總共發送五次消息 for (String s : mesList) { //建立生產信息 Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的稱謂:" + s).getBytes()); //發送 SendResult sendResult = producer.getProducer().send(message); log.info("輸出生產者信息={}",sendResult); } return "成功"; } }
很明顯生產發送消息已經成功,二消費者也成功接收了消息!app
另外咱們再來看下RocketMQ控制檯是否也有消費記錄分佈式
很明顯在控制檯這邊也會有消費記錄!函數
總結
這邊只是簡單的整合,後面會經過RocketMQ實現分佈式事務,能夠用於線上實際環境中,到時候會深刻講解下源碼。