sonic orch調度系統之----orchagent

​ sonic核心守護線程orchagent以orch爲單位進行資源管理,一個orch包含了一組類似的資源;orchagent調度系統以Executor爲調度單位,調度實體有Consumer,ExecutableTimer等,本文分析一下sonic調度細節。c++

class OrchDaemon

orchagent以OrchDaemon做爲核心類進行描述。redis

class OrchDaemon
{
public:
    OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
    ~OrchDaemon();

    bool init();//初始化進程
    void start();//啓動調度系統
private:
    //鏈接了三個數據庫
    DBConnector *m_applDb;
    DBConnector *m_configDb;
    DBConnector *m_stateDb;
    //包含全部的orch
    std::vector<Orch *> m_orchList;
    //建立的select多路異步IO控制塊
    Select *m_select;
    //將asic_db的pipe進行flush,不在等待。
    void flush();
};

bool OrchDaemon::init()

初始化orchagent執行環境。數據庫

bool OrchDaemon::init()
{
    SWSS_LOG_ENTER();

    ......
        
    //鏈接數據庫
    TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
    TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
    TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);

    vector<TableConnector> acl_table_connectors = {
        confDbAclTable,
        confDbAclRuleTable,
        stateDbLagTable
    };

    ......
    
    //收集對應的orch
    m_orchList = { switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, mirror_orch, gAclOrch, gFdbOrch, vrf_orch };
    //建立多路事件控制塊
    m_select = new Select();

    ......
    //除了上面的orch外,其它代碼還添加了一些orch,這裏再也不列出

    return true;
}

void OrchDaemon::flush()

將asic_db的生產者的pipeline清空json

/* Flush redis through sairedis interface */
void OrchDaemon::flush()
{
    SWSS_LOG_ENTER();

    sai_attribute_t attr;
    attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
    sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
    if (status != SAI_STATUS_SUCCESS)
    {
        SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
        exit(EXIT_FAILURE);
    }
}

void OrchDaemon::start()

void OrchDaemon::start()
{
    SWSS_LOG_ENTER();
    //遍歷每個orch,將每個orch中的全部關心的事件加入epoll中,基本一個Executor爲一個事件
    for (Orch *o : m_orchList)
    {
        m_select->addSelectables(o->getSelectables());
    }
    //進入dead loop
    while (true)
    {
        Selectable *s;
        int ret;
        // 進行epoll阻塞監聽
        ret = m_select->select(&s, SELECT_TIMEOUT);
        // 錯誤事件,繼續監聽
        if (ret == Select::ERROR)
        {
            SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
            continue;
        }
        //超時事件
        if (ret == Select::TIMEOUT)
        {
            /* Let sairedis to flush all SAI function call to ASIC DB.
             * Normally the redis pipeline will flush when enough request
             * accumulated. Still it is possible that small amount of
             * requests live in it. When the daemon has nothing to do, it
             * is a good chance to flush the pipeline  
             * 確保redis-pipeline中的少許請求在10秒後可以獲得處理,不在積累請求
             */
            flush();
            continue;
        }
        //獲取觸發epoll事件的Executor
        auto *c = (Executor *)s;
        //對於Consumer來講,執行數據庫操做,將redis中的通知轉換到m_tosync中,而且執行如下m_tosync中的task
        c->execute();

        /* After each iteration, periodically check all m_toSync map to
         * execute all the remaining tasks that need to be retried. */

        /* TODO: Abstract Orch class to have a specific todo list */
        // 執行其它的orch中遺留的任務
        for (Orch *o : m_orchList)
            o->doTask();

    }
}

實例分析

下面咱們以orchagent處理的最多的事件:ConsumerStateTable來分析一下orchagent調度系統。api

ConsumerStateTable即orchagent訂閱的app_db事件。格式以下所示:app

"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8"        #在集合INTF_TABLE_KEY_SET中增長一個key
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" 
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
"PUBLISH" "INTF_TABLE_CHANNEL" "G"
  • 當生產者發送命令"PUBLISH" "INTF_TABLE_CHANNEL" "G",orchagent從epoll中被喚醒,而後從對應的redis客戶端句柄中調用RedisSelect::readData()進行數據讀取該命令的應答,應答以下所示:
1) "message"
2) "INTF_TABLE_CHANNEL"
3) "G"
  • 每收到一個應答都會對RedisSelect::m_queueLength進行加1,該應答只有一個信息"G",只起到通知做用。因此若是生產者同時寫入了大量的app_db事件,某一個epoll喚醒,RedisSelect::readData()能夠讀出大量的應答,從而致使m_queueLength大於1。
  • 讀完數據後將該事件Selectable添加到Select:m_ready中。同時經過調用RedisSelect::updateAfterRead()對RedisSelect::m_queueLength進行減1,這裏只減去了1,而沒有根據實際處理的狀況減去對應的值,這裏存在增長多個值減去一個值的狀況。只要m_queueLength不爲1,就不會將Selectable從Select:m_ready刪除。
  • Select::select函數會返回Select:m_ready中全部的Selectable。
  • 遍歷每個selectable,在函數Consumer::execute()中調用TableEntryPoppable::pops使用腳本consumer_state_table_pops.lua處理INTF_TABLE_KEY_SET中的key,每次最多處理128個key。
local ret = {}
local keys = redis.call('SPOP', KEYS[1], ARGV[1])--一次處理128個key
local n = table.getn(keys)
for i = 1, n do
   local key = keys[i]
   local values = redis.call('HGETALL', KEYS[2] .. key)
   table.insert(ret, {key, values})
end
return ret

腳本返回的內容將會是:異步

INTF_TABLE:PortChannel1:1.1.1.1/8:{
    "scope":"global",
    "family": "IPv4"
}

pops而後對上面的內容進行加工成以下格式:函數

std::tuple<std::string, std::string, std::vector<FieldValueTuple> >oop

即最後返回:lua

"SET", "INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global","family": "IPv4">
  • Consumer::execute()會對前面返回的值添加到Consumer::m_toSync,這裏會進行合併。
  • 最後OrchDaemon::start()函數會執行每個orch的Consumer::m_toSync中的task。

sonic調度系統的缺陷

sonic調度系統過於簡單,沒法處理大規模,邏輯複雜的業務,效率很是低下。

  1. 當超規格下發配置的時候,會致使m_toSync中由於資源不足而駐留大量的task。OrchDaemon::start()每一次事件觸發都會遍歷全部orch的m_toSync中的task,形成無效task頻繁執行,引起orchagent發生震盪。必須給Consumer增長一些標記,代表當前Consumer處於資源不足狀態,暫不執行m_toSync中的task
  2. 當大規模下發配置的時候,若是本配置的依賴尚未下發,會致使m_toSync中由於依賴不知足而駐留大量的task。引起orchagent震盪問題。
  3. 當大規模下刪除配置的時候,若是本配置引用尚未刪除,會致使m_toSync中由於被引用不能刪除而駐留大量的task。引起orchagent震盪問題。
  4. orchagent在處理task的時候沒有考慮事件處理順序,應該首先執行DEL操做,而後再執行SET操做。對於DEL操做,應該先刪除低優先級的Consumer,對於SET操做來講,應該先處理高優先級的Consumer。二者順序相反。
相關文章
相關標籤/搜索