Kafka是apache開源的一款用Scala編寫的消息隊列中間件,具備高吞吐量,低延時等特性。java
Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)稱爲broker。算法
不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。apache
Producer:消息生產者,就是向kafka broker發消息的客戶端。bootstrap
Consumer:消息消費者,向kafka broker取消息的客戶端緩存
Topic :咱們能夠理解爲一個隊列,消息根據Topic進行歸類。服務器
Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠對應多個CG。topic的消息會複製(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。網絡
Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。session
併發
partiton中文件存儲方式app
partiton中segment文件存儲結構
在partition中經過offset查找message
pom.xml添加依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency>
生產者:
package com.linys.scala.KAFKA_producer import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} /** * 實現producer */ object KafkaProducerDemo { def main(args: Array[String]): Unit = { val prop = new Properties // 指定請求的kafka集羣列表 prop.put("bootstrap.servers", "essum:9092")// 指定響應方式 //prop.put("acks", "0") prop.put("acks", "all") // 請求失敗重試次數 //prop.put("retries", "3") // 指定key的序列化方式, key是用於存放數據對應的offset prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 指定value的序列化方式 prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 配置超時時間 prop.put("request.timeout.ms", "60000") //prop.put("batch.size", "16384") //prop.put("linger.ms", "1") //prop.put("buffer.memory", "33554432") // 獲得生產者的實例 val producer = new KafkaProducer[String, String](prop) // 模擬一些數據併發送給kafka for (i <- 1 to 100) { val msg = s"${i}: this is a linys ${i} kafka data" println("send -->" + msg) // 獲得返回值 val rmd: RecordMetadata = producer.send(new ProducerRecord[String, String]("linys", msg)).get() println(rmd.toString) Thread.sleep(500) } producer.close() } }
消費者:
package com.linys.scala.KAFKA_consumer import java.util.{Collections, Properties} import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer} /** * 實現consumer */ object KafkaConsumerDemo { def main(args: Array[String]): Unit = { // 配置信息 val prop = new Properties prop.put("bootstrap.servers", "essum:9092") // 指定消費者組 prop.put("group.id", "group01") // 指定消費位置: earliest/latest/none prop.put("auto.offset.reset", "earliest") // 指定消費的key的反序列化方式 prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // 指定消費的value的反序列化方式 prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") prop.put("enable.auto.commit", "true") prop.put("session.timeout.ms", "30000") // 獲得Consumer實例 val kafkaConsumer = new KafkaConsumer[String, String](prop) // 首先須要訂閱topic kafkaConsumer.subscribe(Collections.singletonList("linys")) // 開始消費數據 while (true) { // 若是Kafak中沒有消息,會隔timeout這個值讀一次。好比上面代碼設置了2秒,也是就2秒後會查一次。 // 若是Kafka中還有消息沒有消費的話,會立刻去讀,而不須要等待。 val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000) // println(msgs.count()) val it = msgs.iterator() while (it.hasNext) { val msg = it.next() println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}") } } } }
執行輸出:
生產者配置參數解釋:
bootstrap.servers: kafka集羣broker的地址
key.serializer:關鍵字的序列化方式
value.serializer:消息值的序列化方式
acks:指定必需要有多少個分區的副本接收到該消息,服務端纔會向生產者發送響應,可選值爲:0,1,2,…,all,若是設置爲0,producter就只管發出無論kafka server有沒有確認收到。設置all則表示kafka全部的分區副本所有確認接收到才返回。
buffer.memory:生產者的內存緩衝區大小。若是生產者發送消息的速度 > 消息發送到kafka的速度,那麼消息就會在緩衝區堆積,致使緩衝區不足。這個時候,send()方法要麼阻塞,要麼拋出異常。
max.block.ms:表示send()方法在拋出異常以前能夠阻塞多久的時間,默認是60s
compression.type:消息在發往kafka以前能夠進行壓縮處理,以此來下降存儲開銷和網絡帶寬。默認值是null,可選的壓縮算法有snappy、gzip和lz4
retries:生產者向kafka發送消息可能會發生錯誤,有的是臨時性的錯誤,好比網絡忽然阻塞了一下子,有的不是臨時的錯誤,好比「消息太大了」,對於出現的臨時錯誤,能夠經過重試機制來從新發送
retry.backoff.ms:每次重試之間間隔的時間,第一次失敗了,那麼休息一會再重試,休息多久,能夠經過這個參數來調節
batch.size:生產者在發送消息時,能夠將即將發往同一個分區的消息放在一個批次裏,而後將這個批次總體進行發送,這樣能夠節約網絡帶寬,提高性能。該參數就是用來規約一個批次的大小的。可是生產者並非說要等到一個批次裝滿以後,纔會發送,不是這樣的,有時候半滿,甚至只有一個消息的時候,也可能會發送,具體怎麼選擇,咱們不知道,可是不是說非要等裝滿才發。所以,若是把該參數調的比較大的話,是不會形成消息發送延遲的,可是會佔用比較大的內存。可是若是設置的過小,會形成消息發送次數增長,會有額外的IO開銷
linger.ms:生產者在發送一個批次以前,能夠適當的等一小會,這樣可讓更多的消息加入到該批次。這樣會形成延時增長,可是下降了IO開銷,增長了吞吐量
client.id:服務器用來標誌消息的來源,是一個任意的字符串
max.in.flight.requests.per.connection:一個消息發送給kafka集羣,在收到服務端的響應以前的這段時間裏,生產者還能夠發n-1個消息。這個參數配置retries,能夠保證消息的順序,後面會介紹
request.timeout.ms:生產者在發送消息以後,到收到服務端響應時,等待的時間限制
max.request.size:生產者發送消息的大小。能夠是一個消息的大小,也能夠發送的一個批次的消息大小
receive.buffer.bytes和send.buffer.bytes:tcp socket接收和發送消息的緩衝區大小,其實指的就是ByteBuffer的大小
消費者配置參數解釋:
groupid:一個字符串用來指示一組consumer所在的組羣。實現同一個topic可由不一樣的組羣消費
auto.offset.reset:可選三個參數
earliest ---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest---當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none---topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
socket.timeout.ms:默認值:3000,socket超時時間。
socket.buffersize: 默認值:64*1024,socket receive buffer。
fetch.size: 默認值:300 * 1024,控制在一個請求中獲取的消息的字節數。 這個參數在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。
backoff.increment.ms:默認值:1000,這個參數避免在沒有新數據的狀況下重複頻繁的拉數據。 若是拉到空數據,則多推後這個時間。
queued.max.message.chunks:默認值:2,consumer內部緩存拉回來的消息到一個隊列中。 這個值控制這個隊列的大小。
auto.commit.enable:默認值:true,若是true,consumer按期地往zookeeper寫入每一個分區的offset。
auto.commit.interval.ms:默認值:10000,往zookeeper上寫offset的頻率。
auto.offset.reset:默認值:largest,若是offset出了返回,則 smallest: 自動設置reset到最小的offset. largest : 自動設置offset到最大的offset. 其它值不容許,會拋出異常。
consumer.timeout.ms:默認值:-1,默認-1,consumer在沒有新消息時無限期的block。若是設置一個正值, 一個超時異常會拋出。
rebalance.retries.max:默認值:4,rebalance時的最大嘗試次數。
max.poll.interval.ms:
拉取的最大時間間隔,若是你一次拉取的比較多,建議加大這個值,
長時間沒有調用poll,且間隔超過這個值時,就會認爲這個consumer失敗了
max.poll.records:
默認值:
500,
Consumer每次調用poll()時取到的records的最大數。