消息隊列之ZeroMQ(C++)

  ZMQ是什麼?
  這是個相似於Socket的一系列接口,他跟Socket的區別是:普通 的socket是端到端的(1:1的關係),而ZMQ倒是能夠N:M 的關係,人們對BSD套接字的瞭解較多的是點對點的鏈接,點對點鏈接須要顯式地創建鏈接、銷燬鏈接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏 蔽了這些細節,讓你的網絡編程更爲簡單。ZMQ用於node與node間的通訊,node能夠是主機或者是進程。
  引用官方的說法: 「ZMQ(如下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架同樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是 「成爲標準網絡協議棧的一部分,以後進入Linux內核」。如今還未看到它們的成功。可是,它無疑是極具前景的、而且是人們更加須要的「傳統」BSD套接 字之上的一 層封裝。ZMQ讓編寫高性能網絡應用程序極爲簡單和有趣。」
  以上拷貝至百度百科。
  對了使用ZMQ以前必需要有那麼幾樣東西libzmq.lib,zhelpers.hpp,zmq.h,zmq.hpp。這些均可以在ZMQ的官網下載。
 
  接下來是來講說ZMQ有模式,能夠概括成三種,請求迴應模式(1對N),發佈訂閱模式(單向1對N),還有 推拉模型。
1:請求迴應模式(Req-Rep)
        
能夠有多個client,這個很容易理解跟TCP很像,但服務器與客戶端必須是1問1答的形式。直接看源代碼。
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <windows.h>
#include<zhelpers.hpp>
using namespace std;


DWORD WINAPI MyThread_client(LPVOID lpParamter)
{
    zmq::context_t context (1);
    //創建套接字
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server..." << std::endl;
    //鏈接服務器
    socket.connect ("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (socket, "hello");
        std::cout << "Client1 Received :" <<s_recv (socket)<< std::endl;

        Sleep(1000);
    }
    return 0;

}

DWORD WINAPI MyThread_client1(LPVOID lpParamter)
{
    zmq::context_t context (1);
    //創建套接字
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server..." << std::endl;
    //鏈接服務器
    socket.connect ("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (socket, "SB");
        std::cout << "Client2 Received :" <<s_recv (socket)<< std::endl;

        Sleep(1000);
    }
    return 0;

}

DWORD WINAPI MyThread_servce(LPVOID lpParamter)
{
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    //綁定端口
    socket.bind ("tcp://*:5555");
 
    while (true) {
        std::cout << "Servce Received: "<<s_recv (socket)<< std::endl;
        s_send (socket, "world");
    }
}

int main () 
{

    HANDLE hThread1 = CreateThread(NULL, 0, MyThread_client, NULL, 0, NULL);
    HANDLE hThread2 = CreateThread(NULL, 0, MyThread_servce, NULL, 0, NULL);

    HANDLE hThread3 = CreateThread(NULL, 0, MyThread_client1, NULL, 0, NULL);

    while(1);
    return 0;
}

運行結果:node

  這裏我創建了兩個客戶端和一個服務器,每一個都獨立運行一個線程。客戶端1發了「hello」,客戶端2發了「SB」,服務器都能接收到而且返回了world。ios

2:發佈訂閱模式(PUB-SUB)git

所謂發佈訂閱,好比天氣預報,當不少人訂閱以後,中心服務器直接往訂閱的人發送就能夠了,不須要管對方有沒有收到。也就是1對N的模式。這裏還有重要的一個概念,頻道:跟收音機的頻道相似,訂閱者設定了頻道才能聽到該頻道的消息github

 

  

看例程序:編程

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <windows.h>
#include<zhelpers.hpp>
using namespace std;


//訂閱1
DWORD WINAPI MyThread_sub1(LPVOID lpParamter)
{
    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    //鏈接
    subscriber.connect("tcp://localhost:5563");
    //設置頻道B
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "A", 1);
    while (1) {
 
        //  Read envelope with address
        std::string address = s_recv (subscriber);
        //  Read message contents
        std::string contents = s_recv (subscriber);
        
        std::cout << "訂閱1:[" << address << "] " << contents << std::endl;
    }
    return 0;

}
//訂閱2
DWORD WINAPI MyThread_sub2(LPVOID lpParamter)
{
    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    //鏈接
    subscriber.connect("tcp://localhost:5563");
    //設置頻道B
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "B", 1);
    while (1) {
 
        //  Read envelope with address
        std::string address = s_recv (subscriber);
        //  Read message contents
        std::string contents = s_recv (subscriber);
        
        std::cout << "訂閱2:[" << address << "] " << contents << std::endl;
    }
    return 0;

}
//訂閱3
DWORD WINAPI MyThread_sub3(LPVOID lpParamter)
{
    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    //鏈接
    subscriber.connect("tcp://localhost:5563");
    //設置頻道B
   // subscriber.setsockopt( ZMQ_SUBSCRIBE, "B", 1);
    while (1) {
 
        //  Read envelope with address
        std::string address = s_recv (subscriber);
        //  Read message contents
        std::string contents = s_recv (subscriber);
        
        std::cout << "訂閱3:[" << address << "] " << contents << std::endl;
    }
    return 0;

}
//發佈線程
DWORD WINAPI MyThread_pub(LPVOID lpParamter)
{
    //  Prepare our context and publisher
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5563");

    while (1) {
        //  Write two messages, each with an envelope and content
        s_sendmore (publisher, "A");
        s_send (publisher, "We don't want to see this");

        Sleep (100);
        s_sendmore (publisher, "B");
        s_send (publisher, "We would like to see this");
        Sleep (100);

    }
}

int main () 
{
    HANDLE hThread1 = CreateThread(NULL, 0, MyThread_pub, NULL, 0, NULL);
    Sleep(1000);
    HANDLE hThread2 = CreateThread(NULL, 0, MyThread_sub1, NULL, 0, NULL);
    HANDLE hThread3 = CreateThread(NULL, 0, MyThread_sub2, NULL, 0, NULL);
    HANDLE hThread4 = CreateThread(NULL, 0, MyThread_sub3, NULL, 0, NULL);
    while(1);
    return 0;
}

結果:windows

 

例程中,設置了1個發佈端和3個訂閱端,訂閱端訂閱的頻道分別是是A,B和沒有訂閱,發佈端發佈了對應頻道的訂閱消息。由此訂閱3沒有設置頻道,因此收不到消息。
 
3:推拉模式,這個詞語感受污,仍是我想歪了。還沒研究過,不過看網上說的,跟發佈訂閱模式相似,只是能夠負載均衡。目前項目中也沒有用到,下次有機會再研究吧。
ZEROMQ官網的github上面有詳細的例程:https://github.com/imatix/zguide/tree/master/examples/
相關文章
相關標籤/搜索