1)MqttClientspring
public class MqttPublishSample { public static void main(String[] args) throws KeyManagementException, CertificateException, FileNotFoundException, IOException, KeyStoreException { String topic = "testtopic"; String content = "jiu shi zhe me zhuailulu"; int qos = 2; String broker = "tcp://127.0.0.1:1883";//"ssl://10.110.111.251:8883"; String clientId = "JavaSample"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // connOpts.setUserName("unique"); // connOpts.setPassword("abc".toCharArray()); connOpts.setCleanSession(true); System.out.println("Connecting to broker: "+broker); sampleClient.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: "+content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); message.setRetained(true); sampleClient.setCallback(new PushCallBack()); sampleClient.publish(topic, message); System.out.println("Message published"); // sampleClient.disconnect(); System.out.println("Disconnected"); // System.exit(0); // sampleClient.subscribe(topic, new MqttMessageLisener()); sampleClient.subscribe(topic); // sampleClient.disconnect(); // System.exit(0); } catch(MqttException me) { System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); } } } class PushCallBack implements MqttCallback { @Override public void connectionLost(Throwable throwable) { System.out.println("ERROR: connection lost"); //斷開重連 } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("message arrived"); System.out.println(s+mqttMessage.getPayload().toString()+"******"); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("delivery complete"); System.out.println(iMqttDeliveryToken.isComplete()); } } class MqttMessageLisener implements IMqttMessageListener { @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println(s); System.out.println(mqttMessage); } }
2) 經過Spring Integration的方式進行鏈接, 參考:https://www.jianshu.com/p/483ad386fe80tcp
@Configuration @IntegrationComponentScan public class MqttMsgHandler { @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(new String[]{"tcp://127.0.0.1:1883"}); mqttConnectOptions.setKeepAliveInterval(20); factory.setConnectionOptions(mqttConnectOptions); return factory; } @Bean public IntegrationFlow mqttInFlow() { return IntegrationFlows.from(mqttInbound()) .transform(p -> p + ", received from MQTT") .handle(logger()) .get(); } private LoggingHandler logger() { System.out.println("set logger handler"); LoggingHandler loggingHandler = new LoggingHandler("INFO"); loggingHandler.setLoggerName("siSample"); return loggingHandler; } @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", mqttClientFactory(), "siSampleTopic"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); return adapter; } @Bean public IntegrationFlow mqttOutFlow() { //console input // return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(), // e -> e.poller(Pollers.fixedDelay(1000))) // .transform(p -> p + " sent to MQTT") // .handle(mqttOutbound()) // .get(); return IntegrationFlows.from(outChannel()) .handle(mqttOutbound()) .get(); } @Bean public MessageChannel outChannel() { return new DirectChannel(); } @Bean public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("siSampleTopic"); return messageHandler; } @MessagingGateway(defaultRequestChannel = "outChannel") public interface MsgWriter { void write(String note); } }
兩種方式均可以發佈和訂閱~ide