sonic orchagent線程的調度最小單位是Consumer。Consumer是在epoll事件Selectable的基礎上的進一步封裝,每一次發生epoll事件會觸發orchagent進行一次調度。orch是資源的集合,一個orch能夠包含多個Consumer,好比acl orch會監聽多個redistable。c++
// Design assumption // 1. one Orch can have one or more Executor // 2. one Executor must belong to one and only one Orch // 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor // 設計假設: // 1. 一個orch能夠擁有一個或者多個Executor // 2. 一個Executor必須屬於一個orch並且僅僅屬於一個orch // 3. Executor有一個指針指向一個new出來的Selectable結構,必須在析構函數中將其刪除,不然會泄漏 class Executor : public Selectable { public: Executor(Selectable *selectable, Orch *orch) : m_selectable(selectable) , m_orch(orch) { } virtual ~Executor() { delete m_selectable; } // Decorating Selectable int getFd() override { return m_selectable->getFd(); } void readData() override { m_selectable->readData(); } bool hasCachedData() override { return m_selectable->hasCachedData(); } bool initializedWithData() override { return m_selectable->initializedWithData(); } void updateAfterRead() override { m_selectable->updateAfterRead(); } Orch * getorch() { return m_orch; } // Disable copying Executor(const Executor&) = delete; Executor& operator=(const Executor&) = delete; // Execute on event happening // execute執行事件,drain是一個輔助函數 virtual void execute() { } virtual void drain() { } protected: Selectable *m_selectable;//指向new出來的Selectable Orch *m_orch;//指向一個orch // Get the underlying selectable 獲取指向的Selectable Selectable *getSelectable() const { return m_selectable; } };
class Executor只是一箇中間的派生類,orch直接使用的是class Consumer和class ExecutableTimer。redis
消費者類通常用於處理app_db的訂閱事件,對於asic_db通常是處理syncd的應答事件。數據庫
typedef std::pair<std::string, std::string> FieldValueTuple; #define fvField std::get<0> #define fvValue std::get<1> typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple; #define kfvKey std::get<0> #define kfvOp std::get<1> #define kfvFieldsValues std::get<2> typedef map<string, KeyOpFieldsValuesTuple> SyncMap; class Consumer : public Executor { public: Consumer(TableConsumable *select, Orch *orch) : Executor(select, orch) { } TableConsumable *getConsumerTable() const { return static_cast<TableConsumable *>(getSelectable()); } string getTableName() const { return getConsumerTable()->getTableName(); } // 事物執行 void execute(); void drain(); /* Store the latest 'golden' status */ // TODO: hide? SyncMap m_toSync; };
epoll事件觸發後,須要調用該函數從數據庫中讀取出指定key的內容,將其加工後存放在m_toSync中,供後續處理。json
void Consumer::execute() { SWSS_LOG_ENTER(); std::deque<KeyOpFieldsValuesTuple> entries; //調用pops函數,從redis數據庫中讀取數據,返回KeyOpFieldsValuesTuple結構 getConsumerTable()->pops(entries); /* Nothing popped */ if (entries.empty()) { return; } // 遍歷每個事件 for (auto& entry: entries) { string key = kfvKey(entry); string op = kfvOp(entry); /* Record incoming tasks 記錄事件 */ if (gSwssRecord) { Orch::recordTuple(*this, entry); } /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */ // 在這裏進行一次合併,對於刪除事件,直接覆蓋 if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND) { m_toSync[key] = entry; } /* If an old task is still there, we combine the old task with new task */ /* */ else { KeyOpFieldsValuesTuple existing_data = m_toSync[key]; auto new_values = kfvFieldsValues(entry); auto existing_values = kfvFieldsValues(existing_data); //遍歷每個新的值 for (auto it : new_values) { string field = fvField(it); string value = fvValue(it); auto iu = existing_values.begin(); while (iu != existing_values.end())//遍歷每個舊的值 { string ofield = fvField(*iu); if (field == ofield)//相同的域,將老的值覆蓋,這裏應該跳出while,代碼效率較差 iu = existing_values.erase(iu); else iu++; } /* 將新的值添加進去 */ existing_values.push_back(FieldValueTuple(field, value)); } m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); } } //執行全部整理好的任務。 drain(); }
假設有一個task的鍵值對以下:app
key=test;op=set;value={ A:a, B:b, C:c, }
第一次觸發任務是在APP_DB中寫入了:ide
key=test;op=set;value={ A:a, B:b }
加入orchagent只是將該任務讀取到了m_toSync中,因爲某種緣由沒有執行完該任務,依然駐留在m_toSync中。第二次寫入了:函數
key=test;op=set;value={ B:b1, C:c }
那麼通過execute函數後m_toSync中將會是:ui
key=test;op=set;value={ A:a, B:b1, C:c }
執行m_toSync中的任務。this
void Consumer::drain() { if (!m_toSync.empty()) m_orch->doTask(*this); }
class Orch { public: //每一個orch都會鏈接到數據庫,以及其須要訂閱的表名,和訂閱該表產生的事件的優先級 //以默認優先級訂閱一個table Orch(DBConnector *db, const string tableName, int pri = default_orch_pri); //以默認優先級訂閱多個table Orch(DBConnector *db, const vector<string> &tableNames); //訂閱多個table,指明每一個table的優先級 Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri); //鏈接多個數據庫 Orch(const vector<TableConnector>& tables); virtual ~Orch(); // 獲取該orch的全部epoll事件 vector<Selectable*> getSelectables(); /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */ // 執行該orch中全部的consumers中的m_sync中的任務 void doTask(); /* Run doTask against a specific executor */ // 任務的來源能夠是consumer,NotificationConsumer,SelectableTimer virtual void doTask(Consumer &consumer) = 0; virtual void doTask(NotificationConsumer &consumer) { } virtual void doTask(SelectableTimer &timer) { } /* TODO: refactor recording */ static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple); protected: // 消費者map,一個orch能夠訂閱多個table,key爲tableName,value爲Executor ConsumerMap m_consumerMap; // 與調試相關 static void logfileReopen(); string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple); ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&); bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high); bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name); ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&); /* Note: consumer will be owned by this class */ // 內部函數添加一個Executor,給addConsumer使用 void addExecutor(string executorName, Executor* executor); Executor *getExecutor(string executorName); private: // 添加一個消費者 void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri); };
void Orch::addExecutor(string executorName, Executor* executor) { m_consumerMap.emplace(std::piecewise_construct, std::forward_as_tuple(executorName), std::forward_as_tuple(executor)); } //添加一個消費者 void Orch::addConsumer(DBConnector *db, string tableName, int pri) { if (db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB) { addExecutor(tableName, new Consumer(new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri), this)); } else { addExecutor(tableName, new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this)); } }
執行本orch中的每個消費者m_toSync中的task,無論該task是否本次從redis中讀取仍是之前未處理完畢的。線程
void Orch::doTask() { for(auto &it : m_consumerMap) { it.second->drain(); } }
orch2是在orch的基礎上的一個封裝,代碼的可讀性加強。
class Orch2 : public Orch { public: Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri) : Orch(db, tableName, pri), request_(request) { } protected: virtual void doTask(Consumer& consumer); virtual bool addOperation(const Request& request)=0; virtual bool delOperation(const Request& request)=0; private: Request& request_; };
void Orch2::doTask(Consumer &consumer) { SWSS_LOG_ENTER(); auto it = consumer.m_toSync.begin(); while (it != consumer.m_toSync.end()) { bool erase_from_queue = true; try { request_.parse(it->second); auto op = request_.getOperation(); if (op == SET_COMMAND) { erase_from_queue = addOperation(request_); } else if (op == DEL_COMMAND) { erase_from_queue = delOperation(request_); } else { SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str()); } } catch (const std::invalid_argument& e) { SWSS_LOG_ERROR("Parse error: %s", e.what()); } catch (const std::logic_error& e) { SWSS_LOG_ERROR("Logic error: %s", e.what()); } catch (const std::exception& e) { SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what()); } catch (...) { SWSS_LOG_ERROR("Unknown exception was catched in the request parser"); } request_.clear(); //執行成功,那麼從m_tosync中刪除,不然執行下一個task if (erase_from_queue) { it = consumer.m_toSync.erase(it); } else { ++it; } } }