本文主要研究一下flink Table的Joinshtml
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.join(right).where("a = d").select("a, b, e");
Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e"); Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e"); Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime"); Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime"); Table result = left.join(right) .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes") .select("a, b, e, ltime");
可使用<, <=, >=, >
)// register User-Defined Table Function TableFunction<String> split = new MySplitUDTF(); tableEnv.registerFunction("split", split); // join Table orders = tableEnv.scan("Orders"); Table result = orders .join(new Table(tableEnv, "split(c)").as("s", "t", "v")) .select("a, b, s, t, v");
// register User-Defined Table Function TableFunction<String> split = new MySplitUDTF(); tableEnv.registerFunction("split", split); // join Table orders = tableEnv.scan("Orders"); Table result = orders .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v")) .select("a, b, s, t, v");
Table ratesHistory = tableEnv.scan("RatesHistory"); // register temporal table function with a time attribute and primary key TemporalTableFunction rates = ratesHistory.createTemporalTableFunction( "r_proctime", "r_currency"); tableEnv.registerFunction("rates", rates); // join with "Orders" based on the time attribute and key Table orders = tableEnv.scan("Orders"); Table result = orders .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalanode
class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { //...... def join(right: Table): Table = { join(right, None, JoinType.INNER) } def join(right: Table, joinPredicate: String): Table = { join(right, joinPredicate, JoinType.INNER) } def join(right: Table, joinPredicate: Expression): Table = { join(right, Some(joinPredicate), JoinType.INNER) } def leftOuterJoin(right: Table): Table = { join(right, None, JoinType.LEFT_OUTER) } def leftOuterJoin(right: Table, joinPredicate: String): Table = { join(right, joinPredicate, JoinType.LEFT_OUTER) } def leftOuterJoin(right: Table, joinPredicate: Expression): Table = { join(right, Some(joinPredicate), JoinType.LEFT_OUTER) } def rightOuterJoin(right: Table, joinPredicate: String): Table = { join(right, joinPredicate, JoinType.RIGHT_OUTER) } def rightOuterJoin(right: Table, joinPredicate: Expression): Table = { join(right, Some(joinPredicate), JoinType.RIGHT_OUTER) } def fullOuterJoin(right: Table, joinPredicate: String): Table = { join(right, joinPredicate, JoinType.FULL_OUTER) } def fullOuterJoin(right: Table, joinPredicate: Expression): Table = { join(right, Some(joinPredicate), JoinType.FULL_OUTER) } private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = { val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate) join(right, Some(joinPredicateExpr), joinType) } private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = { // check if we join with a table or a table function if (!containsUnboundedUDTFCall(right.logicalPlan)) { // regular table-table join // check that the TableEnvironment of right table is not null // and right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be joined.") } new Table( tableEnv, Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false) .validate(tableEnv)) } else { // join with a table function // check join type if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) { throw new ValidationException( "TableFunctions are currently supported for join and leftOuterJoin.") } val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall] val udtfCall = LogicalTableFunctionCall( udtf.functionName, udtf.tableFunction, udtf.parameters, udtf.resultType, udtf.fieldNames, this.logicalPlan ).validate(tableEnv) new Table( tableEnv, Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true) .validate(tableEnv)) } } //...... }
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scalaexpress
case class Join( left: LogicalNode, right: LogicalNode, joinType: JoinType, condition: Option[Expression], correlated: Boolean) extends BinaryNode { override def output: Seq[Attribute] = { left.output ++ right.output } private case class JoinFieldReference( name: String, resultType: TypeInformation[_], left: LogicalNode, right: LogicalNode) extends Attribute { val isFromLeftInput: Boolean = left.output.map(_.name).contains(name) val (indexInInput, indexInJoin) = if (isFromLeftInput) { val indexInLeft = left.output.map(_.name).indexOf(name) (indexInLeft, indexInLeft) } else { val indexInRight = right.output.map(_.name).indexOf(name) (indexInRight, indexInRight + left.output.length) } override def toString = s"'$name" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { // look up type of field val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType // create a new RexInputRef with index offset new RexInputRef(indexInJoin, fieldType) } override def withName(newName: String): Attribute = { if (newName == name) { this } else { JoinFieldReference(newName, resultType, left, right) } } } override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { val node = super.resolveExpressions(tableEnv).asInstanceOf[Join] val partialFunction: PartialFunction[Expression, Expression] = { case field: ResolvedFieldReference => JoinFieldReference( field.name, field.resultType, left, right) } val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction)) Join(node.left, node.right, node.joinType, resolvedCondition, correlated) } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { left.construct(relBuilder) right.construct(relBuilder) val corSet = mutable.Set[CorrelationId]() if (correlated) { corSet += relBuilder.peek().getCluster.createCorrel() } relBuilder.join( convertJoinType(joinType), condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)), corSet.asJava) } private def convertJoinType(joinType: JoinType) = joinType match { case JoinType.INNER => JoinRelType.INNER case JoinType.LEFT_OUTER => JoinRelType.LEFT case JoinType.RIGHT_OUTER => JoinRelType.RIGHT case JoinType.FULL_OUTER => JoinRelType.FULL } private def ambiguousName: Set[String] = left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet) override def validate(tableEnv: TableEnvironment): LogicalNode = { val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) { failValidation(s"Filter operator requires a boolean expression as input, " + s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}") } else if (ambiguousName.nonEmpty) { failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}") } resolvedJoin.condition.foreach(testJoinCondition) resolvedJoin } private def testJoinCondition(expression: Expression): Unit = { def checkIfJoinCondition(exp: BinaryComparison) = exp.children match { case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil if x.isFromLeftInput != y.isFromLeftInput => true case _ => false } def checkIfFilterCondition(exp: BinaryComparison) = exp.children match { case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false case (x: JoinFieldReference) :: (_) :: Nil => true case (_) :: (y: JoinFieldReference) :: Nil => true case _ => false } var equiJoinPredicateFound = false // Whether the predicate is literal true. val alwaysTrue = expression match { case x: Literal if x.value.equals(true) => true case _ => false } def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match { case x: And => x.children.foreach(validateConditions(_, isAndBranch)) case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false)) case x: EqualTo => if (isAndBranch && checkIfJoinCondition(x)) { equiJoinPredicateFound = true } case x: BinaryComparison => // The boolean literal should be a valid condition type. case x: Literal if x.resultType == Types.BOOLEAN => case x => failValidation( s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x") } validateConditions(expression, isAndBranch = true) // Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join // predicates except literal true for TableFunction left outer join. if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) { if (!alwaysTrue) failValidation("TableFunction left outer join predicate can only be " + "empty or literal true.") } else { if (!equiJoinPredicateFound) { failValidation( s"Invalid join condition: $expression. At least one equi-join predicate is " + s"required.") } } } }