async producer是將producer.type設爲async時啓用的producerjava
此時,調用send方法的線程和實際完成消息發送的線程是分開的。app
當調用java API中producer的send方法時,最終會調用kafka.producer.Producer的send方法。在kafka.producer.Producer類中,會根據producer.type配置使用不一樣的方法發送消息。async
def send(messages: KeyedMessage[K,V]*) { lock synchronized { if (hasShutdown.get) throw new ProducerClosedException recordStats(messages) sync match { case true => eventHandler.handle(messages) case false => asyncSend(messages) } } }
當async時,會使用asyncSend。asyncSend方法會根據「queue.enqueue.timeout.ms」配置選項採用BlockingQueue的put或offer方法把消息放入kafka.producer.Producer持有的一個LinkedBlockingQueue。一個ProducerSendThread線程從queue裏取消息,成批量的用eventHandler來處理。ide
當使用sync時,對每條消息會直接使用eventHandler來處理。這就是爲何前一種方式會被稱爲"asynchornization",而這一種會稱爲」synchronization"函數
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
在kafka.producer.Producer構造時,會檢查"producer.type「,若是是asnyc,就會開啓一個送發線程。spa
config.producerType match { case "sync" => case "async" => sync = false producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId, queue, eventHandler, config.queueBufferingMaxMs, config.batchNumMessages, config.clientId) producerSendThread.start()
如今有了一個隊列,一個發送線程 。看來這個ProducerSendThread是來完成大部分發送的工做,而"async"的特性都主要都是由它來實現。線程
這個線程的run方法實現爲:scala
override def run { try { processEvents }catch { case e: Throwable => error("Error in sending events: ", e) }finally { shutdownLatch.countDown } }
看來實際工做由processEvents方法來實現嘍debug
private def processEvents() { var lastSend = SystemTime.milliseconds //上一次發送的時間,每發送一次會更新 var events = new ArrayBuffer[KeyedMessage[K,V]] //一塊兒發送的消息的集合,發送完後也會更新 var full: Boolean = false //是否消息的數量已大於指定的batch大小(batch大小指多少消息在一塊兒發送,由"batch.num.messages"肯定) // drain the queue until you get a shutdown command //構造一個流,它的每一個元素爲queue.poll(timeout)取出來的值。 //timeout的值是這麼計算的:lastSend+queueTime表示下次發送的時間,再減去當前時間,就是最多還能等多長時間,也就是poll阻塞的最長時間 //takeWhile接受的函數參數決定了當item是shutdownCommand時,流就結束了。這個shutdownCommand是shutdown()方法執行時,往隊列裏發的一個特殊消息 Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)) .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach { currentQueueItem => //對每一條處理的消息 val elapsed = (SystemTime.milliseconds - lastSend) //距上次發送已逝去的時間,只記錄在debug裏,並不會以它做爲是否發送的條件 // check if the queue time is reached. This happens when the poll method above returns after a timeout and // returns a null object val expired = currentQueueItem == null //當poll方法超時,就返回一個null,說明必定已是時候發送這批消息了。當時間到了,poll(timeout)中timeout爲負值時,poll必定返回null if(currentQueueItem != null) { trace("Dequeued item for topic %s, partition key: %s, data: %s" .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message)) events += currentQueueItem //若是當前消息不爲空,就附加在發送集合裏 } // check if the batch size is reached full = events.size >= batchSize //是否當前發送集合的大小已經大於batch size if(full || expired) { //若是發送集合有了足夠多的消息或者按時間計能夠發送了,就發送 if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..") if(full) debug("Batch full. Sending..") // if either queue time has reached or batch size has reached, dispatch to event handler tryToHandle(events) lastSend = SystemTime.milliseconds //更新lastSend,將一個新的ArrayBuffer的引用賦給events events = new ArrayBuffer[KeyedMessage[K,V]] } } // send the last batch of events tryToHandle(events) //當shutdownCommand遇到時,流會終結。此時以前的消息只要不是剛好發送完,就還會有一些在events裏,作爲最後一批發送。 if(queue.size > 0) //些時producerSendThread已經再也不發消息了,可是queue裏若還有沒發完的,就是一種異常狀況 throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue" .format(queue.size)) }
看來Scala的Stream幫了很多忙。shutdown方法將一個特殊的shutdownCommand發給queue,也正好使得這個Stream能夠用takeWhile方法正確結束。orm
好吧,搞了這麼多,這個ProducerSendThread只有打包的邏輯 ,並無處理topic、partition、壓縮的邏輯,這些邏輯都在另外一個類中。明天再來看看這個handler