EMQX docker安裝及運行

MQTT學習(六)--各種MQTT代理服務器特性對比(譯文)_lordwish的專欄-CSDN博客https://blog.csdn.net/lordwish/article/details/85061687html


5G 時代,萬物互聯消息引擎 | 全球領先的開源 MQTT 消息服務器 | EMQhttps://www.emqx.io/cn/python

程序安裝 (Installation) — EMQ X - 百萬級開源 MQTT 消息服務器 3.2.0 文檔https://docs.emqx.io/broker/v3/cn/install.htmlgit

使用指南 · EMQ X Tutorialhttps://docs.emqx.io/tutorial/v3/cn/github

Installing via EMQ X Docker Imagegolang

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

訪問EMQT web管理頁面

http://ip:18083 默認帳號:admin 密碼:publicweb

Python使用mqtt極簡例子

https://www.jianshu.com/p/0ed4e59b1e8fredis

推數據 pub.pydocker

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time   : 2019/12/27 19:31
# @Email   : luocs1@lenovo.com
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
   print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
   print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('10.110.149.132', 1883, 600) # 600爲keepalive的時間間隔
client.publish('fifa', payload='amazing', qos=0)

阻塞接收數據sub.py數據庫

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time   : 2019/12/27 19:32
# @Email   : luocs1@lenovo.com
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
   print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
   print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# client.on_disconnect = on_disconnect
client.connect('10.110.149.132', 1883, 600) # 600爲keepalive的時間間隔
client.subscribe('fifa', qos=0)
client.loop_forever() # 保持鏈接

先運行sub.py  在運行pub.py  就能收到數據json

EMQX的我的空間 - OSCHINAhttps://my.oschina.net/u/4174826

kerl build 22.0 erlang22.0

EMQX 規則引擎 簡單實現將8266消息保存到MySQL數據庫以及更新數據_weixin_44821644的博客-CSDN博客https://blog.csdn.net/weixin_44821644/article/details/100593769



方法一
yum -y install make
yum -y install gcc
yum -y install gcc-c++
yum -y install kernel-devel
yum -y install m4
yum -y install ncurses-devel
yum -y install openssl-devel
yum -y install rsync
yum -y install perl-net-snmp
yum -y install wx
yum -y install fop
yum -y install net-snmp
yum -y install unixODBC
yum -y install unixODBC-devel
yum -y install lrzsz
yum -y install bc
yum -y install sysstat
yum -y install lsof
yum -y install wget
yum -y install xz  
yum -y install automake
yum -y install autoconf
安裝Erlang報錯

https://www.cnblogs.com/tu6ge/p/5673320.html
方法二
安裝Erlang/OTP的簡單方法 - 簡書
https://www.jianshu.com/p/caddaa8251af

curl -O https://raw.githubusercontent.com/yrashk/kerl/master/kerl
chmod a+x kerl


kerl build 22.0 erlang22.0
kerl install erlang22.0 /home/Erlang22_0
. /home/home/Erlang22_0/activate
ln -s  /home/Erlang22_0/bin/erl /usr/local/bin/

EMQ擴展插件-emq_plugin_kafka - 簡書https://www.jianshu.com/p/1e3bfd383280


EMQ集成Kafka插件編寫過程 emq_plugin_kafka_caijiapeng0102的博客-CSDN博客https://blog.csdn.net/caijiapeng0102/article/details/80852933

【跟我一塊兒搭建物聯網平臺】六、EMQX之Kafka插件編譯安裝http://www.piaoyi.org/iot/635.html

BerkOzdilek/emq_kafka-bridge_docker: emqtt with redis auth plugin, kafka bridgehttps://github.com/BerkOzdilek/emq_kafka-bridge_docker

物聯網架構成長之路(8)-EMQ-Hook瞭解、鏈接Kafka發送消息_weixin_34112030的博客-CSDN博客https://blog.csdn.net/weixin_34112030/article/details/85963541


python 訂閱消息通知

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = 'bigc'
# @Time   : 2019/12/31 13:49
# @Email   : luocs1@lenovo.com
# -*- coding: utf-8 -*-
# 如下代碼在2019年2月28日 python3.6環境下運行經過
import paho.mqtt.client as mqtt
import json
import time

HOST = "10.110.149.130"
PORT = 1883
client_id = "1083421130"                       # 沒有就不寫,此處部份內容用xxx代替原內容,下同

def on_connect(client, userdata, flags, rc):
   print("Connected with result code "+str(rc))
   client.subscribe("data/receive")         # 訂閱消息


