Overview
Apache Griffin is positioned as a data quality monitoring tool for big data. It supports the batch data sources Hive, text files, and Avro files, and the streaming data source Kafka. However, projects that store data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives such projects one more option for data quality monitoring.
Code Structure
The previous article, apache griffin, already introduced Griffin's features, execution flow, and architecture. This article focuses on its code structure and a simple implementation of an extended data source. Let's first take a look at the code structure:
The code is mainly divided into three parts: measure, service, and ui. measure contains the Spark job code; service is the Spring Boot code that provides the web-side configuration and monitoring interface; ui holds the AngularJS front-end code and resources.
The main code for extending a data source lives in the measure module. Below, using the batch demo in the Griffin project that reads an Avro data source as an example, let's look at how Griffin reads its configuration and selects a data source:
- Batch Avro file data source configuration and measure module execution environment configuration
Environment configuration file: env-batch.json
{
# Spark configuration
"spark": {
"log.level": "WARN",
"config": {
"spark.master": "local[*]"
}
},
# sink configuration for comparison results: console, HDFS, Elasticsearch
"sinks": [
{
"type": "CONSOLE",
"config": {
"max.log.lines": 10
}
},
{
"type": "HDFS",
"config": {
"path": "hdfs://localhost/griffin/batch/persist",
"max.persist.lines": 10000,
"max.lines.per.file": 10000
}
},
{
"type": "ELASTICSEARCH",
"config": {
"method": "post",
"api": "http://10.148.181.248:39200/griffin/accuracy",
"connection.timeout": "1m",
"retry": 10
}
}
],
"griffin.checkpoint": []
}
Data source configuration file: config-batch.json
{
# job name
"name": "accu_batch",
# process type: batch or streaming
"process.type": "batch",
# configuration of the source data and the target data to compare against
"data.sources": [
{
"name": "source",
"baseline": true,
"connectors": [
{
"type": "avro",
"version": "1.7",
"config": {
"file.name": "src/test/resources/users_info_src.avro"
}
}
]
}, {
"name": "target",
"connectors": [
{
"type": "avro",
"version": "1.7",
"config": {
"file.name": "src/test/resources/users_info_target.avro"
}
}
]
}
],
# data quality rule; here an accuracy comparison is used
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "accuracy",
"out.dataframe.name": "accu",
"rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
}
]
},
# output comparison results to console and Elasticsearch
"sinks": ["CONSOLE","ELASTICSEARCH"]
}
- measure module entry point and a brief walkthrough
package org.apache.griffin.measure
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
/**
* application entrance
*/
object Application extends Loggable {
def main(args: Array[String]): Unit = {
info(args.toString)
if (args.length < 2) {
error("Usage: class <env-param> <dq-param>")
sys.exit(-1)
}
// read the runtime parameter files, env-batch.json and config-batch.json
val envParamFile = args(0)
val dqParamFile = args(1)
info(envParamFile)
info(dqParamFile)
// read param files
val envParam = readParamFile[EnvConfig](envParamFile) match {
case Success(p) => p
case Failure(ex) =>
error(ex.getMessage, ex)
sys.exit(-2)
}
val dqParam = readParamFile[DQConfig](dqParamFile) match {
case Success(p) => p
case Failure(ex) =>
error(ex.getMessage, ex)
sys.exit(-2)
}
// combine the environment config and the data quality config into the Griffin config
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
// choose the application type according to the configuration
// process.type in config-batch.json is "batch", so a batch app is created
val procType = ProcessType(allParam.getDqConfig.getProcType)
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
case _ =>
error(s"${procType} is unsupported process type!")
sys.exit(-4)
}
startup
// initialize the Griffin job execution environment
// see the next code block: the main logic creates the SparkSession and registers Griffin's custom Spark UDFs
dqApp.init match {
case Success(_) =>
info("process init success")
case Failure(ex) =>
error(s"process init error: ${ex.getMessage}", ex)
shutdown
sys.exit(-5)
}
// run the job; with this configuration it is a batch job
val success = dqApp.run match {
case Success(result) =>
info("process run result: " + (if (result) "success" else "failed"))
result
case Failure(ex) =>
error(s"process run error: ${ex.getMessage}", ex)
if (dqApp.retryable) {
throw ex
} else {
shutdown
sys.exit(-5)
}
}
// close the job
dqApp.close match {
case Success(_) =>
info("process end success")
case Failure(ex) =>
error(s"process end error: ${ex.getMessage}", ex)
shutdown
sys.exit(-5)
}
shutdown
// exit the application
if (!success) {
sys.exit(-5)
}
}
private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
val paramReader = ParamReaderFactory.getParamReader(file)
paramReader.readConfig[T]
}
private def startup(): Unit = {
}
private def shutdown(): Unit = {
}
}
The batch job class:
package org.apache.griffin.measure.launch.batch
import java.util.Date
import scala.util.Try
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
val envParam: EnvConfig = allParam.getEnvConfig
val dqParam: DQConfig = allParam.getDqConfig
val sparkParam = envParam.getSparkParam
val metricName = dqParam.getName
// val dataSourceParams = dqParam.dataSources
// val dataSourceNames = dataSourceParams.map(_.name)
val sinkParams = getSinkParams
var sqlContext: SQLContext = _
implicit var sparkSession: SparkSession = _
def retryable: Boolean = false
// init: create the SparkSession and register Griffin's custom UDFs
def init: Try[_] = Try {
// build spark 2.0+ application context
val conf = new SparkConf().setAppName(metricName)
conf.setAll(sparkParam.getConfig)
conf.set("spark.sql.crossJoin.enabled", "true")
sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
sqlContext = sparkSession.sqlContext
// register udf
GriffinUDFAgent.register(sqlContext)
}
// job execution method
def run: Try[Boolean] = Try {
// start time
val startTime = new Date().getTime
val measureTime = getMeasureTime
val contextId = ContextId(measureTime)
// get data sources
// get the data sources from the data.sources section of config-batch.json; here two Avro sources named source and target
val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
// initialize the data sources
dataSources.foreach(_.init)
// create the Griffin execution context
val dqContext: DQContext = DQContext(
contextId, metricName, dataSources, sinkParams, BatchProcessType
)(sparkSession)
// per the configuration, results are written to console and Elasticsearch
val applicationId = sparkSession.sparkContext.applicationId
dqContext.getSink().start(applicationId)
// build the data quality comparison job
val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
// execute the comparison job following the configured steps; the demo mainly runs the configured rule SQL and writes the results to the sinks
val result = dqJob.execute(dqContext)
// log the end time of this check
val endTime = new Date().getTime
dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")
// clean up the Griffin context
dqContext.clean()
// emit the finish marker
dqContext.getSink().finish()
result
}
def close: Try[_] = Try {
sparkSession.close()
sparkSession.stop()
}
}
At this point the execution order of the measure module has been walked through; a careful reader will notice that the flow is not complicated and the code logic is fairly clear.
The data source creation this article focuses on happens in the BatchDQApp class: val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources).
Let's look at the code of the DataSourceFactory class:
package org.apache.griffin.measure.datasource
import scala.util.Success
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
object DataSourceFactory extends Loggable {
def getDataSources(sparkSession: SparkSession,
ssc: StreamingContext,
dataSources: Seq[DataSourceParam]
): Seq[DataSource] = {
dataSources.zipWithIndex.flatMap { pair =>
val (param, index) = pair
getDataSource(sparkSession, ssc, param, index)
}
}
private def getDataSource(sparkSession: SparkSession,
ssc: StreamingContext,
dataSourceParam: DataSourceParam,
index: Int
): Option[DataSource] = {
val name = dataSourceParam.getName
val connectorParams = dataSourceParam.getConnectors
val timestampStorage = TimestampStorage()
// streaming data cache
val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
// get the data connectors
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
// obtain each connector from the connector factory
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
timestampStorage, streamingCacheClientOpt) match {
case Success(connector) => Some(connector)
case _ => None
}
}
Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
}
}
DataConnectorFactory, the data connector factory:
package org.apache.griffin.measure.datasource.connector
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._
object DataConnectorFactory extends Loggable {
val HiveRegex = """^(?i)hive$""".r
val AvroRegex = """^(?i)avro$""".r
val TextDirRegex = """^(?i)text-dir$""".r
val KafkaRegex = """^(?i)kafka$""".r
val CustomRegex = """^(?i)custom$""".r
/**
* create data connector
* @param sparkSession spark env
* @param ssc spark streaming env
* @param dcParam data connector param
* @param tmstCache same tmst cache in one data source
* @param streamingCacheClientOpt for streaming cache
* @return data connector
*/
def getDataConnector(sparkSession: SparkSession,
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]
): Try[DataConnector] = {
val conType = dcParam.getType
val version = dcParam.getVersion
Try {
// map the configured connector type to a concrete data connector
conType match {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case KafkaRegex() =>
getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("connector creation error!")
}
}
}
private def getStreamingDataConnector(sparkSession: SparkSession,
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]
): StreamingDataConnector = {
if (ssc == null) throw new Exception("streaming context is null!")
val conType = dcParam.getType
val version = dcParam.getVersion
conType match {
case KafkaRegex() =>
getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("streaming connector creation error!")
}
}
// resolve a custom (user-defined) data connector
private def getCustomConnector(session: SparkSession,
context: StreamingContext,
param: DataConnectorParam,
storage: TimestampStorage,
maybeClient: Option[StreamingCacheClient]): DataConnector = {
val className = param.getConfig("class").asInstanceOf[String]
val cls = Class.forName(className)
if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
val ctx = BatchDataConnectorContext(session, param, storage)
val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
} else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
} else {
throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
}
}
private def getKafkaDataConnector(sparkSession: SparkSession,
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]
): KafkaStreamingDataConnector = {
val KeyType = "key.type"
val ValueType = "value.type"
val config = dcParam.getConfig
val keyType = config.getOrElse(KeyType, "java.lang.String").toString
val valueType = config.getOrElse(ValueType, "java.lang.String").toString
(keyType, valueType) match {
case ("java.lang.String", "java.lang.String") =>
KafkaStreamingStringDataConnector(
sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ =>
throw new Exception("not supported type kafka data connector")
}
}
}
By this point the way data sources are created should be clear: the factory maps the connector type in the configuration to a concrete connector, which loads the corresponding data at runtime. The demo uses the Avro data source, so next let's look at the AvroBatchDataConnector implementation:
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._
/**
* batch data connector for avro file
*/
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
timestampStorage: TimestampStorage
) extends BatchDataConnector {
val config = dcParam.getConfig
val FilePath = "file.path"
val FileName = "file.name"
val filePath = config.getString(FilePath, "")
val fileName = config.getString(FileName, "")
val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
private def pathPrefix(): Boolean = {
filePath.nonEmpty
}
private def fileExist(): Boolean = {
HdfsUtil.existPath(concreteFileFullPath)
}
def data(ms: Long): (Option[DataFrame], TimeRange) = {
val dfOpt = try {
val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
case e: Throwable =>
error(s"load avro file ${concreteFileFullPath} fails", e)
None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
}
Following the code, we can see that AvroBatchDataConnector implements the DataConnector interface (through BatchDataConnector); its core is the data method, which reads the data from the file: val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath).
Similarly, let's look at how Griffin's default Hive data source is implemented:
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
/**
* batch data connector for hive table
*/
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
timestampStorage: TimestampStorage
) extends BatchDataConnector {
val config = dcParam.getConfig
val Database = "database"
val TableName = "table.name"
val Where = "where"
val database = config.getString(Database, "default")
val tableName = config.getString(TableName, "")
val whereString = config.getString(Where, "")
val concreteTableName = s"${database}.${tableName}"
val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
def data(ms: Long): (Option[DataFrame], TimeRange) = {
val dfOpt = try {
val dtSql = dataSql
info(dtSql)
val df = sparkSession.sql(dtSql)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
case e: Throwable =>
error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
private def tableExistsSql(): String = {
// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
s"tableName LIKE '${tableName}'"
}
private def metaDataSql(): String = {
s"DESCRIBE ${concreteTableName}"
}
private def dataSql(): String = {
val tableClause = s"SELECT * FROM ${concreteTableName}"
if (wheres.length > 0) {
val clauses = wheres.map { w =>
s"${tableClause} WHERE ${w}"
}
clauses.mkString(" UNION ALL ")
} else tableClause
}
}
The Hive connector implementation looks fairly simple: it reads the database, table name, and optional where conditions from the configuration, builds the query, and executes it through Spark SQL with val df = sparkSession.sql(dtSql), returning the resulting DataFrame. Anyone familiar with Spark has probably realized by now that extending Griffin with a MySQL data source is not a hard job, because Spark SQL can read MySQL through its JDBC data source, as the short sketch below illustrates.
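To make that last point concrete, here is a minimal sketch of reading a MySQL table through Spark SQL's JDBC source, assuming an existing SparkSession named sparkSession and the MySQL JDBC driver (mysql-connector-java) on the Spark classpath; the URL, table, and credentials are placeholders, not Griffin configuration:
// Minimal sketch: load a MySQL table as a DataFrame via Spark SQL's JDBC source.
// The connection URL, table name and credentials below are placeholders.
val mysqlDf = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test_db")  // hypothetical database
  .option("driver", "com.mysql.jdbc.Driver")             // use com.mysql.cj.jdbc.Driver for Connector/J 8.x
  .option("dbtable", "users_info_src")                   // hypothetical table
  .option("user", "griffin")
  .option("password", "secret")
  .load()
mysqlDf.show(10)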
Approach for Extending a MySQL Data Source
- Add a mysql connector configuration to the configuration file
- Add the corresponding data source mapping in DataConnectorFactory
- Add a new MySQLBatchDataConnector implementing the BatchDataConnector interface (a sketch follows this list)
- Consider how to read data sources that are sharded across databases and tables
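As a starting point for the second and third items, here is a minimal sketch of a MySQLBatchDataConnector modeled on the HiveBatchDataConnector and AvroBatchDataConnector shown above. The config keys (url, table.name, user, password, where) are assumptions for illustration, not an official Griffin contract; sharding support is left out, and mysql-connector-java is assumed to be on the Spark classpath:
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * batch data connector for a mysql table (sketch)
 */
case class MySQLBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {
  val config = dcParam.getConfig
  // hypothetical config keys, mirroring the hive connector above
  val Url = "url"
  val TableName = "table.name"
  val User = "user"
  val Password = "password"
  val Where = "where"
  val url = config.getString(Url, "")
  val tableName = config.getString(TableName, "")
  val user = config.getString(User, "")
  val password = config.getString(Password, "")
  val whereString = config.getString(Where, "")
  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      // push an optional where clause down to mysql as a dbtable subquery
      val dbtable =
        if (whereString.nonEmpty) s"(SELECT * FROM ${tableName} WHERE ${whereString}) AS t"
        else tableName
      val df = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .load()
      preProcess(Some(df), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}
Registering it in DataConnectorFactory would then only require one more regex and one more case branch, along these lines:
// in DataConnectorFactory (sketch)
val MySqlRegex = """^(?i)mysql$""".r
// inside the conType match of getDataConnector:
case MySqlRegex() => MySQLBatchDataConnector(sparkSession, dcParam, tmstCache)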
For various reasons, the full implementation code and a demo will be provided in a follow-up post.