spring: mqtt: client: username: 用戶名 password: 密碼 serverURIs: tcp://ip:port # 客戶端地址,多個使用逗號隔開 clientId: client0001 # ${random.value} keepAliveInterval: 30 connectionTimeout: 30 producer: defaultQos: 1 defaultRetained: true defaultTopic: defaultTopicName consumer: defaultQos: 1 completionTimeout: 30000 consumerTopics: topic1,topic2 # 監聽的 topic,多個使用逗號隔開
/* 客戶端 */ @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(keepAliveInterval); mqttConnectOptions.setConnectionTimeout(connectionTimeout); return mqttConnectOptions; } @Bean public MqttPahoClientFactory getMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; }
@Bean public MessageChannel outboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = OUTBOUND_CHANNEL) public MessageHandler getMqttProducer() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setDefaultRetained(defaultRetained); messageHandler.setDefaultQos(defaultProducerQos); return messageHandler; }
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL) public interface MqttSender { void sendToMqtt(String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
@RestController public class TestController { @Autowired private MqttSender mqttSender; @RequestMapping("/send") private void send(String data){ mqttSender.sendToMqtt(data); } }
@Bean public MessageChannel inboundChannel() { return new DirectChannel(); } @Bean public MessageProducer getMqttConsumer() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(defaultConsumerQos); adapter.setOutputChannel(inboundChannel()); return adapter; }
@Component public class MqttConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class); @Bean @ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL) public MessageHandler handler() { return message -> { String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString(); LOGGER.info("[{}]主題接收到消息:{}", topic, message.getPayload().toString()); }; } }
注意事項html
@ServiceActivator 和 @MessagingGateway 中綁定的 Channel 名,需與返回 MessageChannel 的 Bean 的方法名同樣:vue
如發佈者綁定的 Channel 名爲 outboundChannel,則須要有對應的方法,以下:java
@Bean public MessageChannel outboundChannel() { return new DirectChannel(); }
參考git
完整代碼:GitHubgithub