sonic消息傳遞機制與架構(2)

發佈-訂閱機制實現

基於redis的發佈-訂閱機制,sonic實現了兩套消息傳遞系統:

  • KEY_SET消息系統:該機制通過一個set集合傳遞key,並通過publish命令通知有新的key產生。消費者用該key組合出一個hash表的key,用於獲取真實的消息;set不保證順序。樣例如下所示:
"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8"        #在集合INTF_TABLE_KEY_SET中增長一個key
 "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" #在hash表INTF_TABLE:PortChannel1:1.1.1.1/8中添加內容
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
 "PUBLISH" "INTF_TABLE_CHANNEL" "G"     #通知訂閱者頻道INTF_TABLE_CHANNEL有消息,訂閱者根據INTF_TABLE_組合成INTF_TABLE_KEY_SET獲取key,而後根據key獲取hash表_INTF_TABLE:PortChannel1:1.1.1.1/8的內容,若是該內容爲空則表示刪除操做,不然表示SET操做。
  • KEY_VALUE_OP消息系統:該消息系統採用redis的list進行操作,嚴格保證操作順序。一次操作在list中壓入三個值,分別爲key、value、op。其中value是把一個hash表進行json編碼後形成的單一字符串,所以訂閱者得到消息後需要進行解碼還原;最後一個值是操作類型。
"LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"1.1.1.0/24\",\"switch_id\":\"oid:0x21000000000000\",\"table_id\":\"oid:0x0\",\"vr\":\"oid:0x3000000000043\"}" "[\"SAI_ROUTE_ENTRY_ATTR_PACKET_ACTION\",\"SAI_PACKET_ACTION_FORWARD\",\"SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID\",\"oid:0x600000000063a\"]" "Screate"
"PUBLISH" "ASIC_STATE_CHANNEL" "G"  #通知訂閱者進行消息處理,循環處理消息,一次必須從鏈表中拿出三個key。

消費者基類

/*
 * Common base class of the table consumers in this file: both
 * ConsumerStateTable and ConsumerTable derive from it.
 */
class ConsumerTableBase: public TableConsumable, public RedisTransactioner
{
public:
    /* Batch size; the derived pops() implementations pass it to their Lua pop script as ARGV[1]. */
    const int POP_BATCH_SIZE;

    /*
     * db           - database connection the consumer works on
     * tableName    - name of the table to consume
     * popBatchSize - batch size, defaults to DEFAULT_POP_BATCH_SIZE
     * pri          - defaults to 0; NOTE(review): presumably a selection priority, confirm against TableConsumable
     */
    ConsumerTableBase(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    ~ConsumerTableBase() override = default;

    /* Pops one entry into kco. The implementation is not shown in this excerpt. */
    void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);

    /* Same as above, but returns the entry split into key, operation and field/value list. */
    void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);

protected:

    /* Queue of key/op/field-value entries. NOTE(review): presumably filled by pops() and drained by pop() - confirm. */
    std::deque<KeyOpFieldsValuesTuple> m_buffer;
};

KEY_SET消息系統

ProducerStateTable

/*
 * Mixin that derives, from a table name, the name of the Redis set which
 * carries the keys of pending changes ("<tableName>_KEY_SET").
 */
class TableName_KeySet {
public:
    /* Builds the key-set name by appending "_KEY_SET" to tableName. */
    TableName_KeySet(const std::string &tableName)
    {
        m_key = tableName;
        m_key += "_KEY_SET";
    }

    /* Returns a copy of the key-set name, e.g. "INTF_TABLE_KEY_SET". */
    std::string getKeySetName() const
    {
        return m_key;
    }

private:
    std::string m_key;
};

/*
 * Producer side of the KEY_SET message system: set()/del() run a Lua script
 * through the Redis pipeline that adds the key to the table's key set,
 * updates or deletes the table hash, and publishes on the table channel.
 */
class ProducerStateTable : public TableBase, public TableName_KeySet
{
public:
    ProducerStateTable(DBConnector *db, const std::string &tableName);
    /* Uses a caller-supplied pipeline; buffered controls whether set()/del() flush immediately. */
    ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
    ~ProducerStateTable();

