PyFlink 教程:PyFlink DataStream API - state & timer

簡介:介紹如何在 Python DataStream API 中使用 state & timer 功能。

1、背景

Flink 1.13 已於近期正式發佈,超過 200 名貢獻者參與了 Flink 1.13 的開發,提交了超過 1000 個 commits,完成了若干重要功能。其中,PyFlink 模塊在該版本中也新增了若干重要功能,好比支持了 state、自定義 window、row-based operation 等。隨着這些功能的引入,PyFlink 功能已經日趨完善,用戶可使用 Python 語言完成絕大多數類型Flink做業的開發。接下來,咱們詳細介紹如何在 Python DataStream API 中使用 state & timer 功能。python

2、state 功能介紹

做爲流計算引擎,state 是 Flink 中最核心的功能之一。sql

  • 在 1.12 中,Python DataStream API 尚不支持 state,用戶使用 Python DataStream API 只能實現一些簡單的、不須要使用 state 的應用;
  • 而在 1.13 中,Python DataStream API 支持了此項重要功能。

state 使用示例

以下是一個簡單的示例,說明如何在 Python DataStream API 做業中使用 state:編程

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        # 定義value state
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. 建立 StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. 建立數據源
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. 定義執行邏輯
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. 將打印結果數據
    ds.print()

    # 5. 執行做業
    env.execute()


if __name__ == '__main__':
    state_access_demo()

在上面的例子中,咱們定義了一個 MapFunction,該 MapFunction 中定義了一個名字爲 「cnt\_state」 的 ValueState,用於記錄每個 key 出現的次數。緩存

說明:架構

  • 除了 ValueState 以外,Python DataStream API 還支持 ListState、MapState、ReducingState,以及 AggregatingState;
  • 定義 state 的 StateDescriptor 時,須要聲明 state 中所存儲的數據的類型(TypeInformation)。另外須要注意的是,當前 TypeInformation 字段並未被使用,默認使用 pickle 進行序列化,所以建議將 TypeInformation 字段定義爲 Types.PICKLED\_BYTE\_ARRAY() 類型,與實際所使用的序列化器相匹配。這樣的話,當後續版本支持使用 TypeInformation 以後,能夠保持後向兼容性;
  • state 除了能夠在 KeyedStream 的 map 操做中使用,還能夠在其它操做中使用;除此以外,還能夠在鏈接流中使用 state,好比:
ds1 = ...  # type DataStream
ds2 = ...  # type DataStream
ds1.connect(ds2) \
    .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
    .map(MyCoMapFunction())  # 能夠在MyCoMapFunction中使用state

可使用 state 的 API 列表以下:app

操做 自定義函數
KeyedStream map MapFunction
flat\_map FlatMapFunction
reduce ReduceFunction
filter FilterFunction
process KeyedProcessFunction
ConnectedStreams map CoMapFunction
flat\_map CoFlatMapFunction
process KeyedCoProcessFunction
WindowedStream apply WindowFunction
process ProcessWindowFunction

state 工做原理

img

上圖是 PyFlink 中,state 工做原理的架構圖。從圖中咱們能夠看出,Python 自定義函數運行在 Python worker 進程中,而 state backend 運行在 JVM 進程中(由 Java 算子來管理)。當 Python 自定義函數須要訪問 state 時,會經過遠程調用的方式,訪問 state backend。機器學習

咱們知道,遠程調用的開銷是很是大的,爲了提高 state 讀寫的性能,PyFlink 針對 state 讀寫作了如下幾個方面的優化工做:異步

  • Lazy Read:編程語言

    對於包含多個 entry 的 state,好比 MapState,當遍歷 state 時,state 數據並不會一次性所有讀取到 Python worker 中,只有當真正須要訪問時,才從 state backend 讀取。函數

  • Async Write:

    當更新 state 時,更新後的 state,會先存儲在 LRU cache 中,並不會同步地更新到遠端的 state backend,這樣作能夠避免每次 state 更新操做都訪問遠端的 state backend;同時,針對同一個 key 的屢次更新操做,能夠合併執行,儘可能避免無效的 state 更新。

  • LRU cache:

    在 Python worker 進程中維護了 state 讀寫的 cache。當讀取某個 key 時,會先查看其是否已經被加載到讀 cache 中;當更新某個 key 時,會先將其存放到寫 cache 中。針對頻繁讀寫的 key,LRU cache 能夠避免每次讀寫操做,都訪問遠端的 state backend,對於有熱點 key 的場景,能夠極大提高 state 讀寫性能。

  • Flush on Checkpoint:

    爲了保證 checkpoint 語義的正確性,當 Java 算子須要執行 checkpoint時,會將 Python worker中的寫 cache 都 flush 回 state backend。

