Scala調用Kafka的生產者和消費者Demo,以及一些配置參數整理

kafka簡介

Kafka是apache開源的一款用Scala編寫的消息隊列中間件,具備高吞吐量,低延時等特性。java

Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)稱爲broker。算法

不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。apache

kafka主要的組件介紹

Producer:消息生產者,就是向kafka broker發消息的客戶端。bootstrap

Consumer:消息消費者,向kafka broker取消息的客戶端緩存

Topic :咱們能夠理解爲一個隊列,消息根據Topic進行歸類。服務器

Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠對應多個CG。topic的消息會複製(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。網絡

Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。session

kafka文件存儲機制

  1. topic中partition存儲分佈併發

  2. partiton中文件存儲方式app

  3. partiton中segment文件存儲結構

  4. 在partition中經過offset查找message

代碼模擬生產者消費者案例

pom.xml添加依賴

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

生產者:

package com.linys.scala.KAFKA_producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

/**
  * 實現producer
  */
object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    val prop = new Properties
    // 指定請求的kafka集羣列表
    prop.put("bootstrap.servers", "essum:9092")// 指定響應方式
    //prop.put("acks", "0")
    prop.put("acks", "all")
    // 請求失敗重試次數
    //prop.put("retries", "3")
    // 指定key的序列化方式, key是用於存放數據對應的offset
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 指定value的序列化方式
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 配置超時時間
    prop.put("request.timeout.ms", "60000")
    //prop.put("batch.size", "16384")
    //prop.put("linger.ms", "1")
    //prop.put("buffer.memory", "33554432")

    // 獲得生產者的實例
    val producer = new KafkaProducer[String, String](prop)

    // 模擬一些數據併發送給kafka
    for (i <- 1 to 100) {
      val msg = s"${i}: this is a linys ${i} kafka data"
      println("send -->" + msg)
      // 獲得返回值
      val rmd: RecordMetadata = producer.send(new ProducerRecord[String, String]("linys", msg)).get()
      println(rmd.toString)
      Thread.sleep(500)
    }

    producer.close()
  }
}

消費者:

package com.linys.scala.KAFKA_consumer
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
/**
  * 實現consumer
  */
object KafkaConsumerDemo {
  def main(args: Array[String]): Unit = {
    // 配置信息
    val prop = new Properties
    prop.put("bootstrap.servers", "essum:9092")
    // 指定消費者組
    prop.put("group.id", "group01")
    // 指定消費位置: earliest/latest/none
    prop.put("auto.offset.reset", "earliest")
    // 指定消費的key的反序列化方式
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // 指定消費的value的反序列化方式
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("enable.auto.commit", "true")
    prop.put("session.timeout.ms", "30000")
    // 獲得Consumer實例
    val kafkaConsumer = new KafkaConsumer[String, String](prop)
    // 首先須要訂閱topic
    kafkaConsumer.subscribe(Collections.singletonList("linys"))
    // 開始消費數據
    while (true) {
      // 若是Kafak中沒有消息,會隔timeout這個值讀一次。好比上面代碼設置了2秒,也是就2秒後會查一次。
      // 若是Kafka中還有消息沒有消費的話,會立刻去讀,而不須要等待。
      val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000)
      // println(msgs.count())
      val it = msgs.iterator()
      while (it.hasNext) {
        val msg = it.next()
        println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}")
      }
    }
  }
}

執行輸出:

 

 

 

 

 

生產者配置參數解釋:

