Recently I have been looking into how to improve read throughput with Kafka. I had always written data to Kafka as plain strings, and once the data volume grew large the performance was not great. I then came across Avro serialization and found that Kafka also supports the Avro format, which is how this article came about.
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
{
  "namespace": "com.avro.bean",
  "type": "record",
  "name": "UserBehavior",
  "fields": [
    {"name": "userId", "type": "long"},
    {"name": "itemId", "type": "long"},
    {"name": "categoryId", "type": "int"},
    {"name": "behavior", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
Note: the schema file must be created with the .avsc extension. We then let IDEA generate the UserBehavior class; since the avro-maven-plugin configured above binds the schema goal to the generate-sources phase, running mvn generate-sources (or a normal build) produces the same class.
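As a quick sanity check of the generated class (this snippet is my own addition, not part of the original project, and the class name is made up), it can be used like a plain Java bean. The sketch assumes the code that avro-maven-plugin 1.8.x normally emits for a specific record: an all-args constructor, getters/setters, and getSchema().

import com.avro.bean.UserBehavior;

// Hypothetical smoke test for the generated Avro record class
public class UserBehaviorSmokeTest {
    public static void main(String[] args) {
        // Build a record via the all-args constructor generated from the .avsc file
        UserBehavior behavior = new UserBehavior(543462L, 1715L, 1464116, "pv", 1511658000L);
        // Every generated record carries its own schema; the SpecificDatumWriter/Reader
        // used later in this article rely on exactly this schema object
        System.out.println(behavior.getSchema().toString(true));
        System.out.println(behavior.getBehavior());
    }
}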
First, let's use plain Java Kafka clients to write the data to Kafka and read it back. The sample data (data/UserBehavior.csv) looks like this:
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
We need to implement Kafka's two interfaces, Serializer and Deserializer, which handle serialization and deserialization respectively; here a single class implements both.
package com.avro.AvroUtil;

import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * @author 大數據老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaJava.java
 * @date 2021/1/8 20:02
 */

/**
 * Custom Kafka serializer/deserializer for UserBehavior based on Avro.
 */
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    // Serialization
    @Override
    public byte[] serialize(String s, UserBehavior userBehavior) {
        // Create the Avro writer for the record's schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Output stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Binary encoder writing directly to the stream
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return out.toByteArray();
    }

    @Override
    public void close() {
    }

    // Deserialization
    @Override
    public UserBehavior deserialize(String s, byte[] bytes) {
        // Holds the decoded record
        UserBehavior userBehavior = new UserBehavior();
        // Input stream over the received bytes
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Avro reader for the record's schema
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
        // Binary decoder reading directly from the stream
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Decode the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }
}
package com.avro.kafka;

import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author 大數據老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorProducerKafka.java
 * @date 2021/1/8 20:14
 */
public class UserBehaviorProducerKafka {
    public static void main(String[] args) throws InterruptedException {
        // Load the data
        List<UserBehavior> data = getData();
        // Producer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
        // Create the Kafka producer
        KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<String, UserBehavior>(props);
        // Send the records one by one
        for (UserBehavior userBehavior : data) {
            ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior);
            userBehaviorProducer.send(producerRecord);
            System.out.println("record written: " + userBehavior);
            Thread.sleep(1000);
        }
        // Flush pending records and release the producer
        userBehaviorProducer.close();
    }

    public static List<UserBehavior> getData() {
        ArrayList<UserBehavior> userBehaviors = new ArrayList<UserBehavior>();
        try {
            BufferedReader br = new BufferedReader(new FileReader(new File("data/UserBehavior.csv")));
            String line = "";
            while ((line = br.readLine()) != null) {
                String[] split = line.split(",");
                userBehaviors.add(new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4])));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return userBehaviors;
    }
}
Note: value.serializer must be set to the custom serializer class we just wrote, otherwise it will not take effect.
package com.avro.kafka;

import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author 大數據老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorConsumer.java
 * @date 2021/1/8 20:58
 */
public class UserBehaviorConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        prop.put("group.id", "UserBehavior");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use our custom Avro deserializer for the record value
        prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
        KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(prop);
        consumer.subscribe(Arrays.asList("UserBehaviorKafka"));
        while (true) {
            ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
            for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
                System.out.println(stringStockConsumerRecord.value());
            }
        }
    }
}
Create the Kafka topic and start a console consumer:
# create the topic
./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka
# start a console consumer as a quick check
./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2181,node03:2181
That wraps up the Java implementation.
At this point many readers will say: the Java version works, so for Flink isn't it just a matter of swapping in Flink's Consumer and Producer?
When we build the Flink Kafka connector, it turns out the Kafka serializer class from the Java example cannot be plugged in, so we first fell back to the built-in schema class for testing. Looking at its source, the built-in String schema actually implements DeserializationSchema and SerializationSchema, so we can write one of our own in the same style.
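For orientation, here is a simplified paraphrase (mine, not copied verbatim from the Flink 1.10 source) of the two interfaces we need to implement:

// Simplified paraphrase of org.apache.flink.api.common.serialization (Flink 1.10);
// see the actual Flink source for the full definitions.
interface SerializationSchema<T> extends java.io.Serializable {
    // turn an element into the bytes stored as the Kafka record value
    byte[] serialize(T element);
}

interface DeserializationSchema<T> extends java.io.Serializable,
        org.apache.flink.api.java.typeutils.ResultTypeQueryable<T> {
    // turn the Kafka record value bytes back into an element
    T deserialize(byte[] message) throws java.io.IOException;
    // whether this element marks the end of the stream (false for an unbounded Kafka topic)
    boolean isEndOfStream(T nextElement);
    // TypeInformation<T> getProducedType() is inherited from ResultTypeQueryable
}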
package com.avro.AvroUtil;

import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * @author 大數據老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaFlink.java
 * @date 2021/1/8 20:02
 */

/**
 * Custom Flink serialization/deserialization schema for UserBehavior based on Avro.
 */
public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {

    @Override
    public byte[] serialize(UserBehavior userBehavior) {
        // Create the Avro writer for the record's schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Output stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Binary encoder writing directly to the stream
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return out.toByteArray();
    }

    @Override
    public TypeInformation<UserBehavior> getProducedType() {
        return TypeInformation.of(UserBehavior.class);
    }

    @Override
    public UserBehavior deserialize(byte[] bytes) throws IOException {
        // Holds the decoded record
        UserBehavior userBehavior = new UserBehavior();
        // Input stream over the received bytes
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Avro reader for the record's schema
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
        // Binary decoder reading directly from the stream
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Decode the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }

    @Override
    public boolean isEndOfStream(UserBehavior userBehavior) {
        return false;
    }
}
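A side note of my own (not from the original article): SimpleAvroSchemaFlink builds a new SpecificDatumWriter/SpecificDatumReader, plus a throwaway UserBehavior just to obtain the schema, for every single record. Below is a minimal sketch of a variant that caches them in transient fields; it assumes the same generated UserBehavior class, and the class name CachedAvroSchemaFlink is made up for illustration.

package com.avro.AvroUtil;

import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch only: same behavior as SimpleAvroSchemaFlink, but the Avro writer/reader are reused.
public class CachedAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {

    // Avro writers/readers are not Serializable, so keep them transient
    // and create them lazily on the task side.
    private transient SpecificDatumWriter<UserBehavior> writer;
    private transient SpecificDatumReader<UserBehavior> reader;

    @Override
    public byte[] serialize(UserBehavior userBehavior) {
        if (writer == null) {
            writer = new SpecificDatumWriter<>(UserBehavior.class);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
        return out.toByteArray();
    }

    @Override
    public UserBehavior deserialize(byte[] bytes) throws IOException {
        if (reader == null) {
            reader = new SpecificDatumReader<>(UserBehavior.class);
        }
        // binaryDecoder can read straight from the byte array, no extra stream needed
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(UserBehavior userBehavior) {
        return false;
    }

    @Override
    public TypeInformation<UserBehavior> getProducedType() {
        return TypeInformation.of(UserBehavior.class);
    }
}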
package com.avro.FlinkKafka

import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorConsumerFlink.java
 * @author 大數據老哥
 * @date 2021/1/8 21:18
 * @version V1.0
 */
object UserBehaviorConsumerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to keep the test output simple
    // 2. Kafka configuration
    val prop = new Properties
    prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
    prop.put("group.id", "UserBehavior")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Value deserialization is done by the custom Avro schema passed to the connector
    prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")
    // val kafka: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("UserBehaviorKafka", new SimpleStringSchema(), prop)
    // 3. Build the Kafka connector
    val kafka: FlinkKafkaConsumer011[UserBehavior] = new FlinkKafkaConsumer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    // 4. Start consuming from the latest offsets
    kafka.setStartFromLatest()
    // 5. Build the data source from Kafka
    val data: DataStream[UserBehavior] = env.addSource(kafka)
    // 6. Print the results
    data.print()
    env.execute("UserBehaviorConsumerFlink")
  }
}
package com.avro.FlinkKafka

import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

import java.util.Properties

/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorProducerFlink.java
 * @author 大數據老哥
 * @date 2021/1/8 21:38
 * @version V1.0
 */
object UserBehaviorProducerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the CSV file
    val value = env.readTextFile("./data/UserBehavior.csv")
    // 3. Map each line onto a UserBehavior record
    val users: DataStream[UserBehavior] = value.map(row => {
      val arr = row.split(",")
      val behavior = new UserBehavior()
      behavior.setUserId(arr(0).toLong)
      behavior.setItemId(arr(1).toLong)
      behavior.setCategoryId(arr(2).toInt)
      behavior.setBehavior(arr(3))
      behavior.setTimestamp(arr(4).toLong)
      behavior
    })
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    // 4. Connect to Kafka
    val producer: FlinkKafkaProducer011[UserBehavior] = new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    // 5. Sink the data into Kafka
    users.addSink(producer)
    // 6. Execute the job
    env.execute("UserBehaviorProducerFlink")
  }
}
If you want the full source code, it is on GitHub: https://github.com/lhh2002/Flink_Avro
To be honest, I was also lost when I first tried to implement this, but not knowing something is no reason to give up on learning it. The problem raised in 5.2 is one I actually ran into myself. When you hit a problem, don't think about quitting; think about how to solve it. My approach at the time was to read the source code and read what others had written, and with persistence it eventually worked. I have also put together a set of Flink interview questions; if you need them, download them from the GitHub link below. Trust yourself, hard work and sweat always pay off. I'm 大數據老哥, see you next time~~~
Resources: for Flink interview questions, Spark interview questions, essential developer software, Hive interview questions, Hadoop interview questions, Docker interview questions, résumé templates, and more, please download from
GitHub: https://github.com/lhh2002/Framework-Of-BigData
Gitee: https://gitee.com/li_hey_hey/Framework-Of-BigData