簡介:介紹如何在 Python DataStream API 中使用 state & timer 功能。
Flink 1.13 已於近期正式發佈,超過 200 名貢獻者參與了 Flink 1.13 的開發,提交了超過 1000 個 commits,完成了若干重要功能。其中,PyFlink 模塊在該版本中也新增了若干重要功能,好比支持了 state、自定義 window、row-based operation 等。隨着這些功能的引入,PyFlink 功能已經日趨完善,用戶可使用 Python 語言完成絕大多數類型Flink做業的開發。接下來,咱們詳細介紹如何在 Python DataStream API 中使用 state & timer 功能。python
做爲流計算引擎,state 是 Flink 中最核心的功能之一。sql
以下是一個簡單的示例,說明如何在 Python DataStream API 做業中使用 state:編程
from pyflink.common import WatermarkStrategy, Row from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import NumberSequenceSource from pyflink.datastream.functions import RuntimeContext, MapFunction from pyflink.datastream.state import ValueStateDescriptor class MyMapFunction(MapFunction): def open(self, runtime_context: RuntimeContext): state_desc = ValueStateDescriptor('cnt', Types.LONG()) # 定義value state self.cnt_state = runtime_context.get_state(state_desc) def map(self, value): cnt = self.cnt_state.value() if cnt is None: cnt = 0 new_cnt = cnt + 1 self.cnt_state.update(new_cnt) return value[0], new_cnt def state_access_demo(): # 1. 建立 StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # 2. 建立數據源 seq_num_source = NumberSequenceSource(1, 100) ds = env.from_source( source=seq_num_source, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name='seq_num_source', type_info=Types.LONG()) # 3. 定義執行邏輯 ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \ .key_by(lambda a: a[0]) \ .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()])) # 4. 將打印結果數據 ds.print() # 5. 執行做業 env.execute() if __name__ == '__main__': state_access_demo()
在上面的例子中,咱們定義了一個 MapFunction,該 MapFunction 中定義了一個名字爲 「cnt\_state」 的 ValueState,用於記錄每個 key 出現的次數。緩存
說明:架構
ds1 = ... # type DataStream ds2 = ... # type DataStream ds1.connect(ds2) \ .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \ .map(MyCoMapFunction()) # 能夠在MyCoMapFunction中使用state
可使用 state 的 API 列表以下:app
操做 | 自定義函數 | |
---|---|---|
KeyedStream | map | MapFunction |
flat\_map | FlatMapFunction | |
reduce | ReduceFunction | |
filter | FilterFunction | |
process | KeyedProcessFunction | |
ConnectedStreams | map | CoMapFunction |
flat\_map | CoFlatMapFunction | |
process | KeyedCoProcessFunction | |
WindowedStream | apply | WindowFunction |
process | ProcessWindowFunction |
上圖是 PyFlink 中,state 工做原理的架構圖。從圖中咱們能夠看出,Python 自定義函數運行在 Python worker 進程中,而 state backend 運行在 JVM 進程中(由 Java 算子來管理)。當 Python 自定義函數須要訪問 state 時,會經過遠程調用的方式,訪問 state backend。機器學習
咱們知道,遠程調用的開銷是很是大的,爲了提高 state 讀寫的性能,PyFlink 針對 state 讀寫作了如下幾個方面的優化工做:異步
Lazy Read:編程語言
對於包含多個 entry 的 state,好比 MapState,當遍歷 state 時,state 數據並不會一次性所有讀取到 Python worker 中,只有當真正須要訪問時,才從 state backend 讀取。函數
Async Write:
當更新 state 時,更新後的 state,會先存儲在 LRU cache 中,並不會同步地更新到遠端的 state backend,這樣作能夠避免每次 state 更新操做都訪問遠端的 state backend;同時,針對同一個 key 的屢次更新操做,能夠合併執行,儘可能避免無效的 state 更新。
LRU cache:
在 Python worker 進程中維護了 state 讀寫的 cache。當讀取某個 key 時,會先查看其是否已經被加載到讀 cache 中;當更新某個 key 時,會先將其存放到寫 cache 中。針對頻繁讀寫的 key,LRU cache 能夠避免每次讀寫操做,都訪問遠端的 state backend,對於有熱點 key 的場景,能夠極大提高 state 讀寫性能。
Flush on Checkpoint:
爲了保證 checkpoint 語義的正確性,當 Java 算子須要執行 checkpoint時,會將 Python worker中的寫 cache 都 flush 回 state backend。
其中 LRU cache 能夠細分爲二級,以下圖所示:
說明:
工做流程:
經過前一節的介紹,咱們知道 PyFlink 使用了多種優化手段,用於提高 state 讀寫的性能,這些優化行爲能夠經過如下參數配置:
配置 | 說明 |
---|---|
python.state.cache-size | Python worker 中讀 cache 以及寫 cache 的大小。(二級 cache)須要注意的是:讀 cache、寫 cache是獨立的,當前不支持分別配置讀 cache 以及寫 cache 的大小。 |
python.map-state.iterate-response-batch-size | 當遍歷 MapState 時,每次從 state backend 讀取並返回給 Python worker 的 entry 的最大個數。 |
python.map-state.read-cache-size | 一個 MapState 的讀 cache 中最大容許的 entry 個數(一級 cache)。當一個 MapState 中,讀 cache 中的 entry 個數超過該閾值時,會經過 LRU 策略從讀 cache 中刪除最近最少訪問過的 entry。 |
python.map-state.write-cache-size | 一個 MapState 的寫 cache 中最大容許的待更新 entry 的個數(一級 cache)。當一個 MapState 中,寫 cache 中待更新的 entry 的個數超過該閾值時,會將該 MapState 下全部待更新 state 數據寫回遠端的 state backend。 |
須要注意的是,state 讀寫的性能不只取決於以上參數,還受其它因素的影響,好比:
輸入數據中 key 的分佈:
輸入數據的 key 越分散,讀 cache 命中的機率越低,則性能越差。
Python UDF 中 state 讀寫次數:
state 讀寫可能涉及到讀寫遠端的 state backend,應該儘可能優化 Python UDF 的實現,減小沒必要要的 state 讀寫。
checkpoint interval:
爲了保證 checkpoint 語義的正確性,當遇到 checkpoint 時,Python worker 會將全部緩存的待更新 state 數據,寫回 state backend。若是配置的 checkpoint interval 太小,則可能並不能有效減小 Python worker 寫回 state backend 的數據量。
bundle size / bundle time:
當前 Python 算子會將輸入數據劃分紅多個批次,發送給 Python worker 執行。當一個批次的數據處理完以後,會強制將 Python worker 進程中的待更新 state 寫回 state backend。與 checkpoint interval 相似,該行爲也可能會影響 state 寫性能。批次的大小能夠經過 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 參數控制。
除了 state 以外,用戶還能夠在 Python DataStream API 中使用定時器 timer。
import datetime from pyflink.common import Row, WatermarkStrategy from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptor from pyflink.table import StreamTableEnvironment class CountWithTimeoutFunction(KeyedProcessFunction): def __init__(self): self.state = None def open(self, runtime_context: RuntimeContext): self.state = runtime_context.get_state(ValueStateDescriptor( "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()]))) def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): # retrieve the current count current = self.state.value() if current is None: current = Row(value.f1, 0, 0) # update the state's count current[1] += 1 # set the state's timestamp to the record's assigned event time timestamp current[2] = ctx.timestamp() # write the state back self.state.update(current) # schedule the next timer 60 seconds from the current event time ctx.timer_service().register_event_time_timer(current[2] + 60000) def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): # get the state for the key that scheduled the timer result = self.state.value() # check if this is an outdated timer or the latest timer if timestamp == result[2] + 60000: # emit the state on timeout yield result[0], result[1] class MyTimestampAssigner(TimestampAssigner): def __init__(self): self.epoch = datetime.datetime.utcfromtimestamp(0) def extract_timestamp(self, value, record_timestamp) -> int: return int((value[0] - self.epoch).total_seconds() * 1000) if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE TABLE my_source ( a TIMESTAMP(3), b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ) """) stream = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) watermarked_stream = stream.assign_timestamps_and_watermarks( WatermarkStrategy.for_monotonous_timestamps() .with_timestamp_assigner(MyTimestampAssigner())) # apply the process function onto a keyed stream watermarked_stream.key_by(lambda value: value[1])\ .process(CountWithTimeoutFunction()) \ .print() env.execute()
在上述示例中,咱們定義了一個 KeyedProcessFunction,該 KeyedProcessFunction 記錄每個 key 出現的次數,當一個 key 超過 60 秒沒有更新時,會將該 key 以及其出現次數,發送到下游節點。
除了 event time timer 以外,用戶還可使用 processing time timer。
timer 的工做流程是這樣的:
須要注意的是:因爲 timer 註冊消息以及觸發消息經過數據通道異步地在 Java 算子以及 Python worker 之間傳輸,這會形成在某些場景下,timer 的觸發可能沒有那麼及時。好比當用戶註冊了一個 processing time timer,當 timer 觸發以後,觸發消息經過數據通道傳輸到 Python UDF 時,可能已是幾秒中以後了。
在這篇文章中,咱們主要介紹瞭如何在 Python DataStream API 做業中使用 state & timer,state & timer 的工做原理以及如何進行性能調優。接下來,咱們會繼續推出 PyFlink 系列文章,幫助 PyFlink 用戶深刻了解 PyFlink 中各類功能、應用場景以及最佳實踐等。
另外,阿里雲實時計算生態團隊長期招聘優秀大數據人才(包括實習 + 社招),咱們的工做包括:
若是你對開源、大數據或者 AI 感興趣,請發簡歷到:fudian.fd@alibaba-inc.com
此外,也歡迎你們加入 「PyFlink交流羣」,交流 PyFlink 相關的問題。
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。