RabbitMQ實現即時通信竟然如此簡單!

有時候咱們的項目中會用到即時通信功能,好比電商系統中的客服聊天功能,還有在支付過程當中,當用戶支付成功後,第三方支付服務會回調咱們的回調接口,此時咱們須要通知前端支付成功。最近發現RabbitMQ能夠很方便的實現即時通信功能,若是你沒有特殊的業務需求,甚至能夠不寫後端代碼,今天給你們講講如何使用RabbitMQ來實現即時通信!javascript

MQTT協議

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閱(publish/subscribe)模式的輕量級通信協議,該協議構建於TCP/IP協議上。MQTT最大優勢在於,能夠以極少的代碼和有限的帶寬,爲鏈接遠程設備提供實時可靠的消息服務。html

ThirdPartyImage_ae54bd97.png

MQTT相關概念

  • Publisher(發佈者):消息的發出者,負責發送消息。
  • Subscriber(訂閱者):消息的訂閱者,負責接收並處理消息。
  • Broker(代理):消息代理,位於消息發佈者和訂閱者之間,各種支持MQTT協議的消息中間件均可以充當。
  • Topic(主題):能夠理解爲消息隊列中的路由,訂閱者訂閱了主題以後,就能夠收到發送到該主題的消息。
  • Payload(負載);能夠理解爲發送消息的內容。
  • QoS(消息質量):全稱Quality of Service,即消息的發送質量,主要有QoS 0、QoS 一、QoS 2三個等級,下面分別介紹下: QoS 0(Almost Once):至多一次,只發送一次,會發生消息丟失或重複; QoS 1(Atleast Once):至少一次,確保消息到達,但消息重複可能會發生; QoS 2(Exactly Once):只有一次,確保消息只到達一次。

RabbitMQ啓用MQTT功能

RabbitMQ啓用MQTT功能,須要先安裝然RabbitMQ而後再啓用MQTT插件。前端

  • 首先咱們須要安裝並啓動RabbitMQ,對RabbitMQ不瞭解的朋友能夠參考《花了3天總結的RabbitMQ實用技巧,有點東西!》;
  • 接下來就是啓用RabbitMQ的MQTT插件了,默認是不啓用的,使用以下命令開啓便可;
rabbitmq-plugins enable rabbitmq_mqtt
  • 開啓成功後,查看管理控制檯,咱們能夠發現MQTT服務運行在1883端口上了。

ThirdPartyImage_af39b4a1.png

MQTT客戶端

咱們可使用MQTT客戶端來測試MQTT的即時通信功能,這裏使用的是MQTTBox這個客戶端工具。java

ThirdPartyImage_8e80c3e9.png

  • 點擊Create MQTT Client按鈕來建立一個MQTT客戶端;

ThirdPartyImage_d91430ae.png

  • 接下來對MQTT客戶端進行配置,主要是配置好協議端口、鏈接用戶名密碼和QoS便可;

ThirdPartyImage_3f0d8691.png

  • 再配置一個訂閱者,訂閱者訂閱testTopicA這個主題,咱們會向這個主題發送消息;

ThirdPartyImage_55cad197.png

  • 發佈者向主題中發佈消息,訂閱者能夠實時接收到。

ThirdPartyImage_15d5058b.png

前端直接實現即時通信

既然MQTTBox客戶端能夠直接經過RabbitMQ實現即時通信,那咱們是否是直接使用前端技術也能夠實現即時通信?答案是確定的!下面咱們將經過html+javascript實現一個簡單的聊天功能,真正不寫一行後端代碼實現即時通信!git

  • 因爲RabbitMQ與Web端交互底層使用的是WebSocket,因此咱們須要開啓RabbitMQ的MQTT WEB支持,使用以下命令開啓便可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 開啓成功後,查看管理控制檯,咱們能夠發現MQTT的WEB服務運行在15675端口上了;

ThirdPartyImage_b4e2b561.png

ThirdPartyImage_7a1a1d42.png

  • 實現的功能很是簡單,一個單聊功能,須要注意的是配置好MQTT服務的訪問地址爲:ws://localhost:15675/ws
Title

目標Topic:發送消息:發送 清空github

``web

ThirdPartyImage_f92103e3.png

在SpringBoot中使用

沒有特殊業務需求的時候,前端能夠直接和RabbitMQ對接實現即時通信。可是有時候咱們須要經過服務端去通知前端,此時就須要在應用中集成MQTT了,接下來咱們來說講如何在SpringBoot應用中使用MQTT。spring

  • 首先咱們須要在pom.xml中添加MQTT相關依賴;
org.springframework.integration    spring-integration-mqtt
  • 在application.yml中添加MQTT相關配置,主要是訪問地址、用戶名密碼、默認主題信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic
  • 編寫一個Java配置類從配置文件中讀取配置便於使用;
/** * MQTT相關配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {    /**     * RabbitMQ鏈接用戶名     */    private String username;    /**     * RabbitMQ鏈接密碼     */    private String password;    /**     * RabbitMQ的MQTT默認topic     */    private String defaultTopic;    /**     * RabbitMQ的MQTT鏈接地址     */    private String url;}
  • 添加MQTT消息訂閱者相關配置,使用@ServiceActivator註解聲明一個服務激活器,經過MessageHandler來處理訂閱消息;
/** * MQTT消息訂閱者相關配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //設置消息質量:0->至多一次;1->至少一次;2->只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message> message) throws MessagingException {                //處理訂閱消息                log.info("handleMessage : {}",message.getPayload());            }        };    }}
  • 添加MQTT消息發佈者相關配置;
/** * MQTT消息發佈者相關配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}
  • 添加MQTT網關,用於向主題中發送消息;
/** * MQTT網關,經過接口將數據傳遞到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    /**     * 發送消息到默認topic     */    void sendToMqtt(String payload);    /**     * 發送消息到指定topic     */    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);    /**     * 發送消息到指定topic並設置QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
  • 添加MQTT測試接口,使用MQTT網關向特定主題中發送消息;
/** * MQTT測試接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT測試接口")@RestController@RequestMapping("/mqtt")public class MqttController {    @Autowired    private MqttGateway mqttGateway;    @PostMapping("/sendToDefaultTopic")    @ApiOperation("向默認主題發送消息")    public CommonResult sendToDefaultTopic(String payload) {        mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }    @PostMapping("/sendToTopic")    @ApiOperation("向指定主題發送消息")    public CommonResult sendToTopic(String payload, String topic) {        mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}
  • 調用接口向主題中發送消息進行測試;

ThirdPartyImage_cb4af000.png

  • 後臺成功接收到消息並進行打印。
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 來自網頁上的消息

總結

消息中間件應用愈來愈普遍,不只能夠實現可靠的異步通訊,還能夠實現即時通信,掌握一個消息中間件仍是頗有必要的。若是沒有特殊業務需求,客戶端或者前端直接使用MQTT對接消息中間件便可實現即時通信,有特殊需求的時候也可使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通信的一個好選擇!後端

關注公衆號:java寶典
aspringboot

相關文章
相關標籤/搜索