Zookeeper-watcher機制源碼分析(二)

服務端接收請求處理流程

在前一篇文章《Zookeeper-watcher機制源碼分析(一)》中說過Watcher的基本流程,在此文中詳細剖析服務端接收請求的處理流程。

服務端有一個NettyServerCnxn類,用來處理客戶端發送過來的請求

NettyServerCnxn

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

public void receiveMessage(ChannelBuffer message) {

        try {

            while(message.readable() && !throttled) {

                if (bb != null) { //ByteBuffer不爲空

                    if (LOG.isTraceEnabled()) {

                        LOG.trace("message readable " + message.readableBytes()

                                " bb len " + bb.remaining() + " " + bb);

                        ByteBuffer dat = bb.duplicate();

                        dat.flip();

                        LOG.trace(Long.toHexString(sessionId)

                                " bb 0x"

                                + ChannelBuffers.hexDump(

                                        ChannelBuffers.copiedBuffer(dat)));

                    }

                    //bb剩餘空間大於message中可讀字節大小

                    if (bb.remaining() > message.readableBytes()) {

                        int newLimit = bb.position() + message.readableBytes();

                        bb.limit(newLimit);

                    }

                    // 將message寫入bb中

                    message.readBytes(bb);

                    bb.limit(bb.capacity());

 

                    if (LOG.isTraceEnabled()) {

                        LOG.trace("after readBytes message readable "

                                + message.readableBytes()

                                " bb len " + bb.remaining() + " " + bb);

                        ByteBuffer dat = bb.duplicate();

                        dat.flip();

                        LOG.trace("after readbytes "

                                + Long.toHexString(sessionId)

                                " bb 0x"

                                + ChannelBuffers.hexDump(

                                        ChannelBuffers.copiedBuffer(dat)));

                    }

                    if (bb.remaining() == 0) { // 已經讀完message,表示內容已經所有接收

                        packetReceived(); // 統計接收信息

                        bb.flip();

 

                        ZooKeeperServer zks = this.zkServer;

                        if (zks == null || !zks.isRunning()) {//Zookeeper服務器爲空 ,說明服務端掛了

                            throw new IOException("ZK down");

                        }

                        if (initialized) {

                            //處理客戶端傳過來的數據包

                            zks.processPacket(this, bb);

 

                            if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {

                                disableRecvNoWait();

                            }

                        else {

                            LOG.debug("got conn req request from "

                                    + getRemoteSocketAddress());

                            zks.processConnectRequest(this, bb);

                            initialized = true;

                        }

                        bb = null;

                    }

                else //bb爲null的狀況,你們本身去看,我就不細講了

                    if (LOG.isTraceEnabled()) {

                        LOG.trace("message readable "

                                + message.readableBytes()

                                " bblenrem " + bbLen.remaining());

                        ByteBuffer dat = bbLen.duplicate();

                        dat.flip();

                        LOG.trace(Long.toHexString(sessionId)

                                " bbLen 0x"

                                + ChannelBuffers.hexDump(

                                        ChannelBuffers.copiedBuffer(dat)));

                    }

 

                    if (message.readableBytes() < bbLen.remaining()) {

                        bbLen.limit(bbLen.position() + message.readableBytes());

                    }

                    message.readBytes(bbLen);

                    bbLen.limit(bbLen.capacity());

                    if (bbLen.remaining() == 0) {

                        bbLen.flip();

 

                        if (LOG.isTraceEnabled()) {

                            LOG.trace(Long.toHexString(sessionId)

                                    " bbLen 0x"

                                    + ChannelBuffers.hexDump(

                                            ChannelBuffers.copiedBuffer(bbLen)));

                        }

                        int len = bbLen.getInt();

                        if (LOG.isTraceEnabled()) {

                            LOG.trace(Long.toHexString(sessionId)

                                    " bbLen len is " + len);

                        }

 

                        bbLen.clear();

                        if (!initialized) {

                            if (checkFourLetterWord(channel, message, len)) {

                                return;

                            }

                        }

                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {

                            throw new IOException("Len error " + len);

                        }

                        bb = ByteBuffer.allocate(len);

                    }

                }

            }

        catch(IOException e) {

            LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);

            close();

        }

    }

ZookeeperServer-zks.processPacket(this, bb);

處理客戶端傳送過來的數據包

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {

        // We have the request, now process and setup for next

        InputStream bais = new ByteBufferInputStream(incomingBuffer);

        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);

        RequestHeader h = new RequestHeader();

        h.deserialize(bia, "header"); //反序列化客戶端header頭信息

        // Through the magic of byte buffers, txn will not be

        // pointing

        // to the start of the txn

        incomingBuffer = incomingBuffer.slice();

        if (h.getType() == OpCode.auth) { //判斷當前操做類型,若是是auth操做,則執行下面的代碼

            LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());

            AuthPacket authPacket = new AuthPacket();

            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);

            String scheme = authPacket.getScheme();

            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);

            Code authReturn = KeeperException.Code.AUTHFAILED;

            if(ap != null) {

                try {

                    authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());

                catch(RuntimeException e) {

                    LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);

                    authReturn = KeeperException.Code.AUTHFAILED;

                }

            }

            if (authReturn == KeeperException.Code.OK) {

                if (LOG.isDebugEnabled()) {

                    LOG.debug("Authentication succeeded for scheme: " + scheme);

                }

                LOG.info("auth success " + cnxn.getRemoteSocketAddress());

                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,

                        KeeperException.Code.OK.intValue());

                cnxn.sendResponse(rh, nullnull);

            else {

                if (ap == null) {

                    LOG.warn("No authentication provider for scheme: "

                            + scheme + " has "

                            + ProviderRegistry.listProviders());

                else {

                    LOG.warn("Authentication failed for scheme: " + scheme);

                }

                // send a response...

                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,

                        KeeperException.Code.AUTHFAILED.intValue());

                cnxn.sendResponse(rh, nullnull);

                // ... and close connection

                cnxn.sendBuffer(ServerCnxnFactory.closeConn);

                cnxn.disableRecv();

            }

            return;

        else //若是不是受權操做,再判斷是否爲sasl操做

            if (h.getType() == OpCode.sasl) {

                Record rsp = processSasl(incomingBuffer,cnxn);

                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());

                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?

                return;

            }

            else {//最終進入這個代碼塊進行處理

                //封裝請求對象

                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),

                  h.getType(), incomingBuffer, cnxn.getAuthInfo());

                si.setOwner(ServerCnxn.me);

                // Always treat packet from the client as a possible

                // local request.

                setLocalSessionFlag(si);

                submitRequest(si); //提交請求

            }

        }

        cnxn.incrOutstandingRequests(h);

    }

  

