Spark經常使用函數(源碼閱讀六)

  源碼層面整理下咱們經常使用的操做RDD數據處理與分析的函數,從而能更好的應用於工做中。apache

      鏈接Hbase,讀取hbase的過程,首先代碼以下:函數

def tableInitByTime(sc : SparkContext,tableName : String,columns : String,fromdate: Date,todate : Date) : RDD[(ImmutableBytesWritable,Result)] = {
      val configuration = HBaseConfiguration.create()
      configuration.addResource("hbase-site.xml ")
      configuration.set(TableInputFormat.INPUT_TABLE,tableName )
      val scan = new Scan
      //scan.setTimeRange(fromdate.getTime,todate.getTime)
      val column = columns.split(",")
      for(columnName <- column){
        scan.addColumn("f1".getBytes(),columnName.getBytes())
      }
      val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
      System.out.println(hbaseRDD.count())
      hbaseRDD
  }

  咱們來一點一點解析整個過程。oop

  一、val configuration = HBaseConfiguration.create()ui

  這個用過hbase的夥伴們都知道,加載配置文件,其實調用的是HBase的API,返回的RDD是個Configuration。加載的配置文件信息包含core-default.xml,core-site.xml,mapred-default.xml等。加載源碼以下:spa

  

  二、隨之設置表名信息,並聲明scan對象,而且set讀取的列有哪些,隨後調用newAPIHadoopRDD,加載指定hbase的數據,固然你能夠加上各類filter。那麼下來 咱們看看newAPIHadoopRDD是幹了什麼呢?咱們來閱讀下里面的實現。code

  

  能夠看到咱們調用API,其實就是一個input過程,建立了一個newHadoopRDD對象,那麼後臺是一個input數據隨後轉化爲RDD的過程。節點之間的數據傳輸是經過序列化數據,經過broadCast傳輸的conf信息。orm

  

  

  三、隨之進行count驗證操做,查找數據的partition個數,hbase的數據固然是以block塊的形式存儲於HDFS。xml

  

  四、下來開始map遍歷,取出以前咱們設置的字段,存入新的transRDD中,那麼這個map函數幹了什麼呢?它實際上是將原RDD所作的操做組織成一個function,建立一個MapPartitionsRDD。對象

  五、下來咱們看下filter函數幹了什麼呢?blog

 val calculateRDD = transRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null)
      //map轉換爲字段((身份證號,經度(保留兩位小數),緯度(保留兩位小數),電話號碼,時間段標誌),1),最後的1表明出現一次,用於後邊作累加
      .map(data => {
      val locsp = data._2.split(",").take(2)
      val df   = new DecimalFormat("######0.000")
      val hour = data._4.split(":")(0).toInt
      val datarange = if(hour >= 9 && hour <= 18) 1 else 0
      ((data._1,df.format(locsp(0).toDouble),df.format(locsp(1).toDouble),data._3,datarange),1)
    })

   這裏的filter是進行爲空判斷,咱們從源碼中能夠看到傳入的是一個布爾類型的變量,與map相同經過MapPartitionsRDD進行function的條件過濾,那麼也就是說,其實咱們能夠在map中直接提取咱們須要的數據,或者用filter進行爲空過濾,條件過濾。

  六、隨後咱們要進行相同key值的合併,那麼,咱們開始使用reduceByKey:

      //按key作reduce,value作累加
      .reduceByKey(_ + _)

  

  底層調用了combineByKeyWithClassTag,這裏的Partitioner參數咱們之因此沒有傳入,是由於在map的RDD中已包含該RDD的partitioner的信息。它內部的實現將map的結果調用了require先進行merge,隨後建立shuffleRDD.shuffleRDD就是最終reduce後的RDD。而後看不懂了。。。由於須要與整個流程相結合。因此後續繼續深刻~

  

相關文章
相關標籤/搜索