sonic消息傳遞機制與架構(3)

本章節主要分析 sonic 使用 redis 的鍵空間通知(keyspace notification)機制實現的消息傳遞框架。該機制與發佈-訂閱機制的區別在於:發佈者不需要主動進行 publish 通知,只要往數據庫中寫入指定的鍵,redis 就會通知監聽了該鍵空間的客戶端。該機制目前只用於監聽 config_db 中配置的變化,然後將其同步到 app_db。使用該機制的案例有:VlanMgr、IntfMgr、portsyncd 等。可以通過 orch 包裝使用,比如 VlanMgr;也可以直接定義 SubscriberStateTable 表,比如 portCfg。

redis鍵空間事件機制樣例

#在數據庫0中訂閱以tom開頭的鍵
127.0.0.1:6379> PSUBSCRIBE __keyspace@0__:tom*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyspace@0__:tom*"
3) (integer) 1

#在數據庫0中添加hash表tom|1
127.0.0.1:6379> HMSET tom|1 name tom age 28 
OK

#訂閱者獲得應答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "hset"

#刪除key
127.0.0.1:6379> DEL tom|1
(integer) 1
127.0.0.1:6379> 

1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "del"

鍵空間消息機制實現

/*
 * Consumer table that is fed by redis keyspace notifications rather than by
 * explicit publish calls.
 * NOTE(review): m_buffer, m_db and m_subscribe are used by the member
 * definitions but are not declared in this class; presumably they are
 * inherited through ConsumerTableBase - confirm.
 */
class SubscriberStateTable : public ConsumerTableBase
{
public:
    /* Subscribe to the keyspace pattern built from the db id and tableName,
     * then queue a SET entry in m_buffer for every key already in the table. */
    SubscriberStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get all elements available */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

    /* Read keyspace event from redis */
    void readData() override;
    /* True when more than one item is queued in m_buffer or in the keyspace event buffer */
    bool hasCachedData() override;
    /* True when m_buffer is not empty */
    bool initializedWithData() override
    {
        return !m_buffer.empty();
    }

private:
    /* Pop keyspace event from event buffer; the result is null when the buffer is empty. */
    std::shared_ptr<RedisReply> popEventBuffer();

    /* Keyspace pattern this table is subscribed to */
    std::string m_keyspace;
    /* Every reply read from redis is stored in this queue until pops() consumes it */
    std::deque<std::shared_ptr<RedisReply>> m_keyspace_event_buffer;
    /* Important member: used to list the table's keys and to read the fields
     * and values of an entry, since a notification carries only key and operation */
    Table m_table;
};

class Table

/*
 * Direct accessor for the entries of one table in a redis database.
 * Commands are sent through a RedisPipeline (m_pipe).
 */
class Table : public TableBase, public TableEntryEnumerable {
public:
    Table(DBConnector *db, const std::string &tableName);
    /* Use a caller-supplied pipeline; the db id is taken from the pipeline */
    Table(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
    ~Table() override;

    /* Set an entry in the DB directly (op not in use) */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);
    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Read a value from the DB directly */
    /* Returns false if the key doesn't exists */
    virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);

    /* Output parameter keys receives the keys; presumably the keys of this table - confirm */
    void getKeys(std::vector<std::string> &keys);

    void setBuffered(bool buffered);

    void flush();

    void dump(TableDump &tableDump);

protected:

    /* Set from the constructor's buffered argument or setBuffered() */
    bool m_buffered;
    /* Set to false by the pipeline-taking constructor;
     * presumably records whether this object owns m_pipe - confirm */
    bool m_pipeowned;
    /* Pipeline used to send commands to redis */
    RedisPipeline *m_pipe;

    /* Strip special symbols from keys used for type identification
     * Input example:
     *     port@
     * DB entry:
     * 1) "ports@"
     * 2) "Ethernet0,Ethernet4,...
     * NOTE(review): the input example says "port@" while the DB entry shows
     * "ports@" - confirm which spelling is intended.
     * */
    std::string stripSpecialSym(const std::string &key);
};

Table::Table

/*
 * Build a Table on top of a pipeline supplied by the caller.
 *
 * The database id handed to TableBase is taken from the pipeline.
 * m_pipeowned is set to false (presumably so the destructor leaves the
 * pipeline alone - confirm against ~Table()).
 */