    void setBuffered(bool buffered);
    /* Implements set() and del() commands using notification messages */
    /*
     * key    - entry key inside the table
     * values - field/value pairs written to the entry's hash
     * op     - selects the script: HMSET_COMMAND, MOD_COMMAND, anything else uses the plain set script
     * prefix - not used by the implementation shown in this file
     */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = SET_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    /* Deletes the entry's hash and notifies consumers; op and prefix are not used by the implementation shown in this file. */
    virtual void del(const std::string &key,
                     const std::string &op = DEL_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    void flush();

private:
    /* When false, set() and del() flush the pipeline right after pushing their command. */
    bool m_buffered;
    /* Set to false when the pipeline is supplied by the caller. NOTE(review): presumably true when this object created the pipeline - confirm. */
    bool m_pipeowned;
    RedisPipeline *m_pipe;
    /* SHA of the Lua script used by set() for ordinary set operations. */
    std::string m_shaSet;
    /* SHA of the Lua script used by del(). */
    std::string m_shaDel;
    /* SHA used by set() when op is HMSET_COMMAND; its loading is not shown in this excerpt. */
    std::string m_shaHmSet;
    /* SHA used by set() when op is MOD_COMMAND; its loading is not shown in this excerpt. */
    std::string m_shaMod;
};
ProducerStateTable::ProducerStateTable(......)
/*
 * Constructs a producer on top of a caller-supplied pipeline and registers
 * the Lua scripts used by set() and del().
 *
 * Script arguments, as assembled by set()/del():
 *   KEYS[1]  channel name            ARGV[1]  "G"
 *   KEYS[2]  key-set name            ARGV[2]  entry key
 *   KEYS[3+] hash key of the entry   ARGV[3+] alternating field, value
 *
 * NOTE(review): m_shaHmSet and m_shaMod are read by set() but are not
 * assigned in this excerpt - confirm where those scripts are loaded.
 */
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeySet(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    // set: remember the key in the key set, write one field per KEYS[3+i], then notify
    string setScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "for i = 0, #KEYS - 3 do\n"
        "    redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
        "end\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
    m_shaSet = m_pipe->loadRedisScript(setScript);

    // del: remember the key in the key set, drop the hash, then notify
    string delScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "redis.call('DEL', KEYS[3])\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
    m_shaDel = m_pipe->loadRedisScript(delScript);
}

構造函數加載了幾個lua腳本,分別用於hset、del、hmset,以及用於修改的hmset(上面的代碼片段只展示了其中用於set與del的兩個腳本)。

void ProducerStateTable::set(......)

set操作封裝

void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
                 const string &op /*= SET_COMMAND*/, const string &prefix)
{
    // Assembly redis command args into a string vector
    vector<string> args;
    args.emplace_back("EVALSHA");//
    if(0 == op.compare(HMSET_COMMAND)){
        args.emplace_back(m_shaHmSet);
    }
    else if(0 == op.compare(MOD_COMMAND))
    {
        args.emplace_back(m_shaMod);
    }
    else
    {
        args.emplace_back(m_shaSet);
    }
    //添加頻道名稱和key名字
    args.emplace_back(to_string(values.size() + 2));
    args.emplace_back(getChannelName());
    args.emplace_back(getKeySetName());

    args.insert(args.end(), values.size(), getKeyName(key));
    //添加一個G做爲參數
    args.emplace_back("G");
    args.emplace_back(key);
    for (const auto& iv: values)
    {
        args.emplace_back(fvField(iv));
        args.emplace_back(fvValue(iv));
    }

    // Transform data structure
    vector<const char *> args1;
    transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

    // Invoke redis command
    // 構造redis命令
    RedisCommand command;
    command.formatArgv((int)args1.size(), &args1[0], NULL);
    //壓入redis命令
    m_pipe->push(command, REDIS_REPLY_NIL);
    //若是啓用了pipeline,則積累命令,一同發送,提高傳輸效率
    if (!m_buffered)
    {
        m_pipe->flush();
    }
}
void ProducerStateTable::del(......)
void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix)
{
    // Assembly redis command args into a string vector
    vector<string> args;
    args.emplace_back("EVALSHA");
    args.emplace_back(m_shaDel);
    args.emplace_back("3");
    args.emplace_back(getChannelName());
    args.emplace_back(getKeySetName());
    args.emplace_back(getKeyName(key));
    args.emplace_back("G");
    args.emplace_back(key);
    args.emplace_back("''");

    // Transform data structure
    vector<const char *> args1;
    transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

    // Invoke redis command
    RedisCommand command;
    command.formatArgv((int)args1.size(), &args1[0], NULL);
    m_pipe->push(command, REDIS_REPLY_NIL);
    if (!m_buffered)
    {
        m_pipe->flush();
    }
}

