With the cluster environment in an extremely bad state:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val sql = sqlContext.sql("select * from ysylbs9").collect
An error occurred partway through:
cluster.YarnScheduler: Lost executor 2 on zdbdsps025.iccc.com: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal
This happened because a node managed by YARN went down, so Spark moved the tasks to other nodes for execution:
16/11/15 14:24:28 WARN scheduler.TaskSetManager: Lost task 224.0 in stage 0.0 (TID 224, zdbdsps025.iccc.com): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal
16/11/15 14:24:28 INFO cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 2
Then another error was reported:
16/11/15 14:30:43 WARN spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 133569 ms exceeds timeout 120000 ms
16/11/15 14:30:43 ERROR cluster.YarnScheduler: Lost executor 6 on zdbdsps027.iccc.com: Executor heartbeat timed out after 133569 ms
Every task timed out:
16/11/15 14:30:43 WARN scheduler.TaskSetManager: Lost task 329.0 in stage 0.0 (TID 382, zdbdsps027.iccc.com): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 133569 ms
The DAGScheduler found that executor 6 had also died, so the executor was removed:
16/11/15 14:30:43 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 1)
16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.
16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, zdbdsps027.iccc.com, 38641)
16/11/15 14:30:43 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor
16/11/15 14:30:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6
The tasks were then moved to other nodes, after which an RPC problem appeared:
16/11/15 14:32:58 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=4735002570883429008, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to zdbdsps027.iccc.com/172.19.189.53:51057; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
Spark moves computation to the data rather than moving the data. Because some nodes had gone down, tasks ended up on nodes that did not hold the data and had to fetch it remotely. In this extreme situation Spark learned the block locations from the NameNode but still went back to the problematic nodes to fetch data, so the errors recurred and the containers were killed again. Since HDFS keeps three replicas of each block, Spark eventually pulled the data from other nodes, which is why tasks were observed completing on only one node. The final diagnosis: the YARN nodes had gone down.
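For reference, in an environment this unstable it can also help to relax Spark's timeout and failure-tolerance settings so that a flaky node does not immediately kill the job. A minimal sketch of the kind of settings involved (the values are illustrative and would normally be passed via --conf to spark-shell / spark-submit):

import org.apache.spark.SparkConf

// Illustrative values only; tune them against the actual cluster.
val tolerantConf = new SparkConf()
  .set("spark.network.timeout", "300s")            // default 120s, the timeout seen in the heartbeat error above
  .set("spark.executor.heartbeatInterval", "30s")  // must stay well below spark.network.timeout
  .set("spark.task.maxFailures", "8")              // tolerate more task/fetch failures before failing the job
  .set("spark.yarn.max.executor.failures", "16")   // allow more executor losses before the application is killed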
Below is part of the code being debugged:
import org.slf4j.{Logger, LoggerFactory}
import java.util.{Calendar, Date, GregorianCalendar}
import algorithm.DistanceCalculator
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{HTable, Result, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, Filter, RowFilter, SubstringComparator}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
case class LBS_STATIC_TABLE(LS_certifier_no: String,LS_location: String,LS_phone_no: String,time: String)
This case class is what will ultimately be registered and converted into the Hive table.
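For reference, createDataFrame infers the table schema from the case class fields by reflection. A minimal sketch, assuming the sc and hiveSqlContext objects created elsewhere in this post and made-up sample values:

// The schema (four string columns) is inferred from LBS_STATIC_TABLE by reflection.
val sample = sc.parallelize(Seq(
  LBS_STATIC_TABLE("110101199001011234", "116.40,39.90", "13800000000", "08")))
val sampleDF = hiveSqlContext.createDataFrame(sample)
sampleDF.printSchema() // LS_certifier_no, LS_location, LS_phone_no, time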
val logger: Logger = LoggerFactory.getLogger(LbsCalculator.getClass)
// Fetch data from HBase and convert it into an RDD
def hbaseInit() = {
  val tableName = "EVENT_LOG_LBS_HIS"
  val conf = HBaseConfiguration.create()
  // conf.addResource("hbase-site.xml")
  val hTable = new HTable(conf, tableName)
  hTable
}
def tableInitByTime(sc: SparkContext, tablename: String, columns: String, fromdate: Date, todate: Date): RDD[(ImmutableBytesWritable, Result)] = {
  val configuration = HBaseConfiguration.create()
  // Comment this out in production; enable it only for local debugging, because a job submitted to YARN automatically loads the HBase configuration managed by YARN.
  configuration.addResource("hbase-site.xml")
  configuration.set(TableInputFormat.INPUT_TABLE, tablename)
  val scan = new Scan
  // Filtering by timestamp range here is more efficient than a scan filter: an HBase filter still scans the whole table and only then filters, which is very slow.
  scan.setTimeRange(fromdate.getTime, todate.getTime)
  val column = columns.split(",")
  for (columnName <- column) {
    scan.addColumn("f1".getBytes, columnName.getBytes)
  }
  // Serialize the scan and hand it to TableInputFormat; otherwise the time range and column projection above are ignored.
  configuration.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
  val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  println(hbaseRDD.count())
  hbaseRDD
}
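A usage sketch for the function above, assuming a one-day window and the table and column names used elsewhere in this post:

// Build a one-day time window with the Calendar classes imported above.
val cal = new GregorianCalendar(2016, Calendar.AUGUST, 30)
val from = cal.getTime
cal.add(Calendar.DAY_OF_MONTH, 1)
val to = cal.getTime
val hbRDD = tableInitByTime(sc, "EVENT_LOG_LBS_HIS",
  "LS_certifier_no,LS_location,LS_phone_no,rowkey", from, to)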
// An example of the filter-based approach; later all HBase filter methods will be pulled into a common utility class.
val filter: Filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator("20160830"))
scan.setFilter(filter)
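A sketch of what that common filter helper might eventually look like (the object and method names here are hypothetical):

import org.apache.hadoop.hbase.filter.{CompareFilter, Filter, RowFilter, SubstringComparator}

// Hypothetical shared helper collecting the HBase filters used across jobs.
object HBaseFilterHelper {
  // Keep only rows whose row key contains the given substring, e.g. a date such as "20160830".
  def rowKeyContains(sub: String): Filter =
    new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(sub))
}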
// Note that the data fetched here lands in a single partition, so it must be repartitioned after reading: if one task can hold, say, 1 GB of data, there will be only one partition, so repartition to raise the parallelism of the downstream computation. The repartition count has to be tuned to the actual data volume (a sizing sketch follows after this block); once testing is done this will become a common method. The RDD map below converts each row into (ID number, coordinates, phone number, hour), and the fetched data is repartitioned at the same time.
val transRDD = hbRDD.repartition(200).map { p =>
  val id = Bytes.toString(p._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
  val loc = Bytes.toString(p._2.getValue("f1".getBytes, "LS_location".getBytes))
  val phone = Bytes.toString(p._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
  val rowkey = Bytes.toString(p._2.getValue("f1".getBytes, "rowkey".getBytes))
  val hour = rowkey.split("-")(2).substring(8, 10)
  (id, loc, phone, hour)
}
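As a rough sketch of how the repartition count mentioned above could be derived from the data volume instead of being hard-coded (the 128 MB target per partition is an assumption):

// Size partitions by an assumed target of roughly 128 MB of input per partition.
def choosePartitions(totalInputBytes: Long,
                     bytesPerPartition: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalInputBytes.toDouble / bytesPerPartition).toInt)

// e.g. ~50 GB of HBase data -> about 400 partitions instead of a fixed 200
val numPartitions = choosePartitions(50L * 1024 * 1024 * 1024)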
// Filter the fields here, because the data is often incomplete, which would cause errors in the downstream computation.
val calculateRDD = transRDD.repartition(200).filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 != null)
Note that reduceByKey does not get a separate stage on the monitoring page; you will find it shown together with the preceding map/filter stage, and the monitoring also shows that the repartition has already been applied.
.reduceByKey(_ + _)
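The .reduceByKey(_ + _) line above is only a fragment; a self-contained sketch of the pattern it belongs to, assuming we key by the whole record and count duplicates (the choice of key and value is an assumption, not the original code):

// Key by the full record and count duplicates; the map-side combine of
// reduceByKey runs inside the preceding map/filter stage on the monitoring UI.
val keyed = calculateRDD.map { case (id, loc, phone, hour) => ((id, loc, phone, hour), 1) }
val hRDD = keyed.reduceByKey(_ + _)
  .map { case ((id, loc, phone, hour), _) => (id, loc, phone, hour) } // back to plain tuples for the Hive step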
// Create the HiveContext object in preparation for the table operations that follow.
val hiveSqlContext = HiveTableHelper.hiveTableInit(sc)
def hiveTableInit(sc: SparkContext): HiveContext = {
  val sqlContext = new HiveContext(sc)
  sqlContext
}
// Pass in the results of the earlier analysis and generate the table.
val hiveRDD = hRDD.map(p => LBS_STATIC_TABLE(p._1, p._2, p._3, p._4))
// Create the DataFrame and save it as a table in Parquet format. Note: avoid feeding raw SQL strings to hiveSqlContext.sql() wherever possible, because that still goes through Spark's own SQL parser; calling the DataFrame API directly speeds up processing (a short comparison follows below). All operators will be catalogued later.
val hiveRDDSchema = hiveSqlContext.createDataFrame(hiveRDD)
hiveRDDSchema.show(10)
hiveSqlContext.sql("drop table if exists " + hivetablename)
hiveRDDSchema.registerTempTable("LBS_STATIC_TABLE")
hiveRDDSchema.write.format("parquet").saveAsTable(hivetablename)
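To illustrate the comment above about preferring the DataFrame API over raw SQL strings, a small comparison (the column names come from the case class; the query itself is only an example):

// Both forms are equivalent; the SQL string goes through Spark's SQL parser,
// while the DataFrame API expresses the same projection and filter directly.
val viaSql = hiveSqlContext.sql(
  "select LS_certifier_no, LS_phone_no from LBS_STATIC_TABLE where time = '08'")
val viaApi = hiveRDDSchema.select("LS_certifier_no", "LS_phone_no").filter("time = '08'")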