正所謂「工欲善其事,必先利其器」,咱們在實現通訊設計任務的過程中需要一些基礎工具來幫助咱們搭建部分基礎組件,這些基礎工具包括消息隊列,線程池,緩衝區抽象,事件循環和日誌工具。接下來對這部分基礎工具進行描述和實現。
(1)成員函數和數據結構
struct msgbuf{ long mtype; char mtext[1]; }; // msqid_ds內核數據結構 struct msqid_ds
// 生成唯一的鍵 key_t ftok(const char *pathname, int proj_id) int msgget(key_t key, int msgflg) msgflg是一個標誌參數(可與權限位按位或,例如 IPC_CREAT | 0666): * IPC_CREAT 如果內核不存在與key相等的消息隊列,則建立一個消息隊列,如果存在這樣的消息隊列,返回該消息隊列的描述符 * IPC_EXCL 和IPC_CREAT一起使用,如果對應鍵值的消息隊列已經存在,則出錯,返回-1 int msgsnd(int msgid, struct msgbuf* msgp, size_t msgsz, int msgflg) * 特別注意第四個參數msgflg,可以設置爲0或IPC_NOWAIT, 當爲0時,消息隊列已滿的時候會阻塞,如果爲IPC_NOWAIT,則不等待立即返回,常見錯誤碼有EAGAIN(消息隊列已滿),EIDRM(消息隊列已被刪除) EACCES(無法訪問消息隊列)
int msgrcv(int msgid, struct msgbuf* msgp, size_t msgsz, long int msgtype, int msgflg) * msgflg 操作標誌位,IPC_NOWAIT(如果沒有滿足條件的消息立即返回) MSG_EXCEPT
(返回隊列中第一個類型不爲msgtype的消息),MSG_NOERROR(如果隊列中滿足條件的消息
內容大於所請求的實際字節數,則把該消息截斷,截斷部分將被丟棄)
int msgctl(int msgid, int cmd, struct msqid_ds*) *cmd有以下三個取值: *IPC_STAT(獲取消息隊列對應的msqid_ds數據結構) *IPC_SET(設置消息隊列的屬性) *IPC_RMID(從內核中刪除msgid標識的消息隊列)
(2)示例
// Linux System V message queue example: sender side.
#include "stdio.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>
#include <string>
#include <string.h>
#include <iostream>
using namespace std;

const int32_t BUFFSIZE = 256;

// Message layout expected by msgsnd/msgrcv: a long type tag followed by the payload.
struct msgBuff {
    long msgType;
    char buff[BUFFSIZE];
};

// Creates (or opens) the queue keyed by "./messagekey" and sends one message of type 3.
// Returns 0 on success and 1 if any step fails.
int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if (-1 == key) {
        cout << "ftok error" << endl;
        return 1;   // without a valid key there is nothing meaningful to open
    }

    // Permission bits must be OR-ed into msgflg; with IPC_CREAT alone the queue
    // is created with mode 0 and later msgsnd/msgrcv calls fail with EACCES.
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        cout << "msgget error" << endl;
        return 1;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    msg.msgType = 3;
    // Bounded copy; the buffer was zeroed above so it stays NUL-terminated.
    strncpy(msg.buff, "message tset", sizeof(msg.buff) - 1);
    cout << msg.buff << endl;

    // msgsz counts only the payload, not the leading long type field.
    int32_t msgLen = sizeof(msg.buff);
    cout << "msgLen:" << msgLen << endl;

    if (msgsnd(msgid, &msg, msgLen, 0) == -1) {
        cout << strerror(errno) << endl;
        return 1;
    }
    return 0;
}
// Linux System V message queue example: receiver side.
#include "stdio.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>
#include <string>
#include <string.h>
#include <iostream>
using namespace std;
#include <thread>
#include <chrono>

const int32_t BUFFSIZE = 256;

// Message layout expected by msgsnd/msgrcv: a long type tag followed by the payload.
struct msgBuff {
    long msgType;
    char buff[BUFFSIZE];
};

