UDP的epoll併發框架—解決OpenUOM的併發問題

UDP具備是一種很好的封裝協議,好比OpenUOM使用UDP封裝會比TCP好不少,如今愈來愈多的業務採用UDP傳輸,而後本身定義按序到達以及流控邏輯,然而就我我的的使用經驗來看,UDP太難作併發,大多數狀況下,使用UDP會讓epoll等高性能event機制優點全無。本文以OpenUOM爲例,說明一下我是怎麼解決UDP併發問題的。nginx

異步併發模型與epoll

和apache相比,nginx採用異步的處理方式,也就是說,一個線程能夠處理多個鏈接,基於event模型,來了個數據包就讀,可能依次到達的數據不屬於同一個鏈接,可是不要緊,只要能將可讀的socket描述符和具體的鏈接對應上便可。這樣會使得在大併發場景下,讓CPU逼近其極限運轉,由於它幾乎沒有時間閒着,它會一直處理到達的數據包。apache的模型就不是這樣,它會讓一個鏈接單獨佔有一個線程,若是有大量的鏈接就會有大量的線程,然而對於每個線程而言,其數據讀寫的壓力並非很大,這就會致使大量線程之間頻繁切換,而切換會致使cache的刷新等反作用...所以在一樣的硬件配置情形下,nginx的異步模型要比apache好不少。golang

咱們已經知道,異步處理是搞定大併發的根本,接下來的問題是,如何讓一個就緒的socket和一個業務邏輯鏈接對應起來,這個問題在同步模型下並不存在,由於一個線程只處理一個鏈接。曾經的event機制好比select,poll,它們只能告訴你socket n就緒了,你不得不本身去經過數據結構來組織socket n和該鏈接信息之間的關係,典型的以下:算法

struct conn {
    int sd;
    void *others;
};

list conns;

一個鏈表conns囊括了該線程負責的全部鏈接,若是select/poll告訴你socket n就緒了,你不得不遍歷這個conns鏈表,比較誰的sd是n,而後取出conn來處理,雖然能夠用更加高效的數據結構,可是查找是必不可少的。然而epoll解決了這個問題。apache

在調用epoll_ctrl將一個socket加入到epoll中時,API會爲你提供一個指針,讓你直接綁定一個socket描述符和一個指針,一旦socket就緒,取出的是一個結構體,其中包含了與該socket對應的指針,所以你即可以這麼作:編程

conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);
while (1) { 
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        conn = events[n].data.ptr;
        recv(conn.sd, ....);
        ....
    }
}

conn會一會兒取出來。這是合理的方式。畢竟,內核中已經通過socket查找了,一個5元組惟一表明瞭一個鏈接,爲什麼要在用戶態程序再找一次呢?所以除了epoll不須要遍歷全部的被監視socket以外,能夠保存用戶的指針也是其相對於select/poll的一大優點。nginx正是用的這種方式。咱們回到OpenUOM。服務器

使用TCP的OpenUOM

使用TCP的OpenUOM跟nginx幾乎是如出一轍,其核心處理邏輯以下:數據結構

