KafkaClient接口與Kafka處理請求的若干特性

(依據於0.10.0.0版本)java

這個接口的惟一實現類就是NetworkClient,它被用於實現Kafka的consumer和producer. 這個接口實際上抽象出來了Kafka client與網絡交互的方式。node

爲了對它的API有清楚的認識,先要了解下Kafka protocol所要求的client和broker對於網絡請求的處理規則。react

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocolapache

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.緩存

 這一段的信息量挺大的。網絡

順序性

首先,broker按照請求被髮送的順序處理請求,而且按照一樣的順序發送響應。由於Kafka對消息的順序性有以下的保證:session

  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees messages in the order they are stored in the log.

爲了實現這種順序性保證,最簡單可靠的行爲就是"The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. ", 也就是說對於一個TCP鏈接,broker的請求處理鏈條中只會有一個正在處理的(in-flight)消息.app

那麼,Kafka在broker端需不須要緩存待處理的消息呢?異步

首先,若是緩存請求的話,可能會佔用大量內存.其次,若是緩存請求的話,在請求處理出錯時,會使得Kafka client難以控制消息的順序,由於本質上,這種緩存使得client的請求是異步處理的.而若是不進行緩存,那麼broker的行爲對於client而言更容易理解.socket

因此,broker是不會在本地緩存請求的.當它從某個鏈接讀取一個請求以後,就會中止從這個鏈接繼續讀取請求.也就是說對於每一個TCP鏈接,broker的處理流程是:接收一個請求 -> 處理請求 -> 發送響應 -> 接收下一個請求 -> ...

具體的作法,能夠在kafka.network.Processor(也就是reactive模型裏的subRactor) 找到,在其run方法中,對於已經完整讀取的request和發送完畢的response, 有如下的處理

        selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req) //把請求送入requestChannel,之後request handler會從中取出request來處理
            selector.mute(receive.source) //中止從這個request的來源(並不僅用host來區分)讀取消息
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
        }
        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination) //將已發送完畢的response的源設爲可讀的
        }

可見,對於正在處理的請求,broker不會從它的來源再讀取新的消息,直至請求被處理完畢,而且其響應被髮送完畢。

預抓取

另外一方面,對於client,若是它接收到上一個請求的響應以後,纔開始生成新的請求,而後再發送新請求,那麼在等待響應的過程當中,client就處理等待狀態,這樣挺沒效率.所以,"clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer",也就是說client能夠在等待響應的過程當中繼續發送請求,由於即便broker不去經過網絡讀這些請求,這些請求也會被緩存在OS的socket buffer中,所以,當broker處理完以前的請求,就能夠當即讀出來新的請求.不過,若是client這麼作的話,會使得它的行爲更復雜(由於涉及到出錯時的順序性).

對於consumer,在接收到響應以前難以肯定下一次fetch開始的offset,所以在收到前一個fetch respones以後才發送下一次fetch request是比較穩妥的作法.不過若是能夠比較準確判斷fetch響應包含消息的數目,比而提早發出fetch request,的確有可能會提交consumer的性能.

並且,"收到fetch respone"和"用戶處理完fetch到的消息"這兩個時間點仍是有所不一樣的,在收到fetch response以後,把抓取到的消息交給用戶處理以前,發出下一個fetch request,這樣能夠提升consumer抓取的效率.新的consumer-KafkaConsumer的確是這麼作的.這是KafkaConsumer的poll方法裏的一段代碼(用戶經過執行這個poll方法來獲取消息)

 do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE that we use quickPoll() in this case which disables wakeups and delayed
                    // task execution since the consumed positions has already been updated and we
                    // must return these records to users to process before being interrupted or
                    // auto-committing offsets
                    fetcher.sendFetches(metadata.fetch());
                    client.quickPoll();
                    return this.interceptors == null
                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

中間的那一大段就是在說這個事情,可是它考慮的狀況比剛纔提到的要複雜一些.

首先,若是pollOnce獲得的records不爲空,就要把這些records返回給用戶,因此在此以前要先發送一批fetch rquest(利用Fetcher#sendFetches).若是爲空的話,在do-while循環裏的pollOnce會發送新的fetch request. 

其次,因爲Fetcher的sendFetches並不會執行網絡IO操做,而只是生成而且緩存fetch request,因此還須要利用ConsumerNetworkClient的quickPoll方法來執行一次IO操做把這些fetch request發出去.可是因爲此時用戶尚未獲得此次pollOnce返回的records, 所以不能進行auto-commit操做,不然就會把還沒返回給用戶的records給commit了,而且也不能使得處理的過程被別的線程中斷,由於這樣用戶也拿不到這些records了.因此,這裏調用quickPoll,quickPoll會禁止wakeUp,而且不執行DelayedTasks(由於AutoCommitTask就是經過DelayedTask機制執行的).

 


對Kafka內部隊列選擇的影響

Kafka的broker是一個典型的Reactor模型的socket server。其中Processor相關於sub reactor,而HandlerPool至關於worker pool. Processor和Handler 都有各自的線程,它們之間經過一些隊列來傳遞請求和響應。Kafka把這些隊列封裝成了RequestChannel。

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
 for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
... }

