工做也有三年了,也是由於比較懶惰和自身水平的緣由,以前對一些沒用過的技術進行實踐後總沒有過什麼過多的總結。此次也是由於在工做之中遇到的一些問題須要使用到Kafka,過程當中也遇到並解決了一些問題。在階段性完成工做後,覺得同事說起是否有什麼總結和經驗能夠分享,使我忽然意識到須要總結一下本身遇到問題與技術,從而更好地去認識這陌生的01世界。java
Kafka是一個分佈式的消息隊列,有所耳聞的基本都瞭解它是個什麼樣的「人物」。簡要的介紹一下我對它的一些認識。apache
瞭解了Kafka一些很基礎的概念後,仍是須要動手去體驗它是如何操做的。下面是在CentOS7的一些安裝過程。bootstrap
可在Kafka的官網http://kafka.apache.org/下載最新的資源文件緩存
目前僅使用單機版本的Kafka來提供服務,對於文件解壓等基礎操做不在闡述。主要須要修改的配置文件有:server.properties,配置文件kafka目錄下的config文件夾下。網絡
修改內容以下:
# listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. advertised.listeners=PLAINTEXT://:9092 # A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs
簡單的說明socket
這裏我使用的是單機模式,並非集羣的運行方式。Kafka使用的是1.1.0的版本,內部自帶了Zookeeper那麼就不須要額外的下載Zookeeper了。分佈式
1)首先啓動Zookeeper服務性能
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log &
2)啓動Kafka服務學習
nohup ./bin/kafka-server-start.sh config/server.properties > kafka.log &
簡單提一下,nohup和&是爲了讓Zookeeper和Kafka的服務可以在後臺運行,並將執行命令輸出的日誌分別記錄到對應文件中去;.net
咱們使用kafka-topics.sh這個腳原本進行一系列關於Topics(隊列)的定義等操做
1)創建一個隊列
./bin/kafka-topics.sh --create --zookeeper 192.168.3.203:2181 --topic test --replication-factor 1 --partitions 2 ### 進行一些說明 ## --create 指定命令爲建立 ## --zookeeper 指定Zookeeper的服務 ## --topic 指定隊列名稱 ## --replication-factor 指定隊列的副本數,這個副本數須要和集羣的節點數相對應 ## --partitions 指定隊列的分片數量,適當的分片能夠提高性能
2)查看隊列列表
./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --list ### 說明 ## --list list命令指定列出當前的topic列表
3)查看指定隊列
./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --describe ### 說明 ## --topic 指定須要查看的隊列名 ## --describe 該命令用於描述topic隊應的基礎信息
4)刪除隊列
./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --delete ### 說明 ## --topic 指定須要刪除的隊列名 ## --delete 該命令用於刪除隊列
其餘的一些使用方式,能夠查看kafka-topics的幫助說明;
咱們使用kafka-consumer-groups.sh這個腳原本瞭解關於consumer的一些基礎信息;
1)查看消費者組列表
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --list ### 說明 ## --bootstrap-server 指定Kafka的服務地址 ## --list 列出全部消費者
2)查看消費者組信息
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --describe ### 說明 ## --group 指定消費者組組名 ## --describe 指定爲描述命令
3)刪除消費者組
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --delete ### 說明 ## --delete 指定爲刪除命令
在Kafka的官方API中Producer有以下的使用例子
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
還有以下針對,發送數據時使用事務去提交的例子:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();
在我的使用中出現過以下的兩個問題,但願你們也能規避;
每次新建一個Producer時,會利用網絡去創建一個鏈接。若是使用線程頻繁地去建立Producer的話,在Client端會出現java.net.BindException: Address already in use: JVM_Bind的異常。
這是因爲在Producer與Kafka Server創建鏈接時,須要在客戶端分配一個端口,每一個機器的端口數量有限,當被耗盡時,變沒法在與Kafka Server創建鏈接了。
當producer使用事務進行數據發送時,事務結束以後,須要開啓一個新的事務進行數據提交。這個時候的事務ID不能重複,須要從新設置一個transactional.id,不然進行數據提交時將會出現異常。
一樣Consumer在官方的API中也有使用說明,大體使用以下:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }
在我的使用中也遇到一些問題,拿出來與你們分享一下;
offset對應的是一個消費組的偏移量,按照如上的Consumer的代碼獲取使用腳本一個新建立Topic的數據,將獲取不到任何數據。不管使用Producer往Topic中發送了對多數據,我都沒有辦法取到。
這使我有點困惑,起初我使用命令進行數據拉取後,改代碼能夠拉取到數據,命令以下:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.203:9092 --from-beginning --topic test
因而,經過查看kafka-console.consumer.sh,瞭解到它在Kafka包中對用的Scala代碼中,發現參數--from-beginning對應設置了一個配置,"auto.offset.reset"爲"earliest"。
auto.offset.reset參數說明以下:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
設置配置後,即可以在未初始化時獲取到數據了。
當咱們使用到隊列做爲中間件緩存消息時,咱們經常會關心當前隊列中剩餘的數據量。這便於咱們去判斷當前咱們的數據處理程序的響應能力,以便於調整部署狀況。
當前基於個人瞭解,Kafka沒有提供直接的方法返回隊列中的數據剩餘量,須要咱們手工的去計算。我是這樣作的:
List<PartitionInfo> partInfos = consumer.partitionsFor(topic); List<TopicPartition> partitions = new ArrayList<TopicPartition>(); for (PartitionInfo info : partInfos) { partitions.add(new TopicPartition(topic, info.partition())); } Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions); for (TopicPartition topicPartitionKey : endOffsets.keySet()) { count += (endOffsets.get(topicPartitionKey) - consumer.position(topicPartitionKey)); } return count;
遍歷消費者在Topic中的每個分片,在每一個分片中對offset - position的差值作累加獲得最終的未處理數量。這樣作只能獲取一個大概的計算值,不是十分準確。由於在Producer使用transition發送數據時,完成一個事務後offset會加1去作記錄,並且可能還存在一些未知的狀況。
固然這個方法是一種很笨的方法,或許你有更好的方式呢,歡迎交流。
最後,我目前使用的Kafka的經驗也總結完,Kafka內部設計很複雜也頗有趣,須要進一步地探索,學習總會有回報的。