其中 LRU cache 能夠細分爲二級,以下圖所示:

img

說明:

  • 二級 cache 爲 global cache,二級 cache 中的讀 cache 中存儲着當前 Python worker 進程中全部緩存的原始 state 數據(未反序列化);二級 cache 中的寫 cache 中存儲着當前 Python worker 進程中全部建立的 state 對象。
  • 一級 cache 位於每個 state 對象內,在 state 對象中緩存着該 state 對象已經從遠端的 state backend 讀取的 state 數據以及待更新回遠端的 state backend 的 state 數據。

工做流程:

  • 當在 Python UDF 中,建立一個 state 對象時,首先會查看當前 key 所對應的 state 對象是否已經存在(在二級 cache 中的 「Global Write Cache」 中查找),若是存在,則返回對應的 state 對象;若是不存在,則建立新的 state 對象,並存入 「Global Write Cache」;
  • state 讀取:當在 Python UDF 中,讀取 state 對象時,若是待讀取的 state 數據已經存在(一級 cache),好比對於 MapState,待讀取的 map key/map value 已經存在,則直接返回對應的 map key/map value;不然,訪問二級 cache,若是二級 cache 中也不存在待讀取的 state 數據,則從遠端的 state backend 讀取;
  • state 寫入:當在 Python UDF 中,更新 state 對象時,先寫到 state 對象內部的寫 cache 中(一級 cache);當 state 對象中待寫回 state backend 的 state 數據的大小超過指定閾值或者當遇到 checkpoint 時,將待寫回的 state 數據寫回遠端的 state backend。

state 性能調優

經過前一節的介紹,咱們知道 PyFlink 使用了多種優化手段,用於提高 state 讀寫的性能,這些優化行爲能夠經過如下參數配置:

配置 說明
python.state.cache-size Python worker 中讀 cache 以及寫 cache 的大小。(二級 cache)須要注意的是:讀 cache、寫 cache是獨立的,當前不支持分別配置讀 cache 以及寫 cache 的大小。
python.map-state.iterate-response-batch-size 當遍歷 MapState 時,每次從 state backend 讀取並返回給 Python worker 的 entry 的最大個數。
python.map-state.read-cache-size 一個 MapState 的讀 cache 中最大容許的 entry 個數(一級 cache)。當一個 MapState 中,讀 cache 中的 entry 個數超過該閾值時,會經過 LRU 策略從讀 cache 中刪除最近最少訪問過的 entry。
python.map-state.write-cache-size 一個 MapState 的寫 cache 中最大容許的待更新 entry 的個數(一級 cache)。當一個 MapState 中,寫 cache 中待更新的 entry 的個數超過該閾值時,會將該 MapState 下全部待更新 state 數據寫回遠端的 state backend。

須要注意的是,state 讀寫的性能不只取決於以上參數,還受其它因素的影響,好比:

  • 輸入數據中 key 的分佈:

    輸入數據的 key 越分散,讀 cache 命中的機率越低,則性能越差。

  • Python UDF 中 state 讀寫次數:

    state 讀寫可能涉及到讀寫遠端的 state backend,應該儘可能優化 Python UDF 的實現,減小沒必要要的 state 讀寫。

  • checkpoint interval:

    爲了保證 checkpoint 語義的正確性,當遇到 checkpoint 時,Python worker 會將全部緩存的待更新 state 數據,寫回 state backend。若是配置的 checkpoint interval 太小,則可能並不能有效減小 Python worker 寫回 state backend 的數據量。

  • bundle size / bundle time:

    當前 Python 算子會將輸入數據劃分紅多個批次,發送給 Python worker 執行。當一個批次的數據處理完以後,會強制將 Python worker 進程中的待更新 state 寫回 state backend。與 checkpoint interval 相似,該行爲也可能會影響 state 寫性能。批次的大小能夠經過 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 參數控制。

