Flink基礎:實時處理管道與ETL

 

往期推薦:api

Flink基礎:入門介紹網絡

Flink基礎:DataStream API分佈式

Flink深刻淺出:資源管理ide

Flink深刻淺出:部署模式函數

Flink深刻淺出:內存模型源碼分析

Flink深刻淺出:JDBC Source從理論到實戰this

Flink深刻淺出:Sql Gateway源碼分析spa

Flink深刻淺出:JDBC Connector源碼分析設計

Flink的經典使用場景是ETL,即Extract抽取、Transform轉換、Load加載,能夠從一個或多個數據源讀取數據,通過處理轉換後,存儲到另外一個地方,本篇將會介紹如何使用DataStream API來實現這種應用。注意Flink Table和SQL 
api 會很適合來作ETL,可是不妨礙從底層的DataStream API來了解其中的細節。3d

1 無狀態的轉換

無狀態即不須要在操做中維護某個中間狀態,典型的例子如map和flatmap。

map()

下面是一個轉換操做的例子,須要根據輸入數據建立一個出租車起始位置和目標位置的對象。首先定義出租車的位置對象:

public static class EnrichedRide extends TaxiRide {
    public int startCell;
    public int endCell;

    public EnrichedRide() {}

    public EnrichedRide(TaxiRide ride) {
        this.rideId = ride.rideId;
        this.isStart = ride.isStart;
        ...
        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
    }

    public String toString() {
        return super.toString() + "," +
            Integer.toString(this.startCell) + "," +
            Integer.toString(this.endCell);
    }
}
 

使用的時候能夠註冊一個MapFunction,該函數接收TaxiRide對象,輸出EnrichRide對象。

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

使用時只須要建立map對象便可:

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .filter(new RideCleansingSolution.NYCFilter())
    .map(new Enrichment());

enrichedNYCRides.print();

 

 

flatmap()

MapFunction適合一對一的轉換,對於輸入流的每一個元素都有一個元素輸出。若是須要一對多的場景,可使用flatmap:

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .flatMap(new NYCEnrichment());

enrichedNYCRides.print();

FlatMapFunction的定義:

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

經過collector,能夠在flatmap中任意添加零個或多個元素。

2 Keyed Streams

keyBy()

有時須要對數據流按照某個字段進行分組,每一個事件會根據該字段相同的值彙總到一塊兒。好比,但願查找相同出發位置的路線。若是在SQL中可能會使用GROUP BY startCell,在Flink中能夠直接使用keyBy函數:

rides
    .flatMap(new NYCEnrichment())
    .keyBy(value -> value.startCell)

keyBy會引發重分區而致使網絡數據shuffle,一般這種代價都很昂貴,由於每次shuffle時須要進行數據的序列化和反序列化,既浪費CPU資源,又佔用網絡帶寬。

經過對startCell進行分組,這種方式的分組可能會因爲編譯器而丟失字段的類型信息,所以Flink也支持把字段包裝成Tuple,基於元素位置進行分組。固然也支持使用KeySelector函數,自定義分組規則。

rides
    .flatMap(new NYCEnrichment())
    .keyBy(
        new KeySelector<EnrichedRide, int>() {

            @Override
            public int getKey(EnrichedRide enrichedRide) throws Exception {
                return enrichedRide.startCell;
            }
        })

能夠直接使用lambda表達式:

rides
    .flatMap(new NYCEnrichment())
    .keyBy(enrichedRide -> enrichedRide.startCell)

key能夠自定義計算規則

keyselector不限制從必須從事件中抽取key,也能夠自定義任何計算key的方法。但須要保證輸出的key是一致的,而且實現了對應的hashCode和equals方法。生成key的規則必定要穩定,由於生成key可能在應用運行的任什麼時候間,所以必定要保證key生成規則的持續穩定。

key能夠經過某個字段選擇:

keyBy(enrichedRide -> enrichedRide.startCell)

也能夠直接替換成某個方法:

keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
 

Keyed Stream的聚合

下面的例子中,建立了一個包含startCell和花費時間的二元組:

import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });
 

如今須要輸出每一個起始位置最長距離的路線,有不少種方式能夠實現。以上面的數據爲例,能夠經過startcell進行聚合,而後選擇時間最大的元素輸出:

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();
 

能夠獲得輸出結果:

4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)
 

狀態

