EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 平臺開發的開源物聯網 MQTT 消息服務器。Erlang/OTP 是出色的軟實時(Soft-Realtime)、低延時(Low-Latency)、分佈式(Distributed) 的語言平臺。MQTT 是輕量的(Lightweight)、發佈訂閱模式(PubSub) 的物聯網消息協議。git
EMQ 項目設計目標是承載移動終端或物聯網終端海量 MQTT 鏈接,並實如今海量物聯網設備間快速低延時消息路由:github
如下介紹其中的兩種 MQTTKit 和 MQTT-Client-Framework 這兩種都是OC 使用
Swift 版本可參考 CocoaMQTT一、 ** MQTTKit **
已經不更新 可是基本使用沒問題web
pod 'MQTTKit'數據庫
頭文件json
#import <MQTTKit.h> #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTClient *client;後端
WEAKSELF
NSString *clientID = @"測試client - 必須是全局惟一的id ";
MQTTClient *client = [[MQTTClient alloc] initWithClientId:StrFormat(@"%@", clientID)];
client.username = @"username";
client.password = @"password";
client.cleanSession = false;
client.keepAlive = 20;
client.port = 11883;// 端口號 根據服務端 選擇
self.client = client;
// 連接MQTT
[client connectToHost:@"連接的MQTT的URL" completionHandler:^(MQTTConnectionReturnCode code) {
if (code == ConnectionAccepted) {
NSLog(@"連接MQTT 成功 😁😁😁");
// 連接成功 訂閱相對應的主題
[weakSelf.client subscribe:@"你須要訂閱的主題" withQos:AtLeastOnce completionHandler:^(NSArray *grantedQos) {
DLog(@"訂閱 返回 %@",grantedQos);
}];
}else if (code == ConnectionRefusedBadUserNameOrPassword){
NSLog(@"MQTT 帳號或驗證碼錯誤");
} else if (code == ConnectionRefusedUnacceptableProtocolVersion){
NSLog(@"MQTT 不可接受的協議");
}else if (code == ConnectionRefusedIdentiferRejected){
NSLog(@"MQTT不承認");
}else if (code == ConnectionRefusedServerUnavailable){
NSLog(@"MQTT拒絕連接");
}else {
NSLog(@"MQTT 未受權");
}
}];
// 接收消息體
client.messageHandler = ^(MQTTMessage *message) {
NSString *jsonStr = [[NSString alloc] initWithData:message.payload encoding:NSUTF8StringEncoding];
NSLog(@"EasyMqttService mqtt connect success %@",jsonStr);
};
複製代碼
// 方法 封裝 可外部調用
-(void)subscribeType:(NSString *)example{
// 訂閱主題
[self.client subscribe:@"你須要訂閱的主題" withQos:AtMostOnce completionHandler:^(NSArray *grantedQos) {
NSLog(@"訂閱 返回 %@",grantedQos);
}];
}
複製代碼
-(void)closeMQTTClient{
WEAKSELF
[self.client disconnectWithCompletionHandler:^(NSUInteger code) {
// The client is disconnected when this completion handler is called
NSLog(@"MQTT client is disconnected");
[weakSelf.client unsubscribe:@"已經訂閱的主題" withCompletionHandler:^{
NSLog(@"取消訂閱");
}];
}];
}
複製代碼
[self.client publishString:postMsg toTopic:@"發送消息的主題 根據服務端定" withQos:AtLeastOnce retain:NO completionHandler:^(int mid) {
if (cmd != METHOD_SOCKET_CHAT_TO) {
NSLog(@"發送消息 返回 %d",mid);
}
}];
複製代碼
二、 ** MQTTClient ** MQTTClient 配置更多 是可持續更新,可配置 SSL
bash
基本使用
pod 'MQTTClient'websocket
方式鏈接 pod 'MQTTClient/MinL' pod 'MQTTClient/ManagerL' pod 'MQTTClient/WebsocketL'服務器
頭文件websocket
基本使用
#import <MQTTClient.h>websocket
須要添加的頭文件 #import <MQTTWebsocketTransport.h> #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTSession *mySession;須要添加協議頭
<MQTTSessionDelegate,MQTTSessionManagerDelegate>session
基本使用
#import "MQTTClient.h"
\@interface MyDelegate : ... <MQTTSessionDelegate>
...
MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
transport.host = @"localhost";
transport.port = 1883;
MQTTSession *session = [[MQTTSession alloc] init];
session.transport = transport;
session.delegate = self;
[session connectAndWaitTimeout:30]; //this is part of the synchronous API
複製代碼
websocket 鏈接
WEAKSELF
NSString *clientID = @"測試client - 必須是全局惟一的id ";
MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
transport.host = @"鏈接MQTT 地址";
transport.port = 8083; // 端口號
transport.tls = YES; // 根據須要配置 YES 開起 SSL 驗證 此處爲單向驗證 雙向驗證 根據SDK 提供方法直接添加
MQTTSession *session = [[MQTTSession alloc] init];
NSString *linkUserName = @"username";
NSString *linkPassWord = @"password";
[session setUserName:linkUserName];
[session setClientId:clientID];
[session setPassword:linkPassWord];
[session setKeepAliveInterval:5];
session.transport = transport;
session.delegate = self;
self.mySession = session;
[self reconnect];
[self.mySession addObserver:self forKeyPath:@"state" options:NSKeyValueObservingOptionNew|NSKeyValueObservingOptionOld context:nil]; //添加事件監聽
複製代碼
websocket
監聽 響應事件- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
switch (self.mySession.status) {
case MQTTSessionManagerStateClosed:
NSLog(@"鏈接已經關閉");
break;
case MQTTSessionManagerStateClosing:
NSLog(@"鏈接正在關閉");
break;
case MQTTSessionManagerStateConnected:
NSLog(@"已經鏈接");
break;
case MQTTSessionManagerStateConnecting:
NSLog(@"正在鏈接中");
break;
case MQTTSessionManagerStateError: {
// NSString *errorCode = self.mySession.lastErrorCode.localizedDescription;
NSString *errorCode = self.mySession.description;
NSLog(@"鏈接異常 ----- %@",errorCode);
}
break;
case MQTTSessionManagerStateStarting:
NSLog(@"開始鏈接");
break;
default:
break;
}
}
複製代碼
session Delegate 協議
鏈接 返回狀態
-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error{
if (eventCode == MQTTSessionEventConnected) {
NSLog(@"2222222 連接MQTT 成功");
}else if (eventCode == MQTTSessionEventConnectionRefused) {
NSLog(@"MQTT拒絕連接");
}else if (eventCode == MQTTSessionEventConnectionClosed){
NSLog(@"MQTT連接關閉");
}else if (eventCode == MQTTSessionEventConnectionError){
NSLog(@"MQTT 連接錯誤");
}else if (eventCode == MQTTSessionEventProtocolError){
NSLog(@"MQTT 不可接受的協議");
}else{//MQTTSessionEventConnectionClosedByBroker
NSLog(@"MQTT連接 其餘錯誤");
}
if (error) {
NSLog(@"連接報錯 -- %@",error);
}
}
複製代碼
收到發送的消息
-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid
{
NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data options:NSJSONReadingMutableContainers error:nil];
NSLog(@"EasyMqttService mqtt connect success %@",dic);
// 作相對應的操做
}
複製代碼
基本使用
// 方法 封裝 可外部調用
[session subscribeToTopic:@"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss){
if (error) {
NSLog(@"Subscription failed %@", error.localizedDescription);
} else {
NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss);
}
}]; // this is part of the block API
複製代碼
websocket
- (void)subscibeToTopicAction {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_async(dispatch_get_main_queue(), ^{
[self subscibeToTopic:@"你要訂閱的主題"];
});
});
}
-(void)subscibeToTopic:(NSString *)topicUrl
{
// self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelAtMostOnce] forKey:topicUrl];
[self.mySession subscribeToTopic:topicUrl atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
if (error) {
NSLog(@"訂閱 %@ 失敗 緣由 %@",topicUrl,error);
}else
{
NSLog(@"訂閱 %@ 成功 g1oss %@",topicUrl,gQoss);
dispatch_async(dispatch_get_main_queue(), ^{
// 操做
});
};
}];
}
複製代碼
-(void)closeMQTTClient{
[self.mySession disconnect];
[self.mySession unsubscribeTopics:@[@"已經訂閱的主題"] unsubscribeHandler:^(NSError *error) {
if (error) {
DLog(@"取消訂閱失敗");
}else{
DLog(@"取消訂閱成功");
}
}];
}
複製代碼
[self.mySession publishData:jsonData onTopic:@"發送消息的主題 服務端定義" retain:NO qos:MQTTQosLevelAtMostOnce publishHandler:^(NSError *error) {
if (error) {
NSLog(@"發送失敗 - %@",error);
}else{
NSLog(@"發送成功");
}
}];
複製代碼