"Flink SQL UDF不該有狀態" 這個技術細節可能有些朋友已經知道了。可是爲何不該該有狀態呢?這個恐怕你們就不甚清楚了。本文就帶你一塊兒從這個問題點入手,看看Flink SQL到底是怎麼處理UDF,怎麼生成對應的SQL代碼。html
先說結論,後續一步步給你們詳述問題過程。java
結論是:Flink內部對SQL生成了java代碼,可是這些java代碼針對SQL作了優化,致使在某種狀況下,可能 會對 "在SQL中本應只調用一次" 的UDF 重複調用。node
好比:python
1. The field `myFrequency` is produced in this step by the UDF `UDF_FRENQUENCY`:

   ```sql
   SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount
   ```

2. In principle, the next SQL statement should simply read `myFrequency` directly, since `myFrequency` already exists:

   ```sql
   SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0
   ```

   However, because Flink applies some optimizations, the computation of `UDF_FRENQUENCY` in the first statement is pushed down into the second one.

3. After optimization, the SQL effectively becomes something like:

   ```sql
   SELECT word, UDF_FRENQUENCY(frequency) FROM tableFrequency WHERE UDF_FRENQUENCY(frequency) <> 0
   ```

4. As a result, `UDF_FRENQUENCY` is executed twice: once in the WHERE clause and once more in the SELECT clause.
A simplified, escaped version of the Java code Flink generates for the UDF is shown below; you can see that the UDF is called twice:
// 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0" java.lang.Long result$12 = UDF_FRENQUENCY(frequency); // 此次 UDF 調用對應 WHERE myFrequency <> 0 if (result$12 != 0) { // 這裏說明 myFrequency <> 0,因而能夠進行 SELECT // 這裏對應的是 SELECT myFrequency,注意的是,按咱們通常的邏輯,應該直接複用result$12,可是這裏又調用了 UDF,從新計算了一遍。因此 UDF 纔不該該有狀態信息。 java.lang.Long result$9 = UDF_FRENQUENCY(frequency); long select; if (result$9 == null) { select = -1L; } else { select = result$9; // 這裏最終 SELECT 了 myFrequency } }
What follows is essentially the flow by which Flink generates the SQL code; a few of the important nodes involved are illustrated below.

For the detailed SQL flow, see my earlier article: [Source code analysis] Walking through the internal execution flow of Flink SQL / Table API.
```
// NOTE: execution order is top to bottom; " -----> " indicates the type of instance produced
*
* +-----> "SELECT xxxxx WHERE UDF_FRENQUENCY(frequency) <> 0"   // (SQL statement)
* |
* |
* +-----> LogicalFilter (RelNode)              // Abstract Syntax Tree, unoptimized RelNode
* |
* |
*     FilterToCalcRule (RelOptRule)            // Calcite optimization rule
* |
* |
* +-----> LogicalCalc (RelNode)                // Optimized Logical Plan, logical execution plan
* |
* |
*     DataSetCalcRule (RelOptRule)             // Flink-specific optimization rule; converts to the physical plan
* |
* |
* +-----> DataSetCalc (FlinkRelNode)           // Physical RelNode, physical execution plan
* |
* |
*     DataSetCalc.translateToPlanInternal      // generates the Flink operator
* |
* |
* +-----> FlatMapRunner (Operator)             // In Flink Task
* |
* |
```
The key points here are:

For FlatMap, see my earlier article: [Source code analysis] From FlatMap usage to Flink's internal implementation.

The rest of this article focuses on tracing where in this SQL code-generation flow the "UDF called multiple times" problem arises.
Flink real-time computation supports the following three kinds of user-defined functions:
| UDX category | Description |
| --- | --- |
| UDF (User Defined Function) | User-defined scalar function. Input and output have a one-to-one relationship: it reads one row and writes out one value. |
| UDAF (User Defined Aggregation Function) | User-defined aggregation function. Input and output have a many-to-one relationship: multiple input records are aggregated into a single output value. Can be used together with the SQL GROUP BY clause. |
| UDTF (User Defined Table-valued Function) | User-defined table-valued function. A single call can output multiple rows or multiple columns. |
A user-defined scalar function (UDF) maps zero, one, or more scalar values to a new scalar value.

To implement a scalar function you extend ScalarFunction and implement one or more evaluation methods. The behavior of the scalar function is defined entirely by these evaluation methods, which must be declared public and named eval. The parameter types and return type of the evaluation method determine the input and output types of the scalar function.
A UDF may also optionally implement `open` and `close` methods; we will come back to these later.
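As a quick illustration, here is a minimal sketch of a stateless scalar function with the optional lifecycle methods. The class name and the computation are hypothetical, not part of the running example:

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical stateless UDF: open/close are optional lifecycle hooks,
// eval carries the per-record logic and keeps no fields between calls.
public class ScaleUdf extends ScalarFunction {

    public void open(FunctionContext context) throws Exception {
        // one-time initialization, e.g. reading configuration
    }

    public Long eval(Long value) {
        return value == null ? null : value * 10; // pure function of its input
    }

    public void close() throws Exception {
        // release any resources acquired in open()
    }
}
```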
A user-defined aggregation function (UDAF) aggregates multiple records into a single record.

An aggregation function extends AggregateFunction and works as follows:

Using the three methods createAccumulator, getValue, and accumulate together is enough to build a basic UDAF. Some special real-time scenarios, however, also require you to provide retract and merge methods.
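Below is a minimal sketch of such a basic UDAF. The class names and the averaging logic are illustrative, not taken from the running example:

```java
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical accumulator holding a running sum and count.
class AvgAccumulator {
    public long sum = 0L;
    public long count = 0L;
}

// Minimal UDAF sketch computing an average.
public class AvgUdaf extends AggregateFunction<Long, AvgAccumulator> {

    @Override
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // accumulate is resolved by name at runtime, not declared in the base class
    public void accumulate(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value;
            acc.count += 1;
        }
    }

    @Override
    public Long getValue(AvgAccumulator acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }
}
```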
A user-defined table-valued function (UDTF) is similar to a scalar function: it takes zero, one, or more scalar values as input (possibly as variable-length arguments). Unlike a scalar function, a table-valued function can return any number of rows as output instead of a single value, and each returned row can consist of one or more columns.

To define a table function you extend TableFunction and implement one or more evaluation methods. The behavior of the table function is defined in these evaluation methods, which must be public and named eval.
A UDTF turns one input row into multiple output rows by calling `collect()` multiple times.

A UDTF can not only turn one row into many rows, it can also turn one column into many columns. If you need the UDTF to return multiple columns, simply declare the return type as a Tuple or Row.
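Here is a minimal sketch of such a UDTF. The class name, the splitting logic, and the two-column schema are assumptions for illustration only:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical UDTF: splits a string and emits one Row per token
// by calling collect() multiple times.
public class SplitUdtf extends TableFunction<Row> {

    public void eval(String line) {
        if (line == null) {
            return;
        }
        for (String token : line.split(" ")) {
            Row row = new Row(2);          // two columns: token and its length
            row.setField(0, token);
            row.setField(1, token.length());
            collect(row);                  // each collect() emits one output row
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        // declare the schema of the emitted rows
        return Types.ROW(Types.STRING, Types.INT);
    }
}
```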
RichFunction is a function interface provided by Flink; every Flink function class has a Rich version. Unlike the plain functions, a Rich function can access the runtime context and has lifecycle methods, so it can implement more complex functionality.

RichFunction is mentioned here specifically because Flink implements the UDF as part of a RichFunction: the UDF becomes the RichFunction's member variable `function`. Consequently, the UDF's open and close are invoked from the RichFunction methods of the same names, while eval is invoked from the RichFunction's business method; for example, `function.flatMap` below ends up calling UDF.eval:
override def flatMap(in: Row, out: Collector[Row]): Unit = function.flatMap(in, out)
Readers without prior experience may want to take a closer look at how RichFunction is used; a small sketch follows.
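The sketch below shows what a typical RichFunction looks like. It is illustrative only (not Flink's generated runner); the class name and tokenizing logic are assumptions:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// A RichFlatMapFunction has lifecycle methods and access to the runtime
// context in addition to the per-record flatMap() method.
public class TokenizeFlatMap extends RichFlatMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // called once per task before any record is processed;
        // getRuntimeContext() is available here for metrics, state, etc.
        getRuntimeContext().getMetricGroup();
    }

    @Override
    public void flatMap(String line, Collector<String> out) {
        for (String token : line.split(" ")) {
            out.collect(token);            // the per-record business logic
        }
    }

    @Override
    public void close() throws Exception {
        // called once when the task shuts down
    }
}
```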
Below is our example program; the rest of the article explains the code generated for it.

Only the eval method is implemented here; open and close are not.
```java
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class myUdf extends ScalarFunction {

    private Long current = 0L;

    private static final Logger LOGGER = LoggerFactory.getLogger(myUdf.class);

    public Long eval(Long a) throws Exception {
        if (current == 0L) {
            current = a;
        } else {
            current += 1;
        }
        LOGGER.error("The current is : " + current);
        return current;
    }
}
```
```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

object TestUdf {
  def main(args: Array[String]): Unit = {
    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

    tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())

    // register the DataSet as a view "WordCount"
    tEnv.createTemporaryView("TableWordCount", input, 'word, 'frequency)

    val tableFrequency = tEnv.sqlQuery("SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount")

    tEnv.registerTable("TableFrequency", tableFrequency)

    // run a SQL query on the Table and retrieve the result as a new Table
    val table = tEnv.sqlQuery("SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0")

    table.toDataSet[WC].print()
  }

  case class WC(word: String, frequency: Long)
}
```
```
// Output below. The UDF should have been called three times, but it was actually called six times.
11:15:05,409 ERROR mytestpackage.myUdf  - The current is : 1
11:15:05,409 ERROR mytestpackage.myUdf  - The current is : 2
11:15:05,425 ERROR mytestpackage.myUdf  - The current is : 3
11:15:05,425 ERROR mytestpackage.myUdf  - The current is : 4
11:15:05,426 ERROR mytestpackage.myUdf  - The current is : 5
11:15:05,426 ERROR mytestpackage.myUdf  - The current is : 6
```
In the example we use the registerFunction method to register the UDF with the TableEnvironment.
tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())
TableEnvironment is the core concept for Table API and SQL integration; it is mainly responsible for:

In Flink, a Catalog is a directory concept: all metadata about databases and tables is stored inside Flink's internal Catalog structure. It holds all of Flink's Table-related metadata, including table schemas, data source information, and so on.

All UDFs are registered into the member variable TableEnvImpl.functionCatalog. This is a simple function catalog dedicated to storing Table API/SQL function definitions.

The FunctionCatalog class has the following two member variables, both LinkedHashMaps.
```java
// FunctionCatalog: the Table API/SQL function catalog
public class FunctionCatalog implements FunctionLookup {

    private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
    private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
}
```
tempCatalogFunctions: corresponds to the CREATE FUNCTION statement in SQL, i.e. the function DDL syntax. Its main usage scenarios are as follows:
```sql
CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE 'JVM'

DROP FUNCTION catalog1.db1.geofence

CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JVM USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'

CREATE FUNCTION catalog1.db1.func3 AS 'com.xxx.udf.func3UDF' LANGUAGE 'PYTHON' USING 'http://external.resources/flink-udf.py'
```
tempSystemFunctions: stores UDX functions, which is what this article is concerned with.

After this stage, the UDX function myUdf is registered in the system under the name "UDF_FRENQUENCY" and can be called in subsequent SQL.

At this point, Flink has completed the following:

Flink chains RelNodes together; the chaining is done through each instance's input field, which points to that instance's upstream input.

The input of LogicalFilter is a LogicalProject, and the input of the LogicalProject is a FlinkLogicalDataSetScan; the FlinkLogicalDataSetScan's table tells us the details of the actual input table.

The RelNode chain looks like this:
```
== Abstract Syntax Tree ==
LogicalProject(word=[$0], myFrequency=[$1])
  LogicalFilter(condition=[<>($1, 0)])
    LogicalProject(word=[$0], myFrequency=[UDF_FRENQUENCY($1)])
      FlinkLogicalDataSetScan(ref=[1976870927], fields=[word, frequency])
```

Each level is linked to the one below it through its input reference.
這裏的重點是 " myFrequency <> 0" 被轉換爲 LogicalFilter。這卻是容易理解,由於 WHERE 子句實際就是用來過濾的,因此轉換爲 LogicalFilter合情合理。
另外須要注意的是:在構建RelNode鏈的時候 ,Flink已經從TableEnvImpl.functionCatalog 這個成員變量之中提取到了以前註冊的myUdf 這個UDF函數實例。當須要獲取UDF實例時候,calcite會在 SqlOperatorTable table
中尋找UDF,進而就調用到了FunctionCatalog.lookupFunction
這裏,從LinkedHashMap中取得實例。
具體是SqlToRelConverter函數中會將SQL語句轉換爲RelNode,在SqlToRelConverter (org.apache.calcite.sql2rel)完成,其打印內容摘要以下:
```
filter = {LogicalFilter@4814} "LogicalFilter#2"
  variablesSet = {RegularImmutableSet@4772} size = 0
  condition = {RexCall@4771} "<>($1, 0)"
  input = {LogicalProject@4770} "LogicalProject#1"
    exps = {RegularImmutableList@4821} size = 2
    input = {FlinkLogicalDataSetScan@4822} "FlinkLogicalDataSetScan#0"
      cluster = {RelOptCluster@4815}
      catalog = {CatalogReader@4826}
      dataSet = {DataSource@4827}
      fieldIdxs = {int[2]@4828}
      schema = {RelRecordType@4829} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
      table = {RelOptTableImpl@4830}
        schema = {CatalogReader@4826}
        rowType = {RelRecordType@4829} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
```
```
create:107, LogicalFilter (org.apache.calcite.rel.logical)
createFilter:333, RelFactories$FilterFactoryImpl (org.apache.calcite.rel.core)
convertWhere:993, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:649, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
rel:150, FlinkPlannerImpl (org.apache.flink.table.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.calcite)
toQueryOperation:490, SqlToOperationConverter (org.apache.flink.table.sqlexec)
convertSqlQuery:315, SqlToOperationConverter (org.apache.flink.table.sqlexec)
convert:155, SqlToOperationConverter (org.apache.flink.table.sqlexec)
parse:66, ParserImpl (org.apache.flink.table.planner)
sqlQuery:457, TableEnvImpl (org.apache.flink.table.api.internal)
main:55, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
```
Next comes the optimization phase. The optimization rules fall into two categories: the built-in Calcite rules (such as predicate pushdown and pruning), and the rules that convert Logical Nodes into Flink Nodes.

Here Flink finds that the FilterToCalcRule rule is applicable for converting the Filter.

A little thought shows why: the Filter's condition has to be computed, so it needs to be converted into a Calc.

The relevant source code is in VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano):
```
call = {VolcanoRuleMatch@5576} "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
  targetSet = {RelSet@5581}
  targetSubset = null
  digest = "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
  cachedImportance = 0.891
  volcanoPlanner = {VolcanoPlanner@5526}
  generatedRelList = null
  id = 45
  operand0 = {RelOptRuleOperand@5579}
  nodeInputs = {RegularImmutableBiMap@5530} size = 0
  rule = {FilterToCalcRule@5575} "FilterToCalcRule"
  rels = {RelNode[1]@5582}
  planner = {VolcanoPlanner@5526}
  parents = null
```
```
onMatch:65, FilterToCalcRule (org.apache.calcite.rel.rules)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
optimizeLogicalPlan:199, Optimizer (org.apache.flink.table.plan)
optimize:56, BatchOptimizer (org.apache.flink.table.plan)
translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
```
Because of the FilterToCalcRule above, a LogicalCalc is generated. We can also see that it already contains UDF_FRENQUENCY.
```
calc = {LogicalCalc@5632} "LogicalCalc#60"
  program = {RexProgram@5631} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])"
  input = {RelSubset@5605} "rel#32:Subset#0.LOGICAL"
  desc = "LogicalCalc#60"
  rowType = {RelRecordType@5629} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
  digest = "LogicalCalc#60"
  cluster = {RelOptCluster@5596}
  id = 60
  traitSet = {RelTraitSet@5597} size = 1
```
After conversion we finally obtain the physical RelNode, i.e. the physical execution plan DataSetCalc.
```
== Optimized Logical Plan ==
DataSetCalc(select=[word, UDF_FRENQUENCY(frequency) AS myFrequency], where=[<>(UDF_FRENQUENCY(frequency), 0:BIGINT)])
  DataSetScan(ref=[1976870927], fields=[word, frequency])
```
The relevant source code is again in VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano).
```java
// The executing function; its runtime contents and call stack are given below.
ConverterRule.onMatch(RelOptRuleCall call) {
    RelNode rel = call.rel(0);
    if (rel.getTraitSet().contains(this.inTrait)) {
        RelNode converted = this.convert(rel);
        if (converted != null) {
            call.transformTo(converted);
        }
    }
}
```

```
// The converted DataSetCalc looks like this:
converted = {DataSetCalc@5560} "Calc(where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency))"
  cluster = {RelOptCluster@5562}
  rowRelDataType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
  calcProgram = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
  ruleDescription = "DataSetCalcRule"
  program = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
  input = {RelSubset@5564} "rel#71:Subset#5.DATASET"
  desc = "DataSetCalc#72"
  rowType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
  digest = "DataSetCalc#72"
  AbstractRelNode.cluster = {RelOptCluster@5562}
  id = 72
  traitSet = {RelTraitSet@5563} size = 1
```
```
init:52, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
convert:40, DataSetCalcRule (org.apache.flink.table.plan.rules.dataSet)
onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
optimizePhysicalPlan:209, Optimizer (org.apache.flink.table.plan)
optimize:57, BatchOptimizer (org.apache.flink.table.plan)
translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
```
Finally, DataSetCalc generates the Java code corresponding to the UDF.
```scala
class DataSetCalc {

  override def translateToPlan(
      tableEnv: BatchTableEnvImpl,
      queryConfig: BatchQueryConfig): DataSet[Row] = {

    ......

    // the Java code for the UDF is generated here
    val genFunction = generateFunction(
      generator,
      ruleDescription,
      new RowSchema(getRowType),
      projection,
      condition,
      config,
      classOf[FlatMapFunction[Row, Row]])

    // the FlatMapRunner is generated here
    val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)

    inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
  }
}
```
```
translateToPlan:90, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
```
The actual code generation happens in the code below; you can see that the generated code is a FlatMapFunction. This is exactly where the problem discussed in this article arises.

The specific cause can be seen from the comments in the code below: for our example program, the generated body ends up as the concatenation of the filter code and the projection code.
```scala
// As can be seen below, Flink translates different SQL clauses differently.
trait CommonCalc {

  private[flink] def generateFunction[T <: Function](
      generator: FunctionCodeGenerator,
      ruleDescription: String,
      returnSchema: RowSchema,
      calcProjection: Seq[RexNode],
      calcCondition: Option[RexNode],
      config: TableConfig,
      functionClass: Class[T]): GeneratedFunction[T, Row] = {

    // Generate the projection, i.e. the SELECT part. The generated projection code already contains a call to the UDF; its contents are shown below.
    val projection = generator.generateResultExpression(
      returnSchema.typeInfo,
      returnSchema.fieldNames,
      calcProjection)

    // only projection
    val body = if (calcCondition.isEmpty) {
      s"""
        |${projection.code}
        |${generator.collectorTerm}.collect(${projection.resultTerm});
        |""".stripMargin
    }
    else {
      // Generate the filter condition, i.e. the WHERE part. filterCondition also already contains a call to the UDF; its contents are shown below.
      val filterCondition = generator.generateExpression(calcCondition.get)

      // only filter
      if (projection == null) {
        s"""
          |${filterCondition.code}
          |if (${filterCondition.resultTerm}) {
          |  ${generator.collectorTerm}.collect(${generator.input1Term});
          |}
          |""".stripMargin
      }
      // both filter and projection
      else {
        // Our example takes this branch: the filterCondition code and the projection code are concatenated,
        // so there are now two calls to the UDF.
        s"""
          |${filterCondition.code}
          |if (${filterCondition.resultTerm}) {
          |  ${projection.code}
          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
          |}
          |""".stripMargin
      }
    }

    // body is the concatenation of the filterCondition and projection code, each containing a UDF call,
    // so there are now two UDF calls -- which is exactly our problem.
    generator.generateFunction(
      ruleDescription,
      functionClass,
      body,
      returnSchema.typeInfo)
  }
}
```

```
// Among the inputs to this function, calcCondition is the filter condition of our SQL
calcCondition = {Some@5663} "Some(<>(UDF_FRENQUENCY($1), 0))"

// calcProjection is the projection of our SQL
calcProjection = {ArrayBuffer@5662} "ArrayBuffer" size = 2
  0 = {RexInputRef@7344} "$0"
  1 = {RexCall@7345} "UDF_FRENQUENCY($1)"

// The generated filter condition, i.e. the code for WHERE; it already contains a call to the UDF
filterCondition = {GeneratedExpression@5749} "GeneratedExpression(result$16,isNull$17,\n\n\n\njava.lang.Long result$12 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n result$13 = -1L;\n}\nelse {\n result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n result$16 = false;\n}\nelse {\n result$16 = result$13 != result$15;\n}\n,Boolean,false)"

// The generated projection, i.e. the code for SELECT; it also contains a call to the UDF
projection = {GeneratedExpression@5738} "GeneratedExpression(out,false,\n\nif (isNull$6) {\n out.setField(0, null);\n}\nelse {\n out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n result$10 = -1L;\n}\nelse {\n result$10 = result$9;\n}\n\n\nif (isNull$11) {\n out.setField(1, null);\n}\nelse {\n out.setField(1, result$10);\n}\n,Row(word: String, myFrequency: Long),false)"

// The generated class is in fact DataSetCalcRule extends RichFlatMapFunction
name = "DataSetCalcRule"

// The generated interface
clazz = {Class@5773} "interface org.apache.flink.api.common.functions.FlatMapFunction"

// Part of the generated class's code, corresponding to the UDF's business logic
bodyCode = "\n\n\n\n\njava.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n result$13 = -1L;\n}\nelse {\n result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n result$16 = false;\n}\nelse {\n result$16 = result$13 != result$15;\n}\n\nif (result$16) {\n \n\nif (isNull$6) {\n out.setField(0, null);\n}\nelse {\n out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n result$10 = -1L;\n}\nelse {\n result$10 = result$9;\n}\n\n\nif (isNull$11) {\n out.setField(1, null);\n}\nelse {\n out.setField(1, result$10);\n}\n\n c.collect(out);\n}\n"
```
```
generateFunction:94, FunctionCodeGenerator (org.apache.flink.table.codegen)
generateFunction:79, CommonCalc$class (org.apache.flink.table.plan.nodes)
generateFunction:45, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translateToPlan:105, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
```
Finally, it is worth emphasizing how Flink wraps the generated SQL code.

As mentioned earlier, Flink implements the UDF as part of a RichFunction. In fact, Flink turns each whole SQL statement into a RichFunction: the two SQL statements in the example become a RichMapFunction and a RichFlatMapFunction respectively, as can be seen in the physical execution plan below.
```
== Physical Execution Plan ==
Stage 3 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 2 : Map
        content : from: (word, frequency)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 1 : FlatMap
            content : where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency)
            ship_strategy : Forward
            exchange_mode : PIPELINED
            driver_strategy : FlatMap
            Partitioning : RANDOM_PARTITIONED

            Stage 0 : Data Sink
                content : org.apache.flink.api.java.io.DiscardingOutputFormat
                ship_strategy : Forward
                exchange_mode : PIPELINED
                Partitioning : RANDOM_PARTITIONED
```
Under the org.apache.flink.table.runtime package we can see that Flink defines a RichFunction for each kind of physical RelNode; an excerpt:
```
CRowCorrelateProcessRunner.scala
CRowMapRunner.scala
CRowOutputProcessRunner.scala
CRowProcessRunner.scala
CorrelateFlatMapRunner.scala
FlatJoinRunner.scala
FlatMapRunner.scala
MapJoinLeftRunner.scala
MapJoinRightRunner.scala
MapRunner.scala
MapSideJoinRunner.scala
```
For the second SQL statement in the example, the generated class is DataSetCalcRule extends RichFlatMapFunction. From the definition below we can see that FlatMapRunner extends RichFlatMapFunction, meaning Flink treats this SQL statement as a flatMap operation.
```scala
package org.apache.flink.table.runtime

class FlatMapRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[Row])
  extends RichFlatMapFunction[Row, Row]
  ... {

  private var function: FlatMapFunction[Row, Row] = _

  ...

  override def flatMap(in: Row, out: Collector[Row]): Unit =
    function.flatMap(in, out)

  ...
}
```
Here is a trimmed-down version of the generated code, which shows the exact problem: the myUdf function is executed twice.

`function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b` is the function that myUdf has been turned into.
// 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0" java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval( isNull$8 ? null : (java.lang.Long) result$7); // 此次 UDF 調用對應 WHERE myFrequency <> 0 boolean isNull$14 = result$12 == null; boolean isNull$17 = isNull$14 || false; boolean result$16; if (isNull$17) { result$16 = false; } else { result$16 = result$13 != result$15; } if (result$16) { // 這裏說明 myFrequency <> 0,因此能夠進入 java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval( isNull$8 ? null : (java.lang.Long) result$7); // 這裏對應的是 SELECT myFrequency,注意的是,這裏又調用了 UDF,從新計算了一遍,因此 UDF 纔不該該有狀態信息。 boolean isNull$11 = result$9 == null; long result$10; if (isNull$11) { result$10 = -1L; } else { result$10 = result$9; // 這裏才進行SELECT myFrequency,可是這時候 UDF 已經被計算兩次了 } }
Below is the full generated code. Since it is auto-generated it is a bit hard to read, but fortunately this is the last step.
```java
public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {

    final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;

    final org.apache.flink.types.Row out = new org.apache.flink.types.Row(2);

    private org.apache.flink.types.Row in1;

    public DataSetCalcRule$18() throws Exception {
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
            org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
                "rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
                org.apache.flink.table.functions.UserDefinedFunction.class);
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b
            .open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    }

    @Override
    public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        in1 = (org.apache.flink.types.Row) _in1;

        boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
        java.lang.String result$5;
        if (isNull$6) {
            result$5 = "";
        }
        else {
            result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
        }

        boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
        long result$7;
        if (isNull$8) {
            result$7 = -1L;
        }
        else {
            result$7 = (java.lang.Long) in1.getField(1);
        }

        java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
            isNull$8 ? null : (java.lang.Long) result$7);

        boolean isNull$14 = result$12 == null;
        long result$13;
        if (isNull$14) {
            result$13 = -1L;
        }
        else {
            result$13 = result$12;
        }

        long result$15 = 0L;

        boolean isNull$17 = isNull$14 || false;
        boolean result$16;
        if (isNull$17) {
            result$16 = false;
        }
        else {
            result$16 = result$13 != result$15;
        }

        if (result$16) {
            if (isNull$6) {
                out.setField(0, null);
            }
            else {
                out.setField(0, result$5);
            }

            java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
                isNull$8 ? null : (java.lang.Long) result$7);

            boolean isNull$11 = result$9 == null;
            long result$10;
            if (isNull$11) {
                result$10 = -1L;
            }
            else {
                result$10 = result$9;
            }

            if (isNull$11) {
                out.setField(1, null);
            }
            else {
                out.setField(1, result$10);
            }

            c.collect(out);
        }
    }

    @Override
    public void close() throws Exception {
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
    }
}
```
At this point we have walked through roughly the whole flow of how Flink SQL generates Java code.

The internal code Flink generates produces the "projection" and the "filter condition" separately and then concatenates them.

Even if the original SQL contains only a single UDF call, if both SELECT and WHERE (directly or indirectly) use the UDF, then the projection code and the filter code each end up with their own UDF call, so the concatenated result contains multiple UDF calls.

This is the ultimate reason why a UDF should not hold internal historical state. Keep this in mind in day-to-day development.