ConsumerStateTable

/*
 * Consumer side of the KEY_SET message system: subscribes to the table
 * channel and pops pending keys, together with their hash contents, from
 * the table's key set.
 */
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
    /* Subscribes to the table channel and records the current size of the key set as the queue length. */
    ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get multiple pop elements */
    /* vkco is cleared and then filled with the popped entries; prefix is not used by the implementation shown in this file. */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerStateTable::ConsumerStateTable(......)
/*
 * Subscribes to the table channel and initialises the queue length with the
 * number of keys currently waiting in the key set.
 *
 * The WATCH / MULTI / SCARD / subscribe / EXEC sequence is repeated until
 * exec() reports success.
 */
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeySet(tableName)
{
    const std::string keySet = getKeySetName();

    bool committed = false;
    do
    {
        RedisReply watch(m_db, "WATCH " + keySet, REDIS_REPLY_STATUS);
        watch.checkStatusOK();
        multi();
        enqueue("SCARD " + keySet, REDIS_REPLY_INTEGER);
        // Exact-match subscription; the channel is "<table name>_CHANNEL",
        // for example ROUTE_TABLE_CHANNEL
        subscribe(m_db, getChannelName());
        committed = exec();
    } while (!committed);

    // The SCARD reply is the number of keys already pending
    RedisReply pending(dequeueReply());
    setQueueLength(pending.getReply<long long int>());
}
void ConsumerStateTable::pops(......)
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
    //使用redis的lua腳本引擎進行操做,腳本爲consumer_state_table_pops.lua
    static std::string luaScript = loadLuaScript("consumer_state_table_pops.lua");
    //加載腳本,返回的是腳本的sha
    static std::string sha = loadRedisScript(m_db, luaScript);
    //構建腳本命令
    RedisCommand command;
    command.format(
        "EVALSHA %s 2 %s %s: %d ''",
        sha.c_str(),
        getKeySetName().c_str(),
        getTableName().c_str(),
        POP_BATCH_SIZE);
    //執行命令
    RedisReply r(m_db, command);
    auto ctx0 = r.getContext();
    vkco.clear();

    // if the set is empty, return an empty kco object
    if (ctx0->type == REDIS_REPLY_NIL)
    {
        return;
    }
    //處理返回的結果
    assert(ctx0->type == REDIS_REPLY_ARRAY);
    size_t n = ctx0->elements;
    vkco.resize(n);
    for (size_t ie = 0; ie < n; ie++)
    {
        auto& kco = vkco[ie];
        auto& values = kfvFieldsValues(kco);
        assert(values.empty());

        auto& ctx = ctx0->element[ie];
        assert(ctx->elements == 2);
        assert(ctx->element[0]->type == REDIS_REPLY_STRING);
        std::string key = ctx->element[0]->str;
        kfvKey(kco) = key;

        assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
        auto ctx1 = ctx->element[1];
        for (size_t i = 0; i < ctx1->elements / 2; i++)
        {
            FieldValueTuple e;
            fvField(e) = ctx1->element[i * 2]->str;
            fvValue(e) = ctx1->element[i * 2 + 1]->str;
            values.push_back(e);
        }

        // if there is no field-value pair, the key is already deleted
        // 若是沒有對應的域值對,則表示刪除,存在則表示添加或者更新,同一使用SET命令
        if (values.empty())
        {
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            kfvOp(kco) = SET_COMMAND;
        }
    }
}
consumer_state_table_pops.lua

.\src\sonic-swss-common\common\consumer_state_table_pops.lua

