Multi-threaded Concurrent Processing in Spark

Multi-task processing in Spark

A very common use case in Spark is running many jobs in parallel. Once the DAG for a job has been built, Spark distributes its tasks across multiple executors and processes them in parallel.
But this does not help us run two completely independent jobs concurrently within the same Spark application, for example reading from several data sources at the same time and writing each to its corresponding store, or processing multiple files at once.

Every Spark application needs a SparkSession (Context) to configure and execute operations. The SparkSession object is thread-safe and can be passed around your Spark application as needed.
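To make that concrete, here is a minimal sketch (the object name and the two example jobs are illustrative, not from the article) in which two plain JVM threads trigger independent jobs through one shared SparkSession:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object SharedSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("shared-session").getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.parallelize(1 to 100).toDF   // single column named "value"

    // Two plain JVM threads submitting independent jobs through the same session
    val t1 = new Thread(new Runnable { def run(): Unit = { df.distinct.count() } })
    val t2 = new Thread(new Runnable { def run(): Unit = { df.agg(sum("value")).collect() } })
    t1.start(); t2.start()
    t1.join(); t2.join()

    spark.stop()
  }
}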

Sequential execution example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
      .appName("parjobs")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.parallelize(1 to 100).toDF

    doFancyDistinct(df, "hdfs:///dis.parquet")
    doFancySum(df, "hdfs:///sum.parquet")
  }

  def doFancyDistinct(df: DataFrame, outPath: String) =
    df.distinct.write.parquet(outPath)

  def doFancySum(df: DataFrame, outPath: String) =
    df.agg(sum("value")).write.parquet(outPath)
}

Optimized example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
      .appName("parjobs")
      .getOrCreate()
    import spark.implicits._

    // Set the number of threads (hard-coded here; see the note below for
    // reading it from a configuration property)
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)

    val df = spark.sparkContext.parallelize(1 to 100).toDF

    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")

    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA, taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath)
  }
}
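As the comment above suggests, the pool size can come from a configuration property rather than a literal, and it is also good practice to shut the pool down once the futures complete so the driver JVM can exit. A sketch of both refinements, reusing the names above (the key spark.fancyapp.pool.size is a made-up example passed via --conf at submit time):

// Read the pool size from the Spark conf instead of hard-coding it;
// "spark.fancyapp.pool.size" is a hypothetical key, defaulting to 5.
val poolSize = spark.conf.get("spark.fancyapp.pool.size", "5").toInt
val pool = Executors.newFixedThreadPool(poolSize)
implicit val xc = ExecutionContext.fromExecutorService(pool)

// ... build df, start taskA / taskB and Await.result as above ...

// Release the pool's threads once both jobs have finished.
pool.shutdown()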

Example using the Java concurrency API (ExecutorCompletionService)

val executors = Executors.newFixedThreadPool(threadPoolNum)
val completionService = new ExecutorCompletionService[String](executors)

for ((branch_id, dataList) <- summary) {
  logInfo(s"************** applicationId is ${applicationId} about Multi-threading starting: file is ${branch_id}")
  completionService.submit(new Callable[String] {
    override def call(): String = {
      new VerificationTest(spark, branch_id, dataList, separator).runJob()
      branch_id
    }
  })
}
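The snippet only submits the jobs. To actually consume the results and release the threads, a typical follow-up (a sketch reusing the same names; summary, completionService and executors come from the snippet above) is:

// Block until each submitted task finishes and collect the branch_id it returns
val finishedBranches = (1 to summary.size).map { _ =>
  completionService.take().get()
}
logInfo(s"finished branches: ${finishedBranches.mkString(", ")}")
executors.shutdown()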