Spark Java使用DataFrame的foreach/foreachPartition

Spark已更新至2.x,DataFrame歸DataSet管了,所以API也相應統一。本文再也不適用2.0.0及以上版本。


DataFrame原生支持直接輸出到JDBC,但若是目標表有自增字段(好比id),那麼DataFrame就不能直接進行寫入了。由於DataFrame.write().jdbc()要求DataFrame的schema與目標表的表結構必須徹底一致(甚至字段順序都要一致),不然會拋異常,固然,若是你SaveMode選擇了Overwrite,那麼Spark刪除你原有的表,而後根據DataFrame的Schema生成一個。。。。字段類型會很是很是奇葩。。。。
因而咱們只能經過DataFrame.collect(),把整個DataFrame轉成List<Row>到Driver上,而後經過原生的JDBC方法進行寫入。可是若是DataFrame體積過於龐大,很容易致使Driver OOM(特別是咱們通常不會給Driver配置太高的內存)。這個問題真的很讓人糾結。
翻看Spark的JDBC源碼,發現其實是經過foreachPartition方法,在DataFrame每個分區中,對每一個Row的數據進行JDBC插入,那麼爲何咱們就不能直接用呢?java

Spark JdbcUtils.scala部分源碼:web

def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case t: DecimalType => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can't translate null value for field $field")
        })
    }

    val rddSchema = df.schema
    val driver: String = DriverRegistry.getDriverClassName(url)
    val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
    // ****************** here ****************** 
    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
    }
  }

嗯。。。既然Scala能實現,那麼做爲他的爸爸,Java也應該能玩!
咱們看看foreachPartition的方法原型:sql

def foreachPartition(f: Iterator[Row] => Unit)

又是函數式語言最愛的匿名函數。。。很是討厭寫lambda,因此咱們仍是實現個匿名類吧。要實現的抽象類爲:
scala.runtime.AbstractFunction1<Iterator<Row>,BoxedUnit> 兩個模板參數,第一個很直觀,就是Row的迭代器,做爲函數的參數。第二個BoxedUnit,是函數的返回值。不熟悉Scala的可能會很困惑,其實這就是Scala的void。因爲Scala函數式編程的特性,代碼塊的末尾必須返回點什麼,因而他們就搞出了個unit來代替本應什麼都沒有的void(解釋得可能不是很準確,我是這麼理解的)。對於Java而言,咱們能夠直接使用BoxedUnit.UNIT,來獲得這個「什麼都沒有」的東西。
來玩耍一下吧!apache

df.foreachPartition(new AbstractFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
        return BoxedUnit.UNIT;
    }
});

嗯,maven complete一下,spark-submit看看~
好勒~拋異常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想一想以前實現UDF的時候,UDF1/2/3/4...各接口,都extends Serializable,也就是說,在Spark運行期間,Driver會把UDF接口實現類序列化,並在Executor中反序列化,執行call方法。。。這就不難理解了,咱們foreachPartition丟進去的類,也應該implements Serializable。這樣,咱們就得本身搞一個繼承AbstractFunction1<Iterator<Row>, BoxedUnit>,又實現Serializable的抽象類,給咱們這些匿名類去實現!編程

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
}

但是每次都要return BoxedUnit.UNIT 搞得太彆扭了,沒一點Java的風格。app

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    @Override
    public BoxedUnit apply(Iterator<Row> it) {
        call(it);
        return BoxedUnit.UNIT;
    }
    
    public abstract void call(Iterator<Row> it);
}

因而咱們能夠直接Override call方法,就能夠用滿滿Java Style的代碼去玩耍了!maven

df.foreachPartition(new JavaForeachPartitionFunc() {
    @Override
    public void call(Iterator<Row> it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
    }
});

注意!咱們實現的匿名類的方法,其實是在executor上執行的,因此println是輸出到executor機器的stdout上。這個咱們能夠經過Spark的web ui,點擊具體Application的Executor頁面去查看(調試用的虛擬機集羣,手扶拖拉機同樣的配置,別吐槽了~)
hahaide

至於foreach方法同理。只不過把Iterator<Row> 換成 Row。具體怎麼搞,慢慢玩吧~~~
have fun~函數式編程

相關文章
相關標籤/搜索