最近一直忙着各類設計和文檔,終於有時間來更新一點兒關於kafka的東西。以前有一篇文章講述的是kafka Producer端的程序,也就是日誌的生產者,這部分比較容易理解,業務系統將運行日誌或者業務日誌發送到broker中,由broker代爲存儲。那講的是如何收集日誌,今天要寫的是如何獲取日誌,而後再作相關的處理。 前端
以前寫過kafka是講日誌按照topic的形式存儲,一個topic會按照partition存在同一個文件夾下,目錄在config/server.properties中指定,具體的存儲規則能夠查看以前的文章: java
# The directory under which to store log files log.dir=/tmp/kafka-logs
Consumer端的目的就是爲了獲取log日誌,而後作進一步的處理。在這裏咱們能夠將數據的處理按照需求分爲兩個方向,線上和線下,也能夠叫實時和離線。實時處理部分相似於網站裏的站短,有消息了立刻就推送到前端,這是一種對實時性要求極高的模式,kafka能夠作到,固然針對站短這樣的功能還有更好的處理方式,我主要將kafka線上消費功能用在了實時統計上,處理一些如實時流量彙總、各系統實時吞吐量彙總等。 ios
這種應用,通常採用一個consumer中的一個group對應一個業務,配合多個producer提供數據,以下圖模
式: shell
採用這種方式處理很簡單,採用官網上給的例子便可解決,只是因爲版本的問題,代碼稍做更改便可: api
package com.a2.kafka.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; public class CommonConsumer { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zk.connect", "192.168.181.128:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("groupid", "test_group"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> map=new HashMap<String,Integer>(); map.put("test", 2); // create 4 partitions of the stream for topic 「test」, to allow 4 threads to consume Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<Message>> streams = topicMessageStreams.get("test"); // create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4); // consume the messages in the threads for(final KafkaStream<Message> stream: streams) { executor.submit(new Runnable() { public void run() { for(MessageAndMetadata<Message> msgAndMetadata: stream) { // process message (msgAndMetadata.message()) System.out.println(msgAndMetadata.message()); } } }); } } }
這是一個user level的API,還有一個low level的API能夠從官網找到,這裏就不貼出來了。這個consumer是底層採用的是一個阻塞隊列,只要一有producer生產數據,那consumer就會將數據打印出來,這是否是十分符合實時性的要求。 網絡
固然這裏會產生一個很嚴重的問題,若是你重啓一下上面這個程序,那你連一條數據都抓不到,可是你去log文件中明明能夠看到全部數據都好好的存在。換句話說,一旦你消費過這些數據,那你就沒法再次用同一個groupid消費同一組數據了。我已經把結論說出來了,要消費同一組數據,你能夠採用不一樣的group。 性能
簡單說下產生這個問題的緣由,這個問題相似於transaction commit,在消息系統中都會有這樣一個問題存在,數據消費狀態這個信息到底存哪裏。是存在consumer端,仍是存在broker端。對於這樣的爭論,通常會出現三種狀況:
第一種狀況是將消費的狀態存儲在了broker端,一旦消費了就改變狀態,但會由於網絡緣由少消費信息,第二種是存在兩端,而且先在broker端將狀態記爲send,等consumer處理完以後將狀態標記爲consumed,但也有可能由於在處理消息時產生異常,致使狀態標記錯誤等,而且會產生性能的問題。第三種固然是最好的結果。 fetch
Kafka解決這個問題採用high water mark這樣的標記,也就是設置offset: 網站
Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.
因此在每次消費信息時,log4j中都會輸出不一樣的offset: ui
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 0 from 192.168.181.128:9092 [FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 15 from 192.168.181.128:9092除了採用不一樣的groupid去抓取已經消費過的數據,kafka還提供了另外一種思路,這種方式更適合線下的操做,鏡像。
經過一些配置,就能夠將線上產生的數據同步到鏡像中去,而後再由特定的集羣區處理大批量的數據,這種方式能夠採用low level的API按照不一樣的partition和offset來抓取數據,以得到更好的並行處理效果。