kafka java代碼的使用[Producer和Consumer]

用java代碼對kafka消息進行消費與發送,首先咱們得引入相關jar包java

maven:apache

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>

gradle:api

compile("org.apache.kafka:kafka_2.10:0.8.2.1")

 在新版本的kafka中(具體版本記不清楚了),添加了java代碼實現的producer,consumer目前仍是Scala的,以前的producer和consumer均是Scala編寫的,在這裏則介紹java版本的producer。數組

另外一點須要特別注意:服務器

當發送消息時咱們不指定key時,producer將消息分發到各partition的機制是:dom

Scala版本的producer:在你的producer啓動的時候,隨機得到一個partition,而後後面的消息都會發送到這個partition,也就是說,只要程序啓動了,這個producer都會往同一個partition裏發送消息maven

java版本的producer會輪詢每一個partition,因此發送的會比較平均ide

因此當使用Scala版本的producer時,儘可能傳入key,保證消息在partition的平均性gradle

下面是具體的代碼:spa

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import cn.qlt.study.domain.User;

public class KafkaUtil {

	
	private static KafkaProducer<String, byte[]> producer=null;

	private static ConsumerConnector consumer=null;
	
	static{
		//生產者配置文件,具體配置可參考ProducerConfig類源碼,或者參考官網介紹
		Map<String,Object> config=new HashMap<String, Object>();
		//kafka服務器地址
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092");
		//kafka消息序列化類 即將傳入對象序列化爲字節數組
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
		//kafka消息key序列化類 若傳入key的值,則根據該key的值進行hash散列計算出在哪一個partition上
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);
		//往kafka服務器提交消息間隔時間,0則當即提交不等待
		config.put(ProducerConfig.LINGER_MS_CONFIG,0);
		//消費者配置文件
		Properties props = new Properties();
		//zookeeper地址
	    props.put("zookeeper.connect", "192.168.100.90:2181");
	    //組id
	    props.put("group.id", "123");
	    //自動提交消費狀況間隔時間
	    props.put("auto.commit.interval.ms", "1000");
	    
	    ConsumerConfig consumerConfig=new ConsumerConfig(props);
		producer=new KafkaProducer<String,byte[]>(config);
		consumer=Consumer.createJavaConsumerConnector(consumerConfig);
	}

	/**
	 *啓動一個消費程序 
	* @param topic 要消費的topic名稱
	* @param handler 本身的處理邏輯的實現
	* @param threadCount 消費線程數,該值應小於等於partition個數,多了也沒用
	 */
	public static <T extends Serializable>void startConsumer(String topic,final MqMessageHandler<T> handler,int threadCount) throws Exception{
		if(threadCount<1)
			throw new Exception("處理消息線程數最少爲1");
	   //設置處理消息線程數,線程數應小於等於partition數量,若線程數大於partition數量,則多餘的線程則閒置,不會進行工做
	   //key:topic名稱 value:線程數
	   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
	   topicCountMap.put(topic, new Integer(threadCount));
	   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
	   //聲明一個線程池,用於消費各個partition
	   ExecutorService executor=Executors.newFixedThreadPool(threadCount);
	   //獲取對應topic的消息隊列
	   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	   //爲每個partition分配一個線程去消費
	   for (final KafkaStream stream : streams) {
		   executor.execute(new Runnable() {
			@Override
			public void run() {
			     ConsumerIterator<byte[], byte[]> it = stream.iterator();
			     //有信息則消費,無信息將會阻塞
			     while (it.hasNext()){
			        T message=null;
					try {
						//將字節碼反序列化成相應的對象
						byte[] bytes=it.next().message();
						message = (T) SerializationUtils.deserialize(bytes);
					} catch (Exception e) {
						e.printStackTrace();
						return;
					}
			    	//調用本身的業務邏輯
			    	try {
						handler.handle(message);
					} catch (Exception e) {
						e.printStackTrace();
					}
			     }
			}
		});
       }
	}
	/**
	 *發送消息,發送的對象必須是可序列化的 
	 */
	public static Future<RecordMetadata> send(String topic,Serializable value) throws Exception{
		try {
			//將對象序列化稱字節碼
			byte[] bytes=SerializationUtils.serialize(value);
			Future<RecordMetadata> future=producer.send(new ProducerRecord<String,byte[]>(topic,bytes));
			return future;
		}catch(Exception e){
			throw e;
		}
	}
	
	//內部抽象類 用於實現本身的處理邏輯
	public static abstract class MqMessageHandler<T extends Serializable>{
		public abstract void handle(T message);
	}
	
	
	public static void main(String[] args) throws Exception {
		//發送一個信息
		send("test",new User("id","userName", "password"));
		//爲test啓動一個消費者,啓動後每次有消息則打印對象信息
		KafkaUtil.startConsumer("test", new MqMessageHandler<User>() {
			@Override
			public void handle(User user) {
				//實現本身的處理邏輯,這裏只打印出消息
				System.out.println(user.toString());
			}
		},2);
	}
}

相關配置解釋:

producer:

一、producer的配置不須要zookeeper地址,會直接獲取kafka的元數據,直接和broker進行通訊

二、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生產者與broker之間數據是以byte進行傳遞的,因此這個參數的意思是把咱們傳入對象轉換成byte[]的類,通常使用org.apache.kafka.common.serialization.ByteArraySerializer便可,咱們本身把對象序列化爲byte[]

三、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先說明下key值是幹什麼的,若咱們指定了key的值,生產者則會根據該key進行hash散列計算出具體的partition。若不指定,則隨機選擇partition。通常狀況下咱們不必指定該值。這個類與上面功能同樣,即將key轉換成byte[]

四、ProducerConfig.LINGER_MS_CONFIG即linger.ms,爲了減小請求次數提升吞吐率,這個參數爲每次提交間隔的次數,若設置了該值,如1000,則意味着咱們的消息可能不會立刻提交到kafka服務器,須要等上1秒中,纔會進行批量提交。咱們能夠適當的配置該值。0爲不等待馬上提交。

consumer:

一、zookeeper.connect:zookeeper的地址,多個之間用,分割

二、group.id:這個值能夠隨便寫,但建議寫點有意義的值,別隨便寫個123。kafka保證同一個組內的消息只會被消費一次,若須要重複消費消息,則能夠配置不一樣的groupid。

三、auto.commit.interval.ms:consumer本身會記錄消費的偏移量,並定時往zookeeper上提交,該值即爲提交時間間隔,若該值設置太大可能會出現重複消費的狀況,如咱們中止了某個consumer,但該consumer還未往zookeeper提交某段時間的消費記錄,這致使咱們下次啓動該消費者的時候,它會從上次提交的偏移量進行消費,這就致使了某些數據的重複消費。

注意:在殺死consumer進程後,應等一下子再去重啓,由於殺死consumer進程時,會刪除zookeeper的一些臨時節點,若咱們立刻重啓的話,可能會在啓動的時候那些節點還沒刪除掉,出現寫沒必要要的錯誤

相關文章
相關標籤/搜索