sonic處理netlink事件

sonic處理netlink事件

sonic在處理路由、接口up/down、接口地址變化、team等事件上極大地依賴內核。sonic通過監聽rtnl事件來響應linux事件,從而感知相關信息的變化。

libnl

sonic使用libnl庫來操作netlink事件,詳細內容可以訪問 http://www.infradead.org/~tgr/libnl/ 。sonic在libnl庫的基礎上封裝了類NetLink來進行netlink操作。

NetLink

/*
 * Selectable wrapper around a libnl socket connected to NETLINK_ROUTE.
 * Received messages are handed to NetDispatcher through onNetlinkMsg().
 */
class NetLink : public Selectable {
public:
    /* pri is forwarded unchanged to the Selectable base class. */
    NetLink(int pri = 0);
    ~NetLink() override;

    void registerGroup(int rtnlGroup);// Join the netlink multicast group rtnlGroup; throws system_error on failure
    /* Send an NLM_F_DUMP request for the given RTM_GET* command; throws system_error on failure. */
    void dumpRequest(int rtmGetCommand);

    int getFd() override;// Return the file descriptor of the underlying netlink socket
    void readData() override;// Receive pending messages from the socket; libnl invokes the callback for each

private:
    static int onNetlinkMsg(struct nl_msg *msg, void *arg);// Callback registered with libnl for valid messages

    nl_sock *m_socket;// libnl socket handle allocated in the constructor
};

NetLink::NetLink(int pri) :

/*
 * Allocate a libnl socket, install the message callback and connect it to
 * NETLINK_ROUTE.
 *
 * pri is forwarded to the Selectable base class.
 *
 * Throws system_error (errc::address_not_available) when the socket cannot
 * be allocated or connected. Failures of the later tuning calls
 * (non-blocking mode, buffer size) are logged but are not fatal.
 */
NetLink::NetLink(int pri) :
    Selectable(pri), m_socket(nullptr)
{
    /* Receive buffer size in bytes: 2097152 == 2 MiB. */
    const int rxBufferSize = 2097152;

    m_socket = nl_socket_alloc();
    if (!m_socket)
    {
        SWSS_LOG_ERROR("Unable to allocated netlink socket");
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to allocated netlink socket");
    }

    /* Sequence numbers are not checked on this socket. */
    nl_socket_disable_seq_check(m_socket);

    /* Every valid message read from the socket is passed to onNetlinkMsg. */
    nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this);

    /* Connect to the kernel routing netlink family. */
    int err = nl_connect(m_socket, NETLINK_ROUTE);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err));
        /* The destructor does not run when a constructor throws, so free here. */
        nl_socket_free(m_socket);
        m_socket = nullptr;
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    }

    /* Non-blocking mode; a failure is reported instead of silently ignored. */
    err = nl_socket_set_nonblocking(m_socket);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket non-blocking: %s", nl_geterror(err));
    }

    /* Set the receive buffer to 2 MiB; 0 leaves the transmit buffer at the libnl default. */
    err = nl_socket_set_buffer_size(m_socket, rxBufferSize, 0);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket buffer size: %s", nl_geterror(err));
    }
}

void NetLink::registerGroup(int rtnlGroup)

/*
 * Subscribe the socket to the netlink multicast group rtnlGroup.
 * Logs and throws system_error (errc::address_not_available) on failure.
 */
void NetLink::registerGroup(int rtnlGroup)
{
    const int rc = nl_socket_add_membership(m_socket, rtnlGroup);
    if (rc >= 0)
    {
        return;
    }

    SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup,
                   nl_geterror(rc));
    throw system_error(make_error_code(errc::address_not_available),
                       "Unable to register group");
}

int NetLink::getFd()

/* Return the file descriptor libnl reports for the underlying socket. */
int NetLink::getFd()
{
    const int fd = nl_socket_get_fd(m_socket);
    return fd;
}

void NetLink::readData()

void NetLink::readData()
{
    int err;

    do
    {
        err = nl_recvmsgs_default(m_socket);//讀取數據,有libnl觸發回調函數,處理業務
    }
    while(err == -NLE_INTR); // Retry if the process was interrupted by a signal

    if (err < 0)
    {
        if (err == -NLE_NOMEM)
            SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message");
        else if (err == -NLE_AGAIN)
            SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket");
        else
            SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", err);
    }
}

int NetLink::onNetlinkMsg(......)

//回調函數,讀取消息時回調該函數,該函數是一個消息分發器
int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg)
{
    NetDispatcher::getInstance().onNetlinkMessage(msg);
    return NL_OK;
}

void NetLink::dumpRequest(.....)

/*
 * Send an NLM_F_DUMP request for rtmGetCommand (an RTM_GET* message type)
 * with address family AF_UNSPEC on this socket.
 *
 * Logs and throws system_error (errc::address_not_available) when the
 * request cannot be sent.
 */
void NetLink::dumpRequest(int rtmGetCommand)
{
    int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP);
    if (err < 0)
    {
        /* The value is a request command, not a multicast group. */
        SWSS_LOG_ERROR("Unable to request dump for command %d: %s", rtmGetCommand,
                       nl_geterror(err));
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to request dump");
    }
}

消息分發器