-- Pops up to ARGV[1] keys from the set KEYS[1] and returns, for each popped
-- key, the pair {key, HGETALL of KEYS[2] .. key}.
local popped = redis.call('SPOP', KEYS[1], ARGV[1])
local result = {}
for idx = 1, #popped do
    local name = popped[idx]
    -- the hash content comes back as a flat field/value list
    local content = redis.call('HGETALL', KEYS[2] .. name)
    result[#result + 1] = {name, content}
end
return result

ProducerStateTable和ConsumerStateTable是以app_db爲數據庫構建的一套消息發佈-訂閱機制,兩者之間是單向傳遞、無應答的。生產者是mgr、xxx_sync、agent,消費者是orchagent。

KEY_VALUE_OP消息系統

ProducerTable

/*
 * Mixin that derives, from a table name, the name of the Redis list used as
 * the key/value/op queue ("<tableName>_KEY_VALUE_OP_QUEUE").
 */
class TableName_KeyValueOpQueues {
public:
    /* Builds the queue name by appending "_KEY_VALUE_OP_QUEUE" to tableName. */
    TableName_KeyValueOpQueues(const std::string &tableName)
    {
        m_keyvalueop = tableName;
        m_keyvalueop += "_KEY_VALUE_OP_QUEUE";
    }

    /* Returns a copy of the queue name, e.g. "ASIC_STATE_KEY_VALUE_OP_QUEUE". */
    std::string getKeyValueOpQueueTableName() const
    {
        return m_keyvalueop;
    }

private:
    std::string m_keyvalueop;
};

/*
 * Producer side of the KEY_VALUE_OP message system: every set()/del() pushes
 * a key, a JSON-encoded value and an operation onto the table's queue list
 * and publishes on the table channel.
 */
class ProducerTable : public TableBase, public TableName_KeyValueOpQueues
{
public:
    ProducerTable(DBConnector *db, const std::string &tableName);
    /* Uses a caller-supplied pipeline; buffered controls when set()/del() flush it. */
    ProducerTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
    /* NOTE(review): dumpFile is presumably the path opened as m_dumpFile - this constructor is not shown here. */
    ProducerTable(DBConnector *db, const std::string &tableName, const std::string &dumpFile);
    virtual ~ProducerTable();

    void setBuffered(bool buffered);

    /* Implements set() and del() commands using notification messages */

    /* Queues key with the JSON-encoded values and the operation "S" + op. */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = SET_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    /* Queues key with the value "{}" and the operation "D" + op. */
    virtual void del(const std::string &key,
                     const std::string &op = DEL_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    void flush();

private:
    /* Disable copy-constructor and operator = */
    ProducerTable(const ProducerTable &other);
    ProducerTable & operator = (const ProducerTable &other);

    /* When open, set() and del() also append a JSON record of each operation to it. */
    std::ofstream m_dumpFile;
    /* True until the first record is dumped; later records are preceded by a "," separator. */
    bool m_firstItem = true;
    /* When false, every set()/del() flushes the pipeline immediately. */
    bool m_buffered;
    /* Set to false when the pipeline is supplied by the caller. NOTE(review): presumably true when this object created the pipeline - confirm. */
    bool m_pipeowned;
    RedisPipeline *m_pipe;
    /* SHA of the enqueue Lua script (LPUSH key/value/op, then PUBLISH). */
    std::string m_shaEnque;

    /* Pushes one EVALSHA of the enqueue script onto the pipeline; does not flush. */
    void enqueueDbChange(const std::string &key, const std::string &value, const std::string &op, const std::string &prefix);
};
ProducerTable::ProducerTable(......)

構造函數構造一個lua腳本luaEnque,該腳本用於發佈消息。

/*
 * Constructs a producer on top of a caller-supplied pipeline and registers
 * the Lua script used to enqueue changes.
 */
ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeyValueOpQueues(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    /*
     * Script arguments, as supplied by enqueueDbChange():
     *   KEYS[1] : tableName + "_KEY_VALUE_OP_QUEUE", e.g. ASIC_STATE_KEY_VALUE_OP_QUEUE
     *   KEYS[2] : tableName + "_CHANNEL"
     *   ARGV[1] : key
     *   ARGV[2] : value
     *   ARGV[3] : op
     *   ARGV[4] : "G"
     */
    string enqueueScript =
        "redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
        "redis.call('PUBLISH', KEYS[2], ARGV[4]);";

    m_shaEnque = m_pipe->loadRedisScript(enqueueScript);
}
void ProducerTable::enqueueDbChange(......)

執行腳本的函數:

void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */)
{
    RedisCommand command;

    command.format(
        "EVALSHA %s 2 %s %s %s %s %s %s",
        m_shaEnque.c_str(),
        getKeyValueOpQueueTableName().c_str(),
        getChannelName().c_str(),
        key.c_str(),
        value.c_str(),
        op.c_str(),
        "G");

    m_pipe->push(command, REDIS_REPLY_NIL);
}
void ProducerTable::set(......)
void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix)
{
    if (m_dumpFile.is_open())
    {
        if (!m_firstItem)
            m_dumpFile << "," << endl;
        else
            m_firstItem = false;

        json j;
        string json_key = getKeyName(key);
        j[json_key] = json::object();
        for (const auto &it : values)
            j[json_key][fvField(it)] = fvValue(it);
        j["OP"] = op;
        m_dumpFile << j.dump(4);
    }
    //發送lua腳本命令
    enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix);
    // Only buffer continuous "set/set" or "del" operations
    if (!m_buffered || (op != "set" && op != "bulkset" ))
    {
        m_pipe->flush();
    }
}
void ProducerTable::del(......)
void ProducerTable::del(const string &key, const string &op, const string &prefix)
{
    if (m_dumpFile.is_open())
    {
        if (!m_firstItem)
            m_dumpFile << "," << endl;
        else
            m_firstItem = false;

        json j;
        string json_key = getKeyName(key);
        j[json_key] = json::object();
        j["OP"] = op;
        m_dumpFile << j.dump(4);
    }

    enqueueDbChange(key, "{}", "D" + op, prefix);
    if (!m_buffered)
    {
        m_pipe->flush();
    }
}

