sonic orch調度系統(1)----select

sonic orch調度系統之----select

常見的服務器模型有多進程、多線程、IO多路複用、協程等。sonic的核心守護進程orchagent採用的是IO多路複用模型:早期的sonic使用select實現多路複用,後續版本改用epoll。sonic用一個名為Select的類(與多路複用的select系統調用同名)對底層進行了封裝,屏蔽了兩者的差異。

class Selectable

事件基類,描述一個可被epoll監聽的事件源,事件可以是讀、寫、異常等(不過從下文的addSelectable可以看到,實際只註冊了EPOLLIN可讀事件)。該類對通用epoll事件進行了封裝,具體的事件類型都通過繼承該類實現,比如redis數據庫事件:class RedisSelect : public Selectable;netlink事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable;orch執行單元:class Executor : public Selectable;定時器:class SelectableTimer : public Selectable等。

/*
 * Base class for every event source that can be registered with Select.
 * Concrete sources derive from it and supply the fd to watch and the logic
 * that drains it; Select uses the priority and the usage sequence number
 * kept here to decide which ready source to serve next.
 */
class Selectable
{
public:
    /*
     * pri: scheduling priority inside Select (a higher value is served first).
     * Each new Selectable takes the next value of the shared sequence counter.
     */
    Selectable(int pri = 0)
        : m_priority(pri),
          m_last_used_time(std::chrono::steady_clock::now()),
          lastusedsequence(g_lastusedsequence++)
    {
    }

    virtual ~Selectable() = default;

    /* File descriptor that Select registers with epoll for this Selectable */
    virtual int getFd() = 0;

    /* Read all data from the fd associated with this Selectable */
    virtual void readData() = 0;

    /*
     * True if data is still buffered after a read; Select then keeps this
     * Selectable in its ready set. The default reports nothing buffered.
     */
    virtual bool hasCachedData() { return false; }

    /*
     * True if this Selectable already holds data when it is registered, in
     * which case Select marks it ready straight away. The default is false.
     */
    virtual bool initializedWithData() { return false; }

    /* Hook run after every read; the default does nothing */
    virtual void updateAfterRead() {}

    int getPri() const { return m_priority; }

private:
    /* Only Select may read or refresh the usage bookkeeping below */
    friend class Select;

    std::chrono::time_point<std::chrono::steady_clock> getLastUsedTime() const { return m_last_used_time; }

    /* Sequence number taken the last time this Selectable was stamped */
    unsigned long getLastUsedsequence() const { return lastusedsequence; }

    /* Stamp the current time and take the next value of the shared counter */
    void updateLastUsedTime()
    {
        m_last_used_time = std::chrono::steady_clock::now();
        lastusedsequence = g_lastusedsequence++;
    }

    int m_priority;                                                      /* priority inside Select; higher value is higher priority */
    std::chrono::time_point<std::chrono::steady_clock> m_last_used_time; /* time of the last stamp */
    unsigned long lastusedsequence;                                      /* counter value taken at the last stamp */
    static unsigned long g_lastusedsequence;                             /* shared counter used to order equal-priority Selectables */
};

class Select

/*
 * epoll-based event dispatcher. Selectables are registered by their fd;
 * ready ones are kept in an ordered set so that higher-priority sources are
 * served first and equal-priority sources take turns.
 */
class Select
{
public:
    Select();
    ~Select();

    /*
     * Select owns m_epoll_fd and closes it in ~Select(). A copy would close
     * the same descriptor twice, so copying is disabled.
     */
    Select(const Select &) = delete;
    Select &operator=(const Select &) = delete;

    /* Add object for select: register the Selectable's fd with epoll */
    void addSelectable(Selectable *selectable);

    /* Remove object from select: unregister the Selectable's fd from epoll */
    void removeSelectable(Selectable *selectable);

    /* Add multiple objects for select */
    void addSelectables(std::vector<Selectable *> selectables);

    /* Result codes returned by select() */
    enum {
        OBJECT = 0,   /* at least one Selectable was returned */
        ERROR = 1,    /* epoll_wait() failed */
        TIMEOUT = 2,  /* nothing was ready before the timeout expired */
    };

    /*
     * Wait for ready Selectables. The timeout is passed to epoll_wait()
     * (milliseconds); the default value means "wait forever".
     * The first overload returns one Selectable through *c; the second
     * appends every ready Selectable to vc.
     */
    int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max());
    int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max());

