sonic orch調度系統之----orch

​ sonic orchagent線程的調度最小單位是Consumer。Consumer是在epoll事件Selectable的基礎上的進一步封裝,每一次發生epoll事件會觸發orchagent進行一次調度。orch是資源的集合,一個orch能夠包含多個Consumer,好比acl orch會監聽多個redistable。c++

class Executor

// Design assumption
// 1. one Orch can have one or more Executor
// 2. one Executor must belong to one and only one Orch
// 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor
// 設計假設:
// 1. 一個orch能夠擁有一個或者多個Executor
// 2. 一個Executor必須屬於一個orch並且僅僅屬於一個orch
// 3. Executor有一個指針指向一個new出來的Selectable結構,必須在析構函數中將其刪除,不然會泄漏
class Executor : public Selectable
{
public:
    Executor(Selectable *selectable, Orch *orch)
        : m_selectable(selectable)
        , m_orch(orch)
    {
    }

    virtual ~Executor() { delete m_selectable; }

    // Decorating Selectable
    int getFd() override { return m_selectable->getFd(); }
    void readData() override { m_selectable->readData(); }
    bool hasCachedData() override { return m_selectable->hasCachedData(); }
    bool initializedWithData() override { return m_selectable->initializedWithData(); }
    void updateAfterRead() override { m_selectable->updateAfterRead(); }
    Orch * getorch()   { return m_orch; }
    // Disable copying
    Executor(const Executor&) = delete;
    Executor& operator=(const Executor&) = delete;

    // Execute on event happening
    // execute執行事件,drain是一個輔助函數
    virtual void execute() { }
    virtual void drain() { }

protected:
    Selectable *m_selectable;//指向new出來的Selectable
    Orch *m_orch;//指向一個orch

    // Get the underlying selectable 獲取指向的Selectable
    Selectable *getSelectable() const { return m_selectable; }
};

class Executor只是一箇中間的派生類,orch直接使用的是class Consumer和class ExecutableTimer。redis

class Consumer

消費者類通常用於處理app_db的訂閱事件,對於asic_db通常是處理syncd的應答事件。數據庫

typedef std::pair<std::string, std::string> FieldValueTuple;
#define fvField std::get<0>
#define fvValue std::get<1>
typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple;
#define kfvKey    std::get<0>
#define kfvOp     std::get<1>
#define kfvFieldsValues std::get<2>
typedef map<string, KeyOpFieldsValuesTuple> SyncMap;

class Consumer : public Executor {
public:
    Consumer(TableConsumable *select, Orch *orch)
        : Executor(select, orch)
    {
    }

    TableConsumable *getConsumerTable() const
    {
        return static_cast<TableConsumable *>(getSelectable());
    }

    string getTableName() const
    {
        return getConsumerTable()->getTableName();
    }
    // 事物執行
    void execute();
    void drain();

    /* Store the latest 'golden' status */
    // TODO: hide?
    SyncMap m_toSync;
};

void Consumer::execute()

epoll事件觸發後,須要調用該函數從數據庫中讀取出指定key的內容,將其加工後存放在m_toSync中,供後續處理。json

void Consumer::execute()
{
    SWSS_LOG_ENTER();

    std::deque<KeyOpFieldsValuesTuple> entries;
    //調用pops函數,從redis數據庫中讀取數據,返回KeyOpFieldsValuesTuple結構
    getConsumerTable()->pops(entries);

    /* Nothing popped */
    if (entries.empty())
    {
        return;
    }
    // 遍歷每個事件
    for (auto& entry: entries)
    {
        string key = kfvKey(entry);
        string op  = kfvOp(entry);

        /* Record incoming tasks 記錄事件 */
        if (gSwssRecord)
        {
            Orch::recordTuple(*this, entry);
        }

        /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
        // 在這裏進行一次合併,對於刪除事件,直接覆蓋
        if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
        {
           m_toSync[key] = entry;
        }
        /* If an old task is still there, we combine the old task with new task */
        /*  */
        else
        {
            KeyOpFieldsValuesTuple existing_data = m_toSync[key];

            auto new_values = kfvFieldsValues(entry);
            auto existing_values = kfvFieldsValues(existing_data);

            //遍歷每個新的值
            for (auto it : new_values)
            {
                string field = fvField(it);
                string value = fvValue(it);

                auto iu = existing_values.begin();
                while (iu != existing_values.end())//遍歷每個舊的值
                {
                    string ofield = fvField(*iu);
                    if (field == ofield)//相同的域,將老的值覆蓋,這裏應該跳出while,代碼效率較差
                        iu = existing_values.erase(iu);
                    else
                        iu++;
                }
                /* 將新的值添加進去 */
                existing_values.push_back(FieldValueTuple(field, value));
            }
            m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
        }
    }
    //執行全部整理好的任務。
    drain();
}

假設有一個task的鍵值對以下:app