def on_message(client, userdata, msg):
   print("主題:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))


def on_subscribe(client, userdata, mid, granted_qos):
   print("On Subscribed: qos = %d" % granted_qos)


def on_disconnect(client, userdata, rc):
   if rc != 0:
       print("Unexpected disconnection %s" % rc)

data = {
   "type":2,
   "timestamp": time.time(),
   "messageId":"9fcda359-89f5-4933-xxxx",
   "command":"130/recommend",
   "data":{
       "openId":"130",
       "appId":130,
       "recommendType":"temRecommend"
  }
}
param = json.dumps(data)
client = mqtt.Client(client_id)
client.username_pw_set("xxxxxx", "xxxxxx")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
client.connect(HOST, PORT, 60)
client.publish("data/send", payload=param, qos=0)     # 發送消息
client.loop_forever()

go編寫mqtt 鏈接和發佈消息示例

https://www.jianshu.com/p/0ad7582baa93


package main

import (
"flag"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"sync"
"time"
)

/***
*
* 鏈接任務和發佈消息方法
*/
func mqttConnPubMsgTask(taskId int, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
//設置鏈接參數
clinetOptions := mqtt.NewClientOptions().AddBroker("tcp://10.110.149.130:1883").SetUsername("admin").SetPassword("public")
//設置客戶端ID
clinetOptions.SetClientID(fmt.Sprintf("go Publish client example:%d-%d", taskId, time.Now().Unix()))
//設置handler
clinetOptions.SetDefaultPublishHandler(messagePubHandler)
//設置鏈接超時
clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
//建立客戶端鏈接
client := mqtt.NewClient(clinetOptions)

//客戶端鏈接判斷
if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
failNums++
fmt.Printf("[Pub] mqtt connect error, taskId: %d, fail_nums: %d, error: %s \n", taskId, failNums, token.Error())
return
}

i := 0

for {
i++
time.Sleep(time.Duration(3) * time.Second)
text := fmt.Sprintf("this is test msg #%d ! from task :%d", i, taskId)
//fmt.Printf("start publish msg to mqtt broker, taskId: %d, count: %d \n", taskId, i)
//發佈消息
token := client.Publish("go-test-topic", 1, false, text)
fmt.Printf("[Pub] end publish msg to mqtt broker, taskId: %d, count: %d, token : %s \n", taskId, i, token)
token.Wait()
}

client.Disconnect(250)
fmt.Println("[Pub] task is ok")

}

/***
*
* 鏈接任務和消息訂閱方法
*/
func mqttConnSubMsgTask(taskId int, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
//設置鏈接參數
clinetOptions := mqtt.NewClientOptions().AddBroker("tcp://10.110.149.130:1883").SetUsername("admin").SetPassword("public")
//設置客戶端ID
clinetOptions.SetClientID(fmt.Sprintf("go Subscribe client example:%d-%d", taskId, time.Now().Unix()))
//設置鏈接超時
clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
//建立客戶端鏈接
client := mqtt.NewClient(clinetOptions)

//客戶端鏈接判斷
if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
failNums++
fmt.Printf("[Sub] mqtt connect error, taskId: %d, fail_nums: %d, error: %s \n", taskId, failNums, token.Error())
return
}

i := 0

for {
i++
time.Sleep(time.Duration(3) * time.Second)
//fmt.Printf("start publish msg to mqtt broker, taskId: %d, count: %d \n", taskId, i)
//發佈消息
token := client.Subscribe("go-test-topic", 1, messageSubHandler)
fmt.Printf("[Sub] end Subscribe msg to mqtt broker, taskId: %d, count: %d, token : %s \n", taskId, i, token)
token.Wait()
}

client.Disconnect(250)
fmt.Println("[Sub] task is ok")

}

//建立全局mqtt publish消息處理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Pub Client Topic : %s \n", msg.Topic())
fmt.Printf("Pub Client msg : %s \n", msg.Payload())
}

//建立全局mqtt sub消息處理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}

//鏈接失敗數
var failNums = 0

/***
* 建立客戶端鏈接
*/
func main() {
clientNum := flag.Uint64("clientNum", 30000, "client nums")
flag.Parse()
nums := int(*clientNum)
waitGroup := sync.WaitGroup{}

for i := 0; i < nums; i++ {
fmt.Printf("publish client num : %s \n", i)
waitGroup.Add(1)
time.Sleep(3 * time.Millisecond)
//調用鏈接和發佈消息
go mqttConnPubMsgTask(i, &waitGroup)
//訂閱
go mqttConnSubMsgTask(i, &waitGroup)
}

waitGroup.Wait()
}


本文分享自微信公衆號 - IT民工技術之路(python_er)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索