The SONiC core daemon orchagent manages resources at the granularity of an orch, where each orch groups a set of related resources. The orchagent scheduling system schedules in units of Executor, with concrete scheduling entities such as Consumer and ExecutableTimer. This article walks through the details of SONiC's scheduling.
orchagent is modeled around the core class OrchDaemon.
class OrchDaemon
{
public:
    OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
    ~OrchDaemon();

    bool init();   // initialize the process
    void start();  // start the scheduling system

private:
    // connections to the three databases
    DBConnector *m_applDb;
    DBConnector *m_configDb;
    DBConnector *m_stateDb;

    // all orch instances
    std::vector<Orch *> m_orchList;

    // the Select multiplexed asynchronous I/O control block
    Select *m_select;

    // flush the ASIC_DB redis pipeline instead of waiting for it to fill up
    void flush();
};
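For orientation, orchagent's main() essentially opens the three database connections, hands them to an OrchDaemon, and then calls init() and start(). The sketch below is a simplified illustration rather than the literal source: the DBConnector arguments and the redis socket path are assumptions that vary between SONiC releases.

// Simplified sketch of how main() drives OrchDaemon (not the literal source).
#include "orchdaemon.h"

int main(int argc, char **argv)
{
    // The APPL_DB/CONFIG_DB/STATE_DB ids and the socket path are assumptions here.
    swss::DBConnector appl_db(APPL_DB, "/var/run/redis/redis.sock", 0);
    swss::DBConnector config_db(CONFIG_DB, "/var/run/redis/redis.sock", 0);
    swss::DBConnector state_db(STATE_DB, "/var/run/redis/redis.sock", 0);

    OrchDaemon daemon(&appl_db, &config_db, &state_db);
    if (!daemon.init())        // build m_orchList and the Select block
    {
        return EXIT_FAILURE;
    }
    daemon.start();            // never returns: the epoll dead loop
    return 0;
}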
init() sets up the orchagent execution environment.
bool OrchDaemon::init()
{
    SWSS_LOG_ENTER();
    ......
    // connect the configuration and state tables
    TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
    TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
    TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);

    vector<TableConnector> acl_table_connectors = {
        confDbAclTable,
        confDbAclRuleTable,
        stateDbLagTable
    };
    ......
    // collect the corresponding orch instances
    m_orchList = {
        switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch, gNeighOrch,
        gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, mirror_orch,
        gAclOrch, gFdbOrch, vrf_orch
    };

    // create the multiplexed event control block
    m_select = new Select();
    ......
    // besides the orchs above, other code adds a few more orchs; they are not listed here
    return true;
}
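Each Orch placed in m_orchList wraps the tables it cares about in Consumer executors when it is constructed, and getSelectables() later hands those executors to the Select block. The sketch below is a condensed approximation of the sonic-swss Orch plumbing; member names such as m_consumerMap follow that code base, but the details are abridged.

// Condensed sketch of how an Orch turns APP_DB table names into Consumer
// executors (approximation of sonic-swss Orch; details abridged).
Orch::Orch(DBConnector *db, const std::vector<std::string> &tableNames)
{
    for (const auto &tableName : tableNames)
    {
        // Each table becomes a ConsumerStateTable wrapped in a Consumer;
        // the Consumer is the Executor that getSelectables() returns.
        auto *table = new ConsumerStateTable(db, tableName);
        addExecutor(new Consumer(table, this, tableName));
    }
}

std::vector<Selectable *> Orch::getSelectables()
{
    std::vector<Selectable *> selectables;
    for (auto &it : m_consumerMap)
    {
        selectables.push_back(it.second.get());
    }
    return selectables;
}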
flush() empties the producer pipeline of ASIC_DB.
/* Flush redis through sairedis interface */
void OrchDaemon::flush()
{
    SWSS_LOG_ENTER();

    sai_attribute_t attr;
    attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
    sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
    if (status != SAI_STATUS_SUCCESS)
    {
        SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
        exit(EXIT_FAILURE);
    }
}
void OrchDaemon::start()
{
    SWSS_LOG_ENTER();

    // Walk every orch and add all of the events it cares about to epoll;
    // essentially one Executor corresponds to one event source.
    for (Orch *o : m_orchList)
    {
        m_select->addSelectables(o->getSelectables());
    }

    // enter the dead loop
    while (true)
    {
        Selectable *s;
        int ret;

        // block on epoll
        ret = m_select->select(&s, SELECT_TIMEOUT);

        // error event: keep listening
        if (ret == Select::ERROR)
        {
            SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
            continue;
        }

        // timeout event
        if (ret == Select::TIMEOUT)
        {
            /* Let sairedis flush all SAI function calls to ASIC DB.
             * Normally the redis pipeline flushes once enough requests
             * have accumulated, but a small number of requests may still
             * live in it. When the daemon has nothing to do, it is a good
             * chance to flush the pipeline.
             * This ensures the few requests left in the redis pipeline are
             * handled after 10 seconds at most, instead of accumulating. */
            flush();
            continue;
        }

        // get the Executor that triggered the epoll event
        auto *c = (Executor *)s;

        // For a Consumer, this performs the database operations: it converts
        // the notifications in redis into m_toSync and then executes the
        // tasks in m_toSync.
        c->execute();

        /* After each iteration, periodically check all m_toSync maps to
         * execute all the remaining tasks that need to be retried. */
        /* TODO: Abstract Orch class to have a specific todo list */
        // run the tasks left over in the other orchs
        for (Orch *o : m_orchList)
            o->doTask();
    }
}
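What c->execute() does for a Consumer deserves spelling out, since it is the bridge between the epoll wake-up and the orch's doTask(). The following is a condensed sketch of the Consumer path in sonic-swss; locking, statistics, and the exact merge rules of addToSync() are omitted.

// Condensed sketch of the Consumer execute path (locking and statistics
// omitted; the pops() Lua script is shown later in the article).
void Consumer::execute()
{
    std::deque<KeyOpFieldsValuesTuple> entries;

    // Pop a batch of notifications from APP_DB.
    getConsumerTable()->pops(entries);

    // Merge the popped entries into m_toSync, keyed by the table key.
    addToSync(entries);

    drain();
}

void Consumer::drain()
{
    // Hand the pending entries to the owning Orch; whatever it cannot
    // process yet stays in m_toSync and is retried on a later pass.
    if (!m_toSync.empty())
    {
        m_orch->doTask(*this);
    }
}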
Below we analyze the orchagent scheduling system using the event it handles most often: ConsumerStateTable.
A ConsumerStateTable represents the APP_DB events that orchagent subscribes to. The format is as follows:
"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8" #在集合INTF_TABLE_KEY_SET中增長一個key "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4" "PUBLISH" "INTF_TABLE_CHANNEL" "G"
1) "message" 2) "INTF_TABLE_CHANNEL" 3) "G"
Once woken up, ConsumerStateTable::pops() runs a Lua script that atomically pops a batch of keys from the key set and reads each key's hash:

local ret = {}
local keys = redis.call('SPOP', KEYS[1], ARGV[1])   -- pop up to 128 keys at a time
local n = table.getn(keys)
for i = 1, n do
    local key = keys[i]
    local values = redis.call('HGETALL', KEYS[2] .. key)
    table.insert(ret, {key, values})
end
return ret
The script returns content of the form:
INTF_TABLE:PortChannel1:1.1.1.1/8: {
    "scope": "global",
    "family": "IPv4"
}
pops() then massages the content above into the following format:
std::tuple<std::string, std::string, std::vector<FieldValueTuple>>   // KeyOpFieldsValuesTuple: key, operation, field-value pairs
That is, it ultimately returns:
"SET", "INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global","family": "IPv4">
The SONiC scheduling system is overly simplistic: it cannot cope with large-scale, logically complex workloads, and its efficiency is very low.