簡潔易用的C++11網絡庫,From:https://github.com/yedf/handy
在整理過去的資料過程當中,發現過去有關注過這一個網絡庫,簡單看了一下屬於輕量級的實現,所以本文將對該庫進行簡單的學習之旅,目標是對網絡基礎知識進一步鞏固。html
庫目前實現了linux和mac環境,須要支持C++11所以gcc的版本要大於4.8,在個人虛擬機ubuntu12.04是要升級gcc版本,而後使用雲centos 7,以前安裝的cmake版本是2.8.12,與要求的版本大於3.2不匹配,所以先升級cmakejava
$ cd /tmp $ wget https://cmake.org/files/v3.3/cmake-3.3.2.tar.gz $ tar xzvf cmake-3.3.2.tar.gz $ cd cmake-3.3.2 $ ./bootstrap $ gmake $ make install #FROM : https://blog.csdn.net/fword/article/details/79347356
升級後能順利編譯。python
既然是高性能網絡庫,那linux必然是epoll,在raw-examples帶有對epoll的測試epoll.cc(水平觸發)和epoll-et.cc(邊緣觸發)
水平觸發:當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。若是此次沒有把數據一次性所有讀寫完(如讀寫緩衝區過小),那麼下次調用 epoll_wait()時,它還會通知你在上沒讀寫完的文件描述符上繼續讀寫,固然若是你一直不去讀寫,它會一直通知你!若是系統中有大量你不須要讀寫的就緒文件描述符,而它們每次都會返回,這樣會大大下降處理程序檢索本身關心的就緒文件描述符的效率!
Edge_triggered(邊緣觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。若是此次沒有把數據所有讀寫完(如讀寫緩衝區過小),那麼下次調用epoll_wait()時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件纔會通知你!這種模式比水平觸發效率高,系統不會充斥大量你不關心的就緒文件描述符!
水平觸發和邊緣觸發
根據linux的man-page中說明邊緣觸發要求在EPOLL_CTRL_ADD的時候就對文件描述符進行EPOLLIN|EPOLLOUT|EPOLLET事件關注(建議只對客戶端套接字),這能避免不斷地使用EPOLL_CTL_MOD修改對EPOLLIN和EPOLLOUT事件地關注。一般狀況下監聽套接字爲水平觸發,客戶套接字邊緣觸發,對監聽套接字和客戶套接字都要設置非阻塞模式。監聽套接字使用水平觸發的緣由是,多個鏈接同時到達若是使用邊緣觸發則epoll只會通知一次,有一些TCP鏈接在就緒隊列積累得不到及時處理,若是使用水平觸發須要採起而外的處理方式(使用while循環accpet,直到accept返回-1且errno設置爲EAGIN表示全部的鏈接處理完了)
EPOLL的系統函數定義以下:react
#include <sys/epoll.h> typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; // Epoll events epoll_data_t data; // User data variable }; /* 功能:建立epoll對象 [1]size無心義,要求大於0 返回值:成功爲非負文件描述符,失敗爲-1 */ int epoll_create(int size); /* 功能:對epoll對象增長,修改或刪除感興趣事件,輸入<文件描述符fd, 操做op, 事件epoll_event> 操做OP:增EPOLL_CTL_ADD,改EPOLL_CTL_MOD,刪EPOLL_CTL_DEL 事件epoll_event.events:對應文件描述符可讀EPOLLIN,可寫EPOLLOUT,對方關閉EPOLLRDHUP,異常EPOLLPRI ,錯誤EPOLLERR,掛起EPOLLHUP,設置邊緣觸發EPOLLET,設置只觸發一次EPOLLONESHOT,EPOLLWAKEUP,EPOLLEXCLUSIVE 返回值:0-成功,-1失敗 */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /* 功能:等待內核中的epoll_event事件可讀或者timeout到達 [1]epfd是一個epoll實例句柄根據epoll_create獲得 [2]epoll_event包含文件描述符和Epoll事件,對應內存由用戶開闢 [3]最多事件數,必須大於0 [4]超時事件,單位爲ms 返回值:>0有對應個文件描述符發生了事件;0超時到達;-1發生錯誤 */ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
下面是代碼節選linux
//epoll.cc 水平觸發 //main函數 //0)忽略SIGPIPE信號,避免對等方關閉後觸發了寫操做引發的SIGPIPE信號,而致使進程退出 ::signal(SIGPIPE, SIG_IGN); //1)定義了回饋的報文,長度1048576是爲了測試寫緩衝區滿了的狀況 httpRes = "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: 1048576\r\n\r\n123456"; for (int i = 0; i < 1048570; i++) { httpRes += '\0'; } //2)建立epoll實例 int epollfd = epoll_create(1); //3)建立socket監聽套接字listenfd,設置非阻塞模式,bind,listen和加入到epollfd關注 int listenfd = socket(AF_INET, SOCK_STREAM, 0); int r = ::bind(listenfd, (struct sockaddr *) &addr, sizeof(struct sockaddr)); r = listen(listenfd, 20); setNonBlock(listenfd); updateEvents(epollfd, listenfd, EPOLLIN, EPOLL_CTL_ADD); //epoll_ctrl(epollfd,EPOLL_CTL_ADD,listenfd,ev.EPOLLIN)關注監聽套接字的可讀事件 //4)循環epoll_wait等待內核事件 for (;;) { //實際應用應當註冊信號處理函數,退出時清理資源 loop_once(epollfd, listenfd, 10000); //調用epoll_wait,超時等待爲10s,若是有事件返回也會當即返回 } //loop_once函數 int n = epoll_wait(efd, activeEvs, kMaxEvents, waitms); for (int i = 0; i < n; i++) { int fd = activeEvs[i].data.fd; int events = activeEvs[i].events; if (events & (EPOLLIN | EPOLLERR)) { if (fd == lfd) { handleAccept(efd, fd); //有鏈接到來,accpet獲得對應文件描述符,調用updateEvents加入efd的EPOLLIN關注列表 } else { handleRead(efd, fd); //客戶端有數據,保存鏈接上下文到map<fd, Con>cons中,根據http的協議(結尾"\n\n"或"\r\n\r\n")發送httpRes給客戶端,注意這裏httpRes太長,寫write返回小於0且errno爲EAGAIN或EWOULDBLOCK,則要表示緩衝區已近滿了不能再寫了,要修改關注對應套接字的可讀可寫事件;後續回調可寫繼續寫入,最後寫完成後修改成只關注可讀事件。 } } else if (events & EPOLLOUT) { if (output_log) printf("handling epollout\n"); handleWrite(efd, fd); } else { exit_if(1, "unknown event"); } //updateEvents函數 void updateEvents(int efd, int fd, int events, int op) { struct epoll_event ev; memset(&ev, 0, sizeof(ev)); ev.events = events; ev.data.fd = fd; printf("%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT); int r = epoll_ctl(efd, op, fd, &ev); //把文件描述符fd加入到epoll對象efd關注 exit_if(r, "epoll_ctl failed"); }
值得注意的是水平觸發和邊緣觸發的區別,是在epoll_ctl中ev.events指定,默認爲水平觸發;後續要特別注意對可寫事件的處理上,水平觸發須要在寫遇到WOULDBLOCK後關注可寫事件,寫完後取消關注可寫事件,而邊緣觸發只是在epoll_ctl的add操做中指定EPOLLET和同時關注可讀可寫事件,然後在寫write數據中遇到EWOULDBLOCK直接跳出寫循環等到內核說能夠再寫則繼續寫。關於讀read每次都讀到返回-1且error爲EAGAIN|EWOULDBLOCK,這種策略下就不用在讀方面區分是水平模式仍是邊緣模式。git
注意:做者給出的示例中,沒有設置監聽套接字SO_REUSEADDR,若是服務端斷開而任一客戶端沒斷開,服務端從新啓動將出想bind失敗,錯誤緣由是"Address already in use"會有約2s時間處於TIME_WAIT狀態,建議服務端開始開啓這個選項,固然也要考慮屢次啓動和搶佔地址的狀況出現。github
handy文件夾即網絡庫的核心,最後生成動態庫和靜態庫,測試程序在example和10m兩個文件夾,分析網絡庫將重點關注handy文件夾。handy文件夾主要的功能實如今以下文件中(從CMakeList文件能夠看出)web
給上面功能分一下類:shell
//util.h struct util { static std::string format(const char *fmt, ...); } //util.cpp string util::format(const char *fmt, ...) { char buffer[500]; //棧內存 unique_ptr<char[]> release1; char *base; for (int iter = 0; iter < 2; iter++) { int bufsize; if(iter == 0) { //第一次嘗試用char[500]去獲取格式化數據 bufsize = sizeof(buffer); base = buffer; } else { //第二次嘗試用char[30000]去獲取格式化數據 bufsize = 30000; base = new char[bufsize]; //或許須要檢查一下30k內存是否分配成功 release1.reset(base); //新內存將由unique_ptr接管,即在程序真正退出前,unique_ptr對象銷燬時同時銷燬綁定的內存; } char *p = base; char *limit = base + bufsize; if (p < limit) { va_list ap; va_start(ap, fmt); p += vsnprintf(p, limit - p, fmt, ap); va_end(ap); } // Truncate to available space if necessary if(p >= limit) { if(iter == 0) { continue; } else { p = limit - 1; *p = '\0'; } } break; } return base;//注意這裏是把char* 返回給一個臨時結果string;若是是返回char *則會出現unique_ptr銷燬一次而外部使用時崩潰,permission denid }
以上主要的疑問:
1)p += vsnprintf(p, limit - p, fmt, ap);理論上p +=max(bufsize)會致使p>=limit出現嗎?
--邊界狀況會出現p==limit而不會大於。bootstrap
2)引入unique_ptr的做用是什麼?是爲了委託base的內存回收嗎?即本程序會內存泄漏嗎?
--unique_ptr的存在時爲了函數結束後對成員進行回收,若是不用unique_ptr,那麼會增長以下代碼釋放內存:
string strTemp(base); //多了一次拷貝 if(base != NULL && base != buffer) delete base; base = NULL; //多了一次釋放,主要判斷不爲棧數組,不然非法釋放 return strTemp;
測試代碼以下:
56 string s1 = "hello"; 57 for(int i = 0; i < 99; i++) { 58 s1 += "hello"; 59 } 60 printf("len(s1)=%d\n", s1.length()); //500 61 string s2 = std::string(util::format("%s", s1.c_str() ) ); 62 printf("len(s2)=%d\n", s2.length()); //500
//util.h struct noncopyable { protected: noncopyable() = default; virtual ~noncopyable() = default; noncopyable(const noncopyable &) = delete; noncopyable &operator=(const noncopyable &) = delete; }; struct ExitCaller : private noncopyable { ~ExitCaller() { functor_(); } ExitCaller(std::function<void()> &&functor) : functor_(std::move(functor)) {} private: std::function<void()> functor_; }; //usage.cc //... int fd = open(filename.c_str(), O_RDONLY); if (fd < 0) { return Status::ioError("open", filename); } ExitCaller ec1([=] { close(fd); });
上述的ExitCaller相似LockGuard,或者說go語言的defer,表示當變量離開做用域時調用某一個函數,defer實現以下和上面只差一個lambda匿名函數:
#pragma once #include <functional> #define CONNECTION(text1,text2) text1##text2 #define CONNECT(text1,text2) CONNECTION(text1,text2) class DeferHelper { public: DeferHelper(std::function<void ()> &&cb) : cb_(std::move(cb)) {} ~DeferHelper() { if (cb_) cb_(); } private: std::function<void ()> cb_; }; #define defer(code) DeferHelper CONNECT(L,__LINE__) ([&](){code;})
封裝了一個隊列和線程池。
隊列的優勢時put會檢查是否滿,pop_wait會等待超時或丟列不爲空;
template <typename T> struct SafeQueue : private std::mutex, private noncopyable { static const int wait_infinite = std::numeric_limits<int>::max(); //最大等待時間ms // 0 不限制隊列中的任務數 SafeQueue(size_t capacity = 0) : capacity_(capacity), exit_(false) {} //隊列滿則返回false,不然push_back到items_中,並使用ready_.notify_one()通知一個去取 bool push(T &&v); //超時則返回T(),出如今隊列爲空狀況;不超時返回items_中頭元素 T pop_wait(int waitMs = wait_infinite); //超時返回false;不超時,v中存儲items_中頭元素 bool pop_wait(T *v, int waitMs = wait_infinite); //有鎖獲取元素個數,即items_.size size_t size(); //退出,置exit_標識爲true void exit(); //取退出標識 bool exited() { return exit_; } private: std::list<T> items_; std::condition_variable ready_; size_t capacity_; std::atomic<bool> exit_; void wait_ready(std::unique_lock<std::mutex> &lk, int waitMs); //等待waitMs,調用ready.wait_until函數 };
多線程隊列則時能利用多個線程消化SafeQueue中的任務
typedef std::function<void()> Task; extern template class SafeQueue<Task>; struct ThreadPool : private noncopyable { //建立線程池,threads指定線程個數建議爲cpunum或2*cpunum, ThreadPool(int threads, int taskCapacity = 0, bool start = true); //銷燬safeQueue和一些打印信息 ~ThreadPool(); //使用線程從safeQueue中取元素讓後執行 void start(); //中止safeQueue ThreadPool &exit() { tasks_.exit(); return *this; } //等待線程集合退出,for(auto &t : threads_)t.join(); void join(); //隊列滿返回false, 使用std::move把右值引用變成引用:tasks_.push(move(task)); bool addTask(Task &&task); bool addTask(Task &task) { return addTask(Task(task)); } size_t taskSize() { return tasks_.size(); } private: SafeQueue<Task> tasks_; std::vector<std::thread> threads_; };
struct Status { Status() : state_(NULL) {} Status(int code, const char *msg);//state = new char[strlen(msg) + 8];state[0-4]=(strlen(msg) + 8),state[4-8]=code //... private: // state_[0..3] == length of message // state_[4..7] == code // state_[8..] == message const char *state_;
//file.h //把文件filename的內容讀到cont中 static Status getContent(const std::string &filename, std::string &cont); //把cont寫到文件filename中 static Status writeContent(const std::string &filename, const std::string &cont); //寫入cont到臨時文件tmpName,刪除舊文件name,重命名tmpName文件爲name文件 static Status renameSave(const std::string &name, const std::string &tmpName, const std::string &cont); //把文件夾dir中的文件名加入到result中,使用dirent.d中的readdir函數 static Status getChildren(const std::string &dir, std::vector<std::string> *result); //刪除文件,使用unlink刪除,c語言中的remove則內部使用了remove,不過remove也能刪除目錄 static Status deleteFile(const std::string &fname); //建立目錄,使用mkdir(name.c_str(), 0755),八進制0755表示文件權限爲文件全部着7(r4+w2+e1),組5(r4+e1),其餘用戶5(r4+e1) static Status createDir(const std::string &name); //刪除文件夾deleteDir static Status deleteDir(const std::string &name); //使用stat返回文件的信息 static Status getFileSize(const std::string &fname, uint64_t *size); //使用rename函數重命名一個文件 static Status renameFile(const std::string &src, const std::string &target); //使用access判斷文件是否存在;或許何以經過stat返回失敗-1且errno==ENOENT判斷文件不存在 static bool fileExists(const std::string &fname);
爲了程序的靈活性,通常都會有INI配置文件,INI配置文件的格式以下
[section]
key1 = value1
key2 = 2
做者導出接口以下:
//conf.h struct Conf { int parse(const std::string &filename); //解析文件filename的內容到values_ std::string get(std::string section, std::string name, std::string default_value); //取字符串值section[name],沒取到返回default_value long getInteger(std::string section, std::string name, long default_value);///取整數值section[name],沒取到返回default_value double getReal(std::string section, std::string name, double default_value);//取浮點數值section[name],沒取到返回default_value bool getBoolean(std::string section, std::string name, bool default_value);//取布爾值section[name],沒取到返回default_value std::list<std::string> getStrings(std::string section, std::string name);//取setction[name]對應的值 std::map<std::string, std::list<std::string>> values_;//存儲爲section.key,value,爲何值是用list來存呢?由於有多行的value的狀況。 std::string filename; //對應解析的文件名
據實現描述這個conf參考了python的ConfigParser,我喜歡輕量級mars的conf解析
日誌是服務器中比較重要的,由於發生異常基本都須要分析日誌改善程序,日誌庫大部分都有glog的影子。對於服務端的日誌,由於在多線程中,所以不能寫串,有人提倡用prinf而不是ostream,ostream真的不是多線程安全,這一點待探索;日誌是能分等級的,常見爲DEBUG,INFO,WARNING,FATAL;日誌能夠是緩衝寫或實時寫,但要保證程序退出的時候儘可能少的丟日誌,尤爲是異常退出的時候;日誌是要支持滾動的,根據具體的要求按天滾動或者按大小滾動;每條日誌頭部有時間信息,尾部可能有文件和代碼行信息。
經過查看logging.h的實現能夠發現,日誌分等級,日誌是一個靜態單例經過static Logger &getLogger()返回,而後定義了一些宏對日誌進行操做。文件要先設置文件名,而後真正寫入是調用logv函數,寫入前根據滾動規則獲取要寫入文件描述符,拼接當前時間等信息和傳入的要寫入的內容,實時寫入到文件中。
實現是目的我的理解是爲了讓服務在後臺運行,測試exmple/daemon.cc的程序,用戶輸入後終端會退出,可是服務會不退出。實現流程是fork一個子進程,而後父進程執行退出,調用setsid讓子進程脫離當前終端的控制不隨當前終端結束而結束。
實現了htobe的uint16_t,uint32_t,uint64_t,int16_t,int32_t,int64_t轉換
實現了獲取DNS信息的getHostByName("www.google.com")
struct Buffer { Buffer() : buf_(NULL), b_(0), e_(0), cap_(0), exp_(512) {} ~Buffer() { delete[] buf_; } //析構的時候銷燬 //統計屬性 size_t size() const { return e_ - b_; } //有效數據長度 bool empty() const { return e_ == b_; } //沒有有效數據 char *data() const { return buf_ + b_; } //有效數據起地址 char *begin() const { return buf_ + b_; } char *end() const { return buf_ + e_; } //內存分配,返回end()結果,分三種狀況 //1) end_ + len <= cap,足夠內存容納,不須要修改內存 //2) size() + len < cap_ / 2,可容納len,但通常以上的內存都在尾部,須要執行moveHead即把有效數據移動到buf_上讓b_=0 //3) 其餘狀況,expand(len),擴張的大小爲max(exp_, 2*cap_, size() + len) char *makeRoom(size_t len); //分配長度爲len的容量,返回數據結束位置 char *allocRoom(size_t len) { char *p = makeRoom(len); addSize(len); //e_ += len; return p; } //增長一段數據 Buffer &append(const char *p, size_t len) { memcpy(allocRoom(len), p, len); //1.調用allocRoom分配足夠容量,把新數據進去 return *this; } //消費長度爲len的數據,注意len必定小於size() Buffer &consume(size_t len) { b_ += len; if (size() == 0) clear(); return *this; } Buffer &absorb(Buffer &buf); //交換this和buf private: char *buf_;//內存的首地址 size_t b_, e_, cap_, exp_;//開始位置,結束位置,總容量,exp_指望大小 void copyFrom(const Buffer &b); //深拷貝b,先拷貝參數,而後this.buf_=new char[b.cap_];memcpy(this.buf_+b_,bu.buf_+b_,b.size())
poll/epoll能管理的不只僅是套接字,而是全部的文件描述符,在linux中管道,timefd_create,eventfd都是能夠歸入epoll來管理,所以要對epoll作簡單的封裝,核心的內容是addChannel,removeChannel,updateChannel對channel中的文件描述符fd和事件event的管理。
//poller.h struct PollerBase : private noncopyable { int64_t id_; int lastActive_; PollerBase() : lastActive_(-1) { static std::atomic<int64_t> id(0); id_ = ++id; } virtual void addChannel(Channel *ch) = 0; virtual void removeChannel(Channel *ch) = 0; virtual void updateChannel(Channel *ch) = 0; virtual void loop_once(int waitMs) = 0; virtual ~PollerBase(){}; }; PollerBase *createPoller(); //返回一個繼承自PollerBase的PollerEpoll struct PollerEpoll : public PollerBase { int fd_; //epoll對象,在構造函數中經過epoll_create獲得 std::set<Channel *> liveChannels_; //Channel集合,可認爲是要關注<fd,event>集合,不擁有他們的生命周器 // for epoll selected active events struct epoll_event activeEvs_[kMaxEvents]; //epoll_wait返回的活躍文件描述符 PollerEpoll(); //epoll_create1(EPOLL_CLOEXEC); ~PollerEpoll(); //while (liveChannels_.size()) {(*liveChannels_.begin())->close();}; ::close(fd_); void addChannel(Channel *ch) override; //加入關注int r = epoll_ctl(fd_, EPOLL_CTL_ADD, ch->fd(), &ev);liveChannels_.insert(ch); void removeChannel(Channel *ch) override;//取消關注liveChannels_.erase(ch); void updateChannel(Channel *ch) override;//更新關注int r = epoll_ctl(fd_, EPOLL_CTL_MOD, ch->fd(), &ev);activeEvs_[i].data.ptr = NULL;(這一個是爲何呢?) void loop_once(int waitMs) override;//等待epoll對象返回,回調對應的事件給通道lastActive_ = epoll_wait(fd_, activeEvs_, kMaxEvents, waitMs);Channel *ch = (Channel *) activeEvs_[i].data.ptr;ch->handleWrite(); };
TCP是基於字節流(STREAM)的可靠協議,客戶端一條最小的有意義的數據稱爲一幀,基於流意味着數據幀可能兩幀數據同時到達,或者數據幀不全的狀況。服務端應用要根據和客戶端約定的協議分離出一幀幀數據,響應相應的請求。
//codec.h struct CodecBase { // > 0 解析出完整消息,消息放在msg中,返回已掃描的字節數 // == 0 解析部分消息 // < 0 解析錯誤 virtual int tryDecode(Slice data, Slice &msg) = 0; virtual void encode(Slice msg, Buffer &buf) = 0; virtual CodecBase *clone() = 0; }; //以\r\n結尾的消息 struct LineCodec : public CodecBase { int tryDecode(Slice data, Slice &msg) override; //找到以\r\n或\n結尾的,返回長度和msg void encode(Slice msg, Buffer &buf) override; //給msg加上\r\n寫入到buf中 CodecBase *clone() override { return new LineCodec(); } } //給出長度的消息,[4][len_4][msg_len] struct LengthCodec : public CodecBase { int tryDecode(Slice data, Slice &msg) override;//首部8字節,第4-8字節爲長度,若是有完成的數據返回長度和msg void encode(Slice msg, Buffer &buf) override;//給buf增長數據‘mBdT’+len(msg)+msg CodecBase *clone() override { return new LengthCodec(); } }
UDP是一種簡單的面向數據報的運輸層協議,不提供可靠性,只是把應用程序傳給IP層的數據報發送出去,可是不能保證它們能到達目的地。在一些直播中會使用UDP,有一些遊戲開發者也探索了UDP實現可靠性的可能。
UDP建立的流程:
int fd = socket(AF_INET, SOCK_DGRAM, 0); //注意第二個參數爲SOCK_DGRAM數據報流 int r = net::setReuseAddr(fd); fatalif(r, "set socket reuse option failed"); r = net::setReusePort(fd, reusePort); fatalif(r, "set socket reuse port option failed"); r = util::addFdFlag(fd, FD_CLOEXEC); fatalif(r, "addFdFlag FD_CLOEXEC failed"); r = ::bind(fd, (struct sockaddr *) &addr_.getAddr(), sizeof(struct sockaddr));
讀寫UDP的命令以下:
//recvfrom truct sockaddr_in raddr; socklen_t rsz = sizeof(raddr); ssize_t rn = recvfrom(fd, buf, bufsize, 0, (sockaddr *) &raddr, &rsz); if (rn < 0) { error("udp %d recv failed: %d %s", fd, errno, strerror(errno)); return; } //sendto truct sockaddr_in raddr; socklen_t rsz = sizeof(raddr); int wn = ::sendto(fd, buf, bufsize, 0, (sockaddr *) raddr, rsz);
http協議應該是每個網絡人直接接觸最多的內容,由於BS和部分CS結構網絡傳輸都是用http,由於其簡單且描述的內容很全面。
http的交互分爲客戶端和服務端,客戶端也能夠是瀏覽器,客戶端發起的請求叫作HTTP請求(HTTP Request),其包括:request line + header + body,header與body之間有一個\r\n;HTTP的請求方法有Get, Post, Head, Put, Delete等。HTTP請求的回覆(HTTP Response)包括:status line + header + body (header分爲普通報頭,響應報頭與實體報頭)
一個典型的請求:
GET http://nooverfit.com/wp/ HTTP/1.1 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Language: zh-Hans-CN,zh-Hans;q=0.5 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.18362 Accept-Encoding: gzip, deflate Host: nooverfit.com Connection: Keep-Alive Cookie: Hm_lvt_416c770ac83a9d9wewewe=15678wwewe,1568260075; Hm_lvt_bfc6c239dfdfad0bbfed25f88a973fb0=1559dfd232 //HTTP Response HTTP/1.1 200 OK Server: Date: Thu, 19 Sep 2019 16:10:38 GMT Content-Type: text/html; charset=UTF-8 Transfer-Encoding: chunked Connection: keep-alive Vary: Cookie,Accept-Encoding,User-Agent Upgrade: h2,h2c Accept-Ranges: bytes Referrer-Policy: <html><head><title>This is title</title></head><body><h1>Hello</h1>Now is 20130611 02:14:31.518462</body></html>
對http實現來講首先是要解析請求和回覆,HttpMsg就是對http協議消息的解析,結果是分離出一個完整的請求幀
struct HttpMsg { enum Result { Error, Complete, NotComplete, Continue100, }; HttpMsg() { HttpMsg::clear(); }; //內容添加到buf,返回寫入的字節數 virtual int encode(Buffer &buf) = 0; //嘗試從buf中解析,默認複製body內容 virtual Result tryDecode(Slice buf, bool copyBody = true) = 0; //清空消息相關的字段 virtual void clear(); std::map<std::string, std::string> headers; std::string version, body; // body可能較大,爲了不數據複製,加入body2 Slice body2; std::string getHeader(const std::string &n) { return getValueFromMap_(headers, n); } Slice getBody() { return body2.size() ? body2 : (Slice) body; } //若是tryDecode返回Complete,則返回已解析的字節數 int getByte() { return scanned_; } //... }
獲得完整請求幀後就是分析對應的請求方法和請求資源
struct HttpRequest : public HttpMsg { std::map<std::string, std::string> args; std::string method, uri, query_uri; //請求的方法和uri virtual int encode(Buffer &buf); virtual Result tryDecode(Slice buf, bool copyBody = true); //... }
處理完請求以後就是回饋給對應的客戶端
struct HttpResponse : public HttpMsg { std::string statusWord; //example "ok" int status; // example 200 //... }
到了最後纔是最難的網絡封裝部分,先上一個muduo網絡庫的圖,這個是典型的reactor模式的設計,主要借鑑於java的NIO網絡模型的設計
首先有一個事件循環,會實例化一個poller,而後也會導出定時器接口,而後應用層會是tcp或者http服務的套接字會半丁到channel,經過EventLoop的updateloop加入poller對象關注,當有鏈接到來則回調channel中相關回調,最後傳遞到客戶和服務方。handy的設計像是muduo的簡化版本,沒那麼繁雜。even_base中實現和event_imp事件循環(不斷調用poller::loop_once)和計時定時器,Channel通道(文件描述符擁有着,控制關注事件,可讀可寫事件回調),
//event_base.cpp //事件循環類 struct EventsImp { PollerBase *poller_; SafeQueue<Task> tasks_; void loop_once(int waitMs) { poller_->loop_once(std::min(waitMs, nextTimeout_)); handleTimeouts(); } void EventsImp::loop() { while (!exit_) loop_once(10000); //... //添加超時任務 void safeCall(const Task &task) { safeCall(Task(task)); } void safeCall(Task &&task) { tasks_.push(move(task)); wakeup(); } //... }; //通道,封裝了能夠進行epoll的一個fd struct Channel { protected: EventBase *base_; //一個Channel必定屬於一個EventBase PollerBase *poller_; //base_->poller_ int fd_; //初始化綁定的文件描述符 short events_; //當前的關注事件 int64_t id_; //遞增標記 std::function<void()> readcb_, writecb_, errorcb_; //讀寫錯誤回調 // base爲事件管理器,fd爲通道內部的fd,events爲通道關心的事件,構造最後會調用poller_->addChannel(this);加入poller中 Channel(EventBase *base, int fd, int events); //設置回調 void onRead(const Task &readcb) { readcb_ = readcb; } void onWrite(const Task &writecb) { writecb_ = writecb; } void onRead(Task &&readcb) { readcb_ = std::move(readcb); } void onWrite(Task &&writecb) { writecb_ = std::move(writecb); } //啓用讀寫監聽 void enableRead(bool enable); //設置events_;更新通道poller_->updateChannel(this); void enableWrite(bool enable); void enableReadWrite(bool readable, bool writable); bool readEnabled(); //返回是否關注了可讀return events_ & kReadEvent; bool writeEnabled();//返回是否關注了可寫return events_ & kWriteEvent; //處理讀寫事件 void handleRead() { readcb_(); } //在poller的loop_once循環中,會根據struct epoll_event.data.ptr轉換爲Channel,若是可讀則調用對應的handleRead void handleWrite() { writecb_(); }//在poller的loop_once循環中,會根據struct epoll_event.data.ptr轉換爲Channel,若是可寫則調用對應的handleWrite }
在TCP數據能收到(回調)後,重要的是如何保存客戶端的數據,處理完請求後發送給對應的客戶端,由於有多個客戶端的存在,所以要使用TcpConn來記錄哪些TCP到來了,處理結果要回饋給哪一個數據。
//conn.h // Tcp鏈接,使用引用計數 struct TcpConn : public std::enable_shared_from_this<TcpConn> { // Tcp鏈接的個狀態 enum State { Invalid = 1, Handshaking, Connected, Closed, Failed, }; //服務端 static TcpConnPtr createConnection(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) { TcpConnPtr con(new C); con->attach(base, fd, local, peer); return con; } void attach(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) { fatalif((destPort_ <= 0 && state_ != State::Invalid) || (destPort_ >= 0 && state_ != State::Handshaking), "you should use a new TcpConn to attach. state: %d", state_); base_ = base; state_ = State::Handshaking; local_ = local; peer_ = peer; delete channel_; channel_ = new Channel(base, fd, kWriteEvent | kReadEvent); trace("tcp constructed %s - %s fd: %d", local_.toString().c_str(), peer_.toString().c_str(), fd); TcpConnPtr con = shared_from_this(); con->channel_->onRead([=] { con->handleRead(con); }); con->channel_->onWrite([=] { con->handleWrite(con); }); } //發送數據 void sendOutput() { send(output_); }//return ::write(channel_->fd, buf, bytes);if (wd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) //寫對應fd,若是寫失敗關注可寫事件(水平觸發模式) //收到數據 void handleRead(const TcpConnPtr &con) { while (state_ == State::Connected) { input_.makeRoom(); int rd = readImp(channel_->fd(), input_.end(), input_.space()); if(rd > 0) input_.addSize(rd); } //... } //客戶端 template <class C = TcpConn> static TcpConnPtr createConnection(EventBase *base, const std::string &host, unsigned short port, int timeout = 0, const std::string &localip = "") { TcpConnPtr con(new C); con->connect(base, host, port, timeout, localip); //執行connect return con; } public: EventBase *base_; //屬於哪個事件循環 Channel *channel_; //屬於哪個通道 Buffer input_, output_; //輸入和輸出緩衝區 Ip4Addr local_, peer_; //本地的套接字 State state_; //鏈接狀態 TcpCallBack readcb_, writablecb_, statecb_;//讀寫,連入/練出狀態回調 std::string destHost_, localIp_; std::unique_ptr<CodecBase> codec_; //對應codec }; //服務器 struct TcpServer { TcpServer(EventBases *bases); //屬於哪個事件循環 int bind(const std::string &host, unsigned short port, bool reusePort = false); //socket,bind,listen,建立listen_channel設置讀回調爲handleAccept() static TcpServerPtr startServer(EventBases *bases, const std::string &host, unsigned short port, bool reusePort = false); //建立一個TcpServer,並調用bind函數 void onConnState(const TcpCallBack &cb);//有新的鏈接連入 // 消息處理與Read回調衝突,只能調用一個 void onConnMsg(CodecBase *codec, const MsgCallBack &cb) { codec_.reset(codec); msgcb_ = cb; assert(!readcb_); } private: EventBase *base_;//屬於哪個事件循環 Ip4Addr addr_; //服務端地址 Channel *listen_channel_; //服務端的Channel TcpCallBack statecb_, readcb_; //讀寫回調 MsgCallBack msgcb_; //消息回調 std::unique_ptr<CodecBase> codec_; void handleAccept();//有新的鏈接到來,accept獲得客戶套接字cfd,建立一個TcpConnPtr綁定cfd,設置conn的讀寫和消息回調 //... };
以上就是handy的基本分析,總結來講算輕量級的muduo,可能還不該該用在生產環境,畢竟花一天多就能看得七七八八。最後就是示例代碼了。
//example/echo.cc #include <handy/handy.h> using namespace handy; int main(int argc, const char *argv[]) { EventBase base; Signal::signal(SIGINT, [&] { base.exit(); }); TcpServerPtr svr = TcpServer::startServer(&base, "", 2099); exitif(svr == NULL, "start tcp server failed"); svr->onConnRead([](const TcpConnPtr &con) { con->send(con->getInput()); }); base.loop(); }
//example/http-hello.cc #include <handy/handy.h> using namespace std; using namespace handy; int main(int argc, const char *argv[]) { int threads = 1; if (argc > 1) { threads = atoi(argv[1]); } setloglevel("TRACE"); MultiBase base(threads); HttpServer sample(&base); int r = sample.bind("", 8081); exitif(r, "bind failed %d %s", errno, strerror(errno)); sample.onGet("/hello", [](const HttpConnPtr &con) { string v = con.getRequest().version; HttpResponse resp; resp.body = Slice("hello world"); con.sendResponse(resp); if (v == "HTTP/1.0") { con->close(); } }); Signal::signal(SIGINT, [&] { base.exit(); }); base.loop(); return 0; }