scala-spark練手--dataframe數據可視化初稿

成品:http://www.cnblogs.com/drawwindows/p/5640606.html

初稿:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, _}
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.mutable.ArrayBuffer

object DataFrameVisiualize extends Logging {

  def runforstatistic(hiveContext: HiveContext, params: JSONObject) = {
    val arr = params.getJSONArray("targetType")
    var i = 0
    while( i < arr.size()){
      val obj = arr.getJSONObject(i)
      if("dataset".equalsIgnoreCase(obj.getString("targetType"))){
        val tableNameKey = obj.getString("targetName")
        val tableName = params.getString(tableNameKey)
        val user = params.getString("user")
        run(hiveContext, tableName, user)
      }
      i = i+1
    }
  }

  def run(hiveContext: HiveContext, tableName: String, user: String) = {
    val pathParent = s"/user/$user/mlaas/tableStatistic/$tableName"
//    val conf = new SparkConf().setAppName("DataFrameVisiualizeJob")
//    val sc = new SparkContext(conf)
//    val hiveContext = new HiveContext(sc)
//    val sqlContext = new SQLContext(sc)
    //0.獲取DB的schema信息
    val schemadf = hiveContext.sql("desc " + tableName)
    //schema信息落地
    val filePathSchema = pathParent + "/schemajson"
    schemadf.write.mode(SaveMode.Overwrite).format("json").save(filePathSchema)

    //1.加載表到dataframe
    val df = hiveContext.sql("select * from " + tableName)
    //2.獲取dataframe的describe信息,默認爲獲取到的都爲數值型列
    val dfdesc = df.describe()
    //    //3.描述信息落地
    //    val filePath = pathParent + "/describejson"
    //    des.write.mode(SaveMode.Overwrite).format("json").save(filePath)
    //    val dfdesc = sqlContext.read.format("json").load(filePath)

    //4.列信息區分爲mathColArr 和 strColArr
    val mathColArr = dfdesc.columns.filter(!_.equalsIgnoreCase("summary"))
    val (colMin, colMax, colMean, colStddev, colMedian) = getDesfromDF(dfdesc, mathColArr)
    val allColArr = df.columns

    val strColArr = allColArr.filter(!_.equalsIgnoreCase("summary")).diff(mathColArr)


    saveRecords(hiveContext, tableName, 100, pathParent + "/recordsjson")
    val jsonobj = getAllStatistics(hiveContext, tableName, allColArr, strColArr, mathColArr, 10, colMin, colMax)

    jsonobj.put("colMin", colMin)
    jsonobj.put("colMax", colMax)
    jsonobj.put("colMean", colMean)
    jsonobj.put("colStddev", colStddev)
    jsonobj.put("colMedian", colMedian)

    val jsonStr = jsonobj.toString
    val conf1 = new Configuration()
    val fs = FileSystem.get(conf1)
    val fileName = pathParent + "/jsonObj"
    val path = new Path(fileName)
    val hdfsOutStream = fs.create(path)
    hdfsOutStream.writeBytes(jsonStr)
    hdfsOutStream.flush()
    hdfsOutStream.close()
    //    fs.close();

  }

