redis在遊戲服務器中的使用初探(一) 環境搭建
redis在遊戲服務器中的使用初探(二) 客戶端開源庫選擇
redis在遊戲服務器中的使用初探(三) 信息存儲
redis在遊戲服務器中的使用初探(四) redis應用html
在學習分佈式對象存儲的期間,有這麼一個需求linux
"多個接口服務(本文看成客戶端Clinet)須要以固定間隔向全部的多個服務器發送心跳,保證服務器確認客戶端狀態。ios
服務器在接收到文件讀取請求時候,會廣播詢問全部數據服務器(本文也看成服務器)存儲的數據狀況"redis
以上一對多 的詢問,是須要消息隊列來進行通信的ubuntu
可是其實 redis也能夠做爲輕量級的消息隊列來完成這個需求。windows
服務器開啓一個線程進行redis訂閱模式,當有人在指定頻道發佈消息時,全部訂閱該頻道的節點均可以接收到消息。服務器
可是訂閱操做若是咱們不想採起固定時間間隔去獲取頻道是否有消息這麼LOW的方案,實際上是須要作成異步模式的。異步
而windows下 hredis異步模式是須要libevent支持的。 二者都是linux下運行良好的開源庫,在windows下倒是問題多多。async
通過屢次嘗試,我決定放棄使用這兩個開源庫而選擇cpp-redis。(linux下使用hredis和libevent ,有時間會試試)分佈式
一個服務節點 須要開啓一個線程 進行客戶端消息隊列的訂閱,每當收到消息就會調用收到消息的回調函數
而最初開啓的服務節點的運行線程會定時的在服務器消息隊列發佈詢問數據存儲的信息。
客戶端節點則相反 開啓一個線程 定時向客戶端消息隊列發佈心跳信息。
最初開啓的客戶端節點進行服務器消息隊列的訂閱,若收到服務器的數據存儲詢問,則進行自己是否存儲該數據的判斷
因爲資源有限,最開始咱們開啓了5個線程 來模擬 2個服務器和 3個客戶端
代碼以下
1 #include <iostream> 2 #include <Winsock2.h> 3 #include <thread> 4 #include <mutex> 5 6 #include "cpp_redis/cpp_redis" 7 #include "tacopie/tacopie" 8 9 using namespace std; 10 11 const int serverThreadNum = 2; 12 const int clientThreadNum = 3; 13 const int heartBeatTime = 1; 14 const int ServerQueryTime = 1; 15 const std::string clientChanName = "ClientChan"; 16 const std::string serverChanName = "ServerChan"; 17 std::mutex g_mutex; 18 19 class WinsockGuard { 20 public: 21 WinsockGuard() { 22 WORD version = MAKEWORD(2, 2); 23 if (WSAStartup(version, &data) != 0) { 24 std::cerr << "WSAStartup() failure!" << std::endl; 25 return; 26 } 27 } 28 29 ~WinsockGuard() { 30 WSACleanup(); 31 } 32 private: 33 WSADATA data; 34 }; 35 36 bool SubcribCommFunc(int threadNum,bool isServer) { 37 cpp_redis::subscriber sub; 38 39 try { 40 sub.connect("127.0.0.1", 6379, [](const std::string& host, std::size_t port, cpp_redis::subscriber::connect_state status) { 41 if (status == cpp_redis::subscriber::connect_state::dropped) { 42 {std::lock_guard<std::mutex> l(g_mutex); std::cout << "client disconnected from " << host << ":" << port << std::endl; } 43 //should_exit.notify_all(); 44 } 45 }); 46 47 } 48 catch (std::exception& e) { 49 {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; } 50 return false; 51 } 52 std::string chanName; 53 if (isServer) {chanName = clientChanName;} 54 else {chanName = serverChanName;} 55 56 sub.subscribe(chanName.c_str(), [threadNum, isServer](const std::string& chan, const std::string& msg) { 57 string s; 58 if (isServer)s = "server "; 59 else s = "client "; 60 s += to_string(threadNum);s += " recv "; 61 {std::lock_guard<std::mutex> l(g_mutex); std::cout << s.c_str() << chan << ": " << msg << std::endl; } 62 //todo Check heatbeat or response 63 }); 64 sub.commit(); 65 66 while (1) { 67 std::this_thread::sleep_for(std::chrono::seconds(50000)); 68 } 69 70 return true; 71 } 72 73 bool RecvClientInfo(int i) { 74 return SubcribCommFunc(i,true); 75 } 76 77 bool PublishCommFunc(int threadNum, bool isServer, string publishStr) { 78 cpp_redis::client client; 79 try { 80 client.connect("127.0.0.1", 6379, [threadNum, isServer,&publishStr](const std::string& host, std::size_t port, cpp_redis::client::connect_state status) { 81 if (status == cpp_redis::client::connect_state::dropped) { 82 {std::lock_guard<std::mutex> l(g_mutex); std::cout << "disconnected from " << host << ":" << port << std::endl; } 83 } 84 }); 85 while (1) { 86 std::string chanName; 87 if (isServer) {chanName = serverChanName;} 88 else { chanName = clientChanName;} 89 90 client.publish(chanName.c_str(), publishStr.c_str()); 91 client.commit(); 92 93 int PubliLoopTime = 9; 94 if (isServer) {PubliLoopTime = ServerQueryTime;} 95 else {PubliLoopTime = heartBeatTime;} 96 97 std::this_thread::sleep_for(std::chrono::seconds(PubliLoopTime)); 98 } 99 } 100 catch (std::exception& e) { 101 {std::lock_guard<std::mutex> l(g_mutex); std::cerr << "in " << __FUNCTION__ << ".err = " << e.what() << std::endl; } 102 return false; 103 } 104 105 return true; 106 } 107 108 void QueryWhoSaveDataLoop(int i) { 109 string s = "Server thread ";s += to_string(i);s += " query Who save data? "; 110 PublishCommFunc(i, true, s); 111 return; 112 } 113 114 void ServerFunc(int i) { 115 {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ServerFunc threadNo = " << i << std::endl;} 116 //開啓一個訂閱客戶端消息隊列的線程 接受客戶端的心跳包 117 thread t = thread(RecvClientInfo, i); 118 t.detach(); 119 120 //開啓一個定時檢測心跳超時的客戶端 todo 121 122 //本線程不定時隨機 發送一個詢問各個客戶端是否保存有數據 123 QueryWhoSaveDataLoop(i); 124 125 std::this_thread::sleep_for(std::chrono::seconds(500)); 126 } 127 128 void SendHeatBeatOnTime(int threadNum, int sendTime) { 129 string s = "client thread ";s += to_string(threadNum);s += " send heartbeat"; 130 PublishCommFunc(threadNum, false, s); 131 } 132 133 void ClientFunc(int i) { 134 {std::lock_guard<std::mutex> l(g_mutex);std::cout << "Enter ClientFunc threadNo = " << i << std::endl;} 135 136 //開啓一個線程 定時發送心跳包 137 int s = heartBeatTime; 138 std::thread t = thread(SendHeatBeatOnTime, i, s); 139 t.detach(); 140 141 SubcribCommFunc(i, false); 142 } 143 144 void Start() { 145 thread serverThread[serverThreadNum]; 146 thread clientThread[clientThreadNum]; 147 148 for (int i = 0; i < serverThreadNum; i++) { 149 serverThread[i] = thread(ServerFunc, i); 150 } 151 for (int i = 0; i < clientThreadNum; i++) { 152 clientThread[i] = thread(ClientFunc, i); 153 } 154 //================================================== 155 for (int i = 0; i < serverThreadNum; i++) { 156 serverThread[i].join(); 157 } 158 for (int i = 0; i < clientThreadNum; i++) { 159 clientThread[i].join(); 160 } 161 } 162 163 int main() 164 { 165 WinsockGuard g; 166 Start(); 167 std::cout << "Finish!\n"; 168 }
開啓redis 運行代碼如圖
首先是安裝源頭更新 更新 gcc g++ make 等工具
sudo apt-get update
sudo apt-get install g++ gcc
安裝 redis server
sudo apt-get install redis-server
如今能夠經過下面的命令查看到該進程:
ps -ef|grep redis
而後安裝 hiredis 和 libevent
sudo apt-get install libhiredis-dev
sudo apt-get install libevent-dev
安裝完成驗證下是否正確安裝
編寫libevent 示例代碼
1 #include <event.h> 2 #include <stdio.h> 3 4 struct event ev; 5 struct timeval tv; 6 7 8 void time_cb(int fd, short event, void *argc) 9 { 10 printf( "timer wakeup\n"); 11 event_add(&ev, &tv); 12 } 13 14 int main() 15 { 16 struct event_base *base = event_init(); 17 18 tv.tv_sec = 1; 19 tv.tv_usec = 0; 20 evtimer_set(&ev, time_cb, NULL); 21 event_add(&ev, &tv); 22 event_base_dispatch(base); 23 24 return 0; 25 }
執行編譯命令並運行 gcc -o eventexe libeventTest.c -levent
./eventexe 執行無錯誤則驗證經過
編寫hiredis示例代碼
1 #include <stdio.h> 2 #include <hiredis/hiredis.h> 3 int main() 4 { 5 redisContext *conn = redisConnect("127.0.0.1",6379); 6 if(conn != NULL && conn->err) 7 { 8 printf("connection error: %s\n",conn->errstr); 9 return 0; 10 } 11 redisReply *reply = (redisReply*)redisCommand(conn,"set foo 1234"); 12 freeReplyObject(reply); 13 14 reply = redisCommand(conn,"get foo"); 15 printf("%s\n",reply->str); 16 freeReplyObject(reply); 17 18 redisFree(conn); 19 return 0; 20 }
執行編譯命令並運行 gcc -o hiredisCli hiredisTest.c -lhiredis
./hiredisCli 執行無錯誤則驗證經過
libevent和hiredis都確認無誤後 開始測試異步代碼
編寫異步示例代碼
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <signal.h> 5 6 #include <hiredis/hiredis.h> 7 #include <hiredis/async.h> 8 #include <hiredis/adapters/libevent.h> 9 10 11 #include <stdio.h> 12 #include <stdlib.h> 13 #include <string.h> 14 #include <signal.h> 15 16 void getCallback(redisAsyncContext *c, void *r, void *privdata) { 17 redisReply *reply = r; 18 if (reply == NULL) return; 19 printf("argv[%s]: %s\n", (char*)privdata, reply->str); 20 21 /* Disconnect after receiving the reply to GET */ 22 redisAsyncDisconnect(c); 23 } 24 25 void connectCallback(const redisAsyncContext *c, int status) { 26 if (status != REDIS_OK) { 27 printf("Error: %s\n", c->errstr); 28 return; 29 } 30 printf("Connected...\n"); 31 } 32 33 void disconnectCallback(const redisAsyncContext *c, int status) { 34 if (status != REDIS_OK) { 35 printf("Error: %s\n", c->errstr); 36 return; 37 } 38 printf("Disconnected...\n"); 39 } 40 41 int main (int argc, char **argv) { 42 signal(SIGPIPE, SIG_IGN); 43 struct event_base *base = event_base_new(); 44 45 redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); 46 if (c->err) { 47 /* Let *c leak for now... */ 48 printf("Error: %s\n", c->errstr); 49 return 1; 50 } 51 52 redisLibeventAttach(c,base); 53 redisAsyncSetConnectCallback(c,connectCallback); 54 redisAsyncSetDisconnectCallback(c,disconnectCallback); 55 redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); 56 redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); 57 event_base_dispatch(base); 58 return 0; 59 }
執行編譯命令並運行
gcc -o async async.c -lhiredis -levent
./async
測試成功