ConsumerTable

/*
 * Consumer side of the KEY_VALUE_OP message system: subscribes to the table
 * channel and pops key/value/op triples from the table's queue list.
 */
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
    /* Subscribes to the table channel and records the number of queued operations (list length / 3) as the queue length. */
    ConsumerTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get multiple pop elements */
    /* vkco is cleared and then filled with the popped entries; prefix is prepended to the table name passed to the pop script. */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerTable::ConsumerTable(......)
/*
 * Subscribes to the table channel and initialises the queue length with the
 * number of operations currently waiting in the queue list.
 *
 * The WATCH / MULTI / LLEN / subscribe / LLEN / EXEC sequence is repeated
 * until exec() reports success.
 */
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeyValueOpQueues(tableName)
{
    const string queueName = getKeyValueOpQueueTableName();

    bool committed = false;
    do
    {
        RedisReply watch(m_db, "WATCH " + queueName, REDIS_REPLY_STATUS);
        watch.checkStatusOK();
        multi();
        enqueue("LLEN " + queueName, REDIS_REPLY_INTEGER);
        // Subscribe to the table channel
        subscribe(m_db, getChannelName());
        // NOTE(review): LLEN is queued a second time here, but only one reply
        // is dequeued below - confirm whether the second one is intentional.
        enqueue("LLEN " + queueName, REDIS_REPLY_INTEGER);
        committed = exec();
    } while (!committed);

    // Key, value and op share one list, so three list entries make one operation
    RedisReply lengthReply(dequeueReply());
    setQueueLength(lengthReply.getReply<long long int>() / 3);
}
void ConsumerTable::pops(......)

//使用lua腳本consumer_table_pops.lua從數據庫中獲取消息

