Creating a new Row in Java
```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// RowFactory.create builds a Row from positional values.
int i = 1; // the original snippet referenced a loop variable i
Row row = RowFactory.create("odd", i);
```
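For completeness, a minimal sketch of reading the fields back out, using the standard index-based getters on org.apache.spark.sql.Row:

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

Row row = RowFactory.create("odd", 1);
String label = row.getString(0); // positional access: first field
int num = row.getInt(1);         // second field, typed accessor
Object raw = row.get(1);         // untyped accessor, returns Object
```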
Creating a Seq in Java
```java
import static java.util.Arrays.asList;

import scala.collection.JavaConversions;
import scala.collection.Seq;

// Convert a Java List into a Scala Seq, which some Spark APIs
// (e.g. Dataset.join with usingColumns) expect from Java callers.
Seq<String> seq = JavaConversions.asScalaBuffer(asList("col_1", "col_2"));
```
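Note that JavaConversions is deprecated since Scala 2.12 and removed in 2.13. A sketch of the same conversion through scala.collection.JavaConverters, the explicit replacement (assuming Scala 2.12):

```java
import static java.util.Arrays.asList;

import scala.collection.JavaConverters;
import scala.collection.Seq;

// asScalaBufferConverter wraps the Java list; asScala() yields a mutable
// Buffer, which is a Seq, so it can be passed wherever Spark expects one.
Seq<String> seq = JavaConverters.asScalaBufferConverter(asList("col_1", "col_2")).asScala();
```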
Putting the two together, a complete example that builds two DataFrames and joins them both ways:

```java
import static java.util.Arrays.asList;
import static org.apache.spark.sql.types.DataTypes.createStructField;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.collection.JavaConversions;

// Obtain a SparkSession (assumed here to be a local session for the example).
SparkSession spark = SparkSession.builder()
        .appName("join-example")
        .master("local[*]")
        .getOrCreate();

// First DataFrame: five rows with columns (types, nums).
List<Row> data = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    List<String> mlist = new ArrayList<>();
    mlist.add("odd");
    mlist.add(String.valueOf(i));
    data.add(RowFactory.create(mlist.toArray()));
}
StructType schema = DataTypes.createStructType(new StructField[]{
    createStructField("types", DataTypes.StringType, false),
    createStructField("nums", DataTypes.StringType, false)
});

// Second DataFrame: the same keys plus an extra column (ad).
List<Row> data2 = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    List<String> mlist = new ArrayList<>();
    mlist.add("odd");
    mlist.add(String.valueOf(i));
    mlist.add(String.valueOf(5 - i));
    data2.add(RowFactory.create(mlist.toArray()));
}
StructType schema2 = DataTypes.createStructType(new StructField[]{
    createStructField("types", DataTypes.StringType, false),
    createStructField("nums", DataTypes.StringType, false),
    createStructField("ad", DataTypes.StringType, false)
});

Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> df2 = spark.createDataFrame(data2, schema2);
df.show();
df2.show();

// Inner join on an explicit column-equality condition; both copies of
// types/nums survive in the result.
Dataset<Row> df_join = df.join(df2,
        df.col("types").equalTo(df2.col("types"))
          .and(df.col("nums").equalTo(df2.col("nums"))),
        "inner");
df_join.show();

// The same join expressed with a Scala Seq of column names; the join
// columns appear only once in the result.
df_join = df.join(df2, JavaConversions.asScalaBuffer(asList("nums", "types")));
df_join.show();
```
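One difference worth noting: the condition-based join keeps both copies of types and nums, so unqualified references to them become ambiguous, while the Seq-based join deduplicates the join columns. A sketch of cleaning up after the condition-based join, reusing the df/df2 names from above:

```java
// Recompute the condition-based join so the duplicated columns are present.
Dataset<Row> joined = df.join(df2,
        df.col("types").equalTo(df2.col("types"))
          .and(df.col("nums").equalTo(df2.col("nums"))),
        "inner");

// Keep one qualified copy of each join column plus df2's extra column.
joined.select(df.col("types"), df.col("nums"), df2.col("ad")).show();

// Or drop df2's duplicated join columns by column reference.
joined.drop(df2.col("types")).drop(df2.col("nums")).show();
```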