private:
    /*
     * Ordering of m_ready: higher priority first; for equal priority the
     * Selectable with the smaller sequence number (i.e. the least recently
     * used one) comes first, so equal-priority sources are served in turn.
     */
    struct cmp
    {
        bool operator()(const Selectable* a, const Selectable* b) const
        {
            /* Choose Selectable with highest priority first */
            if (a->getPri() > b->getPri())
                return true;
            if (a->getPri() < b->getPri())
                return false;

            /* Equal priorities: prefer the least recently used Selectable */
            return a->getLastUsedsequence() < b->getLastUsedsequence();
        }
    };

    /* One epoll_wait() round followed by extraction from m_ready */
    int poll_descriptors(Selectable **c, unsigned int timeout);
    int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout);

    int m_epoll_fd;                                   /* epoll instance owned by this Select */
    std::unordered_map<int, Selectable *> m_objects;  /* registered fd -> its Selectable */
    std::set<Selectable *, Select::cmp> m_ready;      /* ready Selectables, ordered by cmp */
};

Select::Select()

Select::Select()
{
    // Create the epoll instance that every registered Selectable's fd joins.
    m_epoll_fd = ::epoll_create1(0);
    if (m_epoll_fd == -1)
    {
        // Capture errno once so the number and the text describe the same error.
        const int err = errno;
        throw std::runtime_error("Select::constructor:epoll_create1: error=("
                                 + std::to_string(err) + "):" + strerror(err));
    }
}

Select::~Select()

Select::~Select()
{
    // Release the epoll instance created by the constructor. The result of
    // close() is discarded on purpose; nothing is reported from here.
    static_cast<void>(::close(m_epoll_fd));
}

void Select::addSelectable(Selectable *selectable)

void Select::addSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    // Selectables are keyed by fd; registering the same fd again is ignored.
    if (m_objects.find(fd) != m_objects.end())
    {
        SWSS_LOG_WARN("Selectable is already added to the list, ignoring.");
        return;
    }

    // Query this before touching epoll, keeping the original call order.
    const bool has_initial_data = selectable->initializedWithData();

    // Watch the fd for readability only. Value-initialization zeroes the
    // struct portably (designated initializers are C++20).
    struct epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    if (::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
        const int err = errno;
        throw std::runtime_error("Select::add_fd:epoll_ctl: error=("
                                 + std::to_string(err) + "):" + strerror(err));
    }

    // Record the Selectable only once the kernel accepted the fd, so a failed
    // registration leaves m_objects and m_ready untouched and can be retried.
    m_objects[fd] = selectable;

    // Data already present at registration: make it ready right away.
    if (has_initial_data)
    {
        m_ready.insert(selectable);
    }
}

void Select::removeSelectable(Selectable *selectable)

void Select::removeSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    // Forget the Selectable locally first, so select() can no longer hand it
    // out even if the epoll deregistration below fails.
    m_objects.erase(fd);
    m_ready.erase(selectable);

    // Stop watching the fd; the event argument is ignored for EPOLL_CTL_DEL.
    if (::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == -1)
    {
        const int err = errno;
        throw std::runtime_error("Select::del_fd:epoll_ctl: error=("
                                 + std::to_string(err) + "):" + strerror(err));
    }
}

void Select::addSelectables(vector<Selectable *> selectables)

void Select::addSelectables(vector<Selectable *> selectables)
{
    // Register each Selectable in order through the single-object path.
    for (Selectable *sel : selectables)
        addSelectable(sel);
}

int Select::poll_descriptors(......)

等待並提取一個就緒事件:每次只返回m_ready中優先級最高(同優先級時最久未被調度)的一個。

/*
 * Run one epoll_wait() round, drain every fd that fired, then hand out the
 * single best entry of m_ready through *c.
 * Returns OBJECT when *c was set, TIMEOUT when nothing is ready, and ERROR
 * when epoll_wait() fails with anything other than EINTR.
 */
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    // Retry the wait if the process was interrupted by a signal.
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    // Read from every fd that fired and mark its Selectable as ready.
    for (int i = 0; i < ret; ++i)
    {
        const int fd = events[i].data.fd;
        // find() instead of operator[]: operator[] would insert a null entry
        // for an fd Select does not track, and the null would be dereferenced.
        const auto it = m_objects.find(fd);
        if (it == m_objects.end())
            continue;

        Selectable *sel = it->second;
        sel->readData();
        m_ready.insert(sel);
    }

    if (m_ready.empty())
        return Select::TIMEOUT;

    // m_ready is ordered by Select::cmp, so its first element is served next.
    Selectable *sel = *m_ready.begin();
    *c = sel;

    // Remove before re-stamping: updateLastUsedTime() changes the sequence
    // number m_ready is ordered by, which would break the set's invariant.
    m_ready.erase(m_ready.begin());
    sel->updateLastUsedTime();

    // Still has buffered data: put it back so it is served again later.
    if (sel->hasCachedData())
        m_ready.insert(sel);

    // NOTE(review): the article's author claims that, given SONiC's Selectable
    // implementations, this hook can cause heavy busy-looping; that cannot be
    // verified from the code shown here.
    sel->updateAfterRead();

    return Select::OBJECT;
}

