基於redis的發佈--訂閱,sonic實現了兩套消息傳遞系統:
"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8" #在集合INTF_TABLE_KEY_SET中增長一個key "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" #在hash表INTF_TABLE:PortChannel1:1.1.1.1/8中添加內容 "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4" "PUBLISH" "INTF_TABLE_CHANNEL" "G" #通知訂閱者頻道INTF_TABLE_CHANNEL有消息,訂閱者根據INTF_TABLE_組合成INTF_TABLE_KEY_SET獲取key,而後根據key獲取hash表_INTF_TABLE:PortChannel1:1.1.1.1/8的內容,若是該內容爲空則表示刪除操做,不然表示SET操做。
"LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"1.1.1.0/24\",\"switch_id\":\"oid:0x21000000000000\",\"table_id\":\"oid:0x0\",\"vr\":\"oid:0x3000000000043\"}" "[\"SAI_ROUTE_ENTRY_ATTR_PACKET_ACTION\",\"SAI_PACKET_ACTION_FORWARD\",\"SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID\",\"oid:0x600000000063a\"]" "Screate" "PUBLISH" "ASIC_STATE_CHANNEL" "G" #通知訂閱者進行消息處理,循環處理消息,一次必須從鏈表中拿出三個key。
/*
 * Base class shared by the consumer-side tables in this file
 * (ConsumerStateTable and ConsumerTable both derive from it).
 */
class ConsumerTableBase: public TableConsumable, public RedisTransactioner
{
public:
    /* Batch size; the derived pops() implementations pass it to their pop scripts. */
    const int POP_BATCH_SIZE;

    /*
     * db           - database connection used by the consumer
     * tableName    - logical table name
     * popBatchSize - presumably stored in POP_BATCH_SIZE (constructor body not shown) - TODO confirm
     * pri          - forwarded priority value; semantics are defined outside this excerpt
     */
    ConsumerTableBase(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    ~ConsumerTableBase() override = default;

    /* Single-entry pop returning the key/op/field-values as one tuple (implementation not shown here). */
    void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);

    /* Single-entry pop returning key, op and field-values separately (implementation not shown here). */
    void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);

protected:
    /* NOTE(review): presumably holds entries already fetched but not yet returned by pop() - confirm. */
    std::deque<KeyOpFieldsValuesTuple> m_buffer;
};
/* Mixin that derives the name of a table's key set: "<tableName>_KEY_SET". */
class TableName_KeySet
{
private:
    std::string m_key;

public:
    TableName_KeySet(const std::string &tableName)
        : m_key(tableName + "_KEY_SET")
    {
    }

    /* Returns the key-set name built in the constructor. */
    std::string getKeySetName() const { return m_key; }
};

/*
 * Producer side of the "state table" channel. set() and del() run preloaded
 * Lua scripts through a RedisPipeline.
 */
class ProducerStateTable : public TableBase, public TableName_KeySet
{
public:
    ProducerStateTable(DBConnector *db, const std::string &tableName);
    ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
    ~ProducerStateTable();

    void setBuffered(bool buffered);

