Overviewlinux
從宏觀的角度來看,一個packet從網卡到socket接收緩衝區的路徑以下所示:express
整個流程會在下文的各個章節中進行詳細的描述,而下文中的protocol layer會以IP和UDP做爲例子,可是其中的不少內容,對於其餘protocol layer都是通用的。api
Detailed Look緩存
本文將會以igb驅動程序做爲例子,並用它來控制一個比較常見的服務器網卡Intel I350。所以,咱們首先來看看igb設備驅動程序是怎麼工做的。服務器
Network Device Driver網絡
Initialization數據結構
驅動程序會利用module_init宏註冊一個初始化函數,當內核加載驅動程序時,該函數就會被調用。igb初始化函數(igb_init_module)和它利用module_init進行註冊的代碼以下:app
/** * igb_init_module - Driver Registration Routine * * igb_init_module is the first routine called when the driver is * loaded. All it does is register with the PCI subsystem. **/ static int __init igb_init_module(void) { int ret; pr_info("%s - version %s\n", igb_driver_string, igb_driver_version); pr_info("%s\n", igb_copyright); /* ... */ ret = pci_register_driver(&igb_driver); return ret; } module_init(igb_init_module);
其中初始化設備的大部分工做都是由pci_register_driver來完成的。咱們將在下面詳細介紹。負載均衡
PCI initializationsocket
Intel I350是一個PCI express設備。PCI設備經過PCI Configuration Space中的一些寄存器標識本身。
當一個設備驅動程序被編譯時,會用一個叫作MODULE_DEVICE_TABLE的宏來建立一個table,用該table來包含該設備驅動程序能夠控制的PCI設備的設備ID。接着這個table也會被註冊爲一個結構的一部分,咱們在下面立刻就能看到。
而內核最終將會使用這個table來決定該加載哪一個驅動程序來控制該設備。
操做系統就是這樣肯定哪一個設備和系統鏈接了,以及該使用哪一個驅動來和該設備進行交互。
static DEFINE_PCI_DEVICE_TABLE(igb_pci_tbl) = { { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_1GBPS) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_SGMII) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_2_5GBPS) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I211_COPPER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_FIBER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SGMII), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER_FLASHLESS), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES_FLASHLESS), board_82575 }, /* ... */ }; MODULE_DEVICE_TABLE(pci, igb_pci_tbl);
從上文已知,在驅動的初始化函數中會調用pci_register_driver函數。
該函數會註冊一個盡是指針的結構,其中大多數的指針都是函數指針,不過包含PCI device ID的table一樣會被註冊。內核會利用這些驅動註冊的函數來啓動PCI設備。
static struct pci_driver igb_driver = { .name = igb_driver_name, .id_table = igb_pci_tbl, .probe = igb_probe, .remove = igb_remove, /* ... */ };
PCI probe
一旦一個設備經過它的PCI ID被識別,內核就會選擇合適的驅動程序來控制該設備。每個PCI設備驅動程序都在內核的PCI子系統中註冊了一個probe function。對於尚未驅動控制的設備,內核會調用該函數,直到和某個驅動程序相匹配。大多數驅動程序都有大量的代碼用來控制設備。具體的操做各個驅動也有所不一樣。可是一些典型的操做以下所示:
讓咱們來快速瀏覽一下,igd裏對應的igb_probe是如何完成上述操做的
A peek into PCI initialization
接下來的這些代碼取自igb_probe函數,主要用於一些基本的PCI配置
err = pci_enable_device_mem(pdev); /* ... */ err = dma_set_mask_and_coherent(&pdev->dev, DMA_BIT_MASK(64)); /* ... */ err = pci_request_selected_regions(pdev, pci_select_bars(pdev, IORESOURCE_MEM), igb_driver_name); pci_enable_pcie_error_reporting(pdev); pci_set_master(pdev); pci_save_state(pdev);
首先,設備會由pci_enable_device_mem初始化,若是該設備處於暫停狀態就會被喚醒,獲取內存資源以及其餘一些工做。接着會對DMA mask進行設置,由於該設備會讀寫64位的內存地址,所以dma_set_mask_and_coherent的參數爲DMA_BIT_MASK(64)。而後調用pci_request_selected_regions獲取內存,同時使能PCI Express Advanced Error Reporting功能,最終調用pci_set_master使能DMA而且調用pci_save_state保存PCI configuration space。
Network device initialization
igb_probe函數作了大量關於網絡設備初始化的工做。除了一些針對PCI的工做之外,它還須要作以下這些工做:
下面咱們對上述的每一部分進行詳細的分析。
struct net_device_ops
struct net_device_ops中包含許多函數指針指向一些網絡子系統用來操做設備的重要功能。咱們將在接下來的內容中屢次說起此結構。在igb_probe中net_device_ops結構將會和struct net_device綁定,代碼以下:
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent) { /* ... */ netdev->netdev_ops = &igb_netdev_ops;
而net_device_ops結構中的各個指針指向的函數也定義在同一個文件中:
static const struct net_device_ops igb_netdev_ops = { .ndo_open = igb_open, .ndo_stop = igb_close, .ndo_start_xmit = igb_xmit_frame, .ndo_get_stats64 = igb_get_stats64, .ndo_set_rx_mode = igb_set_rx_mode, .ndo_set_mac_address = igb_set_mac, .ndo_change_mtu = igb_change_mtu, .ndo_do_ioctl = igb_ioctl, /* ... */
咱們能夠看到,這個結構中包含不少有趣的字段,例如ndo_open,ndo_stop,ndo_start_xmit和ndo_get_stats64,他們都包含了igb驅動實現的對應函數的地址。咱們下面會對其中的某些內容作進一步的分析。
ethtool registration
ethtool是一個命令行工具,用來獲取和設置各類驅動和硬件相關的選項。一般會利用ethtool來從網絡設備收集一些詳細的數據。
ethtool經過ioctl系統調用和設備驅動程序交互。設備驅動程序註冊了一系列的函數用於ethtool的操做。當ethtool發出一個ioctl調用時,內核會找到對應驅動的ethtool結構而且執行相應的註冊函數。驅動的ethtool函數能夠作許多事情,包括修改驅動中一個簡單的falg,乃至經過寫設備的寄存器來調整真實設備。
igb驅動經過在igb_probe中調用igb_set_ethtool_ops來註冊ethtool的各個操做。
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent) { /* ... */ igb_set_ethtool_ops(netdev);
void igb_set_ethtool_ops(struct net_device *netdev) { SET_ETHTOOL_OPS(netdev, &igb_ethtool_ops); }
static const struct ethtool_ops igb_ethtool_ops = { .get_settings = igb_get_settings, .set_settings = igb_set_settings, .get_drvinfo = igb_get_drvinfo, .get_regs_len = igb_get_regs_len, .get_regs = igb_get_regs, /* ... */
每一個驅動都能本身決定哪些ethtool函數和本身有關而且決定實現其中的哪些。並非每一個驅動都須要實現全部的ethtool函數。其中一個比較有趣的ethtool函數是get_ethtool_stats,它會建立一些很是詳細的計數器進行追蹤,它們要麼位於驅動中,要麼位於設備內。
IRQs
當一個數據幀經過DMA被寫入RAM時,網卡是如何通知系統的其他部分,已經有數據能夠處理了呢?
通常網卡會產生一個interrupt request(IRQ)表示有數據到了。有如下三種IRQ類型:MSI-X,MSI和legacy IRQ。可是若是有大量的數據幀到達時,就會致使產生大量的IRQ。而產生的IRQ越多,那麼用於high level task,例如用戶進程的CPU時間就越少。
因而建立了New API(NAPI)這種機制,用於減小數據包的到來致使設備產生中斷的數目。儘管NAPI能夠減小IRQ的數目,可是並不能徹底避免。下面的章節會告訴咱們緣由。
NAPI
NAPI在許多方面和獲取數據傳統的方式不一樣。NAPI容許設備驅動程序註冊一個poll函數,NAPI子系統會調用它來獲取數據幀。
NAPI通常的使用方式以下:
上述這種收集數據的方式和傳統方式相比可以有效減小overhead,由於一次能處理不少數據,而不須要每一個數據幀產生一次IRQ。設備驅動程序實現了poll函數並經過調用netif_napi_add將它註冊到NAPI中。當經過netif_napi_add向NAPI註冊poll時,驅動同時會聲明一個weight,大多數驅動都會將它固定爲64。該值的意義將會在下文討論。
一般,驅動程序會在初始化的時候註冊他們的NAPI poll函數。
NAPI initialization in the igb driver
igb驅動經過以下一個長長的調用鏈來實現NAPI的初始化:
這個調用鏈會致使一些上層的事發生:
讓咱們來看一看igb_alloc_q_vector是如何註冊poll回調函數以及它的私有數據的
static int igb_alloc_q_vector(struct igb_adapter *adapter, int v_count, int v_idx, int txr_count, int txr_idx, int rxr_count, int rxr_idx) { /* ... */ /* allocate q_vector and rings */ q_vector = kzalloc(size, GFP_KERNEL); if (!q_vector) return -ENOMEM; /* initialize NAPI */ netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64); /* ... */
上述代碼爲receive queue分配了內存,而且向NAPI子系統註冊了igb_poll函數。其中的參數包含了新建立的receive queue相關的struct napi_struct的引用(&q_vector->napi)。當須要從receive queue中接收數據包時,NAPI子系統會將它傳輸給igb_poll函數。這對於咱們之後研究數據流從驅動發送網絡棧的過程是很是重要的。
Bring a network device up
回憶一下以前說過的net_device_ops結構,它註冊了一系列函數用於啓動設備,傳輸包,設置mac地址等等。當一個網絡設備被啓動時(好比,調用ifconfig eth0 up) ,和net_device_ops結構中的ndo_open域相關的函數就會被調用。
ndo_open函數會作以下操做:
在igb驅動中,和net_device_ops結構中的ndo_open相關的函數爲igb_open。
Preparing to receive data from the network
如今大多數的網卡都利用DMA直接將數據寫入RAM,從而讓操做系統能直接獲取數據進行處理。許多網卡爲此使用的數據結構相似於建立在環形緩衝區上的隊列。爲了實現DMA,設備驅動程序必須和操做系統合做,保留一些內存可供網卡使用。一旦區域肯定,網卡會獲得關於通知,而且會將收到的數據都寫入其中。以後這些數據會被取出並交由網絡子系統處理。
這些都很是簡單,可是若是數據包到達的過快,單個CPU不能很好地處理全部的數據包怎麼辦?由於該數據結構是基於一個固定大小的內存區域,所以接收到的包將會被丟棄。這個時候,像Receive Side Scaling(RSS)或multiqueue就能派上用場了。有的設備有能力同時將包寫入不一樣的RAM,每一個區域都是一個單獨的隊列。這就容許操做系統在硬件層面使用多個CPU並行處理獲取的數據。可是這個特性並非被全部網卡支持的。不過Intel I350是支持multiple queue的。咱們能夠看到在igb驅動中,它在啓動時就是調用一個叫igb_setup_all_rx_resources的函數。而它又會調用另外一個函數,igb_setup_rx_resources,用於讓receive queue處理DMA內存。事實上,receive queue的數目和長度能夠經過ethtool進行調整。對該這些值進行調整,咱們能夠看到對於已處理包的數目和已丟棄包數目比例的影響。
網卡通常使用基於packet header field(源地址,目的地址,端口)的哈希函數來肯定某個包該發往哪一個receive queue。有的網卡還容許你調整某些receive queue的權重,從而讓某些特定的隊列處理更多的流量。還有的網卡甚至容許你調整哈希函數。這樣的話,你能夠將特定的數據流發往特定的receive queue進行處理,甚至在硬件層面就將包丟棄。接下來咱們很快會看到如何對這些設置進行調整。
Enable NAPI
當打開一個設備時,驅動一般會使能NAPI。以前咱們看到了驅動是如何註冊NAPI的poll函數的,可是直到打開設備前,NAPI都不是使能的。使能NAPI其實很是簡單直接,調用napi_enable翻轉struct napi_struct 中的一個位就表示NAPI使能了。如上所述,儘管NAPI使能了,可是它仍然可能處於關閉狀態。在igb driver中,每一個q_vector的NAPI都會在設備打開或者利用ethtool改變隊列的數目或大小時使能。
for (i = 0; i < adapter->num_q_vectors; i++) napi_enable(&(adapter->q_vector[i]->napi));
Register an interrupt handler
使能了NAPI以後,下一步就是註冊一個interrupt handler。如今有好幾種方式用於發生一箇中斷:MSI-X,MSI以及legacy interrupts。所以,這一部分的代碼對於不一樣的驅動都是不一樣的,這取決於特定的硬件支持哪一種中斷方式。驅動必須肯定設備支持哪一種中斷方式,而且註冊合適的處理方法,從而能在中斷髮生時進行處理。有些驅動,例如igb會爲每種方法註冊一個interrupt handler,一種方法失敗就換另外一種。對於支持multiple receive queue的網卡來講,MSI-X是更好的方法。這樣的話,每一個receive queue都有本身的hardware interrupt,從而能被特定的CPU處理(經過irqbalance或修改/proc/irq/IRQ_NUMBER/smp_affinity)。咱們很快就能看到,處理中斷的CPU也將是對包進行處理的CPU。這樣一來,收到的包就能從hardware interrupt開始直到整個網絡棧都由不一樣的CPU處理。
若是MSI-X不能用,MSI仍然要優於legacy interrupts。在igb驅動中,函數igb_msix_ring,igb_intr_msi和igb_intr分別是MSI-X,MSI和legacy interrupt對應的interrupt handler。
static int igb_request_irq(struct igb_adapter *adapter) { struct net_device *netdev = adapter->netdev; struct pci_dev *pdev = adapter->pdev; int err = 0; if (adapter->msix_entries) { err = igb_request_msix(adapter); if (!err) goto request_done; /* fall back to MSI */ /* ... */ } /* ... */ if (adapter->flags & IGB_FLAG_HAS_MSI) { err = request_irq(pdev->irq, igb_intr_msi, 0, netdev->name, adapter); if (!err) goto request_done; /* fall back to legacy interrupts */ /* ... */ } err = request_irq(pdev->irq, igb_intr, IRQF_SHARED, netdev->name, adapter); if (err) dev_err(&pdev->dev, "Error %d getting interrupt\n", err); request_done: return err; }
從上面的代碼咱們能夠看到,驅動會首先嚐試利用igb_request_msix設置MSI-X的interrupt handler,若是失敗的話,進入MSI。request_irq用於註冊MSI的interrupt handler,igb_intr_mis。若是這也失敗了,則會進入legacy interrupts。這個時候會再次使用request_irq註冊legacy interrupt的interrupt handler,igb_intr。igb的驅動就是這樣註冊一個函數用於處理,當網卡發出中斷說明有數據到達並已經準備好接受處理了。
Enable Interrupts
到如今爲止,基本上全部事情都設置完畢了。惟一剩下的就是打開中斷而且等待數據的到來。打開中斷對於每一個設備都是不同的,對於igb驅動,它是在__igb_open中經過調用igb_irq_enable完成的。通常,打開中斷都是經過寫設備的寄存器完成的:
static void igb_irq_enable(struct igb_adapter *adapter) { /* ... */ wr32(E1000_IMS, IMS_ENABLE_MASK | E1000_IMS_DRSTA); wr32(E1000_IAM, IMS_ENABLE_MASK | E1000_IMS_DRSTA); /* ... */ }
The network device is now up
驅動可能還須要作另一些事,例如啓動定時器,work queue,或者其餘硬件相關的設置。一旦這些都完成了,那麼設備就已經啓動並準備好投入使用了。
SoftIRQs
在深刻網絡棧以前,咱們先要了解一下Linux內核中一個叫作SoftIRQ的東西
What is a softirq
Linux內核中的softirq system是一種可以讓代碼到interrupt handler上下文以外執行的一種機制。它很是重要,由於在幾乎全部的interrupt handler的執行過程當中,hardware interrupts都是關閉的。而中斷關閉的時間越長,那麼就越有可能丟失某些event。所以咱們能夠把一些執行時間較長的代碼放到interrupt handler以外執行,這樣就能讓它快點完成從而恢復中斷。在內核中,還有其餘的機制可以延遲代碼的執行,可是對於網絡棧來講,咱們選擇softirqs。
softirq system能夠被當作是一系列的kernel thread(每一個CPU一個),它們會對不一樣的softirq event運行不一樣的處理函數。若是你觀察過top命令的輸出,而且在一系列的kernel threads中看到了一個ksoftirqd/0,那麼它就是運行在CPU 0上的一個softirq kernel thread。
內核子系統能夠經過運行open_softirq函數來註冊一個softirq handler。咱們下面將看到的是網絡子系統如何註冊它的softirq handlers。如今,咱們先來學習一下softirq是如何工做的。
ksoftirqd
由於softirq對於推遲設備驅動工做的執行太太重要了,你能夠想象,它必定在內核整個生命週期中很早的時候就開始執行了。下面咱們來看看ksoftirqd系統是如何初始化的:
static struct smp_hotplug_thread softirq_threads = { .store = &ksoftirqd, .thread_should_run = ksoftirqd_should_run, .thread_fn = run_ksoftirqd, .thread_comm = "ksoftirqd/%u", }; static __init int spawn_ksoftirqd(void) { register_cpu_notifier(&cpu_nfb); BUG_ON(smpboot_register_percpu_thread(&softirq_threads)); return 0; } early_initcall(spawn_ksoftirqd);
你能夠看到上面struct smp_hotplug_thread的定義,其中註冊了兩個函數指針:ksoftirqd_should_run和run_softirqd。這兩個函數都會在kernel/smpboot.c中被調用,用來構成一個event loop。kernel/smpboot.c中的代碼首先會調用ksoftirqd_should_run來肯定是否還有pending softirq,若是有的話,就執行run_softirqd。run_ksoftirqd會在調用__do_softirq以前執行一些minor bookkeeping。
__do_softirq
__do_softirq函數主要作如下這些事:
所以,如今你看CPU的使用圖,其中的softirq或si表明的就是用於這些deferred work所需的時間。
Linux network device subsysem
既然咱們已經大概瞭解了網卡驅動和softirq是如何工做的,接下來咱們來看看Linux network device subsystem是如何初始化的。接着咱們將追蹤一個包從它到達網卡以後所走過的整條路徑。
Initialization of network device system
network device (netdev) subsystem是在函數net_dev_init中初始化的。有許多有趣的事情在這個初始化函數中發生。
Initialization of struct softnet_data structures
net_dev_init會爲每一個CPU都建立一個struct softnet_data。這個結構會包含不少指針用於處理網絡數據:
其中的每一部分咱們都會在下文中詳細敘述。
Initialization of softirq handlers
net_dev_init註冊了一個receive softirq handler和transmit softirq handler分佈用於處理輸入和輸出的數據。代碼以下:
static int __init net_dev_init(void) { /* ... */ open_softirq(NET_TX_SOFTIRQ, net_tx_action); open_softirq(NET_RX_SOFTIRQ, net_rx_action); /* ... */ }
咱們很快就能看到驅動的interrupt handler是如何觸發NET_RX_SOFTIRQ的net_rx_action函數的
Data arrives
終於,數據來了!
假設receive queue有足夠的descriptors,packet會直接經過DMA寫入RAM。以後設備就會產生一個相應的中斷(或者在MSI-X中,是packet到達的receive queue對應的中斷)
Interrupt handler
通常來講,當一箇中斷對應的interrupt handler運行時,它應該將盡可能多的工做都放到中斷上下文以外進行。這很是重要,由於在一箇中斷執行的過程當中,其餘中斷都阻塞了。讓咱們來看看MSI-X interrupt handler的源碼,它能很好地解釋,爲何interrupt handler應該儘量地少作工做。
static irqreturn_t igb_msix_ring(int irq, void *data) { struct igb_q_vector *q_vector = data; /* Write the ITR value calculated from the previous interrupt. */ igb_write_itr(q_vector); napi_schedule(&q_vector->napi); return IRQ_HANDLED; }
這個interrupt handler很是短,在返回以前僅僅作了兩個很快的操做。首先,它調用了igb_write_itr,更新了一下硬件相關的寄存器。在這個例子中,被更新的寄存器是用於追蹤hardware interrupt到達速率的。這個寄存器一般和一個叫"Interrupt Throttling"(或者叫"Interrupt Coalescing")的硬件特性相結合,它用來調整中斷髮往CPU的速率。咱們很快能夠看到ethtool提供了一種機制,可以調節IRQ發生的速率。
接着調用napi_schedule用來喚醒NAPI processing loop(若是它不在運行的話)。注意的是NAPI processing loop是在softirq運行的,而不是在interrupt handler中。interrupt handler只是簡單地讓它開始執行,若是它沒有準備好的話。
真正的代碼會展示這些工做會是多麼重要,它會幫助咱們理解網絡數據是如何在多CPU系統中處理的。
NAPI and napi_schedule
讓咱們來看看hardware interrupt handler中調用的napi_schedule是如何工做的。
要記住,NAPI存在的目的就是在不須要網卡發送中斷,表示已經有數據能夠準備處理的狀況下也能接收數據。如上文所述,NAPI的poll loop會在收到一個hardware interrupt後生成。換句話說:NAPI是使能的,可是處於關閉狀態,直到網卡產生一個IRQ表示第一個packet到達,NAPI纔算打開。固然還有其餘一些狀況,咱們很快就能看到,NAPI會被關閉,直到一個hardware interrupt讓它從新開啓。
NAPI poll loop會在驅動的interrupt handler調用napi_schedule後啓動。不過napi_schedule只是一個包裝函數,它直接調用了__napi_schedule
/** * __napi_schedule - schedule for receive * @n: entry to schedule * * The entry's receive function will be scheduled to run */ void __napi_schedule(struct napi_struct *n) { unsigned long flags; local_irq_save(flags); ____napi_schedule(&__get_cpu_var(softnet_data), n); local_irq_restore(flags); } EXPORT_SYMBOL(__napi_schedule);
該代碼調用__get_cpu_var獲取當前運行的CPU的softnet_data結構。接着softnet_data結構和struct napi_struct結構會被傳輸給__napi_schedule。
/* Called with irq disabled */ static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi) { list_add_tail(&napi->poll_list, &sd->poll_list); __raise_softirq_irqoff(NET_RX_SOFTIRQ); }
上面的代碼主要作了兩件事:
咱們很快就能看到,softirq的處理函數net_rx_action會調用NAPI的poll函數用於獲取數據
A note about CPU and network data processing
須要注意的是到目前爲止咱們見到的全部把任務從hardware interrupt handler推遲到softirq的代碼使用的結構都是和當前CPU相關的。儘管驅動的IRQ handler只作不多的工做,可是softirq handler會和驅動的IRQ handler在同一個CPU上執行。
這就是爲何IRQ會由哪一個CPU處理很重要了,由於該CPU不只會用於執行驅動的interrupt handler,還會經過對應的NAPI在softirq中獲取數據。
咱們接下去將會看到,像Receive Packet Steering這樣的機制會將其中的一些工做分發到其餘CPU上去。
Network data processing begins
一旦softirq的代碼知道了是哪一個softirq被掛起了,它就會開始執行,而且調用net_rx_action,這個時候網絡數據的處理就開始了。讓咱們來看看net_rx_action的processing loop的各個部分,瞭解一下它是如何工做的。
net_rx_action processing loop
net_rx_action從被DAM寫入的packet所在的內存開始處理。該函數會遍歷在當前CPU上排隊的NAPI結構,依次取下每一個結構並進行處理。processing loop指定了NAPI的poll函數所能進行的工做量以及消耗的工做時間。它經過以下兩種方式實現:
while (!list_empty(&sd->poll_list)) { struct napi_struct *n; int work, weight; /* If softirq window is exhausted then punt. * Allow this to run for 2 jiffies since which will allow * an average latency of 1.5/HZ. */ if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) goto softnet_break;
這就是內核如何防止packet processing一直獨佔整個CPU。其中的budget是該CPU上每一個NAPI結構的預算的總和。這就是爲何multiqueue網卡要當心地調節IRQ affinity的緣由。咱們直到,處理從設備發出的IRQ的CPU也會被用來處理對應的softirq handler,所以也會成爲處理上述循環和budget computation的CPU。
有着multiqueue網卡的系統可能會出現這種狀況,多個NAPI結構被註冊到了同一個CPU上。全部的NAPI結構的處理都會消耗同一個CPU的budget。
若是你沒有足夠的CPU去分發網卡的IRQ,你能夠考慮增長net_rx_action的budget從而容許每一個CPU能處理更多的packet。增長budget會增長CPU的使用率,可是能夠減少延時,由於數據處理地更及時(可是CPU的處理時間仍然是2 jiffies,無論budget是多少)。
NAPI poll function and weight
咱們已經知道網卡驅動調用netif_napi_add註冊poll函數。在上文中咱們已經看到,igb驅動中有以下這段代碼:
/* initialize NAPI */ netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);
它給NAPI結構的weight賦值爲64。咱們如今就會看到它是如何在net_rx_action processing loop中使用的。
weight = n->weight; work = 0; if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); trace_napi_poll(n); } WARN_ON_ONCE(work > weight); budget -= work;
首先從NAPI結構中獲取weight(此處是64),而後將它傳遞給一樣註冊到NAPI結構中的poll函數(此處爲igb_poll)。poll函數會返回已經被處理的幀數,並保存在work中,以後它將從budget中減去。所以,假設:
你的系統將會在以下任意一種狀況發生時,中止處理數據;
The NAPI / network device driver contract
NAPI子系統和設備驅動的交互中還未說起的一部分就是關閉NAPI的條件,包含的內容以下:
咱們先來看看net_rx_action如何處理第一種狀況
Finishing the net_rx_action loop
/* Drivers must not modify the NAPI state if they * consume the entire weight. In such cases this code * still "owns" the NAPI instance and therefore can * move the instance around on the list at-will. */ if (unlikely(work == weight)) { if (unlikely(napi_disable_pending(n))) { local_irq_enable(); napi_complete(n); local_irq_disable(); } else { if (n->gro_list) { /* flush too old packets * If HZ < 1000, flush all packets. */ local_irq_enable(); napi_gro_flush(n, HZ >= 1000); local_irq_disable(); } list_move_tail(&n->poll_list, &sd->poll_list); } }
若是全部的work都被消耗完了,net_rx_action須要處理如下兩種狀況:
這就是packet processing loop如何調用驅動註冊的poll函數來處理數據。咱們很快將會看到,poll函數將會獲取數據並將它傳遞到協議棧進行處理。
Exiting the loop when limits are reached
當下列狀況發生時,net_rx_action的循環將會退出:
/* If softirq window is exhausted then punt. * Allow this to run for 2 jiffies since which will allow * an average latency of 1.5/HZ. */ if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) goto softnet_break;
softnet_break: sd->time_squeeze++; __raise_softirq_irqoff(NET_RX_SOFTIRQ); goto out;
struct softnet_data結構中的某些統計數據增長了而且softirq的NET_RX_SOFTIRQ被關閉了。其中的time_squeeze域是用來測量這樣一個數據:net_rx_action還有不少工做要作,可是要麼由於budget用完了,要麼超時了,此類狀況發生的次數。這些統計數據對於瞭解網絡的瓶頸是很是有用的。NET_RX_SOFTIRQ被關閉從而能給其餘任務騰出時間。這一小段代碼的意義是,儘管還有不少工做要作,可是咱們不想再獨佔CPU了,
執行流接着被傳遞給了out。當沒有更多的NAPI結構須要處理,換句話說,budget比network activity更多,全部的驅動都已經關閉了NAPI,net_rx_action無事可作的時候,也會運行到out。
out段代碼在從net_rx_action返回以前作了一件重要的事情:調用net_rps_action_and_irq_enable。它在Receive Packet Steering使能的狀況下有着重要的做用;它會喚醒遠程的CPU用於處理網絡數據。
咱們將在以後更多地瞭解RPS是如何工做的。如今讓咱們先走進NAPI poll函數的內部,這樣咱們就能向上進入網絡棧了。
NAPI poll
咱們已經知道設備驅動程序申請了一塊內存用於讓設備DMA到達packet。驅動有責任申請這些區域,一樣也有責任unmap those regions,獲取其中的數據而且將它發往網絡棧。讓咱們經過觀察igb driver是如何完成這些工做的,從而瞭解在實際過程當中這些步驟是如何完成的。
igb_poll
/** * igb_poll - NAPI Rx polling callback * @napi: napi polling structure * @budget: count of how many packets we should handle **/ static int igb_poll(struct napi_struct *napi, int budget) { struct igb_q_vector *q_vector = container_of(napi, struct igb_q_vector, napi); bool clean_complete = true; #ifdef CONFIG_IGB_DCA if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED) igb_update_dca(q_vector); #endif /* ... */ if (q_vector->rx.ring) clean_complete &= igb_clean_rx_irq(q_vector, budget); /* If all work not completed, return budget and keep polling */ if (!clean_complete) return budget; /* If not enough Rx work done, exit the polling mode */ napi_complete(napi); igb_ring_irq_enable(q_vector); return 0; }
上述代碼幹了以下這些有趣的事情:
讓咱們來看看igb_clean_rx_irq是如何將數據送往協議棧的
igb_clean_rx_irq
igb_clean_rx_irq函數是一個循環,它一次處理一個packet直到到達budget或者沒有多餘的數據須要處理了。
函數中的循環幹了以下這些很是重要的事情:
IGB_RX_BUFFER_WRITE(16)
一旦循環結束,函數會將收到的packet數和字節數加到統計數據中。
接着咱們首先來聊一聊Generic Receive Offloading(GRO),以後再進入函數napi_gro_receive
Generic Receive Offloading(GRO)
Generic Receive Offloading(GRO)是硬件層面的優化Large Receive Offloading(LRO)的軟件實現。這兩種方法的核心思想都是經過將"相似"的包組合起來以減小傳輸給網絡棧的包的數量,從而減小CPU的使用。例如咱們要傳輸一個大文件,其中有許多包都包含的都是文件中的數據塊。顯然,咱們能夠不用每次都將一個small packet發往網絡棧,而是將這些包組合起來,增大負載,最後讓這個組合起來的包發往協議棧。這就可讓協議層只處理一個包的頭部,就能傳輸更多的數據到用戶空間。
可是這類優化的最大問題就是,信息丟失。若是一個packet中設置了一些重要的選項或者標誌,若是將這個包和其餘包合併,這些選項或者標誌就會丟失。這也就是爲何不少人都不建議使用LRO的緣由。事實上,LRO對於合併包的規則的定義是很是寬鬆的。
GRO做爲LRO的硬件實現被引入,可是對於哪些包能夠組合有着更爲嚴格的規則
若是你有使用過tcpdump而且看到了一些大的難以想象的包,那麼頗有可能你的系統已經打開了GRO。你很快就能看到,抓包工具進行抓包的位置是在GRO發生以後,在協議棧的更上層。
napi_gro_receive
函數napi_gro_receive用於處理網絡數據的GRO操做(若是GRO打開的話)並將數據傳送到協議棧。而一個叫作dev_gro_receive的函數處理了其中的大部分邏輯
dev_gro_receive
這個函數首先檢查GRO是否打開,若是打開的話,則準備進行GRO操做。當GRO打開時,首先會遍歷一系列的GRO offload filter從而讓上層的協議棧對要進行GRO的數據進行處理。這樣協議層就能讓設備層知道,該packet是否屬於正在處理的network flow以及處理一些對於GRO所須要作的特定於協議的事情。例如,TCP協議須要知道是否或者什麼時候須要給一個已經組合到現有packet的packet發送ACK
list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || !ptype->callbacks.gro_receive) continue; skb_set_network_header(skb, skb_gro_offset(skb)); skb_reset_mac_len(skb); NAPI_GRO_CB(skb)->same_flow = 0; NAPI_GRO_CB(skb)->flush = 0; NAPI_GRO_CB(skb)->free = 0; pp = ptype->callbacks.gro_receive(&napi->gro_list, skb); break; }
若是協議層認爲是時候清除GRO packet了,以後就會調用napi_gro_complete進行處理,以後它就會調用協議層對應的gro_complete,最後再調用netif_receive_skb將包傳送給網絡棧
if (pp) { struct sk_buff *nskb = *pp; *pp = nskb->next; nskb->next = NULL; napi_gro_complete(nskb); napi->gro_count--; }
若是協議層將packet合併進existing flow,napi_gro_receive就會直接返回。若是packet沒有被合併,而且如今的GRO flow小於MAX_GRO_SKBS,以後就會在該NAPI的gro_list新增一個條目
if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS) goto normal; napi->gro_count++; NAPI_GRO_CB(skb)->count = 1; NAPI_GRO_CB(skb)->age = jiffies; skb_shinfo(skb)->gso_size = skb_gro_len(skb); skb->next = napi->gro_list; napi->gro_list = skb; ret = GRO_HELD;
Linux網絡棧中的GRO系統就是這樣工做的
napi_skb_finish
一旦dev_gro_receive運行完成,napi_skb_finish就會被調用,要不就是釋放由於包已經被合併就沒用了的數據結構,要麼調用netif_receive_skb將數據傳輸給網絡棧(由於如今已經有MAX_GRO_SKBS個flow了)。如今是時候看看netif_receive_skb是如何將數據傳輸給協議層了。可是在此以前,咱們先來看看什麼是Receive Packet Steering(RPS)
Receive Packet Steering(RPS)
咱們已經知道每一個網絡設備驅動都註冊了一個NAPI poll函數。每一個NAPI poller實例都執行在每一個CPU的softirq上下文中。而處理驅動的IRQ handler的CPU會喚醒它的softirq processing loop去處理包。換句話說:處理硬件中斷的CPU也會用於poll相應的輸入數據。
有的硬件(例如Intel I350)在硬件層面支持multiple queue。這意味着輸入的數據會被分流到不一樣的receive queue,並被DMA到不一樣的內存區域,從而會有不一樣的NAPI結構處理對應的區域。從而能讓多個CPU並行地處理來自設備的中斷而且對數據進行處理。
這個特性咱們就稱做Receive Side Scaling(RSS)
而Receive Packet Steering(RPS)是RSS的軟件實現。由於它是由軟件實現的,所以它能夠用於任何網卡,即便是那些只有一個receive queue的網卡。然而,一樣由於是軟件層面的實現,RPS只能在包從DMA內存區域中取出以後,才能對它進行處理。這意味着,你並不會看到CPU使用在處理IRQ或者NAPI poll loop的時間降低,可是你能夠從獲取到包以後,對它進行負載均衡,而且今後處開始,到協議層向上減小CPU時間。
RPS經過對輸入的數據計算出一個哈希值肯定該由哪一個CPU對其進行處理。以後,該數據會被排入每一個CPU的receive network backlog等待處理。一個Inter-processor Interrupt(IPI)會被髮往擁有該backlog的CPU。這會幫助觸發backlog的處理,若是它當前仍未進行處理的話。/proc/net/softnet_stat中包含了每個softnet_data中接收到的IPI的次數
所以,netif_receive_skb要麼會接着將數據送往網絡棧,要麼就會經過RPS將它發往其餘CPU進行處理
Receive Flow Steering(RFS)
Receive Flow Steering(RFS)一般會和RPS混合使用。RPS會將輸入數據在多個CPU之間進行負載均衡,可是它並不會考慮局部性從而最大化CPU cache的命中率。你可使用RFS將屬於同一個flow的數據送往同一個CPU處理,從而提升cache命中率。
Hardware accelerated Receive Flow Steering(aRFS)
RFS可使用hardware acceleration來加速。網卡和內核能夠聯合起來,共同決定哪一個flow須要發往哪一個CPU進行處理。爲了使用這一特性,你的網卡和驅動必須對它支持。若是你的網卡驅動有一個叫作ndo_rx_flow_steer的函數,那麼該驅動支持accelerated RFS。
Moving up the network stack with netif_receive_skb
netif_receive_skb會在如下兩個地方被調用:
須要注意的是netif_receive_skb以及它後續調用的函數都是在softirq processing loop的上下文中進行的。netif_receive_skb首先檢查一個sysctl的值用來確認用戶是否要求在packet進入backlog queue以前或以後加入receive timestamp。若是有設置的話,如今就對該數據進行timestamp,在進行RPS以前。若是未被設置,則會在它加入隊列以後在打timestamp。這能夠將timestamp形成的負載在多個CPU間進行均衡,不過一樣會引入延遲
netif_receive_skb
當timestamp被處理完以後,netif_receive_skb會根據RPS是否可用進行不一樣的操做。讓咱們先從最簡單的開始:RPS不可用
Without RPS(default setting)
若是RPS不可用,首先會調用__netif_receive_skb作一些bookkeeping接着再調用__netif_receive_skb_core將數據移往協議棧。咱們很快就會看到__netif_receive_skb_core是如何工做的,不過在此以前,咱們先來看看RPS可用時的傳輸路徑是怎樣的,由於該代碼一樣會調用__netif_receive_skb_core。
With RPS enabled
若是RPS可用的話,在timestamp選項被處理完以後,netif_receive_skb會進行一些計算用於決定該使用哪一個CPU的backlog queue。這是經過函數get_rps_cpu完成的
cpu = get_rps_cpu(skb->dev, skb, &rflow); if (cpu >= 0) { ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail); rcu_read_unlock(); return ret; }
get_rps_cpu會將上文所述的RFS和aRFS都考慮在內,並調用enqueue_to_backlog將數據加入相應的CPU的backlog queue
enqueue_to_backlog
該函數首先獲取遠程CPU的softnet_data結構的指針,其中包含了一個指向input_pkt_queue的指針。接着,獲取遠程CPU的input_pkt_queue的隊列長度
qlen = skb_queue_len(&sd->input_pkt_queue); if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
input_pkt_queue的長度首先和netdev_max_backlog相比較。若是隊列的長度大於該值,則數據被丟棄。一樣flow limit也會被檢查,若是超過了,數據一樣會被丟棄。這兩種狀況下,softnet_data節後中丟棄包的數目都將增長。注意這裏的softnet_data是數據將要發往的CPU的。
enqueue_to_backlog並不會在不少地方被調用。它只會在RPS可用的包處理過程當中或者netif_rx中。許多驅動不該該使用netif_rx,而應該使用netif_receive_skb。若是你不使用RPS或者你的驅動不使用netif_rx,那麼增長backlog不會對你的系統產生任何影響,由於它根本就沒被用到。(若是你的驅動使用netif_receive_skb而且未使用RPS,那麼增長netdev_max_backlog不會產生任何性能上的提升,由於沒有數據會被加入到input_pkt_queue中)
若是input_pkt_queue足夠小,而也沒有超過flow limit,數據就會被加入隊列。大概的邏輯以下:
if (skb_queue_len(&sd->input_pkt_queue)) { enqueue: __skb_queue_tail(&sd->input_pkt_queue, skb); input_queue_tail_incr_save(sd, qtail); rps_unlock(sd); local_irq_restore(flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device * We can use non atomic operation since we own the queue lock */ if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) { if (!rps_ipi_queued(sd)) ____napi_schedule(sd, &sd->backlog); } goto enqueue;
Flow limits
RPS會將包分發到多個CPU進行處理,不過一個large flow極可能會佔據整個CPU,從而讓其餘small flow處於飢餓狀態。flow limit可以讓每一個flow添加到backlog中的包的數目有一個最大值。這個特性能夠幫助small flow一樣可以獲得處理,即便有larger flow的包也在入隊
backlog queue NAPI poller
每一個CPU的backlog queue以和設備驅動程序同樣的方式插入NAPI。一個poll函數用於處理來自softirq上下文的包,一樣還提供了一個weight。這個NAPI結構在網絡系統初始化的時候被處理:
sd->backlog.poll = process_backlog; sd->backlog.weight = weight_p; sd->backlog.gro_list = NULL; sd->backlog.gro_count = 0;
backlog的NAPI結構和驅動的NAPI有所不一樣,它的weight參數是能夠調節的,而驅動程序則會將它們的NAPI weight硬編碼爲64。
process_backlog
process_backlog函數是一個循環,直到它的weight耗盡或者backlog中沒有其餘數據須要處理。每個在backlog中的數據都將從backlog queue傳輸到__netif_receive_skb。一旦數據到達__netif_receive_skb以後,它的傳輸路徑就和RPS不可用時同樣了。__netif_receive_skb只是在調用__netif_receive_skb_core將數據傳輸到協議棧以前作一些bookkeeping。
process_backlog和驅動程序使用NAPI的方式相同:若是weight沒用完,那麼關閉NAPI。而poller在enqueue_to_backlog調用__napi_schedule以後被從新啓動。
該函數會返回已經完成的工做量,net_rx_action會將它從budget中減去
__netif_receive_skb_core delivers data to packet taps and protocol layer
__netif_receive_skb_core用於完成將數據傳往網絡棧的工做。在此以前,它先確認是否安裝了packet taps用來抓取輸入的包。其中一個例子就是libcap使用的AF_PACKET address family。若是有這樣的tap存在,則數據先被髮往tap,在被髮往協議層。
Packet tap delivery
若是安裝了packet tap,則包將安裝如下代碼被髮送:
list_for_each_entry_rcu(ptype, &ptype_all, list) { if (!ptype->dev || ptype->dev == skb->dev) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } }
Protocol layer delivery
一旦tap處理完成以後,__netif_receive_skb_core會將數據發往協議層。先從數據中獲取protocol field,而後再遍歷一系列該協議類型對應的deliver functions
type = skb->protocol; list_for_each_entry_rcu(ptype, &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) { if (ptype->type == type && (ptype->dev == null_or_dev || ptype->dev == skb->dev || ptype->dev == orig_dev)) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } }
上文中的ptype_base是一個以下所示的哈希表:
struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;
每一個協議層都會在哈希表給定的slot中加入一個filter,經過以下的ptype_head函數計算:
static inline struct list_head *ptype_head(const struct packet_type *pt) { if (pt->type == htons(ETH_P_ALL)) return &ptype_all; else return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK]; }
將filter加入list的操做是由dev_add_pack完成的。這就是協議層如何註冊本身,從而獲取發往它們的數據的方法。
如今咱們就知道了數據如何從網卡發往協議層
Protocol layer registration
如今咱們已經知道了數據如何從網絡設備發往協議棧,下面咱們就來看看協議層是如何註冊本身的。
IP protocol layer
IP協議層會先把本身註冊到ptype_base這個哈希表中,從而讓數據可以從網絡設備發往它
dev_add_pack(&ip_packet_type);
static struct packet_type ip_packet_type __read_mostly = { .type = cpu_to_be16(ETH_P_IP), .func = ip_rcv, };
__netif_receive_skb_core會調用deliver_skb,而它最終會調用func(在這裏,即爲ip_rcv)
ip_rcv
ip_rcv的操做很是直接,首先對數據進行檢查,而後更新一些統計數據。ip_rcv會最終經過netfilter將packet發往ip_rcv_finish,從而讓那些iptables中匹配IP協議層的規則可以對數據進行處理
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish);
須要注意的是,若是你有很是多,很是複雜的netfilter或者iptables規則,這些規則都會在softirq上下文中執行,從而致使網絡棧的延時,而這每每是不可避免的。
ip_rcv_finish
當netfilter並無把包丟棄時,就會調用ip_rcv_finish。ip_rcv_finish開始就有一個優化,爲了將包傳輸到合適的地方,首先要從路由系統中獲取dst_entry。所以,首先須要調用該數據發往的高層協議的early_demux。early_demux首先會判斷是否有dst_entry緩存在socket結構中
if (sysctl_ip_early_demux && !skb_dst(skb) && skb->sk == NULL) { const struct net_protocol *ipprot; int protocol = iph->protocol; ipprot = rcu_dereference(inet_protos[protocol]); if (ipprot && ipprot->early_demux) { ipprot->early_demux(skb); /* must reload iph, skb->head might have changed */ iph = ip_hdr(skb); } }
咱們能夠看到這部分代碼是由sysctl_ip_early_demux控制的。early_demux默認是打開的。若是該優化是打開的,而且沒有cached entry(由於這是第一個到達的packet),則這個packet會被髮往路由系統,在那可以獲取dst_entry。
一旦路由系統工做完畢以後,就會更新計數器,而後再調用dst_input(skb),它轉而會調用剛剛獲取的dst_entry結構中的input function pointer。
若是packet的最終目的地是本地,那麼路由系統就會將ip_local_deliver賦值給dst_entry中的input function pointer。
ip_local_deliver
/* * Deliver IP Packets to the higher protocol layers. */ int ip_local_deliver(struct sk_buff *skb) { /* * Reassemble IP fragments. */ if (ip_is_fragment(ip_hdr(skb))) { if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER)) return 0; } return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL, ip_local_deliver_finish); }
和ip_rcv_finish相似,netfilter會先對packet進行檢查,若未被丟棄,則調用ip_local_deliver_finish
ip_local_deliver_finish
ip_local_deliver_finish從packet中獲取protocol,而後查詢該protocol註冊的net_protocol結構,接着再調用該net_protocol結構中的handler函數指針。這就將packet發往更高的協議層了。
Higher level protocol registration
本篇文章主要分析UDP,可是TCP protocol handler和UDP protocol handler的註冊方式是相同的。在net/ipv4/af_inet.c中的函數定義包含了用於UDP,TCP和ICMP協議和IP協議層進行鏈接的處理函數
static const struct net_protocol tcp_protocol = { .early_demux = tcp_v4_early_demux, .handler = tcp_v4_rcv, .err_handler = tcp_v4_err, .no_policy = 1, .netns_ok = 1, }; static const struct net_protocol udp_protocol = { .early_demux = udp_v4_early_demux, .handler = udp_rcv, .err_handler = udp_err, .no_policy = 1, .netns_ok = 1, }; static const struct net_protocol icmp_protocol = { .handler = icmp_rcv, .err_handler = icmp_err, .no_policy = 1, .netns_ok = 1, };
這些結構都在inet address family的初始化代碼中被註冊
/* * Add all the base protocols. */ if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0) pr_crit("%s: Cannot add ICMP protocol\n", __func__); if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0) pr_crit("%s: Cannot add UDP protocol\n", __func__); if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0) pr_crit("%s: Cannot add TCP protocol\n", __func__);
咱們關注的是UDP協議層,所以對應的處理函數是udp_rcv。這就是數據從IP層通往UDP層的入口
UDP protocol layer
udp_rcv
udp_rcv函數只有一行代碼用於直接調用__udp4_lib_rcv用於接收數據
__udp4_lib_rcv
__udp4_lib_rcv函數會檢查packet是否合法,接着再獲取UDP header,UDP數據報長度,源地址,目的地址,而後是一些完整性檢查和checksum verification。
以前在IP層的時候,咱們已經看到在將包傳送到上層協議以前會將dst_entry和packet相綁定。若是socket和對應的dst_entry已經找到了,那麼__udp4_lib_rcv會將包存入socket:
sk = skb_steal_sock(skb); if (sk) { struct dst_entry *dst = skb_dst(skb); int ret; if (unlikely(sk->sk_rx_dst != dst)) udp_sk_rx_dst_set(sk, dst); ret = udp_queue_rcv_skb(sk, skb); sock_put(sk); /* a return value > 0 means to resubmit the input, but * it wants the return to be -protocol, or 0 */ if (ret > 0) return -ret; return 0; } else {
若是在以前的early_demux操做中沒有找到socket,那麼就會調用__udp4_lib_lookup_skb對receiving socket進行查找。不管上述哪一種狀況,最終數據將被存入socket:
ret = udp_queue_rcv_skb(sk, skb); sock_put(sk);
若是沒有找到socket,那麼數據報將被丟棄:
/* No socket. Drop packet silently, if checksum is wrong */ if (udp_lib_checksum_complete(skb)) goto csum_error; UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE); icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0); /* * Hmm. We got an UDP packet to a port to which we * don't wanna listen. Ignore it. */ kfree_skb(skb); return 0;
udp_queue_rcv_skb
這個函數的初始部分以下所示:
最終咱們到達了處理receive queue的邏輯,首先檢查socket對應的receive queue是否是已經滿了:
if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf)) goto drop;
sk_rcvqueue_full
sk_rcvqueue_full函數會檢查socket的backlog長度以及socket的sk_rmem_alloc來確認它們的和是否大於socket的sk_rcvbuf
/* * Take into account size of receive queue and backlog queue * Do not take into account this skb truesize, * to allow even a single big packet to come. */ static inline bool sk_rcvqueues_full(const struct sock *sk, const struct sk_buff *skb, unsigned int limit) { unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc); return qsize > limit; }
udp_queue_rcv_skb
一旦證實隊列未滿,則會繼續將數據加入隊列
bh_lock_sock(sk); if (!sock_owned_by_user(sk)) rc = __udp_queue_rcv_skb(sk, skb); else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) { bh_unlock_sock(sk); goto drop; } bh_unlock_sock(sk); return rc;
第一步先判斷socket當前是否被用戶進程佔用。若是不是,則調用__udp_queue_rcv_skb將數據加入receive queue。若是是,則經過調用sk_add_backlog將數據加入backlog。backlog中的數據最終都會加入receive queue,socket相關的系統調用經過調用release_sock釋放了該socket
__udp_queue_rcv_skb
__udp_queue_rcv_skb經過調用sock_queue_rcv_skb將數據加入receive queue,若是該數據不能被加入receive queue,則更新統計數據
rc = sock_queue_rcv_skb(sk, skb); if (rc < 0) { int is_udplite = IS_UDPLITE(sk); /* Note that an ENOMEM error is charged twice */ if (rc == -ENOMEM) UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_RCVBUFERRORS,is_udplite); UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_INERRORS, is_udplite); kfree_skb(skb); trace_udp_fail_queue_rcv_skb(rc, sk); return -1; }
Queuing data to a socket
如今數據已經經過調用sock_queue_rcv加入socket的隊列了。這個函數在將數據加入隊列前作了以下的操做:
以上就是數據如何到達系統,並經過整個協議棧到達socket並準備給用戶進程使用的過程
原文連接:
https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/