如何從 0 到 1 開發 PyFlink API 做業

簡介: 以 Flink 1.12 爲例,介紹如何使用 Python 語言,經過 PyFlink API 來開發 Flink 做業。html

Apache Flink 做爲當前最流行的流批統一的計算引擎,在實時 ETL、事件處理、數據分析、CEP、實時機器學習等領域都有着普遍的應用。從 Flink 1.9 開始,Apache Flink 社區開始在原有的 Java、Scala、SQL 等編程語言的基礎之上,提供對於 Python 語言的支持。通過 Flink 1.9 ~ 1.12 以及即將發佈的 1.13 版本的多個版本的開發,目前 PyFlink API 的功能已經日趨完善,能夠知足絕大多數狀況下 Python 用戶的需求。接下來,咱們以 Flink 1.12 爲例,介紹如何使用 Python 語言,經過 PyFlink API 來開發 Flink 做業。內容包括:java

環境準備
做業開發
做業提交
問題排查
總結python

環境準備

第一步:安裝 Python

PyFlink 僅支持 Python 3.5+,您首先須要確認您的開發環境是否已安裝了 Python 3.5+,若是沒有的話,首先須要安裝 Python 3.5+。git

第二步:安裝 JDK

咱們知道 Flink 的運行時是使用 Java 語言開發的,因此爲了執行 Flink 做業,您還須要安裝 JDK。Flink 提供了對於 JDK 8 以及 JDK 11 的全面支持,您須要確認您的開發環境中是否已經安裝了上述版本的 JDK,若是沒有的話,首先須要安裝 JDK。github

第三步:安裝 PyFlink

接下來須要安裝 PyFlink,能夠經過如下命令進行安裝:sql

# 建立 Python 虛擬環境
python3 -m pip install virtualenv
virtualenv -p `which python3` venv

# 使用上述建立的 Python 虛擬環境
./venv/bin/activate

# 安裝 PyFlink 1.12
python3 -m pip install apache-flink==1.12.2

做業開發

PyFlink Table API 做業

咱們首先介紹一下如何開發 PyFlink Table API 做業。express

1)建立 TableEnvironment 對象
對於 Table API 做業來講,用戶首先須要建立一個 TableEnvironment 對象。如下示例定義了一個 TableEnvironment 對象,使用該對象的定義的做業,運行在流模式,且使用 blink planner 執行。apache

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

■ 2)配置做業的執行參數
能夠經過如下方式,配置做業的執行參數。如下示例將做業的默認併發度設置爲4。編程

t_env.get_config().get_configuration().set_string('parallelism.default', '4')

■ 3)建立數據源表
接下來,須要爲做業建立一個數據源表。PyFlink 中提供了多種方式來定義數據源表。bootstrap

方式一:from_elements

PyFlink 支持用戶從一個給定列表,建立源表。如下示例定義了包含了 3 行數據的表:[("hello", 1), ("world", 2), ("flink", 3)],該表有 2 列,列名分別爲 a 和 b,類型分別爲 VARCHAR 和 BIGINT。

tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])

說明:

這種方式一般用於測試階段,能夠快速地建立一個數據源表,驗證做業邏輯
from_elements 方法能夠接收多個參數,其中第一個參數用於指定數據列表,列表中的每個元素必須爲 tuple 類型;第二個參數用於指定表的 schema

方式二:DDL

除此以外,數據也能夠來自於一個外部的數據源。如下示例定義了一個名字爲my_source,類型爲 datagen 的表,表中有兩個類型爲 VARCHAR 的字段。

t_env.execute_sql("""
        CREATE TABLE my_source (
          a VARCHAR,
          b VARCHAR
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
        )
    """)

tab = t_env.from_path('my_source')

說明:

經過 DDL 的方式來定義數據源表是目前最推薦的方式,且全部 Java Table API & SQL 中支持的 connector,均可以經過 DDL 的方式,在 PyFlink Table API 做業中使用,詳細的 connector 列表請參見 Flink 官方文檔 [1]。
當前僅有部分 connector 的實現包含在 Flink 官方提供的發行包中,好比 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的實現當前沒有包含在 Flink 官方提供的發行包中,好比 Kafka、ES 等。針對沒有包含在 Flink 官方提供的發行包中的 connector,若是須要在 PyFlink 做業中使用,用戶須要顯式地指定相應 FAT JAR,好比針對 Kafka,須要使用 JAR 包 [2],JAR 包能夠經過以下方式指定:
注意:file:///前綴不能省略