    /*
     * Implements set() and del() commands using notification messages.
     * In set(), op selects which preloaded script is invoked.
     */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = SET_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    virtual void del(const std::string &key,
                     const std::string &op = DEL_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    void flush();

private:
    /* When false, set()/del() flush the pipeline right after pushing the command. */
    bool m_buffered;
    /* Set to false by the pipeline constructor; presumably true when the object created the pipeline itself - TODO confirm. */
    bool m_pipeowned;
    RedisPipeline *m_pipe;
    /* Script hashes passed to EVALSHA by set()/del(). */
    std::string m_shaSet;
    std::string m_shaDel;
    /* Used by set() for HMSET_COMMAND / MOD_COMMAND; where they are loaded is not visible in this excerpt. */
    std::string m_shaHmSet;
    std::string m_shaMod;
};
/*
 * Builds a producer on top of an externally supplied pipeline (not owned) and
 * registers the SET and DEL Lua scripts, keeping the returned hashes for EVALSHA.
 *
 * Script argument layout (as assembled by set()/del()):
 *   KEYS[1] channel name, KEYS[2] key-set name, KEYS[3..] hash key name
 *   ARGV[1] publish payload, ARGV[2] key, ARGV[3..] field/value pairs
 *
 * NOTE(review): m_shaHmSet and m_shaMod are not assigned here although set()
 * passes them to EVALSHA for HMSET_COMMAND / MOD_COMMAND - confirm where
 * they are loaded.
 */
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeySet(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    // Record the key in the key set, write every field/value pair, then notify.
    string setScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "for i = 0, #KEYS - 3 do\n"
        " redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
        "end\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";

    // Record the key in the key set, remove the hash, then notify.
    string delScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "redis.call('DEL', KEYS[3])\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";

    m_shaSet = m_pipe->loadRedisScript(setScript);
    m_shaDel = m_pipe->loadRedisScript(delScript);
}
構造函數加載了幾個lua腳本,分別用於hset,del,hmset,以及用於修改的hmset(上面節選的代碼只展示了其中SET和DEL兩個腳本的加載)。
set操作封裝
void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op /*= SET_COMMAND*/, const string &prefix) { // Assembly redis command args into a string vector vector<string> args; args.emplace_back("EVALSHA");// if(0 == op.compare(HMSET_COMMAND)){ args.emplace_back(m_shaHmSet); } else if(0 == op.compare(MOD_COMMAND)) { args.emplace_back(m_shaMod); } else { args.emplace_back(m_shaSet); } //添加頻道名稱和key名字 args.emplace_back(to_string(values.size() + 2)); args.emplace_back(getChannelName()); args.emplace_back(getKeySetName()); args.insert(args.end(), values.size(), getKeyName(key)); //添加一個G做爲參數 args.emplace_back("G"); args.emplace_back(key); for (const auto& iv: values) { args.emplace_back(fvField(iv)); args.emplace_back(fvValue(iv)); } // Transform data structure vector<const char *> args1; transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } ); // Invoke redis command // 構造redis命令 RedisCommand command; command.formatArgv((int)args1.size(), &args1[0], NULL); //壓入redis命令 m_pipe->push(command, REDIS_REPLY_NIL); //若是啓用了pipeline,則積累命令,一同發送,提高傳輸效率 if (!m_buffered) { m_pipe->flush(); } }
void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix) { // Assembly redis command args into a string vector vector<string> args; args.emplace_back("EVALSHA"); args.emplace_back(m_shaDel); args.emplace_back("3"); args.emplace_back(getChannelName()); args.emplace_back(getKeySetName()); args.emplace_back(getKeyName(key)); args.emplace_back("G"); args.emplace_back(key); args.emplace_back("''"); // Transform data structure vector<const char *> args1; transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } ); // Invoke redis command RedisCommand command; command.formatArgv((int)args1.size(), &args1[0], NULL); m_pipe->push(command, REDIS_REPLY_NIL); if (!m_buffered) { m_pipe->flush(); } }
/*
 * Consumer side of the "state table" channel: reads pending keys from the
 * "<tableName>_KEY_SET" set named by TableName_KeySet.
 */
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
    ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /*
     * Get multiple pop elements.
     * vkco is cleared and then filled with the popped entries; prefix is
     * ignored by the implementation.
     */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
/*
 * Subscribes to the table's channel and records how many keys are already
 * pending in the key set.
 *
 * The key set is WATCHed and its cardinality read inside MULTI/EXEC; the
 * whole sequence is repeated until exec() reports success.
 */
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeySet(tableName)
{
    const std::string keySet = getKeySetName();

    bool committed = false;
    do
    {
        RedisReply watch(m_db, "WATCH " + keySet, REDIS_REPLY_STATUS);
        watch.checkStatusOK();

        multi();
        enqueue(std::string("SCARD ") + keySet, REDIS_REPLY_INTEGER);
        // Exact (non-pattern) subscription to "<table>_CHANNEL", e.g. ROUTE_TABLE_CHANNEL.
        subscribe(m_db, getChannelName());
        committed = exec();
    } while (!committed);

    // The SCARD reply becomes the initial queue length.
    RedisReply r(dequeueReply());
    setQueueLength(r.getReply<long long int>());
}
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/) { //使用redis的lua腳本引擎進行操做,腳本爲consumer_state_table_pops.lua static std::string luaScript = loadLuaScript("consumer_state_table_pops.lua"); //加載腳本,返回的是腳本的sha static std::string sha = loadRedisScript(m_db, luaScript); //構建腳本命令 RedisCommand command; command.format( "EVALSHA %s 2 %s %s: %d ''", sha.c_str(), getKeySetName().c_str(), getTableName().c_str(), POP_BATCH_SIZE); //執行命令 RedisReply r(m_db, command); auto ctx0 = r.getContext(); vkco.clear(); // if the set is empty, return an empty kco object if (ctx0->type == REDIS_REPLY_NIL) { return; } //處理返回的結果 assert(ctx0->type == REDIS_REPLY_ARRAY); size_t n = ctx0->elements; vkco.resize(n); for (size_t ie = 0; ie < n; ie++) { auto& kco = vkco[ie]; auto& values = kfvFieldsValues(kco); assert(values.empty()); auto& ctx = ctx0->element[ie]; assert(ctx->elements == 2); assert(ctx->element[0]->type == REDIS_REPLY_STRING); std::string key = ctx->element[0]->str; kfvKey(kco) = key; assert(ctx->element[1]->type == REDIS_REPLY_ARRAY); auto ctx1 = ctx->element[1]; for (size_t i = 0; i < ctx1->elements / 2; i++) { FieldValueTuple e; fvField(e) = ctx1->element[i * 2]->str; fvValue(e) = ctx1->element[i * 2 + 1]->str; values.push_back(e); } // if there is no field-value pair, the key is already deleted // 若是沒有對應的域值對,則表示刪除,存在則表示添加或者更新,同一使用SET命令 if (values.empty()) { kfvOp(kco) = DEL_COMMAND; } else { kfvOp(kco) = SET_COMMAND; } } }
./src/sonic-swss-common/common/consumer_state_table_pops.lua
-- KEYS[1]: key set to pop from
-- KEYS[2]: prefix prepended to each popped key to form its hash name
-- ARGV[1]: maximum number of keys to pop
-- Returns a list of {key, HGETALL result} pairs, one per popped key.
local result = {}

-- Pop up to ARGV[1] keys from the set.
local popped = redis.call('SPOP', KEYS[1], ARGV[1])

for _, key in ipairs(popped) do
    -- Read the whole hash stored at <prefix><key>.
    local fields = redis.call('HGETALL', KEYS[2] .. key)
    result[#result + 1] = {key, fields}
end

return result
ProducerStateTable和ConsumerStateTable是以app_db爲數據庫構建的一個消息發佈--訂閱機制,二者是單向,無應答。生產者是mgr,xxx_sync,agent。消費者是orchagent。
class TableName_KeyValueOpQueues { private: std::string m_keyvalueop; public: TableName_KeyValueOpQueues(const std::string &tableName) : m_keyvalueop(tableName + "_KEY_VALUE_OP_QUEUE") { } std::string getKeyValueOpQueueTableName() const { return m_keyvalueop; } }; class ProducerTable : public TableBase, public TableName_KeyValueOpQueues { public: ProducerTable(DBConnector *db, const std::string &tableName); ProducerTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false); ProducerTable(DBConnector *db, const std::string &tableName, const std::string &dumpFile); virtual ~ProducerTable(); void setBuffered(bool buffered); /* Implements set() and del() commands using notification messages */ virtual void set(const std::string &key, const std::vector<FieldValueTuple> &values, const std::string &op = SET_COMMAND, const std::string &prefix = EMPTY_PREFIX); virtual void del(const std::string &key, const std::string &op = DEL_COMMAND, const std::string &prefix = EMPTY_PREFIX); void flush(); private: /* Disable copy-constructor and operator = */ ProducerTable(const ProducerTable &other); ProducerTable & operator = (const ProducerTable &other); std::ofstream m_dumpFile; bool m_firstItem = true; bool m_buffered; bool m_pipeowned; RedisPipeline *m_pipe; std::string m_shaEnque; void enqueueDbChange(const std::string &key, const std::string &value, const std::string &op, const std::string &prefix); };
構造函數,構造一個lua腳本luaEnque,該腳本用於發佈消息。
/*
 * Builds a producer on top of an externally supplied pipeline (not owned) and
 * registers the enqueue script, keeping its hash for EVALSHA.
 *
 * Script arguments:
 *   KEYS[1]  tableName + "_KEY_VALUE_OP_QUEUE", e.g. ASIC_STATE_KEY_VALUE_OP_QUEUE
 *   KEYS[2]  tableName + "_CHANNEL"
 *   ARGV[1]  key
 *   ARGV[2]  value
 *   ARGV[3]  op
 *   ARGV[4]  "G" (payload published on the channel)
 */
ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeyValueOpQueues(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    // Push the key/value/op triple onto the list, then notify subscribers.
    string enqueueScript =
        "redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
        "redis.call('PUBLISH', KEYS[2], ARGV[4]);";

    m_shaEnque = m_pipe->loadRedisScript(enqueueScript);
}
執行腳本函數:
void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */) { RedisCommand command; command.format( "EVALSHA %s 2 %s %s %s %s %s %s", m_shaEnque.c_str(), getKeyValueOpQueueTableName().c_str(), getChannelName().c_str(), key.c_str(), value.c_str(), op.c_str(), "G"); m_pipe->push(command, REDIS_REPLY_NIL); }
void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix) { if (m_dumpFile.is_open()) { if (!m_firstItem) m_dumpFile << "," << endl; else m_firstItem = false; json j; string json_key = getKeyName(key); j[json_key] = json::object(); for (const auto &it : values) j[json_key][fvField(it)] = fvValue(it); j["OP"] = op; m_dumpFile << j.dump(4); } //發送lua腳本命令 enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix); // Only buffer continuous "set/set" or "del" operations if (!m_buffered || (op != "set" && op != "bulkset" )) { m_pipe->flush(); } }
void ProducerTable::del(const string &key, const string &op, const string &prefix) { if (m_dumpFile.is_open()) { if (!m_firstItem) m_dumpFile << "," << endl; else m_firstItem = false; json j; string json_key = getKeyName(key); j[json_key] = json::object(); j["OP"] = op; m_dumpFile << j.dump(4); } enqueueDbChange(key, "{}", "D" + op, prefix); if (!m_buffered) { m_pipe->flush(); } }
/*
 * Consumer side of the ordered (list-based) channel: reads key/value/op
 * triples from the "<tableName>_KEY_VALUE_OP_QUEUE" list named by
 * TableName_KeyValueOpQueues.
 */
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
    ConsumerTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /*
     * Get multiple pop elements.
     * vkco is cleared and then filled with the popped entries; prefix is
     * prepended to the table name passed to the pop script.
     */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
/*
 * Subscribes to the table's channel and records how many messages are
 * already pending in the operation queue.
 *
 * The queue is WATCHed and its length read inside MULTI/EXEC; the whole
 * sequence is repeated until exec() reports success.
 */
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeyValueOpQueues(tableName)
{
    const string queueName = getKeyValueOpQueueTableName();

    bool committed = false;
    do
    {
        RedisReply watch(m_db, string("WATCH ") + queueName, REDIS_REPLY_STATUS);
        watch.checkStatusOK();

        multi();
        enqueue(string("LLEN ") + queueName, REDIS_REPLY_INTEGER);
        // Subscribe to the table's channel.
        subscribe(m_db, getChannelName());
        // NOTE(review): LLEN is queued a second time, yet only one reply is
        // dequeued below - confirm whether this second command is intentional.
        enqueue(string("LLEN ") + queueName, REDIS_REPLY_INTEGER);
        committed = exec();
    } while (!committed);

    RedisReply r(dequeueReply());
    const long long int listLength = r.getReply<long long int>();

    // Key, value and op are stored as three list entries and are processed in one shot.
    setQueueLength(listLength / 3);
}
//使用lua腳本:consumer_table_pops.lua從數據庫中獲取消息
/*
 * Pops up to POP_BATCH_SIZE queued messages by running
 * consumer_table_pops.lua.
 *
 * vkco   - cleared, then filled with one entry per message: key, op and the
 *          field/value pairs that follow them in the script's reply
 * prefix - prepended to the table name passed to the script
 *
 * Throws runtime_error when a returned entry has fewer than two elements or
 * an unpaired trailing field.
 */
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
    // The script text and its hash are loaded once, on the first call.
    static std::string luaScript = loadLuaScript("consumer_table_pops.lua");
    static string sha = loadRedisScript(m_db, luaScript);

    RedisCommand command;
    command.format(
        "EVALSHA %s 2 %s %s %d ''",
        sha.c_str(),
        getKeyValueOpQueueTableName().c_str(),
        (prefix + getTableName()).c_str(),
        POP_BATCH_SIZE);

    RedisReply r(m_db, command, REDIS_REPLY_ARRAY);
    auto ctx0 = r.getContext();
    vkco.clear();

    // A nil reply means there is nothing to return.
    if (ctx0->type == REDIS_REPLY_NIL)
    {
        return;
    }

    assert(ctx0->type == REDIS_REPLY_ARRAY);
    const size_t n = ctx0->elements;
    vkco.resize(n);

    for (size_t ie = 0; ie < n; ie++)
    {
        auto &kco = vkco[ie];
        auto &values = kfvFieldsValues(kco);
        assert(values.empty());

        auto &ctx = ctx0->element[ie];

        // Elements 0 and 1 are read unconditionally below, so make sure they exist.
        if (ctx->elements < 2)
        {
            SWSS_LOG_ERROR("invalid number of elements in returned table: %zu < 2", ctx->elements);
            throw runtime_error("invalid number of elements in returned table");
        }

        kfvKey(kco) = string(ctx->element[0]->str);
        kfvOp(kco) = string(ctx->element[1]->str);

        // The remaining elements are flattened field/value pairs.
        for (size_t i = 2; i < ctx->elements; i += 2)
        {
            if (i + 1 >= ctx->elements)
            {
                // size_t values are printed with %zu.
                SWSS_LOG_ERROR("invalid number of elements in returned table: %zu >= %zu", i + 1, ctx->elements);
                throw runtime_error("invalid number of elements in returned table");
            }

            FieldValueTuple e;
            fvField(e) = ctx->element[i + 0]->str;
            fvValue(e) = ctx->element[i + 1]->str;
            values.push_back(e);
        }
    }
}
consumer_table_pops.lua與consumer_state_table_pops.lua最大的不同是:consumer_table_pops.lua採用的是有序鏈表,consumer_state_table_pops.lua用的是set。consumer_table_pops.lua對業務順序比較關心,syncd沒有業務邏輯,只是簡單地執行sai調用,對硬件的操作是有時序的,必須嚴格按照時序進行,否則系統無法正常運轉;而orchagent則是軟件層面,它可以等待事件所需條件都滿足後才進行動作。另外一點不同的是,對於刪除操作,consumer_state_table_pops.lua不負責刪除數據庫的內容,而是由生產者負責清除;consumer_table_pops.lua則必須自己負責刪除數據庫中的內容。
-- KEYS[1]: list holding the queued messages
-- KEYS[2]: table name used as prefix for the hashes written back
-- ARGV[1]: maximum number of messages to pop
-- Returns a list of {key, op, field1, value1, ...} entries, oldest first.
local results = {}

