Kafka源碼分析-序列3 -Producer -Java NIO(Reactor VS Peactor)

上一篇咱們分析了Metadata的更新機制,其中涉及到一個問題,就是Sender如何跟服務器通訊,也就是網絡層。同不少Java項目同樣,Kafka client的網絡層也是用的Java NIO,而後在上面作了一層封裝。html

下面首先看一下,在Sender和服務器之間的部分:react

這裏寫圖片描述

能夠看到,Kafka client基於Java NIO封裝了一個網絡層,這個網絡層最上層的接口是KakfaClient。其層次關係以下:
這裏寫圖片描述linux

在本篇中,先詳細對最底層的Java NIO進行講述。< 喎�"/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxoMSBpZD0="nio的4大組件">NIO的4大組件編程

Buffer與Channel

Channel: 在一般的Java網絡編程中,咱們知道有一對Socket/ServerSocket對象,每1個socket對象表示一個connection,ServerSocket用於服務器監聽新的鏈接。
在NIO中,與之相對應的一對是SocketChannel/ServerSocketChannel。windows

下圖展現了SocketChannel/ServerSocketChannel的類繼承層次設計模式

這裏寫圖片描述

?安全

1服務器

2網絡

3框架

4

5

6

7

8

9

10

11

12

public interface Channel extends Closeable {

    public boolean isOpen();

    public void close() throws IOException;

}

 

public interface ReadableByteChannel extends Channel {

    public int read(ByteBuffer dst) throws IOException;

}

 

public interface WritableByteChannel extends Channel {

    public int write(ByteBuffer src) throws IOException;

}

從代碼能夠看出,一個Channel最基本的操做就是read/write,而且其傳進去的必須是ByteBuffer類型,而不是普通的內存buffer。

Buffer:在NIO中,也有1套圍繞Buffer的類繼承層次,在此就不詳述了。只需知道Buffer就是用來封裝channel發送/接收的數據。

Selector

Selector的主要目的是網絡事件的 loop 循環,經過調用selector.poll,不斷輪詢每一個Channel上讀寫事件

SelectionKey

SelectionKey用來記錄一個Channel上的事件集合,每一個Channel對應一個SelectionKey。
SelectionKey也是Selector和Channel之間的關聯,經過SelectionKey能夠取到對應的Selector和Channel。

關於這4大組件的協做、配合,下面來詳細講述。

4種網絡IO模型

epoll與IOCP

在《Unix環境高級編程》中介紹瞭如下4種IO模型(實際不止4種,但經常使用的就這4種):

阻塞IO: read/write的時候,阻塞調用

非阻塞IO: read/write,沒有數據,立馬返回,輪詢

IO複用:read/write一次都只能監聽一個socket,但對於服務器來說,有成千上完個socket鏈接,如何用一個函數,能夠監聽全部的socket上面的讀寫事件呢?這就是IO複用模型,對應linux上面,就是select/poll/epoll3種技術。

異步IO:linux上沒有,windows上對應的是IOCP。

Reactor模式 vs. Preactor模式

相信不少人都據說過網絡IO的2種設計模式,關於這2種模式的具體闡述,能夠自行google之。

在此處,只想對這2種模式作一個「最通俗的解釋「:

Reactor模式:主動模式,所謂主動,是指應用程序不斷去輪詢,問操做系統,IO是否就緒。Linux下的select/poll/epooll就屬於主動模式,須要應用程序中有個循環,一直去poll。
在這種模式下,實際的IO操做仍是應用程序作的。

Proactor模式:被動模式,你把read/write所有交給操做系統,實際的IO操做由操做系統完成,完成以後,再callback你的應用程序。Windows下的IOCP就屬於這種模式,再好比C++ Boost中的Asio庫,就是典型的Proactor模式。

epoll的編程模型--3個階段

在Linux平臺上,Java NIO就是基於epoll來實現的。全部基於epoll的框架,都有3個階段:
註冊事件(connect,accept,read,write), 輪詢IO是否就緒,執行實際IO操做。

下面的代碼展現了在linux下,用c語言epoll編程的基本框架:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//階段1: 調用epoll_ctl(xx) 註冊事件

 

for( ; ; )

    {

        nfds = epoll_wait(epfd,events,20,500);     //階段2:輪詢全部的socket

 

        for(i=0;i<nfds;++i) .data.fd="=listenfd)" 0="" accept="" connfd="accept(listenfd,(sockaddr" else="" epollin="" ev.data.fd="connfd;" ev.data.ptr="md;" ev.events="EPOLLIN|EPOLLET;" md="(myepoll_data*)events[i].data.ptr;" n="read(sockfd," sockfd="md-" struct="">fd;

                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //階段3: 執行實際的io操做

                ev.data.fd=sockfd;

                ev.events=EPOLLIN|EPOLLET;

                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到階段1,從新註冊事件

            }

            else

            {

                //其餘的處理

            }

        }

    }</nfds;++i)>

一樣, NIO中的Selector一樣有如下3個階段,下面把Selector和epoll的使用作個對比:

這裏寫圖片描述

能夠看到,2者只是寫法不一樣,一樣的, 都有這3個階段。

下面的表格展現了connect, accept, read, write 這4種事件,分別在這3個階段對應的函數:

這裏寫圖片描述

下面看一下Kafka client中Selector的核心實現:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

@Override

