初始化SDK:java
/** * 初始化SDK * * @param context context */ public void initSDK(Context context) { String clientId = String.valueOf(System.currentTimeMillis()+userId); mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, clientId); subscriptionTopics = new ArrayList<>(); mqttAndroidClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { Log.d(TAG, "Reconnected to : " + serverURI); // Because Clean Session is true, we need to re-subscribe // subscribeToTopic(); //publishMessage(); } else { Log.d(TAG, "Connected to: " + serverURI); } connectSuccess = true; subscribeToTopic(); } @Override public void connectionLost(Throwable cause) { connectSuccess = false; Log.e(TAG, "The Connection was lost." + cause.getLocalizedMessage()); } // THIS DOES NOT WORK! @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.d(TAG, "Incoming message: " +topic+ new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); }
鏈接遠程服務:服務器
/** * 鏈接遠程服務 */ public void connectServer() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setCleanSession(false); try { //addToHistory("Connecting to " + serverUri); mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { connectSuccess = true; DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); disconnectedBufferOptions.setBufferEnabled(true); disconnectedBufferOptions.setBufferSize(100); disconnectedBufferOptions.setPersistBuffer(false); disconnectedBufferOptions.setDeleteOldestMessages(false); mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Log.e(TAG, "Failed to connect to: " + serverUri); exception.printStackTrace(); Log.d(TAG, "onFailure: " + exception.getCause()); connectSuccess = false; } }); } catch (MqttException ex) { ex.printStackTrace(); } }
獲取訂閱信息:網絡
/** *獲取訂閱信息
*/
public void connectGateway(String gatewayId, String userId) {
//獲取訂閱信息 if (!subscriptionTopics.contains(gatewayId)) { subscriptionTopics.add(gatewayId); } Log.d(TAG, "pre sub topic: connect status=" + connectSuccess); Log.d(TAG, "subtopic " + subscriptionTopics); subscribeToTopic(); }
訂閱mqtt消息:app
/** * 訂閱mqtt消息 */ private void subscribeToTopic() { try { if(subscriptionTopics.size()==0) return; String[] topics = new String[subscriptionTopics.size()]; subscriptionTopics.toArray(topics); int[] qoc = new int[topics.length]; IMqttMessageListener[] mqttMessageListeners = new IMqttMessageListener[topics.length]; for (int i = 0; i < topics.length; i++) { IMqttMessageListener mqttMessageListener = new IMqttMessageListener() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // message Arrived!消息送達後作出的處理 Log.d(TAG, topic + " : " + new String(message.getPayload())); handleReceivedMessage(new String(message.getPayload()), topic); } }; mqttMessageListeners[i] = mqttMessageListener; Log.d(TAG, "subscribeToTopic: qoc= " + qoc[i]); } mqttAndroidClient.subscribe(topics, qoc, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken iMqttToken) { Log.d(TAG, "Subscribed!"); } @Override public void onFailure(IMqttToken iMqttToken, Throwable throwable) { Log.d(TAG, "Failed to subscribe"); } }, mqttMessageListeners); } catch (MqttException ex) { System.err.println("Exception whilst subscribing"); ex.printStackTrace(); } }
處理收到的消息:async
private void handleReceivedMessage(String message, String gatewayId) { //能夠發送一條廣播通知程序 }
發送mqtt消息:tcp
/** * 發送 mqtt 消息 * * @param publishMessage 要發送的信息的 字符串 */ private void publishMessage(String publishMessage, String publishTopic) { try { publishTopic = userId + "/" + publishTopic; MqttMessage message = new MqttMessage(); message.setPayload(publishMessage.getBytes()); mqttAndroidClient.publish(publishTopic, message); Log.d(TAG, "publishMessage:Message Published \n" + publishTopic + ":" + message); if (!mqttAndroidClient.isConnected()) { Log.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer."); } } catch (MqttException e) { System.err.println("Error Publishing: " + e.getMessage()); e.printStackTrace(); } }
注意:當時調用initSdk方法是在application中,最後發現跟信鴿推送裏的mqtt消息有衝突,致使信鴿推送手機收不到,還彈出程序運行中止的崩潰彈窗,看log是信鴿報的錯,曾覺得是信鴿的問題,困擾了很久,最後才發現是本身的mqtt消息初始化的問題,只好把mqtt消息的初始化放到welcomeActivity裏。ide
沒有封裝的類:函數
public class SubscribeClient { private final static String CONNECTION_STRING = "tcp://mqtt地址:mqtt端口"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;//低耗網絡,可是又須要及時獲取數據,心跳30s private final static String CLIENT_ID = "client1"; private final static String[] TOPICS = { //訂閱信息 }; private final static int[] QOS_VALUES = {0, 0, 2, 0}; private MqttClient mqttClient = null; public SubscribeClient(String i) { try { mqttClient = new MqttClient(CONNECTION_STRING); SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler(); mqttClient.registerSimpleHandler(simpleCallbackHandler);//註冊接收消息方法 mqttClient.connect(CLIENT_ID + i, CLEAN_START, KEEP_ALIVE); mqttClient.subscribe(TOPICS, QOS_VALUES);//訂閱接主題 /** * 完成訂閱後,能夠增長心跳,保持網絡通暢,也能夠發佈本身的消息 */ mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 簡單回調函數,處理client接收到的主題消息 * * @author pig */ class SimpleCallbackHandler implements MqttSimpleCallback { /** * 當客戶機和broker意外斷開時觸發 * 能夠再此處理從新訂閱 */ @Override public void connectionLost() throws Exception { // TODO Auto-generated method stub System.out.println("客戶機和broker已經斷開"); } /** * 客戶端訂閱消息後,該方法負責回調接收處理消息 */ @Override public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception { // TODO Auto-generated method stub System.out.println("訂閱主題: " + topicName); System.out.println("消息數據: " + new String(payload)); System.out.println("消息級別(0,1,2): " + Qos); System.out.println("是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): " + retained); } } /** * 高級回調 * * @author pig */ class AdvancedCallbackHandler implements MqttSimpleCallback { @Override public void connectionLost() throws Exception { // TODO Auto-generated method stub } @Override public void publishArrived(String arg0, byte[] arg1, int arg2, boolean arg3) throws Exception { // TODO Auto-generated method stub } } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub new SubscribeClient("" + i); } }