Kafka對於一個鏈接一次只處理一個請求的特性,決定了這裏的兩種隊列的類型。其中,存放請求的隊列用的是ArrayBlockingQueue,隊列大小爲queuSize,而存放響應的隊列用的是LinkedBlockingQueue,它的capcity是Integer.MAX_VALUE。

有界隊列 VS 無界隊列

存放請求的隊列必須用有界的阻塞隊列,不然可能會有太多的請求撐爆內存。而使用有界隊列,事實上能夠阻塞Processor線程,使得在請求隊列滿的狀況下,Broker拒絕新的請求。

可是響應隊列選用無界的隊列,其緣由倒是很隱晦的。

總的待發送響應的個數因爲請求隊列的限制,一般不會太大。但這也不意味着這種選擇不會出問題,由於在最差狀況下,可能會有至關於總的鏈接數的待發送響應。想象一種狀況,假設有很是多的consumer(好比1W個)發送fetch請求,每一個請求抓取1M的數據,但這些consumer都不從socket中讀取響應,那麼會有什麼狀況發生呢?不是會把內存爆掉嗎?事實上,因爲Kafka在發送響應時的zero copy特性,使得FetchRepsonse自己不會佔用太大內存,因此即便有很是多的待發送響應,但響應對象所佔的大小跟要傳送的數據比,仍是一般要小不少(取決於fetch請求的fetch size)。其它的響應,實際上也不會特別大,對於一個大集羣,佔用內存最大的也就是Metadata相關的響應了。

可是另外一方面,若是這個隊列用有界的,那麼當全部Handler都阻塞於往這些隊列put元素,而全部Processor都阻塞於往RequestQueue裏put元素,那麼整個server就死鎖了。因此Kafka仍是用了無界的隊列。

非阻塞隊列

另外一個有趣的隊列就是Processor和Acceptor之間存放新創建的鏈接的隊列了。

private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

這裏用了ConcurrentLinkedQueue,由於新鏈接的處理和消息的發送/接收是在同一個循環中的,因此存放消息的隊列是非阻塞的更合適一些。


 

API

KafkaClient,是producer和consumer與broker通訊的接口,它的設計就創建在上邊的協議的基礎上。這個類包括了與鏈接狀態和請求-響應狀態有關的方法。producer和consumer實際使用的它的實現類是NetworkClient。如下方法的做用結合了KafkaClient和NetworkClient的註釋,但以NetworkClient的實現爲標準。

 

public boolean isReady(Node node, long now) 查看某個結點是否準備好發送新請求了。因爲是給client用的,所以這裏的「node"就是broker

 

public boolean ready(Node node, long now)是到指定node的鏈接已經被建立好而且能夠發送請求。若是鏈接沒有建立,就建立到這個node的鏈接。

 

public long connectionDelay(Node, long now) 基於鏈接狀態,返回須要等待的時間。鏈接的狀態有三種:disconnected, connecting, connected.  若是是disconnected狀態,就返回reconnect的backoff time。當connecting或者connected,就返回Long.MAX_VALUE,由於此時須要等待別的事件發生(好比鏈接成功,或者收到響應)

 

public long connectionFailed(Node node)  查看到這個node的鏈接是否失敗。

 

public void send(ClientRequest request, long now) 把這個request放入發送隊列。若是request是要發給尚未鏈接好的node的,那麼就會拋出IllegalStateException異常, 這是一個運行時異常。

 

public List<ClientResponse> poll(long timeout, long now) 對於socket進行讀寫操做。

 

public void close(String nodeId) 關閉到指定node的鏈接

 

public Node leastLoadedNode(long now) 選擇有最少的未發送請求的node,要求這些node至少是能夠鏈接的。這個方法會優先選擇有可用的鏈接的節點,可是若是全部的已鏈接的節點都在使用,它就會選擇尚未創建鏈接的節點。這個方法絕對不會選擇憶經斷開鏈接的節點或者正在reconnect backoff階段的鏈接。

 

public int inFlightRequestCount() 全部已發送但還沒收到響應的請求的總數

public int inFlightRequestCount(String nodeId) 對於某個特定node的in-flight request總數

 

public RequestHandler nextRequestHanlder(ApiKeys key) 爲某種請求構造它的請求頭。按照Kafka Protoocl, request包括如下部分:

RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
   ApiKey => int16
   ApiVersion => int16
   CorrelationId => int32
   ClientId => string
   RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

而這個方法構造了ApiKey, ApiVersion, CoorelationId和ClientId,做爲請求的頭部,request handler在源碼裏有對應類org.apache.kafka.common.requests.RequestHandler。

ApiKey表示請求的種類, 如produce request, fetch request, metadata request等。

puclic RequestHandler nextRequestHandler(ApiKey key, short version)  構造請求的頭部,使用特定版本號。

public void wakeup() 若是這個client正在IO阻塞狀態,就喚醒它。


總結

Kafka protocol的一些細節,在Kafka client的接口設計中獲得了體現.而且,有一些小細節是挺有意思的.

下面會看一下NetworkClient,它是KafkaClient接口的實現.

相關文章
相關標籤/搜索