  def saveRecords(hiveContext: HiveContext, tableName: String, num: Int, filePath: String) : Unit = {
    hiveContext.sql(s"select * from $tableName limit $num").write.mode(SaveMode.Overwrite).format("json").save(filePath)
  }
  /**
    * 根據allCols, mathColArr, strColArr 三個數組,返回帶有全部統計信息(除去已經根據describe獲取到的)的dataframes。
    * 返回的dataframe結果進行遍歷,填充各個屬性的值。
    */
  def getAllStatistics(hiveContext: HiveContext, tableName: String, allColArr: Array[String], strColArr: Array[String], mathColArr: Array[String], partNum: Int, colMin: java.util.HashMap[String, Double], colMax: java.util.HashMap[String, Double]) :
  JSONObject = {
    val jsonobj = new JSONObject()
    val sb = new StringBuffer()
    sb.append("select ")
    for(col <- allColArr){
      sb.append(s"count(distinct($col)) as unique_$col , sum(case when $col is null then 1 else 0 end) as missing_$col, ")
    }
    sb.append(s"sum(1) as totalrows from $tableName")
    val df = hiveContext.sql(sb.toString)
    val colUnique = new java.util.HashMap[String, Long]//惟一值
    val colMissing = new java.util.HashMap[String, Long]//缺失值
    df.take(1).foreach(row => (jsonobj.put("totalrows", row.getAs[Long]("totalrows")),allColArr.foreach(col => (colUnique.put(col, row.getAs[Long]("unique_"+col)),colMissing.put(col, row.getAs[Long]("missing_"+col))) ) ))

    val dfArr = ArrayBuffer[DataFrame]()
    val strHistogramSql = new StringBuffer()
    strHistogramSql.append(s"""
         SELECT tta.colName, tta.value, tta.num
         FROM (
         SELECT ta.colName, ta.value, ta.num, ROW_NUMBER() OVER (PARTITION BY ta.colName ORDER BY ta.num DESC) AS row
         FROM (
      """)

    var vergin = 0
    for(col <- strColArr){
      if(vergin == 1){
        strHistogramSql.append(" UNION ALL ")
      }
      vergin = 1
      strHistogramSql.append(s"""
      SELECT 'StrHistogram_$col' AS colName, $col AS value, COUNT(1) AS num
      FROM $tableName
        GROUP BY $col """)
    }
    strHistogramSql.append(s"""
      ) ta
      ) tta
      WHERE tta.row <= $partNum
      """)
    val dfStrHistogram =  hiveContext.sql(strHistogramSql.toString)
    dfArr.append(dfStrHistogram)
    for(col <- mathColArr){
      val df1 =  hiveContext.sql(s"select 'Quartile_$col' as  colName, ntil, max($col) as  num from (select $col,  ntile(4) OVER (order by $col)as ntil from $tableName) tt group by ntil ")
      log.info("col is :" + col + ", min is :" + colMin.get(col) + ", max is : " + colMax.get(col))
      //need toString first, then toDouble。 or:ClassCastException
      val min = colMin.get(col).toString.toDouble
      val max = colMax.get(col).toString.toDouble
      val df2 =  getHistogramMathDF(col, hiveContext, tableName, min, max, partNum)
      dfArr.append(df1)
      dfArr.append(df2)
    }
    val dfAll = dfArr.reduce(_.unionAll(_))
    val allRows = dfAll.collect()
    val mathColMapQuartile = new java.util.HashMap[String, Array[java.util.HashMap[String,Long]]] //四分位
    val mathColMapHistogram = new java.util.HashMap[String, Array[java.util.HashMap[String,Long]]]//條形圖
    val strColMapHistogram = new java.util.HashMap[String, Array[java.util.HashMap[String,Long]]]//條形圖
    val (mathColMapQuartile1, mathColMapHistogram1, strColMapHistogram1) = readRows(allRows)
    for(col <- strColArr){
      strColMapHistogram.put(col,strColMapHistogram1.get(col).toArray[java.util.HashMap[String,Long]])
    }
    for(col <- mathColArr){
      mathColMapQuartile.put(col,mathColMapQuartile1.get(col).toArray[java.util.HashMap[String,Long]])
      mathColMapHistogram.put(col,mathColMapHistogram1.get(col).toArray[java.util.HashMap[String,Long]])
    }
    jsonobj.put("mathColMapQuartile", mathColMapQuartile)
    jsonobj.put("mathColMapHistogram", mathColMapHistogram)
    jsonobj.put("strColMapHistogram", strColMapHistogram)
    jsonobj.put("colUnique", colUnique)
    jsonobj.put("colMissing", colMissing)
    jsonobj
  }
  def readRows(rows: Array[Row]) : (java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]] , java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]], java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]])={
    val mathColMapQuartile = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]] //四分位
    val mathColMapHistogram = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]]//條形圖
    val strColMapHistogram = new java.util.HashMap[String, ArrayBuffer[java.util.HashMap[String,Long]]]//條形圖
    rows.foreach( row => {
      val colName = row.getAs[String]("colName")
      if (colName.startsWith("StrHistogram")) {
        val value = row.getAs[String](1)
        val num = row.getAs[Long](2)
        val map = new java.util.HashMap[String, Long]()
        val col = colName.substring(colName.indexOf('_') + 1)
        map.put(value, num)
        val mapValue = strColMapHistogram.get(col)
        if (mapValue == null) {
          val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]()
          mapValueNew.append(map)
          strColMapHistogram.put(col, mapValueNew)
        } else {
          mapValue.append(map)
          strColMapHistogram.put(col, mapValue)
        }
      } else if (colName.toString.startsWith("Quartile")) {
        val value = row.getAs[String](1)
        val num = row.getAs[Long](2)
        val map = new java.util.HashMap[String, Long]()
        val col = colName.substring(colName.indexOf('_') + 1)
        map.put(value, num)
        val mapValue = mathColMapQuartile.get(col)
        if (mapValue == null) {
          val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]()
          mapValueNew.append(map)
          mathColMapQuartile.put(col, mapValueNew)
        } else {
          mapValue.append(map)
          mathColMapQuartile.put(col, mapValue)
        }
      } else if (colName.toString.startsWith("MathHistogram")) {
        val value = row.getAs[String](1)
        val num = row.getAs[Long](2)
        val map = new java.util.HashMap[String, Long]()
        val col = colName.substring(colName.indexOf('_') + 1)
        map.put(value, num)
        val mapValue = mathColMapHistogram.get(col)
        if (mapValue == null) {
          val mapValueNew = ArrayBuffer[java.util.HashMap[String, Long]]()
          mapValueNew.append(map)
          mathColMapHistogram.put(col, mapValueNew)
        } else {
          mapValue.append(map)
          mathColMapHistogram.put(col, mapValue)
        }
      }
    })
    (mathColMapQuartile, mathColMapHistogram, strColMapHistogram)
  }
  /** 數值型的列的條形分佈獲取方法*/
  def getHistogramMathDF(col : String, hiveContext: HiveContext, tableName: String, min: Double, max: Double, partNum: Int) : DataFrame = {
    val len = (max - min) / partNum
    log.info(s"len is : $len")
    val sb = new StringBuffer()
    sb.append(s"select $col, (case ")
    val firstRight = min + len
    sb.append(s" when ($col >= $min and $col <= $firstRight) then 1 ")
    for (i <- 2 until (partNum + 1)) {
      val left = min + len * (i - 1)
      val right = min + len * i
      sb.append(s" when ($col > $left and $col <= $right) then $i ")
    }
    sb.append(s" else 0  end ) as partNum from $tableName")
    sb.insert(0, s"select 'MathHistogram_$col' as colName, partNum, count(1) as num from ( ")
    sb.append(") temptableScala group by partNum")
    log.info("getHistogram is: " + sb.toString)
    val df = hiveContext.sql(sb.toString)
    df
  }
  def getDesfromDF(dfdesc : DataFrame, mathColArr: Array[String]):
  (java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double], java.util.HashMap[String, Double])= {
    val allRows = dfdesc.collect()
    val colMin = new java.util.HashMap[String, Double]//最小值
    val colMax = new java.util.HashMap[String, Double]//最大值
    val colMean = new java.util.HashMap[String, Double]//平均值
    val colStddev = new java.util.HashMap[String, Double]//標準差
    val colMedian = new java.util.HashMap[String, Double]//中位值
    allRows.foreach(row => {
      val mapKey = row.getAs[String]("summary")
      for(col <- mathColArr){
        if("mean".equalsIgnoreCase(mapKey)){
          colMean.put(col, row.getAs[Double](col))
        }else if("stddev".equalsIgnoreCase(mapKey)){
          colStddev.put(col, row.getAs[Double](col))
        }else if("min".equalsIgnoreCase(mapKey)){
          log.info("col is " + col +", min is : "+ row.getAs[Double](col))
          colMin.put(col, row.getAs[Double](col))
        }else if("max".equalsIgnoreCase(mapKey)){
          log.info("col is " + col +", max is : "+ row.getAs[Double](col))
          colMax.put(col, row.getAs[Double](col))
        }else{
          colMedian.put(col, row.getAs[Double](col))
        }
      }
    })
    (colMin, colMax, colMean, colStddev, colMedian)
  }
}
相關文章
相關標籤/搜索