t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")

方式三:catalog

hive_catalog = HiveCatalog("hive_catalog")
t_env.register_catalog("hive_catalog", hive_catalog)
t_env.use_catalog("hive_catalog")

# 假設hive catalog中已經定義了一個名字爲source_table的表
tab = t_env.from_path('source_table')

這種方式和 DDL 的方式相似,只不過表的定義事先已經註冊到了 catalog 中了,不須要在做業中從新再定義一遍了。

■ 4)定義做業的計算邏輯

方式一:經過 Table API

獲得 source 表以後,接下來就可使用 Table API 中提供的各類操做,定義做業的計算邏輯,對錶進行各類變換了,好比:

@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
   return s[begin:end]

transformed_tab = tab.select(sub_string(col('a'), 2, 4))

方式二:經過 SQL 語句

除了可使用 Table API 中提供的各類操做以外,也能夠直接經過 SQL 語句來對錶進行變換,好比上述邏輯,也能夠經過 SQL 語句來實現:

t_env.create_temporary_function("sub_string", sub_string)
transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
說明:

TableEnvironment 中提供了多種方式用於執行 SQL 語句,其用途略有不一樣:
image.png

■ 5)查看執行計劃
用戶在開發或者調試做業的過程當中,可能須要查看做業的執行計劃,能夠經過以下方式。

方式一:Table.explain

好比,當咱們須要知道 transformed_tab 當前的執行計劃時,能夠執行:print(transformed_tab.explain()),能夠獲得以下輸出:

== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])

== Optimized Logical Plan ==
PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: PythonInputFormatTableSource(a)

    Stage 2 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : StreamExecPythonCalc
            ship_strategy : FORWARD

方式二:TableEnvironment.explain_sql

方式一適用於查看某一個 table 的執行計劃,有時候並無一個現成的 table 對象可用,好比:

print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))

其執行計劃以下所示:

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
+- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
+- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: PythonInputFormatTableSource(a)

    Stage 2 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : StreamExecPythonCalc
            ship_strategy : FORWARD

            Stage 4 : Data Sink
                content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
                ship_strategy : FORWARD

■ 6)寫出結果數據

方式一:經過 DDL

和建立數據源表相似,也能夠經過 DDL 的方式來建立結果表。

t_env.execute_sql("""
        CREATE TABLE my_sink (
          `sum` VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table_result = transformed_tab.execute_insert('my_sink')

說明:

當使用 print 做爲 sink 時,做業結果會打印到標準輸出中。若是不須要查看輸出,也可使用 blackhole 做爲 sink。
方式二:collect

也能夠經過 collect 方法,將 table 的結果收集到客戶端,並逐條查看。

table_result = transformed_tab.execute()
with table_result.collect() as results:
    for result in results:
        print(result)

說明:

該方式能夠方便地將 table 的結果收集到客戶端並查看
因爲數據最終會收集到客戶端,因此最好限制一下數據條數,好比:
transformed_tab.limit(10).execute(),限制只收集 10 條數據到客戶端

方式三:to_pandas

也能夠經過 to_pandas 方法,將 table 的結果轉換成 pandas.DataFrame 並查看。

result = transformed_tab.to_pandas()
print(result)
能夠看到以下輸出:

_c0
0  32
1  e6
2  8b
3  be
4  4f
5  b4
6  a6
7  49
8  35
9  6b

說明:

該方式與 collect 相似,也會將 table 的結果收集到客戶端,因此最好限制一下結果數據的條數

■ 7)總結

完整的做業示例以下:

from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf


def table_api_demo():
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    t_env.get_config().get_configuration().set_string('parallelism.default', '4')

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a VARCHAR,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)

    tab = t_env.from_path('my_source')

    @udf(result_type=DataTypes.STRING())
    def sub_string(s: str, begin: int, end: int):
        return s[begin:end]

    transformed_tab = tab.select(sub_string(col('a'), 2, 4))

    t_env.execute_sql("""
            CREATE TABLE my_sink (
              `sum` VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)

    table_result = transformed_tab.execute_insert('my_sink')
# 1)等待做業執行結束,用於local執行,不然可能做業還沒有執行結束,該腳本已退出,會致使minicluster過早退出
# 2)看成業經過detach模式往remote集羣提交時,好比YARN/Standalone/K8s等,須要移除該方法
table_result.wait()


if __name__ == '__main__':
    table_api_demo()

執行結果以下:

4> +I(a1)
3> +I(b0)
2> +I(b1)
1> +I(37)
3> +I(74)
4> +I(3d)
1> +I(07)
2> +I(f4)
1> +I(7f)
2> +I(da)

PyFlink DataStream API 做業

■ 1)建立 StreamExecutionEnvironment 對象