// Opens (or creates) the queue keyed by "./messagekey" and prints every message
// received, pausing two seconds between reads. Returns 1 on an unrecoverable error.
int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if (-1 == key) {
        cout << "ftok error" << endl;
        return 1;
    }

    // Permission bits must be OR-ed into msgflg; with IPC_CREAT alone the queue
    // is created with mode 0 and msgrcv fails with EACCES.
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        cout << "msgget error" << endl;
        return 1;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    // msgsz counts only the payload, not the leading long type field.
    int32_t msgLen = sizeof(msg.buff);

    while (1) {
        // msgtype 0 reads the first message in the queue; flags 0 blocks until one arrives.
        if (msgrcv(msgid, &msg, msgLen, 0, 0) == -1) {
            if (errno == EINTR) {
                continue;   // interrupted by a signal: retry
            }
            cout << strerror(errno) << endl;
            return 1;       // e.g. EIDRM/EACCES: retrying would only spin on stale data
        }
        msg.buff[BUFFSIZE - 1] = '\0';   // never print past the buffer
        cout << msg.msgType << "--" << msg.buff << endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    }
    return 0;
}
(1)類圖
(2)代碼實現
#pragma once
#include <stdint.h>

/// Base class for messages that carry an integer type code.
class Message {
public:
    /// Predefined type codes.
    struct Type {
        enum { Stop = 0 };
    };

    /// Creates a message with a well-defined type code of 0 (numerically equal
    /// to Type::Stop). Previously the member was left uninitialized, so
    /// GetType() on a default-constructed message read an indeterminate value.
    /// Call SetType() before posting if 0 is not the intended code.
    Message() : _type(Type::Stop) {
    }

    /// Creates a message with the given type code.
    Message(int32_t type) : _type(type) {
    }

    virtual ~Message() {
    }

    /// Returns the stored type code.
    int32_t GetType() const {
        return _type;
    }

    /// Replaces the stored type code.
    void SetType(int32_t type) {
        _type = type;
    }

private:
    int32_t _type;
};
/* Thread-safe message queue */
#pragma once
#include <stdint.h>
#include <mutex>
#include <memory>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <utility>
// Kept because files that include this header may rely on it; members below
// are nevertheless fully qualified.
using namespace std;

/// FIFO queue guarded by a mutex, with an optional blocking Pop.
/// MSG must be copy-constructible (Push) and move-assignable (Pop).
template <class MSG>
class MessageQueue {
public:
    MessageQueue() : _mutex(), _condition(), _queue() {}
    MessageQueue(const MessageQueue&) = delete;
    MessageQueue& operator=(const MessageQueue&) = delete;
    virtual ~MessageQueue() {}

    /// Appends a copy of msg and wakes one waiting consumer.
    void Push(const MSG& msg) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(msg);
        std::cout << "add msg" << std::endl;
        _condition.notify_one();
    }

    /// Removes the oldest message into msg.
    /// @param isBlocked  true: wait until a message is available;
    ///                   false: return immediately when the queue is empty.
    /// @return true when msg was filled, false only in non-blocking mode on an empty queue.
    bool Pop(MSG& msg, bool isBlocked = true) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (isBlocked) {
            // Loop guards against spurious wake-ups; wait() releases the lock while sleeping.
            while (_queue.empty()) {
                std::cout << "block state, MessageQueue is empty,please wait..." << std::endl;
                _condition.wait(lock);
            }
        } else if (_queue.empty()) {
            // Inspect the container directly: _mutex is already held here, and
            // calling IsEmpty() would lock the same non-recursive mutex again.
            return false;
        }
        msg = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    /// Returns true when no message is queued at the moment of the call.
    bool IsEmpty() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    /// Returns the number of queued messages at the moment of the call.
    int32_t Size() {
        std::lock_guard<std::mutex> lock(_mutex);
        return static_cast<int32_t>(_queue.size());
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<MSG> _queue;
};
注意:unique_lock和lock_guard的使用,unique_lock不像lock_guard只能在析構時才釋放鎖,它能夠隨時釋放鎖,所以在wait時讓unique_lock釋放鎖從語義上看更加準確,其次要防止死鎖狀況的出現,不要鎖裏面再等待鎖
不多說,直接上代碼
#pragma once #include <thread> #include <functional> #include <stdint.h> #include <vector> #include <iostream> using namespace std; #include "MessageQueue.h" #define MIN_THREADS 3 template <typename Type> class ThreadPool{ ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; public: ThreadPool(int32_t threads, std::function<void(Type &record)> handler); virtual ~ThreadPool(); void Submit(Type record); private: bool _shutdown; int32_t _threads; std::function<void(Type &record)> _handler; std::vector<std::thread> _workers; MessageQueue<Type> _tasks; }; template <typename Type> ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler) :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks() { if(_threads < MIN_THREADS) { _threads = MIN_THREADS; } for(int32_t i = 0; i < _threads; i++) { std::thread workThread([this]{ while(!_shutdown) { Type record; bool ret = _tasks.Pop(record, true); _handler(record); } }); workThread.detach(); _workers.emplace_back(std::move(workThread)); } } template <typename Type> ThreadPool<Type>::~ThreadPool() { for(std::thread &worker : _workers) { worker.join(); } } template <typename Type> void ThreadPool<Type>::Submit(Type record) { _tasks.Push(record); }
以日誌工具爲例,爲了實現高性能的日誌工具,咱們必須確保日誌I/O全部處於一個獨立線程,而不會影響後續的操作,因此,實際上日誌記錄就是其它線程向日誌線程發送日誌消息,這樣一來,事件循環模型就變得非常必要。
(1)ByteArray
#pragma once
#include <cstdint>
#include <vector>
#include <string>
#include <cstring>

