有時候咱們的項目中會用到即時通信功能,好比電商系統中的客服聊天功能,還有在支付過程當中,當用戶支付成功後,第三方支付服務會回調咱們的回調接口,此時咱們須要通知前端支付成功。最近發現RabbitMQ能夠很方便的實現即時通信功能,若是你沒有特殊的業務需求,甚至能夠不寫後端代碼,今天給你們講講如何使用RabbitMQ來實現即時通信!javascript
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閱(publish/subscribe)模式的輕量級通信協議,該協議構建於TCP/IP協議上。MQTT最大優勢在於,能夠以極少的代碼和有限的帶寬,爲鏈接遠程設備提供實時可靠的消息服務。html
RabbitMQ啓用MQTT功能,須要先安裝然RabbitMQ而後再啓用MQTT插件。前端
rabbitmq-plugins enable rabbitmq_mqtt
咱們可使用MQTT客戶端來測試MQTT的即時通信功能,這裏使用的是MQTTBox這個客戶端工具。java
既然MQTTBox客戶端能夠直接經過RabbitMQ實現即時通信,那咱們是否是直接使用前端技術也能夠實現即時通信?答案是確定的!下面咱們將經過html+javascript實現一個簡單的聊天功能,真正不寫一行後端代碼實現即時通信!git
rabbitmq-plugins enable rabbitmq_web_mqtt
Title
目標Topic:發送消息:發送 清空
github
``web
沒有特殊業務需求的時候,前端能夠直接和RabbitMQ對接實現即時通信。可是有時候咱們須要經過服務端去通知前端,此時就須要在應用中集成MQTT了,接下來咱們來說講如何在SpringBoot應用中使用MQTT。spring
org.springframework.integration spring-integration-mqtt
rabbitmq: mqtt: url: tcp://localhost:1883 username: guest password: guest defaultTopic: testTopic
/** * MQTT相關配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig { /** * RabbitMQ鏈接用戶名 */ private String username; /** * RabbitMQ鏈接密碼 */ private String password; /** * RabbitMQ的MQTT默認topic */ private String defaultTopic; /** * RabbitMQ的MQTT鏈接地址 */ private String url;}
/** * MQTT消息訂閱者相關配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //設置消息質量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { //處理訂閱消息 log.info("handleMessage : {}",message.getPayload()); } }; }}
/** * MQTT消息發佈者相關配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}
/** * MQTT網關,經過接口將數據傳遞到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { /** * 發送消息到默認topic */ void sendToMqtt(String payload); /** * 發送消息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); /** * 發送消息到指定topic並設置QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
/** * MQTT測試接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT測試接口")@RestController@RequestMapping("/mqtt")public class MqttController { @Autowired private MqttGateway mqttGateway; @PostMapping("/sendToDefaultTopic") @ApiOperation("向默認主題發送消息") public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); return CommonResult.success(null); } @PostMapping("/sendToTopic") @ApiOperation("向指定主題發送消息") public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); return CommonResult.success(null); }}
2020-09-17 14:29:01.689 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息2020-09-17 14:29:06.101 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息2020-09-17 14:29:07.384 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 來自網頁上的消息
消息中間件應用愈來愈普遍,不只能夠實現可靠的異步通訊,還能夠實現即時通信,掌握一個消息中間件仍是頗有必要的。若是沒有特殊業務需求,客戶端或者前端直接使用MQTT對接消息中間件便可實現即時通信,有特殊需求的時候也可使用SpringBoot集成MQTT的方式來實現,總之消息中間件是實現即時通信的一個好選擇!後端
關注公衆號:java寶典
springboot