-- Key, value and op are popped together: three list entries per message.
local span = ARGV[1] * 3

-- Read from the tail so messages are handled strictly in order,
-- then drop what was read from the list.
local entries = redis.call('LRANGE', KEYS[1], -span, -1)
redis.call('LTRIM', KEYS[1], 0, -span - 1)

-- Operations that are returned to the caller but never written to a hash.
local passthrough = {
    flush = true,
    flushresponse = true,
    get = true,
    getresponse = true,
    notify = true,
}

for pos = #entries, 1, -3 do
    local rawOp = entries[pos - 2]
    local payload = entries[pos - 1]
    local key = entries[pos]

    -- The first character of the op is the database action, the rest is the op name.
    local dbop = rawOp:sub(1, 1)
    local op = rawOp:sub(2)

    -- Decode the value back into field/value pairs.
    local msg = {key, op}
    local decoded = cjson.decode(payload)
    for idx = 1, #decoded, 2 do
        table.insert(msg, decoded[idx])
        table.insert(msg, decoded[idx + 1])
    end
    table.insert(results, msg)

    if op == 'bulkset' or op == 'bulkcreate' then
        -- key is "OBJECT_TYPE:num"; keep only the object type
        local objectType = key:sub(1, string.find(key, ':') - 1)
        -- entries 1 and 2 of msg are key/op, so pairs start at 3
        for st = 3, #msg, 2 do
            -- keyname is ASIC_STATE : OBJECT_TYPE : OBJECT_ID
            local keyname = KEYS[2] .. ':' .. objectType .. ':' .. msg[st]
            -- the value may hold several a=v|a=v|... items
            for pair in string.gmatch(msg[st + 1], '([^|]+)') do
                local eq = string.find(pair, '=')
                redis.call('HSET', keyname, pair:sub(1, eq - 1), pair:sub(eq + 1))
            end
        end
    elseif not passthrough[op] then
        local keyname = KEYS[2]
        if key ~= '' then
            keyname = KEYS[2] .. ':' .. key
        end

        if dbop == 'D' then
            -- A delete also removes the hash from the database.
            redis.call('DEL', keyname)
        else
            -- Anything else writes the decoded pairs back as a hash.
            for st = 3, #msg, 2 do
                redis.call('HSET', keyname, msg[st], msg[st + 1])
            end
        end
    end
