sonic消息傳遞機制與架構(1)

sonic是一個網絡操做系統,採用了大量的相互獨立的第三方開源組件,這些組件在依賴,編譯環境,庫,配置方式都有很大的不一樣。爲了讓這些組件在sonic中相互協做,互不干擾,同時儘可能不修改第三方組件的代碼,sonic採用容器技術爲各個組件提供獨立的運行環境,經過容器間共享網絡命名空間進行通訊。

​ 各個第三組件有各自的配置文件格式和消息格式,如何讓這些組件互通訊息了。sonic採用redis數據庫做爲消息傳遞平臺,經過純字符消息方式屏蔽各個組件的插件,經過膠水代碼將其粘起來。c++

sonic消息框架圖

clipboard.png

實現

sonic經過redis數據庫的發佈-訂閱機制和鍵空間事件機制實現了整個消息傳遞機制。redis

基類

class TableBase {
public:
    TableBase(int dbId, const std::string &tableName)
        : m_tableName(tableName), m_dbId(dbId)
    {
        /* Look up table separator for the provided DB */
        auto it = tableNameSeparatorMap.find(dbId);

        if (it != tableNameSeparatorMap.end())
        {
            m_tableSeparator = it->second;
        }
        else
        {
            SWSS_LOG_NOTICE("Unrecognized database ID. Using default table name separator ('%s')", TABLE_NAME_SEPARATOR_VBAR.c_str());
            m_tableSeparator = TABLE_NAME_SEPARATOR_VBAR;
        }
    }

    std::string getTableName() const { return m_tableName; }
    int getDbId() const { return m_dbId; }

    /* Return the actual key name as a combination of tableName<table_separator>key */
    std::string getKeyName(const std::string &key)
    {
        if (key == "") return m_tableName;
        else return m_tableName + m_tableSeparator + key;
    }

    /* Return the table name separator being used */
    std::string getTableNameSeparator() const
    {
        return m_tableSeparator;
    }

    std::string getChannelName() { return m_tableName + "_CHANNEL"; }
private:
    static const std::string TABLE_NAME_SEPARATOR_COLON;
    static const std::string TABLE_NAME_SEPARATOR_VBAR;
    static const TableNameSeparatorMap tableNameSeparatorMap;

    std::string m_tableName;
    std::string m_tableSeparator;
    int m_dbId;
};
class TableEntryWritable {
public:
    virtual ~TableEntryWritable() = default;

    /* Set an entry in the table */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX) = 0;
    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX) = 0;

};

消費者基類

消費者響應生產者的事件,能夠採用阻塞或者輪詢的方式處理。sonic採用了異步事件通知機制(poll)進行處理。消費者類必須實現事件通知機制相關的接口。
RedisSelect

該類對異步通知機制Selectable(select,poll等)進行了封裝,集成該類的派生類能夠加入異步事件機制,經過集成該類,消費者能夠持續監聽事件。shell

class RedisSelect : public Selectable
{
public:
    /* The database is already alive and kicking, no need for more than a second */
    static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000;

    RedisSelect(int pri = 0);//調度優先級

    int getFd() override;
    void readData() override;
    bool hasCachedData() override;
    bool initializedWithData() override;
    void updateAfterRead() override;

    /* Create a new redisContext, SELECT DB and SUBSCRIBE */
    void subscribe(DBConnector* db, const std::string &channelName);

    /* PSUBSCRIBE */
    void psubscribe(DBConnector* db, const std::string &channelName);

    void setQueueLength(long long int queueLength);

protected:
    std::unique_ptr<DBConnector> m_subscribe;
    long long int m_queueLength;//接收的應答的個數,一個請求一個應答。
};
getFd
int RedisSelect::getFd()
{
    return m_subscribe->getContext()->fd;
}
readData
void RedisSelect::readData()
{
    redisReply *reply = nullptr;

    if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
        throw std::runtime_error("Unable to read redis reply");

    freeReplyObject(reply);
    m_queueLength++;//事件加一次,

    reply = nullptr;
    int status;
    do
    {
        status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
        if(reply != nullptr && status == REDIS_OK)
        {//一個響應加一次,該值會大於最終處理的循環次數,形成空轉,可是不加的話,極端狀況下會形成丟失信息問題
            m_queueLength++;
            freeReplyObject(reply);
        }
    }
    while(reply != nullptr && status == REDIS_OK);

    if (status != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
}
hasCachedData
bool RedisSelect::hasCachedData()
{//判斷是否還有消息,存在消息的話,加入m_ready,保證已經讀出來的消息能被處理
    return m_queueLength > 1;
}
updateAfterRead
void RedisSelect::updateAfterRead()
{
    m_queueLength--;//假設一次處理一個應答,這裏減去1,即便一次處理了多個消息,依然只減掉1,形成空轉的根本緣由
}
setQueueLength
void RedisSelect::setQueueLength(long long int queueLength)
{
    m_queueLength = queueLength;//設置消息個數,用於構造函數
}
subscribe and psubscribe
/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void RedisSelect::subscribe(DBConnector* db, const std::string &channelName)
{
    m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));

    /* Send SUBSCRIBE #channel command */
    std::string s("SUBSCRIBE ");
    s += channelName;
    RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY);
}

