常見的服務器模型有多進程模型,多線程,IO多路複用,協程等模型。sonic的核心守護進程orchagent採用的是IO多路複用模型,早期的sonic採用的是select實現多路複用,後面的版本採用的是epoll。使用select(跟多路複用的select名字同樣)類對底層進行了封裝,屏蔽了差別。c++
事件基類,描述了epoll事件,能夠是讀,寫,異常等事件。該結構對通用epoll事件進行了封裝,真實事件經過該類派生出來,好比redis數據庫事件:class RedisSelect : public Selectable;netlink事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable,orch執行單元:class Executor : public Selectable,定時器:class SelectableTimer : public Selectable等。redis
class Selectable { public: Selectable(int pri = 0) : m_priority(pri), m_last_used_time(std::chrono::steady_clock::now()) { lastusedsequence = g_lastusedsequence++;} virtual ~Selectable() = default; /* return file handler for the Selectable */ virtual int getFd() = 0; /* Read all data from the fd assicaited with Selectable */ virtual void readData() = 0; /* true if Selectable has data in its cache */ // 判斷是否還有數據,若是有放入就緒事件set virtual bool hasCachedData() { return false; } /* true if Selectable was initialized with data */ // 判斷是否須要讀取初始數據 virtual bool initializedWithData() { return false; } /* run this function after every read */ // 更新事件數 virtual void updateAfterRead() { } int getPri() const { return m_priority; } private: friend class Select;//友元類爲Select // only Select class can access and update m_last_used_time std::chrono::time_point<std::chrono::steady_clock> getLastUsedTime() const { return m_last_used_time; } // 最後使用序列號 unsigned long getLastUsedsequence() const { return lastusedsequence; } // 跟新最後使用序列號 void updateLastUsedTime() { m_last_used_time = std::chrono::steady_clock::now(); lastusedsequence = g_lastusedsequence++; } // 優先級,實現基於優先級調度 int m_priority; // defines priority of Selectable inside Select // higher value is higher priority std::chrono::time_point<std::chrono::steady_clock> m_last_used_time; unsigned long lastusedsequence;//上次使用序列號 static unsigned long g_lastusedsequence;//全局基準序列號,用於對同優先級業務進行公平調度 };
class Select { public: Select(); ~Select(); /* Add object for select 給epoll添加一個事件 */ void addSelectable(Selectable *selectable); /* Remove object from select 刪除一個epoll事件 */ void removeSelectable(Selectable *selectable); /* Add multiple objects for select 添加多個epoll事件 */ void addSelectables(std::vector<Selectable *> selectables); enum {//返回的事件類型 OBJECT = 0, ERROR = 1, TIMEOUT = 2, }; //執行epoll int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max()); int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max()); private: //epoll事件比較函數,經過該函數實現事件的優先級 struct cmp { bool operator()(const Selectable* a, const Selectable* b) const { /* Choose Selectable with highest priority first */ if (a->getPri() > b->getPri()) return true; else if (a->getPri() < b->getPri()) return false; /* if the priorities are equal */ /* use Selectable which was selected later */ if (a->getLastUsedsequence() < b->getLastUsedsequence()) return true; else if (a->getLastUsedsequence() > b->getLastUsedsequence()) return false; /* when a == b */ return false; } }; //epoll輪詢函數 int poll_descriptors(Selectable **c, unsigned int timeout); int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout); int m_epoll_fd;//epoll句柄 std::unordered_map<int, Selectable *> m_objects;//監聽的事件句柄與其對應的selectable之間的關係 std::set<Selectable *, Select::cmp> m_ready;//已經就緒的事件集合,提供了比較函數,從而實現優先級調度 };
Select::Select() { m_epoll_fd = ::epoll_create1(0);//建立epoll句柄 if (m_epoll_fd == -1) { std::string error = std::string("Select::constructor:epoll_create1: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } }
Select::~Select() { (void)::close(m_epoll_fd); }
void Select::addSelectable(Selectable *selectable) { const int fd = selectable->getFd(); if(m_objects.find(fd) != m_objects.end())//已經添加了該事件,退出 { SWSS_LOG_WARN("Selectable is already added to the list, ignoring."); return; } m_objects[fd] = selectable; if (selectable->initializedWithData())//是否已經有數據可讀,讀出已有的數據 { m_ready.insert(selectable); } //添加可讀事件 struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = fd, }, }; int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (res == -1) { std::string error = std::string("Select::add_fd:epoll_ctl: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } }
void Select::removeSelectable(Selectable *selectable) { const int fd = selectable->getFd(); m_objects.erase(fd); m_ready.erase(selectable); //從epoll中刪除事件 int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, NULL); if (res == -1) { std::string error = std::string("Select::del_fd:epoll_ctl: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } }
void Select::addSelectables(vector<Selectable *> selectables) { for(auto it : selectables)//添加多個事件 { addSelectable(it); } }
提取一個就緒事件數據庫
int Select::poll_descriptors(Selectable **c, unsigned int timeout) { int sz_selectables = static_cast<int>(m_objects.size()); std::vector<struct epoll_event> events(sz_selectables); int ret; //阻塞等待事件發生,發生錯誤或者被中斷打斷則繼續監聽,發生事件則執行事件 do { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); } while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal if (ret < 0) return Select::ERROR; //遍歷每個發生的事件 for (int i = 0; i < ret; ++i) { int fd = events[i].data.fd; Selectable* sel = m_objects[fd];//獲取事件描述符 sel->readData();//讀取數據 m_ready.insert(sel);//插入就緒集合 } //存在就緒事件 if (!m_ready.empty()) { auto sel = *m_ready.begin(); *c = sel; m_ready.erase(sel); // we must update clock only when the selector out of the m_ready // otherwise we break invariant of the m_ready // 更新該事件的使用時間,使用事件會做爲事件優先級進行使用,越頻繁的優先級越低,從而避免同優先級的事件 // 餓死 sel->updateLastUsedTime(); // 有數據,依然放入已經就緒集合 if (sel->hasCachedData()) { // reinsert Selectable back to the m_ready set, when there're more messages in the cache m_ready.insert(sel); } // 對數據進行刷新,若是該句柄只發生了一次事件,那麼這裏會進行減1,下次m_ready中將不會存在該sel。 // 仔細分析了sonic的selectable的實現,這裏是有bug的,會形成大量的空轉。 sel->updateAfterRead(); return Select::OBJECT; } return Select::TIMEOUT; }
提取多個就緒事件,該函數是在上面的函數的基礎上的改進。只提取一個事件將會形成"餓死和脹死"的問題。因爲m_ready是有序隊列,對於高優先的事件老是會被優先提取,若是高優先級的事件依賴於低優先級事件的話,會形成高優先級的業務一直被調度,可是缺乏依賴條件而不能執行業務,低優先級業務老是得不到調度,造成死鎖問題。同時提取全部就緒事件能夠解決高低優先級死鎖問題。服務器
int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout) { int sz_selectables = static_cast<int>(m_objects.size()); std::vector<struct epoll_event> events(sz_selectables); int ret; do { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); } while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal if (ret < 0) return Select::ERROR; for (int i = 0; i < ret; ++i) { int fd = events[i].data.fd; Selectable* sel = m_objects[fd]; sel->readData(); m_ready.insert(sel); } auto iter = m_ready.begin(); while(iter !=m_ready.end()) { auto sel = *iter; vc.push_back(sel); iter = m_ready.erase(iter); sel->updateLastUsedTime(); } for(auto se:vc) { if (se->hasCachedData()) { m_ready.insert(se); } se->updateAfterRead(); } if(!vc.empty()) { return Select::OBJECT; } return Select::TIMEOUT; }
int Select::select(Selectable **c, unsigned int timeout) { SWSS_LOG_ENTER(); int ret; *c = NULL; if (timeout == numeric_limits<unsigned int>::max()) timeout = -1; /* check if we have some data */ ret = poll_descriptors(c, 0); /* return if we have data, we have an error or desired timeout was 0 */ if (ret != Select::TIMEOUT || timeout == 0) return ret; /* wait for data */ ret = poll_descriptors(c, timeout); return ret; }
int Select::select(std::vector<Selectable *> &vc, unsigned int timeout) { SWSS_LOG_ENTER(); int ret; if (timeout == numeric_limits<unsigned int>::max()) timeout = -1; /* check if we have some data */ ret = poll_descriptors(vc, 0); /* return if we have data, we have an error or desired timeout was 0 */ if (ret != Select::TIMEOUT || timeout == 0) return ret; /* wait for data */ ret = poll_descriptors(vc, timeout); return ret; }