/// Growable byte buffer built on std::vector<char> with string and
/// concatenation helpers.
class ByteArray : public std::vector<char> {
public:
    ByteArray() = default;

    /// Creates a buffer of `size` zero bytes.
    ByteArray(int32_t size) : std::vector<char>(size) {
    }

    /// Copies `size` bytes starting at `buffer`.
    ByteArray(const char *buffer, int32_t size) : std::vector<char>(buffer, buffer + size) {
    }

    /// Copies the characters of `str` (without a terminating NUL).
    ByteArray(const std::string &str) : std::vector<char>(str.begin(), str.end()) {
    }

    /// Returns the bytes as a std::string.
    std::string ToStdString() const {
        return std::string(this->cbegin(), this->cend());
    }

    /// Appends `buffer2` to this buffer and returns *this.
    /// Appending a buffer to itself is supported.
    ByteArray &Concat(const ByteArray &buffer2) {
        const size_t oldSize = size();
        // Capture the length before resizing: when buffer2 aliases *this,
        // resize() changes buffer2.size() and relocates its storage.
        const size_t appended = buffer2.size();
        if (appended == 0) {
            return *this;   // also avoids memcpy on a possibly null data()
        }
        resize(oldSize + appended);
        // data() is read after the resize, so an aliased source points at the
        // new storage; source and destination ranges never overlap.
        memcpy(this->data() + oldSize, buffer2.data(), appended);
        return *this;
    }