/*
 * Pops up to POP_BATCH_SIZE queued operations by running
 * consumer_table_pops.lua.
 *
 * vkco   - cleared first, then filled with one tuple per returned entry;
 *          each entry is laid out as { key, op, field, value, field, value, ... }
 * prefix - prepended to the table name handed to the script as KEYS[2]
 *
 * Throws runtime_error when an entry has fewer than two elements or an
 * unpaired trailing field.
 */
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
    // The script text is read and registered once; loadRedisScript() returns its SHA
    static std::string luaScript = loadLuaScript("consumer_table_pops.lua");

    static string sha = loadRedisScript(m_db, luaScript);
    RedisCommand command;
    command.format(
        "EVALSHA %s 2 %s %s %d ''",
        sha.c_str(),
        getKeyValueOpQueueTableName().c_str(),
        (prefix+getTableName()).c_str(),
        POP_BATCH_SIZE);

    RedisReply r(m_db, command, REDIS_REPLY_ARRAY);

    auto ctx0 = r.getContext();
    vkco.clear();

    // if the queue is empty, return an empty result
    if (ctx0->type == REDIS_REPLY_NIL)
    {
        return;
    }

    assert(ctx0->type == REDIS_REPLY_ARRAY);
    size_t n = ctx0->elements;
    vkco.resize(n);

    for (size_t ie = 0; ie < n; ie++)
    {
        auto& kco = vkco[ie];
        auto& values = kfvFieldsValues(kco);
        assert(values.empty());

        auto& ctx = ctx0->element[ie];

        // key and op are read unconditionally below, so both must be present
        if (ctx->elements < 2)
        {
            SWSS_LOG_ERROR("invalid number of elements in returned table: %zu < 2", ctx->elements);
            throw runtime_error("invalid number of elements in returned table");
        }

        string key = ctx->element[0]->str;
        kfvKey(kco) = key;
        string op  = ctx->element[1]->str;
        kfvOp(kco) = op;

        // the remaining elements are alternating field/value entries
        for (size_t i = 2; i < ctx->elements; i += 2)
        {
            if (i+1 >= ctx->elements)
            {
                // both arguments are size_t, which is printed with %zu
                SWSS_LOG_ERROR("invalid number of elements in returned table: %zu >= %zu", i+1, ctx->elements);
                throw runtime_error("invalid number of elements in returned table");
            }

            FieldValueTuple e;

            fvField(e) = ctx->element[i+0]->str;
            fvValue(e) = ctx->element[i+1]->str;
            values.push_back(e);
        }
    }
}
consumer_table_pops.lua

consumer_table_pops.lua與consumer_state_table_pops.lua最大的不同是:consumer_table_pops.lua採用的是有序鏈表(list),consumer_state_table_pops.lua用的是set。consumer_table_pops.lua對業務順序比較關心:syncd沒有業務邏輯,只是簡單地執行sai調用,而對硬件的操作是有時序的,必須嚴格按照時序進行,否則系統無法正常運轉;orchagent則是軟件層面,它可以等待事件所需條件都滿足後才進行動作。另外一點不同的是,對於刪除操作,consumer_state_table_pops.lua不負責刪除數據庫的內容,而是由生產者負責清除;consumer_table_pops.lua則必須自己在腳本中刪除數據庫中對應的內容。

-- Pops up to ARGV[1] operations from the queue list KEYS[1], mirrors each of
-- them into the hashes under KEYS[2], and returns them to the caller as a
-- list of {key, op, field1, value1, ...} entries.
local rets = {}
-- pop Key, Value and OP together.
local popsize = ARGV[1] * 3
-- read from the tail of the list; the producer pushes at the head (LPUSH),
-- so the entries at the tail are the oldest ones
local keys   = redis.call('LRANGE', KEYS[1], -popsize, -1)
-- remove the entries that were just read from the list
redis.call('LTRIM', KEYS[1], 0, -popsize-1)
-- number of list entries read; they come in groups of three (op, value, key),
-- and the first character of op is a prefix
local n = table.getn(keys)
for i = n, 1, -3 do -- walk backwards, one group of three entries per message
   local op = keys[i-2]
   local value = keys[i-1]
   local key = keys[i]
   local dbop = op:sub(1,1)
   op = op:sub(2)
   local ret = {key, op}
   -- decode the JSON value; the loop below reads it as a flat array of
   -- alternating field/value entries
   local jj = cjson.decode(value)
   local size = #jj

   for idx=1,size,2 do
       table.insert(ret, jj[idx])
       table.insert(ret, jj[idx+1])
   end
   table.insert(rets, ret)

   if op == 'bulkset' or op == 'bulkcreate' then

-- key is "OBJECT_TYPE:num", extract object type from key
       key = key:sub(1, string.find(key, ':') - 1)

       local len = #ret
       local st = 3         -- since 1 and 2 is key/op
       while st <= len do
           local field = ret[st]
-- keyname is ASIC_STATE : OBJECT_TYPE : OBJECT_ID
           local keyname = KEYS[2] .. ':' .. key .. ':' .. field

