動態路由:
方案1: 定製一個特殊的KafkaDynamicSink,內嵌多個原生的FlinkKafkaProducer,每一個對應一個下游的KAFKA隊列
在OPEN方法中讀取全部KAFKA渠道配置並構建FlinkKafkaProducer並構建一個Map: kafka channelId -> FlinkKafkaProducer
重載INVOKE方法
根據路由規則找到當前流數據對應全部的ChannelId (容許多個),再從MAP中獲取對 FlinkKafkaProducer 並調用其INVOKE方法
核心代碼:
public class DynamicKafkaSink<IN> extends RichSinkFunction<IN> {
@Override
public void open(Configuration parameters) throws Exception {
List<ChannelModel> allChannels = channelRepository.getAll();
for(ChannelModel nextChannel: allChannels) {
FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010<String>)channelFactory.createChannelProducer(nextChannel,
FlinkKafkaProducer010.class, Collections.emptyMap());
nextProducer.setRuntimeContext(this.getRuntimeContext());
nextProducer.open(parameters);
producers.put(nextChannel.getChannelId(), nextProducer);
}
}
@Override
public void invoke(IN value) throws Exception {
List<String> channelIds = channelRouteStrategy.route(value);
for (String nextChannelId: channelIds) {
FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
nextProducer.invoke(converted);
}
}
}
注意:
Map不能在構造函數中初始化,而要在OPEN方法中初始化,FLINK分佈式特性決定了構造函數和OPEN不在同一個JVM裏執行
類級別的變量須要可序列化,不然須要聲明爲TRANSIENT
每一個新構建的FlinkKafkaProducer須要先調用
setRuntimeContext(this.getRuntimeContext())
再調用open 方法才能被使用
優勢:
能夠路由到不一樣的BROKER上的TOPIC,在不一樣的BROKER上隔離性更好
缺陷:
全部的FlinkKafkaProducer只在OPEN的時候建立一次,後面若是添加了新的KAFKA隊列沒法被動態感知並路由
更改了FlinkKafkaProducer建立和初始化的過程,從MAIN函數中轉到了KafkaDynamicSink的OPEN方法裏,未通過全面測試,可能存在問題
方案2:方案1的升級版,利用FLINK SPLIT STREAM的特性,根據路由規則將原生數據流分紅多個,每一個子數據流對應一個下游KAFKA隊列
在FLINK Main 函數中讀取全部KAFKA渠道配置並構建FlinkKafkaProducer並構建一個Map: kafka channelId -> FlinkKafkaProducer
在輸入流上構建一個SplitStream, OutputSelector 中根據路由邏輯返回一組ChannelId
遍歷Map,對於Map中的每一個Key (ChannelID) 調用 SplitStream 的 select方法獲取對應的分支流數據,而後路由到對應的 FlinkKafkaProducer
核心代碼:
public static void main(String[] args) {
List<ChannelModel> allChannels = channelRepository.getAll();
for(ChannelModel nextChannel: allChannels) {
FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010<String>)channelFactory.createChannelProducer(nextChannel,
FlinkKafkaProducer010.class, Collections.emptyMap());
nextProducer.setRuntimeContext(this.getRuntimeContext());
nextProducer.open(parameters);
producers.put(nextChannel.getChannelId(), nextProducer);
}
DataStreamSource<T> source = ....
SplitStream<T> splitStream = source.split(new OutputSelector<T>() {
@Override
public Iterable<String> select(String value) {
List<String> channelIds = channelRouteStrategy.route(value);
return channeIds;
}
});
for(String nextChannel: producers.keySet()) {
FlinkKafkaProducer010 target = producers.get(nextChannel);
splitStream.select(nextChannel).addSink(target);
}
}
優勢:
能夠路由到不一樣的BROKER上的TOPIC,在不一樣的BROKER上隔離性更好
徹底利用FLINK原生的特性,更加簡潔優雅,解決了方案1的第二點不足
缺陷:
全部的FlinkKafkaProducer只在MAIN函數中建立一次,後面若是添加了新的KAFKA隊列沒法被動態感知並路由
方案3: 利用FLINK的 KeyedSerializationSchema中的getTargetTopic函數,KeyedSerializationSchema 除了將對象轉化Kafka ProducerRecord
的鍵值對以外還能夠動態指定Topic
在FLINK Main 函數中將輸入流經過flatMap 轉化爲 Tuple2, 其中key 是目標所屬的Topic, value 是原生數據
實現一個KeyedSerializationSchema做爲構造函數傳給FlinkKafkaProducer,重載getTargetTopic方法: 返回 tuple2.f0
核心代碼:
class DynaRouteSerializationSchema implements KeyedSerializationSchema {
String getTargetTopic(T element) {
Tuple2 tuple = (Tuple2)element;
return tuple.f0;
}
}
public static void main(String[] args) {
DataStreamSource<T> source = ....
DataStream<Tuple2<String, T>> converted = source
.flatMap(new RichFlatMapFunction<Object, Tuple2<String, T>>() {
@Override
public void flatMap(T value, Collector<Tuple2<String, Object>> out)
throws Exception {
List<String> channelIds = channelRouteStrategy.route(value);
for(String nextChannel: channelIds) {
out.collect(Tuple2.valueOf(nextChannel, value));
}
}
});
}
優勢:
徹底利用FLINK原生的特性,代碼量很是少
新增長的TOPIC也能夠被路由到,不須要啓停流處理
缺陷:
沒法像前兩個方案實現Broker級別的路由,只能作到Topic級別的路由
斷流功能:分佈式
有時系統升級或者其餘組件不可用,須要暫時中止KAFKA PRODUCER
FLINK 原生機制:
被動反壓:
Kafka09Fetcher 包含了一根獨立的 KafkaConsumerThread,從KAFKA中讀取數據,再交給HANDOVER
HANDOVER能夠理解爲一個大小爲1的隊列, Kafka09Fetcher 再從隊列中獲取並處理數據,一旦當處理速度變慢,KafkaConsumerThread
沒法將數據寫入HANDOVER, 線程就會被阻塞
另外KeyedDeserializationSchema定義了一個isEndOfStream方法,若是返回true, Kafka09Fetcher就會中止循環並退出,致使整個流處理結束ide
設計思路:
函數
SignalService: 註冊SignalListener, 利用Curator TreeCache 監聽一個Zookeeper 路徑獲取起動/中止流處理的信號量測試
SignalListener: 接收ZOOKEEPER變動信息的回調接口fetch
PausableKafkaFetcher: 繼承Flink原生的KafkaFetcher, 監聽到信號變化阻塞ConsumerThread的處理
this
PausableKafkaConsumer: 繼承Flink原生的KafkaConsumer, 建立PausableKafkaFetcher
spa
核心代碼:線程
public class PausableKafkaFetcher<T> extends Kafka010Fetcher<T> implements SignalListener {設計
private final ReentrantLock pauseLock = new ReentrantLock(true);對象
private final Condition pauseCond = pauseLock.newCondition();
private volatile boolean paused = false;
public void onSignal(String path, String value) {
try {
pauseLock.lockInterruptibly();
} catch(InterruptedException e) {
}
try {
if (SIGNAL_PAUSE.equals(value)) {
paused = true;
} else if (SIGNAL_START.equals(value)) {
paused = false;
}
pauseCond.signal();
}
finally {
pauseLock.unlock();
}
}
protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord<?,?> consumerRecord) throws Exception {
super.emitRecord(record, partition, offset, consumerRecord);
pauseLock.lockInterruptibly();
try {
while (paused) {
pauseCond.await();
}
} finally {
pauseLock.unlock();
}
}
}
public class PausableKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {
public void open(Configuration configuration) {
signalService = ZKSignalService.getInstance();
signalService.initialize(zkConfig);
}
public void cancel() {
super.cancel();
unregisterSignal();
}
public void close() {
super.close();
unregisterSignal();
}
private void unregisterSignal() {
if (signalService != null) {
String fullPath = WATCH_PREFIX + "/" + watchPath;
signalService.unregisterSignalListener(fullPath);
}
}
protected AbstractFetcher createFetcher(...) throws Exception {
PausableKafkaFetcher<T> fetcher = new PausableKafkaFetcher<> (...);
if (signalService != null) {
String fullPath = WATCH_PREFIX + "/" + watchPath;
signalService.registerSignalListener(fullPath, fetcher);
}
return fetcher
}
}