Extending Apache Griffin with a MySQL Data Source

Overview

Apache Griffin is positioned as a data quality monitoring tool for big data. It supports batch data sources such as Hive tables, text files, and Avro files, as well as the streaming data source Kafka. However, projects that store their data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives such projects one more option for data quality monitoring.

Code Structure

The previous article on Apache Griffin already introduced its features, execution flow, and architecture. This article focuses on its code structure and a simple way to extend its data sources. Let's first look at the code structure:

The code is divided into three parts: measure, service, and ui. The measure module contains the Spark job code; the service module is a Spring Boot application that provides the web-side configuration and monitoring interface; the ui module contains the front-end AngularJS code and resources.

The code for extending data sources lives mainly in the measure module. Below, the Avro batch-processing demo that ships with the Griffin project is used to show how Griffin reads its configuration and selects a data source:

  1. Batch Avro data source configuration and measure module execution environment configuration

Environment configuration file: env-batch.json

{
  # spark configuration
  "spark": {
    "log.level": "WARN",
    "config": {
      "spark.master": "local[*]"
    }
  },
  
  # sinks for the comparison results: console, hdfs, elasticsearch
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost/griffin/batch/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.148.181.248:39200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],

  "griffin.checkpoint": []
}

Data source configuration file: config-batch.json

{
  # job name
  "name": "accu_batch",
  # job type: batch or streaming
  "process.type": "batch",
  # configuration of the source and the comparison target
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_src.avro"
          }
        }
      ]
    }, {
      "name": "target",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_target.avro"
          }
        }
      ]
    }
  ],
  # data quality rules; the accuracy comparison is used here
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
      }
    ]
  },
  # write the comparison results to the console and elasticsearch
  "sinks": ["CONSOLE","ELASTICSEARCH"]
}

  2. Measure module entry point and a brief walkthrough

package org.apache.griffin.measure

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp


/**
  * application entrance
  */
object Application extends Loggable {

  def main(args: Array[String]): Unit = {
    info(args.toString)
    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }
    
    // runtime parameters: the paths of env-batch.json and config-batch.json
    val envParamFile = args(0)
    val dqParamFile = args(1)

    info(envParamFile)
    info(dqParamFile)

    // read param files
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }

    // combine the environment config and the job config into the overall Griffin config
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose the application type according to the job config
    // process.type in config-batch.json resolves to batch here
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // initialize the Griffin job execution environment
    // see the next code block: the main logic creates the SparkSession and registers Griffin's custom Spark UDFs
    dqApp.init match {
      case Success(_) =>
        info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // run the job; according to the config this is a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result

      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)

        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // close the job
    dqApp.close match {
      case Success(_) =>
        info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown
    // exit the application
    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {
  }

  private def shutdown(): Unit = {
  }

}
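A hedged usage note: in a deployment the measure module is packaged as a jar and submitted to Spark with the two configuration files as arguments, something like spark-submit --class org.apache.griffin.measure.Application <measure-assembly-jar> env-batch.json config-batch.json, where the jar name depends on your build. For quick local debugging with spark.master set to local[*], the entry point can also be invoked directly; the launcher below is a hypothetical sketch, not part of Griffin:

package org.apache.griffin.measure.demo

import org.apache.griffin.measure.Application

// Hypothetical local launcher, for illustration only: it forwards the two expected
// arguments (environment config path and job config path) to Application.main.
object LocalAccuracyDemo {
  def main(args: Array[String]): Unit = {
    Application.main(Array("env-batch.json", "config-batch.json"))
  }
}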

The batch job handler class:

package org.apache.griffin.measure.launch.batch

import java.util.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent


case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig

  val sparkParam = envParam.getSparkParam
  val metricName = dqParam.getName
//  val dataSourceParams = dqParam.dataSources
//  val dataSourceNames = dataSourceParams.map(_.name)
  val sinkParams = getSinkParams

  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  def retryable: Boolean = false

  // initialization: create the SparkSession and register Griffin's custom UDFs
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  // job execution method
  def run: Try[Boolean] = Try {
    // start time
    val startTime = new Date().getTime

    val measureTime = getMeasureTime
    val contextId = ContextId(measureTime)

    // get data sources
    // build the data sources from the data.sources section of config-batch.json; both source and target read Avro files here
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
    // initialize the data sources
    dataSources.foreach(_.init)

    // create the Griffin execution context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, sinkParams, BatchProcessType
    )(sparkSession)

    // start the sinks configured for this job (console and elasticsearch) with the Spark application id
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getSink().start(applicationId)

    // build the data quality comparison job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

    // execute the comparison job following the configured steps; this demo mainly runs the rule SQL from the config and writes the results to the sinks
    val result = dqJob.execute(dqContext)

    // log the end time and the elapsed time of this run
    val endTime = new Date().getTime
    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean up the Griffin context
    dqContext.clean()

    // emit the finish marker to the sinks
    dqContext.getSink().finish()

    result
  }

  def close: Try[_] = Try {
    sparkSession.close()
    sparkSession.stop()
  }

}

So far we have walked through the execution order of the measure code. If you read it carefully, you will see that the execution flow is not complicated and the code logic is quite clear.

The data source creation this article focuses on happens in the BatchDQApp class, in the line val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources).

Let's look at the code of the DataSourceFactory class:

package org.apache.griffin.measure.datasource

import scala.util.Success

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}


object DataSourceFactory extends Loggable {

  def getDataSources(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dataSources: Seq[DataSourceParam]
                    ): Seq[DataSource] = {
    dataSources.zipWithIndex.flatMap { pair =>
      val (param, index) = pair
      getDataSource(sparkSession, ssc, param, index)
    }
  }

  private def getDataSource(sparkSession: SparkSession,
                            ssc: StreamingContext,
                            dataSourceParam: DataSourceParam,
                            index: Int
                           ): Option[DataSource] = {
    val name = dataSourceParam.getName
    val connectorParams = dataSourceParam.getConnectors
    val timestampStorage = TimestampStorage()

    // streaming data cache
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

    // build the data connectors
    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
      // obtain a connector from the connector factory
      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
        timestampStorage, streamingCacheClientOpt) match {
          case Success(connector) => Some(connector)
          case _ => None
        }
    }

    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
  }

}

DataConnectorFactory, the data connector factory:

package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._


object DataConnectorFactory extends Loggable {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val TextDirRegex = """^(?i)text-dir$""".r

  val KafkaRegex = """^(?i)kafka$""".r

  val CustomRegex = """^(?i)custom$""".r

  /**
    * create data connector
    * @param sparkSession     spark env
    * @param ssc              spark streaming env
    * @param dcParam          data connector param
    * @param tmstCache        same tmst cache in one data source
    * @param streamingCacheClientOpt   for streaming cache
    * @return   data connector
    */
  def getDataConnector(sparkSession: SparkSession,
                       ssc: StreamingContext,
                       dcParam: DataConnectorParam,
                       tmstCache: TimestampStorage,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ): Try[DataConnector] = {
    val conType = dcParam.getType
    val version = dcParam.getVersion
    Try {
      // map the connector type to a concrete data connector
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

  private def getStreamingDataConnector(sparkSession: SparkSession,
                                        ssc: StreamingContext,
                                        dcParam: DataConnectorParam,
                                        tmstCache: TimestampStorage,
                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                       ): StreamingDataConnector = {
    if (ssc == null) throw new Exception("streaming context is null!")
    val conType = dcParam.getType
    val version = dcParam.getVersion
    conType match {
      case KafkaRegex() =>
        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("streaming connector creation error!")
    }
  }
   
  // resolve a custom (user-defined) data connector
  private def getCustomConnector(session: SparkSession,
                                 context: StreamingContext,
                                 param: DataConnectorParam,
                                 storage: TimestampStorage,
                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(session, param, storage)
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    } else {
      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
    }
  }

  private def getKafkaDataConnector(sparkSession: SparkSession,
                                    ssc: StreamingContext,
                                    dcParam: DataConnectorParam,
                                    tmstCache: TimestampStorage,
                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                   ): KafkaStreamingDataConnector = {
    val KeyType = "key.type"
    val ValueType = "value.type"
    val config = dcParam.getConfig
    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
    val valueType = config.getOrElse(ValueType, "java.lang.String").toString

    (keyType, valueType) match {
      case ("java.lang.String", "java.lang.String") =>
        KafkaStreamingStringDataConnector(
          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ =>
        throw new Exception("not supported type kafka data connector")
    }
  }
}

By now, the way data sources are created should be clear: the connector type in the data source configuration is mapped to a concrete connector, which produces the corresponding data at runtime. The demo uses the Avro data source, so let's look at the implementation of AvroBatchDataConnector:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for avro file
  */
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getString(FilePath, "")
  val fileName = config.getString(FileName, "")

  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = {
    filePath.nonEmpty
  }

  private def fileExist(): Boolean = {
    HdfsUtil.existPath(concreteFileFullPath)
  }

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load avro file ${concreteFileFullPath} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

Following the code, we can see that AvroBatchDataConnector implements the DataConnector interface; its core is the data method, which loads data from a file with val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath).

Similarly, let's look at how Griffin's default Hive data source is implemented:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for hive table
  */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val whereString = config.getString(Where, "")

  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }


  private def tableExistsSql(): String = {
//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }

}

The Hive connector implementation looks fairly simple, doesn't it? It reads the database, table name, and where clauses from the configuration, builds the query SQL, executes it with val df = sparkSession.sql(dtSql), and returns the table data as a DataFrame. If you are familiar with Spark, you have probably already realized that extending Griffin with a MySQL data source is not hard, because Spark SQL supports MySQL through its JDBC data source.
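Before wiring anything into Griffin, it helps to confirm that Spark can already read a MySQL table through its built-in JDBC data source. A minimal sketch follows; the connection values are placeholders, and the MySQL JDBC driver jar has to be available on the Spark classpath:

// Read a MySQL table with Spark's generic JDBC data source.
// url / dbtable / user / password are placeholder values, not Griffin configuration.
val mysqlDf = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/demo_db")
  .option("dbtable", "users_info_src")
  .option("user", "griffin")
  .option("password", "secret")
  .load()

// The resulting DataFrame can then be pre-processed and compared just like the Hive or Avro data.
mysqlDf.printSchema()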

Approach to Extending a MySQL Data Source

  1. Add a MySQL connector configuration to the configuration file
  2. Add the corresponding data source mapping in DataConnectorFactory
  3. Add a new MySQLBatchDataConnector that implements the BatchDataConnector interface (a sketch follows this list)
  4. Consider how to read from sharded databases and tables
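
As a rough sketch of steps 2 and 3 (this is not the official Griffin implementation: the connector type name mysql, the config keys url, table.name, user, password and where, and the class MySqlBatchDataConnector are all assumptions), the factory would get one more type mapping, and the connector itself can follow the same pattern as AvroBatchDataConnector and HiveBatchDataConnector:

// In DataConnectorFactory (sketch): register one more connector type.
//   val MySqlRegex = """^(?i)mysql$""".r
//   ...
//   case MySqlRegex() => MySqlBatchDataConnector(sparkSession, dcParam, tmstCache)

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for a MySQL table (sketch; config keys are assumptions)
  */
case class MySqlBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Url = "url"
  val TableName = "table.name"
  val User = "user"
  val Password = "password"
  val Where = "where"

  val url = config.getString(Url, "")
  val tableName = config.getString(TableName, "")
  val user = config.getString(User, "")
  val password = config.getString(Password, "")
  val whereString = config.getString(Where, "")

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      // load the table through Spark's JDBC data source
      val df = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("user", user)
        .option("password", password)
        .load()
      // apply the optional where clause, then reuse the common pre-processing step
      val filtered = if (whereString.nonEmpty) df.where(whereString) else df
      preProcess(Some(filtered), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

Step 4 (sharded databases and tables) could then be handled inside data, for example by loading several dbtable values and unioning the resulting DataFrames before pre-processing.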

因爲各類緣由,實現代碼及demo下回補上。

by 賴澤坤@vipshop.com

Related resources: https://github.com/apache/griffin