對於 DataStream API 做業來講,用戶首先須要定義一個 StreamExecutionEnvironment 對象。

env = StreamExecutionEnvironment.get_execution_environment()
■ 2)配置做業的執行參數

能夠經過如下方式,配置做業的執行參數。如下示例將做業的默認併發度設置爲4。

env.set_parallelism(4)

■ 3)建立數據源

接下來,須要爲做業建立一個數據源。PyFlink 中提供了多種方式來定義數據源。

方式一:from_collection

PyFlink 支持用戶從一個列表建立源表。如下示例定義了包含了 3 行數據的表:[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],該表有 2 列,列名分別爲 a 和 b,類型分別爲 VARCHAR 和 BIGINT。

ds = env.from_collection(

collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

說明:

這種方式一般用於測試階段,能夠方便地建立一個數據源
from_collection 方法能夠接收兩個參數,其中第一個參數用於指定數據列表;第二個參數用於指定數據的類型
方式二:使用 PyFlink DataStream API 中定義的 connector

此外,也可使用 PyFlink DataStream API 中已經支持的 connector,須要注意的是,1.12 中僅提供了 Kafka connector 的支持。

deserialization_schema = JsonRowDeserializationSchema.builder() \

.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(

topics='test_source_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)
說明:

Kafka connector 當前沒有包含在 Flink 官方提供的發行包中,若是須要在PyFlink 做業中使用,用戶須要顯式地指定相應 FAT JAR [2],JAR 包能夠經過以下方式指定:

注意:file:///前綴不能省略

env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
即便是 PyFlink DataStream API 做業,也推薦使用 Table & SQL connector 中打包出來的 FAT JAR,能夠避免遞歸依賴的問題。

方式三:使用 PyFlink Table API 中定義的 connector

如下示例定義瞭如何將 Table & SQL 中支持的 connector 用於 PyFlink DataStream API 做業。

t_env = StreamTableEnvironment.create(stream_execution_environment=env)

t_env.execute_sql("""

CREATE TABLE my_source (
      a INT,
      b VARCHAR
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '10'
    )
""")

ds = t_env.to_append_stream(

t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))

說明:

因爲當前 PyFlink DataStream API 中 built-in 支持的 connector 種類還比較少,推薦經過這種方式來建立 PyFlink DataStream API 做業中使用的數據源表,這樣的話,全部 PyFlink Table API 中可使用的 connector,均可以在 PyFlink DataStream API 做業中使用。
須要注意的是,TableEnvironment 須要經過如下方式建立 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 與 PyFlink Table API 共享同一個 StreamExecutionEnvironment 對象。
■ 4)定義計算邏輯
生成數據源對應的 DataStream 對象以後,接下來就可使用 PyFlink DataStream API 中定義的各類操做,定義計算邏輯,對 DataStream 對象進行變換了,好比:

def split(s):

splits = s[1].split("|")
for sp in splits:
   yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \

.flat_map(split) \
   .key_by(lambda i: i[1]) \
   .reduce(lambda i, j: (i[0] + j[0], i[1]))

■ 5)寫出結果數據
方式一:print

能夠調用 DataStream 對象上的 print 方法,將 DataStream 的結果打印到標準輸出中,好比:

ds.print()
方式二:使用 PyFlink DataStream API 中定義的 connector

能夠直接使用 PyFlink DataStream API 中已經支持的 connector,須要注意的是,1.12 中提供了對於 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 爲例:

serialization_schema = JsonRowSerializationSchema.builder() \

