官方文檔:http://kafka.apache.org/23/documentation.html#introductionhtml
中文文檔:https://kafka.apachecn.org/java
Apache Kafka 是一個分佈式流處理平臺:distributed streaming platform
。算法
kafka主要應用於兩大類應用:shell
Topic:kafka將消息分類,每一類的消息都有一個主題topic。apache
Producer:生產者,發佈消息的對象。bootstrap
Consumer:消費者,訂閱消息的對象。segmentfault
Broker:代理,已發佈的消息保存在一組服務器中,稱之爲kafka集羣,集羣中每一個服務器都是一個代理(broker)。消費者能夠訂閱一個或多個主題,並從broker上拉取數據,從而消費這些已發佈的消息。windows
Partition:Topic物理上的分組,一個Topic能夠分爲多個partition,每一個partition都是一個順序的、不可變的消息隊列,且能夠持續添加。Partition中的每條消息都會被分配一個有序的序列號,稱爲偏移量(offset),所以每一個分區中偏移量都是惟一的。api
Consumer Group:每一個Consumer屬於一個特定的Consumer Group,這是kafka用來實現一個Topic消息的廣播【發送給全部的consumer的發佈訂閱式消息模型】和單播【發送給任意一個consumer隊列消息模型】的手段。一個topic能夠有多個consumer group。緩存
關於Consumer group的補充:通常來講,咱們能夠建立一些consumer group做爲邏輯上的訂閱者,每一個組中包含數目不等的consumer,一個組內的多個消費者能夠用來擴展性能和容錯。
關於partition分區的補充:
一、【負載均衡】處理更多的消息,不受單臺服務器的限制。
二、【順序保證】kafka不能保證並行的時候消息的有序性,可是能夠保證一個partition分區之中,消息只能由消費者組中的惟一一個消費者處理,以保證一個分區的消息前後順序。
以下圖:2個kafka集羣託管4個分區(p0-p3),2個消費者組,組A有2個消費者實例,組B有4個消費者實例。
關於偏移量的補充:kafka集羣將會保持全部的消息,直到他們過時,不管他們是否被消費。當消費者消費消息時,偏移量offset將會線性增長,可是消費者其實能夠控制實際的偏移量,能夠重置偏移量爲更早的位置,意爲着從新讀取消息,且不會影響其餘消費者對此log的處理。
Kafka的安裝配置啓動須要依賴於Zookeeper,Zookeeper的安裝配置能夠參考個人前一篇文章。
固然,其實你下載kafka以後,就自動已經集成了Zookeeper,你能夠經過修改配置,啓動內置的zookeeper。
關於使用內置的Zookeeper仍是本身安裝的Zookeeper的區別,能夠看看這篇文章:https://segmentfault.com/q/1010000021110446
下載地址:http://kafka.apache.org/downloads
下載二進制版本【Binary downloads】,下載完成以後,解壓到合適的目錄下。
筆者目錄爲:D:\dev\kafka_2.11-2.3.1
。
進入config
目錄下,找到server.properties
文件並修改以下:
log.dirs=D:\\dev\\kafka_2.11-2.3.1\\config\\kafka-logs zookeeper.connect=localhost:2182 # 默認端口是2181,這裏修改成2182
找到zookeeper.properties
文件,修改以下:
dataDir=D:\\softs\\zookeeper-3.4.13\\datas dataLogDir=D:\\softs\\zookeeper-3.4.13\\logs clientPort=2182
在bin目錄下存放着全部可使用的命令行指令,Linux和Windows的存放目錄須要注意:
D:\dev\kafka_2.11-2.3.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
D:\dev\kafka_2.11-2.3.1> .\bin\windows\kafka-server-start.bat .\config\server.properties
建立1個分區1個副本,topic爲test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test-topic Created topic test-topic.
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182 test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning
若是kafka啓動時加載的配置文件中 server.properties 中沒有配置delete.topic.enable=true,則此刪除非真正刪除,而是僅僅將topic標記爲marked for deletion
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --delete --zookeeper localhost:2182 --topic test-topic Topic test-topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
D:\dev\kafka_2.11-2.3.1\bin\windows>zookeeper-shell.bat localhost:2182 Connecting to localhost:2182 Welcome to ZooKeeper! JLine support is disabled
ls /brokers [ids, topics, seqid] ls /brokers/topics [test, test-topic, __consumer_offsets] rmr /brokers/topics/test-topic # 物理刪除 test-topic ls /brokers/topics [test, __consumer_offsets]
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version> </dependency>
public class ProducerExample { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); props.put("zk.connect", "localhost:2182"); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); String topic = "test"; for (int i = 1; i <= 100; i++) { // send方法是異步的 , 返回Future對象,若是調用get(),將阻塞,直到相關請求完成並返回消息的metadata或拋出異常 producer.send(new ProducerRecord<>(topic, "key" + i, "msg" + i * 100)); } // 生產者的傳衝空間池保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換程請求發送到集羣 // 若是使用後不關閉生產者,將會丟失這些消息。 producer.close(); } }
zk.connect:設置zookeeper的地址。
bootstrap.servers:用於創建與 kafka 集羣鏈接的 host/port 組。
acks:判斷是否是成功發送,指定all
將會阻塞消息,這種設置性能最低,可是是最可靠的。
retries:若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性。
batch.size:(生產者)緩存每一個分區未發送的消息。緩存的大小是經過 batch.size
配置指定的。值較大的話將會產生更大的批。並須要更多的內存(由於每一個「活躍」的分區都有1個緩衝區)。
linger.ms:默認緩衝可當即發送,即使緩衝空間尚未滿,可是,若是你想減小請求的數量,能夠設置linger.ms大於0。這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,這個設置將增長1毫秒的延遲請求以等待更多的消息。須要注意的是,在高負載下,相近的時間通常也會組成批,即便是 linger.ms=0
。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。
buffer.memory:控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms
設定,以後它將拋出一個TimeoutException。
key.serializer:用於序列化。
value.serializer:用於序列化。
public class ConsumerSample { public static void main(String[] args) { String topic = "test";// topic name Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "testGroup1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer(props); // 訂閱多個主題 consumer.subscribe(Arrays.asList(topic)); while (true) { // 訂閱一組topic以後,調用poll時,消費者將自動加入到組中。 // 只要持續調用poll,消費者將一直保持可用,並繼續從分配的分區中接收消息。 // 消費者向服務器定時發送心跳,若是在session.timeout.ms配置的時間內沒法發送心跳,被視爲死亡,分區將從新分配 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("*****************partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
Kafka經過進程池瓜分消息並處理消息,這些進程能夠在同一臺機器運行,也能夠分佈到多臺機器上,以增長可擴展型和容錯性,相同的group.id
的消費者將視爲同一個消費者組。
組中的每一個消費者都經過subscribe API
動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每一個消費者組中。並經過平衡分區在消費者分組中全部成員之間來達到平均。所以每一個分區剛好地分配1個消費者(一個消費者組中)。全部若是一個topic有4個分區,而且一個消費者分組有隻有2個消費者。那麼每一個消費者將消費2個分區。
消費者組的成員是動態維護的:若是一個消費者故障。分配給它的分區將從新分配給同一個分組中其餘的消費者。一樣的,若是一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱爲從新平衡分組
。
建立topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test
啓動zookeeper
D:\dev\kafka_2.11-2.3.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
啓動kafka
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-server-start.bat .\config\server.properties
先啓動消費者ConsumerExample,再啓動生產者ProducerExample,觀察控制檯。