本章節主要分析sonic使用redis的鍵空間消息機制實現的消息傳遞框架。該機制與發佈-訂閱機制的區別在於:發佈者不需要進行publish通知,只要往數據庫中寫入指定的鍵,redis就會通知監聽了該鍵空間的客戶端。該機制目前只用於監聽config_db,即監聽config的變化,然後將其同步到app_db。使用該機制的案例有:VlanMgr,IntfMgr,portsyncd等。可以通過orch包裝使用,比如VlanMgr;也可以直接定義SubscriberStateTable表,比如portCfg。
# 在數據庫0中訂閱以tom開頭的鍵
127.0.0.1:6379> PSUBSCRIBE __keyspace@0__:tom*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyspace@0__:tom*"
3) (integer) 1

# 在數據庫0中添加hash表tom
127.0.0.1:6379> HMSET tom|1 name tom age 28
OK

# 訂閱者得到應答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "hset"

# 刪除key
127.0.0.1:6379> DEL tom|1
(integer) 1

# 訂閱者得到應答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "del"
/*
 * Consumer table driven by Redis keyspace notifications.
 *
 * Keyspace events are buffered as raw replies by readData() and translated
 * into KeyOpFieldsValuesTuple entries by pops().
 */
class SubscriberStateTable : public ConsumerTableBase
{
public:
    /*
     * db           - database connection used for the table and the subscription
     * tableName    - name of the table whose keys are watched
     * popBatchSize - forwarded to ConsumerTableBase
     * pri          - forwarded to ConsumerTableBase
     */
    SubscriberStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get all elements available */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

    /* Read keyspace event from redis */
    void readData() override;

    /* Report whether buffered data is still pending */
    bool hasCachedData() override;

    /* True when m_buffer (presumably inherited from ConsumerTableBase) is not empty */
    bool initializedWithData() override
    {
        return !m_buffer.empty();
    }

private:
    /*
     * Pop the oldest keyspace event from the event buffer.
     * The reply is handed back through a shared_ptr.
     */
    std::shared_ptr<RedisReply> popEventBuffer();

    /* Keyspace pattern this table subscribes to */
    std::string m_keyspace;

    /* Every reply received from the subscription is stored in this queue */
    std::deque<std::shared_ptr<RedisReply>> m_keyspace_event_buffer;

    /* Important member: the Table used to read keys and their field/value content */
    Table m_table;
};
/*
 * Direct accessor for the entries of one table in the database.
 */
class Table : public TableBase, public TableEntryEnumerable
{
public:
    Table(DBConnector *db, const std::string &tableName);

    /* Build a table that issues its commands through the given pipeline */
    Table(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
    ~Table() override;

    /* Set an entry in the DB directly (op not in use) */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Read a value from the DB directly */
    /* Returns false if the key does not exist */
    virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);

    /* Collect keys into `keys` (presumably all keys of this table - confirm in the implementation) */
    void getKeys(std::vector<std::string> &keys);

    void setBuffered(bool buffered);

    void flush();

    void dump(TableDump &tableDump);

protected:
    bool m_buffered;
    /* NOTE(review): presumably tells whether this object owns m_pipe - confirm in the destructor */
    bool m_pipeowned;
    RedisPipeline *m_pipe;