Table::Table(RedisPipeline *pipeline, const string &tableName, bool buffered) :
    TableBase(pipeline->getDbId(), tableName),
    m_buffered(buffered),
    m_pipeowned(false),
    m_pipe(pipeline)
{
}

Table::get

/*
 * Read all field/value pairs of one table entry directly from the DB.
 *
 * Sends "HGETALL <redis key>" through the pipeline, where the redis key is
 * getKeyName(key). Example of the underlying command:
 *     127.0.0.1:6379[4]> HGETALL "VLAN|Vlan1000"
 *     1) "vlanid"
 *     2) "1000"
 *
 * key:    entry key inside this table
 * values: output; always cleared first, then filled with one tuple per
 *         field, the field name being passed through stripSpecialSym()
 *
 * Returns false when the reply holds no elements (values stays empty),
 * true otherwise.
 * Throws system_error when the reply holds an odd number of elements, since
 * HGETALL must answer with field/value pairs.
 */
bool Table::get(const string &key, vector<FieldValueTuple> &values)
{
    RedisCommand hgetall_key;
    hgetall_key.format("HGETALL %s", getKeyName(key).c_str());
    RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY);
    redisReply *reply = r.getContext();
    values.clear();

    if (!reply->elements)
        return false;

    /* The element count must be even: fields and values alternate.
     * The error code is kept unchanged for callers that inspect it; only the
     * message, which used to describe an unrelated netlink failure, is fixed. */
    if (reply->elements & 1)
        throw system_error(make_error_code(errc::address_not_available),
                           "Unexpected odd number of elements in HGETALL reply");

    /* Pair up field names with their values */
    values.reserve(reply->elements / 2);
    for (size_t i = 0; i < reply->elements; i += 2)
    {
        values.emplace_back(stripSpecialSym(reply->element[i]->str),
                                    reply->element[i + 1]->str);
    }

    return true;
}

SubscriberStateTable::SubscriberStateTable(......)

/*
 * Subscribe to the keyspace events of one table and preload its content.
 *
 * The subscription pattern has the form
 *     __keyspace@<db id>__:<tableName><table name separator>*
 * After subscribing, every key currently returned by m_table.getKeys() is
 * read back; each entry that can be read is queued in m_buffer as a SET
 * operation. Keys whose content cannot be read are skipped.
 */
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
    /* Keyspace pattern matching every key of this table */
    m_keyspace = "__keyspace@" + to_string(db->getDbId()) + "__:" + tableName + m_table.getTableNameSeparator() + "*";

    /* Subscribe to the keyspace events */
    psubscribe(m_db, m_keyspace);

    vector<string> existingKeys;
    m_table.getKeys(existingKeys);

    for (const auto &entryKey : existingKeys)
    {
        KeyOpFieldsValuesTuple initialEntry;
        kfvKey(initialEntry) = entryKey;
        kfvOp(initialEntry) = SET_COMMAND;

        /* Queue the entry only when its fields could be read */
        if (m_table.get(entryKey, kfvFieldsValues(initialEntry)))
        {
            m_buffer.push_back(initialEntry);
        }
    }
}

readData

該訂閱者實現了自己的數據讀取函數。

void SubscriberStateTable::readData()
{
    redisReply *reply = nullptr;

    /* Read data from redis. This call is non blocking. This method
     * is called from Select framework when data is available in socket.
     * NOTE: All data should be stored in event buffer. It won't be possible to
     * read them second time. */
    if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
    //將應答壓入鍵空間事件緩存中
    m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply)));

    /* Try to read data from redis cacher.
     * If data exists put it to event buffer.
     * NOTE: Keyspace event is not persistent and it won't
     * be possible to read it second time. If it is not stared in
     * the buffer it will be lost. */
    //循環獲取全部應答
    reply = nullptr;
    int status;
    do
    {
        status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
        if(reply != nullptr && status == REDIS_OK)
        {
            m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply)));
        }
    }
    while(reply != nullptr && status == REDIS_OK);

    if (status != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
}

hasCachedData

該類訂閱者自己實現了判斷是否還有緩存數據的接口:只要 m_buffer 或 m_keyspace_event_buffer 中的元素個數大於 1,則認爲還有數據,相比默認的接口更優。

