sonic在處理路由,接口up/down,接口地址變化,team等事件上極大的依賴內核。sonic經過監聽rtnl事件來響應linux事件。從而感知相關信息變化。linux
sonic使用libnl庫來操做netlink事件,詳細內容能夠訪問http://www.infradead.org/~tgr...。sonic在libnl庫基礎上封裝了類NetLink進行netlink操做。c++
class NetLink : public Selectable { public: NetLink(int pri = 0); ~NetLink() override; void registerGroup(int rtnlGroup);//註冊想要監聽的事件,加入組播組 void dumpRequest(int rtmGetCommand); int getFd() override;//判斷句柄是否在select的可用事件中 void readData() override;//獲取socket中的信息,觸發回調函數 private: static int onNetlinkMsg(struct nl_msg *msg, void *arg);//回調函數 nl_sock *m_socket;//套接口描述符 };
NetLink::NetLink(int pri) : Selectable(pri), m_socket(NULL) { m_socket = nl_socket_alloc();//申請描述符 if (!m_socket) { SWSS_LOG_ERROR("Unable to allocated netlink socket"); throw system_error(make_error_code(errc::address_not_available), "Unable to allocated netlink socket"); } nl_socket_disable_seq_check(m_socket);//不進行序列號檢查 //註冊回調函數,讀取信息時,會自動回調該函數onNetlinkMsg nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this); //鏈接內核netlink int err = nl_connect(m_socket, NETLINK_ROUTE); if (err < 0) { SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err)); nl_socket_free(m_socket); m_socket = NULL; throw system_error(make_error_code(errc::address_not_available), "Unable to connect netlink socket"); } //非阻塞 nl_socket_set_nonblocking(m_socket); /* Set socket buffer size to 256KB */ nl_socket_set_buffer_size(m_socket, 2097152, 0); }
void NetLink::registerGroup(int rtnlGroup) { int err = nl_socket_add_membership(m_socket, rtnlGroup);//加入組播組 if (err < 0) { SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup, nl_geterror(err)); throw system_error(make_error_code(errc::address_not_available), "Unable to register group"); } }
int NetLink::getFd()//獲取套接口句柄 { return nl_socket_get_fd(m_socket); }
void NetLink::readData() { int err; do { err = nl_recvmsgs_default(m_socket);//讀取數據,有libnl觸發回調函數,處理業務 } while(err == -NLE_INTR); // Retry if the process was interrupted by a signal if (err < 0) { if (err == -NLE_NOMEM) SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message"); else if (err == -NLE_AGAIN) SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket"); else SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", err); } }
//回調函數,讀取消息時回調該函數,該函數是一個消息分發器 int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg) { NetDispatcher::getInstance().onNetlinkMessage(msg); return NL_OK; }
void NetLink::dumpRequest(int rtmGetCommand)//用於獲取信息,實現get命令,查看內核相關信息 { int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP); if (err < 0) { SWSS_LOG_ERROR("Unable to request dump on group %d: %s", rtmGetCommand, nl_geterror(err)); throw system_error(make_error_code(errc::address_not_available), "Unable to request dump"); } }
class NetDispatcher { public: /**/ static NetDispatcher& getInstance();//獲取消息分發器實例,消息分發器全局一個,靜態函數 /* * Register callback class according to message-type. * * Throw exception if,註冊消息處理函數 */ void registerMessageHandler(int nlmsg_type, NetMsg *callback); /* * Called by NetLink or FpmLink classes as indication of new packet arrival * 給netlink的回調函數 */ void onNetlinkMessage(struct nl_msg *msg); private: NetDispatcher() = default; /* nl_msg_parse callback API */ static void nlCallback(struct nl_object *obj, void *context); std::map<int, NetMsg * > m_handlers;//回調函數存儲map }; class NetMsg { public: /* Called by NetDispatcher when netmsg matches filters */ virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0; }; }
NetDispatcher& NetDispatcher::getInstance()//消息分發器實例獲取函數 { static NetDispatcher gInstance;//定義了一個靜態分發器,全局一個 return gInstance; }
void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)//註冊回調函數 { if (m_handlers.find(nlmsg_type) != m_handlers.end()) throw "Trying to registered on already registerd netlink message"; m_handlers[nlmsg_type] = callback; }
void NetDispatcher::nlCallback(struct nl_object *obj, void *context) { NetMsg *callback = (NetMsg *)context; callback->onMsg(nl_object_get_msgtype(obj), obj); }
void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink回調函數的真實實現 { struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);//獲取netlink消息頭 auto callback = m_handlers.find(nlmsghdr->nlmsg_type);//獲取消息類型對應的NetMsg描述結構 /* Drop not registered messages */ if (callback == m_handlers.end())//沒有對應的消息處理函數 return; //解析消息,調用NetDispatcher::nlCallback nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second)); }
咱們以接口管理爲例進行說明。數據庫
class IntfSync : public NetMsg { public: enum { MAX_ADDR_SIZE = 64 }; IntfSync(DBConnector *db);//鏈接數據庫 virtual void onMsg(int nlmsg_type, struct nl_object *obj); private: ProducerStateTable m_intfTable; }; } //消息處理函數 void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj) { char addrStr[MAX_ADDR_SIZE + 1] = {0}; struct rtnl_addr *addr = (struct rtnl_addr *)obj; string key; string scope = "global"; string family; //響應新地址,獲取地址,刪除地址三個信息 if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) && (nlmsg_type != RTM_DELADDR)) return; /* Don't sync local routes,不一樣步local地址信息 */ if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE) { scope = "local"; return; } if (rtnl_addr_get_family(addr) == AF_INET) family = IPV4_NAME; else if (rtnl_addr_get_family(addr) == AF_INET6) family = IPV6_NAME; else // Not supported return; //獲取接口名字以及地址,組合成key key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr)); key+= ":"; nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE); key+= addrStr; if (nlmsg_type == RTM_DELADDR)//地址刪除,刪除key { m_intfTable.del(key); return; } //添加key std::vector<FieldValueTuple> fvVector; FieldValueTuple f("family", family); FieldValueTuple s("scope", scope); fvVector.push_back(s); fvVector.push_back(f); m_intfTable.set(key, fvVector); }
int main(int argc, char **argv) { swss::Logger::linkToDbNative("intfsyncd"); DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);//鏈接APPL_DB IntfSync sync(&db);//實例化netmsg //訂閱消息,加入組播組 NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync); NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync); while (1) { try { NetLink netlink; Select s; netlink.registerGroup(RTNLGRP_IPV4_IFADDR); netlink.registerGroup(RTNLGRP_IPV6_IFADDR); cout << "Listens to interface messages..." << endl; netlink.dumpRequest(RTM_GETADDR);//打印全部地址 s.addSelectable(&netlink);//加入select事件 while (true) { Selectable *temps; s.select(&temps);//監聽select事件 } } catch (const std::exception& e) { cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl; return 0; } } return 1; }