.with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(

topic='test_sink_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds.add_sink(kafka_producer)
說明:

JDBC、Kafka connector 當前沒有包含在 Flink 官方提供的發行包中,若是須要在 PyFlink 做業中使用,用戶須要顯式地指定相應 FAT JAR,好比 Kafka connector 可使用 JAR 包 [2],JAR 包能夠經過以下方式指定:

注意:file:///前綴不能省略

env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
推薦使用 Table & SQL connector 中打包出來的 FAT JAR,能夠避免遞歸依賴的問題。
方式三:使用 PyFlink Table API 中定義的 connector

如下示例展現瞭如何將 Table & SQL 中支持的 connector,用做 PyFlink DataStream API 做業的 sink。

寫法一:ds類型爲Types.ROW
def split(s):

splits = s[1].split("|")
for sp in splits:
    yield Row(s[0], sp)

ds = ds.map(lambda i: (i[0] + 1, i[1])) \

.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
   .key_by(lambda i: i[1]) \
   .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

寫法二:ds類型爲Types.TUPLE
def split(s):

splits = s[1].split("|")
for sp in splits:
    yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \

.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
   .key_by(lambda i: i[1]) \
   .reduce(lambda i, j: (i[0] + j[0], i[1]))

將ds寫出到sink

t_env.execute_sql("""

CREATE TABLE my_sink (
      a INT,
      b VARCHAR
    ) WITH (
      'connector' = 'print'
    )
""")

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
說明:

須要注意的是,t_env.from_data_stream(ds) 中的 ds 對象的 result type 類型必須是複合類型 Types.ROW 或者 Types.TUPLE,這也就是爲何須要顯式聲明做業計算邏輯中 flat_map 操做的 result 類型
做業的提交,須要經過 PyFlink Table API 中提供的做業提交方式進行提交
因爲當前 PyFlink DataStream API 中支持的 connector 種類還比較少,推薦經過這種方式來定義 PyFlink DataStream API 做業中使用的數據源表,這樣的話,全部 PyFlink Table API 中可使用的 connector,均可以做爲 PyFlink DataStream API 做業的 sink。
■ 7)總結
完整的做業示例以下:

方式一(適合調試):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def data_stream_api_demo():

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

ds.print()

env.execute()

if name == '__main__':

data_stream_api_demo()

執行結果以下:

3> (2, 'aaa')
3> (2, 'bb')
3> (6, 'aaa')
3> (4, 'a')
3> (5, 'bb')
3> (7, 'a')

方式二(適合線上做業):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def data_stream_api_demo():

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
env.set_parallelism(4)

t_env.execute_sql("""
        CREATE TABLE my_source (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
        )
    """)

ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    Types.ROW([Types.INT(), Types.STRING()]))

def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

