用java代碼對kafka消息進行消費與發送,首先咱們得引入相關jar包java
maven:apache
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> </dependency>
gradle:api
compile("org.apache.kafka:kafka_2.10:0.8.2.1")
在新版本的kafka中(具體版本記不清楚了),添加了java代碼實現的producer,consumer目前仍是Scala的,以前的producer和consumer均是Scala編寫的,在這裏則介紹java版本的producer。數組
另外一點須要特別注意:服務器
當發送消息時咱們不指定key時,producer將消息分發到各partition的機制是:dom
Scala版本的producer:在你的producer啓動的時候,隨機得到一個partition,而後後面的消息都會發送到這個partition,也就是說,只要程序啓動了,這個producer都會往同一個partition裏發送消息maven
java版本的producer:會輪詢每一個partition,因此發送的會比較平均ide
因此當使用Scala版本的producer時,儘可能傳入key,保證消息在partition的平均性gradle
下面是具體的代碼:spa
import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.commons.lang.SerializationUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import cn.qlt.study.domain.User; public class KafkaUtil { private static KafkaProducer<String, byte[]> producer=null; private static ConsumerConnector consumer=null; static{ //生產者配置文件,具體配置可參考ProducerConfig類源碼,或者參考官網介紹 Map<String,Object> config=new HashMap<String, Object>(); //kafka服務器地址 config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092"); //kafka消息序列化類 即將傳入對象序列化爲字節數組 config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); //kafka消息key序列化類 若傳入key的值,則根據該key的值進行hash散列計算出在哪一個partition上 config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5); //往kafka服務器提交消息間隔時間,0則當即提交不等待 config.put(ProducerConfig.LINGER_MS_CONFIG,0); //消費者配置文件 Properties props = new Properties(); //zookeeper地址 props.put("zookeeper.connect", "192.168.100.90:2181"); //組id props.put("group.id", "123"); //自動提交消費狀況間隔時間 props.put("auto.commit.interval.ms", "1000"); ConsumerConfig consumerConfig=new ConsumerConfig(props); producer=new KafkaProducer<String,byte[]>(config); consumer=Consumer.createJavaConsumerConnector(consumerConfig); } /** *啓動一個消費程序 * @param topic 要消費的topic名稱 * @param handler 本身的處理邏輯的實現 * @param threadCount 消費線程數,該值應小於等於partition個數,多了也沒用 */ public static <T extends Serializable>void startConsumer(String topic,final MqMessageHandler<T> handler,int threadCount) throws Exception{ if(threadCount<1) throw new Exception("處理消息線程數最少爲1"); //設置處理消息線程數,線程數應小於等於partition數量,若線程數大於partition數量,則多餘的線程則閒置,不會進行工做 //key:topic名稱 value:線程數 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(threadCount)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); //聲明一個線程池,用於消費各個partition ExecutorService executor=Executors.newFixedThreadPool(threadCount); //獲取對應topic的消息隊列 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); //爲每個partition分配一個線程去消費 for (final KafkaStream stream : streams) { executor.execute(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); //有信息則消費,無信息將會阻塞 while (it.hasNext()){ T message=null; try { //將字節碼反序列化成相應的對象 byte[] bytes=it.next().message(); message = (T) SerializationUtils.deserialize(bytes); } catch (Exception e) { e.printStackTrace(); return; } //調用本身的業務邏輯 try { handler.handle(message); } catch (Exception e) { e.printStackTrace(); } } } }); } } /** *發送消息,發送的對象必須是可序列化的 */ public static Future<RecordMetadata> send(String topic,Serializable value) throws Exception{ try { //將對象序列化稱字節碼 byte[] bytes=SerializationUtils.serialize(value); Future<RecordMetadata> future=producer.send(new ProducerRecord<String,byte[]>(topic,bytes)); return future; }catch(Exception e){ throw e; } } //內部抽象類 用於實現本身的處理邏輯 public static abstract class MqMessageHandler<T extends Serializable>{ public abstract void handle(T message); } public static void main(String[] args) throws Exception { //發送一個信息 send("test",new User("id","userName", "password")); //爲test啓動一個消費者,啓動後每次有消息則打印對象信息 KafkaUtil.startConsumer("test", new MqMessageHandler<User>() { @Override public void handle(User user) { //實現本身的處理邏輯,這裏只打印出消息 System.out.println(user.toString()); } },2); } }
相關配置解釋:
producer:
一、producer的配置不須要zookeeper地址,會直接獲取kafka的元數據,直接和broker進行通訊
二、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生產者與broker之間數據是以byte進行傳遞的,因此這個參數的意思是把咱們傳入對象轉換成byte[]的類,通常使用org.apache.kafka.common.serialization.ByteArraySerializer便可,咱們本身把對象序列化爲byte[]
三、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先說明下key值是幹什麼的,若咱們指定了key的值,生產者則會根據該key進行hash散列計算出具體的partition。若不指定,則隨機選擇partition。通常狀況下咱們不必指定該值。這個類與上面功能同樣,即將key轉換成byte[]
四、ProducerConfig.LINGER_MS_CONFIG即linger.ms,爲了減小請求次數提升吞吐率,這個參數爲每次提交間隔的次數,若設置了該值,如1000,則意味着咱們的消息可能不會立刻提交到kafka服務器,須要等上1秒中,纔會進行批量提交。咱們能夠適當的配置該值。0爲不等待馬上提交。
consumer:
一、zookeeper.connect:zookeeper的地址,多個之間用,分割
二、group.id:這個值能夠隨便寫,但建議寫點有意義的值,別隨便寫個123。kafka保證同一個組內的消息只會被消費一次,若須要重複消費消息,則能夠配置不一樣的groupid。
三、auto.commit.interval.ms:consumer本身會記錄消費的偏移量,並定時往zookeeper上提交,該值即爲提交時間間隔,若該值設置太大可能會出現重複消費的狀況,如咱們中止了某個consumer,但該consumer還未往zookeeper提交某段時間的消費記錄,這致使咱們下次啓動該消費者的時候,它會從上次提交的偏移量進行消費,這就致使了某些數據的重複消費。
注意:在殺死consumer進程後,應等一下子再去重啓,由於殺死consumer進程時,會刪除zookeeper的一些臨時節點,若咱們立刻重啓的話,可能會在啓動的時候那些節點還沒刪除掉,出現寫沒必要要的錯誤