MQTT學習(六)--各種MQTT代理服務器特性對比(譯文)_lordwish的專欄-CSDN博客https://blog.csdn.net/lordwish/article/details/85061687html
5G 時代,萬物互聯消息引擎 | 全球領先的開源 MQTT 消息服務器 | EMQhttps://www.emqx.io/cn/python
程序安裝 (Installation) — EMQ X - 百萬級開源 MQTT 消息服務器 3.2.0 文檔https://docs.emqx.io/broker/v3/cn/install.htmlgit
使用指南 · EMQ X Tutorialhttps://docs.emqx.io/tutorial/v3/cn/github
Installing via EMQ X Docker Imagegolang
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
訪問EMQT web管理頁面:
http://ip:18083 默認帳號:admin 密碼:publicweb
Python使用mqtt極簡例子
https://www.jianshu.com/p/0ed4e59b1e8fredis
推數據 pub.pydocker
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time : 2019/12/27 19:31
# @Email : luocs1@lenovo.com
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('10.110.149.132', 1883, 600) # 600爲keepalive的時間間隔
client.publish('fifa', payload='amazing', qos=0)
阻塞接收數據sub.py數據庫
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time : 2019/12/27 19:32
# @Email : luocs1@lenovo.com
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# client.on_disconnect = on_disconnect
client.connect('10.110.149.132', 1883, 600) # 600爲keepalive的時間間隔
client.subscribe('fifa', qos=0)
client.loop_forever() # 保持鏈接
先運行sub.py 在運行pub.py 就能收到數據json
EMQX的我的空間 - OSCHINAhttps://my.oschina.net/u/4174826
kerl build 22.0 erlang22.0
EMQX 規則引擎 簡單實現將8266消息保存到MySQL數據庫以及更新數據_weixin_44821644的博客-CSDN博客https://blog.csdn.net/weixin_44821644/article/details/100593769
方法一
yum -y install make
yum -y install gcc
yum -y install gcc-c++
yum -y install kernel-devel
yum -y install m4
yum -y install ncurses-devel
yum -y install openssl-devel
yum -y install rsync
yum -y install perl-net-snmp
yum -y install wx
yum -y install fop
yum -y install net-snmp
yum -y install unixODBC
yum -y install unixODBC-devel
yum -y install lrzsz
yum -y install bc
yum -y install sysstat
yum -y install lsof
yum -y install wget
yum -y install xz
yum -y install automake
yum -y install autoconf
安裝Erlang報錯
https://www.cnblogs.com/tu6ge/p/5673320.html
方法二
安裝Erlang/OTP的簡單方法 - 簡書
https://www.jianshu.com/p/caddaa8251af
curl -O https://raw.githubusercontent.com/yrashk/kerl/master/kerl
chmod a+x kerl
kerl build 22.0 erlang22.0
kerl install erlang22.0 /home/Erlang22_0
. /home/home/Erlang22_0/activate
ln -s /home/Erlang22_0/bin/erl /usr/local/bin/
EMQ擴展插件-emq_plugin_kafka - 簡書https://www.jianshu.com/p/1e3bfd383280
EMQ集成Kafka插件編寫過程 emq_plugin_kafka_caijiapeng0102的博客-CSDN博客https://blog.csdn.net/caijiapeng0102/article/details/80852933
【跟我一塊兒搭建物聯網平臺】六、EMQX之Kafka插件編譯安裝http://www.piaoyi.org/iot/635.html
BerkOzdilek/emq_kafka-bridge_docker: emqtt with redis auth plugin, kafka bridgehttps://github.com/BerkOzdilek/emq_kafka-bridge_docker
物聯網架構成長之路(8)-EMQ-Hook瞭解、鏈接Kafka發送消息_weixin_34112030的博客-CSDN博客https://blog.csdn.net/weixin_34112030/article/details/85963541
python 訂閱消息通知
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time : 2019/12/31 13:49
# @Email : luocs1@lenovo.com
# -*- coding: utf-8 -*-
# 如下代碼在2019年2月28日 python3.6環境下運行經過
import paho.mqtt.client as mqtt
import json
import time
HOST = "10.110.149.130"
PORT = 1883
client_id = "1083421130" # 沒有就不寫,此處部份內容用xxx代替原內容,下同
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("data/receive") # 訂閱消息
def on_message(client, userdata, msg):
print("主題:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))
def on_subscribe(client, userdata, mid, granted_qos):
print("On Subscribed: qos = %d" % granted_qos)
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection %s" % rc)
data = {
"type":2,
"timestamp": time.time(),
"messageId":"9fcda359-89f5-4933-xxxx",
"command":"130/recommend",
"data":{
"openId":"130",
"appId":130,
"recommendType":"temRecommend"
}
}
param = json.dumps(data)
client = mqtt.Client(client_id)
client.username_pw_set("xxxxxx", "xxxxxx")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
client.connect(HOST, PORT, 60)
client.publish("data/send", payload=param, qos=0) # 發送消息
client.loop_forever()
go編寫mqtt 鏈接和發佈消息示例
https://www.jianshu.com/p/0ad7582baa93
package main
import (
"flag"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"sync"
"time"
)
/***
*
* 鏈接任務和發佈消息方法
*/
func mqttConnPubMsgTask(taskId int, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
//設置鏈接參數
clinetOptions := mqtt.NewClientOptions().AddBroker("tcp://10.110.149.130:1883").SetUsername("admin").SetPassword("public")
//設置客戶端ID
clinetOptions.SetClientID(fmt.Sprintf("go Publish client example:%d-%d", taskId, time.Now().Unix()))
//設置handler
clinetOptions.SetDefaultPublishHandler(messagePubHandler)
//設置鏈接超時
clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
//建立客戶端鏈接
client := mqtt.NewClient(clinetOptions)
//客戶端鏈接判斷
if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
failNums++
fmt.Printf("[Pub] mqtt connect error, taskId: %d, fail_nums: %d, error: %s \n", taskId, failNums, token.Error())
return
}
i := 0
for {
i++
time.Sleep(time.Duration(3) * time.Second)
text := fmt.Sprintf("this is test msg #%d ! from task :%d", i, taskId)
//fmt.Printf("start publish msg to mqtt broker, taskId: %d, count: %d \n", taskId, i)
//發佈消息
token := client.Publish("go-test-topic", 1, false, text)
fmt.Printf("[Pub] end publish msg to mqtt broker, taskId: %d, count: %d, token : %s \n", taskId, i, token)
token.Wait()
}
client.Disconnect(250)
fmt.Println("[Pub] task is ok")
}
/***
*
* 鏈接任務和消息訂閱方法
*/
func mqttConnSubMsgTask(taskId int, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
//設置鏈接參數
clinetOptions := mqtt.NewClientOptions().AddBroker("tcp://10.110.149.130:1883").SetUsername("admin").SetPassword("public")
//設置客戶端ID
clinetOptions.SetClientID(fmt.Sprintf("go Subscribe client example:%d-%d", taskId, time.Now().Unix()))
//設置鏈接超時
clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
//建立客戶端鏈接
client := mqtt.NewClient(clinetOptions)
//客戶端鏈接判斷
if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
failNums++
fmt.Printf("[Sub] mqtt connect error, taskId: %d, fail_nums: %d, error: %s \n", taskId, failNums, token.Error())
return
}
i := 0
for {
i++
time.Sleep(time.Duration(3) * time.Second)
//fmt.Printf("start publish msg to mqtt broker, taskId: %d, count: %d \n", taskId, i)
//發佈消息
token := client.Subscribe("go-test-topic", 1, messageSubHandler)
fmt.Printf("[Sub] end Subscribe msg to mqtt broker, taskId: %d, count: %d, token : %s \n", taskId, i, token)
token.Wait()
}
client.Disconnect(250)
fmt.Println("[Sub] task is ok")
}
//建立全局mqtt publish消息處理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Pub Client Topic : %s \n", msg.Topic())
fmt.Printf("Pub Client msg : %s \n", msg.Payload())
}
//建立全局mqtt sub消息處理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}
//鏈接失敗數
var failNums = 0
/***
* 建立客戶端鏈接
*/
func main() {
clientNum := flag.Uint64("clientNum", 30000, "client nums")
flag.Parse()
nums := int(*clientNum)
waitGroup := sync.WaitGroup{}
for i := 0; i < nums; i++ {
fmt.Printf("publish client num : %s \n", i)
waitGroup.Add(1)
time.Sleep(3 * time.Millisecond)
//調用鏈接和發佈消息
go mqttConnPubMsgTask(i, &waitGroup)
//訂閱
go mqttConnSubMsgTask(i, &waitGroup)
}
waitGroup.Wait()
}
本文分享自微信公衆號 - IT民工技術之路(python_er)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。