Apache Griffin is positioned as a data quality monitoring tool for big data. It supports the batch data sources hive, text files and avro files, and the streaming data source kafka. Projects that store their data in relational databases such as mysql or oracle also need a configurable data quality monitoring tool, so extending griffin with a mysql data source gives such projects one more option for data quality monitoring.
The previous article on apache griffin already covered griffin's features, execution flow and architecture. This article focuses on its code structure and a simple implementation of an extended data source. First, an overview of the code structure:
The code is divided into three parts: measure, service and ui. measure contains the scheduled spark job code; service is the spring boot code behind the web configuration and monitoring interface; ui holds the front-end angular js code and resources.
The code for extending a data source lives mainly in the measure module. Using the batch demo in the griffin project that reads an avro data source as an example, let's walk through how griffin reads its configuration and selects a data source:
Environment configuration file: env-batch.json
{
  # spark configuration
  "spark": {
    "log.level": "WARN",
    "config": {
      "spark.master": "local[*]"
    }
  },
  # sinks for the comparison result: console, hdfs, elasticsearch
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost/griffin/batch/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.148.181.248:39200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}
Data source configuration file: config-batch.json
{
  # job name
  "name": "accu_batch",
  # job type: batch or streaming
  "process.type": "batch",
  # configuration of the data source and the comparison target
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_src.avro"
          }
        }
      ]
    },
    {
      "name": "target",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_target.avro"
          }
        }
      ]
    }
  ],
  # data validation rule, here an accuracy comparison
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
      }
    ]
  },
  # where to sink the comparison result: console and es
  "sinks": ["CONSOLE","ELASTICSEARCH"]
}
Entry class of the measure module:

package org.apache.griffin.measure

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp

/**
 * application entrance
 */
object Application extends Loggable {

  def main(args: Array[String]): Unit = {
    info(args.toString)
    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }

    // the runtime parameters: env-batch.json and config-batch.json
    val envParamFile = args(0)
    val dqParamFile = args(1)

    info(envParamFile)
    info(dqParamFile)

    // read param files
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    // combine the environment config and the data source config into the griffin config
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose the application type according to the config:
    // process.type in the data source config resolves to batch here
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // initialize the griffin job execution environment
    // (see the next code block; the main logic is creating the sparkSession
    // and registering griffin's custom spark udfs)
    dqApp.init match {
      case Success(_) =>
        info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // run the job; with this configuration it is a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result
      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)
        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // close the job
    dqApp.close match {
      case Success(_) =>
        info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown

    // exit the application
    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m: ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {
  }

  private def shutdown(): Unit = {
  }
}
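Since env-batch.json sets spark.master to local[*], the entry point can also be driven directly from code for a quick local test. A minimal sketch only; the runner object and the file paths below are placeholders, not the project's actual resource locations, and in a real deployment the same two arguments are passed to the measure jar via spark-submit:

import org.apache.griffin.measure.Application

object LocalBatchRunner {
  def main(args: Array[String]): Unit = {
    // hypothetical local paths to the two configuration files shown above
    val envParamFile = "/path/to/env-batch.json"
    val dqParamFile = "/path/to/config-batch.json"
    // Application.main expects the env config first, then the dq config
    Application.main(Array(envParamFile, dqParamFile))
  }
}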
The batch job processing class:
package org.apache.griffin.measure.launch.batch

import java.util.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent

case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig

  val sparkParam = envParam.getSparkParam
  val metricName = dqParam.getName
  // val dataSourceParams = dqParam.dataSources
  // val dataSourceNames = dataSourceParams.map(_.name)
  val sinkParams = getSinkParams

  var sqlContext: SQLContext = _
  implicit var sparkSession: SparkSession = _

  def retryable: Boolean = false

  // initialize: create the sparkSession and register griffin's custom udfs
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  // the job execution method
  def run: Try[Boolean] = Try {
    // start time
    val startTime = new Date().getTime

    val measureTime = getMeasureTime
    val contextId = ContextId(measureTime)

    // get data sources
    // built from the data.sources section of config-batch.json:
    // two avro-backed data sources named source and target
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
    // initialize the data sources
    dataSources.foreach(_.init)

    // create the griffin execution context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, sinkParams, BatchProcessType
    )(sparkSession)

    // start the sinks configured for this job: console and elasticsearch
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getSink().start(applicationId)

    // build the data quality comparison job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

    // execute the comparison job following the steps configured on the web side;
    // the demo mainly runs the rule sql from the config and writes the result to the sinks
    val result = dqJob.execute(dqContext)

    // log the end time of this check
    val endTime = new Date().getTime
    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean up the griffin context
    dqContext.clean()

    // emit the finish mark
    dqContext.getSink().finish()

    result
  }

  def close: Try[_] = Try {
    sparkSession.close()
    sparkSession.stop()
  }
}
At this point we have walked through the execution order of the measure code. If you read it carefully, you will find the execution flow is not complicated and the code logic is fairly clear.
The data source creation this article cares about happens in the BatchDQApp class, at val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources).
Let's look at the code of the DataSourceFactory class:
package org.apache.griffin.measure.datasource

import scala.util.Success

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}

object DataSourceFactory extends Loggable {

  def getDataSources(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dataSources: Seq[DataSourceParam]
                    ): Seq[DataSource] = {
    dataSources.zipWithIndex.flatMap { pair =>
      val (param, index) = pair
      getDataSource(sparkSession, ssc, param, index)
    }
  }

  private def getDataSource(sparkSession: SparkSession,
                            ssc: StreamingContext,
                            dataSourceParam: DataSourceParam,
                            index: Int
                           ): Option[DataSource] = {
    val name = dataSourceParam.getName
    val connectorParams = dataSourceParam.getConnectors
    val timestampStorage = TimestampStorage()

    // cache for streaming data
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

    // obtain the data connectors
    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
      // get each connector from the connector factory
      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
        timestampStorage, streamingCacheClientOpt) match {
          case Success(connector) => Some(connector)
          case _ => None
        }
    }

    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
  }
}
DataConnectorFactory, the data connector factory:
package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._

object DataConnectorFactory extends Loggable {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val TextDirRegex = """^(?i)text-dir$""".r
  val KafkaRegex = """^(?i)kafka$""".r
  val CustomRegex = """^(?i)custom$""".r

  /**
   * create data connector
   * @param sparkSession             spark env
   * @param ssc                      spark streaming env
   * @param dcParam                  data connector param
   * @param tmstCache                same tmst cache in one data source
   * @param streamingCacheClientOpt  for streaming cache
   * @return data connector
   */
  def getDataConnector(sparkSession: SparkSession,
                       ssc: StreamingContext,
                       dcParam: DataConnectorParam,
                       tmstCache: TimestampStorage,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ): Try[DataConnector] = {
    val conType = dcParam.getType
    val version = dcParam.getVersion
    Try {
      // map the configured data source type to a connector implementation
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() =>
          getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

  private def getStreamingDataConnector(sparkSession: SparkSession,
                                        ssc: StreamingContext,
                                        dcParam: DataConnectorParam,
                                        tmstCache: TimestampStorage,
                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                       ): StreamingDataConnector = {
    if (ssc == null) throw new Exception("streaming context is null!")
    val conType = dcParam.getType
    val version = dcParam.getVersion
    conType match {
      case KafkaRegex() =>
        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("streaming connector creation error!")
    }
  }

  // resolve a user-defined connector class for the "custom" type
  private def getCustomConnector(session: SparkSession,
                                 context: StreamingContext,
                                 param: DataConnectorParam,
                                 storage: TimestampStorage,
                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(session, param, storage)
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    } else {
      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
    }
  }

  private def getKafkaDataConnector(sparkSession: SparkSession,
                                    ssc: StreamingContext,
                                    dcParam: DataConnectorParam,
                                    tmstCache: TimestampStorage,
                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                   ): KafkaStreamingDataConnector = {
    val KeyType = "key.type"
    val ValueType = "value.type"
    val config = dcParam.getConfig
    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
    val valueType = config.getOrElse(ValueType, "java.lang.String").toString

    (keyType, valueType) match {
      case ("java.lang.String", "java.lang.String") =>
        KafkaStreamingStringDataConnector(
          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("not supported type kafka data connector")
    }
  }
}
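Note the custom branch above: it loads a connector class by name from the connector config and invokes its apply method taking a BatchDataConnectorContext (or StreamingDataConnectorContext) via reflection, so a connector can be plugged in without modifying the factory. Below is a minimal sketch of such a pluggable batch connector; the package, class name, in-memory data and the field names assumed on BatchDataConnectorContext are illustrative only, not part of griffin's shipped code:

package com.example.griffin.connector  // hypothetical package

import org.apache.spark.sql.DataFrame

import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, BatchDataConnectorContext}

// Hypothetical custom batch connector, referenced from config-batch.json as
//   {"type": "custom", "config": {"class": "com.example.griffin.connector.DemoBatchDataConnector"}}
// The companion apply(BatchDataConnectorContext) generated for this case class is what
// getCustomConnector looks up by reflection.
case class DemoBatchDataConnector(ctx: BatchDataConnectorContext) extends BatchDataConnector {

  // assumed field names on BatchDataConnectorContext, mirroring the arguments
  // passed in getCustomConnector: session, param and storage
  @transient val sparkSession = ctx.sparkSession
  val dcParam = ctx.dcParam
  val timestampStorage = ctx.timestampStorage

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      import sparkSession.implicits._
      // a tiny in-memory DataFrame, just to demonstrate the data() contract
      val df = Seq(("u1", "Alice"), ("u2", "Bob")).toDF("user_id", "first_name")
      preProcess(Some(df), ms)
    } catch {
      case e: Throwable =>
        error(s"load demo data fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}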
By now the way data sources are created should be clear: the configured connector type is mapped to a connector implementation, which loads the corresponding data at runtime. The demo uses the avro data source, so let's look at the implementation of AvroBatchDataConnector:
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * batch data connector for avro file
 */
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getString(FilePath, "")
  val fileName = config.getString(FileName, "")

  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = {
    filePath.nonEmpty
  }

  private def fileExist(): Boolean = {
    HdfsUtil.existPath(concreteFileFullPath)
  }

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load avro file ${concreteFileFullPath} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}
Following the code, AvroBatchDataConnector implements the DataConnector interface; the core is the data method, which loads the data from the file via val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath).
Similarly, let's look at how griffin's default data source, hive, is implemented:
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * batch data connector for hive table
 */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val whereString = config.getString(Where, "")

  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

  private def tableExistsSql(): String = {
    // s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }
}
The hive connector is quite simple as well: it reads the database, table name and optional where clauses from the config, builds a SELECT statement (for example, a table.name of "user_info" with a where of "dt='20190101'" yields SELECT * FROM default.user_info WHERE dt='20190101'), runs it with val df = sparkSession.sql(dtSql), and returns the resulting DataFrame. Readers familiar with spark have probably realized by now that extending griffin with a mysql data source is not hard, because spark sql can read from mysql through its jdbc data source.
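As a preview of that direction, here is a minimal sketch of what such a connector could look like, modeled on HiveBatchDataConnector and spark's jdbc reader. The class name, the config keys (url, table.name, user, password, where) and the driver setting are assumptions for illustration, not part of griffin's released code:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * sketch of a batch data connector for a mysql table, read through spark's jdbc source;
 * config keys below are assumptions, and the mysql jdbc driver jar must be on the classpath
 */
case class MySqlBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  // assumed config keys, e.g. {"url": "jdbc:mysql://host:3306/db", "table.name": "t", ...}
  val Url = "url"
  val TableName = "table.name"
  val User = "user"
  val Password = "password"
  val Where = "where"

  val url = config.getString(Url, "")
  val tableName = config.getString(TableName, "")
  val user = config.getString(User, "")
  val password = config.getString(Password, "")
  val whereString = config.getString(Where, "")

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      // push the optional where clause down as a subquery, similar to the hive connector's where handling
      val dbtable =
        if (whereString.nonEmpty) s"(SELECT * FROM ${tableName} WHERE ${whereString}) AS t"
        else tableName
      val df = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.mysql.jdbc.Driver")
        .load()
      preProcess(Some(df), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

To wire it in, one could either add a mysql regex and a matching case to DataConnectorFactory.getDataConnector alongside the hive and avro branches, or load a connector like this through the existing custom connector type without touching the factory at all.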
For various reasons, the full implementation code and demo will be covered in a follow-up post.