Overview:
Flink-to-Kafka real-time streaming jobs usually rely on the default serializer and partitioner. This article shows how to send messages to Kafka with a custom key and value and a custom partitioner, using Flink 1.9.1, the latest release at the time of writing.
Custom serialization class KeyedSerializationSchema:
Normally we send messages with the default serialization class, but sometimes we need to specify the key and value of each outgoing message, or parse the message body and add a fixed prefix to the key or value. In that case we have to define our own serialization class. Flink provides the base interface KeyedSerializationSchema for this; let's look at its source first:
package org.apache.flink.streaming.util.serialization;

import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;

/** @deprecated */
@Deprecated
@PublicEvolving
public interface KeyedSerializationSchema<T> extends Serializable {

    byte[] serializeKey(T var1);

    byte[] serializeValue(T var1);

    String getTargetTopic(T var1);
}
As you can see it is quite simple: a subclass only needs to implement these three methods. Here I define CustomKeyedSerializationSchema. The implementation is very basic; it just splits the message body and adds a prefix to the key and to the value respectively. The code is as follows:
package com.hadoop.ljs.flink.utils;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-24 20:57
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.utils
 */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {

    public byte[] serializeKey(String s) {
        /* Build a custom key from the incoming message */
        String[] line = s.split(",");
        System.out.println("key::::" + line[0]);
        return ("key--" + line[0]).getBytes();
    }

    public byte[] serializeValue(String s) {
        /* Build a custom value from the incoming message */
        String[] line = s.split(",");
        System.out.println("value::::" + line[1]);
        return ("value--" + line[1]).getBytes();
    }

    public String getTargetTopic(String element) {
        /* The argument here is the whole record, not the topic. Usually nothing needs to be done:
           returning null makes the producer fall back to its default topic. */
        System.out.println("element::::" + element);
        return null;
    }
}
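Before wiring the schema into a job, it can help to check what it actually produces for a sample record. The following is a minimal standalone sketch, not part of the original article; the sample message "user1,hello" is made up:

package com.hadoop.ljs.flink.utils;

/* Minimal sketch (not part of the original article): feed one sample
 * "key,value" message through CustomKeyedSerializationSchema and print the
 * serialized key and value. The payload "user1,hello" is made up. */
public class CustomKeyedSerializationSchemaCheck {

    public static void main(String[] args) {
        CustomKeyedSerializationSchema schema = new CustomKeyedSerializationSchema();
        String message = "user1,hello";  // hypothetical "<key part>,<value part>" record

        System.out.println(new String(schema.serializeKey(message)));    // prints key--user1
        System.out.println(new String(schema.serializeValue(message)));  // prints value--hello
    }
}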
Custom partitioner class FlinkKafkaPartitioner:
A custom partitioner needs to extend the base class and only has to implement its abstract partition() method. The base class source is as follows:
package org.apache.flink.streaming.connectors.kafka.partitioner;

import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    private static final long serialVersionUID = -9086719227828020494L;

    public FlinkKafkaPartitioner() {
    }

    public void open(int parallelInstanceId, int parallelInstances) {
    }

    public abstract int partition(T var1, byte[] var2, byte[] var3, String var4, int[] var5);
}
Here is my CustomFlinkKafkaPartitioner. It is only a simple implementation; you can adapt it to your own business needs:
package com.hadoop.ljs.flink.utils;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-24 21:00
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.utils
 */
public class CustomFlinkKafkaPartitioner extends FlinkKafkaPartitioner<String> {

    /**
     * @param record      the original record
     * @param key         the key produced by CustomKeyedSerializationSchema
     * @param value       the value produced by CustomKeyedSerializationSchema
     * @param targetTopic the target topic
     * @param partitions  the partition list, e.g. [0, 1, 2, 3, 4]
     * @return the partition to write to
     */
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // The key received here is the serialized key from CustomKeyedSerializationSchema above:
        // convert it back to a String, then take its hash code modulo the number of partitions.
        System.out.println("Number of partitions: " + partitions.length);
        int partition = Math.abs(new String(key).hashCode() % partitions.length);
        /* System.out.println("Target partition: " + partition); */
        return partition;
    }
}
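To get a feel for how the serialized keys are spread across partitions, the partitioner can also be exercised on its own. The following is a minimal standalone sketch, not part of the original article; the five-partition list and the sample keys are assumptions:

package com.hadoop.ljs.flink.utils;

/* Minimal sketch (not part of the original article): run a few serialized keys
 * through CustomFlinkKafkaPartitioner against an assumed five-partition topic
 * and print the chosen partition. record and value are passed as null because
 * the implementation above only looks at the key. */
public class CustomFlinkKafkaPartitionerCheck {

    public static void main(String[] args) {
        CustomFlinkKafkaPartitioner partitioner = new CustomFlinkKafkaPartitioner();
        int[] partitions = {0, 1, 2, 3, 4};                          // assumed partition list
        String[] keys = {"key--user1", "key--user2", "key--user3"};  // hypothetical serialized keys

        for (String key : keys) {
            int chosen = partitioner.partition(null, key.getBytes(), null, "topic2402", partitions);
            System.out.println(key + " -> partition " + chosen);
        }
    }
}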
Main class:
My main class receives messages from a socket and writes them to the Kafka cluster. It is just an example and kept deliberately simple; the code is as follows:
package com.hadoop.ljs.flink.streaming;

import com.hadoop.ljs.flink.utils.CustomFlinkKafkaPartitioner;
import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-24 21:27
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.utils
 */
public class FlinkKafkaProducer {

    public static final String topic = "topic2402";
    public static final String bootstrap_server = "10.124.165.31:6667,10.124.165.32:6667";

    public static void main(String[] args) throws Exception {
        final String hostname = "localhost";
        final int port = 9000;
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* You can apply your own transformations here as needed, for example:
        SingleOutputStreamOperator<Map<String, String>> messageStream = dataSource.map(new MapFunction<String, Map<String, String>>() {
            @Override
            public Map<String, String> map(String value) throws Exception {
                System.out.println("Received: " + value);
                Map<String, String> message = new HashMap<>();
                String[] line = value.split(",");
                message.put(line[0], line[1]);
                return message;
            }
        });*/
        /* The received data can go through arbitrarily complex processing before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties(), new CustomFlinkKafkaPartitioner()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    /* Kafka producer configuration */
    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap_server); // Kafka broker IPs or hostnames, comma separated
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // the schema already emits byte[], so both serializers stay ByteArraySerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }
}
Test and verification:
From the Windows command line, send data to socket port 9000; the main class receives and processes the messages and then sends them to Kafka.
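If no netcat-style tool is at hand on the Windows side, a tiny Java helper can stand in for the socket data source. This is only a sketch under that assumption, the sample lines are made up, and it must be started before the Flink job because socketTextStream connects to it as a client:

package com.hadoop.ljs.flink.utils;

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/* Minimal sketch (not part of the original article): listen on port 9000 and
 * write a few sample "key,value" lines to whoever connects. Start this first,
 * then start the Flink job, which connects to localhost:9000. */
public class SocketDataFeeder {

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9000);
             Socket client = server.accept();  // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            String[] lines = {"user1,hello", "user2,world", "user3,flink"};  // hypothetical messages
            for (String line : lines) {
                out.println(line);
            }
        }
    }
}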
Kafka receives the messages and persists them to its log.
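Rather than inspecting the broker's log segments directly, a plain Kafka consumer can also confirm that the custom key/value prefixes and the partition assignment arrived as expected. The following is a minimal sketch, not part of the original article; it assumes the Kafka Java client (kafka-clients) is on the classpath and uses the old poll(long) overload so it also works with the 0.10-era client pulled in by the connector:

package com.hadoop.ljs.flink.utils;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/* Minimal sketch (not part of the original article): consume topic2402 and
 * print partition, key and value for each record, so the "key--"/"value--"
 * prefixes and the partitioner's choice can be checked. */
public class TopicVerifyConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.124.165.31:6667,10.124.165.32:6667");
        props.put("group.id", "topic2402-verify");  // hypothetical consumer group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic2402"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition=" + record.partition()
                            + " key=" + record.key() + " value=" + record.value());
                }
            }
        }
    }
}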
I will cover Flink's default serialization and partitioning in detail in a separate article. The Kafka cluster used here is not secured with SSL; how to integrate Flink with an SSL-encrypted Kafka cluster will also be covered later. Stay tuned!
This article is shared from the WeChat public account 大數據開發運維架構 (JasonLu1986).