/* 加入偵聽socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            child_sd = accept(context.sd, remote_addr....);
            multi_instance *mi = create_mi(child_sd, remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 這裏簡化了處理,由於並非每個數據包都是須要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

以上就是TCP模式下的OpenUOM所有邏輯,能夠看到,若是socket可讀,那麼就能夠直接取到multi_instance,而後順序處理就是了。我記得去年我就把OpenUOM改爲多線程了,可是如今看來那是個失敗的作法。若是使用TCP,從上述邏輯能夠看到,就算使用多線程,在socket-to-tun這個路徑上也不用加鎖,所以multi_instance直接經過epoll_wait就能夠取的到。多線程

須要C/C++ Linux服務器架構師學習資料加羣812855908(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享架構

UDP的epoll併發框架—解決OpenUOM的併發問題

使用UDP的OpenUOM

然而對於UDP而言,OpenUOM的處理邏輯根上面TCP的邏輯就大相徑庭了。由於全程只有一個UDP socket,接受全部客戶端的鏈接,此時根本不存在什麼多路複用的問題,充其量也就是那惟一的UDP socket和tun網卡字符設備兩者之間的兩路複用,使用epoll徹底沒有必要。爲了定位了具體的multi_instance,你不得不先去read惟一的那個UDP socket,而後根據recvfrom返回參數中的sockaddr結構體來構造4元組,而後根據這4元組在全局的multi_instance hash表中去查找具體multi_instance實例。其邏輯以下所示:併發

/* 加入惟一的UDP socket */
context.sd = udp_sd;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {  //實際上nfds最多也就是2
        if (events[n].data.ptr == context) {
            data = recvfrom(context.sd, remote_addr....);
            lock(mi_hashtable);   //若是多線程,這個鎖將會成爲瓶頸,即使是RW鎖也同樣
            multi_instance *mi = lookup_mi(child_sd, remote_addr, ...);  //再好的hash算法,也不是0成本的!
            unlock(mi_hashtable);
            // 這裏簡化了處理,由於並非每個數據包都是須要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);  
            ....
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

可見,TCP的OpenUOM和UDP的OpenUOM處理方式徹底不一樣,UDP的問題在於,徹底沒有充分利用epoll的多路複用機制,不得不根據數據包的recvfrom返回地址來查找multi_instance...

讓UDP socket也Listen起來

若是UDP也能像TCP同樣,每個用戶接進來就爲之建立一個單獨的socket爲其專門服務該多好,這樣在大併發的時候,就能夠充分複用內核UDP層的socket查找結論加上epoll的通知機制了。理論上這是可行的,由於UDP的4元組能夠惟一識別一個與之通訊的客戶端,雖然UDP生成無鏈接,不可靠,可是爲每個鏈接的客戶端建立一個socket並無破壞UDP的語義,只是改變了UDP的編程模型而已,內核協議棧依然不會去刻意維護一個UDP鏈接,也不會進行任何的數據確認。
須要說明的是,這種方案僅僅對「長鏈接」的UDP有意義,好比OpenUOM這類。由於UDP是沒有鏈接的,那麼你也就不知道一個客戶端何時會永遠中止發送數據,所以必然要經過定時器來定時關閉那些在必定時間段內沒有數據的socket。
爲了驗證可行性,我先在用戶態作實驗,也就是說,接受一個客戶端的「鏈接請求」(其實就是一個數據包)時,我手工爲其建立一個socket,而後bind本地地址,而且connect從recvfrom返回的對端地址,這樣理論上對於後續的數據包,epoll都應該觸發這個新的socket,畢竟它更精確。事實是否是這樣呢?如下的程序能夠證實:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

#define SO_REUSEPORT    15

#define MAXBUF 10240
#define MAXEPOLLSIZE 100

int flag = 0;

int read_data(int sd) {
    char recvbuf[MAXBUF + 1];
    int  ret;
    struct sockaddr_in client_addr;
    socklen_t cli_len=sizeof(client_addr);

    bzero(recvbuf, MAXBUF + 1);
  
    ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);
    if (ret > 0) {
        printf("read[%d]: %s  from  %dn", ret, recvbuf, sd);
    } else {
        printf("read err:%s  %dn", strerror(errno), ret);
      
    }
    fflush(stdout);
}

int udp_accept(int sd, struct sockaddr_in my_addr) {
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);
    if (ret > 0) {
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    } else {
        printf("parent:%d  new:%dn", sd, new_sd);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));
    if (ret){
        perror("chid bind");
        exit(1);
    } else {
    }

    peer_addr.sin_family = PF_INET;
    printf("aaa:%sn", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
        perror("chid connect");
        exit(1);
    } else {
    }

out:
    return new_sd;
}

int main(int argc, char **argv) {
    int listener, kdpfd, nfds, n, curfds;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    unsigned int port;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;;
    int ret = 0;

    port = 1234;
  
    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket OKn");
    }

    ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    if (ret) {
        exit(1);
    }
  
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = PF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP bind OKn");
    }
  
    kdpfd = epoll_create(MAXEPOLLSIZE);

    ev.events = EPOLLIN|EPOLLET;
    ev.data.fd = listener;

    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%dn", listener);
        return -1;
    } else {
        printf("ep add OKn");
    }
 
    while (1) {
      
        nfds = epoll_wait(kdpfd, events, 10000, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }
      
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                printf("listener:%dn", n);
                int new_sd;               
                struct epoll_event child_ev;

                new_sd = udp_accept(listener, my_addr);
                child_ev.events = EPOLLIN;
                child_ev.data.fd = new_sd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);
                    return -1;
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(listener);
    return 0;
}