t_env.execute_sql("""
        CREATE TABLE my_sink (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

# 1)等待做業執行結束,用於local執行,不然可能做業還沒有執行結束,該腳本已退出,會致使minicluster過早退出
# 2)看成業經過detach模式往remote集羣提交時,好比YARN/Standalone/K8s等,須要移除該方法
table_result.wait()

if name == '__main__':

data_stream_api_demo()

做業提交
Flink 提供了多種做業部署方式,好比 local、standalone、YARN、K8s 等,PyFlink 也支持上述做業部署方式,請參考 Flink 官方文檔 [3],瞭解更多詳細信息。

local
說明:使用該方式執行做業時,會啓動一個 minicluster,做業會提交到minicluster 中執行,該方式適合做業開發階段。

示例:python3 table_api_demo.py

standalone
說明:使用該方式執行做業時,做業會提交到一個遠端的 standalone 集羣。

示例:

./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py

YARN Per-Job
說明:使用該方式執行做業時,做業會提交到一個遠端的 YARN 集羣。

示例:

./bin/flink run --target yarn-per-job --python table_api_demo.py

K8s application mode
說明:使用該方式執行做業時,做業會提交到 K8s 集羣,以 application mode 的方式執行。

示例:

./bin/flink run-application \

--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<PyFlinkImageName> \
--pyModule table_api_demo \

--pyFiles file:///path/to/table_api_demo.py

參數說明

除了上面提到的參數以外,經過 flink run 提交的時候,還有其它一些和 PyFlink 做業相關的參數。

image.png

問題排查

當咱們剛剛上手 PyFlink 做業開發的時候,不免會遇到各類各樣的問題,學會如何排查問題是很是重要的。接下來,咱們介紹一些常見的問題排查手段。

client 端異常輸出

PyFlink 做業也遵循 Flink 做業的提交方式,做業首先會在 client 端編譯成 JobGraph,而後提交到 Flink 集羣執行。若是做業編譯有問題,會致使在 client 端提交做業的時候就拋出異常,此時能夠在 client 端看到相似這樣的輸出:

Traceback (most recent call last):
File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>

data_stream_api_demo()

File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo

table_result = table.execute_insert("my_")

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert

return TableResult(self._j_table.executeInsert(table_path, overwrite))

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in call

return_value = get_return_value(

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco

raise java_exception

pyflink.util.exceptions.TableException: Sink default_catalog.default_database.my_ does not exists

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
 at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
 at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
 at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1
好比上述報錯說明做業中使用的名字爲"my_"的表不存在。

TaskManager 日誌文件
有些錯誤直到做業運行的過程當中纔會發生,好比髒數據或者 Python 自定義函數的實現問題等,針對這種錯誤,一般須要查看 TaskManager 的日誌文件,好比如下錯誤反映用戶在 Python 自定義函數中訪問的 opencv 庫不存在。

Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute

response = task()

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>

lambda: self.create_worker().do_instruction(request), request)

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction

return getattr(self, request_type)(

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle

bundle_processor.process_bundle(instruction_id))

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle

input_op_by_transform_id[element.transform_id].process_encoded(

File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded

self.output(decoded_value)

File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream
File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split

import cv2

ModuleNotFoundError: No module named 'cv2'

at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

說明:

local 模式下,TaskManager 的 log 位於 PyFlink 的安裝目錄下:site-packages/pyflink/log/,也能夠經過以下命令找到:
\>>> import pyflink

\>>> print(pyflink.__path__)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],則log文件位於/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目錄下

自定義日誌
有時候,異常日誌的內容並不足以幫助咱們定位問題,此時能夠考慮在 Python 自定義函數中打印一些日誌信息。PyFlink 支持用戶在 Python 自定義函數中經過 logging 的方式輸出 log,好比:

def split(s):

import logging
logging.info("s: " + str(s))
splits = s[1].split("|")
for sp in splits:
    yield s[0], sp

經過上述方式,split 函數的輸入參數,會打印到 TaskManager 的日誌文件中。

遠程調試
PyFlink 做業,在運行過程當中,會啓動一個獨立的 Python 進程執行 Python 自定義函數,因此若是須要調試 Python 自定義函數,須要經過遠程調試的方式進行,能夠參見[4],瞭解如何在 Pycharm 中進行 Python 遠程調試。

1)在 Python 環境中安裝 pydevd-pycharm:

pip install pydevd-pycharm~=203.7717.65

2)在 Python 自定義函數中設置遠程調試參數:

def split(s):

import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
splits = s[1].split("|")
for sp in splits:
    yield s[0], sp

3)按照 Pycharm 中遠程調試的步驟,進行操做便可,能夠參見[4],也能夠參考博客[5]中「代碼調試」部分的介紹。

說明:Python 遠程調試功能只在 Pycharm 的 professional 版才支持。

社區用戶郵件列表
若是經過以上步驟以後,問題還未解決,也能夠訂閱 Flink 用戶郵件列表 [6],將問題發送到 Flink 用戶郵件列表。須要注意的是,將問題發送到郵件列表時,儘可能將問題描述清楚,最好有可復現的代碼及數據,能夠參考一下這個郵件[7]。

總結

在這篇文章中,咱們主要介紹了 PyFlink API 做業的環境準備、做業開發、做業提交、問題排查等方面的信息,但願能夠幫助用戶使用 Python 語言快速構建一個 Flink 做業,但願對你們有所幫助。接下來,咱們會繼續推出 PyFlink 系列文章,幫助 PyFlink 用戶深刻了解 PyFlink 中各類功能、應用場景、最佳實踐等。
引用連接
[1] https://ci.apache.org/project...

[2] https://repo.maven.apache.org...

[3] https://ci.apache.org/project...

[4] https://www.jetbrains.com/hel...

[5] https://mp.weixin.qq.com/s?__...

[6] https://flink.apache.org/comm...

[7] http://apache-flink-user-mail...
原文連接本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索