end

return results
ProducerTable和ConsumerTable用於orchagent和syncd之間的生產者--消費者工具,以asic_db爲數據庫。
agent寫入app-db [0 unix:/var/run/redis/redis.sock] "EVALSHA" "433b5d51dc97a94f3b084255db05473699f3873a" "12" "VRF_TABLE_CHANNEL" "VRF_TABLE_KEY_SET" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "G" "router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr" [0 lua] "SADD" "VRF_TABLE_KEY_SET" "router-3-2" [0 lua] "HMSET" "VRF_TABLE:router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr" [0 lua] "PUBLISH" "VRF_TABLE_CHANNEL" "G" orchagent從app-db讀出數據 [0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "VRF_TABLE_KEY_SET" "VRF_TABLE:" "8192" "''" [0 lua] "SPOP" "VRF_TABLE_KEY_SET" "8192" [0 lua] "HGETALL" "VRF_TABLE:router-3-2" [0 lua] "HGETALL" "VRF_TABLE:router-3-1" #orchagent寫asic-db,value會進行編碼 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate" "G" [1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" 
"[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate" [1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G" #syncd監聽後,調用腳本進行解析 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''" [1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1" [1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385" #decode後從新寫回數據庫 [1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE" "true" [1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE" "false" [1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS" "6C:AE:8B:52:D8:66"
#咱們以以下數據爲例進行說明: "SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24" "DEL" "ROUTE_TABLE:10.254.31.0/24" "PUBLISH" "ROUTE_TABLE_CHANNEL" "G" #在數據庫app_db中寫入刪除路由表項10.254.31.0/24數據 1543393647.059780 [0 127.0.0.1:33859] "SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24" 1543393647.066503 [0 127.0.0.1:33859] "DEL" "ROUTE_TABLE:10.254.31.0/24" 1543393647.090635 [0 127.0.0.1:33859] "PUBLISH" "ROUTE_TABLE_CHANNEL" "G" #orchagent做爲客戶端經過訂閱頻道ROUTE_TABLE_CHANNEL,服務器通知了orchagent,執行腳本consumer_state_table_pops.lua #計算sha1sum consumer_state_table_pops.lua dc95f6831ed81200867b02a0ff2f8a25db9d6540,可見執行的是lua腳本consumer_state_table_pops.lua #該腳本循環取出全部key的hash屬性,一次最多取出8192個key 1543393647.090864 [0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "ROUTE_TABLE_KEY_SET" "ROUTE_TABLE:" "8192" "''" 1543393647.090934 [0 lua] "SPOP" "ROUTE_TABLE_KEY_SET" "8192" 1543393647.090997 [0 lua] "HGETALL" "ROUTE_TABLE:10.254.31.0/24" #orchagent獲取內容後,將hash表數據進行序列化,即將hash類型轉換爲列表類型,做爲生產者調用腳本 #"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);" #"redis.call('PUBLISH', KEYS[2], ARGV[4]);" #將內容寫入數據庫1(asic-db)) 1543393647.091399 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove" "G" 1543393647.091510 [1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove" 1543393647.091591 [1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G" #syncd訂閱了ASIC_STATE_CHANNEL頻道,獲得了通知,執行腳本consumer_table_pops.lua #sha1sum usr/share/swss/consumer_table_pops.lua #22e4ca16cf37e220c92df33903a0e95585ab76fd usr/share/swss/consumer_table_pops.lua 1543393647.091734 [1 unix:/var/run/redis/redis.sock] 
"EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''" 1543393647.091856 [1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1" 1543393647.091889 [1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385" 1543393647.091950 [1 lua] "DEL" "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}"