    /// Returns a new buffer holding this buffer followed by `buffer2`.
    ByteArray operator+(const ByteArray &buffer2) const {
        ByteArray joined;
        joined.reserve(this->size() + buffer2.size());
        joined.insert(joined.end(), this->begin(), this->end());
        joined.insert(joined.end(), buffer2.begin(), buffer2.end());
        return joined;
    }
};
(2)IStream
#pragma once #include "ByteArray.h" #include <functional> class IStream { public: typedef std::function<void(const char* buf, int64_t size)> DataIndicationHandler; virtual int32_t Receive(char* buffer, int32_t bufferSize, int32_t& readSize) = 0; virtual int32_t Send(const ByteArray& byteArray) = 0; virtual void OnDataIndication(DataIndicationHandler handler) = 0; virtual DataIndicationHandler GetDataIndication() = 0; };
(3) BaseEvent
#pragma once #include "ByteArray.h" #include "IStream.h" class BaseEvent { public: BaseEvent() { } BaseEvent(const std::string &type, const ByteArray &data, IStream *stream) : _type(type), _data(data), _stream(stream) { } void SetData(const ByteArray &data) { _data = data; } const ByteArray &GetData() const { return _data; } void SetType(const std::string &type) { _type = type; } const std::string &GetType() const { return _type; } void SetStream(IStream *stream) { _stream = stream; } IStream *GetStream() const { return _stream; } private: std::string _type; ByteArray _data; IStream* _stream; };
(4)EventQueue
#pragma once #include "BaseEvent.h" #include <memory> #include <mutex> #include <condition_variable> #include <chrono> class EventQueue { public: EventQueue(int timeout = 0) : _timeout(timeout) { } void PostEvent(BaseEvent *event) { std::unique_lock <std::mutex> locker(_mutex); _events.push_back(std::shared_ptr<BaseEvent>(event)); } std::shared_ptr <BaseEvent> GetEvent() { std::unique_lock <std::mutex> locker(_mutex); if (_events.empty()) { if (_timeout == 0) { return nullptr; } _waitCondition.wait_for(locker, std::chrono::milliseconds(_timeout)); } if (!_events.empty()) { std::shared_ptr <BaseEvent> event = _events.front(); _events.erase(_events.begin()); return event; } return nullptr; } private: std::vector <std::shared_ptr<BaseEvent>> _events; std::mutex _mutex; std::condition_variable _waitCondition; // ms int _timeout; };
(5)Loop
#pragma once

/// Base class for loops: Start() runs the subclass-provided _Run() on the
/// calling thread.
class Loop {
public:
    /// Invokes _Run() and returns when it returns.
    void Start() {
        _Run();
    }

    virtual ~Loop() {}

private:
    /// Loop body supplied by derived classes.
    virtual void _Run() = 0;
};   // the terminating semicolon was missing, which breaks every includer
(6)EventQueueLoop
#pragma once #include "Loop.h" #include "EventQueue.h" #include <memory> class EventQueueLoop : public Loop { public: EventQueueLoop(EventQueue *queue); protected: virtual void _Run(); virtual void OnEvent(std::shared_ptr <BaseEvent> event) = 0; private: EventQueue *_queue; };
#include "EventQueueLoop.h"
#include <thread>

// Stores the queue pointer; the queue itself is not owned.
EventQueueLoop::EventQueueLoop(EventQueue *queue) : _queue(queue) {
}

