1、项目流程-LMLPHP

最近有个需求,实时统计pv,uv,结果按照date,hour,pv,uv来展示,按天统计,第二天重新统计,当然了实际还需要按照类型字段分类统计pv,uv,比如按照date,hour,pv,uv,type来展示。这里介绍最基本的pv,uv的展示。

1 155599 306053 20180727 00
2 255496 596223 20180727 01
10 10490270 12927245 20180727 10

关于什么是pv,uv,可以参见这篇博客https://blog.csdn.net/petermsh/article/details/78652246

分析

这是一个常用的实时数据统计需求,实时处理目前可供选择的有sparkStreaming和flink,使用sparkStreaming可以使用累加器,如果字段取值水平过多,不现实了,这时候考虑使用状态算子updateStateByKey或mapWithState(),或者使用redis、mysql等做累加,去重可以使用内存、redis的Set集合,或者使用算法bloomfilter过滤器、HyperLogLog近似去重,如果是数字还可以使用bitmap去重,这里的guid是38位字符串,选择使用redis的Set集合去重。

SparkStreaming实时统计pv uv

1、项目流程

1、项目流程-LMLPHP

日志数据从flume采集过来,落到hdfs供其它离线业务使用,也会sink到kafka,sparkStreaming从kafka拉数据过来,计算pv,uv,uv是用的redis的set集合去重,最后把结果写入mysql数据库,供前端展示使用。

2、具体过程

1)PV的计算

拉取数据有两种方式,基于received和direct方式,这里用direct直拉的方式,用的mapWithState算子保存状态,这个算子与updateStateByKey一样,并且性能更好。当然了实际中数据过来需要经过清洗,过滤,才能使用。

定义一个状态函数

|

1234567

|

// 实时流量状态更新函数  val mapFunction = (datehour:String, pv:Option[Long], state:State[Long]) => {    val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)    val output = (datehour,accuSum)    state.update(accuSum)    output  }

|

|

123

|