int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)

提取多個就緒事件,該函數是在上面函數基礎上的改進。只提取一個事件會造成"餓死和脹死"的問題:由於m_ready是有序集合,高優先級的事件總是會被優先提取;如果高優先級的事件依賴於低優先級事件,就會出現高優先級的業務一直被調度、卻因缺少依賴條件而無法執行,低優先級業務又總是得不到調度的情況,從而形成死鎖。一次提取所有就緒事件可以解決這種高低優先級之間的死鎖問題。

/*
 * Run one epoll_wait() round, drain every fd that fired, then move every
 * entry of m_ready (in cmp order) to the end of vc.
 * Returns OBJECT when at least one Selectable was appended by this call,
 * TIMEOUT when none was, and ERROR when epoll_wait() fails with anything
 * other than EINTR. Entries already in vc on entry are left untouched.
 */
int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    // Retry the wait if the process was interrupted by a signal.
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    // Read from every fd that fired and mark its Selectable as ready.
    for (int i = 0; i < ret; ++i)
    {
        const int fd = events[i].data.fd;
        // find() instead of operator[]: operator[] would insert a null entry
        // for an fd Select does not track, and the null would be dereferenced.
        const auto it = m_objects.find(fd);
        if (it == m_objects.end())
            continue;

        Selectable *sel = it->second;
        sel->readData();
        m_ready.insert(sel);
    }

    // Only entries appended by this call belong to this round; anything the
    // caller left in vc must not be re-processed or counted as ready.
    const std::size_t first_new = vc.size();
    vc.reserve(first_new + m_ready.size());

    // Hand out every ready Selectable, erasing each before re-stamping it
    // because the stamp changes the key m_ready is ordered by.
    for (auto iter = m_ready.begin(); iter != m_ready.end(); )
    {
        Selectable *sel = *iter;
        vc.push_back(sel);
        iter = m_ready.erase(iter);
        sel->updateLastUsedTime();
    }

    // Requeue those that still have buffered data, then run the post-read hook.
    for (std::size_t i = first_new; i < vc.size(); ++i)
    {
        Selectable *se = vc[i];
        if (se->hasCachedData())
            m_ready.insert(se);
        se->updateAfterRead();
    }

    return vc.size() > first_new ? Select::OBJECT : Select::TIMEOUT;
}

int Select::select(Selectable **c, unsigned int timeout)

int Select::select(Selectable **c, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    *c = nullptr;

    // timeout is unsigned, so this keeps it at UINT_MAX; it becomes -1
    // (wait forever) only once converted to epoll_wait()'s int parameter.
    if (timeout == numeric_limits<unsigned int>::max())
        timeout = -1;

    // Non-blocking pass first, so anything already pending is served at once.
    const int immediate = poll_descriptors(c, 0);

    // Data, an error, or a zero timeout: the first pass is the answer.
    const bool finished = (immediate != Select::TIMEOUT) || (timeout == 0);

    // Otherwise block for up to the requested timeout.
    return finished ? immediate : poll_descriptors(c, timeout);
}

int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)

int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    // timeout is unsigned, so this keeps it at UINT_MAX; it becomes -1
    // (wait forever) only once converted to epoll_wait()'s int parameter.
    if (timeout == numeric_limits<unsigned int>::max())
        timeout = -1;

    // Non-blocking pass first, so anything already pending is returned at once.
    const int immediate = poll_descriptors(vc, 0);
    if (immediate == Select::TIMEOUT && timeout != 0)
    {
        // Nothing was pending: block for up to the requested timeout.
        return poll_descriptors(vc, timeout);
    }

    // Data, an error, or a zero timeout: report the first pass as-is.
    return immediate;
}
相關文章
相關標籤/搜索