// Polls the queue forever and forwards every event to OnEvent().
void EventQueueLoop::_Run() {
    // Nothing to poll without a queue; previously this dereferenced a null pointer.
    if (_queue == nullptr) {
        return;
    }

    while (true) {
        std::shared_ptr<BaseEvent> event = _queue->GetEvent();
        if (!event) {
            // GetEvent() returns immediately when the queue's timeout is 0;
            // yield so an empty queue does not monopolize the core.
            std::this_thread::yield();
            continue;
        }
        OnEvent(event);
    }
}
// Text labels for log levels, looked up by numeric priority value.
// NOTE(review): the Priority enum in logger.h names its entries
// DEBUG/STATE/INFO/WARNING/FAULT, so index 1 prints "CONFIG" for STATE and
// index 4 prints "ERROR" for FAULT - confirm this is intended.
const std::string PRIORITY_STRING[] = {
    "DEBUG",
    "CONFIG",
    "INFO",
    "WARNING",
    "ERROR"
};
// Formats "<timestamp> [<LEVEL>] <text>" and queues it, unless the message's
// priority is below the logger's current threshold.
void Logger::WriteLog(Priority priority, const std::string &log)
{
    const bool belowThreshold = priority < _priority;
    if (!belowThreshold) {
        std::stringstream line;
        line << HurricaneUtils::GetCurrentTimeStamp();
        line << " [" << PRIORITY_STRING[priority] << "] ";
        line << log;
        _queue.Push(line.str());
    }
}
// Returns the current local time formatted as "HH:MM:SS.mmm".
// NOTE(review): std::localtime returns a pointer to shared static storage and
// is not guaranteed thread-safe; callers on several threads should serialize
// calls or switch to a reentrant variant.
std::string GetCurrentTimeStamp()
{
    // get current time
    auto currentTime = std::chrono::system_clock::now();

    // get milliseconds (the sub-second remainder only)
    auto milliseconds =
        std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;
    auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // output the time stamp
    std::ostringstream stream;
#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32__)) && !defined(__MINGW32__)
    // "&currentTimePoint" had been mangled into "¤tTimePoint" (HTML entity
    // "&curren"), which does not compile.
    stream << std::put_time(std::localtime(&currentTimePoint), "%T");
#else
    char buffer[80];
    auto success = std::strftime(buffer, 80, "%T", std::localtime(&currentTimePoint)); // %T prints hours:minutes:seconds
    assert(0 != success);
    (void)success;   // keeps release builds (NDEBUG) free of unused-variable warnings
    stream << buffer;
#endif
    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();
    return stream.str();
}
/** * licensed to the apache software foundation (asf) under one * or more contributor license agreements. see the notice file * distributed with this work for additional information * regarding copyright ownership. the asf licenses this file * to you under the apache license, version 2.0 (the * "license"); you may not use this file except in compliance * with the license. you may obtain a copy of the license at * * http://www.apache.org/licenses/license-2.0 * * unless required by applicable law or agreed to in writing, software * distributed under the license is distributed on an "as is" basis, * without warranties or conditions of any kind, either express or implied. * see the license for the specific language governing permissions and * limitations under the license. */ #pragma once #include "MessageQueue.h" #include <memory> #include <thread> #include <queue> #include <string> #include <fstream> enum Priority { DEBUG, STATE, INFO, WARNING, FAULT }; class Logger { Logger &operator=(const Logger &) = delete; Logger(const Logger &other) = delete; public: static Logger *Get(); void SetPriority(Priority priority); Priority GetPriority(); void WriteLog(Priority priority, const std::string &log); private: Logger(Priority priority); virtual ~Logger(); void _InitializeFileStream(); void _WriteThread(); std::string GetCurrentTimeStamp(); private: MessageQueue <std::string> _queue; std::ofstream *_fileStream; Priority _priority; bool _shutdown; }; #define TRACE_DEBUG(LOG_CONTENT) Logger::Get()->WriteLog(DEBUG, LOG_CONTENT); #define TRACE_STATE(LOG_CONTENT) Logger::Get()->WriteLog(STATE, LOG_CONTENT); #define TRACE_INFO(LOG_CONTENT) Logger::Get()->WriteLog(INFO, LOG_CONTENT); #define TRACE_WARNING(LOG_CONTENT) Logger::Get()->WriteLog(WARNING, LOG_CONTENT); #define TRACE_ERROR(LOG_CONTENT) Logger::Get()->WriteLog(FAULT, LOG_CONTENT);
/**
 * licensed to the apache software foundation (asf) under one
 * or more contributor license agreements. see the notice file
 * distributed with this work for additional information
 * regarding copyright ownership. the asf licenses this file
 * to you under the apache license, version 2.0 (the
 * "license"); you may not use this file except in compliance
 * with the license. you may obtain a copy of the license at
 *
 * http://www.apache.org/licenses/license-2.0
 *
 * unless required by applicable law or agreed to in writing, software
 * distributed under the license is distributed on an "as is" basis,
 * without warranties or conditions of any kind, either express or implied.
 * see the license for the specific language governing permissions and
 * limitations under the license.
 */
#include "logger.h"
#include <chrono>
#include <cstddef>
#include <ctime>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <mutex>
#include <string>
#include <thread>

// Text labels indexed by Priority value.
const std::string PRIORITY_STRING[] = {
    "DEBUG",
    "CONFIG",
    "INFO",
    "WARNING",
    "ERROR"
};

namespace {

// Handle of the writer thread. Kept in a function-local static so that it is
// constructed before the Logger singleton finishes construction and therefore
// destroyed after it, which lets ~Logger() join the thread.
std::thread &WriterThread()
{
    static std::thread writer;
    return writer;
}

} // namespace

// Returns the process-wide logger, created on first use with level DEBUG.
Logger *Logger::Get()
{
    static Logger logger(DEBUG);
    return &logger;
}

