主要內容:Socket發送函數在TCP層的實現java
內核版本:3.15.2緩存
個人博客:http://blog.csdn.net/zhangskdapp
在上篇blog中分析了tcp_sendmsg()這個主要函數的實現,如今來看下以前略過的一些細節,less
包括等待鏈接的創建、tcp_push()的實現、tcp_autocorking和數據的複製。socket
在tcp_sendmsg()中若是發現鏈接還沒有創建,會調用sk_stream_wait_connect()來等待鏈接的創建,tcp
鏈接成功創建時返回0,以後才能發送數據。函數
/* Wait for a socket to get into the connected state * @sk: sock to wait on * @timeo_p: for how long to wait * Must be called with the socket locked. */ int sk_stream_wait_connect(struc sock *sk, long *timeo_p) { struct task_struct *tsk = current; DEFINE_WAIT(wait); /* 初始化等待任務 */ int done; do { int err = sock_error(sk); /* 鏈接發生錯誤 */ if (err) return err; /* 此時鏈接必須處於SYN_SENT或SYN_RECV的狀態 */ if (1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) return -EPIPE; /* Broken pipe */ /* 若是是非阻塞的,或者等待時間耗盡了,直接返回 */ if (! *timeo_p) return -EAGAIN; /* Try again */ /* 若是進程有待處理的信號,返回 */ if (signal_pending(tsk)) return sock_intr_errno(*timeo_p); /* 把等待任務加入到socket等待隊列頭部,把進程的狀態設爲TASK_INTERRUPTIBLE */ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); sk->sk_write_pending++; /* 更新寫等待計數 */ /* 進入睡眠,返回值爲真的條件: * 鏈接沒有發生錯誤,且狀態爲ESTABLISHED或CLOSE_WAIT。 */ done = sk_wait_event(sk, timeo_p, ! sk->sk_err && ! ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))); /* 把等待任務從等待隊列中刪除,把當前進程的狀態設爲TASK_RUNNING */ finish_wait(sk_sleep(sk), &wait); sk->sk_write_pending--; /* 更新寫等待計數 */ } while (! done) return 0; }
#define sk_wait_event(__sk, __timeo, __condition) \ ({ int __rc; \ release_sock(__sk); \ __rc = __condition; \ if (! __rc) { \ *(__timeo) = schedule_timeout(*(__timeo)); \ } \ lock_sock(__sk); \ __rc = __condition; \ __rc; \ }) static inline long sock_sndtimeo(const struct sock *sk, bool noblock) { return noblock ? 0 : sk->sk_sndtimeo; }
tcp_sendmsg()中,在sock發送緩存不足、系統內存不足或應用層的數據都拷貝完畢等狀況下,ui
都會調用tcp_push()來把已經拷貝到發送隊列中的數據給發送出去。this
tcp_push()主要作了如下事情:atom
1. 檢查是否有未發送過的數據。
2. 檢查是否須要設置PSH標誌。
3. 檢查是否使用了緊急模式。
4. 檢查是否須要使用自動阻塞。
5. 儘量地把發送隊列中的skb給發送出去。
static void tcp_push(struct sock *sk, int flags, int mss_now, int nonagle, int size_goal) { struct tcp_sock *tp = tcp_sk(sk); struct sk_buff *skb; /* 若是沒有未發送過的數據 */ if (! tcp_send_head(sk)) return; /* 發送隊列的最後一個skb */ skb = tcp_write_queue_tail(sk); /* 若是接下來沒有更多的數據須要發送,或者距離上次PUSH後又有比較多的數據, * 那麼就須要設置PSH標誌,讓接收端立刻把接收緩存中的數據提交給應用程序。 */ if (! (flags & MSG_MORE) || forced_push(tp)) tcp_mark_push(tp, skb); /* 若是設置了MSG_OOB標誌,就記錄緊急指針 */ tcp_mark_urg(tp, flags); /* 若是須要自動阻塞小包 */ if (tcp_should_autocork(sk, skb, size_goal)) { /* avoid atomic op if TSQ_THROTTED bit is already set, 設置阻塞標誌位 */ if (! test_bit(TSQ_THROTTLED, &tp->tsq_flags)) { NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPAUTOCORKING); set_bit(TSQ_THROTTLED, &tp->tsq_flags); } /* It is possible TX completion already happened before we set TSQ_THROTTED. * 個人理解是,當提交給IP層的數據包都發送出去後,sk_wmem_alloc的值就會變小, * 此時這個條件就爲假,以後能夠發送被阻塞的數據包了。 */ if (atomic_read(&sk->sk_wmem_alloc) > skb->truesize) return; } /* 若是以後還有更多的數據,那麼使用TCP CORK,顯式地阻塞發送 */ if (flags & MSG_MORE) nonagle = TCP_NAGLE_CORK; /* 儘量地把發送隊列中的skb發送出去。 * 若是發送失敗,檢查是否須要啓動零窗口探測定時器。 */ __tcp_push_pending_frames(sk, mss_now, nonagle); }
判斷是否要設置PSH標誌:
若是此時發送隊列的最後一個字節序號,和上次PSH的最後一個字節序號,
它們的間隔超過了對端通告過的最大接收窗口的一半,就須要設置。
static inline bool forced_push(const struct tcp_sock *tp) { /* write_seq:發送隊列最後一個字節的序號+1 * pushed_seq:上次PUSH的最後一個字節 * max_window:對端層通告過的最大接收窗口 */ return after(tp->write_seq, tp->pushed_seq + (tp->max_window >> 1)); } static inline void tcp_mark_push(struct tcp_sock *tp, struct sk_buff *skb) { TCP_SKB_CB(skb)->tcp_flags |= TCPHDR_PSH; /* 設置PSH標誌 */ tp->pushed_seq = tp->write_seq; /* 記錄本次PUSH的最後一個字節序號 */ } static inline void tcp_mark_urg(struct tcp_sock *tp, int flags) { if (flags & MSG_OOB) tp->snd_up = tp->write_seq; }
tcp_push_pending_frames()和__tcp_push_pending_frames()簡單的封裝了下tcp_write_xmit()。
從tcp_write_xmit()開始,TCP層才真正開始發送數據。
/* Push out any pending frames which were held back due to TCP_CORK * or attempt at coalescing tiny packets. * The socket must be locked by the caller. */ void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss, int nonagle) { /* If we are closed, the bytes will have to remain here. * In time closedown will finish, we empty the write queue and * all will be happy. */ if (unlikely(sk->sk_state == TCP_CLOSE)) return; /* 若是發送失敗 */ if (tcp_write_xmit(sk, cur_mss, nonagle, 0, sk_gfp_atomic(sk, GFP_ATOMIC))) tcp_check_probe_timer(sk); /* 檢查是否須要啓用0窗口探測定時器*/ }
當應用程序連續地發送小包時,若是可以把這些小包合成一個全尺寸的包再發送,無疑能夠減小
總的發包個數。tcp_autocorking的思路是當規則隊列Qdisc、或網卡的發送隊列中有還沒有發出的
數據包時,那麼就延遲小包的發送,等待應用層的後續數據,直到Qdisc或網卡發送隊列的數據
包成功發送出去爲止。
sysctl_tcp_autocorking - BOOLEAN
Enable TCP auto corking:
When applications do consecutive small write()/sendmsg() system calls,
we try to coalesce these small writes as much as possible, to lower total
amount of sent packets. This is done if at least one prior packet for the flow
is waiting in Qdisc queues or device transmit queue. Applications can still
use TCP_CORK for optimal behavior when they know how/when to uncork
their sockets.
Default: 1
Patch: http://lwn.net/Articles/576263/
同時知足如下條件時,tcp_push()纔會自動阻塞:
1. 數據包爲小包,即數據長度小於最大值。
2. 使用了tcp_autocorking,這個值默認爲1。
3. 此數據包不是發送隊列的第一個包,即前面有數據包被髮送了。
4. Qdisc或Nic queues必須有數據包,而不能只是純ACK包。
/* If a not yet filled skb is pushed, do not send it if we have data packets * in Qdisc or NIC queues: Because TX completion will happen shortly, * it gives a chance to coalesce future sendmsg() payload into this skb, * without need for a timer, and with no latency trade off. * As packts containing data payload have a bigger truesize than pure * acks (dataless) packets, the last checks prevent autocorking if we only * have an ACK in Qdisc/NIC queues, or if TX completion was delayed * after we processed ACK packet. */ static bool tcp_should_autocork(struct sock *sk, struct sk_buff *skb, int size_goal) { return skb->len < size_goal && sysctl_tcp_autocorking && skb != tcp_write_queue_head(sk) && atomic_read(&sk->sk_wmem_alloc) > skb->truesize; }
Q:何時會取消自動阻塞呢?
A:在tcp_push()中會檢查,if (atomic_read(&sk->sk_wmem_alloc) > skb->truesize)
當提交給IP層的數據包都發送出去後,sk_wmem_alloc的值就會變小,此時這個條件就爲假,
以後能夠發送被阻塞的數據包了。
tcp_sendmsg()的一項主要工做,就是把用戶層的數據填充到發送隊列的skb中。
skb_availroom返回skb的data room大小,若是有非線性數據區,就返回0。
/* * bytes at buffer end. * Return the number of bytes of free space at the tail of an sk_buff * allocated by sk_stream_alloc() */ static inline int skb_availroom(const struct sk_buff *skb) { /* skb->data_len不爲零,表示有非線性的數據區 */ if (skb_is_nonlinear(skb)) return 0; /* data room的大小 */ return skb->end - skb->tail - skb->reserved_tailroom; }
驗證用戶空間的數據可讀,拷貝用戶空間的數據到內核空間。若是須要TCP本身計算校驗和,
那麼同時計算用戶層數據的校驗和。
static inline int skb_add_data_nocache(struct sock *sk, struct sk_buff *skb, char __user *from, int copy) { int err, offset = skb->len; /* 拷貝用戶空間的數據到內核空間,同時計算校驗和 */ err = skb_do_copy_data_nocache(sk, skb, from, skb_put(skb, copy), copy, offset); /* 若是拷貝失敗,恢復skb->len和data room的大小 */ if (err) __skb_trim(skb, offset); return err; } static inline int skb_do_copy_data_nocache(struct sock *sk, struct sk_buff *skb, char __user *from, char *to, int copy, int offset) { /* 須要TCP本身計算校驗和 */ if (skb->ip_summed == CHECKSUM_NONE) { int err = 0; /* 拷貝用戶空間的數據到內核空間,同時計算用戶數據的校驗和 */ __wsum csum = csum_and_copy_from_user(from, to, copy, 0, &err); skb->csum = csum_block_add(skb->csum, csum, offset); /* 累加校驗和 */ } else if (sk->sk_route_caps & NETIF_F_NOCACHE_COPY) { if (! access_ok(VERIFY_READ, from, copy) || __copy_from_user_nocache(to, from, copy)) return -EFAULT; } else if (copy_from_user(to, from, copy)) return -EFAULT; return 0; }
向下擴大data room,返回擴大以前的tail指針。
/* add data to a buffer. * This function extends the used data area of the buffer. If this would * exceed the total buffer size the kernel will panic. A pointer to the first * byte of the extra data is returned. */ unsigned char *skb_put(struct sk_buff *skb, unsigned int len) { unsigned char *tmp = skb_tail_pointer(skb); SKB_LINEAR_ASSERT(skb); skb->tail += len; skb->len += len; if (unlikely(skb->tail > skb->end)) skb_over_panic(skb, len, __builtin_return_address(0)); return tmp; } static inline void __skb_trim(struct sk_buff *skb, unsigned int len) { if (unlikely(skb_is_nolinear(skb))) { WARN_ON(1); return; } skb->len = len; /* 恢復原來的長度 */ skb_set_tail_pointer(skb, len); /* 恢復data room的大小 */ }
拷貝用戶空間的數據到內核空間,同時計算校驗和。同時更新skb->len、skb->data_len、
skb->truesize、sk->sk_wmem_queued和sk->sk_forward_alloc。
static inline int skb_copy_to_page_nocache(struct sock *sk, char __user *from, struct sk_buff *skb, struct page *page, int off, int copy) { int err; /* 拷貝用戶空間的數據到內核空間,同時計算校驗和 */ err = skb_do_copy_data_nocache(sk, skb, from, page_address(page) + off, copy, skb->len); if (err) return err; skb->len += copy; skb->data_len += copy; skb->truesize += copy; sk->sk_wmem_queued += copy; sk_mem_charge(sk, copy); return 0; }