MQTT協議之 Apache Apollo服務

1、說明html

  MQTTIBM開發的一個即時通信協議,有可能成爲物聯網的重要組成部分。該協議支持全部平臺,幾乎能夠把全部聯網物品和外部鏈接起來,被用來當作傳感器和致動器(好比經過Twitter讓房屋聯網)的通訊協議。java

  Apache Apollo是一個代理服務器,其是在ActiveMQ基礎上發展而來的,能夠支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多種協議。apache

  總結來講MQTT只是一種消息推送的協議目前(2016/1/13)V3.1版本,而Apache Apollo是更具這種協議而開發的一款服務性的服務程序,被用來進行消息推送。一樣的服務還有Mosquitto,介紹地址:http://blog.csdn.net/xukai871105/article/details/39252653windows

2、原理瀏覽器

    Apache Apollo說白了其實很簡單,就是在服務器端建立一個惟一訂閱號,發送者能夠向這個訂閱號中發東西,而後接受者(即訂閱了這個訂閱號的人)都會收到這個訂閱號發出來的消息。以此來完成消息的推送。服務器實際上是一個消息中轉站。服務器

3、下載安裝Apollo(windows)網絡

    1.下載地址:http://activemq.apache.org/apollo/session

      

      

  2.安裝eclipse

  apollo中間件實際上是免安裝的,咱們只須要下載apache-apollo-1.7.1-windows-distro.zip,而後解壓到某個文件夾就能夠了。在這裏我解壓到D:\apache-apollo-1.7.1。解壓開的路徑以下:tcp

 

    

 3.建立屬於本身的apollo域

  下載下來的是官方的apollo,咱們須要本身生成本身的apollo,這樣作的好處就是咱們能夠根據本身的需求修改一些配置文件,建立過程以下: 

  1 建立一個D:\myApollo文件夾。

  2 進入命令輸入模式,進入到剛建立的文件下下:cd D:\myApollo

  3 由於接到的目錄關係因此可能有些改變,目錄爲(解壓路徑\bin\apollo create myapollo) 例子:D:\apache-apollo-1.7.1\bin\apollo create myapollo

  4 建立成功後,在D:\myApollo會有一個myapollo子文件夾,裏面內容以下:

   

  其中 bin爲啓動目錄 etc爲配置目錄。

  4.啓動myapollo

  1 cd D:\myApollo\myapollo\bin  (命名模式下進入到本身生成的apollo下的bin目錄)

  2 apollo-broker run  (輸入啓動命令)

  輸入後效果以下:

  

  這裏咱們能夠看到端口配置信息

 5.訪問控制檯 

  在瀏覽器輸入http://127.0.0.1:61680/,就是上面黑窗口最後一行,打開以下頁面

  

  而後輸入默認用戶名/密碼:(admin/password),用戶名密碼能夠在etc/users.properties中找到,點擊登錄,而後進入控制頁面,能夠看到myapollo,固然目前裏面是沒有鏈接,沒有消息的,由於咱們尚未創建鏈接,發送消息。

  

  至此,咱們的apollo中間件就能夠正常使用了。

  6.修改配置

  在咱們的配置目錄下(D:\myApollo\myapollo\etc)

  

  apollo.xml 爲網絡配置信息,其餘的不用管最主要的

    <connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/>   connection_limit鏈接限制條數2000,就是說超過2000就GG了。可不能夠修改等鏈接到了2000條的時候更改試試。

  groups.properties 用於增長用戶

    本來爲: admins=admin

    增長test用戶: admins=admin|test(中間用|分開)

  users.properties  用於設置用戶的帳號密碼

    用戶名=密碼

    本來爲: admin=password

    增長test用戶:  admin=password

            test=test         (下面新增一列,此處與groups.properties文件對應)

 

 7.鏈接程序:

  依賴包:mqtt-client-0.4.0.jar

  包下載地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/

  代碼示例下載:http://pan.baidu.com/s/1kUmsVrT   (gbk編碼)

    在線代碼解析:

 

public class MyMqtt{
	/**鏈接訪問者id,不能重複*/
	private String clientId;
	/**默認鏈接路徑,服務器所在ip*/
	private String host="tcp://192.168.1.130:61613";
	/**MQTT訪問默認用戶名*/
	private String userName = "admin";
	/**MQTT訪問默密碼*/
	private String passWord = "password";
	/**發送的訂閱號集合*/
	private String[] publishTopics={"test"};
	/**訂閱者的訂閱號集合*/
	private String[] subTopics={"test"};
	/**消息發送時的消息模式集合*/
	private int[] publishQos={2};
	/**消息訂閱式的消息模式集合*/
	private int[] subQos={2};
	/**單次MQTT鏈接*/
	private MqttClient client;
	/**鏈接時的一些額外設置*/
	private MqttConnectOptions options;
	/**發送消息是的個體topic*/
	private MqttTopic topic;
	/**消息傳遞hander*/
	private Handler mHandler;
	
	
	public MyMqtt(Context context,final Handler handler) {
		clientId=Tool.getIdentifyNumber(context);
		if(client==null || !client.isConnected()){
			try {
				//設置鏈接指定的額ip,鏈接人
				client = new MqttClient(host, clientId, new MemoryPersistence());
				//開始設置鏈接時的參數
				options = new MqttConnectOptions();
				//設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接
				options.setCleanSession(true);
				//設置鏈接用戶名
				options.setUserName(userName);
				//設置鏈接密碼
				options.setPassword(passWord.toCharArray());
				//設置超時鏈接
				options.setConnectionTimeout(10);
				//設置心跳間隔
				options.setKeepAliveInterval(20);
				//設置當鏈接斷開時發送的死亡預告,當這句被接受到時 證實本鏈接斷開
//				options.setWill(publishTopic, (clientId+"destroy").getBytes(), 0, true);
				//鏈接回調函數
				client.setCallback(new MqttCallback() {			
					@Override
					public void messageArrived(String topicName, MqttMessage message) throws Exception {
						// TODO Auto-generated method stub
						Message msg = new Message();
						msg.what=1;
						msg.obj=message.toString();
						handler.sendMessage(msg);
					}
					@Override
					public void deliveryComplete(IMqttDeliveryToken token) {
						// TODO Auto-generated method stub
						
					}
					
					@Override
					public void connectionLost(Throwable cause) {
						// TODO Auto-generated method stub
						
					}
				});
				client.connect(options);
				client.subscribe(subTopics, subQos);
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 發送消息
	 * @param msg
	 */
	public void sendMessage(String msg){
		if(client!=null && client.isConnected()){
			System.out.println("MQTT發送消息了呀 "+msg);
			try {
				MqttMessage message=new MqttMessage();
				//設置消息傳輸的類型 0,1,2可選
				message.setQos(2);
				//設置是否在服務器中保存消息體
				message.setRetained(false);
				//設置消息的內容
				message.setPayload(msg.getBytes());
				//循環發送,由於一次只能一個
				for (String publishTopic: publishTopics) {
					topic=client.getTopic(publishTopic);
					MqttDeliveryToken token = topic.publish(message);
					token.waitForCompletion();
				}
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (NullPointerException e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	}

  8. API查看地址
  http://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/package-summary.html

相關文章
相關標籤/搜索