Overview

Apache Griffin is positioned as a data quality monitoring tool for big data. It supports the batch data sources Hive, text files, and Avro files, plus the streaming data source Kafka. However, projects that store their data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives those projects one more option for data quality monitoring.

Code Structure

The previous article on Apache Griffin already covered Griffin's features, execution flow, and architecture. This article walks through its code structure and a simple way to extend its data sources. First, a look at the code structure:

[Figure: Griffin source code structure]

The code is split into three parts: measure, service, and ui. measure holds the Spark (scheduled) job code; service is the Spring Boot application that provides the web-side configuration and monitoring interface; ui contains the AngularJS front-end code and resources.

The code for extending data sources lives mainly in the measure module. Using the batch demo in the Griffin project that reads an Avro data source as an example, let's see how Griffin reads its configuration and selects a data source:

  1. Batch Avro file data source configuration and the measure module's execution environment configuration

Environment configuration file: env-batch.json

{
  # spark configuration
  "spark": {
    "log.level": "WARN",
    "config": {
      "spark.master": "local[*]"
    }
  },

  # sink configuration for comparison results: console, hdfs, elasticsearch
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost/griffin/batch/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.148.181.248:39200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],

  "griffin.checkpoint": []
}

Data source configuration file: config-batch.json

{
  # job name
  "name": "accu_batch",
  # job type: batch or streaming
  "process.type": "batch",
  # configuration of the data source and the comparison target
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_src.avro"
          }
        }
      ]
    }, {
      "name": "target",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_target.avro"
          }
        }
      ]
    }
  ],
  # data validation rule; accuracy comparison is chosen here
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
      }
    ]
  },
  # comparison results are written to the console and ES
  "sinks": ["CONSOLE","ELASTICSEARCH"]
}
  2. Entry point of the measure module and a brief walkthrough
package org.apache.griffin.measure

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp


/**
  * application entrance
  */
object Application extends Loggable {

  def main(args: Array[String]): Unit = {
    info(args.toString)
    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }

    // run parameters: the env-batch.json and config-batch.json file paths
    val envParamFile = args(0)
    val dqParamFile = args(1)

    info(envParamFile)
    info(dqParamFile)

    // read param files
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }

    // combine the environment config and the data source config into the griffin config
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose the processing type from the data source config
    // process.type in config-batch.json resolves to batch here
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // initialize the griffin job execution environment
    // see the next code block: the main logic creates the SparkSession and registers griffin's custom Spark UDFs
    dqApp.init match {
      case Success(_) =>
        info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // run the job; with this configuration it executes as a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result

      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)

        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // close the job
    dqApp.close match {
      case Success(_) =>
        info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown
    // exit the program
    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {
  }

  private def shutdown(): Unit = {
  }

}

The batch job class:

package org.apache.griffin.measure.launch.batch

import java.util.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent


case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig

  val sparkParam = envParam.getSparkParam
  val metricName = dqParam.getName
//  val dataSourceParams = dqParam.dataSources
//  val dataSourceNames = dataSourceParams.map(_.name)
  val sinkParams = getSinkParams

  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  def retryable: Boolean = false

  // initialization: create the SparkSession and register griffin's custom UDFs
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  // the job execution method
  def run: Try[Boolean] = Try {
    // start time
    val startTime = new Date().getTime

    val measureTime = getMeasureTime
    val contextId = ContextId(measureTime)

    // get data sources
    // obtain the data sources from the data.sources config in config-batch.json: two avro sources, source and target
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
    // initialize the data sources
    dataSources.foreach(_.init)

    // create the griffin execution context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, sinkParams, BatchProcessType
    )(sparkSession)

    // per the config, results are written to the console and elasticsearch; start the sinks
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getSink().start(applicationId)

    // build the data quality comparison job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

    // execute the comparison job following the steps configured on the web side; the demo mainly runs the rule SQL from the config and writes the result to the sinks
    val result = dqJob.execute(dqContext)

    // record the end time and log how long the run took
    val endTime = new Date().getTime
    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean up the griffin context
    dqContext.clean()

    // write the finish marker to the sinks
    dqContext.getSink().finish()

    result
  }

  def close: Try[_] = Try {
    sparkSession.close()
    sparkSession.stop()
  }

}

That covers the execution order of the measure module. Reading through it carefully, you will find the execution flow is not complicated and the code logic is fairly clear.

The data source creation this article focuses on happens in the BatchDQApp class, in the line val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources).

Let's look at the DataSourceFactory class:

package org.apache.griffin.measure.datasource

import scala.util.Success

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}


object DataSourceFactory extends Loggable {

  def getDataSources(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dataSources: Seq[DataSourceParam]
                    ): Seq[DataSource] = {
    dataSources.zipWithIndex.flatMap { pair =>
      val (param, index) = pair
      getDataSource(sparkSession, ssc, param, index)
    }
  }

  private def getDataSource(sparkSession: SparkSession,
                            ssc: StreamingContext,
                            dataSourceParam: DataSourceParam,
                            index: Int
                           ): Option[DataSource] = {
    val name = dataSourceParam.getName
    val connectorParams = dataSourceParam.getConnectors
    val timestampStorage = TimestampStorage()

    // streaming data cache
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

    // get the data connectors
    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
      // obtain each connector from the connector factory
      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
        timestampStorage, streamingCacheClientOpt) match {
          case Success(connector) => Some(connector)
          case _ => None
        }
    }

    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
  }

}

DataConnectorFactory, the data connector factory:

package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._


object DataConnectorFactory extends Loggable {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val TextDirRegex = """^(?i)text-dir$""".r

  val KafkaRegex = """^(?i)kafka$""".r

  val CustomRegex = """^(?i)custom$""".r

  /**
    * create data connector
    * @param sparkSession     spark env
    * @param ssc              spark streaming env
    * @param dcParam          data connector param
    * @param tmstCache        same tmst cache in one data source
    * @param streamingCacheClientOpt   for streaming cache
    * @return   data connector
    */
  def getDataConnector(sparkSession: SparkSession,
                       ssc: StreamingContext,
                       dcParam: DataConnectorParam,
                       tmstCache: TimestampStorage,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ): Try[DataConnector] = {
    val conType = dcParam.getType
    val version = dcParam.getVersion
    Try {
      // data source type mapping
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

  private def getStreamingDataConnector(sparkSession: SparkSession,
                                        ssc: StreamingContext,
                                        dcParam: DataConnectorParam,
                                        tmstCache: TimestampStorage,
                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                       ): StreamingDataConnector = {
    if (ssc == null) throw new Exception("streaming context is null!")
    val conType = dcParam.getType
    val version = dcParam.getVersion
    conType match {
      case KafkaRegex() =>
        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("streaming connector creation error!")
    }
  }

  // creates a custom (user-defined) data connector via reflection
  private def getCustomConnector(session: SparkSession,
                                 context: StreamingContext,
                                 param: DataConnectorParam,
                                 storage: TimestampStorage,
                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(session, param, storage)
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    } else {
      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
    }
  }

  private def getKafkaDataConnector(sparkSession: SparkSession,
                                    ssc: StreamingContext,
                                    dcParam: DataConnectorParam,
                                    tmstCache: TimestampStorage,
                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                   ): KafkaStreamingDataConnector = {
    val KeyType = "key.type"
    val ValueType = "value.type"
    val config = dcParam.getConfig
    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
    val valueType = config.getOrElse(ValueType, "java.lang.String").toString

    (keyType, valueType) match {
      case ("java.lang.String", "java.lang.String") =>
        KafkaStreamingStringDataConnector(
          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ =>
        throw new Exception("not supported type kafka data connector")
    }
  }
}

By this point the way data sources are created should be clear: the factory maps each data source configuration to a connector, and the corresponding data is obtained at runtime. The demo uses the avro data source, so let's look next at the AvroBatchDataConnector implementation:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for avro file
  */
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getString(FilePath, "")
  val fileName = config.getString(FileName, "")

  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = {
    filePath.nonEmpty
  }

  private def fileExist(): Boolean = {
    HdfsUtil.existPath(concreteFileFullPath)
  }

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load avro file ${concreteFileFullPath} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

Following the code, AvroBatchDataConnector implements the DataConnector interface (via BatchDataConnector); its core is the data method, which loads the data from the file: val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)

Similarly, let's look at how Griffin's default Hive data source is implemented:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for hive table
  */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val whereString = config.getString(Where, "")

  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }


  private def tableExistsSql(): String = {
//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }

}

The Hive data connector is also quite simple: it reads the database, table name, and where clauses from the configuration, builds the query in dataSql, and lets Spark execute it with val df = sparkSession.sql(dtSql), returning the data to be compared. Readers familiar with Spark have probably realized by now that extending a MySQL data source is not hard, since Spark SQL can read MySQL through its JDBC data source.
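
To make that last point concrete, here is a minimal, self-contained sketch of reading a MySQL table through Spark SQL's built-in JDBC data source. The connection URL, table name, and credentials below are placeholders, and the MySQL JDBC driver (mysql-connector-java) has to be on the Spark classpath:

import org.apache.spark.sql.{DataFrame, SparkSession}

object MySQLReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mysql-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Spark SQL ships a generic JDBC source; MySQL only needs its JDBC driver on the classpath
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test")   // placeholder connection URL
      .option("dbtable", "users_info_src")                 // placeholder table name
      .option("user", "root")                              // placeholder credentials
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    df.show(10)
    spark.stop()
  }
}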

Approach to Extending a MySQL Data Source

  1. Add the MySQL connector configuration to the configuration files
  2. Add the corresponding data source mapping in DataConnectorFactory
  3. Add a MySQLBatchDataConnector that implements the BatchDataConnector interface
  4. Work out how to read sharded (split-database / split-table) sources

For various reasons, the full implementation code and a demo will be covered in a follow-up post.
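
Until then, here is a rough, untested sketch of what step 3 might look like: a MySQLBatchDataConnector modeled on the existing HiveBatchDataConnector and AvroBatchDataConnector, reading the table through Spark's JDBC source. The class name, the config keys (url, database, table.name, user, password, where), and the matching "mysql" connector type are assumptions made for this article, not part of the current Griffin code. Step 2 would then add a MySQLRegex (e.g. """^(?i)mysql$""".r) and a corresponding case to the match in DataConnectorFactory, next to the existing HiveRegex and AvroRegex cases, and step 1 a connector entry with type "mysql" in config-batch.json.

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * batch data connector for a MySQL table (sketch; class name and config keys are assumptions)
  */
case class MySQLBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  // hypothetical config keys, mirroring the style of the hive connector
  val Url = "url"
  val Database = "database"
  val TableName = "table.name"
  val User = "user"
  val Password = "password"
  val Where = "where"

  val url = config.getString(Url, "")
  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val user = config.getString(User, "")
  val password = config.getString(Password, "")
  val whereString = config.getString(Where, "")

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      // Spark SQL's generic JDBC source does the actual reading;
      // the MySQL JDBC driver must be available on the classpath
      val df = sparkSession.read.format("jdbc")
        .option("url", url)
        .option("dbtable", s"${database}.${tableName}")
        .option("user", user)
        .option("password", password)
        .option("driver", "com.mysql.jdbc.Driver")
        .load()
      // a single optional where clause; sharded databases/tables (step 4)
      // could be handled here by unioning one JDBC read per shard
      val filtered = if (whereString.nonEmpty) df.where(whereString) else df
      preProcess(Some(filtered), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${database}.${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

For sharded sources (step 4), one straightforward option is to issue one JDBC read per shard and union the resulting DataFrames before calling preProcess; for large single tables, the JDBC source's partitioning options (partitionColumn, lowerBound, upperBound, numPartitions) can help parallelize the read.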

by 赖泽坤@vipshop.com

Related resources: https://github.com/apache/griffin
