Spark Code Tuning (Part 1)

Running the following under extremely bad cluster conditions:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext

// sc is the existing SparkContext (for example, the one spark-shell provides)
val sqlContext = new HiveContext(sc)
// collect() pulls every row of the table back to the driver
val sql = sqlContext.sql("select * from ysylbs9 ").collect()

 

Partway through, this error occurred:

cluster.YarnScheduler: Lost executor 2 on zdbdsps025.iccc.com: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

Killed by external signal

 

This happened because one of the nodes managed by YARN went down, so Spark moved the tasks to other nodes:

 

16/11/15 14:24:28 WARN scheduler.TaskSetManager: Lost task 224.0 in stage 0.0 (TID 224, zdbdsps025.iccc.com): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

Killed by external signal

 

16/11/15 14:24:28 INFO cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 2
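When an executor is lost, Spark resubmits its running tasks elsewhere. A single task may fail up to spark.task.maxFailures times (default 4) before the whole job is aborted. Below is a simplified, self-contained model of that idea. It is not Spark's actual scheduler. The host names are made up, hosts are tried in a fixed order, and every attempt on a dead host counts as one failure. Real Spark applies more nuanced rules, for example about which executor losses count toward the limit.

```scala
import scala.annotation.tailrec

// Simplified model only: not Spark's TaskSetManager.
object TaskRetrySketch {
  // host = where the task finally succeeded (None if it never did);
  // attempts = total attempts made
  final case class Outcome(host: Option[String], attempts: Int)

  def runWithRetries(hosts: List[String], deadHosts: Set[String], maxFailures: Int): Outcome = {
    @tailrec
    def loop(remaining: List[String], failures: Int): Outcome = remaining match {
      case _ if failures >= maxFailures    => Outcome(None, failures)             // limit reached: job aborted
      case Nil                             => Outcome(None, failures)             // no host left to try
      case host :: rest if deadHosts(host) => loop(rest, failures + 1)            // attempt lost with the node
      case host :: _                       => Outcome(Some(host), failures + 1)   // this attempt succeeded
    }
    loop(hosts, 0)
  }
}
```

With two dead hosts and the default limit of 4, the task succeeds on its third attempt. With a limit of 2, the job would be aborted instead.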

Then another error:

16/11/15 14:30:43 WARN spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 133569 ms exceeds timeout 120000 ms

16/11/15 14:30:43 ERROR cluster.YarnScheduler: Lost executor 6 on zdbdsps027.iccc.com: Executor heartbeat timed out after 133569 ms

Every task timed out:

16/11/15 14:30:43 WARN scheduler.TaskSetManager: Lost task 329.0 in stage 0.0 (TID 382, zdbdsps027.iccc.com): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 133569 ms
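The 120000 ms in this log matches the default spark.network.timeout of 120s. By default, the driver derives its executor heartbeat timeout from that setting. Executors send heartbeats every spark.executor.heartbeatInterval (10s by default).

The sketch below models only the expiry check, using plain Scala collections. The executor IDs and timestamps are made up. An executor is dropped once the time since its last heartbeat is strictly greater than the timeout, which is what "133569 ms exceeds timeout 120000 ms" reports.

On a cluster this unstable, raising spark.network.timeout is one commonly used mitigation. It only tolerates slow nodes, though; it does not fix them.

```scala
// Models only the timeout check; heartbeat sending and executor removal are not shown.
object HeartbeatSketch {
  /** IDs of executors whose last heartbeat is strictly older than timeoutMs. */
  def expiredExecutors(lastHeartbeatMs: Map[String, Long], nowMs: Long, timeoutMs: Long): Set[String] =
    lastHeartbeatMs.collect { case (id, last) if nowMs - last > timeoutMs => id }.toSet
}
```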

 

The DAGScheduler found that executor 6 had also died, so it removed the executor:

16/11/15 14:30:43 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 1)

16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.

16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, zdbdsps027.iccc.com, 38641)

16/11/15 14:30:43 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor

16/11/15 14:30:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6

 

 

 

The tasks were then moved to other nodes, after which an RPC problem appeared:

16/11/15 14:32:58 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=4735002570883429008, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to zdbdsps027.iccc.com/172.19.189.53:51057; closing connection

 

java.io.IOException: Broken pipe

        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)

        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)

        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)

 

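Spark moves computation rather than data. Because other nodes had died, tasks ended up on nodes that did not hold their data and had to fetch it remotely.

In this extreme case, with the environment in bad shape, Spark learned from the NameNode which nodes held the data and still went to the faulty node to fetch it. The error recurred, and the executor was killed again. Since Hadoop keeps three replicas of each block, Spark will usually fall back to fetching the data from another node.

After that, we kept seeing tasks complete on only one node. The root cause turned out to be that YARN nodes had gone down.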
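The behavior described above combines two ideas:

- Spark prefers to schedule a task on a node that holds a replica of its input (data locality).
- HDFS keeps three replicas of each block by default (dfs.replication = 3), so when one holder is unhealthy, another copy is usually available.

The model below is a deliberately simplified sketch with made-up host names, not Spark's actual scheduling logic. It runs the task node-locally if a live executor sits on a live replica. Otherwise, it runs the task on any live executor and fetches the data from a live replica.

```scala
// Simplified locality model: not Spark's real TaskSetManager behavior.
object LocalitySketch {
  sealed trait Placement
  final case class NodeLocal(host: String) extends Placement                 // compute moves to the data
  final case class Remote(runOn: String, fetchFrom: String) extends Placement // data is pulled over the network
  case object Unplaceable extends Placement                                   // no live executor or no live replica

  def place(replicaHosts: Seq[String], executorHosts: Seq[String], deadHosts: Set[String]): Placement = {
    val liveReplicas  = replicaHosts.filterNot(deadHosts)
    val liveExecutors = executorHosts.filterNot(deadHosts)
    liveReplicas.find(h => liveExecutors.contains(h)) match {
      case Some(host) => NodeLocal(host)
      case None =>
        (liveExecutors.headOption, liveReplicas.headOption) match {
          case (Some(exec), Some(source)) => Remote(exec, source)
          case _                          => Unplaceable
        }
    }
  }
}
```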

 

 

 

 

 

Below is part of the code being debugged:

 

import org.slf4j.{Logger, LoggerFactory}
import java.util.{Calendar, Date, GregorianCalendar}

import algorithm.DistanceCalculator
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{HTable, Result, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, Filter, RowFilter, SubstringComparator}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

 

 

  case class LBS_STATIC_TABLE(LS_certifier_no: String,LS_location: String,LS_phone_no: String,time: String)

The final results are converted to this case class before being registered as a Hive table.
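Because the case class has exactly four fields, every record must be turned into a four-element tuple in the same field order before it can become a row. Below is a minimal pure-Scala sketch of that conversion. The object name and sample values are made up. In the job itself, the same mapping would run inside an RDD map.

```scala
// Same shape as the article's case class: four String fields.
final case class LBS_STATIC_TABLE(LS_certifier_no: String, LS_location: String, LS_phone_no: String, time: String)

object RowConversionSketch {
  /** Maps (id, location, phone, hour) tuples to rows; the tuple order must match the field order. */
  def toRows(records: Seq[(String, String, String, String)]): Seq[LBS_STATIC_TABLE] =
    records.map { case (id, loc, phone, hour) => LBS_STATIC_TABLE(id, loc, phone, hour) }
}
```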

  val logger: Logger = LoggerFactory.getLogger(LbsCalculator.getClass)

 

// Read data from HBase and convert it to an RDD
def hbaseInit(): HTable = {
  val tableName = "EVENT_LOG_LBS_HIS"
  val conf = HBaseConfiguration.create()
  // conf.addResource("hbase-site.xml")
  new HTable(conf, tableName)
}

 

 

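def tableInitByTime(sc: SparkContext, tablename: String, columns: String, fromdate: Date, todate: Date): RDD[(ImmutableBytesWritable, Result)] = {
  val configuration = HBaseConfiguration.create()
  // Comment this out in production and enable it only when debugging: when the job is submitted
  // to YARN, the cluster-managed HBase configuration files are loaded automatically.
  configuration.addResource("hbase-site.xml")
  configuration.set(TableInputFormat.INPUT_TABLE, tablename)
  val scan = new Scan
  // Filter by timestamp here. This is more efficient than an HBase scan filter, because a filter
  // still scans the whole table and only then discards rows, which is slow.
  scan.setTimeRange(fromdate.getTime, todate.getTime)
  for (columnName <- columns.split(",")) {
    scan.addColumn("f1".getBytes, columnName.getBytes)
  }
  // The Scan only takes effect once it is serialized into the configuration;
  // otherwise TableInputFormat scans every column of every row.
  configuration.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
  val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  // Debugging only: count() is an action and triggers an extra full read of the scanned data
  println(hbaseRDD.count())
  hbaseRDD
}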
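The time-range scan above needs two java.util.Date bounds. Below is one way to build them for a single day, using the Calendar classes the code already imports. The helper name and the choice of time zone are assumptions.

HBase's Scan.setTimeRange treats the range as half-open, [minStamp, maxStamp), so the end bound is the start of the next day. Keep in mind that the range filters on cell timestamps. By default those are write times, not a business date stored in the data.

```scala
import java.util.{Calendar, Date, GregorianCalendar, TimeZone}

// Hypothetical helper, not part of the original code.
object TimeRangeSketch {
  /** [start of the given day, start of the next day) in time zone tz; month is 1-based. */
  def dayRange(year: Int, month: Int, day: Int, tz: TimeZone): (Date, Date) = {
    val cal = new GregorianCalendar(tz)
    cal.clear()                        // zero hour, minute, second and millisecond
    cal.set(year, month - 1, day)      // Calendar months are 0-based
    val start = cal.getTime
    cal.add(Calendar.DAY_OF_MONTH, 1)  // exclusive upper bound
    (start, cal.getTime)
  }
}
```

The resulting pair would be passed as tableInitByTime(sc, tablename, columns, start, end).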

 

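// One filter approach is shown here; later, all HBase filter methods will be collected into a common utility class.
// `scan` is a Scan such as the one built in tableInitByTime; the filter must be set before the Scan is serialized.
// This RowFilter keeps rows whose rowkey contains "20160830". SubstringComparator only supports EQUAL / NOT_EQUAL.
val filter: Filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("20160830"))
scan.setFilter(filter)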

 

 

   

 

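// Note: the data we got here sits in a single partition, so it must be repartitioned after loading.
// TableInputFormat creates one input split per HBase region, so a table with a single region yields a
// single partition, even if it holds around 1 GB. Repartitioning raises the parallelism of the
// downstream computation. The number of partitions has to be tuned to the actual data volume; once
// testing is done, this will become a common method. The map converts each record into
// (ID number, longitude/latitude coordinates, phone number, time), and the result is repartitioned.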

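// Map to plain String tuples first, then repartition: ImmutableBytesWritable and Result are not
// java.io.Serializable, so shuffling them directly fails with the default Java serializer.
val transRDD = hbRDD.map { p =>
  val result = p._2
  val id = Bytes.toString(result.getValue("f1".getBytes, "LS_certifier_no".getBytes))
  val loc = Bytes.toString(result.getValue("f1".getBytes, "LS_location".getBytes))
  val phone = Bytes.toString(result.getValue("f1".getBytes, "LS_phone_no".getBytes))
  val rowkey = Bytes.toString(result.getValue("f1".getBytes, "rowkey".getBytes))
  // Hour = characters 8-9 of the third '-'-separated rowkey segment; a missing rowkey gives null
  val hour = if (rowkey == null) null else rowkey.split("-")(2).substring(8, 10)
  (id, loc, phone, hour)
}.repartition(200)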
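The 200 used above is a fixed number, whereas the comment says the partition count should follow the data volume. Below is a hypothetical helper for that; it is not from the original code. It picks enough partitions that each holds roughly a target size, clamped to a sane range.

Two things are assumptions here. The 256 MB target in the usage note is an assumed value. The total size would have to be estimated separately, for example from the table's size on HDFS.

```scala
// Hypothetical sizing helper for choosing a repartition count.
object PartitionSizingSketch {
  def numPartitions(totalBytes: Long, targetBytesPerPartition: Long, minPartitions: Int, maxPartitions: Int): Int = {
    require(targetBytesPerPartition > 0, "target partition size must be positive")
    val needed = (totalBytes + targetBytesPerPartition - 1) / targetBytesPerPartition // ceiling division
    math.max(minPartitions.toLong, math.min(maxPartitions.toLong, needed)).toInt
  }
}
```

For example, about 50 GB of input with a 256 MB target gives numPartitions(50L * 1024 * 1024 * 1024, 256L * 1024 * 1024, 1, 2000) == 200. That value would then replace the literal in repartition(200).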

 

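// Filter out incomplete records: the data is often incomplete, and missing fields would break later computations.
// transRDD already has 200 partitions, so another repartition(200) here would only add a second full shuffle.
val calculateRDD = transRDD.filter { case (id, loc, phone, hour) =>
  id != null && loc != null && phone != null && hour != null
}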
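Extracting the hour with split/substring throws an exception if the rowkey column is malformed, and that happens inside the map, before any filter runs. Below is a more defensive variant, sketched in plain Scala. It assumes, as the substring(8, 10) call suggests, that the third '-'-separated rowkey segment starts with yyyyMMddHH. Instead of throwing, it returns None. The completeness check mirrors the four null tests on calculateRDD.

```scala
// Plain-Scala sketch; inside Spark the same functions would be called from map/filter.
object RecordParsingSketch {
  /** Hour (HH) from a rowkey whose third '-'-separated segment is assumed to start with yyyyMMddHH. */
  def hourFromRowkey(rowkey: String): Option[String] =
    Option(rowkey)
      .map(_.split("-"))
      .filter(_.length > 2)      // need at least three segments
      .map(_(2))
      .filter(_.length >= 10)    // need yyyyMMddHH
      .map(_.substring(8, 10))

  /** Keeps only records whose four fields are all present. */
  def complete(records: Seq[(String, String, String, String)]): Seq[(String, String, String, String)] =
    records.filter { case (id, loc, phone, hour) => id != null && loc != null && phone != null && hour != null }
}
```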

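Note that reduceByKey does not get a stage named after it in the Spark Web UI.

- Its map-side combine runs inside the same stage as the preceding map/filter operations.
- Its reduce side runs after the shuffle, at the start of the next stage.

The Web UI also shows that the repartition has already taken place.

.reduceByKey(_ + _) // requires a (key, value) pair RDD; the key/value mapping applied to calculateRDD is not shown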
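What reduceByKey does around the shuffle can be modeled with plain collections, where each inner Seq stands in for one partition. Values are first combined per key inside each partition; this is the map-side combine, which runs in the upstream stage. Only these partial results are merged after the shuffle. This is a conceptual sketch with Int values, not Spark's implementation.

```scala
// Conceptual model of reduceByKey(_ + _): combine per partition, then merge partial maps.
object MapSideCombineSketch {
  /** Map-side combine: sum values per key within one partition. */
  def combineLocally[K](partition: Seq[(K, Int)]): Map[K, Int] =
    partition.foldLeft(Map.empty[K, Int]) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  /** Reduce side: merge the partial per-partition results after the "shuffle". */
  def mergeCombined[K](partials: Seq[Map[K, Int]]): Map[K, Int] =
    partials.foldLeft(Map.empty[K, Int]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (k, v)) => merged.updated(k, merged.getOrElse(k, 0) + v) }
    }

  def reduceByKeyLike[K](partitions: Seq[Seq[(K, Int)]]): Map[K, Int] =
    mergeCombined(partitions.map(p => combineLocally(p)))
}
```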

// Create the HiveContext object in preparation for the table operations that follow.
val hiveSqlContext = HiveTableHelper.hiveTableInit(sc)

// In HiveTableHelper:
def hiveTableInit(sc: SparkContext): HiveContext = {
  val sqlContext = new HiveContext(sc)
  sqlContext
}

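// Pass in the results of the earlier analysis and build the table rows
// (hRDD is assumed to hold 4-tuples, matching the four fields of LBS_STATIC_TABLE)
val hiveRDD = hRDD.map(p => LBS_STATIC_TABLE(p._1, p._2, p._3, p._4))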

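// Create a DataFrame and save it as a table in Parquet format. Note: try not to pass raw SQL strings
// to hiveSqlContext.sql() directly, because they first go through Spark's own SQL parser; calling the
// DataFrame API on the data avoids that step. (Both routes are optimized into the same kind of Catalyst
// plan, so the difference is mostly the parsing.) All the operators will be summarized in a later post.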

val hiveRDDSchema = hiveSqlContext.createDataFrame(hiveRDD)
hiveRDDSchema.show(10) // show() prints rows and returns Unit, so there is nothing to assign
hiveSqlContext.sql("drop table if exists " + hivetablename)
hiveRDDSchema.registerTempTable("LBS_STATIC_TABLE")
hiveRDDSchema.write.format("parquet").saveAsTable(hivetablename)
