SparkSQL Engineering Notes

  I have recently been working on migrating an Oracle project, learning the approach from a Spark architect along the way; these notes summarize some of the methods.

  1. First, create a SparkSession object (in older versions this was the SparkContext).

  val session = SparkSession.builder().appName("app1").getOrCreate()
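  The session also wraps the old entry point, so when RDD-level APIs are needed the SparkContext mentioned above is still reachable (shown here only for orientation):

  // the pre-2.0 entry point is still available underneath the SparkSession
  val sc = session.sparkContext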

  2. The update-time configuration table is kept in MySQL; that is, every time the computed result data is written to MySQL, the update time of that run is also written into the configuration table. In the code we need a case class for the configuration table, the database connection information (schema, URL, user name, password, and so on), and then the records are filtered by app according to the configuration table.

  val appId = "1"
  case class DBInformation(url: String, schema: String, user: String, passwd: String)
  val mysqlDB = DBInformation("jdbc:mysql://....", schema, user, password)
  val tableName = mysqlDB.schema + "." + name
  val props = new Properties()
  props.setProperty("user", mysqlDB.user)
  props.setProperty("password", mysqlDB.passwd)
  props.setProperty(JDBCOptions.JDBC_DRIVER_CLASS, "com.mysql.jdbc.Driver")
  val record = session.read.jdbc(mysqlDB.url, tableName, props)
    .filter(row => row.getAs[Int]("app_id") == appId.toInt)
    .take(1)

  // first write: no data yet
  // DBInfoMation here appears to be a separate case class holding the three update timestamps
  // read from the configuration table (not the DBInformation connection config above)
  if (0 == record.size) {
    DBInfoMation(null, null, null)
  } else {
    DBInfoMation(record(0).getTimestamp(1), record(0).getTimestamp(2), record(0).getTimestamp(3))
  }
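  The step above also says that whenever the result data is written to MySQL, the update time is written back to the configuration table. A minimal sketch of that write-back, reusing the connection settings from above (the case class and column names are assumptions, not the project's real layout):

  import java.sql.Timestamp
  import org.apache.spark.sql.SaveMode

  // assumed shape of one row of the configuration table
  case class UpdateTimeRecord(app_id: Int, history_time: Timestamp, sync_time: Timestamp, update_time: Timestamp)

  val now = new Timestamp(System.currentTimeMillis())
  val updateDF = session.createDataFrame(Seq(UpdateTimeRecord(appId.toInt, now, now, now)))
  // append the new update time after the result data has been written
  updateDF.write.mode(SaveMode.Append).jdbc(mysqlDB.url, tableName, props)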

  3. Register UDFs. The original logic used Oracle syntax; now that it is being moved to SparkSQL, some UDFs have to be registered to stay compatible with the original Oracle functions.

  def registerUDF(session: SparkSession): Unit = {
    // Spark passes SQL array columns to Scala UDFs as Seq, so declare Seq[String] rather than Array[String]
    session.udf.register("UDF", (value: String, modifieds: Seq[String]) => {
      val filtered = modifieds.filter(_ != null)
      if (filtered.nonEmpty) {
        filtered.max
      } else {
        null
      }
    })
  }
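  Once registered, the UDF can be called directly inside SQL text, which is what lets the migrated Oracle statements keep their shape. A small usage sketch (the table and column names here are made up for illustration):

  registerUDF(session)
  // hypothetical table/columns, only to show how the registered UDF is invoked from SQL
  val latest = session.sql(
    "select pd_code, UDF(pd_code, array(modified_1, modified_2)) as latest_modified from some_table")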

  4. Many of the computations need historical data, so on the first initialization the historical data is cached. One detail here: the continuously computed, synchronized data is checkpointed to disk; if the history time is after the sync time, the historical data is loaded, otherwise the checkpointed (synchronized) data is loaded.

  val (updateTime, initData) = if (historyTime.after(syncTime)) {
    (historyTime, initFromHistory(tableName))
  } else {
    (syncTime, initFromCheckPoint(syncTime))
  }
  // keep the schema for later reuse
  schema = initData.schema
  // baseData is the data cached in memory, repartitioned according to the data volume
  baseData = initData.repartition(numPartitions, _partitionColumns.map(new Column(_)): _*).rdd.persist(storageLevel)
  // trigger an action so the cache is actually materialized
  baseData.foreach(_ => ())
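  The checkpointing mentioned above is not shown in the notes; a minimal sketch of that side, assuming a checkpoint directory (the path is a placeholder):

  // the checkpoint directory would normally live on HDFS in a real deployment
  session.sparkContext.setCheckpointDir("/tmp/spark-checkpoint")
  // baseData is already persisted above, so checkpointing does not recompute the whole lineage;
  // the checkpoint file itself is written out on the next action
  baseData.checkpoint()
  baseData.count()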

  5. There is one case where three downstream tables have to be joined into one wide table. The data of these three tables comes from three topics in the message middleware, but it may not all arrive at the same time, so the historically loaded wide table has to be split by ID into three small tables; the new data is appended to each small table, the three are then joined again by ID, and together they form the final table.

  val table1 = new createUpdatingTable(session, "tableName1", topicConf, numPartitions, ...)
  val table2 = new createUpdatingTable(session, "tableName2", topicConf1, numPartitions, ...)
  val table3 = new createUpdatingTable(session, "tableName3", topicConf2, numPartitions, ...)
  val mergeBaseTable = (session, "mergeTableName", Array(table1, table2, table3), finallyColumn, finallyPartitions, ...)

  mergeBaseTable.updateAndGetData(Some(genDataFilter(currentTime)))
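  createUpdatingTable is project code that these notes do not reproduce. As a rough idea of what "reading new data from a topic" can look like, here is a sketch using Spark's built-in Kafka batch source; the notes only say "message middleware", so Kafka, the servers, topic and offsets below are all assumptions:

  // requires the spark-sql-kafka-0-10 package; all option values below are placeholders
  val incoming = session.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "topic1")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")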

  // split the wide table into the three shards and merge them back

  val tmpPartitionKey = "pd_code"
  if (baseData != null) {
    val oldData = getOldData(baseData, keyDF.rdd, tmpPartitionKey)
    oldDf = session.createDataFrame(oldData, schema)
      .repartition(numPartitions, new Column(tmpPartitionKey))
      .persist(storageLevel)
  }

  val table1 = session.createDataFrame(updateShardTable(oldDf, inDfs(0), ...), schema)
  val table2 = ...
  val table3 = ...
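  updateShardTable itself is project code that the notes do not show (judging from the line above, the real helper likely returns rows that are wrapped back into a DataFrame with createDataFrame). A hedged sketch of the merge idea only, i.e. append the newly arrived rows to the historical shard and keep the latest row per key; the name, signature and the update_time column are assumptions:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._

  // sketch, not the project's real implementation
  def updateShardTableSketch(oldDf: DataFrame, inDf: DataFrame, keyCol: String): DataFrame = {
    val unioned = oldDf.unionByName(inDf)
    // keep only the most recent row per key; "update_time" is an assumed column
    val w = Window.partitionBy(col(keyCol)).orderBy(col("update_time").desc)
    unioned.withColumn("_rn", row_number().over(w))
      .filter(col("_rn") === 1)
      .drop("_rn")
  }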

  

  6. Merge the keys of the three tables, then join the three source tables via SQL.

  val keySet = keys.collect()
  val broadcastKeys = session.sparkContext.broadcast(keySet)
  baseData.mapPartitions({ iter =>
    val set = broadcastKeys.value.toSet
    iter.filter(row => set.contains(row.getAs[Any](keyCol)))
  }, true)
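  For the SQL below to resolve table1, table2 and table3, the three DataFrames have to be registered as temporary views first; a minimal sketch, assuming the shard DataFrames built in step 5:

  // register the shard DataFrames so the join SQL can reference them by name
  table1.createOrReplaceTempView("table1")
  table2.createOrReplaceTempView("table2")
  table3.createOrReplaceTempView("table3")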

  val sql = "select a.column, b.column, c.column, ... from table1 a left join table2 b on a.pd_code = b.pd_code ..."

  val finallyTable = session.sql(sql)

 

  7. Filter the rows that need to be updated in this run out of the historical data (filtering by ID), then append the new data.

  import scala.collection.mutable

  val newData = baseData.zipPartitions(updateData, true) { (liter, riter) =>
    // collect the keys of the incoming rows in this partition
    val rset = new mutable.HashSet[Any]
    for (row <- riter) {
      rset.add(row.getAs[Any](keyCol))
    }
    // drop the historical rows that are about to be replaced
    liter.filter(row => !rset.contains(row.getAs[Any](keyCol)))
  }.zipPartitions(updateData, true) { (liter, riter) =>
    // append the new rows
    liter ++ riter
  }.persist(storageLevel)
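  One caveat worth spelling out: zipPartitions pairs partitions purely by index, so updateData has to be partitioned exactly like baseData for the key-based merge above to be correct. A minimal sketch of that precondition, reusing the repartition call from step 4 (updateDF is an assumed name for the incoming batch):

  // partition the incoming batch the same way baseData was partitioned in step 4,
  // otherwise rows with the same key can end up in different partition indexes
  val updateData = updateDF
    .repartition(numPartitions, _partitionColumns.map(new Column(_)): _*)
    .rdd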