    /*
     * Strip special symbols from keys used for type identification
     * Input example:
     *     port@
     * DB entry:
     *     1) "ports@"
     *     2) "Ethernet0,Ethernet4,...
     */
    std::string stripSpecialSym(const std::string &key);
};
Table::Table(RedisPipeline *pipeline, const string &tableName, bool buffered) : TableBase(pipeline->getDbId(), tableName) , m_buffered(buffered) , m_pipeowned(false) , m_pipe(pipeline) { }
bool Table::get(const string &key, vector<FieldValueTuple> &values) { /* 127.0.0.1:6379[4]> HGETALL "VLAN|Vlan1000" 1) "vlanid" 2) "1000" 127.0.0.1:6379[4]> */ RedisCommand hgetall_key; hgetall_key.format("HGETALL %s", getKeyName(key).c_str()); RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY); redisReply *reply = r.getContext(); values.clear(); if (!reply->elements) return false; if (reply->elements & 1)//必須是偶數,鍵值對 throw system_error(make_error_code(errc::address_not_available), "Unable to connect netlink socket"); //整理鍵值對 for (unsigned int i = 0; i < reply->elements; i += 2) { values.emplace_back(stripSpecialSym(reply->element[i]->str), reply->element[i + 1]->str); } return true; }
/*
 * Subscribe to the keyspace events of `tableName` and queue the entries that
 * already exist in the table.
 *
 * The pattern has the form "__keyspace@<dbId>__:<tableName><separator>*".
 * Each key currently returned by m_table.getKeys() is read back and, when the
 * read succeeds, pushed to m_buffer as a SET_COMMAND entry.
 */
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , m_table(db, tableName)
{
    /* Keyspace pattern covering every key of this table */
    m_keyspace = "__keyspace@" + to_string(db->getDbId()) + "__:" + tableName
               + m_table.getTableNameSeparator() + "*";

    /* Subscribe to the keyspace events */
    psubscribe(m_db, m_keyspace);

    vector<string> existingKeys;
    m_table.getKeys(existingKeys);

    for (const auto &entryKey : existingKeys)
    {
        KeyOpFieldsValuesTuple entry;
        kfvKey(entry) = entryKey;
        kfvOp(entry) = SET_COMMAND;

        /* Entries whose content cannot be read are left out */
        if (m_table.get(entryKey, kfvFieldsValues(entry)))
        {
            m_buffer.push_back(entry);
        }
    }
}
該訂閱者實現了自己的數據讀取函數:
void SubscriberStateTable::readData() { redisReply *reply = nullptr; /* Read data from redis. This call is non blocking. This method * is called from Select framework when data is available in socket. * NOTE: All data should be stored in event buffer. It won't be possible to * read them second time. */ if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } //將應答壓入鍵空間事件緩存中 m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply))); /* Try to read data from redis cacher. * If data exists put it to event buffer. * NOTE: Keyspace event is not persistent and it won't * be possible to read it second time. If it is not stared in * the buffer it will be lost. */ //循環獲取全部應答 reply = nullptr; int status; do { status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)); if(reply != nullptr && status == REDIS_OK) { m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply))); } } while(reply != nullptr && status == REDIS_OK); if (status != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } }
該類訂閱者自己實現了判斷是否還有數據的接口:只要任一緩存隊列中的元素個數大於1,則認爲還有數據,相比之下比默認的接口更優。
/*
 * Returns true when m_buffer or the keyspace event buffer holds more than
 * one element.
 */
bool SubscriberStateTable::hasCachedData()
{
    const bool entriesPending = m_buffer.size() > 1;
    const bool eventsPending = m_keyspace_event_buffer.size() > 1;
    return entriesPending || eventsPending;
}
該類訂閱者期望的數據來自訂閱事件的返回應答,但應答中只包含key和事件類型。如果事件不是del,則需要根據key再從數據庫中讀取具體內容。
shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer() { if (m_keyspace_event_buffer.empty()) { return NULL; } auto reply = m_keyspace_event_buffer.front(); m_keyspace_event_buffer.pop_front(); return reply; } void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/) { vkco.clear(); if (!m_buffer.empty())//不爲空,則將其中的內容拷貝出來 { vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end()); m_buffer.clear(); return; } while (auto event = popEventBuffer())//提取信息 { KeyOpFieldsValuesTuple kco; /* if the Key-space notification is empty, try next one. */ if (event->getContext()->type == REDIS_REPLY_NIL) { continue; } assert(event->getContext()->type == REDIS_REPLY_ARRAY); size_t n = event->getContext()->elements; /* Expecting 4 elements for each keyspace pmessage notification */ //鍵空間的應答通常包含四個消息 if (n != 4) { SWSS_LOG_ERROR("invalid number of elements %lu for pmessage of %s", n, m_keyspace.c_str()); continue; } /* The second element should be the original pattern matched */ /* 第二個是命中的模式 */ auto ctx = event->getContext()->element[1]; if (m_keyspace != ctx->str) { SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } //第三個包含命中的key,冒號後面就是key ctx = event->getContext()->element[2]; string msg(ctx->str); size_t pos = msg.find(':'); if (pos == msg.npos) { SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } //冒號後面就是key,好比"VLAN_INTERFACE|Vlan1000|192.168.0.1/27" string table_entry = msg.substr(pos + 1); //獲取分割符號,分隔符前面是表名 pos = table_entry.find(m_table.getTableNameSeparator()); if (pos == table_entry.npos) { SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } string key = table_entry.substr(pos + 1); //最後一個是操做 ctx = event->getContext()->element[3]; if (strcmp("del", ctx->str) == 0) { kfvKey(kco) = key; kfvOp(kco) = DEL_COMMAND; } else { //執行get操做 if (!m_table.get(key, 
kfvFieldsValues(kco))) { SWSS_LOG_ERROR("Failed to get content for table key %s", table_entry.c_str()); continue; } kfvKey(kco) = key; kfvOp(kco) = SET_COMMAND; } vkco.push_back(kco); } m_keyspace_event_buffer.clear(); return; }