/* PSUBSCRIBE */
void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName)
{
    m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));

    /*
     * Send PSUBSCRIBE #channel command on the
     * non-blocking subscriber DBConnector
     */
    std::string s("PSUBSCRIBE ");
    s += channelName;
    RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY);
}

消費者進一步封裝基類

class TableEntryPoppable {
public:
    virtual ~TableEntryPoppable() = default;

    /* Pop an action (set or del) on the table */
    virtual void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX) = 0;

    /* Get multiple pop elements */
    virtual void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX) = 0;
};

class TableConsumable : public TableBase, public TableEntryPoppable, public RedisSelect {
public:
    /* The default value of pop batch size is 128 */
    static constexpr int DEFAULT_POP_BATCH_SIZE = 128;//一次消費128條消息

    TableConsumable(int dbId, const std::string &tableName, int pri) : TableBase(dbId, tableName), RedisSelect(pri) { }
};

redis lua執行腳本

EVAL script numkeys key [key ...] arg [arg ...]
首先你們必定要知道eval的語法格式,其中:
   <1> script:     你的lua腳本
   <2> numkeys:  key的個數
   <3> key:         redis中各類數據結構的替代符號
   <4> arg:         你的自定義參數
ok,可能乍一看模板不是特別清楚,下面我能夠用官網的小案例演示一下:
eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20

上面這一串代碼大概是什麼意思呢? 第一個參數的字符串就是script,也就是lua腳本。2表示keys的個數,KEYS[1] 就是 username的佔位符, KEYS[2]就是age的佔位符,ARGV[1]就是jack的佔位符,ARGV[2]就是20的佔位符,,以此類推,,,因此最後的結果應該就是:{return username age jack 20} 是否是有點像C#中的佔位符:{0}呢?下面我在Redis中給你們演示一下:數據庫

admin@admin:~$ redis-cli
127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20
1) "username"
2) "age"
3) "jack"
4) "20"
127.0.0.1:6379>

而後咱們經過下面命令執行,這種方式和前面介紹的不同,參數 --eval script key1 key2 , arg1 age2 這種模式,key和value用一個逗號隔開就行了,最後咱們也看到了,數據都出來了,對吧.網絡

admin@admin:~$  redis-cli --eval t.lua username age , jack 20                 
1) "username"
2) "age"
3) "jack"
4) "20"
admin@admin:~$
注意上面的逗號左右兩邊都有空格
  • 腳本也能夠在REPL模式上執行,不過須要先加載腳本:
admin@admin:~$ redis-cli script load "$(cat t.lua)"                          
"a42059b356c875f0717db19a51f6aaca9ae659ea"
admin@admin:~$
admin@admin:~$ redis-cli
127.0.0.1:6379> EVALSHA a42059b356c875f0717db19a51f6aaca9ae659ea 2 username age jack 20
1) "username"
2) "age"
3) "jack"
4) "20"
127.0.0.1:6379>
  • lua腳本比較大小,須要使用函數tonumber將字符轉換成數字,而後比較大小
admin@admin:~$ cat flashsale.lua
local buyNum = ARGV[1]                        -- 本次購買的數量
local goodsKey = KEYS[1]                      -- 本次購買的商品名
local goodsNum = redis.call('get',goodsKey)   -- 獲取商品的庫存個數
if tonumber(goodsNum) >= tonumber(buyNum)     -- 庫存足夠,那麼出貨
    then redis.call('decrby',goodsKey,buyNum) -- 減小本次買的量
    return buyNum                             -- 返回購買的量
else
    return '0'                                -- 數量不夠,直接返回0
end
admin@admin:~$
admin@admin:~$ redis-cli --eval flashsale.lua "pets" , 8
"8"
admin@admin:~$
上面腳本實現的是一個簡單的秒殺程序
相關文章
相關標籤/搜索