计算pvval stateSpec = StateSpec.function(mapFunction)val helper_count_all = helper_data.map(x => (x._1,1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

|

这样就很容易的把pv计算出来了。

2)UV的计算

uv是要全天去重的,每次进来一个batch的数据,如果用原生的reduceByKey或者groupByKey对配置要求太高,在配置较低情况下,我们申请了一个93G的redis用来去重,原理是每进来一条数据,将date作为key,guid加入set集合,20秒刷新一次,也就是将set集合的尺寸取出来,更新一下数据库即可。

|

1234567891011121314151617181920212223242526272829303132333435363738394041

|

helper_data_dis.foreachRDD(rdd => {  rdd.foreachPartition(eachPartition => {    var jedis: Jedis = null    try {      jedis = getJedis      eachPartition.foreach(x => {        val arr = x._2.split("\t")        val date: String = arr(0).split(":")(0)        // helper 统计        val key0 = "helper_" + date        jedis.sadd(key0, x._1)        jedis.expire(key0, ConfigFactory.rediskeyexists)        // helperversion 统计        val key = date + "_" + arr(1)        jedis.sadd(key, x._1)        jedis.expire(key, ConfigFactory.rediskeyexists)      })    } catch {      case e: Exception => {        logger.error(e)        logger2.error(HelperHandle.getClass.getSimpleName + e)      }    } finally {      if (jedis != null) {        closeJedis(jedis)      }    }  })})// 获取jedis连接def getJedis: Jedis = {  val jedis = RedisPoolUtil.getPool.getResource  jedis}// 释放jedis连接def closeJedis(jedis: Jedis): Unit = {  RedisPoolUtil.getPool.returnResource(jedis)}

|

redis连接池代码RedisPoolUtil.scala

package com.js.ipflow.utilsimport com.js.ipflow.start.ConfigFactoryimport org.apache.commons.pool2.impl.GenericObjectPoolConfigimport redis.clients.jedis.JedisPool/**  * redis 连接池工具类  * @author keguang  */object RedisPoolUtil extends Serializable{  @transient private var pool: JedisPool = null  /**    * 读取jedis配置信息, 出发jedis初始化    */  def initJedis: Unit ={    ConfigFactory.initConfig()    val maxTotal = 50    val maxIdle = 30    val minIdle = 10    val redisHost = ConfigFactory.redishost    val redisPort = ConfigFactory.redisport    val redisTimeout = ConfigFactory.redistimeout    val redisPassword = ConfigFactory.redispassword    makePool(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle)  }  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {   init(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle, true, false, 10000)  }  /**    * 初始化jedis连接池    * @param redisHost host    * @param redisPort 端口    * @param redisTimeout 连接redis超时时间    * @param redisPassword redis密码    * @param maxTotal 总的连接数    * @param maxIdle 最大空闲连接数    * @param minIdle 最小空闲连接数    * @param testOnBorrow    * @param testOnReturn    * @param maxWaitMillis    */  def init(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = {    if (pool == null) {      val poolConfig = new GenericObjectPoolConfig()      poolConfig.setMaxTotal(maxTotal)      poolConfig.setMaxIdle(maxIdle)      poolConfig.setMinIdle(minIdle)      poolConfig.setTestOnBorrow(testOnBorrow)      poolConfig.setTestOnReturn(testOnReturn)      poolConfig.setMaxWaitMillis(maxWaitMillis)      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout,redisPassword)      val hook = new Thread {        override def run = pool.destroy()      }      sys.addShutdownHook(hook.run)    }  }  def getPool: JedisPool = {    if(pool == null){      initJedis    }    pool  }}

3)结果保存到数据库

结果保存到mysql,数据库,20秒刷新一次数据库,前端展示刷新一次,就会重新查询一次数据库,做到实时统计展示pv,uv的目的。

略。。。

msql 连接池代码MysqlPoolUtil.scala

package com.js.ipflow.utilsimport java.sql.{Connection, PreparedStatement, ResultSet}import com.js.ipflow.start.ConfigFactoryimport org.apache.commons.dbcp.BasicDataSourceimport org.apache.logging.log4j.LogManager/**  *jdbc mysql 连接池工具类  * @author keguang  */object MysqlPoolUtil {  val logger = LogManager.getLogger(MysqlPoolUtil.getClass.getSimpleName)  private var bs:BasicDataSource = null  /**    * 创建数据源    * @return    */  def getDataSource():BasicDataSource={    if(bs==null){      ConfigFactory.initConfig()      bs = new BasicDataSource()      bs.setDriverClassName("com.mysql.jdbc.Driver")      bs.setUrl(ConfigFactory.mysqlurl)      bs.setUsername(ConfigFactory.mysqlusername)      bs.setPassword(ConfigFactory.mysqlpassword)      bs.setMaxActive(50)           // 设置最大并发数      bs.setInitialSize(20)          // 数据库初始化时,创建的连接个数      bs.setMinIdle(20)              // 在不新建连接的条件下,池中保持空闲的最少连接数。      bs.setMaxIdle(20)             // 池里不会被释放的最多空闲连接数量。设置为0时表示无限制。      bs.setMaxWait(5000)             // 在抛出异常之前,池等待连接被回收的最长时间(当没有可用连接时)。设置为-1表示无限等待。      bs.setMinEvictableIdleTimeMillis(10*1000)     // 空闲连接5秒中后释放      bs.setTimeBetweenEvictionRunsMillis(1*60*1000)      //1分钟检测一次是否有死掉的线程      bs.setTestOnBorrow(true)    }    bs  }  /**    * 释放数据源    */  def shutDownDataSource(){    if(bs!=null){      bs.close()    }  }  /**    * 获取数据库连接    * @return    */  def getConnection():Connection={    var con:Connection = null    try {      if(bs!=null){        con = bs.getConnection()      }else{        con = getDataSource().getConnection()      }    } catch{      case e:Exception => logger.error(e)    }    con  }  /**    * 关闭连接    */  def closeCon(rs:ResultSet ,ps:PreparedStatement,con:Connection){    if(rs!=null){      try {        rs.close()      } catch{        case e:Exception => println(e.getMessage)      }    }    if(ps!=null){      try {        ps.close()      } catch{        case e:Exception => println(e.getMessage)      }    }    if(con!=null){      try {        con.close()      } catch{        case e:Exception => println(e.getMessage)      }    }  }}

4)数据容错

流处理消费kafka都会考虑到数据丢失问题,一般可以保存到任何存储系统,包括mysql,hdfs,hbase,redis,zookeeper等到。这里用SparkStreaming自带的checkpoint机制来实现应用重启时数据恢复。

checkpoint

这里采用的是checkpoint机制,在重启或者失败后重启可以直接读取上次没有完成的任务,从kafka对应offset读取数据。

|

123456789101112131415

|

// 初始化配置文件ConfigFactory.initConfig()val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)conf.set("spark.streaming.stopGracefullyOnShutdown","true")conf.set("spark.streaming.kafka.maxRatePerPartition",consumeRate)conf.set("spark.default.parallelism","24")val sc = new SparkContext(conf)while (true){	val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0),getStreamingContext _ )    ssc.start()    ssc.awaitTerminationOrTimeout(resetTime)    ssc.stop(false,true)}

|

checkpoint是每天一个目录,在第二天凌晨定时销毁StreamingContext对象,重新统计计算pv,uv。

应用迁移或者程序升级

在这个过程中,我们把应用升级了一下,比如说某个功能写的不够完善,或者有逻辑错误,这时候都是需要修改代码,重新打jar包的,这时候如果把程序停了,新的应用还是会读取老的checkpoint,可能会有两个问题:

其实有时候,修改代码后不用删除checkpoint也是可以直接生效,经过很多测试,我发现如果对数据的过滤操作导致数据过滤逻辑改变,还有状态操作保存修改,也会导致重启失败,只有删除checkpoint才行,可是实际中一旦删除checkpoint,就会导致上一次未完成的任务和消费kafka的offset丢失,直接导致数据丢失,这种情况下我一般这么做。

5)日志

日志用的log4j2,本地保存一份,ERROR级别的日志会通过邮件发送到手机。

|

123

|

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)  // 邮件level=error日志  val logger2 = LogManager.getLogger("email")

|

3、主要代码

pom.xml文件:

略。。。

读取配置文件代码ConfigFactory .java

略。。。

组合起来。

Exactly Once消费

由于基于checkpoint的在程序升级时,可能出现反序列化错误,导致启动失败的情况,这里使用direct直拉kafka,保存结果和偏移量到mysql的一条数据行中,要么全部失败,要么全部成功,保证精确一次消费。

  1package com.test  2  3import java.sql.{Connection, PreparedStatement, Statement}  4import java.text.SimpleDateFormat  5import java.util.{Calendar, Date}  6  7import com.js.ipflow.config.Configuration  8import org.apache.spark.{SparkConf, SparkContext}  9import org.apache.spark.streaming.{Seconds, StreamingContext} 10import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} 11import kafka.common.TopicAndPartition 12import kafka.message.MessageAndMetadata 13import kafka.serializer.StringDecoder 14import org.apache.logging.log4j.LogManager 15import org.apache.spark.rdd.RDD 16import org.apache.spark.streaming.dstream.InputDStream 17import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} 18 19import scala.collection.mutable 20import scala.collection.mutable.ArrayBuffer 21 22object Hm4HandlePlus { 23 24  val logger = LogManager.getLogger(Hm4Handle.getClass.getSimpleName) 25  // 邮件level=error日志 26  val logger2 = LogManager.getLogger("email") 27 28  import collection.JavaConverters._ 29 30  // 读取配置 31  val config: mutable.Map[String, String] = Configuration.initConfig("commons.xml").asScala 32 33  def main(args: Array[String]): Unit = { 34    run(args(0)) 35  } 36 37  def run(consumeRate: String): Unit = { 38    val sparkConf = new SparkConf().setAppName(config.get("spark-streamname").getOrElse("Hm4Handle")) 39    //     .setMaster("local[*]") 40    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate) 41    sparkConf.set("spark.default.parallelism", "36") 42 43    val sc = new SparkContext(sparkConf) 44 45    // StreamingContext按天滚动 46    while (true) { 47      val ssc = new StreamingContext(sc, Seconds(config.get("spark-seconds").getOrElse("5").toLong)) 48 49      val topics = config.get("kafka-topic").getOrElse("test") 50      val topicSet = Set(topics) 51      val kafkaParams: Map[String, String] = Map[String, String]( 52        "metadata.broker.list" -> config.get("kafka-ipport").getOrElse("0.0.0.0:2181") 53        , "group.id" -> config.get("kafka-groupid").getOrElse("test") 54        , "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString 55        // "enable.auto.commit" -> "false" 56      ) 57 58      val table_name= "table_name" 59      val tpMap = getLastOffsets(hm4_realtime_mini_count) 60 61      var messages: InputDStream[(String, String)] = null 62      if (tpMap.nonEmpty) { 63        messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)]( 64          ssc 65          , kafkaParams 66          , tpMap.toMap 67          , (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) 68        ) 69      } else { 70 71        messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( 72          ssc 73          , kafkaParams 74          , topicSet 75        ) 76      } 77 78      // (date, hour, event, version, _type, value, opentype, times, vertype) 79      messages.foreachRDD(rdd => { 80        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 81        val offset = offsetRanges2Json(offsetRanges).toString 82 83        // 当前rdd聚合结果 84        val currentRdd = FilterHm4.getMiniRDD(rdd 85          .filter(x => { 86            try { 87              JSON.parseObject(x._2) 88              true 89            } catch { 90              case e: Exception => { 91                logger.error("错误数据 => " + x) 92                logger.error(e.getMessage) 93              } 94                false 95            } 96          }).repartition(36) 97          .map(x => { 98            JSON.parseObject(x._2) 99          })).map({ case (date, hour, event, version, _type, value, opentype, times, vertype) => {100          (date + formatKey(hour) + formatKey(event) + formatKey(version) + formatKey(_type) + formatKey(value)101            + formatKey(opentype) + formatKey(times) + formatKey(vertype), 1L)102        }103        }).reduceByKey((x, y) => x + y).repartition(2)104105        // 保存到mysql106        insertMysql(currentRdd, table_name, offset)107108      })109110      ssc.start()111      ssc.awaitTerminationOrTimeout(resetTime) // 距离第二天0点,ssc销毁时间112      ssc.stop(false, true)113    }114  }115116  def offsetRanges2Json(arr: Array[OffsetRange]): JSONArray = {117    val jSONArray = new JSONArray()118    arr.foreach(offsetRange => {119      val jsonObject = new JSONObject()120      jsonObject.put("partition", offsetRange.partition)121      jsonObject.put("fromOffset", offsetRange.fromOffset)122      jsonObject.put("untilOffset", offsetRange.untilOffset)123      jsonObject.put("topic", offsetRange.topic)124125      jSONArray.add(jsonObject)126    })127128    jSONArray129  }130131  /**132    * 从mysql查询offset133    *134    * @param tbName135    * @return136    */137  def getLastOffsets(tbName: String): mutable.HashMap[TopicAndPartition, Long] = {138    val sql = s"select offset from ${tbName} where id = (select max(id) from ${tbName})"139    val conn = MysqlPool.getConnection(config)140    val psts = conn.prepareStatement(sql)141    val res = psts.executeQuery()142    var tpMap: mutable.HashMap[TopicAndPartition, Long] = mutable.HashMap[TopicAndPartition, Long]()143    while (res.next()) {144      val o = res.getString(1)145      val jSONArray = JSON.parseArray(o)146      jSONArray.toArray().foreach(offset => {147        val json = JSON.parseObject(offset.toString)148        val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInteger("partition"))149        tpMap.put(topicAndPartition, json.getLong("untilOffset"))150      })151    }152    MysqlPool.closeCon(res, psts, conn)153    tpMap154155  }156}

Flink统计 pv

我们需要统计不同数据类型每天的pv,uv情况,并且有如下要求.

  • 每秒钟要输出最新的统计结果

  • 程序永远跑着不会停,所以要定期清理内存里的过时数据

  • 收到的消息里的时间字段并不是按照顺序严格递增的,所以要有一定的容错机制

  • 访问uv并不一定每秒钟都会变化,重复输出对IO是巨大的浪费,所以要在uv变更时在一秒内输出结果,未变更时不输出

flink数据流上的类型和操作

DataStream是flink流处理最核心的数据结构,其它的各种流都可以直接或者间接通过DataStream来完成相互转换,一些常用的流直接的转换关系如图:

1、项目流程-LMLPHP

可以看出,DataStream可以与KeyedStream相互转换,KeyedStream可以转换为WindowedStream,DataStream不能直接转换为WindowedStream,WindowedStream可以直接转换为DataStream。各种流之间虽然不能相互直接转换,但是都可以通过先转换为DataStream,再转换为其它流的方法来实现。

在这个计算pv,uv的需求中就主要用到DataStream、KeyedStream以及WindowedStream这些数据结构。

这里需要用到window和watermark,使用窗口把数据按天分割,使用watermark可以通过“水位”来定期清理窗口外的迟到数据,起到清理内存的作用。

业务代码

我们的数据是json类型的,含有date,helperversion,guid这3个字段,在实时统计pv,uv这个功能中,其它字段可以直接丢掉,当然了在离线数据仓库中,所有有含义的业务字段都是要保留到hive当中的。
其它相关概念就不说了,会专门介绍,这里直接上代码吧。

由于包含了很多其它的非flink的依赖,可以选择flink的依赖,减少下载依赖的时间。

主要代码,主要使用scala开发:

import java.util.Propertiesimport com.alibaba.fastjson.JSONimport org.apache.flink.runtime.state.filesystem.FsStateBackendimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTriggerimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010import org.apache.flink.streaming.util.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.extensions._import org.apache.flink.api.scala._object PvUvCount {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 容错    env.enableCheckpointing(5000)    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)    env.setStateBackend(new FsStateBackend("file:///D:/space/IJ/bigdata/src/main/scala/com/ddxygq/bigdata/flink/checkpoint/flink/tagApp"))    // kafka 配置    val ZOOKEEPER_HOST = "localhost:2181"    val KAFKA_BROKERS = "localhost:9092"    val TRANSACTION_GROUP = "flink-helper-label-count"    val TOPIC_NAME = "test"    val kafkaProps = new Properties()    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS)    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)    // watrmark 允许数据延迟时间    val MaxOutOfOrderness = 86400 * 1000L        // 消费kafka数据    val streamData: DataStream[(String, String, String)] = env.addSource(      new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps)    ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(MaxOutOfOrderness)) {      override def extractTimestamp(element: String): Long = {        val t = JSON.parseObject(element)        val time = JSON.parseObject(JSON.parseObject(t.getString("message")).getString("data")).getString("time")        time.toLong      }    }).map(x => {      var date = "error"      var guid = "error"      var helperversion = "error"      try {        val messageJsonObject = JSON.parseObject(JSON.parseObject(x).getString("message"))        val datetime = messageJsonObject.getString("time")        date = datetime.split(" ")(0)        // hour = datetime.split(" ")(1).substring(0, 2)        val data_string = messageJsonObject.getString("data")        if (!"".equals(data_string)) {          val data = JSON.parseObject(data_string)          guid = data.getString("guid").trim          helperversion = data.getString("helperversion")        }      } catch {        case e: Exception => {          println(e)        }      }      (date, helperversion, guid)    })    // 这上面是设置watermark并解析json部分    // 聚合窗口中的数据,可以研究下applyWith这个方法和OnWindowedStream这个类    val resultStream = streamData.keyBy(x => {      x._1 + x._2    }).timeWindow(Time.days(1))      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))      .applyWith(("", List.empty[Int], Set.empty[Int], 0L, 0L))(        foldFunction = {          case ((_, list, set, _, 0), item) => {            val date = item._1            val helperversion = item._2            val guid = item._3            (date + "_" + helperversion, guid.hashCode +: list, set + guid.hashCode, 0L, 0L)          }        }        , windowFunction = {          case (key, window, result) => {            result.map {              case (leixing, list, set, _, _) => {                (leixing, list.size, set.size, window.getStart, window.getEnd)              }            }          }        }      ).keyBy(0)      .flatMapWithState[(String, Int, Int, Long, Long),(Int, Int)]{      case ((key, numpv, numuv, begin, end), curr) =>        curr match {          case Some(numCurr) if numCurr == (numuv, numpv) =>            (Seq.empty, Some((numuv, numpv))) //如果之前已经有相同的数据,则返回空结果          case _ =>            (Seq((key, numpv, numuv, begin, end)), Some((numuv, numpv)))        }    }    // 最终结果    val resultedStream = resultStream.map(x => {      val keys = x._1.split("_")      val date = keys(0)      val helperversion = keys(1)      (date, helperversion, x._2, x._3)    })    resultedStream.print()    env.execute("PvUvCount")  }  }

参考资料
https://flink.sojb.cn/dev/event_time.html
http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
https://segmentfault.com/a/1190000006235690

猜你喜欢

1、项目流程-LMLPHP

本文分享自微信公众号 - Java大数据与数据仓库(ikeguang2)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

04-11 15:09