Using Redis as a Message Queue

A First Look at Redis in Game Servers (1): Environment Setup
A First Look at Redis in Game Servers (2): Choosing a Client Open-Source Library
A First Look at Redis in Game Servers (3): Information Storage
A First Look at Redis in Game Servers (4): Redis Applications

 

While studying distributed object storage, I ran into the following requirement:

"多個接口服務(本文看成客戶端Clinet)須要以固定間隔向全部的多個服務器發送心跳,保證服務器確認客戶端狀態。ios

服務器在接收到文件讀取請求時候,會廣播詢問全部數據服務器(本文也看成服務器)存儲的數據狀況"redis

This kind of one-to-many query needs a message queue for communication.

But in fact, Redis can also act as a lightweight message queue to meet this requirement.

Structure diagram (image not reproduced here)

Each server starts a thread that runs Redis in subscribe mode; when someone publishes a message on a given channel, every node subscribed to that channel receives it.
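To make the primitive concrete, here is a minimal sketch using cpp_redis (the library this article settles on below), assuming a local Redis on 127.0.0.1:6379; the channel name "demo" is arbitrary, and the Winsock initialization the full program needs on Windows is omitted for brevity:

#include <iostream>
#include <thread>
#include <chrono>
#include "cpp_redis/cpp_redis"

int main() {
    // Subscriber side: register a callback for channel "demo".
    cpp_redis::subscriber sub;
    sub.connect("127.0.0.1", 6379);
    sub.subscribe("demo", [](const std::string& chan, const std::string& msg) {
        std::cout << chan << ": " << msg << std::endl;  // fires once per published message
    });
    sub.commit();  // actually send the SUBSCRIBE command

    // Publisher side: any client connection can publish to the channel.
    cpp_redis::client pub;
    pub.connect("127.0.0.1", 6379);
    pub.publish("demo", "hello");  // delivered to every subscriber of "demo"
    pub.sync_commit();

    std::this_thread::sleep_for(std::chrono::seconds(1));  // give the callback time to run
}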

For the subscribing side, though, unless we settle for the crude approach of polling the channel at a fixed interval to see whether messages have arrived, it really needs to run in asynchronous mode.

On Windows, hiredis's async mode requires libevent. Both are open-source libraries that run well on Linux but are riddled with problems on Windows.

After many attempts, I gave up on those two libraries and chose cpp-redis instead. (On Linux I'd use hiredis and libevent; I'll try that when I have time.)

The flow is as follows:

A server node starts a thread that subscribes to the client message channel; every time a message arrives, the receive callback is invoked.

Meanwhile, the server node's original thread periodically publishes data-storage queries on the server message channel.

 

Client nodes do the opposite: each starts a thread that periodically publishes heartbeat messages to the client message channel.

The client node's original thread subscribes to the server message channel; when it receives a server's data-storage query, it checks whether it itself stores that data.

 

Since resources are limited, we start out with 5 threads to simulate 2 servers and 3 clients.

The code is as follows:

#include <iostream>
#include <Winsock2.h>
#include <thread>
#include <mutex>

#include "cpp_redis/cpp_redis"
#include "tacopie/tacopie"

using namespace std;

const int serverThreadNum = 2;
const int clientThreadNum = 3;
const int heartBeatTime = 1;      // seconds between client heartbeats
const int ServerQueryTime = 1;    // seconds between server queries
const std::string clientChanName = "ClientChan";
const std::string serverChanName = "ServerChan";
std::mutex g_mutex;               // serializes console output

// RAII wrapper so WSAStartup/WSACleanup are always paired.
class WinsockGuard {
public:
    WinsockGuard() {
        WORD version = MAKEWORD(2, 2);
        if (WSAStartup(version, &data) != 0) {
            std::cerr << "WSAStartup() failure!" << std::endl;
            return;
        }
    }

    ~WinsockGuard() {
        WSACleanup();
    }
private:
    WSADATA data;
};

// Shared subscribe loop: servers subscribe to the client channel, clients to the server channel.
bool SubscribeCommFunc(int threadNum, bool isServer) {
    cpp_redis::subscriber sub;

    try {
        sub.connect("127.0.0.1", 6379, [](const std::string& host, std::size_t port, cpp_redis::subscriber::connect_state status) {
            if (status == cpp_redis::subscriber::connect_state::dropped) {
                {std::lock_guard<std::mutex> l(g_mutex); std::cout << "client disconnected from " << host << ":" << port << std::endl; }
                //should_exit.notify_all();
            }
        });
    }
    catch (std::exception& e) {
        {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; }
        return false;
    }

    std::string chanName;
    if (isServer) { chanName = clientChanName; }
    else          { chanName = serverChanName; }

    sub.subscribe(chanName, [threadNum, isServer](const std::string& chan, const std::string& msg) {
        string s;
        if (isServer) s = "server ";
        else          s = "client ";
        s += to_string(threadNum); s += " recv ";
        {std::lock_guard<std::mutex> l(g_mutex); std::cout << s << chan << ": " << msg << std::endl; }
        //todo Check heartbeat or response
    });
    sub.commit();

    // Keep this thread alive; cpp_redis invokes the callback from its own I/O thread.
    while (1) {
        std::this_thread::sleep_for(std::chrono::seconds(50000));
    }

    return true;
}

bool RecvClientInfo(int i) {
    return SubscribeCommFunc(i, true);
}

// Shared publish loop: servers publish queries, clients publish heartbeats.
bool PublishCommFunc(int threadNum, bool isServer, string publishStr) {
    cpp_redis::client client;
    try {
        client.connect("127.0.0.1", 6379, [threadNum, isServer, &publishStr](const std::string& host, std::size_t port, cpp_redis::client::connect_state status) {
            if (status == cpp_redis::client::connect_state::dropped) {
                {std::lock_guard<std::mutex> l(g_mutex); std::cout << "disconnected from " << host << ":" << port << std::endl; }
            }
        });
        while (1) {
            std::string chanName;
            if (isServer) { chanName = serverChanName; }
            else          { chanName = clientChanName; }

            client.publish(chanName, publishStr);
            client.commit();

            int publishLoopTime;
            if (isServer) { publishLoopTime = ServerQueryTime; }
            else          { publishLoopTime = heartBeatTime; }

            std::this_thread::sleep_for(std::chrono::seconds(publishLoopTime));
        }
    }
    catch (std::exception& e) {
        {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; }
        return false;
    }

    return true;
}

void QueryWhoSaveDataLoop(int i) {
    string s = "Server thread "; s += to_string(i); s += " query Who save data? ";
    PublishCommFunc(i, true, s);
}

void ServerFunc(int i) {
    {std::lock_guard<std::mutex> l(g_mutex); std::cout << "Enter ServerFunc threadNo = " << i << std::endl; }
    // Start a thread that subscribes to the client channel and receives client heartbeats.
    thread t = thread(RecvClientInfo, i);
    t.detach();

    // Start a thread that detects heartbeat timeouts: todo

    // This thread periodically asks every client whether it stores the data.
    QueryWhoSaveDataLoop(i);

    std::this_thread::sleep_for(std::chrono::seconds(500));
}

void SendHeartBeatOnTime(int threadNum, int sendTime) {
    string s = "client thread "; s += to_string(threadNum); s += " send heartbeat";
    PublishCommFunc(threadNum, false, s);
}

void ClientFunc(int i) {
    {std::lock_guard<std::mutex> l(g_mutex); std::cout << "Enter ClientFunc threadNo = " << i << std::endl; }

    // Start a thread that publishes a heartbeat at a fixed interval.
    int s = heartBeatTime;
    std::thread t = thread(SendHeartBeatOnTime, i, s);
    t.detach();

    SubscribeCommFunc(i, false);
}

void Start() {
    thread serverThread[serverThreadNum];
    thread clientThread[clientThreadNum];

    for (int i = 0; i < serverThreadNum; i++) {
        serverThread[i] = thread(ServerFunc, i);
    }
    for (int i = 0; i < clientThreadNum; i++) {
        clientThread[i] = thread(ClientFunc, i);
    }
    //==================================================
    for (int i = 0; i < serverThreadNum; i++) {
        serverThread[i].join();
    }
    for (int i = 0; i < clientThreadNum; i++) {
        clientThread[i].join();
    }
}

int main()
{
    WinsockGuard g;
    Start();
    std::cout << "Finish!\n";
}
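The "//todo" in the subscribe callback is where real message handling would go. Below is a purely hypothetical sketch of one way to fill it in; the "ResponseChan" reply channel and the helper are inventions, not part of the original design. Note that a Redis connection in subscribe mode can only subscribe, so the reply has to go out on a separate cpp_redis::client connection:

// Hypothetical handler for the subscribe callback's todo.
void OnChannelMessage(bool isServer, const std::string& msg, cpp_redis::client& responder) {
    if (isServer) {
        // Server side: messages on ClientChan are heartbeats; a real implementation
        // would record the sender and a timestamp here for timeout detection.
        if (msg.find("heartbeat") != std::string::npos) {
            // UpdateHeartbeatTable(msg);  // hypothetical bookkeeping
        }
    }
    else {
        // Client side: messages on ServerChan are "who saves data?" queries;
        // look the data up locally and answer on the (invented) reply channel.
        if (msg.find("query") != std::string::npos) {
            bool haveData = true;  // stand-in for a real storage lookup
            if (haveData) {
                responder.publish("ResponseChan", "I have the data");
                responder.commit();
            }
        }
    }
}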

 

Start Redis and run the code (the original output screenshot is omitted).
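For reference, the console output looks roughly like this; the message wording comes straight from the code, but the interleaving differs on every run:

Enter ServerFunc threadNo = 0
Enter ServerFunc threadNo = 1
Enter ClientFunc threadNo = 0
Enter ClientFunc threadNo = 1
Enter ClientFunc threadNo = 2
server 0 recv ClientChan: client thread 1 send heartbeat
server 1 recv ClientChan: client thread 1 send heartbeat
client 0 recv ServerChan: Server thread 0 query Who save data?
client 2 recv ServerChan: Server thread 1 query Who save data?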

 

Extra: my libevent + hiredis async test on Ubuntu.

First, update the package sources and install gcc, g++, make, and the other build tools:

sudo apt-get update

sudo apt-get install g++ gcc

Install redis-server:

sudo apt-get install redis-server 

You can now see the process with the following command:
ps -ef | grep redis

 

Then install hiredis and libevent:

sudo apt-get install libhiredis-dev

sudo apt-get install libevent-dev

 

Once installation finishes, verify that everything is set up correctly.

Write a libevent sample:

#include <event.h>
#include <stdio.h>

struct event ev;
struct timeval tv;

/* Timer callback: print, then re-arm the timer so it fires again in one second. */
void time_cb(int fd, short event, void *argc)
{
    printf("timer wakeup\n");
    event_add(&ev, &tv);
}

int main()
{
    struct event_base *base = event_init();

    tv.tv_sec = 1;
    tv.tv_usec = 0;
    evtimer_set(&ev, time_cb, NULL);
    event_add(&ev, &tv);
    event_base_dispatch(base);

    return 0;
}
libeventTest.c

Compile and run: gcc -o eventexe libeventTest.c -levent

./eventexe should print "timer wakeup" once a second; if it runs without errors, the check passes.

 

Write a hiredis sample:

#include <stdio.h>
#include <hiredis/hiredis.h>

int main()
{
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    /* Handle both allocation failure and connection error. */
    if (conn == NULL || conn->err)
    {
        printf("connection error: %s\n", conn ? conn->errstr : "can't allocate redis context");
        return 0;
    }
    redisReply *reply = (redisReply*)redisCommand(conn, "set foo 1234");
    freeReplyObject(reply);

    reply = (redisReply*)redisCommand(conn, "get foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);

    redisFree(conn);
    return 0;
}
hiredisTest.c

Compile and run: gcc -o hiredisCli hiredisTest.c -lhiredis

./hiredisCli should print 1234; if it runs without errors, the check passes.

 

Once both libevent and hiredis check out, it's time to test the async code.

Write the async sample:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) return;
    printf("argv[%s]: %s\n", (char*)privdata, reply->str);

    /* Disconnect after receiving the reply to GET */
    redisAsyncDisconnect(c);
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Disconnected...\n");
}

int main (int argc, char **argv) {
    signal(SIGPIPE, SIG_IGN);
    struct event_base *base = event_base_new();

    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }

    redisLibeventAttach(c, base);
    redisAsyncSetConnectCallback(c, connectCallback);
    redisAsyncSetDisconnectCallback(c, disconnectCallback);
    redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
    redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
    event_base_dispatch(base);
    return 0;
}
async.c

Compile and run:

gcc -o async async.c -lhiredis -levent

./async
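If everything is wired up correctly, the run should look roughly like this (with no extra arguments, argv[argc-1] is the program name itself, so that is what SET stores):

Connected...
argv[end-1]: ./async
Disconnected...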

Test passed.