/*
 * Dispatches netlink messages to handlers registered per message type.
 * A single instance is shared through getInstance().
 */
class NetDispatcher {
public:
    /* Return the single dispatcher instance (a function-local static). */
    static NetDispatcher& getInstance();// Static accessor; there is one dispatcher for the whole process

    /*
     * Register callback class according to message-type.
     *
     * Throws (a string literal) if a handler is already registered for
     * nlmsg_type.
     */
    void registerMessageHandler(int nlmsg_type, NetMsg *callback);

    /*
     * Called by NetLink or FpmLink classes as indication of new packet arrival.
     * Looks up the handler for the message type; messages without a
     * registered handler are dropped.
     */
    void onNetlinkMessage(struct nl_msg *msg);

private:
    NetDispatcher() = default;

    /* nl_msg_parse callback API */
    static void nlCallback(struct nl_object *obj, void *context);

    std::map<int, NetMsg * > m_handlers;// Registered handlers keyed by netlink message type
};
/*
 * Interface implemented by classes that want to receive parsed netlink
 * objects from NetDispatcher.
 */
class NetMsg {
public:
    /* Virtual so that deleting a handler through a NetMsg pointer is well defined. */
    virtual ~NetMsg() = default;

    /* Called by NetDispatcher when netmsg matches filters */
    virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0;
};

}

NetDispatcher& NetDispatcher::getInstance()

/* Return the dispatcher shared by all callers; it is created on first use. */
NetDispatcher& NetDispatcher::getInstance()
{
    static NetDispatcher instance;
    return instance;
}

void NetDispatcher::registerMessageHandler()

/*
 * Associate callback with nlmsg_type.
 * Throws a string literal when the type already has a handler; the existing
 * handler is left untouched in that case.
 */
void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)
{
    /* emplace inserts only when the key is absent, so one lookup covers both steps. */
    const auto result = m_handlers.emplace(nlmsg_type, callback);
    if (!result.second)
    {
        throw "Trying to registered on already registerd netlink message";
    }
}

void NetDispatcher::nlCallback()

void NetDispatcher::nlCallback(struct nl_object *obj, void *context)
{
    NetMsg *callback = (NetMsg *)context;
    callback->onMsg(nl_object_get_msgtype(obj), obj);
}

void NetDispatcher::onNetlinkMessage()

void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink回調函數的真實實現
{
    struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);//獲取netlink消息頭
    auto callback = m_handlers.find(nlmsghdr->nlmsg_type);//獲取消息類型對應的NetMsg描述結構

    /* Drop not registered messages */
    if (callback == m_handlers.end())//沒有對應的消息處理函數
        return;
    //解析消息,調用NetDispatcher::nlCallback 
    nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second));
}

使用實例

我們以接口管理爲例進行說明。

實現NetMsg

class IntfSync : public NetMsg
{
public:
    enum { MAX_ADDR_SIZE = 64 };

    IntfSync(DBConnector *db);//鏈接數據庫

    virtual void onMsg(int nlmsg_type, struct nl_object *obj);

private:
    ProducerStateTable m_intfTable;
};

}
//消息處理函數
void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj)
{
    char addrStr[MAX_ADDR_SIZE + 1] = {0};
    struct rtnl_addr *addr = (struct rtnl_addr *)obj;
    string key;
    string scope = "global";
    string family;
    //響應新地址,獲取地址,刪除地址三個信息
    if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) &&
        (nlmsg_type != RTM_DELADDR))
        return;

    /* Don't sync local routes,不一樣步local地址信息 */
    if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE)
    {
        scope = "local";
        return;
    }

    if (rtnl_addr_get_family(addr) == AF_INET)
        family = IPV4_NAME;
    else if (rtnl_addr_get_family(addr) == AF_INET6)
        family = IPV6_NAME;
    else
        // Not supported
        return;
    //獲取接口名字以及地址,組合成key
    key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr));
    key+= ":";
    nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE);
    key+= addrStr;
    if (nlmsg_type == RTM_DELADDR)//地址刪除,刪除key
    {
        m_intfTable.del(key);
        return;
    }
    //添加key
    std::vector<FieldValueTuple> fvVector;
    FieldValueTuple f("family", family);
    FieldValueTuple s("scope", scope);
    fvVector.push_back(s);
    fvVector.push_back(f);
    m_intfTable.set(key, fvVector);
}

實現main

int main(int argc, char **argv)
{
    swss::Logger::linkToDbNative("intfsyncd");
    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);//鏈接APPL_DB
    IntfSync sync(&db);//實例化netmsg
    //訂閱消息,加入組播組
    NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync);
    NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync);

    while (1)
    {
        try
        {
            NetLink netlink;
            Select s;

            netlink.registerGroup(RTNLGRP_IPV4_IFADDR);
            netlink.registerGroup(RTNLGRP_IPV6_IFADDR);
            cout << "Listens to interface messages..." << endl;
            netlink.dumpRequest(RTM_GETADDR);//打印全部地址

            s.addSelectable(&netlink);//加入select事件
            while (true)
            {
                Selectable *temps;
                s.select(&temps);//監聽select事件
            }
        }
        catch (const std::exception& e)
        {
            cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
            return 0;
        }
    }

    return 1;
}
相關文章
相關標籤/搜索