本文將介紹如何在 Kafka 中使用 Avro 來序列化消息,並提供完整的 Producter 代碼共你們使用。html
Avro 是一個數據序列化的系統,它能夠將數據結構或對象轉化成便於存儲或傳輸的格式。Avro設計之初就用來支持數據密集型應用,適合於遠程或本地大規模數據的存儲和交換。由於本文並非專門介紹 Avro 的文章,如須要更加詳細地瞭解,請參見《Apache Avro使用入門指南》java
在使用 Avro 以前,咱們須要先定義模式(schemas)。模式一般使用 JSON 來編寫,咱們不須要再定義相關的類,這篇文章中,咱們將使用以下的模式:apache
{
"fields"
: [
{
"name"
:
"str1"
,
"type"
:
"string"
},
{
"name"
:
"str2"
,
"type"
:
"string"
},
{
"name"
:
"int1"
,
"type"
:
"int"
}
],
"name"
:
"Iteblog"
,
"type"
:
"record"
}
|
上面的模式中,咱們定義了一種 record 類型的對象,名字爲 Iteblog
,這個對象包含了兩個字符串和一個 int 類型的fields。定義好模式以後,咱們可使用 avro 提供的相應方法來解析這個模式:bootstrap
Schema.Parser parser =
new
Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
|
這裏的 USER_SCHEMA
變量存儲的就是上面定義好的模式。api
解析好模式定義的對象以後,咱們須要將這個對象序列化成字節數組,或者將字節數組轉換成對象。Avro 提供的 API 不太易於使用,因此本文使用 twitter 開源的 Bijection 庫來方便地實現這些操做。咱們先建立 Injection
對象來說對象轉換成字節數組:數組
Injection<GenericRecord,
byte
[]> recordInjection = GenericAvroCodecs.toBinary(schema);
|
如今咱們能夠根據以前定義好的模式來建立相關的 Record,並使用 recordInjection
來序列化這個 Record :bash
GenericData.Record record =
new
GenericData.Record(schema);
avroRecord.put(
"str1"
,
"My first string"
);
avroRecord.put(
"str2"
,
"My second string"
);
avroRecord.put(
"int1"
,
42
);
byte
[] bytes = recordInjection.apply(record);
|
有了上面的介紹以後,咱們如今就能夠在 Kafka 中使用 Avro 來序列化咱們須要發送的消息了:數據結構
package
com.iteblog.avro;
import
com.twitter.bijection.Injection;
import
com.twitter.bijection.avro.GenericAvroCodecs;
import
org.apache.avro.Schema;
import
org.apache.avro.generic.GenericData;
import
org.apache.avro.generic.GenericRecord;
import
org.apache.kafka.clients.producer.<span
class
=
"wp_keywordlink_affiliate"
><a href=
"https://www.iteblog.com/archives/tag/kafka/"
title=
""
target=
"_blank"
data-original-title=
"View all posts in Kafka"
>Kafka</a></span>Producer;
import
org.apache.kafka.clients.producer.ProducerRecord;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
import
java.util.Properties;
/**
* Created by yangping.wu on 2017-07-20.
*/
public
class
AvroKafkaProducter {
Logger logger = LoggerFactory.getLogger(
"AvroKafkaProducter"
);
public
static
final
String USER_SCHEMA =
"{"
+
"\"type\":\"record\","
+
"\"name\":\"Iteblog\","
+
"\"fields\":["
+
" { \"name\":\"str1\", \"type\":\"string\" },"
+
" { \"name\":\"str2\", \"type\":\"string\" },"
+
" { \"name\":\"int1\", \"type\":\"int\" }"
+
"]}"
;
public
static
void
main(String[] args)
throws
InterruptedException {
Properties props =
new
Properties();
props.put(
"bootstrap.servers"
,
"www.iteblog.com:9092"
);
props.put(
"key.serializer"
,
"org.apache.kafka.common.serialization.StringSerializer"
);
props.put(
"value.serializer"
,
"org.apache.kafka.common.serialization.ByteArraySerializer"
);
Schema.Parser parser =
new
Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord,
byte
[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String,
byte
[]> producer =
new
KafkaProducer<>(props);
for
(
int
i =
0
; i <
1000
; i++) {
GenericData.Record avroRecord =
new
GenericData.Record(schema);
avroRecord.put(
"str1"
,
"Str 1-"
+ i);
avroRecord.put(
"str2"
,
"Str 2-"
+ i);
avroRecord.put(
"int1"
, i);
byte
[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String,
byte
[]> record =
new
ProducerRecord<>(
"iteblog"
,
""
+ i, bytes);
producer.send(record);
Thread.sleep(
250
);
}
producer.close();
}
}
|
由於咱們使用到 Avro 和 Bijection 類庫,全部咱們須要在 pom.xml
文件裏面引入如下依賴:app
<
dependency
>
<
groupId
>org.apache.avro</
groupId
>
<
artifactId
>avro</
artifactId
>
<
version
>1.8.0</
version
>
</
dependency
>
<
dependency
>
<
groupId
>com.twitter</
groupId
>
<
artifactId
>bijection-avro_2.10</
artifactId
>
<
version
>0.9.2</
version
>
</
dependency
>
|
如今一切準備就緒,咱們可使用下面的命令來運行這個消息發送者了。運行這個程序咱們須要準備好 avro-1.8.1.jar,slf4j-api-1.7.21.jar,log4j-1.2.17.jar,slf4j-log4j12-1.7.7.jar 以及 scala-library.jar等相關Jar包,爲了方便我將這些 jar 包放到 lib 目錄下,而後咱們以下編寫運行的腳本:post
CLASSPATH=$CLASSPATH:
for
i
in
/home/iteblog/lib/
*.jar ;
do
CLASSPATH=$CLASSPATH:$i
done
java -
cp
$CLASSPATH:flink-kafka-1.0-SNAPSHOT.jar com.iteblog.avro.AvroKafkaProducter
|
固然,咱們也能夠將全部這些依賴所有打包進 flink-kafka-1.0-SNAPSHOT.jar 裏面,成爲一個 fat 包,這時候咱們就不須要單獨添加其餘的依賴了。