submitRequest

負責在服務端提交當前請求

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

public void submitRequest(Request si) {

        if (firstProcessor == null) { //processor處理器,request過來之後會經歷一系列處理器的處理過程

            synchronized (this) {

                try {

                    // Since all requests are passed to the request

                    // processor it should wait for setting up the request

                    // processor chain. The state will be updated to RUNNING

                    // after the setup.

                    while (state == State.INITIAL) {

                        wait(1000);

                    }

                catch (InterruptedException e) {

                    LOG.warn("Unexpected interruption", e);

                }

                if (firstProcessor == null || state != State.RUNNING) {

                    throw new RuntimeException("Not started");

                }

            }

        }

        try {

            touch(si.cnxn);

            boolean validpacket = Request.isValid(si.type); //判斷是否合法

            if (validpacket) {

                firstProcessor.processRequest(si);  調用firstProcessor發起請求,而這個firstProcess是一個接口,有多個實現類,具體的調用鏈是怎麼樣的?往下看吧

                if (si.cnxn != null) {

                    incInProcess();

                }

            else {

                LOG.warn("Received packet at server of unknown type " + si.type);

                new UnimplementedRequestProcessor().processRequest(si);

            }

        catch (MissingSessionException e) {

            if (LOG.isDebugEnabled()) {

                LOG.debug("Dropping request: " + e.getMessage());

            }

        catch (RequestProcessorException e) {

            LOG.error("Unable to process request:" + e.getMessage(), e);

        }

    }

  

firstProcessor的請求鏈組成

1.firstProcessor的初始化是在ZooKeeperServer的setupRequestProcessors中完成的,代碼如下

1

2

3

4

5

6

7

/**
 * Builds the request-processor chain and starts its threads.
 *
 * The chain is constructed back to front: the final processor is created
 * first, wrapped by the sync processor, which in turn is wrapped by the prep
 * processor that is stored in {@code firstProcessor}.
 */
protected void setupRequestProcessors() {

     RequestProcessor finalProcessor = new FinalRequestProcessor(this);

     RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);

     ((SyncRequestProcessor)syncProcessor).start();

     firstProcessor = new PrepRequestProcessor(this, syncProcessor);// note: PrepRequestProcessor is handed syncProcessor as its next processor

     ((PrepRequestProcessor)firstProcessor).start();

 }

  

從上面我們可以看到firstProcessor的實例是一個PrepRequestProcessor,而這個構造方法中又傳遞了一個Processor,構成了一個調用鏈。

RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);

而syncProcessor的構造方法傳遞的又是一個Processor,對應的是FinalRequestProcessor

2.因此整個調用鏈是PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor

PrepRequestProcessor.processRequest(si);

經過上面瞭解到調用鏈關係之後,咱們繼續再看firstProcessor.processRequest(si); 會調用到PrepRequestProcessor

 

1

2

3

/**
 * Adds the request to {@code submittedRequests}; no other work is done in
 * this method.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {

    submittedRequests.add(request);

}

  

唉,很奇怪,processRequest只是把request添加到submittedRequests中,根據前面的經驗,很自然地想到這裏又是一個異步操作。而submittedRequests又是一個阻塞隊列

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

而PrepRequestProcessor這個類又繼承了線程類,所以咱們直接找到當前類中的run方法以下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public void run() {

        try {

            while (true) {

                Request request = submittedRequests.take(); //ok,從隊列中拿到請求進行處理

                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;

                if (request.type == OpCode.ping) {

                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;

                }

                if (LOG.isTraceEnabled()) {

                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");

                }

                if (Request.requestOfDeath == request) {

                    break;

                }

                pRequest(request); //調用pRequest進行預處理

            }

        catch (RequestProcessorException e) {

            if (e.getCause() instanceof XidRolloverException) {

                LOG.info(e.getCause().getMessage());

            }

            handleException(this.getName(), e);

        catch (Exception e) {

            handleException(this.getName(), e);

        }

        LOG.info("PrepRequestProcessor exited loop!");

    }

  

pRequest

預處理這塊的代碼太長,就不好貼了。前面的N行代碼都是根據當前的OP類型進行判斷和做相應的處理,在這個方法中的最後一行中,我們會看到如下代碼

nextProcessor.processRequest(request);

1

nextProcessor.processRequest(request);

很顯然,nextProcessor對應的應該是SyncRequestProcessor

SyncRequestProcessor. processRequest

1

2

3

4

/**
 * Adds the request to {@code queuedRequests}; no other work is done in this
 * method.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {

    // request.addRQRec(">sync");

    queuedRequests.add(request);

}

這個方法的代碼也是一樣,基於異步化的操作,把請求添加到queuedRequests中,那麼我們繼續在當前類找到run方法

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

public void run() {

        try {

            int logCount = 0;

 

            // we do this in an attempt to ensure that not all of the servers

            // in the ensemble take a snapshot at the same time

            int randRoll = r.nextInt(snapCount/2);

            while (true) {

                Request si = null;

                //從阻塞隊列中獲取請求

                if (toFlush.isEmpty()) {

                    si = queuedRequests.take(); 

                else {

                    si = queuedRequests.poll();

                    if (si == null) {

                        flush(toFlush);

                        continue;

                    }

                }

                if (si == requestOfDeath) {

                    break;

                }

                if (si != null) {

                    // track the number of records written to the log

                    //下面這塊代碼,粗略看來是觸發快照操做,啓動一個處理快照的線程

                    if (zks.getZKDatabase().append(si)) {

                        logCount++;

                        if (logCount > (snapCount / 2 + randRoll)) {

                            randRoll = r.nextInt(snapCount/2);

                            // roll the log

                            zks.getZKDatabase().rollLog();

                            // take a snapshot

                            if (snapInProcess != null && snapInProcess.isAlive()) {

                                LOG.warn("Too busy to snap, skipping");

                            else {

                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {

                                        public void run() {

                                            try {

                                                zks.takeSnapshot();

                                            catch(Exception e) {

                                                LOG.warn("Unexpected exception", e);

                                            }

                                        }

                                    };

                                snapInProcess.start();

                            }

                            logCount = 0;

                        }

                    else if (toFlush.isEmpty()) {

                        // optimization for read heavy workloads

                        // iff this is a read, and there are no pending

                        // flushes (writes), then just pass this to the next

                        // processor

                        if (nextProcessor != null) {

                            nextProcessor.processRequest(si); //繼續調用下一個處理器來處理請求

                            if (nextProcessor instanceof Flushable) {

                                ((Flushable)nextProcessor).flush();

                            }

                        }

                        continue;

                    }

                    toFlush.add(si);

                    if (toFlush.size() > 1000) {

                        flush(toFlush);

                    }

                }

            }

        catch (Throwable t) {

            handleException(this.getName(), t);

        finally{

            running = false;

        }

        LOG.info("SyncRequestProcessor exited!");

    }

FinalRequestProcessor. processRequest

這個方法就是咱們在課堂上分析到的方法了,FinalRequestProcessor.processRequest方法並根據Request對象中的操做更新內存中Session信息或者znode數據。

這塊代碼有小300多行,就不所有貼出來了,咱們直接定位到關鍵代碼,根據客戶端的OP類型找到以下的代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

            case OpCode.exists: {

                lastOp = "EXIS";

                // TODO we need to figure out the security requirement for this!

                ExistsRequest existsRequest = new ExistsRequest();

                // Deserialize request.request into existsRequest (per the article, this is the request object the client sent)

                ByteBufferInputStream.byteBuffer2Record(request.request,

                        existsRequest);

                String path = existsRequest.getPath(); // path named in the request

                if (path.indexOf('\0') != -1) {

                    throw new KeeperException.BadArgumentsException();

                }

                // Key line: when existsRequest.getWatch() is true, cnxn (the ServerCnxn) is passed along; otherwise null

// NOTE(review): per the article, passing cnxn here is what registers the data watcher for an exists request

                Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);

                rsp = new ExistsResponse(stat); // wrap the Stat returned by the database in an ExistsResponse

                break;

            }

  

statNode這個方法作了什麼?

1

2

3

/**
 * Delegates to {@code dataTree.statNode(path, serverCnxn)} and returns its
 * result.
 *
 * @param path       the node path to stat
 * @param serverCnxn passed through unchanged as the second argument; the
 *                   exists handler passes null when no watch is requested
 * @return the {@code Stat} produced by the data tree
 * @throws KeeperException.NoNodeException declared by this method; presumably raised by the data tree when the path has no node
 */
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {

    return dataTree.statNode(path, serverCnxn);

}

一路向下,在下面這個方法中,將ServerCnxn向上轉型爲Watcher了,因爲ServerCnxn實現了Watcher接口

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public Stat statNode(String path, Watcher watcher)

            throws KeeperException.NoNodeException {

        Stat stat = new Stat();

        DataNode n = nodes.get(path); //得到節點數據

        if (watcher != null) { //若是watcher不爲空,則講當前的watcher和path進行綁定

            dataWatches.addWatch(path, watcher);

        }

        if (n == null) {

            throw new KeeperException.NoNodeException();

        }

        synchronized (n) {

            n.copyStat(stat);

            return stat;

        }

    }

WatchManager.addWatch(path, watcher);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

synchronized void addWatch(String path, Watcher watcher) {

        HashSet<Watcher> list = watchTable.get(path);  //判斷watcherTable中是否存在當前路徑對應的watcher

        if (list == null) { //不存在則主動添加

            // don't waste memory if there are few watches on a node

            // rehash when the 4th entry is added, doubling size thereafter

            // seems like a good compromise

            list = new HashSet<Watcher>(4); // 新生成watcher集合

            watchTable.put(path, list);

        }

        list.add(watcher); //添加到watcher表

 

        HashSet<String> paths = watch2Paths.get(watcher);

        if (paths == null) {

            // cnxns typically have many watches, so use default cap here

            paths = new HashSet<String>();

            watch2Paths.put(watcher, paths); // 設置watcher到節點路徑的映射

        }

        paths.add(path);  // 將路徑添加至paths集合

    }

  

其大體流程以下

①經過傳入的路徑(節點路徑)從watchTable獲取相應的觀察者集合,進入②

② 判斷①中的觀察者集合是否爲空,若爲空,則進入③,不然,進入④

③ 新生成觀察者集合,並將路徑和此集合添加至watchTable中,進入④

④ 將傳入的觀察者添加至觀察者集合,即完成了路徑和觀察者添加至watchTable的步驟,進入⑤

⑤經過傳入的觀察者從watch2Paths中獲取相應的路徑集合,進入⑥

⑥ 判斷路徑集合是否爲空,若爲空,則進入⑦,不然,進入⑧

⑦ 新生成路徑集合,並將觀察者和路徑集合添加至watch2Paths中,進入⑧

⑧ 將傳入的路徑(節點路徑)添加至路徑集合,即完成了路徑和觀察者添加至watch2Paths的步驟

總結
調用關係鏈以下

圖文裏的技術如何學習,有沒有免費資料?

對Java技術,架構技術感興趣的同窗,歡迎加QQ羣619881427,一塊兒學習,相互討論。

羣內已經有小夥伴將知識體系整理好(源碼,筆記,PPT,學習視頻),歡迎加羣免費領取。

分享給喜歡Java的,喜歡編程,有夢想成爲架構師的程序員們,但願可以幫助到大家。

不是Java的程序員也不要緊,幫忙轉發給更多朋友!謝謝。

分享一個小技巧:點擊閱讀原文,也可以輕鬆獲取電子雜誌學習資料哦!

相關文章
相關標籤/搜索