須要說明的是,REUSEPORT是必要的,由於在connect以前,你必須爲新建的socket bind跟listener同樣的IP地址和端口,所以就須要這個socket選項。
此時,若是你用多個udp客戶端去給這個服務端發數據,會發現徹底實現了想要的效果。

內核中的UDP Listener

雖然在用戶態能夠實現效果,可是編程模型並不太好用,爲了建立一個socket,你不得不先去recvfrom一下數據,好獲得對端的地址,雖然使用PEEK標誌可讓建立好child socket後再讀一次,可是仔細想一想,最完全的方案仍是直接擴展內核,我基於3.9.6內核,對__udp4_lib_rcv這個UDP協議棧接收函數做了如下的修改:

int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
                   int proto)
{
......................
        sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);

        if (sk != NULL) {
                int ret;
#if 1
                // 這個UDP_LISTEN,經過setsockopt來設置
                if (sk->sk_state == UDP_LISTEN) {
                        // 若是是UDP的listener,建立一個數據socket
                        struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);
                        if (newsk) {
                                struct inet_sock *newinet;
                                // 爲這個數據傳輸socket根據skb來填充4元組信息
                                newinet               = inet_sk(newsk);
                                newinet->inet_daddr   = ip_hdr(skb)->saddr;
                                newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;
                                newinet->inet_saddr           = ip_hdr(skb)->daddr;
                                rcu_assign_pointer(newinet->inet_opt, NULL);

                                newinet->mc_index     = inet_iif(skb);
                                newinet->mc_ttl       = ip_hdr(skb)->ttl;
                                newinet->rcv_tos      = ip_hdr(skb)->tos;
                                newinet->inet_id = 0xffffffff ^ jiffies;
                                inet_sk_rx_dst_set(newsk, skb);
                                // sock結構體新增csk變量,相似TCP的accept queue,可是爲了簡單,目前每一個Listen socket只能持有一個csk,即child sock。
                                sk->csk = newsk;

                                // 將新的數據傳輸socket排入全局的UDP socket hash表
                                if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {
                                        printk("[UDP listen] get port errorn");
                                        release_sock(newsk);
                                        err = -2;
                                        goto out_go;
                                }
                                ret = udp_queue_rcv_skb(newsk, skb);
                                // 喚醒epoll,讓epoll返回UDP Listener
                                sk->sk_data_ready(sk, 0);
                                sock_put(newsk);
                        } else {
                                printk("[UDP listen] create new errorn");
                                sock_put(sk);
                                return -1;
                        }
out_go:
                        sock_put(sk);
                        if (ret > 0)
                                return -ret;
                        return 0;
                }
#endif
                ret = udp_queue_rcv_skb(sk, skb);
                sock_put(sk);
......................
}

我只是測試,所以並無擴展UDP的accept方法,只是簡單的用getsocketopt來得到這個新的socket描述符併爲task安裝該文件描述符,setsockopt能夠設置一個UDP socket爲listener。這樣用戶態的編程模型就很簡單了。

使用新的Listen UDP來改造OpenUOM

有必要重構一下OpenUOM了,現現在它的邏輯變成了:

listen = 1;
listener = socket(PF_INET, SOCK_DGRAM, 0);
setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen));

/* 加入偵聽socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            getsockopt(context.sd, SOL_SOCKET, &newsock_info....);
            child_sd = newsock_info.sd;
            multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            // 這是UDP,內核除了通知Listener以外,還會將數據排入child_sd,所以須要去讀取,能夠參考TCP的Fastopen邏輯
            data = recvfrom(child_sd, ....);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 這裏簡化了處理,由於並非每個數據包都是須要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

除了把accept改爲了getsockopt以外,別的幾乎和TCP的OpenUOM徹底一致了。如此一來,2014年改造的OpenUOM多線程版本就完美了,用戶態根本不須要再使用recvfrom返回的address信息來定位multi_instance了,一個multi_instance惟一和一個socket綁定,而每個socket都由epoll來管理,大大下降了用戶態查找multi_instance的開銷,同時也避免了鎖定。