sparkjob的部署 ----------------- 1.client driver run on client 2.cluster driver on a worker 4.啓動job時,指定資源使用。 $>spark-submit --driver-memory MEM //設置driver內存,默認1g,配置2g --executor-memory MEM //控制每一個執行器內存,默認1g [只在standalone模式下] --driver-cores //控制driver使用的內核數,默認1. [standalone & mesos] --total-executor-cores NUM //控制執行器使用的總內核數 [standalone & yarn] --executor-cores NUM //控制每一個執行的內核數。 [yarn] --driver-cores NUM //控制driver內核數,默認1 --num-executors NUM //啓動的執行器個數,動態分配內核啓用時,數字就是Num的值。 5.啓動spark-shell,手動分配資源 //啓動3個executor,worker節點不能啓動2個executor spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 6g --total-executor-cores 4 --executor-cores 1 //啓動了4個executor, spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 4 --executor-cores 1 //啓動了7個executor, spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 22 --executor-cores 3 spark + yarn模式 -------------------- yarn模式,不須要spark集羣,只是在client安裝spark,提交做業時,走的是hadoop的流程。 使用spark的jar,在nodemanager上啓動的spark的executor進程。 --master的值指定yarn便可,rm的地址從配置文件中提取的。 --master yarn --deployMode client //--master yarn-client --master yarn --deployMode cluster //--master yarn-cluster [yarn-client] Appmaster只運行appmaster自身程序,負責資源請求。 Driver仍然位於client執行。 [yarn-cluster] appmaster不但負責資源請求,還負責運行driver。 //實操 1.中止spark集羣 stop-all.sh 2.啓動zk和hdfs-yarn start-yarn.sh 3.配置spark的spark-env.sh的HADOOP_CONF_DIR並分發. ... export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop 4.啓動spark-shell spark-shell --master yarn --deploy-mode client --num-executors 4 5.故障診斷 出現 is running beyond virtual memory limits. Current usage: 178.7 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container. 關閉yarn-site.xml虛擬內存檢查並分發文件。 [yarn-site.xml] <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 6.spark yarn運行時將spark的全部jar上傳到hdfs,協同hadoop的做業運行流程。 配置spark.yarn.jars或者spark.yarn.archive,避免每次上傳jar包。 1.spark.yarn.jars spark.yarn.jars=hdfs:///some/path 2.spark.yarn.archive spark.yarn.archive=hdfs://mycluster/user/centos/spark/spark-jars.zip 3.配置spark.yarn.archive屬性,避免每次上傳大的jar包。 a)上傳zip文件到hdfs://mycluster/user/centos/spark/spark-jars.zip b)配置spark配置文件。 [spark/conf/spark-default.conf] spark.yarn.archive hdfs://mycluster/user/centos/spark/spark-jars.zip c)啓動shell $>spark-shell --master yarn-client ShuffleMapTask ------------------ private[spark] class ShuffleMapTask( stageId: Int, stageAttemptId: Int, taskBinary: Broadcast[Array[Byte]], //(rdd,dep) partition: Partition, @transient private var locs: Seq[TaskLocation], metrics: TaskMetrics, localProperties: Properties, jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) } shuffle管理 ------------------- [ShuffleManager] ShuffleManager,是shuffle系統可插拔接口。 ShuffleManager在driver和每一個executor經過SparkEnv進行建立。 基於spark.shuffle.manager屬性配置建立相應shuffleManager實現。 在spark 2.1.0中只有SortShuffleManager. 在spark 1.6.0中有SortShuffleManager和HashShuffleManager. [HashShuffleManager] spark.shuffle.consolidateFiles=true,默認false,合併輸出。 slot = 併發能力 = 併發執行的線程數 = (執行器個數 * 每一個執行器的cpu內核數) / 每一個任務佔用的內核數。 spark 2.1.0的實現類是SortShuffleManager(不論sort仍是tungsten-sort(鎢絲排序)) [SortShuffleManager] 基於排序的shuffle,輸入kv按照目標分區的id進行排序,而後寫入一個map輸出文件。 reducer讀取連續文件區域來提取數據。map內存不足,溢出到磁盤,磁盤上的文件最終輸出到一個文件中。 該方式的shuffle有兩種途徑生成map輸出文件: 1.串行化排序(如下三個條件均知足使用) a)shuffle依賴沒有指定聚合或者輸出排序 b)shuffle序列化器支持序列化值得從新定位。(當前只有KryoSerializer和SQL的Serializer能夠,java不能夠) c)shuffle生成的分區少於16777216個. 2.反串行排序 全部其餘狀況。 [串行化排序模式] 該模式下,傳遞給ShuffleWriter的record便可被串行化,排序時也是串行化進行緩衝。該方式有幾點優化 處理: 1.對串行化的二進制數據進行排序,而不是針對java對象,所以能夠減小內存消耗和過分GC。 該優化機制要求串行化器具備特殊的屬性可以對串行的record進行重排序,不須要反串過程。 2.使用串行化的具備高效緩存特徵的sorter,能夠對壓縮的record指針和分區id的數組進行排序。 數組中,每條record使用8字節空間存儲。 3.溢出合併過程對串行化的數據塊(屬於同一分區)進行操做,而且合併期間不須要反串(流)。 4.支持壓縮文件塊的合成,合併過程簡單的將壓縮和串行化的分區最終合併成一個分區文件, 支持高效數據複製方式,例如NIO中的零拷貝。 ShuffleManager.registerShuffle() ----------------------------------- //1.經過ShuffleDep判斷是否須要bypass if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } //判斷依賴是否能夠串行shuffle else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } //基本shuffle else { new BaseShuffleHandle(shuffleId, numMaps, dependency) } 是否迂迴的條件 ------------------------- def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { //若是map端須要聚合,不能回調。 if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") false } //判斷依賴的分區數量是否小於指定的配置(默認時200) else { val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) dep.partitioner.numPartitions <= bypassMergeThreshold } } //結論 if(map須要聚合){ //不能迂迴 } else{ if(分區數 <= 200(可配:spark.shuffle.sort.bypassMergeThreshold)){ //能夠迂迴 } else{ //不能迂迴 } } 串行shuffle的判斷條件 ------------------------ def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { val shufId = dependency.shuffleId val numPartitions = dependency.partitioner.numPartitions //判斷是否dep中使用的串行化器是否時kryo(kryo支持)。 if (!dependency.serializer.supportsRelocationOfSerializedObjects) { false } //判斷dep是否認義聚合器 else if (dependency.aggregator.isDefined) { false } //分區數大於特定值 else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { false } /// else { true } } //結論 if(不是kryo){ //不能用串行shuffle } // else if(dep定義了聚合器){ //不能用串行shuffle } else if(分區數 > (1 << 24) ){ //不能用串行shuffle } else{ //使用串行shuffle } 整個shuffle處理手段的優先級 --------------------------- //1.迂迴策略 if(可否迂迴){ //new BypassMergeSortShuffleHandle() } //2.串行策略 else if(是否串行){ //new SerializedShuffleHandle(); } //3.常規策略 else{ //new BaseShuffleHandle } SortShuffleManager.getWrtier() -------------------------------- handle match{ case SerializedShuffleHandle => new UnsafeShuffleWriter(); case BypassMergeSortShuffleHandle => new BypassMergeSortShuffleWriter(); case BaseShuffleHandle => new SortShuffleWriter(); } ShuffleWriter的特性 -------------------- abstract class ShuffleWriter | / \ --- | |------BypassMergeSortShuffleWriter |------UnsafeShuffleWriter |------SortShuffleWriter [BypassMergeSortShuffleWriter] 該類實現了hash方式的shuffle處理手段,將record寫入單獨文件,每一個分區一個文件。 而後對每一個分區文件合併再產生一個文件,文件的不一樣區域用於不一樣reduce,該模式下, record不在內存中緩存,這是和HashShuffleWriter本質不一樣點。 該方式對於有大量分區的shuffle處理效率不高,緣由是須要對全部分區同時打開串行化器 和文件流。 [UnsafeShuffleWriter] 將kv分開單獨以kryo串行寫入緩衝區,而後將緩衝放入ShuffleExternalSorter中。 1.ShuffleExternalSorter 專門用於基於sort的shuffle。record追加到date page,若是全部record插入 後或者內存到達limit值,這些記錄按照分區id進行排序,排序後的記錄寫入單獨 的輸出文件(或多個文件),輸出文件的格式和SortShuffleWriter輸出文件格式相同, 每條分區的記錄都是單獨串行和壓縮寫入的,一樣使用反串和解壓縮方式讀取。 和ExternalSorter不一樣,該對象不對溢出文件進行合併,而是將合併過程交給 UnsafeShuffleWriter,避免多餘串行和反串過程。 KV以串行和壓縮方式寫緩衝區,再將緩衝區字節數組寫入頁面內存(long[]),標記好 長度、偏移量、分區數等等,每一個KV在頁面內存的地址和分區進行編碼後寫入內存 排序器(InMemorySorter,該排序器使用分區id降序排列).若是內存頁默認超過1G( 能夠經過spark.shuffle.spill.numElementsForceSpillThreshold進行修改)個kv, 發生溢出,進行排序輸出到文件。 [SortShuffleWriter] Spark中的串行化 ------------------- spark默認使用java串行化器,但性能通常,優化手段之一 使用kryo串行化,可是kryo串行化器對於要串行化的類使用前 須要註冊,spark的kryo串行化器只是對java內置類、scala的內置 類核spark的內置類進行了註冊,自定義的類必須手動註冊。 也是沒有把kryo串行化器作爲默認設置的緣由. keyo串行化爲何快 —————————————————————————————————————————— 爲何kryo比其它的序列化方案要快? 爲每個類分配一個id 實現了本身的IntMap 代碼中一些取巧的地方: 利用變量memoizedRegistration和memoizedType記錄上一次的調用writeObject函數的Class,則若是兩次寫入同一類型時,能夠直接拿到,再也不查找HashMap。