故障描述
java
前段時間在測試Spark的RDD轉換的lazy特性是發現了一個Spark內部對taskSet在executor的運行分配不均勻問題。先上兩張圖出現問題時間點的圖,你們估計就明白怎麼回事了:
apache
再看看簡單的測試代碼:
dom
import org.apache.spark._ import org.apache.spark.storage.StorageLevel /** * Created by zhaozhengzeng on 2015/1/5. */ import java.util.Random import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ object JoinTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark count test"). set("spark.kryoserializer.buffer.max.mb", "128"). set("spark.shuffle.manager", "sort") // set("spark.default.parallelism","1000") val sc = new SparkContext(sparkConf) //鏈接表1 val textFile1 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/dt=2014-07-01").map(p => { val line = p.split("\\|") (line(10), 1) } ).reduceByKey((x, y) => x + y) //測試RDD的lozy特性 val textFile3 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/dt=2014-09-*").map(p => { val line = p.split("\\|") (line(11),"") }) val textFile2 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/*").mapPartitions({ it => for { line <- it } yield (line.split("\\|")(10), "") }) val count = textFile1.join(textFile2).count() println("join 以後的記錄數據:" + count) //textFile1.saveAsTextFile("/user/hive/warehouse/test1.db/testRs/rs2") sc.stop() } }
描述下,上面代碼主要測試RDD的Join轉換,以及測試textFile3的translation的lazy特性。在整個測試過程經過觀察Spark UI看到上面這種TaskSet分佈不均勻狀況。第一個圖中的Active Task爲0的executor中在運行第一個stage的taskSet後,spark不會講第二個stage的taskSet分配到這些executor中執行了。可是奇怪的是這種狀況並非常常會出現,我再接下來的N次重跑做業又不會出現這種狀況,具體什麼緣由暫時沒法找到,連重現的機會都沒有,哈哈。這裏先記錄下吧,再觀察...ide