java提供了方便的API進行kafka消息處理。簡單總結一下:html
學習參考:http://www.itnose.net/st/6095038.htmljava
POM配置(關於LOG4J的配置參看 http://www.cnblogs.com/huayu0815/p/5341712.html)apache
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.7</version> </dependency> </dependencies>
PRODUCERapi
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducer { Producer<String, String> producer ; /*#指定kafka節點列表,用於獲取metadata,沒必要所有指定 metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092 # 指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到對應分區 #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner # 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。 compression.codec=none # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultEncoder,即byte[] serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder # serializer.class=kafka.serializer.DefaultEncoder # serializer.class=kafka.serializer.StringEncoder # 若是要壓縮消息,這裏指定哪些topic要壓縮消息,默認empty,表示不壓縮。 #compressed.topics= ########### request ack ############### # producer接收消息ack的時機.默認爲0. # 0: producer不會等待broker發送ack # 1: 當leader接收到消息以後發送ack # 2: 當全部的follower都同步消息成功後發送ack. request.required.acks=0 # 在向producer發送ack以前,broker容許等待的最大時間 # 若是超時,broker將會向producer發送一個error ACK.意味着上一次消息由於某種 # 緣由未能成功(好比follower未能同步成功) request.timeout.ms=10000 ########## end ##################### # 同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步能夠提升發送吞吐量, # 也意味着消息將會在本地buffer中,並適時批量發送,可是也可能致使丟失未發送過去的消息 producer.type=sync ############## 異步發送 (如下四個異步參數可選) #################### # 在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker,默認爲5000ms # 此值和batch.num.messages協同工做. queue.buffering.max.ms = 5000 # 在async模式下,producer端容許buffer的最大消息量 # 不管如何,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積 # 此時,若是消息的條數達到閥值,將會致使producer端阻塞或者消息被拋棄,默認爲10000 queue.buffering.max.messages=20000 # 若是是異步,指定每次批量發送數據量,默認爲200 batch.num.messages=500 # 當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後 # 阻塞必定時間後,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息) # 此時producer能夠繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間 # -1: 無阻塞超時限制,消息不會被拋棄 # 0:當即清空隊列,消息被拋棄 queue.enqueue.timeout.ms=-1 ################ end ############### # 當producer接收到error ACK,或者沒有接收到ACK時,容許消息重發的次數 # 由於broker並無完整的機制來避免消息重複,因此當網絡異常時(好比ACK丟失) # 有可能致使broker接收到重複的消息,默認值爲3. message.send.max.retries=3 # producer刷新topic metada的時間間隔,producer須要知道partition leader的位置,以及當前topic的狀況 # 所以producer須要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會當即刷新 # (好比topic失效,partition丟失,leader失效等),此外也能夠經過此參數來配置額外的刷新機制,默認值600000 topic.metadata.refresh.interval.ms=60000*/ public Producer<String, String> getClient() { if (producer == null) { Properties props = new Properties() ; //此處配置的是kafka的端口 props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:9092"); //配置value的序列化類 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async"); //配置key的序列化類 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "0"); ProducerConfig config = new ProducerConfig(props) ; producer = new Producer<>(config) ; } return producer ; } public void shutdown(){ if (producer != null) { producer.close(); } } public static void main(String[] args) throws CloneNotSupportedException { KafkaProducer kafkaProducer = new KafkaProducer() ; for (int i=0 ; i< 10; i ++) { kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic1","topic1_" + i + "_測試")); kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic2","topic2_" + i + "_測試")); } kafkaProducer.shutdown(); } }
總結:緩存
一、producer每次new的時候,會自動建立線程池bash
二、producer在調用send方法時候,纔會真正創建socket鏈接。服務器
鏈接過程以下:網絡
1>、經過metadata.broker.list獲取對應的brokers全量信息(metadata.broker.list給的broker的ip和端口只要保證一個是可用的便可,無需所有列出。不過開發過程當中,通常所有列出)。session
2>、根據zookeeper的註冊信息獲取topic的分區信息多線程
3>、創建client和broker的socket鏈接
三、send結束後,直接關閉socket鏈接。
四、每次send會從新創建鏈接
五、client會自動獲取topic的分區信息,所以kafka rebalance的時候,是不受影響的
CONSUMER
consumer api官方有兩種,通常稱爲:high-level Consumer API 和 SimpleConsumer API 。
使用第二種的弊端:
我主要嘗試了一下第一種也是大多數狀況下使用的API。
使用high-level Consumer api,有兩種用法:單個消費者和多個消費者
單消費者:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class KafkaSingleConsumer { /** * # zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間才能檢查到而且觸發從新負載均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000 #指定消費組 group.id=xxx # 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息 # 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true auto.commit.enable=true # 自動更新時間。默認60 * 1000 auto.commit.interval.ms=1000 # 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察 conusmer.id=xxx # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生 client.id=xxxx # 最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新 # 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊 # "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點, # 此值用於控制,註冊節點的重試次數. rebalance.max.retries=5 # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存 fetch.min.bytes=6553600 # 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、 # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest auto.offset.reset=smallest # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); for (KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } }
多消費者
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class KafkaMultiConsumer { /** * # zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間才能檢查到而且觸發從新負載均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000 #指定消費組 group.id=xxx # 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息 # 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true auto.commit.enable=true # 自動更新時間。默認60 * 1000 auto.commit.interval.ms=1000 # 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察 conusmer.id=xxx # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生 client.id=xxxx # 最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新 # 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊 # "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點, # 此值用於控制,註冊節點的重試次數. rebalance.max.retries=5 # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存 fetch.min.bytes=6553600 # 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、 # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest auto.offset.reset=smallest # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; int threadCount = 3; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, 3); ExecutorService executor = Executors.newFixedThreadPool(threadCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); int count = 0; for (final KafkaStream<byte[], byte[]> stream : streamList) { final String threadNumber = "Thread" + count ; executor.execute(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { System.out.println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message())); } } }); count++ ; } } }
總結:
一、KAFKA容許多個consumer group,每一個group容許多個consumer。不一樣group之間共享信息(相似發佈-訂閱模式),同一個group之間的多個consumer只會消費消息一次(相似生產-消費者模式)。
二、對同一個topic啓動多個java consumer線程,在zookeeper上能夠看到多個信息:
[zk:xxx.xxx.xxx.xxx:2181(CONNECTED) 120] ls /consumers/testgroup/ids [testgroup_xxx-1459926903849-fea50e90, testgroup_xxx-1459926619712-8d1caf90]
三、若是多線程方式啓動consumer,能夠看到不一樣的consumer綁定到不一樣的topic patition上
[zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 121] get /consumers/testgroup/owners/topic1/1 testgroup_xxx-1459926619712-8d1caf90-1 cZxid = 0x2000006e2 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e2 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e2 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0 [zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 122] get /consumers/testgroup/owners/topic1/0 testgroup_xxx-1459926619712-8d1caf90-0 cZxid = 0x2000006e3 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e3 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e3 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0
四、對於啓動多個consumer進程或是以多線程方式啓動單個consumer進程,區別僅僅在與zookeeper上註冊的consumer信息是多個或是一個「ls /consumers/testgroup/ids 」,可是對於消息的消費而言,都遵照消費只消費一次,同一個分區只會綁定一個consumer信息。
五、若是某個消費者掛掉的話,consumer和partition的綁定信息會從新分配,儘量的保證負載平衡
六、若是consumer的數量大於分區數量,會形成多餘的那部分線程沒法獲取消息,不斷 Got ping response for sessionid: 0x153413bc26e0082 after 2ms。是一種資源的浪費
若是多臺服務器都啓動consumer進程,最好根據分區數合理分配consumer進程中,消費線程的數量
更底層的細節問題,後期遇到再繼續調研,先會用,明白大體原理!