-- value can be multiple a=v|a=v|... we need to split using gmatch
           local vars = ret[st+1]
           for value in string.gmatch(vars,'([^|]+)') do
               local attr = value:sub(1, string.find(value, '=') - 1)
               local val = value.sub(value, string.find(value, '=') + 1)
               redis.call('HSET', keyname, attr, val)
           end

           st = st + 2
       end

   elseif op ~= 'flush' and op ~= 'flushresponse' and op ~= 'get' and op ~= 'getresponse' and op ~= 'notify' then
       local keyname = KEYS[2] .. ':' .. key
       if key == '' then
           keyname = KEYS[2]
       end
       -- a 'D' prefix means delete: the hash itself is removed from the
       -- database here, which is why it is no longer visible afterwards
       if dbop == 'D' then
           redis.call('DEL', keyname)
       else -- otherwise the decoded field/value pairs are written back as a hash, which is what shows up in the database
           local st = 3
           local len = #ret
           while st <= len do
               redis.call('HSET', keyname, ret[st], ret[st+1])
               st = st + 2
           end
       end
   end
end

return rets

ProducerTable和ConsumerTable是orchagent和syncd之間的生產者-消費者工具,以asic_db爲數據庫。

添加實例

agent寫入app-db
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "433b5d51dc97a94f3b084255db05473699f3873a" "12" "VRF_TABLE_CHANNEL" "VRF_TABLE_KEY_SET" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "G" "router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "SADD" "VRF_TABLE_KEY_SET" "router-3-2"
[0 lua] "HMSET" "VRF_TABLE:router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "PUBLISH" "VRF_TABLE_CHANNEL" "G"

orchagent從app-db讀出數據
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "VRF_TABLE_KEY_SET" "VRF_TABLE:" "8192" "''"
[0 lua] "SPOP" "VRF_TABLE_KEY_SET" "8192"
[0 lua] "HGETALL" "VRF_TABLE:router-3-2"
[0 lua] "HGETALL" "VRF_TABLE:router-3-1"

#orchagent寫asic-db,value會進行編碼
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate" "G"
[1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate"
[1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"

#syncd監聽後,調用腳本進行解析
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"
[1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
[1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
#decode後從新寫回數據庫
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE" "true"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE" "false"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS" "6C:AE:8B:52:D8:66"

刪除實例

#我們以如下數據爲例進行說明:
"SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
"DEL" "ROUTE_TABLE:10.254.31.0/24"
"PUBLISH" "ROUTE_TABLE_CHANNEL" "G"

#在數據庫app_db中寫入刪除路由表項10.254.31.0/24數據
1543393647.059780 [0 127.0.0.1:33859] "SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
1543393647.066503 [0 127.0.0.1:33859] "DEL" "ROUTE_TABLE:10.254.31.0/24"
1543393647.090635 [0 127.0.0.1:33859] "PUBLISH" "ROUTE_TABLE_CHANNEL" "G"

#orchagent作爲客戶端訂閱了頻道ROUTE_TABLE_CHANNEL,服務器通知orchagent後,orchagent執行腳本consumer_state_table_pops.lua
#計算sha1sum consumer_state_table_pops.lua dc95f6831ed81200867b02a0ff2f8a25db9d6540,可見執行的是lua腳本consumer_state_table_pops.lua
#該腳本循環取出全部key的hash屬性,一次最多取出8192個key
1543393647.090864 [0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "ROUTE_TABLE_KEY_SET" "ROUTE_TABLE:" "8192" "''"
1543393647.090934 [0 lua] "SPOP" "ROUTE_TABLE_KEY_SET" "8192"
1543393647.090997 [0 lua] "HGETALL" "ROUTE_TABLE:10.254.31.0/24"

#orchagent獲取內容後,將hash表數據進行序列化,即將hash類型轉換爲列表類型,做爲生產者調用腳本
#"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
#"redis.call('PUBLISH', KEYS[2], ARGV[4]);"
#將內容寫入數據庫1(asic-db)
1543393647.091399 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove" "G"
1543393647.091510 [1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove"
1543393647.091591 [1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"

#syncd訂閱了ASIC_STATE_CHANNEL頻道,獲得了通知,執行腳本consumer_table_pops.lua
#sha1sum usr/share/swss/consumer_table_pops.lua
#22e4ca16cf37e220c92df33903a0e95585ab76fd  usr/share/swss/consumer_table_pops.lua
1543393647.091734 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"
1543393647.091856 [1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
1543393647.091889 [1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
1543393647.091950 [1 lua] "DEL" "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}"
相關文章
相關標籤/搜索