首先介紹一下 Kafka 生產者發送消息的過程:java
本項目採用 Maven 構建,想要調用 Kafka 生產者 API,須要導入 kafka-clients
依賴,以下:git
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
建立 Kafka 生產者時,如下三個屬性是必須指定的:github
建立的示例代碼以下:算法
public class SimpleProducer { public static void main(String[] args) { String topicName = "Hello-Kafka"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*建立生產者*/ Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i); /* 發送消息*/ producer.send(record); } /*關閉生產者*/ producer.close(); } }
本篇文章的全部示例代碼能夠從 Github 上進行下載:kafka-basisshell
Kafka 的運行依賴於 zookeeper,須要預先啓動,能夠啓動 Kafka 內置的 zookeeper,也能夠啓動本身安裝的:apache
# zookeeper啓動命令 bin/zkServer.sh start # 內置zookeeper啓動命令 bin/zookeeper-server-start.sh config/zookeeper.properties
啓動單節點 kafka 用於測試:bootstrap
# bin/kafka-server-start.sh config/server.properties
# 建立用於測試主題 bin/kafka-topics.sh --create \ --bootstrap-server hadoop001:9092 \ --replication-factor 1 --partitions 1 \ --topic Hello-Kafka # 查看全部主題 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
啓動一個控制檯消費者用於觀察寫入狀況,啓動命令以下:api
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
此時能夠看到消費者控制檯,輸出以下,這裏 kafka-console-consumer
只會打印出值信息,不會打印出鍵信息。數組
在這裏可能出現的一個問題是:生產者程序在啓動後,一直處於等待狀態。這一般出如今你使用默認配置啓動 Kafka 的狀況下,此時須要對 server.properties
文件中的 listeners
配置進行更改:服務器
# hadoop001 爲我啓動kafka服務的主機名,你能夠換成本身的主機名或者ip地址 listeners=PLAINTEXT://hadoop001:9092
上面的示例程序調用了 send
方法發送消息後沒有作任何操做,在這種狀況下,咱們沒有辦法知道消息發送的結果。想要知道消息發送的結果,可使用同步發送或者異步發送來實現。
在調用 send
方法後能夠接着調用 get()
方法,send
方法的返回值是一個 Future<RecordMetadata>對象,RecordMetadata 裏面包含了發送消息的主題、分區、偏移量等信息。改寫後的代碼以下:
for (int i = 0; i < 10; i++) { try { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*同步發送消息*/ RecordMetadata metadata = producer.send(record).get(); System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
此時獲得的輸出以下:偏移量和調用次數有關,全部記錄都分配到了 0 分區,這是由於在建立 Hello-Kafka
主題時候,使用 --partitions
指定其分區數爲 1,即只有一個分區。
topic=Hello-Kafka, partition=0, offset=40 topic=Hello-Kafka, partition=0, offset=41 topic=Hello-Kafka, partition=0, offset=42 topic=Hello-Kafka, partition=0, offset=43 topic=Hello-Kafka, partition=0, offset=44 topic=Hello-Kafka, partition=0, offset=45 topic=Hello-Kafka, partition=0, offset=46 topic=Hello-Kafka, partition=0, offset=47 topic=Hello-Kafka, partition=0, offset=48 topic=Hello-Kafka, partition=0, offset=49
一般咱們並不關心發送成功的狀況,更多關注的是失敗的狀況,所以 Kafka 提供了異步發送和回調函數。 代碼以下:
for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*異步發送消息,並監聽回調*/ producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.out.println("進行異常處理"); } else { System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset()); } } }); }
Kafka 有着默認的分區機制:
某些狀況下,你可能有着本身的分區需求,這時候能夠採用自定義分區器實現。這裏給出一個自定義分區器的示例:
/** * 自定義分區器 */ public class CustomPartitioner implements Partitioner { private int passLine; @Override public void configure(Map<String, ?> configs) { /*從生產者配置中獲取分數線*/ passLine = (Integer) configs.get("pass.line"); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { /*key 值爲分數,當分數大於分數線時候,分配到 1 分區,不然分配到 0 分區*/ return (Integer) key >= passLine ? 1 : 0; } @Override public void close() { System.out.println("分區器關閉"); } }
須要在建立生產者時指定分區器,和分區器所須要的配置參數:
public class ProducerWithPartitioner { public static void main(String[] args) { String topicName = "Kafka-Partitioner-Test"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*傳遞自定義分區器*/ props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner"); /*傳遞分區器所需的參數*/ props.put("pass.line", 6); Producer<Integer, String> producer = new KafkaProducer<>(props); for (int i = 0; i <= 10; i++) { String score = "score:" + i; ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score); /*異步發送消息*/ producer.send(record, (metadata, exception) -> System.out.printf("%s, partition=%d, \n", score, metadata.partition())); } producer.close(); } }
須要建立一個至少有兩個分區的主題:
bin/kafka-topics.sh --create \ --bootstrap-server hadoop001:9092 \ --replication-factor 1 --partitions 2 \ --topic Kafka-Partitioner-Test
此時輸入以下,能夠看到分數大於等於 6 分的都被分到 1 分區,而小於 6 分的都被分到了 0 分區。
score:6, partition=1, score:7, partition=1, score:8, partition=1, score:9, partition=1, score:10, partition=1, score:0, partition=0, score:1, partition=0, score:2, partition=0, score:3, partition=0, score:4, partition=0, score:5, partition=0, 分區器關閉
上面生產者的建立都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有不少可配置屬性,以下:
acks 參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的:
設置生產者內存緩衝區的大小。
默認狀況下,發送的消息不會被壓縮。若是想要進行壓縮,能夠配置此參數,可選值有 snappy,gzip,lz4。
發生錯誤後,消息重發的次數。若是達到設定值,生產者就會放棄重試並返回錯誤。
當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。
該參數制定了生產者在發送批次以前等待更多消息加入批次的時間。
客戶端 id,服務器用來識別消息的來源。
指定了生產者在收到服務器響應以前能夠發送多少個消息。它的值越高,就會佔用越多的內存,不過也會提高吞吐量,把它設置爲 1 能夠保證消息是按照發送的順序寫入服務器,即便發生了重試。
指定了在調用 send()
方法或使用 partitionsFor()
方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。
該參數用於控制生產者發送的請求大小。它能夠指發送的單個消息的最大值,也能夠指單個請求裏全部消息總的大小。例如,假設這個值爲 1000K ,那麼能夠發送的單個最大消息爲 1000K ,或者生產者能夠在單個請求裏發送一個批次,該批次包含了 1000 個消息,每一個消息大小爲 1K。
這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 表明使用操做系統的默認值。
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南