在Kafka中使用Avro編碼消息:Producter篇

本文將介紹如何在 Kafka 中使用 Avro 來序列化消息,並提供完整的 Producter 代碼共你們使用。html

Avro

Avro 是一個數據序列化的系統,它能夠將數據結構或對象轉化成便於存儲或傳輸的格式。Avro設計之初就用來支持數據密集型應用,適合於遠程或本地大規模數據的存儲和交換。由於本文並非專門介紹 Avro 的文章,如須要更加詳細地瞭解,請參見《Apache Avro使用入門指南》java

 

在使用 Avro 以前,咱們須要先定義模式(schemas)。模式一般使用 JSON 來編寫,咱們不須要再定義相關的類,這篇文章中,咱們將使用以下的模式:apache

{
     "fields" : [
         { "name" : "str1" , "type" : "string" },
         { "name" : "str2" , "type" : "string" },
         { "name" : "int1" , "type" : "int" }
     ],
     "name" : "Iteblog" ,
     "type" : "record"
}

上面的模式中,咱們定義了一種 record 類型的對象,名字爲 Iteblog,這個對象包含了兩個字符串和一個 int 類型的fields。定義好模式以後,咱們可使用 avro 提供的相應方法來解析這個模式:bootstrap

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

這裏的 USER_SCHEMA 變量存儲的就是上面定義好的模式。api

解析好模式定義的對象以後,咱們須要將這個對象序列化成字節數組,或者將字節數組轉換成對象。Avro 提供的 API 不太易於使用,因此本文使用 twitter 開源的 Bijection 庫來方便地實現這些操做。咱們先建立 Injection 對象來說對象轉換成字節數組:數組

Injection<GenericRecord, byte []> recordInjection = GenericAvroCodecs.toBinary(schema);

如今咱們能夠根據以前定義好的模式來建立相關的 Record,並使用 recordInjection 來序列化這個 Record :bash

GenericData.Record record = new GenericData.Record(schema);
avroRecord.put( "str1" , "My first string" );
avroRecord.put( "str2" , "My second string" );
avroRecord.put( "int1" , 42 );
 
byte [] bytes = recordInjection.apply(record);

Producter實現

有了上面的介紹以後,咱們如今就能夠在 Kafka 中使用 Avro 來序列化咱們須要發送的消息了:數據結構

package com.iteblog.avro;
 
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.<span class = "wp_keywordlink_affiliate" ><a href= "https://www.iteblog.com/archives/tag/kafka/" title= "" target= "_blank" data-original-title= "View all posts in Kafka" >Kafka</a></span>Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Properties;
 
/**
  * Created by yangping.wu on 2017-07-20.
  */
public class AvroKafkaProducter {
     Logger logger = LoggerFactory.getLogger( "AvroKafkaProducter" );
     public static final String USER_SCHEMA = "{"
             + "\"type\":\"record\","
             + "\"name\":\"Iteblog\","
             + "\"fields\":["
             + "  { \"name\":\"str1\", \"type\":\"string\" },"
             + "  { \"name\":\"str2\", \"type\":\"string\" },"
             + "  { \"name\":\"int1\", \"type\":\"int\" }"
             + "]}" ;
 
     public static void main(String[] args) throws InterruptedException {
         Properties props = new Properties();
         props.put( "bootstrap.servers" , "www.iteblog.com:9092" );
         props.put( "key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
         props.put( "value.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer" );
 
         Schema.Parser parser = new Schema.Parser();
         Schema schema = parser.parse(USER_SCHEMA);
         Injection<GenericRecord, byte []> recordInjection = GenericAvroCodecs.toBinary(schema);
 
         KafkaProducer<String, byte []> producer = new KafkaProducer<>(props);
 
         for ( int i = 0 ; i < 1000 ; i++) {
             GenericData.Record avroRecord = new GenericData.Record(schema);
             avroRecord.put( "str1" , "Str 1-" + i);
             avroRecord.put( "str2" , "Str 2-" + i);
             avroRecord.put( "int1" , i);
 
             byte [] bytes = recordInjection.apply(avroRecord);
 
             ProducerRecord<String, byte []> record = new ProducerRecord<>( "iteblog" , "" + i, bytes);
             producer.send(record);
             Thread.sleep( 250 );
 
         }
 
         producer.close();
     }
}

由於咱們使用到 Avro 和 Bijection 類庫,全部咱們須要在 pom.xml 文件裏面引入如下依賴:app

< dependency >
   < groupId >org.apache.avro</ groupId >
   < artifactId >avro</ artifactId >
   < version >1.8.0</ version >
</ dependency >
 
< dependency >
   < groupId >com.twitter</ groupId >
   < artifactId >bijection-avro_2.10</ artifactId >
   < version >0.9.2</ version >
</ dependency >

運行

如今一切準備就緒,咱們可使用下面的命令來運行這個消息發送者了。運行這個程序咱們須要準備好 avro-1.8.1.jar,slf4j-api-1.7.21.jar,log4j-1.2.17.jar,slf4j-log4j12-1.7.7.jar 以及 scala-library.jar等相關Jar包,爲了方便我將這些 jar 包放到 lib 目錄下,而後咱們以下編寫運行的腳本:post

CLASSPATH=$CLASSPATH:
  
for i in /home/iteblog/lib/ *.jar ; do
     CLASSPATH=$CLASSPATH:$i
done
 
java - cp $CLASSPATH:flink-kafka-1.0-SNAPSHOT.jar com.iteblog.avro.AvroKafkaProducter

固然,咱們也能夠將全部這些依賴所有打包進 flink-kafka-1.0-SNAPSHOT.jar 裏面,成爲一個 fat 包,這時候咱們就不須要單獨添加其餘的依賴了。

相關文章
相關標籤/搜索