阿里雲RocketMQ的生產者簡單實現

// MQ的應用場景有好比 訂單變動消息能夠經過產生這個事件的地方(好比前端調用後端的接口post一個訂單,那麼就是在這個mapping方法裏作一個生產者【不過最好經過aop來實現,否則n多個接口都要寫生產者代碼】,
// 將客戶端發來的訂單消息存入MQ裏,而後相關的服務【好比須要通知後臺管理人員,那麼通知後臺管理人員的service確定是一個consumer來消費該消息,不過的組的消費者是能夠屢次消費該消息的,這符合現實邏輯,我有一個消息
// 須要讓多個部門知道,那麼這個部門其實就是消費組,可是一個組裏只有一個consumer消費該消息】會處理這條消息(好比入庫/調用相關接口啥的)
// 注意,消息發送給消息隊列後,若是沒有任何消費組有消費過,那麼這條消息會被保存直到被至少一個消費組消費;已經消費的消息正常狀況下不會再被消費;
package
com.test.eee; import com.aliyun.openservices.ons.api.*; import java.util.Date; import java.util.Properties; import java.util.Scanner; /** * 阿里雲RocketMQ Producer簡單實現 */ public class MQTestProducer { public static void main(String[] args) { Properties properties = new Properties(); // GROUP_ID其實就是網上文章裏的GroupName,對於RocketMQ而言一條消息會分發給全部的訂閱了此消息(topic/tag)的group // 可是一個group裏的consumer只有一個會獲得這條消息 // TODO 通過測試阿里雲上的RocketMQ能夠不設置組 //properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_CONSUMER_EEE"); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.AccessKey, "sss"); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立 properties.put(PropertyKeyConst.SecretKey, "bbb"); //設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設置 TCP 接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://wwws.mq-internet-access.mq-internet.aliyuncs.com:80"); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啓動 Producer,只需調用一次便可 producer.start(); //循環發送消息 //for (int i = 0; i < 100; i++) { Message msg = new Message( // Message 所屬的 Topic "sss-change", // TODO Topic表示某種業務的類型,而Tag則是對這種業務的具體的劃分 // 好比消息類型是保存的是微信消息,那麼能夠分爲好友消息/羣組消息/小程序消息等等Tag "TagA", // Message Body 能夠是任何二進制形式的數據, MQ 不作任何干預, // 須要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 "Hello 中文拉拉".getBytes()); // 設置表明消息的業務關鍵屬性,請儘量全局惟一。 // 以方便您在沒法正常收到消息狀況下,可經過阿里雲服務器管理控制檯查詢消息並補發 // 注意:不設置也不會影響消息正常收發 // TODO 這個key就是比Tag更加精確的標識,通常設置爲整個Topic裏的消息惟一標識 //msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // 同步發送消息,只要不拋異常就是成功 // TODO 發送的每條消息在RocketMQ裏都有一個惟一的MessageId if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息發送失敗,須要進行重試處理,可從新發送這條消息或持久化這條數據進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } // } var scanner = new Scanner(System.in); scanner.next(); System.out.println("closed producer conn."); // 在應用退出前,銷燬 Producer 對象 // 注意:若是不銷燬也沒有問題 producer.shutdown(); } }

若是是要結合Spring,則能夠用ProducerBean代替Producer前端

@Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean xxxProducer() {
        return this.getProducer(gid);
    }


    private ProducerBean getProducer(String gid) {
        Properties properties = new Properties();
        properties.setProperty("addr", nameSrvAddr);
        properties.setProperty("AccessKey", accessKey);
        properties.setProperty("SecretKey", secretKey);
        // 對於producer能夠不要gid
        //properties.setProperty("GROUP_ID", gid);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);
        return producer;
    }

用的時候就是用過這個bean.send(new Message(...))來實現發送
相關文章
相關標籤/搜索