上面是一個有狀態的例子,Flink須要記錄每一個key的最大值。不管什麼時候在應用中涉及到狀態,都須要考慮這個狀態有多大。若是key的空間是無限大的,那麼flink可能須要維護大量的狀態信息。當使用流時,必定要對無限窗口的聚合十分敏感,由於它是對整個流進行操做,頗有可能由於維護的狀態信息不斷膨脹,而致使內存溢出。在上面使用的maxBy就是經典的的聚合操做,也可使用更通用的reduce來自定義聚合方法。

3 有狀態的操做

Flink針對狀態的管理有不少易用的特性,好比:

  • 支持本地保存:基於進程內存來保存狀態
  • 狀態的持久化:按期保存到檢查點,保證容錯
  • 垂直擴展:Flink狀態能夠把狀態保存到RocksDB中,也支持擴展到本地磁盤
  • 水平擴展:狀態支持在集羣中擴縮容,經過調整並行度,自動拆分狀態
  • 可查詢:Flink的狀態能夠在外部直接查詢

Rich函數

Flink有幾種函數接口,包括FilterFunction, MapFunction,FlatMapFunction等。對於每一個接口,Flink都提供了對應的Rich方法。好比RichFlatMapFunction,提供了額外的一些方法:

  • open(Configuration c) 在初始化的時候調用一次,用於加載靜態數據,開啓外部服務的鏈接等
  • close() 流關閉時調用
  • getRuntimeContext() 提供進入全局狀態的方法,須要瞭解如何建立和查詢狀態

使用Keyed State的例子

下面是一個針對事件的key進行去重的例子:

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();

    env.execute();
}
 

爲了實現這個功能,deduplicator須要記住一些信息,對於每一個key,都須要記錄是否已經存在。Flink支持幾種不一樣類型的狀態,最簡單的一種是valueState。對於每一個key,flink都爲它保存一個對象,在上面的例子中對象是Boolean。Deduplicator有兩個方法:open()和flatMap()。open方法經過descriptor爲狀態起了一個標識名稱,並聲明類型爲Boolean。

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}
 

flatMap中調用state.value()獲取狀態。flink在上下文中爲每一個key保存了一個狀態值,只有當值爲null時,說明這個key以前沒有出現過,而後將其更新爲true。當flink調用open時,狀態是空的。可是當調用flatMap時,key能夠經過context進行訪問。當在集羣模式中運行時,會有不少個Deduplicator實例,每一個負責維護一部分key的事件。所以,當使用單個事件的valuestate時,要理解它背後其實不是一個值,而是每一個key都對應一個狀態值,而且分佈式的存儲在集羣中的各個節點進程上。

清除狀態

有時候key的空間多是無限制的,flink會爲每一個key存儲一個boolean對象。若是key的數量是有限的還好,可是應用每每是持續不間斷的運行,那麼key可能會無限增加,所以須要清理再也不使用的key。能夠經過state.clear()進行清理。好比針對某個key按照某一時間頻率進行清理,在processFunction中能夠了解到如何在事件驅動的應用中執行定時器操做。也能夠在狀態描述符中爲狀態設置TTL生存時間,這樣狀態能夠自動進行清理。

非keyed狀態

狀態也支持在非key類型的上下文中使用,這種叫作操做符狀態,operator state。典型的場景是Flink讀取Kafka時記錄的offset信息。

4 鏈接流

大部分場景中Flink都是接收一個數據流輸出一個數據流,相似管道式的處理數據:

也有的場景須要動態的修改函數中的信息,好比閾值、規則或者其餘的參數,這種設計叫作connected streams,流會擁有兩個輸入,相似:

在下面的例子中,經過控制流用來指定必須過濾的單詞:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);

    control
        .connect(datastreamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}
 

兩個流能夠經過key的方式鏈接,keyby用來分組數據,這樣保證相同類型的數據能夠進入到相同的實例中。上面的例子兩個流都是字符串,

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;

    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }

    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }

    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}
 

blocked用於記錄key的控制邏輯,key的state會在兩個流間共享。flatMap1和flatMap2會被兩個流調用,分別用來更新和獲取狀態,從而實現經過一個流控制另外一個流的目的。

總結:本片從狀態上講述了有狀態的操做和無狀態的操做,還介紹了狀態的使用以及鏈接流的適用場景。後面會介紹DataStream的操做和狀態的管理。

相關文章
相關標籤/搜索