前面和你們分享了我在CPU、內存、磁盤上的一點淺薄的思考。今天開始咱們討論Linux裏最重要的一個模塊-網絡模塊。仍是按照慣例來,讓咱們從一段最簡單的代碼開始思考。爲了簡單起見,咱們用upd來舉例,以下:linux
int main(){ int serverSocketFd = socket(AF_INET, SOCK_DGRAM, 0); bind(serverSocketFd, ...); char buff[BUFFSIZE]; int readCount = recvfrom(serverSocketFd, buff, BUFFSIZE, 0, ...); buff[readCount] = '\0'; printf("Receive from client:%s\n", buff); }
上面代碼是很是簡單的一段upd server接收收據的邏輯。 當在開發視角看的時候,只要客戶端有對應的數據發送過來,服務器端執行recv_from
後就能收到它,並把它打印出來。咱們如今想知道的是,當網絡包達到網卡,直到咱們的recvfrom
收到數據,這中間,究竟都發生過什麼?api
咱們爲何要了解這麼底層呢?若是你負責的應用不是高併發的,流量也不大,確實沒有必要往下看。若是你負責的是爲百萬,千萬甚至過億用戶提供的服務,深刻理解Linux系統內部是如何實現的,以及各個部分之間是如何交互對你的工做將會有很是大的幫助。本文基於Linux 3.10,源代碼參見https://mirrors.edge.kernel.org/pub/linux/kernel/v3.x/,網卡驅動採用Intel的igb網卡舉例。數組
在TCP/IP網絡分層模型裏,整個協議棧被分紅了物理層、鏈路層、網絡層,傳輸層和應用層。物理層對應的是網卡和網線,應用層對應的是咱們常見的Nginx,FTP等等各類應用。Linux實現的是鏈路層、網絡層和傳輸層這三層。緩存
在Linux內核實現中,鏈路層協議靠網卡驅動來實現,內核協議棧來實現網絡層和傳輸層。內核對更上層的應用層提供socket接口來供用戶進程訪問。咱們用Linux的視角來看到的TCP/IP網絡分層模型應該是下面這個樣子的。服務器
在Linux的源代碼中,網絡設備驅動對應的邏輯位於driver/net/ethernet
, 其中intel系列網卡的驅動在driver/net/ethernet/intel
目錄下。協議棧模塊代碼位於kernel
和net
目錄。網絡
內核和網絡設備驅動是經過中斷的方式來處理的。當設備上有數據到達的時候,會給CPU的相關引腳上觸發一個電壓變化,以通知CPU來處理數據。對於網絡模塊來講,因爲處理過程比較複雜和耗時,若是在中斷函數中完成全部的處理,將會致使中斷處理函數(優先級太高)將過分佔據CPU,將致使CPU沒法響應其它設備,例如鼠標和鍵盤的消息。所以Linux中斷處理函數是分上半部和下半部的。上半部是隻進行最簡單的工做,快速處理而後釋放CPU,接着CPU就能夠容許其它中斷進來。剩下將絕大部分的工做都放到下半部中,能夠慢慢從容處理。2.4之後的內核版本採用的下半部實現方式是軟中斷,由ksoftirqd內核線程全權處理。和硬中斷不一樣的是,硬中斷是經過給CPU物理引腳施加電壓變化,而軟中斷是經過給內存中的一個變量的二進制值以通知軟中斷處理程序。數據結構
好了,大概瞭解了網卡驅動、硬中斷、軟中斷和ksoftirqd線程以後,咱們在這幾個概念的基礎上給出一個內核收包的路徑示意:併發
當網卡上收到數據之後,Linux中第一個工做的模塊是網絡驅動。 網絡驅動會以DMA的方式把網卡上收到的幀寫到內存裏。再向CPU發起一箇中斷,以通知CPU有數據到達。第二,當CPU收到中斷請求後,會去調用網絡驅動註冊的中斷處理函數。 網卡的中斷處理函數並不作過多工做,發出軟中斷請求,而後儘快釋放CPU。ksoftirqd檢測到有軟中斷請求到達,調用poll開始輪詢收包,收到後交由各級協議棧處理。對於UPD包來講,會被放到用戶socket的接收隊列中。框架
咱們從上面這張圖中已經從總體上把握到了Linux對數據包的處理過程。可是要想了解更多網絡模塊工做的細節,咱們還得往下看。socket
Linux驅動,內核協議棧等等模塊在具有接收網卡數據包以前,要作不少的準備工做才行。好比要提早建立好ksoftirqd內核線程,要註冊好各個協議對應的處理函數,王闊設備子系統要提早初始化好,網卡要啓動好。只有這些都Ready以後,咱們才能真正開始接收數據包。那麼咱們如今來看看這些準備工做都是怎麼作的。
Linux的軟中斷都是在專門的內核線程(ksoftirqd)中進行的,所以咱們很是有必要看一下這些進程是怎麼初始化的,這樣咱們才能在後面更準確地瞭解收包過程。該進程數量不是1個,而是N個,其中N等於你的機器的核數。
系統初始化的時候在kernel/smpboot.c中調用了smpboot_register_percpu_thread, 該函數進一步會執行到spawn_ksoftirqd(位於kernel/softirq.c)來建立出softirqd進程。
相關代碼以下:
//file: kernel/softirq.c static struct smp_hotplug_thread softirq_threads = { .store = &ksoftirqd, .thread_should_run = ksoftirqd_should_run, .thread_fn = run_ksoftirqd, .thread_comm = "ksoftirqd/%u", }; static __init int spawn_ksoftirqd(void) { register_cpu_notifier(&cpu_nfb); BUG_ON(smpboot_register_percpu_thread(&softirq_threads)); return 0; } early_initcall(spawn_ksoftirqd);
當ksoftirqd被建立出來之後,它就會進入本身的線程循環函數ksoftirqd_should_run和run_ksoftirqd了。不停地判斷有沒有軟中斷須要被處理。這裏須要注意的一點是,軟中斷不只僅只有網絡軟中斷,還有其它類型。
//file: include/linux/interrupt.h enum { HI_SOFTIRQ=0, TIMER_SOFTIRQ, NET_TX_SOFTIRQ, NET_RX_SOFTIRQ, BLOCK_SOFTIRQ, BLOCK_IOPOLL_SOFTIRQ, TASKLET_SOFTIRQ, SCHED_SOFTIRQ, HRTIMER_SOFTIRQ, RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */ NR_SOFTIRQS };
linux內核經過調用subsys_initcall
來初始化各個子系統,在源代碼目錄裏你能夠grep出許多對這個函數的調用。這裏咱們要說的是網絡子系統的初始化,會執行到net_dev_init
函數。
//file: net/core/dev.c static int __init net_dev_init(void) { ...... for_each_possible_cpu(i) { struct softnet_data *sd = &per_cpu(softnet_data, i); memset(sd, 0, sizeof(*sd)); skb_queue_head_init(&sd->input_pkt_queue); skb_queue_head_init(&sd->process_queue); sd->completion_queue = NULL; INIT_LIST_HEAD(&sd->poll_list); ...... } ...... open_softirq(NET_TX_SOFTIRQ, net_tx_action); open_softirq(NET_RX_SOFTIRQ, net_rx_action); } subsys_initcall(net_dev_init);
在這個函數裏,會爲每一個CPU都申請一個softnet_data
數據結構,在這個數據結構裏的poll_list
是等待驅動程序將其poll函數註冊進來,稍後網卡驅動初始化的時候咱們能夠看到這一過程。
另外open_softirq註冊了每一種軟中斷都註冊一個處理函數。 NET_TX_SOFTIRQ的處理函數爲net_tx_action,NET_RX_SOFTIRQ的爲net_rx_action。繼續跟蹤open_softirq
後發現這個註冊的方式是記錄在softirq_vec
變量裏的。後面ksoftirqd線程收到軟中斷的時候,也會使用這個變量來找到每一種軟中斷對應的處理函數。
//file: kernel/softirq.c void open_softirq(int nr, void (*action)(struct softirq_action *)) { softirq_vec[nr].action = action; }
內核實現了網絡層的ip協議,也實現了傳輸層的tcp協議和udp協議。 這些協議對應的實現函數分別是ip_rcv(),tcp_v4_rcv()和upd_rcv()。和咱們平時寫代碼的方式不同的是,內核是經過註冊的方式來實現的。 Linux內核中的fs_initcall
和subsys_initcall
相似,也是初始化模塊的入口。fs_initcall
調用inet_init
後開始網絡協議棧註冊。 經過inet_init
,將這些函數註冊到了inet_protos和ptype_base數據結構中了。以下圖:
相關代碼以下
//file: net/ipv4/af_inet.c static struct packet_type ip_packet_type __read_mostly = { .type = cpu_to_be16(ETH_P_IP), .func = ip_rcv, }; static const struct net_protocol udp_protocol = { .handler = udp_rcv, .err_handler = udp_err, .no_policy = 1, .netns_ok = 1, }; static const struct net_protocol tcp_protocol = { .early_demux = tcp_v4_early_demux, .handler = tcp_v4_rcv, .err_handler = tcp_v4_err, .no_policy = 1, .netns_ok = 1, }; static int __init inet_init(void) { ...... if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0) pr_crit("%s: Cannot add ICMP protocol\n", __func__); if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0) pr_crit("%s: Cannot add UDP protocol\n", __func__); if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0) pr_crit("%s: Cannot add TCP protocol\n", __func__); ...... dev_add_pack(&ip_packet_type); }
上面的代碼中咱們能夠看到,udp_protocol結構體中的handler是udp_rcv,tcp_protocol結構體中的handler是tcp_v4_rcv,經過inet_add_protocol被初始化了進來。
int inet_add_protocol(const struct net_protocol *prot, unsigned char protocol) { if (!prot->netns_ok) { pr_err("Protocol %u is not namespace aware, cannot register.\n", protocol); return -EINVAL; } return !cmpxchg((const struct net_protocol **)&inet_protos[protocol], NULL, prot) ? 0 : -1; }
inet_add_protocol
函數將tcp和upd對應的處理函數都註冊到了inet_protos數組中了。再看dev_add_pack(&ip_packet_type);
這一行,ip_packet_type結構體中的type是協議名,func是ip_rcv函數,在dev_add_pack中會被註冊到ptype_base哈希表中。
//file: net/core/dev.c void dev_add_pack(struct packet_type *pt) { struct list_head *head = ptype_head(pt); ...... } static inline struct list_head *ptype_head(const struct packet_type *pt) { if (pt->type == htons(ETH_P_ALL)) return &ptype_all; else return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK]; }
這裏咱們須要記住inet_protos記錄着upd,tcp的處理函數地址,ptype_base存儲着ip_rcv()函數的處理地址。後面咱們會看到軟中斷中會經過ptype_base找到ip_rcv函數地址,進而將ip包正確地送到ip_rcv()中執行。在ip_rcv中將會經過inet_protos找到tcp或者upd的處理函數,再而把包轉發給upd_rcv()或tcp_v4_rcv()函數。
擴展一下,若是看一下ip_rcv和upd_rcv等函數的代碼能看到不少協議的處理過程。例如,ip_rcv中會處理netfilter和iptable過濾,若是你有不少或者很複雜的 netfilter 或 iptables 規則,這些規則都是在軟中斷的上下文中執行的,會加大網絡延遲。再例如,upd_rcv中會判斷socket接收隊列是否滿了。對應的相關內核參數是net.core.rmem_max和net.core.rmem_default。若是有興趣,建議你們好好讀一下inet_init
這個函數的代碼。
每個驅動程序(不只僅只是網卡驅動)會使用 module_init 向內核註冊一個初始化函數,當驅動被加載時,內核會調用這個函數。好比igb網卡驅動的代碼位於drivers/net/ethernet/intel/igb/igb_main.c
//file: drivers/net/ethernet/intel/igb/igb_main.c static struct pci_driver igb_driver = { .name = igb_driver_name, .id_table = igb_pci_tbl, .probe = igb_probe, .remove = igb_remove, ...... }; static int __init igb_init_module(void) { ...... ret = pci_register_driver(&igb_driver); return ret; }
驅動的pci_register_driver
調用完成後,Linux內核就知道了該驅動的相關信息,好比igb網卡驅動的igb_driver_name
和igb_probe
函數地址等等。當網卡設備被識別之後,內核會調用其驅動的probe方法(igb_driver的probe方法是igb_probe)。驅動probe方法執行的目的就是讓設備ready,對於igb網卡,其igb_probe
位於drivers/net/ethernet/intel/igb/igb_main.c下。主要執行的操做以下:
第5步中咱們看到,網卡驅動實現了ethtool所須要的接口,也在這裏註冊完成函數地址的註冊。當 ethtool 發起一個系統調用以後,內核會找到對應操做的回調函數。對於igb網卡來講,其實現函數都在drivers/net/ethernet/intel/igb/igb_ethtool.c下。 相信你此次能完全理解ethtool的工做原理了吧? 這個命令之因此能查看網卡收發包統計、能修改網卡自適應模式、能調整RX 隊列的數量和大小,是由於ethtool命令最終調用到了網卡驅動的相應方法,而不是ethtool自己有這個超能力。
第6步註冊的igb_netdev_ops中包含的是igb_open等函數,該函數在網卡被啓動的時候會被調用。
//file: drivers/net/ethernet/intel/igb/igb_main.c ...... static const struct net_device_ops igb_netdev_ops = { .ndo_open = igb_open, .ndo_stop = igb_close, .ndo_start_xmit = igb_xmit_frame, .ndo_get_stats64 = igb_get_stats64, .ndo_set_rx_mode = igb_set_rx_mode, .ndo_set_mac_address = igb_set_mac, .ndo_change_mtu = igb_change_mtu, .ndo_do_ioctl = igb_ioctl,......
第7步中,在igb_probe初始化過程當中,還調用到了igb_alloc_q_vector
。他註冊了一個NAPI機制所必須的poll函數,對於igb網卡驅動來講,這個函數就是igb_poll,以下代碼所示。
static int igb_alloc_q_vector(struct igb_adapter *adapter, int v_count, int v_idx, int txr_count, int txr_idx, int rxr_count, int rxr_idx) { ...... /* initialize NAPI */ netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64); }
當上面的初始化都完成之後,就能夠啓動網卡了。回憶前面網卡驅動初始化時,咱們提到了驅動向內核註冊了 structure net_device_ops 變量,它包含着網卡啓用、發包、設置mac 地址等回調函數(函數指針)。當啓用一個網卡時(例如,經過 ifconfig eth0 up),net_device_ops 中的 igb_open方法會被調用。它一般會作如下事情:
//file: drivers/net/ethernet/intel/igb/igb_main.c static int __igb_open(struct net_device *netdev, bool resuming) { /* allocate transmit descriptors */ err = igb_setup_all_tx_resources(adapter); /* allocate receive descriptors */ err = igb_setup_all_rx_resources(adapter); /* 註冊中斷處理函數 */ err = igb_request_irq(adapter); if (err) goto err_req_irq; /* 啓用NAPI */ for (i = 0; i < adapter->num_q_vectors; i++) napi_enable(&(adapter->q_vector[i]->napi)); ...... }
在上面__igb_open
函數調用了igb_setup_all_tx_resources,和igb_setup_all_rx_resources。在igb_setup_all_rx_resources
這一步操做中,分配了RingBuffer,並創建內存和Rx隊列的映射關係。(Rx Tx 隊列的數量和大小能夠經過 ethtool 進行配置)。咱們再接着看中斷函數註冊igb_request_irq
:
static int igb_request_irq(struct igb_adapter *adapter) { if (adapter->msix_entries) { err = igb_request_msix(adapter); if (!err) goto request_done; ...... } } static int igb_request_msix(struct igb_adapter *adapter) { ...... for (i = 0; i < adapter->num_q_vectors; i++) { ... err = request_irq(adapter->msix_entries[vector].vector, igb_msix_ring, 0, q_vector->name, }
在上面的代碼中跟蹤函數調用, __igb_open
=> igb_request_irq
=> igb_request_msix
, 在igb_request_msix
中咱們看到了,對於多隊列的網卡,爲每個隊列都註冊了中斷,其對應的中斷處理函數是igb_msix_ring(該函數也在drivers/net/ethernet/intel/igb/igb_main.c下)。 咱們也能夠看到,msix方式下,每一個 RX 隊列有獨立的MSI-X 中斷,從網卡硬件中斷的層面就能夠設置讓收到的包被不一樣的 CPU處理。(能夠經過 irqbalance ,或者修改 /proc/irq/IRQ_NUMBER/smp_affinity可以修改和CPU的綁定行爲)。
當作好以上準備工做之後,就能夠開門迎客(數據包)了!
首先當數據幀從網線到達網卡上的時候,第一站是網卡的接收隊列。網卡在分配給本身的RingBuffer中尋找可用的內存位置,找到後DMA引擎會把數據DMA到網卡以前關聯的內存裏,這個時候CPU都是無感的。當DMA操做完成之後,網卡會像CPU發起一個硬中斷,通知CPU有數據到達。
注意:當RingBuffer滿的時候,新來的數據包將給丟棄。ifconfig查看網卡的時候,能夠裏面有個overruns,表示由於環形隊列滿被丟棄的包。若是發現有丟包,可能須要經過ethtool命令來加大環形隊列的長度。
在啓動網卡一節,咱們說到了網卡的硬中斷註冊的處理函數是igb_msix_ring。
//file: drivers/net/ethernet/intel/igb/igb_main.c static irqreturn_t igb_msix_ring(int irq, void *data) { struct igb_q_vector *q_vector = data; /* Write the ITR value calculated from the previous interrupt. */ igb_write_itr(q_vector); napi_schedule(&q_vector->napi); return IRQ_HANDLED; }
igb_write_itr
只是記錄一下硬件中斷頻率(聽說目的是在減小對CPU的中斷頻率時用到)。順着napi_schedule調用一路跟蹤下去,__napi_schedule
=>____napi_schedule
/* Called with irq disabled */ static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi) { list_add_tail(&napi->poll_list, &sd->poll_list); __raise_softirq_irqoff(NET_RX_SOFTIRQ); }
這裏咱們看到,list_add_tail
修改了CPU變量softnet_data裏的poll_list,將驅動napi_struct傳過來的poll_list添加了進來。
其中softnet_data中的poll_list是一個雙向列表,其中的設備都帶有輸入幀等着被處理。緊接着__raise_softirq_irqoff
觸發了一個軟中斷NET_RX_SOFTIRQ, 這個所謂的觸發過程只是對一個變量進行了一次或運算而已。
void __raise_softirq_irqoff(unsigned int nr) { trace_softirq_raise(nr); or_softirq_pending(1UL << nr); }
//file: include/linux/irq_cpustat.h #define or_softirq_pending(x) (local_softirq_pending() |= (x))
咱們說過,Linux在硬中斷裏只完成簡單必要的工做,剩下的大部分的處理都是轉交給軟中斷的。經過上面代碼能夠看到,硬中斷處理過程真的是很是短。只是記錄了一個寄存器,修改了一下下CPU的poll_list,而後發出個軟中斷。就這麼簡單,硬中斷工做就算是完成了。
內核線程初始化的時候,咱們介紹了ksoftirqd中兩個線程函數ksoftirqd_should_run
和run_ksoftirqd
。其中ksoftirqd_should_run
代碼以下:
static int ksoftirqd_should_run(unsigned int cpu) { return local_softirq_pending(); } #define local_softirq_pending() \ __IRQ_STAT(smp_processor_id(), __softirq_pending)
這裏看到和硬中斷中調用了同一個函數local_softirq_pending
。使用方式不一樣的是硬中斷位置是爲了寫入標記,這裏僅僅只是讀取。若是硬中斷中設置了NET_RX_SOFTIRQ
,這裏天然能讀取的到。接下來會真正進入線程函數中run_ksoftirqd
處理:
static void run_ksoftirqd(unsigned int cpu) { local_irq_disable(); if (local_softirq_pending()) { __do_softirq(); rcu_note_context_switch(cpu); local_irq_enable(); cond_resched(); return; } local_irq_enable(); }
在__do_softirq
中,判斷根據當前CPU的軟中斷類型,調用其註冊的action方法。
asmlinkage void __do_softirq(void) { do { if (pending & 1) { unsigned int vec_nr = h - softirq_vec; int prev_count = preempt_count(); ... trace_softirq_entry(vec_nr); h->action(h); trace_softirq_exit(vec_nr); ... } h++; pending >>= 1; } while (pending); }
在網絡子系統初始化小節, 咱們看到咱們爲NET_RX_SOFTIRQ註冊了處理函數net_rx_action。因此net_rx_action
函數就會被執行到了。
這裏須要注意一個細節,硬中斷中設置軟中斷標記,和ksoftirq的判斷是否有軟中斷到達,都是基於smp_processor_id()的。這意味着只要硬中斷在哪一個CPU上被響應,那麼軟中斷也是在這個CPU上處理的。因此說,若是你發現你的Linux軟中斷CPU消耗都集中在一個核上的話,作法是要把調整硬中斷的CPU親和性,來將硬中斷打散到不通的CPU核上去。
咱們再來把精力集中到這個核心函數net_rx_action
上來。
static void net_rx_action(struct softirq_action *h) { struct softnet_data *sd = &__get_cpu_var(softnet_data); unsigned long time_limit = jiffies + 2; int budget = netdev_budget; void *have; local_irq_disable(); while (!list_empty(&sd->poll_list)) { ...... n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list); work = 0; if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); trace_napi_poll(n); } budget -= work; } }
函數開頭的time_limit和budget是用來控制net_rx_action函數主動退出的,目的是保證網絡包的接收不霸佔CPU不放。 等下次網卡再有硬中斷過來的時候再處理剩下的接收數據包。其中budget能夠經過內核參數調整。 這個函數中剩下的核心邏輯是獲取到當前CPU變量softnet_data,對其poll_list進行遍歷, 而後執行到網卡驅動註冊到的poll函數。對於igb網卡來講,就是igb驅動力的igb_poll
函數了。
/** * igb_poll - NAPI Rx polling callback * @napi: napi polling structure * @budget: count of how many packets we should handle **/ static int igb_poll(struct napi_struct *napi, int budget) { ... if (q_vector->tx.ring) clean_complete = igb_clean_tx_irq(q_vector); if (q_vector->rx.ring) clean_complete &= igb_clean_rx_irq(q_vector, budget); ... }
在讀取操做中,igb_poll
的重點工做是對igb_clean_rx_irq
的調用。
static bool igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget) { ... do { /* retrieve a buffer from the ring */ skb = igb_fetch_rx_buffer(rx_ring, rx_desc, skb); /* fetch next buffer in frame if non-eop */ if (igb_is_non_eop(rx_ring, rx_desc)) continue; } /* verify the packet layout is correct */ if (igb_cleanup_headers(rx_ring, rx_desc, skb)) { skb = NULL; continue; } /* populate checksum, timestamp, VLAN, and protocol */ igb_process_skb_fields(rx_ring, rx_desc, skb); napi_gro_receive(&q_vector->napi, skb); }
igb_fetch_rx_buffer
和igb_is_non_eop
的做用就是把數據幀從RingBuffer上取下來。爲何須要兩個函數呢?由於有可能幀要佔多多個RingBuffer,因此是在一個循環中獲取的,直到幀尾部。獲取下來的一個數據幀用一個sk_buff來表示。收取完數據之後,對其進行一些校驗,而後開始設置sbk變量的timestamp, VLAN id, protocol等字段。接下來進入到napi_gro_receive中:
//file: net/core/dev.c gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb) { skb_gro_reset_offset(skb); return napi_skb_finish(dev_gro_receive(napi, skb), skb); }
dev_gro_receive
這個函數表明的是網卡GRO特性,能夠簡單理解成把相關的小包合併成一個大包就行,目的是減小傳送給網絡棧的包數,這有助於減小 CPU 的使用量。咱們暫且忽略,直接看napi_skb_finish
, 這個函數主要就是調用了netif_receive_skb
。
//file: net/core/dev.c static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb) { switch (ret) { case GRO_NORMAL: if (netif_receive_skb(skb)) ret = GRO_DROP; break; ...... }
在netif_receive_skb
中,數據包將被送到協議棧中。
netif_receive_skb
函數會根據包的協議,假如是upd包,會將包依次送到ip_rcv(),upd_rcv()協議處理函數中進行處理。
//file: net/core/dev.c int netif_receive_skb(struct sk_buff *skb) { //RPS處理邏輯,先忽略 ...... return __netif_receive_skb(skb); } static int __netif_receive_skb(struct sk_buff *skb) { ...... ret = __netif_receive_skb_core(skb, false); } static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc) { ...... //pcap邏輯,這裏會將數據送入抓包點。tcpdump就是從這個入口獲取包的 list_for_each_entry_rcu(ptype, &ptype_all, list) { if (!ptype->dev || ptype->dev == skb->dev) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } } ...... list_for_each_entry_rcu(ptype, &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) { if (ptype->type == type && (ptype->dev == null_or_dev || ptype->dev == skb->dev || ptype->dev == orig_dev)) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } } }
在__netif_receive_skb_core
中,我看着原來常用的tcpdump的抓包點,非常激動,看來讀一遍源代碼時間真的沒白浪費。接着__netif_receive_skb_core
取出protocol,它會從數據包中取出協議信息,而後遍歷註冊在這個協議上的回調函數列表。ptype_base
是一個 hash table,在協議註冊小節咱們提到過。ip_rcv 函數地址就是存在這個 hash table中的。
//file: net/core/dev.c static inline int deliver_skb(struct sk_buff *skb, struct packet_type *pt_prev, struct net_device *orig_dev) { ...... return pt_prev->func(skb, skb->dev, pt_prev, orig_dev); }
pt_prev->func
這一行就調用到了協議層註冊的處理函數了。對於ip包來將,就會進入到ip_rcv
(若是是arp包的話,會進入到arp_rcv)。
咱們再來大體看一下linux在ip協議層都作了什麼,包又是怎麼樣進一步被送到udp或tcp協議處理函數中的。
//file: net/ipv4/ip_input.c int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev) { ...... return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish); }
這裏NF_HOOK
是一個鉤子函數,當執行完註冊的鉤子後就會執行到最後一個參數指向的函數ip_rcv_finish
。
static int ip_rcv_finish(struct sk_buff *skb) { ...... if (!skb_dst(skb)) { int err = ip_route_input_noref(skb, iph->daddr, iph->saddr, iph->tos, skb->dev); ... } ...... return dst_input(skb); }
跟蹤ip_route_input_noref
後看到它又調用了 ip_route_input_mc
。 在ip_route_input_mc
中,函數ip_local_deliver
被賦值給了dst.input
, 以下:
//file: net/ipv4/route.c static int ip_route_input_mc(struct sk_buff *skb, __be32 daddr, __be32 saddr, u8 tos, struct net_device *dev, int our) { if (our) { rth->dst.input= ip_local_deliver; rth->rt_flags |= RTCF_LOCAL; } }
因此回到ip_rcv_finish
中的return dst_input(skb);
。
/* Input packet from network to transport. */ static inline int dst_input(struct sk_buff *skb) { return skb_dst(skb)->input(skb); }
skb_dst(skb)->input
調用的input方法就是路由子系統賦的ip_local_deliver。
//file: net/ipv4/ip_input.c int ip_local_deliver(struct sk_buff *skb) { /* * Reassemble IP fragments. */ if (ip_is_fragment(ip_hdr(skb))) { if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER)) return 0; } return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL, ip_local_deliver_finish); }
static int ip_local_deliver_finish(struct sk_buff *skb) { ...... int protocol = ip_hdr(skb)->protocol; const struct net_protocol *ipprot; ipprot = rcu_dereference(inet_protos[protocol]); if (ipprot != NULL) { ret = ipprot->handler(skb); } }
如協議註冊小節看到inet_protos中保存着tcp_rcv()和udp_rcv()的函數地址。這裏將會根據包中的協議類型選擇進行分發,在這裏skb包將會進一步被派送到更上層的協議中,udp和tcp。
在協議註冊小節的時候咱們說過,udp協議的處理函數是udp_rcv
。
//file: net/ipv4/udp.c int udp_rcv(struct sk_buff *skb) { return __udp4_lib_rcv(skb, &udp_table, IPPROTO_UDP); } int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable, int proto) { sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); if (sk != NULL) { int ret = udp_queue_rcv_skb(sk, skb } icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0); }
__udp4_lib_lookup_skb
是根據skb來尋找對應的socket,當找到之後將數據包放到socket的緩存隊列裏。若是沒有找到,則發送一個目標不可達的icmp包。
//file: net/ipv4/udp.c int udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb) { ...... if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf)) goto drop; rc = 0; ipv4_pktinfo_prepare(skb); bh_lock_sock(sk); if (!sock_owned_by_user(sk)) rc = __udp_queue_rcv_skb(sk, skb); else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) { bh_unlock_sock(sk); goto drop; } bh_unlock_sock(sk); return rc; }
sock_owned_by_user判斷的是用戶是否是正在這個socker上進行系統調用(socket被佔用),若是沒有,那就能夠直接放到socket的接收隊列中。若是有,那就經過sk_add_backlog
把數據包添加到backlog隊列。 當用戶釋放的socket的時候,內核會檢查backlog隊列,若是有數據再移動到接收隊列中。
sk_rcvqueues_full
接收隊列若是滿了的話,將直接把包丟棄。接收隊列大小受內核參數net.core.rmem_max和net.core.rmem_default影響。
花開兩朵,各表一枝。 上面咱們說完了整個Linux內核對數據包的接收和處理過程,最後把數據包放到socket的接收隊列中了。那麼咱們再回頭看用戶進程調用recvfrom
後是發生了什麼。 咱們在代碼裏調用的recvfrom
是一個glibc的庫函數,該函數在執行後會將用戶進行陷入到內核態,進入到Linux實現的系統調用sys_recvfrom
。在理解Linux對sys_recvfrom
以前,咱們先來簡單看一下socket
這個核心數據結構。這個數據結構太大了,咱們只把對和咱們今天主題相關的內容畫出來,以下:
socket
數據結構中的const struct proto_ops
對應的是協議的方法集合。每一個協議都會實現不一樣的方法集,對於IPv4 Internet協議族來講,每種協議都有對應的處理方法,以下。對於udp來講,是經過inet_dgram_ops
來定義的,其中註冊了inet_recvmsg
方法。
//file: net/ipv4/af_inet.c const struct proto_ops inet_stream_ops = { ...... .recvmsg = inet_recvmsg, .mmap = sock_no_mmap, ...... } const struct proto_ops inet_dgram_ops = { ...... .sendmsg = inet_sendmsg, .recvmsg = inet_recvmsg, ...... }
socket
數據結構中的另外一個數據結構struct sock *sk
是一個很是大,很是重要的子結構體。其中的sk_prot
又定義了二級處理函數。對於UPD協議來講,會被設置成UPD協議實現的方法集udp_prot
。
//file: net/ipv4/udp.c struct proto udp_prot = { .name = "UDP", .owner = THIS_MODULE, .close = udp_lib_close, .connect = ip4_datagram_connect, ...... .sendmsg = udp_sendmsg, .recvmsg = udp_recvmsg, .sendpage = udp_sendpage, ...... }
看完了socket
變量以後,咱們再來看sys_revvfrom
的實現過程。
在inet_recvmsg
調用了sk->sk_prot->recvmsg
。
//file: net/ipv4/af_inet.c int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t size, int flags) { ...... err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT, flags & ~MSG_DONTWAIT, &addr_len); if (err >= 0) msg->msg_namelen = addr_len; return err; }
上面咱們說過這個對於upd協議的socket來講,這個sk_prot
就是net/ipv4/udp.c
下的struct proto udp_prot
。由此咱們找到了udp_recvmsg
方法。
//file: net/core/datagram.c:EXPORT_SYMBOL(__skb_recv_datagram); struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags, int *peeked, int *off, int *err) { ...... do { struct sk_buff_head *queue = &sk->sk_receive_queue; skb_queue_walk(queue, skb) { ...... } /* User doesn't want to wait */ error = -EAGAIN; if (!timeo) goto no_packet; } while (!wait_for_more_packets(sk, err, &timeo, last)); } }
終於咱們找到了咱們想要看的重點,在上面咱們看到了所謂的讀取過程,就是訪問sk->sk_receive_queue
。若是沒有數據,且用戶也容許等待,則將調用wait_for_more_packets()執行等待操做,它加入會讓用戶進程進入睡眠狀態。
網絡模塊是Linux內核中最複雜的模塊了,看起來一個簡簡單單的收包過程就涉及到許多內核組件之間的交互,如網卡驅動、協議棧,內核ksoftirqd線程等。
看起來很複雜,本文想經過圖示的方式,儘可能以容易理解的方式來將內核收包過程講清楚。如今讓咱們再串一串整個收包過程。
當用戶執行完recvfrom
調用後,用戶進程就經過系統調用進行到內核態工做了。若是接收隊列沒有數據,進程就進入睡眠狀態被操做系統掛起。這塊相對比較簡單,剩下大部分的戲份都是由Linux內核其它模塊來表演了。
首先在開始收包以前,Linux要作許多的準備工做:
以上是內核準備收包以前的重要工做,當上面都ready以後,就能夠打開硬中斷,等待數據包的到來了。
當數據到到來了之後,第一個迎接它的是網卡(我去,這不是廢話麼):
如今咱們能夠回到開篇的問題了,咱們在用戶層看到的簡單一行recvfrom
,Linux內核要替咱們作如此之多的工做,才能讓咱們順利收到數據。這仍是簡簡單單的UDP,若是是TCP,內核要作的工做更多,不禁得感嘆內核的開發者們真的是用心良苦。
理解了整個收包過程之後,咱們就能明確知道Linux收一個包的CPU開銷了。首先第一塊是用戶進程調用系統調用陷入內核態的開銷。第二塊是CPU響應包的硬中斷的CPU開銷。第三塊是ksoftirqd內核線程的軟中斷上下文花費的。後面咱們再專門發一篇文章實際觀察一下這些開銷。
另外網絡收發中有不少末只細節我們並無展開了說,好比說no NAPI, GRO,RPS等。由於我以爲說的太對了反而會影響你們對整個流程的把握,因此儘可能只保留主框架了,少便是多!
開發內功修煉之硬盤篇專輯:
個人公衆號是「開發內功修煉」,在這裏我不是單純介紹技術理論,也不僅介紹實踐經驗。而是把理論與實踐結合起來,用實踐加深對理論的理解、用理論提升你的技術實踐能力。歡迎你來關注個人公衆號,也請分享給你的好友~~~