public void poll(long timeout) throws IOException {

    。。。

    clear(); //清空各類狀態

    if (hasStagedReceives())

        timeout = 0;

    long startSelect = time.nanoseconds();

    int readyKeys = select(timeout);  //輪詢

    long endSelect = time.nanoseconds();

    currentTimeNanos = endSelect;

    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

 

    if (readyKeys > 0) {

        Set<selectionkey> keys = this.nioSelector.selectedKeys();

        Iterator<selectionkey> iter = keys.iterator();

        while (iter.hasNext()) {

            SelectionKey key = iter.next();

            iter.remove();

            KafkaChannel channel = channel(key);

 

            // register all per-connection metrics at once

            sensors.maybeRegisterConnectionMetrics(channel.id());

            lruConnections.put(channel.id(), currentTimeNanos);

 

            try {

                if (key.isConnectable()) {  //有鏈接事件

                    channel.finishConnect();

                    this.connected.add(channel.id());

                    this.sensors.connectionCreated.record();

                }

 

                if (channel.isConnected() && !channel.ready())

                    channel.prepare(); //這個只有須要安全檢查的SSL需求,普通的不加密的channel,prepare()爲空實現

 

                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //讀就緒

                    NetworkReceive networkReceive;

                    while ((networkReceive = channel.read()) != null)

                        addToStagedReceives(channel, networkReceive); //實際的讀動做

                }

 

                if (channel.ready() && key.isWritable()) {  //寫就緒

                    Send send = channel.write(); //實際的寫動做

                    if (send != null) {

                        this.completedSends.add(send);

                        this.sensors.recordBytesSent(channel.id(), send.size());

                    }

                }

 

                /* cancel any defunct sockets */

                if (!key.isValid()) {

                    close(channel);

                    this.disconnected.add(channel.id());

                }

            } catch (Exception e) {

                String desc = channel.socketDescription();

                if (e instanceof IOException)

                    log.debug("Connection with {} disconnected", desc, e);

                else

                    log.warn("Unexpected error from {}; closing connection", desc, e);

                close(channel);

                this.disconnected.add(channel.id());

            }

        }

    }

 

    addToCompletedReceives();

 

    long endIo = time.nanoseconds();

    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    maybeCloseOldestConnection();

}</selectionkey></selectionkey>

epoll和selector在註冊上的差異

從代碼能夠看出, Selector和epoll在代碼結構上基本同樣,但在事件的註冊上面有區別:

epoll: 每次read/write以後,都要調用epoll_ctl從新註冊

Selector: 註冊一次,一直有效,一直會有事件產生,所以須要取消註冊。下面來詳細分析一下:

connect事件的註冊

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

//Selector

    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {

        if (this.channels.containsKey(id))

            throw new IllegalStateException("There is already a connection for id " + id);

 

        SocketChannel socketChannel = SocketChannel.open();

        。。。

        try {

            socketChannel.connect(address);

        } catch (UnresolvedAddressException e) {

            socketChannel.close();

            throw new IOException("Can't resolve address: " + address, e);

        } catch (IOException e) {

            socketChannel.close();

            throw e;

        }

        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);  //構造channel的時候,註冊connect事件

        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);

        key.attach(channel);

        this.channels.put(id, channel);

    }

connect事件的取消

?

1

2

3

4

5

6

7

8

9

10

11

//在上面的poll函數中,connect事件就緒,也就是指connect鏈接完成,鏈接簡歷

 if (key.isConnectable()) {  //有鏈接事件

       channel.finishConnect();

                        ...

     }

 

 //PlainTransportLayer

 public void finishConnect() throws IOException {

        socketChannel.finishConnect();  //調用channel的finishConnect()

        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); //取消connect事件,新加read事件組冊

    }

read事件的註冊

從上面也能夠看出,read事件的註冊和connect事件的取消,是同時進行的

read事件的取消

由於read是要一直監聽遠程,是否有新數據到來,因此不會取消,一直監聽

write事件的註冊

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//Selector

    public void send(Send send) {

        KafkaChannel channel = channelOrFail(send.destination());

        try {

            channel.setSend(send);

        } catch (CancelledKeyException e) {

            this.failedSends.add(send.destination());

            close(channel);

        }

    }

 

//KafkaChannel

    public void setSend(Send send) {

        if (this.send != null)

            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");

        this.send = send;

        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  //每調用一次Send,註冊一次Write事件

    }

Write事件的取消

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

//上面的poll函數裏面

                    if (channel.ready() && key.isWritable()) { //write事件就緒

                        Send send = channel.write(); //在這個write裏面,取消了write事件

                        if (send != null) {

                            this.completedSends.add(send);

                            this.sensors.recordBytesSent(channel.id(), send.size());

                        }

                    }

 

 

    private boolean send(Send send) throws IOException {

        send.writeTo(transportLayer);

        if (send.completed())

            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);  //取消write事件

 

        return send.completed();

    }

總結一下:
(1)「事件就緒「這個概念,對於不一樣事件類型,仍是有點歧義的

read事件就緒:這個最好理解,就是遠程有新數據到來,須要去read

write事件就緒:這個指什麼呢? 其實指本地的socket緩衝區有沒有滿。沒有滿的話,應該就是一直就緒的,可寫

connect事件就緒: 指connect鏈接完成

accept事件就緒:有新的鏈接進來,調用accept處理

(2)不一樣類型事件,處理方式是不同的:

connect事件:註冊1次,成功以後,就取消了。有且僅有1次

read事件:註冊以後不取消,一直監聽

write事件: 每調用一次send,註冊1次,send成功,取消註冊

相關文章
相關標籤/搜索