bootstrap.servers: kafka集羣broker的地址
key.serializer:關鍵字的序列化方式
value.serializer:消息值的序列化方式
acks:指定必需要有多少個分區的副本接收到該消息,服務端纔會向生產者發送響應,可選值爲:0,1,2,…,all,若是設置爲0,producter就只管發出無論kafka server有沒有確認收到。設置all則表示kafka全部的分區副本所有確認接收到才返回。
buffer.memory:生產者的內存緩衝區大小。若是生產者發送消息的速度 > 消息發送到kafka的速度,那麼消息就會在緩衝區堆積,致使緩衝區不足。這個時候,send()方法要麼阻塞,要麼拋出異常。
max.block.ms:表示send()方法在拋出異常以前能夠阻塞多久的時間,默認是60s
compression.type:消息在發往kafka以前能夠進行壓縮處理,以此來下降存儲開銷和網絡帶寬。默認值是null,可選的壓縮算法有snappy、gzip和lz4
retries:生產者向kafka發送消息可能會發生錯誤,有的是臨時性的錯誤,好比網絡忽然阻塞了一下子,有的不是臨時的錯誤,好比「消息太大了」,對於出現的臨時錯誤,能夠經過重試機制來從新發送
retry.backoff.ms:每次重試之間間隔的時間,第一次失敗了,那麼休息一會再重試,休息多久,能夠經過這個參數來調節
batch.size:生產者在發送消息時,能夠將即將發往同一個分區的消息放在一個批次裏,而後將這個批次總體進行發送,這樣能夠節約網絡帶寬,提高性能。該參數就是用來規約一個批次的大小的。可是生產者並非說要等到一個批次裝滿以後,纔會發送,不是這樣的,有時候半滿,甚至只有一個消息的時候,也可能會發送,具體怎麼選擇,咱們不知道,可是不是說非要等裝滿才發。所以,若是把該參數調的比較大的話,是不會形成消息發送延遲的,可是會佔用比較大的內存。可是若是設置的過小,會形成消息發送次數增長,會有額外的IO開銷
linger.ms:生產者在發送一個批次以前,能夠適當的等一小會,這樣可讓更多的消息加入到該批次。這樣會形成延時增長,可是下降了IO開銷,增長了吞吐量
client.id:服務器用來標誌消息的來源,是一個任意的字符串
max.in.flight.requests.per.connection:一個消息發送給kafka集羣,在收到服務端的響應以前的這段時間裏,生產者還能夠發n-1個消息。這個參數配置retries,能夠保證消息的順序,後面會介紹
request.timeout.ms:生產者在發送消息以後,到收到服務端響應時,等待的時間限制
max.request.size:生產者發送消息的大小。能夠是一個消息的大小,也能夠發送的一個批次的消息大小
receive.buffer.bytes和send.buffer.bytes:tcp socket接收和發送消息的緩衝區大小,其實指的就是ByteBuffer的大小

消費者配置參數解釋:

groupid:一個字符串用來指示一組consumer所在的組羣。實現同一個topic可由不一樣的組羣消費

auto.offset.reset:可選三個參數

earliest ---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none---topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

socket.timeout.ms:默認值:3000,socket超時時間。

socket.buffersize: 默認值:64*1024,socket receive buffer。

fetch.size: 默認值:300 * 1024,控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。

backoff.increment.ms:默認值:1000,這個參數避免在沒有新數據的狀況下重複頻繁的拉數據。 若是拉到空數據,則多推後這個時間。

queued.max.message.chunks:默認值:2,consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小。

auto.commit.enable:默認值:true,若是true,consumer按期地往zookeeper寫入每一個分區的offset。

auto.commit.interval.ms:默認值:10000,往zookeeper上寫offset的頻率。

auto.offset.reset:默認值:largest,若是offset出了返回,則 smallest: 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. 其它值不容許,會拋出異常。

consumer.timeout.ms:默認值:-1,默認-1,consumer在沒有新消息時無限期的block。若是設置一個正值, 一個超時異常會拋出。

rebalance.retries.max:默認值:4,rebalance時的最大嘗試次數。

max.poll.interval.ms:拉取的最大時間間隔,若是你一次拉取的比較多,建議加大這個值,長時間沒有調用poll,且間隔超過這個值時,就會認爲這個consumer失敗了

max.poll.records:默認值:500,Consumer每次調用poll()時取到的records的最大數。

相關文章
相關標籤/搜索