// Opens the log file and starts the writer thread.
Logger::Logger(Priority priority)
    : _queue(), _fileStream(nullptr), _priority(priority), _shutdown(false)
{
    _InitializeFileStream();
    // Joinable instead of detached: a detached writer kept using the queue
    // and the file stream after the singleton had been destroyed at exit.
    WriterThread() = std::thread([this] { _WriteThread(); });
}

// Stops the writer after it has drained everything queued so far, then closes the file.
Logger::~Logger()
{
    _shutdown = true;
    // WriteLog() never produces an empty line, so an empty string serves as
    // the stop marker; being FIFO, all earlier lines are written first.
    _queue.Push(std::string());
    std::thread &writer = WriterThread();
    if (writer.joinable()) {
        writer.join();
    }

    if (nullptr != _fileStream) {
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

// Returns the current local time formatted as "HH:MM:SS.mmm".
std::string Logger::GetCurrentTimeStamp()
{
    // get current time
    auto currentTime = std::chrono::system_clock::now();

    // get milliseconds (the sub-second remainder only)
    auto milliseconds =
        std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;
    auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // std::localtime hands out shared static storage; copy the result while
    // holding a lock because WriteLog() is called from many threads.
    std::tm localTime = std::tm();
    {
        static std::mutex localtimeMutex;
        std::lock_guard<std::mutex> guard(localtimeMutex);
        // "&currentTimePoint" had been mangled into "¤tTimePoint".
        const std::tm *shared = std::localtime(&currentTimePoint);
        if (shared != nullptr) {
            localTime = *shared;
        }
    }

    // output the time stamp
    std::ostringstream stream;
    char buffer[80] = {0};
    std::strftime(buffer, sizeof(buffer), "%T", &localTime); // %T prints hours:minutes:seconds
    stream << buffer;
    // The '.' separator had been commented out, leaving a dangling "<< ..."
    // continuation that did not compile.
    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();
    return stream.str();
}

// Sets the minimum priority that will be logged.
void Logger::SetPriority(Priority priority)
{
    _priority = priority;
}

// Returns the current minimum priority.
Priority Logger::GetPriority()
{
    return _priority;
}

// Opens "./logger.log" (truncating it); on failure reports to stderr and
// leaves _fileStream null so that writing is skipped.
void Logger::_InitializeFileStream()
{
    // Prepare fileName
    std::string fileName = "./logger.log";

    // Initialize file stream
    _fileStream = new std::ofstream();
    std::ios_base::openmode mode = std::ios_base::out;
    mode |= std::ios_base::trunc;
    _fileStream->open(fileName, mode);

    // Error handling
    if (!_fileStream->is_open()) {
        // Print error information
        std::ostringstream ss_error;
        ss_error << "FATAL ERROR: could not Open log file: [" << fileName << "]";
        ss_error << "\n\t\t std::ios_base state = " << _fileStream->rdstate();
        std::cerr << ss_error.str().c_str() << std::endl << std::flush;

        // Cleanup
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

// Formats "<timestamp> [<LEVEL>] <text>" and queues it for the writer thread.
// Messages below the current level are dropped.
void Logger::WriteLog(Priority priority, const std::string &log)
{
    if (priority < _priority)
        return;

    // Guard the table lookup against values outside the Priority enum.
    static const std::string unknownLabel = "UNKNOWN";
    const std::size_t labelCount = sizeof(PRIORITY_STRING) / sizeof(PRIORITY_STRING[0]);
    const std::size_t index = static_cast<std::size_t>(priority);
    const std::string &label = index < labelCount ? PRIORITY_STRING[index] : unknownLabel;

    std::stringstream stream;
    stream << GetCurrentTimeStamp() << " [" << label << "] " << log;
    _queue.Push(stream.str());
}

// Writer thread body: blocks on the queue and appends each line to the file
// until the empty stop marker pushed by the destructor arrives.
void Logger::_WriteThread()
{
    for (;;) {
        std::string log;
        _queue.Pop(log, true);
        if (log.empty()) {
            break;
        }
        //std::cout << log << std::endl;
        if (_fileStream)
            *_fileStream << log << std::endl;
    }
}
測試:
#include "stdio.h" #include <thread> #include <chrono> #include "../logger.h" int main() { TRACE_DEBUG("Logger Debug test"); while(1) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } return 0; }