Rust 是由 Mozilla 主導開發的通用、編譯型編程語言。該語言的設計準則爲:安全、併發、實用,支持 函數式、併發式、過程式以及面向對象的編程風格。Rust 速度驚人且內存利用率極高。因爲沒有運行時和垃圾回收,它可以勝任對性能要求特別高的服務,能夠在嵌入式設備上運行,還能輕鬆和其餘語言集成。Rust 豐富的類型系統和全部權模型保證了內存安全和線程安全,讓您在編譯期就可以消除各類各樣的錯誤。git
MQTT 是一種基於發佈/訂閱模式的 輕量級物聯網消息傳輸協議 ,能夠用極少的代碼和帶寬爲聯網設備提供實時可靠的消息服務,它普遍應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等行業。github
本文主要介紹如何在 Rust 項目中使用 paho-mqtt 客戶端庫 ,實現客戶端與 MQTT 服務器的鏈接、訂閱、取消訂閱、收發消息等功能。編程
項目初始化
本項目使用 Rust 1.44.0 進行開發測試,並使用 Cargo 1.44.0 包管理工具進行項目管理,讀者可用以下命令查看當前的 Rust 版本。安全
~ rustc --version rustc 1.44.0 (49cae5576 2020-06-01)
選擇 MQTT 客戶端庫
paho-mqtt 是目前 Rust 中,功能完善且使用較多的 MQTT 客戶端,最新的 0.7.1
版本支持 MQTT v五、3.1.一、3.1,支持經過標準 TCP、SSL / TLS、WebSockets 傳輸數據,QoS 支持 0、一、2 等。bash
初始化項目
執行如下命令建立名爲 mqtt-example
的 Rust 新項目。服務器
~ cargo new mqtt-example Created binary (application) `mqtt-example` package
編輯項目中的 Cargo.toml
文件,在 dependencies
中添加 paho-mqtt
庫的地址,以及指定訂閱、發佈代碼文件對應的二進制文件。session
[dependencies] paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" } [[bin]] name = "sub" path = "src/sub/main.rs" [[bin]] name = "pub" path = "src/pub/main.rs"
Rust MQTT 的使用
建立客戶端鏈接
本文將使用 EMQ X 提供的 免費公共 MQTT 服務器 做爲測試鏈接的 MQTT 服務器,該服務基於 EMQ X 的 MQTT 物聯網雲平臺 建立。服務器接入信息以下:併發
- Broker: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
配置 MQTT Broker 鏈接參數
配置 MQTT Broker 鏈接地址(包括端口)、topic (這裏咱們配置了兩個 topic ),以及客戶端 id。app
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_publish"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
編寫 MQTT 鏈接代碼
編寫 MQTT 鏈接代碼,爲了提高使用體驗,可在執行二進制文件時經過命令行參數的形式傳入鏈接地址。一般咱們須要先建立一個客戶端,而後將該客戶端鏈接到 broker.emqx.io
。eclipse
let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Define the set of options for the connection. let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(true) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); }
發佈消息
這裏咱們總共發佈五條消息,根據循環的奇偶性,分別向 rust/mqtt
、 rust/test
這兩個主題發佈。
for num in 0..5 { let content = "Hello world! ".to_string() + &num.to_string(); let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS); if num % 2 == 0 { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]); msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS); } else { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]); } let tok = cli.publish(msg); if let Err(e) = tok { println!("Error sending message: {:?}", e); break; } }
訂閱消息
在客戶端鏈接以前,須要先初始化消費者。這裏咱們會循環處理消費者中的消息隊列,並打印出訂閱的 topic 名稱及接收到的消息內容。
fn subscribe_topics(cli: &mqtt::Client) { if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) { println!("Error subscribes topics: {:?}", e); process::exit(1); } } fn main() { ... // Initialize the consumer before connecting. let rx = cli.start_consuming(); ... // Subscribe topics. subscribe_topics(&cli); println!("Processing requests..."); for msg in rx.iter() { if let Some(msg) = msg { println!("{}", msg); } else if !cli.is_connected() { if try_reconnect(&cli) { println!("Resubscribe topics..."); subscribe_topics(&cli); } else { break; } } } ... }
完整代碼
消息發佈代碼
use std::{ env, process, time::Duration }; extern crate paho_mqtt as mqtt; const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_publish"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"]; // Define the qos. const QOS:i32 = 1; fn main() { let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Define the set of options for the connection. let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(true) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); } // Create a message and publish it. // Publish message to 'test' and 'hello' topics. for num in 0..5 { let content = "Hello world! ".to_string() + &num.to_string(); let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS); if num % 2 == 0 { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]); msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS); } else { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]); } let tok = cli.publish(msg); if let Err(e) = tok { println!("Error sending message: {:?}", e); break; } } // Disconnect from the broker. let tok = cli.disconnect(None); println!("Disconnect from the broker"); tok.unwrap(); }
消息訂閱代碼
爲了提高使用體驗,消息訂閱作了斷開重連的處理,並在從新創建鏈接後對主題進行從新訂閱。
use std::{ env, process, thread, time::Duration }; extern crate paho_mqtt as mqtt; const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_subscribe"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"]; // The qos list that match topics above. const DFLT_QOS:&[i32] = &[0, 1]; // Reconnect to the broker when connection is lost. fn try_reconnect(cli: &mqtt::Client) -> bool { println!("Connection lost. Waiting to retry connection"); for _ in 0..12 { thread::sleep(Duration::from_millis(5000)); if cli.reconnect().is_ok() { println!("Successfully reconnected"); return true; } } println!("Unable to reconnect after several attempts."); false } // Subscribes to multiple topics. fn subscribe_topics(cli: &mqtt::Client) { if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) { println!("Error subscribes topics: {:?}", e); process::exit(1); } } fn main() { let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Initialize the consumer before connecting. let rx = cli.start_consuming(); // Define the set of options for the connection. let lwt = mqtt::MessageBuilder::new() .topic("test") .payload("Consumer lost connection") .finalize(); let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(false) .will_message(lwt) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); } // Subscribe topics. subscribe_topics(&cli); println!("Processing requests..."); for msg in rx.iter() { if let Some(msg) = msg { println!("{}", msg); } else if !cli.is_connected() { if try_reconnect(&cli) { println!("Resubscribe topics..."); subscribe_topics(&cli); } else { break; } } } // If still connected, then disconnect now. if cli.is_connected() { println!("Disconnecting"); cli.unsubscribe_many(DFLT_TOPICS).unwrap(); cli.disconnect(None).unwrap(); } println!("Exiting"); }
運行與測試
編譯二進制文件
執行如下命令,會在 mqtt-example/target/debug
目錄下生成消息訂閱、發佈對應的 sub
、pub
二進制文件。
cargo build
執行 sub
二進制文件,等待消費發佈。
消息發佈
執行 pub
二進制文件,能夠看到分別往 rust/test
、rust/mqtt
這兩個主題發佈了消息。
同時在消息訂閱中可看到發佈的消息
至此,咱們完成了使用 paho-mqtt 客戶端鏈接到 公共 MQTT 服務器,並實現了測試客戶端與 MQTT 服務器的鏈接、消息發佈和訂閱。
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。