一、rt-service
运行环境:hadoop2.4.1、hbase 0.98.16-hadoop2 、spark1.4.0 详细流程: 1.spark-submit提交任务到yarn 集群 2.ResourceMaster收到提交到的任务后分配资源,创建ApplicationMaster 3.Executor中的任务起动执行,向zookeeper读取消费kafkaoffset 4.拿到offset后,向kafka集群取数据 5.处理数据 6.处理结果保存至HBase 7.处理成功后,提交offset至zookeeper 关于spark onyarn执行流程: 代码部分比较简单,SparkStreaming是程序入口,根据传入的topic,调用相就的解析器,如NewTvadpv等,都是与topic名称相符 ,解析器会按不同日志规则解析相应的字段,并入HBase。SparkStreaming中配置spark任务参数,如kafka broker list, zookeeper ,及提交offset。 为了方便日后维护,避免数据不连续,需要控制客户端消费kafka offset,所以引入了KafkaManager,并且调用高效的底层api directStream。KafkaManager封装了一层KafkaUtils.createDirectStream方法,在拉取数据前,会设置此topic、groupId 在zookeeper上保存的各分区的消费offset位置。如果此groupid,之前没有消费过,则从最新的数据开始。如果有消费过,且此offset大于kakfa现有最小offset,此从此位置开始消费,否则从kafka中,现有最小的offset开始消费。 在10.16.34.80 运行/opt/bd-warehouse/spark/run.sh 脚本提交8个topic任务 单个任务提交可用如下命令: topic="tvadfpv" maxRate=1000 base_home="/opt/bd-warehouse/spark" nohup /usr/lib/spark/bin/spark-submit--master yarn-cluster --principal=bd-warehouse/[email protected]=/home/bd-warehouse/bd-warehouse.keytab --confspark.hadoop.fs.hdfs.impl.disable.cache=true --queue datacenter --classcom.sohu.spark.SparkStreaming --driver-memory 4g --executor-memory 4G--total-executor-cores 16 ${base_home}/libs/rt-service-1.0-SNAPSHOT.jar ${topic}${maxRate} >> ${base_home}/logs/${topic}.log 2>&1 & 说明: --master yarn-cluster 以yarn cluster模式提交,在yarn集群上运行 由于运行的是sparkstreaming,相当于yarn的长任务,所以,需要配置以下三项参数。否则,任务运行7天,TOCKEN超期,导致任务失败。 --principal=bd-warehouse/[email protected] --keytab=/home/bd-warehouse/bd-warehouse.keytab --confspark.hadoop.fs.hdfs.impl.disable.cache=true tvadfpv 是程序参数,指从kafka读取数据的topic 5000 设置spark.streaming.kafka.maxRatePerPartition 值,每秒每个partition可处理记录数 tvadfpv为一个分区,且是pv的,量比较大,所以此时值设置为5000,其它topic,如newtvadpv量最大,且为18个分区,值设置为 1500以下,太大会影响处理时间,导致处理延迟越来越严重。 由于在前期上线,任务经常有跑死的现象,所以,监控是必不可少了。Spark Streaming在Yarn上是一个长任务,需要一直运行。目前的监控方法是,使用YARN的API,获取application的状态信息,查看任务是否在Running状态,如果不是,则发出报警。 部分重要代码片断及说明如下: 调用方法如下: nohup hadoop jar /opt/bd-warehouse/spark/libs/rt-service-1.0-SNAPSHOT-jar-with-dependencies.jaryarn.YarnAplicationMonitor ${topic} ${appid} >>/opt/bd-warehouse/spark/ monitor/logs/${topic}-monitor.log 2>&1 & topic名称 appid 类似application_1428668992862_7651596 为应用任务id,可以日志中查看到 数据存入HBase 表BD_WH:REAL_TIME_DATA, 列簇名为:cf 如下: get'BD_WH:REAL_TIME_DATA','20160312','cf:click' 获取'20160312' 当天的点击数 get 'BD_WH:REAL_TIME_DATA','20160312','cf:exec' 获取'20160312' 当天的完成数 scan 'BD_WH:REAL_TIME_DATA',{FILTER=>"PrefixFilter('331300|201603281223')"} 获取排期包331300 在201603281223分点击完成: 其中,cf:click-30403 为点击,cf:exec-30403为完成量,30403为平台编码。二、程序结构:
三、部署上线
四、任务监控
五、结果数据结构