自定義系列化方式Encoderjava
kafka自帶的序列化方式正則表達式
DefaultEncoder默認的這個Encoder事實上不作任何處理,接收到什麼byte[]就返回什麼byte[]:api
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] { override def toBytes(value: Array[Byte]): Array[Byte] = value }服務器
NullEncoder無論接收什麼都返回null:session
class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] { 架構
override def toBytes(value: T): Array[Byte] = null }app
StringEncoder則返回字符串,默認是utf-8的格式:負載均衡
class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {dom
val encoding = ide
if(props == null)
"UTF8"
else
props.getString("serializer.encoding", "UTF8")
override def toBytes(s: String): Array[Byte] =
if(s == null)
null
else
s.getBytes(encoding) }
本身編寫Encoder來序列化消息,只須要實現下面接口:
interface Encoder<T> {
public Message toMessage(T data);
}
例如,咱們的消息是一個對象
用四個字段分別表示消息的ID、用戶、查詢關鍵詞和查詢時間。固然你若是要設計的更復雜,能夠加入IP這些信息。這些用java寫就是一個簡單的pojo類,這是getter/setter方法便可。因爲在封轉成kafka的message時須要將數據轉化成bytep[]類型,能夠提供一個序列化的方法。我在這裏直接重寫toString了:
@Override
public
String toString() {
String keyword =
"[info kafka producer:]"
;
keyword = keyword +
this
.getId() +
"-"
+
this
.getUser() +
"-"
+
this
.getKeyword() +
"-"
+
this
.getCurrent();
return
keyword;
}
這樣尚未完成,這只是將數據格式用java對象表現出來,解析來要對其按照kafka的消息類型進行封裝,在這裏咱們只須要實現Encoder類便可:
public
class
KeywordMessage
implements
kafka.serializer.Encoder<Keyword>{
public
static
final
Logger LOG=LoggerFactory.getLogger(Keyword.
class
);
@Override
public
Message toMessage(Keyword words) {
LOG.info(
"start in encoding..."
);
return
new
Message(words.toString().getBytes());
}
}
自定義partition
kafka自帶分區方式
DefaultPartitioner默認的分區函數,他根據key的hashcode與分區數取餘,獲得相應的分區。
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
private val random = new java.util.Random
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
若是key爲null會在必定時間內往一個特定的分區發送,超過必定時間又會隨機選擇一個,請參考 key爲null時Kafka會將消息發送給哪一個分區? .因此推薦你發送Kafka消息時老是指定一個key,以便消息能均勻的分到每一個分區上。
自定義分區方式須要實現下面的接口:
interface Partitioner<T> {
int partition(T key, int numPartitions);
}
分區函數有兩個參數:key和可用的分區數量,從分區列表中選擇一個分區並返回id。默認的分區策略是hash(key)%numPartitions
.若是key是null,就隨機的選擇一個。能夠經過參數partitioner.class
定製分區函數,例如:
public
class
ProducerPartitioner
implements
Partitioner<String> {
public
static
final
Logger LOG=LoggerFactory.getLogger(Keyword.
class
);
@Override
public
int
partition(String key,
int
numPartitions) {
LOG.info(
"ProducerPartitioner key:"
+key+
" partitions:"
+numPartitions);
return
key.length() % numPartitions;
}
}
key咱們是在構造數據發送對象時設置的,這個key是區分存儲的關鍵,好比我想將個人數據按照不一樣的用戶類別存儲。
java編寫producer
producer api:
class Producer {
/* 將消息發送到指定分區 */
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
/* 批量發送一批消息 */
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
/* 關閉producer */
public void close();
}
例子:
Properties props = new Properties(); //指定kafka節點:注意這裏無需指定集羣中全部Boker,只要指定其中部分便可,它會自動取meta信息並鏈接到對應的Boker節點 props.put("metadata.broker.list", "172.17.1.163:9093"); //指定採用哪一種序列化方式將消息傳輸給Boker,你也能夠在發送消息的時候指定序列化類型,不指定則以此爲默認序列化類型 props.put("serializer.class", "kafka.serializer.StringEncoder"); //指定消息發送對應分區方式,若不指定,則隨機發送到一個分區,也能夠在發送消息的時候指定分區類型。 props.put("partitioner.class", "example.producer.DefaultPartitioner"); //該屬性表示你須要在消息被接收到的時候發送ack給發送者。以保證數據不丟失 props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); //申明生產者:泛型1爲分區key類型,泛型2爲消息類型 Producer<String, String> producer = new Producer<String, String>(config); //建立KeyedMessage發送消息,參數1爲topic名,參數2爲分區名(若爲null則隨機發到一個分區),參數3爲消息 producer.send(new ProducerData<String,String>("topic","partitionKey1","msg1")); //另外一種寫法producer.send(new ProducerRecord<String,String>("topic","partitionKey1","msg1")); producer.close();
java編寫consumer
Consumer API有兩個級別。低級別的和一個指定的broker保持鏈接,並在接收完消息後關閉鏈接,這個級別是無狀態的,每次讀取消息都帶着offset。
class SimpleConsumer {
/*向一個broker發送讀取請求並獲得消息集 */
public ByteBufferMessageSet fetch(FetchRequest request);
/*向一個broker發送讀取請求並獲得一個相應集 */
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
* 獲得指定時間以前的offsets
* 返回值是offsets列表,以倒序排序
* @param time: 時間,毫秒,
* 若是指定爲OffsetRequest$.MODULE$.LATIEST_TIME(), 獲得最新的offset.
* 若是指定爲OffsetRequest$.MODULE$.EARLIEST_TIME(),獲得最老的offset.
*/
publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
注意:
1.你必須本身實現當中止消費時如何持久化offset
2.你必須本身找到哪一個broker是leader以便處理topic和分區
3.你必須本身處理leader變動
使用階段:
1.找到那些broker是leader以便讀取topic和partition
2.本身決定哪一個副本做爲你的topic和分區
3.創建本身須要請求並自定義獲取你感興趣的數據
4.獲取數據
5.當leader變動時本身識別和恢復。
例子:
String topic = "test2";
int partition = 1;
String brokers = "172.17.1.163:9093";
int maxReads = 100; // 讀多少條數據
// 1.找leader
PartitionMetadata metadata = null;
for (String ipPort : brokers.split(",")) {
//咱們無須要把全部的brokers列表加進去,目的只是爲了得到metedata信息,故只要有broker可鏈接便可
SimpleConsumer consumer = null;
try {
String[] ipPortArray = ipPort.split(":");
consumer = new SimpleConsumer(ipPortArray[0],
Integer.parseInt(ipPortArray[1]), 100000, 64 * 1024,
"leaderLookup");
List<String> topics = new ArrayList<String>();
topics.add(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
// 取meta信息
TopicMetadataResponse resp = consumer.send(req)
//獲取topic的全部metedate信息(目測只有一個metedata信息,何來多個?)
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
//獲取每一個meta信息的分區信息,這裏咱們只取咱們關心的partition的metedata
System.out.println("----"+part.partitionId());
if (part.partitionId() == partition) {
metadata = part;
break;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + ipPort
+ "] to find Leader for [" + topic + ", " + partition
+ "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (metadata == null || metadata.leader() == null) {
System.out.println("meta data or leader not found, exit.");
return;
}
// 拿到leader
Broker leadBroker = metadata.leader();
// 獲取全部副本
System.out.println(metadata.replicas());
// 2.獲取lastOffset(這裏提供了兩種方式:從頭取或從最後拿到的開始取,下面這個是從頭取)
long whichTime = kafka.api.OffsetRequest.EarliestTime();
//這個是從最後拿到的開始取
// long whichTime = kafka.api.OffsetRequest.LatestTime();
System.out.println("lastTime:"+whichTime);
String clientName = "Client_" + topic + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker.host(),
leadBroker.port(), 100000, 64 * 1024, clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
// 獲取指定時間前有效的offset列表
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return;
}
// 千萬不要認爲offset必定是從0開始的
long[] offsets = response.offsets(topic, partition);
System.out.println("offset list:" + Arrays.toString(offsets));
long offset = offsets[0];
// 讀數據
while (maxReads > 0) {
// 注意不要調用裏面的replicaId()方法,這是內部使用的。
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// 出錯處理。這裏只直接返回了。實際上能夠根據出錯的類型進行判斷,如code == ErrorMapping.OffsetOutOfRangeCode()表示拿到的offset錯誤
// 通常出錯處理能夠再次拿offset,或從新找leader,從新創建consumer。能夠將上面的操做都封裝成方法。再在該循環來進行消費
// 固然,在取全部leader的同時能夠用metadata.replicas()更新最新的節點信息。另外zookeeper可能不會當即檢測到有節點掛掉,故若是發現老的leader和新的leader同樣,多是leader根本沒掛,也多是zookeeper還沒檢測到,總之須要等等。
short code = fetchResponse.errorCode(topic, partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
return;
}
//取一批消息
boolean empty = true;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
topic, partition)) {
empty = false;
long curOffset = messageAndOffset.offset();
//下面這個檢測有必要,由於當消息是壓縮的時候,經過fetch獲取到的是一個整塊數據。塊中解壓後不必定第一個消息就是offset所指定的。就是說存在再次取到已讀過的消息。
if (curOffset < offset) {
System.out.println("Found an old offset: " + curOffset
+ " Expecting: " + offset);
continue;
}
// 能夠經過當前消息知道下一條消息的offset是多少
offset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())
+ ": " + new String(bytes, "UTF-8"));
maxReads++;
}
//進入循環中,等待一會後獲取下一批數據
if(empty){
Thread.sleep(1000);
}
}
// 退出(這裏象徵性的寫一下)
if (consumer != null)
consumer.close();
高級別的API隱藏了和brokers鏈接的細節,在沒必要關心服務端架構的狀況下和服務端通訊。還能夠本身維護消費狀態,並能夠經過一些條件指定訂閱特定的topic,好比白名單黑名單或者正則表達式。
/* 建立鏈接 */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* 這個方法能夠獲得一個流的列表,每一個流都是MessageAndMetadata的迭代,經過MessageAndMetadata能夠拿到消息和其餘的元數據(目前以後topic)
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* 你也能夠獲得一個流的列表,它包含了符合TopicFiler的消息的迭代,
* 一個TopicFilter是一個封裝了白名單或黑名單的正則表達式。
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* 提交目前消費到的offset */
public commitOffsets()
/* 關閉鏈接 */
public shutdown()
}
這個API圍繞着由KafkaStream實現的迭代器展開,每一個流表明一系列從一個或多個分區多和broker上匯聚來的消息,每一個流由一個線程處理,因此客戶端能夠在建立的時候經過參數指定想要幾個流。一個流是多個分區多個broker的合併,可是每一個分區的消息只會流向一個流。
注意:
1.上層api將會內部實現持久化每一個分區最後讀到的消息的offset,數據保存在zookeeper中的消費組名中(如/consumers/id1/offsets/test2/2。其中id1是消費組,test2是topic,最後一個2表示第3個分區),每間隔一個很短的時間更新一次offset,那麼可能在重啓消費者時拿到重複的消息。此外,當分區leader發生變動時也可能拿到重複的消息。所以在關閉消費者時最好等待必定時間(10s)而後再shutdown()
2.消費組名是一個全局的信息,要注意在新的消費者啓動以前舊的消費者要關閉。若是新的進程啓動而且消費組名相同,kafka會添加這個進程到可用消費線程組中用來消費topic和觸發從新分配負載均衡,那麼同一個分區的消息就有可能發送到不一樣的進程中。
3.若是消費的線程多於分區數,一些線程可能永遠沒法看到一些消息。
4.若是分區數多於線程數,一些線程會收到多個分區的消息
5.若是一個線程對應了多個分區,那麼接收到的消息是不能保證順序的。
備註:可用zk的命令查詢:get /consumers/id1/owners/test3/2其中id1爲消費組,test3爲topic,2爲分區3.查看裏面的內容如:id1_163-PC-1382409386474-1091aef2-1表示該分區被該標示的線程所執行。
例子:
Properties props = new Properties();
// 指定zookeeper服務器地址
props.put("zookeeper.connect", "172.17.1.163:2181");
// 指定消費組(沒有它會自動添加)
props.put("group.id", "id1");
// 指定kafka等待多久zookeeper回覆(ms)以便放棄並繼續消費。
props.put("zookeeper.session.timeout.ms", "4000");
// 指定zookeeper同步最長延遲多久再產生異常
props.put("zookeeper.sync.time.ms", "2000");
// 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息
props.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(props));
// 咱們要告訴kafka該進程會有多少個線程來處理對應的topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
int a_numThreads = 3;
// 用3個線程來處理topic:test2
topicCountMap.put("test2", a_numThreads);
// 拿到每一個stream對應的topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test2");
// 調用thread pool來處理topic
ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(Thread.currentThread() + ":"
+ new String(it.next().message()));
}
}
});
}
System.in.read();
// 關閉
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();