key=test;op=set;value={
    A:a,
    B:b,
    C:c,
}

第一次觸發任務是在APP_DB中寫入了:ide

key=test;op=set;value={
    A:a,
    B:b
}

加入orchagent只是將該任務讀取到了m_toSync中,因爲某種緣由沒有執行完該任務,依然駐留在m_toSync中。第二次寫入了:函數

key=test;op=set;value={
    B:b1,
    C:c
}

那麼通過execute函數後m_toSync中將會是:ui

key=test;op=set;value={
    A:a,
    B:b1,
    C:c
}

void Consumer::drain()

執行m_toSync中的任務。this

void Consumer::drain()
{
    if (!m_toSync.empty())
        m_orch->doTask(*this);
}

class Orch

class Orch
{
public:
    //每一個orch都會鏈接到數據庫,以及其須要訂閱的表名,和訂閱該表產生的事件的優先級
    
    //以默認優先級訂閱一個table
    Orch(DBConnector *db, const string tableName, int pri = default_orch_pri);
    //以默認優先級訂閱多個table
    Orch(DBConnector *db, const vector<string> &tableNames);
    //訂閱多個table,指明每一個table的優先級
    Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri);
    //鏈接多個數據庫
    Orch(const vector<TableConnector>& tables);
    virtual ~Orch();
    // 獲取該orch的全部epoll事件
    vector<Selectable*> getSelectables();

    /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
    // 執行該orch中全部的consumers中的m_sync中的任務
    void doTask();

    /* Run doTask against a specific executor */
    // 任務的來源能夠是consumer,NotificationConsumer,SelectableTimer
    virtual void doTask(Consumer &consumer) = 0;
    virtual void doTask(NotificationConsumer &consumer) { }
    virtual void doTask(SelectableTimer &timer) { }

    /* TODO: refactor recording */
    static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
protected:
    // 消費者map,一個orch能夠訂閱多個table,key爲tableName,value爲Executor
    ConsumerMap m_consumerMap;
    // 與調試相關
    static void logfileReopen();
    string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
    ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&);
    bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
    bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name);
    ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&);

    /* Note: consumer will be owned by this class */
    // 內部函數添加一個Executor,給addConsumer使用
    void addExecutor(string executorName, Executor* executor);
    Executor *getExecutor(string executorName);
private:
    // 添加一個消費者
    void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri);
};

void Orch::addConsumer(......)

void Orch::addExecutor(string executorName, Executor* executor)
{
    m_consumerMap.emplace(std::piecewise_construct,
            std::forward_as_tuple(executorName),
            std::forward_as_tuple(executor));
}
//添加一個消費者
void Orch::addConsumer(DBConnector *db, string tableName, int pri)
{
    if (db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB)
    {
        addExecutor(tableName, new Consumer(new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri), this));
    }
    else
    {
        addExecutor(tableName, new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this));
    }
}

void Orch::doTask(......)

執行本orch中的每個消費者m_toSync中的task,無論該task是否本次從redis中讀取仍是之前未處理完畢的。線程

void Orch::doTask()
{
    for(auto &it : m_consumerMap)
    {
        it.second->drain();
    }
}

class Orch2

orch2是在orch的基礎上的一個封裝,代碼的可讀性加強。

class Orch2 : public Orch
{
public:
    Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri)
        : Orch(db, tableName, pri), request_(request)
    {
    }

protected:
    virtual void doTask(Consumer& consumer);

    virtual bool addOperation(const Request& request)=0;
    virtual bool delOperation(const Request& request)=0;

private:
    Request& request_;
};

void Orch2::doTask

void Orch2::doTask(Consumer &consumer)
{
    SWSS_LOG_ENTER();

    auto it = consumer.m_toSync.begin();
    while (it != consumer.m_toSync.end())
    {
        bool erase_from_queue = true;
        try
        {
            request_.parse(it->second);

            auto op = request_.getOperation();
            if (op == SET_COMMAND)
            {
                erase_from_queue = addOperation(request_);
            }
            else if (op == DEL_COMMAND)
            {
                erase_from_queue = delOperation(request_);
            }
            else
            {
                SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str());
            }
        }
        catch (const std::invalid_argument& e)
        {
            SWSS_LOG_ERROR("Parse error: %s", e.what());
        }
        catch (const std::logic_error& e)
        {
            SWSS_LOG_ERROR("Logic error: %s", e.what());
        }
        catch (const std::exception& e)
        {
            SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what());
        }
        catch (...)
        {
            SWSS_LOG_ERROR("Unknown exception was catched in the request parser");
        }
        request_.clear();
        //執行成功,那麼從m_tosync中刪除,不然執行下一個task
        if (erase_from_queue)
        {
            it = consumer.m_toSync.erase(it);
        }
        else
        {
            ++it;
        }
    }
}
相關文章
相關標籤/搜索