3、timer 功能介紹

timer 使用示例

除了 state 以外,用戶還能夠在 Python DataStream API 中使用定時器 timer。

import datetime

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


class CountWithTimeoutFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor(
            "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = Row(value.f1, 0, 0)

        # update the state's count
        current[1] += 1

        # set the state's timestamp to the record's assigned event time timestamp
        current[2] = ctx.timestamp()

        # write the state back
        self.state.update(current)

        # schedule the next timer 60 seconds from the current event time
        ctx.timer_service().register_event_time_timer(current[2] + 60000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # get the state for the key that scheduled the timer
        result = self.state.value()

        # check if this is an outdated timer or the latest timer
        if timestamp == result[2] + 60000:
            # emit the state on timeout
            yield result[0], result[1]


class MyTimestampAssigner(TimestampAssigner):

    def __init__(self):
        self.epoch = datetime.datetime.utcfromtimestamp(0)

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int((value[0] - self.epoch).total_seconds() * 1000)


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

    stream = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
    watermarked_stream = stream.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps()
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    watermarked_stream.key_by(lambda value: value[1])\
        .process(CountWithTimeoutFunction()) \
        .print()

    env.execute()

在上述示例中,咱們定義了一個 KeyedProcessFunction,該 KeyedProcessFunction 記錄每個 key 出現的次數,當一個 key 超過 60 秒沒有更新時,會將該 key 以及其出現次數,發送到下游節點。

除了 event time timer 以外,用戶還可使用 processing time timer。

timer 工做原理

timer 的工做流程是這樣的:

  • 與 state 訪問使用單獨的通訊信道不一樣,當用戶註冊 timer 以後,註冊消息經過數據通道發送到 Java 算子;
  • Java 算子收到 timer 註冊消息以後,首先檢查待註冊 timer 的觸發時間,若是已經超過當前時間,則直接觸發;不然的話,將 timer 註冊到 Java 算子的 timer service 中;
  • 當 timer 觸發以後,觸發消息經過數據通道發送到 Python worker,Python worker 回調用戶 Python UDF 中的的 on\_timer 方法。

須要注意的是:因爲 timer 註冊消息以及觸發消息經過數據通道異步地在 Java 算子以及 Python worker 之間傳輸,這會形成在某些場景下,timer 的觸發可能沒有那麼及時。好比當用戶註冊了一個 processing time timer,當 timer 觸發以後,觸發消息經過數據通道傳輸到 Python UDF 時,可能已是幾秒中以後了。

4、總結

在這篇文章中,咱們主要介紹瞭如何在 Python DataStream API 做業中使用 state & timer,state & timer 的工做原理以及如何進行性能調優。接下來,咱們會繼續推出 PyFlink 系列文章,幫助 PyFlink 用戶深刻了解 PyFlink 中各類功能、應用場景以及最佳實踐等。

另外,阿里雲實時計算生態團隊長期招聘優秀大數據人才(包括實習 + 社招),咱們的工做包括:

  • 實時機器學習:支持機器學習場景下實時特徵工程和 AI 引擎配合,基於 Apache Flink 及其生態打造實時機器學習的標準,推進例如搜索、推薦、廣告、風控等場景的全面實時化;
  • 大數據 + AI 一體化:包括編程語言一體化 (PyFlink 相關工做),執行引擎集成化 (TF on Flink),工做流及管理一體化(Flink AI Flow)。

若是你對開源、大數據或者 AI 感興趣,請發簡歷到:fudian.fd@alibaba-inc.com

此外,也歡迎你們加入 「PyFlink交流羣」,交流 PyFlink 相關的問題。

本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。
相關文章
相關標籤/搜索