最近有个需求,实时统计pv,uv
,结果按照date,hour,pv,uv
来展示,按天统计,第二天重新统计,当然了实际还需要按照类型字段分类统计pv,uv
,比如按照date,hour,pv,uv,type
来展示。这里介绍最基本的pv,uv
的展示。
1 | 155599 | 306053 | 20180727 | 00 |
2 | 255496 | 596223 | 20180727 | 01 |
… | … | … | … | … |
10 | 10490270 | 12927245 | 20180727 | 10 |
关于什么是pv,uv,可以参见这篇博客https://blog.csdn.net/petermsh/article/details/78652246
分析
这是一个常用的实时数据统计需求,实时处理目前可供选择的有sparkStreaming和flink,使用sparkStreaming可以使用累加器,如果字段取值水平过多,不现实了,这时候考虑使用状态算子updateStateByKey或mapWithState(),或者使用redis、mysql等做累加,去重可以使用内存、redis的Set集合,或者使用算法bloomfilter过滤器、HyperLogLog近似去重,如果是数字还可以使用bitmap去重,这里的guid是38位字符串,选择使用redis的Set集合去重。
SparkStreaming实时统计pv uv
1、项目流程
日志数据从flume采集过来,落到hdfs供其它离线业务使用,也会sink到kafka,sparkStreaming从kafka拉数据过来,计算pv,uv,uv是用的redis的set集合去重,最后把结果写入mysql数据库,供前端展示使用。
2、具体过程
1)PV的计算
拉取数据有两种方式,基于received和direct方式,这里用direct直拉的方式,用的mapWithState算子保存状态,这个算子与updateStateByKey一样,并且性能更好。当然了实际中数据过来需要经过清洗,过滤,才能使用。
定义一个状态函数
|
1234567
|
// 实时流量状态更新函数 val mapFunction = (datehour:String, pv:Option[Long], state:State[Long]) => { val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L) val output = (datehour,accuSum) state.update(accuSum) output }
|
|
123
|
计算pvval stateSpec = StateSpec.function(mapFunction)val helper_count_all = helper_data.map(x => (x._1,1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
|
这样就很容易的把pv计算出来了。
2)UV的计算
uv是要全天去重的,每次进来一个batch的数据,如果用原生的reduceByKey或者groupByKey对配置要求太高,在配置较低情况下,我们申请了一个93G的redis用来去重,原理是每进来一条数据,将date作为key,guid加入set集合,20秒刷新一次,也就是将set集合的尺寸取出来,更新一下数据库即可。
|
1234567891011121314151617181920212223242526272829303132333435363738394041
|
helper_data_dis.foreachRDD(rdd => { rdd.foreachPartition(eachPartition => { var jedis: Jedis = null try { jedis = getJedis eachPartition.foreach(x => { val arr = x._2.split("\t") val date: String = arr(0).split(":")(0) // helper 统计 val key0 = "helper_" + date jedis.sadd(key0, x._1) jedis.expire(key0, ConfigFactory.rediskeyexists) // helperversion 统计 val key = date + "_" + arr(1) jedis.sadd(key, x._1) jedis.expire(key, ConfigFactory.rediskeyexists) }) } catch { case e: Exception => { logger.error(e) logger2.error(HelperHandle.getClass.getSimpleName + e) } } finally { if (jedis != null) { closeJedis(jedis) } } })})// 获取jedis连接def getJedis: Jedis = { val jedis = RedisPoolUtil.getPool.getResource jedis}// 释放jedis连接def closeJedis(jedis: Jedis): Unit = { RedisPoolUtil.getPool.returnResource(jedis)}
|
redis连接池代码RedisPoolUtil.scala
:
package com.js.ipflow.utilsimport com.js.ipflow.start.ConfigFactoryimport org.apache.commons.pool2.impl.GenericObjectPoolConfigimport redis.clients.jedis.JedisPool/** * redis 连接池工具类 * @author keguang */object RedisPoolUtil extends Serializable{ @transient private var pool: JedisPool = null /** * 读取jedis配置信息, 出发jedis初始化 */ def initJedis: Unit ={ ConfigFactory.initConfig() val maxTotal = 50 val maxIdle = 30 val minIdle = 10 val redisHost = ConfigFactory.redishost val redisPort = ConfigFactory.redisport val redisTimeout = ConfigFactory.redistimeout val redisPassword = ConfigFactory.redispassword makePool(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle) } def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = { init(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle, true, false, 10000) } /** * 初始化jedis连接池 * @param redisHost host * @param redisPort 端口 * @param redisTimeout 连接redis超时时间 * @param redisPassword redis密码 * @param maxTotal 总的连接数 * @param maxIdle 最大空闲连接数 * @param minIdle 最小空闲连接数 * @param testOnBorrow * @param testOnReturn * @param maxWaitMillis */ def init(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = { if (pool == null) { val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxTotal(maxTotal) poolConfig.setMaxIdle(maxIdle) poolConfig.setMinIdle(minIdle) poolConfig.setTestOnBorrow(testOnBorrow) poolConfig.setTestOnReturn(testOnReturn) poolConfig.setMaxWaitMillis(maxWaitMillis) pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout,redisPassword) val hook = new Thread { override def run = pool.destroy() } sys.addShutdownHook(hook.run) } } def getPool: JedisPool = { if(pool == null){ initJedis } pool }}
3)结果保存到数据库
结果保存到mysql,数据库,20秒刷新一次数据库,前端展示刷新一次,就会重新查询一次数据库,做到实时统计展示pv,uv的目的。
略。。。
msql 连接池代码MysqlPoolUtil.scala
package com.js.ipflow.utilsimport java.sql.{Connection, PreparedStatement, ResultSet}import com.js.ipflow.start.ConfigFactoryimport org.apache.commons.dbcp.BasicDataSourceimport org.apache.logging.log4j.LogManager/** *jdbc mysql 连接池工具类 * @author keguang */object MysqlPoolUtil { val logger = LogManager.getLogger(MysqlPoolUtil.getClass.getSimpleName) private var bs:BasicDataSource = null /** * 创建数据源 * @return */ def getDataSource():BasicDataSource={ if(bs==null){ ConfigFactory.initConfig() bs = new BasicDataSource() bs.setDriverClassName("com.mysql.jdbc.Driver") bs.setUrl(ConfigFactory.mysqlurl) bs.setUsername(ConfigFactory.mysqlusername) bs.setPassword(ConfigFactory.mysqlpassword) bs.setMaxActive(50) // 设置最大并发数 bs.setInitialSize(20) // 数据库初始化时,创建的连接个数 bs.setMinIdle(20) // 在不新建连接的条件下,池中保持空闲的最少连接数。 bs.setMaxIdle(20) // 池里不会被释放的最多空闲连接数量。设置为0时表示无限制。 bs.setMaxWait(5000) // 在抛出异常之前,池等待连接被回收的最长时间(当没有可用连接时)。设置为-1表示无限等待。 bs.setMinEvictableIdleTimeMillis(10*1000) // 空闲连接5秒中后释放 bs.setTimeBetweenEvictionRunsMillis(1*60*1000) //1分钟检测一次是否有死掉的线程 bs.setTestOnBorrow(true) } bs } /** * 释放数据源 */ def shutDownDataSource(){ if(bs!=null){ bs.close() } } /** * 获取数据库连接 * @return */ def getConnection():Connection={ var con:Connection = null try { if(bs!=null){ con = bs.getConnection() }else{ con = getDataSource().getConnection() } } catch{ case e:Exception => logger.error(e) } con } /** * 关闭连接 */ def closeCon(rs:ResultSet ,ps:PreparedStatement,con:Connection){ if(rs!=null){ try { rs.close() } catch{ case e:Exception => println(e.getMessage) } } if(ps!=null){ try { ps.close() } catch{ case e:Exception => println(e.getMessage) } } if(con!=null){ try { con.close() } catch{ case e:Exception => println(e.getMessage) } } }}
4)数据容错
流处理消费kafka都会考虑到数据丢失问题,一般可以保存到任何存储系统,包括mysql,hdfs,hbase,redis,zookeeper等到。这里用SparkStreaming自带的checkpoint机制来实现应用重启时数据恢复。
checkpoint
这里采用的是checkpoint机制,在重启或者失败后重启可以直接读取上次没有完成的任务,从kafka对应offset读取数据。
|
123456789101112131415
|
// 初始化配置文件ConfigFactory.initConfig()val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)conf.set("spark.streaming.stopGracefullyOnShutdown","true")conf.set("spark.streaming.kafka.maxRatePerPartition",consumeRate)conf.set("spark.default.parallelism","24")val sc = new SparkContext(conf)while (true){ val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0),getStreamingContext _ ) ssc.start() ssc.awaitTerminationOrTimeout(resetTime) ssc.stop(false,true)}
|
checkpoint是每天一个目录,在第二天凌晨定时销毁StreamingContext对象,重新统计计算pv,uv。
应用迁移或者程序升级
在这个过程中,我们把应用升级了一下,比如说某个功能写的不够完善,或者有逻辑错误,这时候都是需要修改代码,重新打jar包的,这时候如果把程序停了,新的应用还是会读取老的checkpoint,可能会有两个问题:
其实有时候,修改代码后不用删除checkpoint也是可以直接生效,经过很多测试,我发现如果对数据的过滤操作导致数据过滤逻辑改变,还有状态操作保存修改,也会导致重启失败,只有删除checkpoint才行,可是实际中一旦删除checkpoint,就会导致上一次未完成的任务和消费kafka的offset丢失,直接导致数据丢失,这种情况下我一般这么做。
5)日志
日志用的log4j2,本地保存一份,ERROR级别的日志会通过邮件发送到手机。
|
123
|
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName) // 邮件level=error日志 val logger2 = LogManager.getLogger("email")
|
3、主要代码
pom.xml
文件:
略。。。
读取配置文件代码ConfigFactory .java
:
略。。。
组合起来。
Exactly Once消费
由于基于checkpoint的在程序升级时,可能出现反序列化错误,导致启动失败的情况,这里使用direct直拉kafka,保存结果和偏移量到mysql的一条数据行中,要么全部失败,要么全部成功,保证精确一次消费。
1package com.test 2 3import java.sql.{Connection, PreparedStatement, Statement} 4import java.text.SimpleDateFormat 5import java.util.{Calendar, Date} 6 7import com.js.ipflow.config.Configuration 8import org.apache.spark.{SparkConf, SparkContext} 9import org.apache.spark.streaming.{Seconds, StreamingContext} 10import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} 11import kafka.common.TopicAndPartition 12import kafka.message.MessageAndMetadata 13import kafka.serializer.StringDecoder 14import org.apache.logging.log4j.LogManager 15import org.apache.spark.rdd.RDD 16import org.apache.spark.streaming.dstream.InputDStream 17import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} 18 19import scala.collection.mutable 20import scala.collection.mutable.ArrayBuffer 21 22object Hm4HandlePlus { 23 24 val logger = LogManager.getLogger(Hm4Handle.getClass.getSimpleName) 25 // 邮件level=error日志 26 val logger2 = LogManager.getLogger("email") 27 28 import collection.JavaConverters._ 29 30 // 读取配置 31 val config: mutable.Map[String, String] = Configuration.initConfig("commons.xml").asScala 32 33 def main(args: Array[String]): Unit = { 34 run(args(0)) 35 } 36 37 def run(consumeRate: String): Unit = { 38 val sparkConf = new SparkConf().setAppName(config.get("spark-streamname").getOrElse("Hm4Handle")) 39 // .setMaster("local[*]") 40 sparkConf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate) 41 sparkConf.set("spark.default.parallelism", "36") 42 43 val sc = new SparkContext(sparkConf) 44 45 // StreamingContext按天滚动 46 while (true) { 47 val ssc = new StreamingContext(sc, Seconds(config.get("spark-seconds").getOrElse("5").toLong)) 48 49 val topics = config.get("kafka-topic").getOrElse("test") 50 val topicSet = Set(topics) 51 val kafkaParams: Map[String, String] = Map[String, String]( 52 "metadata.broker.list" -> config.get("kafka-ipport").getOrElse("0.0.0.0:2181") 53 , "group.id" -> config.get("kafka-groupid").getOrElse("test") 54 , "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString 55 // "enable.auto.commit" -> "false" 56 ) 57 58 val table_name= "table_name" 59 val tpMap = getLastOffsets(hm4_realtime_mini_count) 60 61 var messages: InputDStream[(String, String)] = null 62 if (tpMap.nonEmpty) { 63 messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)]( 64 ssc 65 , kafkaParams 66 , tpMap.toMap 67 , (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) 68 ) 69 } else { 70 71 messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( 72 ssc 73 , kafkaParams 74 , topicSet 75 ) 76 } 77 78 // (date, hour, event, version, _type, value, opentype, times, vertype) 79 messages.foreachRDD(rdd => { 80 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 81 val offset = offsetRanges2Json(offsetRanges).toString 82 83 // 当前rdd聚合结果 84 val currentRdd = FilterHm4.getMiniRDD(rdd 85 .filter(x => { 86 try { 87 JSON.parseObject(x._2) 88 true 89 } catch { 90 case e: Exception => { 91 logger.error("错误数据 => " + x) 92 logger.error(e.getMessage) 93 } 94 false 95 } 96 }).repartition(36) 97 .map(x => { 98 JSON.parseObject(x._2) 99 })).map({ case (date, hour, event, version, _type, value, opentype, times, vertype) => {100 (date + formatKey(hour) + formatKey(event) + formatKey(version) + formatKey(_type) + formatKey(value)101 + formatKey(opentype) + formatKey(times) + formatKey(vertype), 1L)102 }103 }).reduceByKey((x, y) => x + y).repartition(2)104105 // 保存到mysql106 insertMysql(currentRdd, table_name, offset)107108 })109110 ssc.start()111 ssc.awaitTerminationOrTimeout(resetTime) // 距离第二天0点,ssc销毁时间112 ssc.stop(false, true)113 }114 }115116 def offsetRanges2Json(arr: Array[OffsetRange]): JSONArray = {117 val jSONArray = new JSONArray()118 arr.foreach(offsetRange => {119 val jsonObject = new JSONObject()120 jsonObject.put("partition", offsetRange.partition)121 jsonObject.put("fromOffset", offsetRange.fromOffset)122 jsonObject.put("untilOffset", offsetRange.untilOffset)123 jsonObject.put("topic", offsetRange.topic)124125 jSONArray.add(jsonObject)126 })127128 jSONArray129 }130131 /**132 * 从mysql查询offset133 *134 * @param tbName135 * @return136 */137 def getLastOffsets(tbName: String): mutable.HashMap[TopicAndPartition, Long] = {138 val sql = s"select offset from ${tbName} where id = (select max(id) from ${tbName})"139 val conn = MysqlPool.getConnection(config)140 val psts = conn.prepareStatement(sql)141 val res = psts.executeQuery()142 var tpMap: mutable.HashMap[TopicAndPartition, Long] = mutable.HashMap[TopicAndPartition, Long]()143 while (res.next()) {144 val o = res.getString(1)145 val jSONArray = JSON.parseArray(o)146 jSONArray.toArray().foreach(offset => {147 val json = JSON.parseObject(offset.toString)148 val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInteger("partition"))149 tpMap.put(topicAndPartition, json.getLong("untilOffset"))150 })151 }152 MysqlPool.closeCon(res, psts, conn)153 tpMap154155 }156}
Flink统计 pv
我们需要统计不同数据类型每天的pv,uv情况,并且有如下要求.
-
每秒钟要输出最新的统计结果
-
程序永远跑着不会停,所以要定期清理内存里的过时数据
-
收到的消息里的时间字段并不是按照顺序严格递增的,所以要有一定的容错机制
-
访问uv并不一定每秒钟都会变化,重复输出对IO是巨大的浪费,所以要在uv变更时在一秒内输出结果,未变更时不输出
flink数据流上的类型和操作
DataStream是flink流处理最核心的数据结构,其它的各种流都可以直接或者间接通过DataStream来完成相互转换,一些常用的流直接的转换关系如图:
可以看出,DataStream可以与KeyedStream相互转换,KeyedStream可以转换为WindowedStream,DataStream不能直接转换为WindowedStream,WindowedStream可以直接转换为DataStream。各种流之间虽然不能相互直接转换,但是都可以通过先转换为DataStream,再转换为其它流的方法来实现。
在这个计算pv,uv的需求中就主要用到DataStream、KeyedStream以及WindowedStream这些数据结构。
这里需要用到window和watermark,使用窗口把数据按天分割,使用watermark可以通过“水位”来定期清理窗口外的迟到数据,起到清理内存的作用。
业务代码
我们的数据是json类型的,含有date,helperversion,guid这3个字段,在实时统计pv,uv这个功能中,其它字段可以直接丢掉,当然了在离线数据仓库中,所有有含义的业务字段都是要保留到hive当中的。
其它相关概念就不说了,会专门介绍,这里直接上代码吧。
由于包含了很多其它的非flink的依赖,可以选择flink的依赖,减少下载依赖的时间。
主要代码,主要使用scala开发:
import java.util.Propertiesimport com.alibaba.fastjson.JSONimport org.apache.flink.runtime.state.filesystem.FsStateBackendimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTriggerimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010import org.apache.flink.streaming.util.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.extensions._import org.apache.flink.api.scala._object PvUvCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 容错 env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.setStateBackend(new FsStateBackend("file:///D:/space/IJ/bigdata/src/main/scala/com/ddxygq/bigdata/flink/checkpoint/flink/tagApp")) // kafka 配置 val ZOOKEEPER_HOST = "localhost:2181" val KAFKA_BROKERS = "localhost:9092" val TRANSACTION_GROUP = "flink-helper-label-count" val TOPIC_NAME = "test" val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS) kafkaProps.setProperty("group.id", TRANSACTION_GROUP) // watrmark 允许数据延迟时间 val MaxOutOfOrderness = 86400 * 1000L // 消费kafka数据 val streamData: DataStream[(String, String, String)] = env.addSource( new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps) ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(MaxOutOfOrderness)) { override def extractTimestamp(element: String): Long = { val t = JSON.parseObject(element) val time = JSON.parseObject(JSON.parseObject(t.getString("message")).getString("data")).getString("time") time.toLong } }).map(x => { var date = "error" var guid = "error" var helperversion = "error" try { val messageJsonObject = JSON.parseObject(JSON.parseObject(x).getString("message")) val datetime = messageJsonObject.getString("time") date = datetime.split(" ")(0) // hour = datetime.split(" ")(1).substring(0, 2) val data_string = messageJsonObject.getString("data") if (!"".equals(data_string)) { val data = JSON.parseObject(data_string) guid = data.getString("guid").trim helperversion = data.getString("helperversion") } } catch { case e: Exception => { println(e) } } (date, helperversion, guid) }) // 这上面是设置watermark并解析json部分 // 聚合窗口中的数据,可以研究下applyWith这个方法和OnWindowedStream这个类 val resultStream = streamData.keyBy(x => { x._1 + x._2 }).timeWindow(Time.days(1)) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .applyWith(("", List.empty[Int], Set.empty[Int], 0L, 0L))( foldFunction = { case ((_, list, set, _, 0), item) => { val date = item._1 val helperversion = item._2 val guid = item._3 (date + "_" + helperversion, guid.hashCode +: list, set + guid.hashCode, 0L, 0L) } } , windowFunction = { case (key, window, result) => { result.map { case (leixing, list, set, _, _) => { (leixing, list.size, set.size, window.getStart, window.getEnd) } } } } ).keyBy(0) .flatMapWithState[(String, Int, Int, Long, Long),(Int, Int)]{ case ((key, numpv, numuv, begin, end), curr) => curr match { case Some(numCurr) if numCurr == (numuv, numpv) => (Seq.empty, Some((numuv, numpv))) //如果之前已经有相同的数据,则返回空结果 case _ => (Seq((key, numpv, numuv, begin, end)), Some((numuv, numpv))) } } // 最终结果 val resultedStream = resultStream.map(x => { val keys = x._1.split("_") val date = keys(0) val helperversion = keys(1) (date, helperversion, x._2, x._3) }) resultedStream.print() env.execute("PvUvCount") } }
参考资料
https://flink.sojb.cn/dev/event_time.html
http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
https://segmentfault.com/a/1190000006235690
猜你喜欢
本文分享自微信公众号 - Java大数据与数据仓库(ikeguang2)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。