/*
 * Report whether more than one item is still queued, either in m_buffer or
 * in the keyspace event buffer.
 * NOTE(review): the threshold is "> 1", not "> 0"; presumably the item being
 * handled at the time of the call is still counted - confirm against the
 * Select framework.
 */
bool SubscriberStateTable::hasCachedData()
{
    const bool moreBufferedEntries = m_buffer.size() > 1;
    const bool moreKeyspaceEvents = m_keyspace_event_buffer.size() > 1;

    return moreBufferedEntries || moreKeyspaceEvents;
}

pops

該類訂閱者所需的信息來自訂閱事件的應答,但應答中只有 key 和事件類型;如果事件不是 del,還需要根據 key 到數據庫中讀取對應的字段和值。

/*
 * Remove and return the oldest keyspace event of the event buffer.
 * Returns an empty (null) pointer when the buffer holds no event.
 */
shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer()
{
    shared_ptr<RedisReply> oldest;

    if (!m_keyspace_event_buffer.empty())
    {
        oldest = m_keyspace_event_buffer.front();
        m_keyspace_event_buffer.pop_front();
    }

    return oldest;
}

/*
 * Hand out all pending entries.
 *
 * vkco:   output; cleared first, then filled with key/op/fields tuples
 * prefix: unused
 *
 * If m_buffer holds entries they are returned on their own and the keyspace
 * events are left for a later call. Otherwise every buffered keyspace event
 * is turned into a tuple: a "del" event becomes DEL_COMMAND with no fields,
 * any other event becomes SET_COMMAND with the entry's current content read
 * from the DB. Events that are malformed, or whose entry cannot be read, are
 * logged and skipped.
 */
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
    vkco.clear();

    /* Already-built entries take precedence: copy them out and stop */
    if (!m_buffer.empty())
    {
        vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end());
        m_buffer.clear();
        return;
    }

    /* Decode the buffered keyspace events one by one */
    while (auto event = popEventBuffer())
    {
        KeyOpFieldsValuesTuple kco;
        /* if the Key-space notification is empty, try next one. */
        if (event->getContext()->type == REDIS_REPLY_NIL)
        {
            continue;
        }

        assert(event->getContext()->type == REDIS_REPLY_ARRAY);
        size_t n = event->getContext()->elements;

        /* Expecting 4 elements for each keyspace pmessage notification */
        if (n != 4)
        {
            /* n is a size_t, so it is printed with %zu */
            SWSS_LOG_ERROR("invalid number of elements %zu for pmessage of %s", n, m_keyspace.c_str());
            continue;
        }
        /* The second element should be the original pattern matched */
        auto ctx = event->getContext()->element[1];
        if (m_keyspace != ctx->str)
        {
            SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", ctx->str, m_keyspace.c_str());
            continue;
        }
        /* The third element is the channel; the matched redis key follows the first colon */
        ctx = event->getContext()->element[2];
        string msg(ctx->str);
        size_t pos = msg.find(':');
        if (pos == msg.npos)
        {
            SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", ctx->str, m_keyspace.c_str());
            continue;
        }
        /* Redis key, for example "VLAN_INTERFACE|Vlan1000|192.168.0.1/27" */
        string table_entry = msg.substr(pos + 1);
        /* The table name precedes the first separator; the entry key follows it */
        pos = table_entry.find(m_table.getTableNameSeparator());
        if (pos == table_entry.npos)
        {
            SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", ctx->str, m_keyspace.c_str());
            continue;
        }
        string key = table_entry.substr(pos + 1);
        /* The last element is the operation that triggered the event */
        ctx = event->getContext()->element[3];
        if (strcmp("del", ctx->str) == 0)
        {
            kfvKey(kco) = key;
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            /* The notification carries no content: read the entry from the DB */
            if (!m_table.get(key, kfvFieldsValues(kco)))
            {
                SWSS_LOG_ERROR("Failed to get content for table key %s", table_entry.c_str());
                continue;
            }
            kfvKey(kco) = key;
            kfvOp(kco) = SET_COMMAND;
        }

        vkco.push_back(kco);
    }

    m_keyspace_event_buffer.clear();
}
相關文章
相關標籤/搜索