源碼層面整理下咱們經常使用的操做RDD數據處理與分析的函數,從而能更好的應用於工做中。apache
鏈接Hbase,讀取hbase的過程,首先代碼以下:函數
def tableInitByTime(sc : SparkContext,tableName : String,columns : String,fromdate: Date,todate : Date) : RDD[(ImmutableBytesWritable,Result)] = { val configuration = HBaseConfiguration.create() configuration.addResource("hbase-site.xml ") configuration.set(TableInputFormat.INPUT_TABLE,tableName ) val scan = new Scan //scan.setTimeRange(fromdate.getTime,todate.getTime) val column = columns.split(",") for(columnName <- column){ scan.addColumn("f1".getBytes(),columnName.getBytes()) } val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) System.out.println(hbaseRDD.count()) hbaseRDD }
咱們來一點一點解析整個過程。oop
一、val configuration = HBaseConfiguration.create()ui
這個用過hbase的夥伴們都知道,加載配置文件,其實調用的是HBase的API,返回的RDD是個Configuration。加載的配置文件信息包含core-default.xml,core-site.xml,mapred-default.xml等。加載源碼以下:spa
二、隨之設置表名信息,並聲明scan對象,而且set讀取的列有哪些,隨後調用newAPIHadoopRDD,加載指定hbase的數據,固然你能夠加上各類filter。那麼下來 咱們看看newAPIHadoopRDD是幹了什麼呢?咱們來閱讀下里面的實現。code
能夠看到咱們調用API,其實就是一個input過程,建立了一個newHadoopRDD對象,那麼後臺是一個input數據隨後轉化爲RDD的過程。節點之間的數據傳輸是經過序列化數據,經過broadCast傳輸的conf信息。orm
三、隨之進行count驗證操做,查找數據的partition個數,hbase的數據固然是以block塊的形式存儲於HDFS。xml
四、下來開始map遍歷,取出以前咱們設置的字段,存入新的transRDD中,那麼這個map函數幹了什麼呢?它實際上是將原RDD所作的操做組織成一個function,建立一個MapPartitionsRDD。對象
五、下來咱們看下filter函數幹了什麼呢?blog
val calculateRDD = transRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null) //map轉換爲字段((身份證號,經度(保留兩位小數),緯度(保留兩位小數),電話號碼,時間段標誌),1),最後的1表明出現一次,用於後邊作累加 .map(data => { val locsp = data._2.split(",").take(2) val df = new DecimalFormat("######0.000") val hour = data._4.split(":")(0).toInt val datarange = if(hour >= 9 && hour <= 18) 1 else 0 ((data._1,df.format(locsp(0).toDouble),df.format(locsp(1).toDouble),data._3,datarange),1) })
這裏的filter是進行爲空判斷,咱們從源碼中能夠看到傳入的是一個布爾類型的變量,與map相同經過MapPartitionsRDD進行function的條件過濾,那麼也就是說,其實咱們能夠在map中直接提取咱們須要的數據,或者用filter進行爲空過濾,條件過濾。
六、隨後咱們要進行相同key值的合併,那麼,咱們開始使用reduceByKey:
//按key作reduce,value作累加 .reduceByKey(_ + _)
底層調用了combineByKeyWithClassTag,這裏的Partitioner參數咱們之因此沒有傳入,是由於在map的RDD中已包含該RDD的partitioner的信息。它內部的實現將map的結果調用了require先進行merge,隨後建立shuffleRDD.shuffleRDD就是最終reduce後的RDD。而後看不懂了。。。由於須要與整個流程相結合。因此後續繼續深刻~