04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成

  • 路径

    • step1:数据格式
    • step2:数据生成
  • 实施

    • 数据格式

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

    • 数据生成

      • 创建原始文件目录

        mkdir /export/data/momo_init
        
      • 上传模拟数据程序

        cd /export/data/momo_init
        rz
        

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

      • 创建模拟数据目录

        mkdir /export/data/momo_data
        
      • 运行程序生成数据

        • 语法

          java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
          
        • 测试:每500ms生成一条数据

          java -jar /export/data/momo_init/MoMo_DataGen.jar \
          /export/data/momo_init/MoMo_Data.xlsx \
          /export/data/momo_data/ \
          500
          
        • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

  • 小结

    • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型

  • 路径

    • step1:需求分析
    • step2:技术选型
    • step3:技术架构
  • 实施

    • 需求分析

      • 离线存储计算
        • 提供离线T + 1的统计分析
        • 提供离线数据的即时查询
      • 实时存储计算
        • 提供实时统计分析
    • 技术选型

      • 离线
        • 数据采集:Flume
        • 离线存储:Hbase
        • 离线分析:Hive:复杂计算
        • 即时查询:Phoenix:高效查询
      • 实时
        • 数据采集:Flume
        • 实时存储:Kafka
        • 实时计算:Flink
        • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
    • 技术架构

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

      • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
        • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
        • 保证数据一致性
  • 小结

    • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试

  • 路径

    • step1:Flume回顾
    • step2:Flume的安装
    • step3:Flume的测试
  • 实施

    • Flume的回顾

      • 功能:实时对文件或者网络端口进行数据流监听采集
      • 场景:文件实时采集
      • 开发
        • step1:先开发一个配置文件:properties【K=V】
        • step2:运行这个文件即可
      • 组成
        • Agent:一个Agent就是一个Flume程序
        • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
        • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
        • Sink:负责从Channel拉取数据写入目标地
        • Event:代表一条数据对象
          • head:Map集合[KV]
          • body:byte[]
    • Flume的安装

      • 上传安装包

        cd /export/software/
        rz
        

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

      • 解压安装

        tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
        cd /export/server
        mv apache-flume-1.9.0-bin flume-1.9.0-bin
        
      • 修改配置

        #集成HDFS,拷贝HDFS配置文件
        cd /export/server/flume-1.9.0-bin
        cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
        #修改Flume环境变量
        cd /export/server/flume-1.9.0-bin/conf/
        mv flume-env.sh.template flume-env.sh
        vim flume-env.sh 
        
        #修改22行
        export JAVA_HOME=/export/server/jdk1.8.0_65
        #修改34行
        export HADOOP_HOME=/export/server/hadoop-3.3.0
        
      • 删除Flume自带的guava包,替换成Hadoop的

        cd /export/server/flume-1.9.0-bin 
        rm -rf lib/guava-11.0.2.jar
        cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
        
      • 创建目录

        cd /export/server/flume-1.9.0-bin
        #程序配置文件存储目录
        mkdir usercase
        #Taildir元数据存储目录
        mkdir position
        
    • Flume的测试

      • 需求:采集聊天数据,写入HDFS

      • 分析

        • Source:taildir:动态监听多个文件实现实时数据采集
        • Channel:mem:将数据缓存在内存
        • Sink:hdfs
      • 开发

        vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
        
        # define a1
        a1.sources = s1 
        a1.channels = c1
        a1.sinks = k1
        
        #define s1
        a1.sources.s1.type = TAILDIR
        #指定一个元数据记录文件
        a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
        #将所有需要监控的数据源变成一个组
        a1.sources.s1.filegroups = f1
        #指定了f1是谁:监控目录下所有文件
        a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
        #指定f1采集到的数据的header中包含一个KV对
        a1.sources.s1.headers.f1.type = momo
        a1.sources.s1.fileHeader = true
        
        #define c1
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 1000
        
        #define k1
        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
        a1.sinks.k1.hdfs.fileType = DataStream
        #指定按照时间生成文件,一般关闭
        a1.sinks.k1.hdfs.rollInterval = 0
        #指定文件大小生成文件,一般120 ~ 125M对应的字节数
        a1.sinks.k1.hdfs.rollSize = 102400
        #指定event个数生成文件,一般关闭
        a1.sinks.k1.hdfs.rollCount = 0
        a1.sinks.k1.hdfs.filePrefix = momo
        a1.sinks.k1.hdfs.fileSuffix = .log
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        
        #bound
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      • 启动HDFS

        start-dfs.sh
        
      • 运行Flume

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
        
      • 运行模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        100
        
      • 查看结果

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源-LMLPHP

  • 小结

    • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发

  • 路径

    • step1:需求分析
    • step2:程序开发
    • step3:测试实现
  • 实施

    • 需求分析

      • 需求:采集聊天数据,实时写入Kafka

      • Source:taildir

      • Channel:mem

      • Sink:Kafka sink

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
        a1.sinks.k1.kafka.producer.acks = 1
        a1.sinks.k1.kafka.topic = mytopic
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.linger.ms = 1
        a1.sinks.k1.kafka.producer.compression.type = snappy
        
    • 程序开发

      vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
      
      # define a1
      a1.sources = s1 
      a1.channels = c1
      a1.sinks = k1
      
      #define s1
      a1.sources.s1.type = TAILDIR
      #指定一个元数据记录文件
      a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
      #将所有需要监控的数据源变成一个组
      a1.sources.s1.filegroups = f1
      #指定了f1是谁:监控目录下所有文件
      a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
      #指定f1采集到的数据的header中包含一个KV对
      a1.sources.s1.headers.f1.type = momo
      a1.sources.s1.fileHeader = true
      
      #define c1
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000
      
      #define k1
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.topic = MOMO_MSG
      a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
      a1.sinks.k1.kafka.flumeBatchSize = 10
      a1.sinks.k1.kafka.producer.acks = 1
      a1.sinks.k1.kafka.producer.linger.ms = 100
      
      #bound
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    • 测试实现

      • 启动Kafka

        start-zk-all.sh
        start-kafka.sh 
        
      • 创建Topic

        kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • 列举

        kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • 启动消费者

        kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • 启动Flume程序

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
      • 启动模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        50
        
      • 观察结果

  • 小结

    • 实现案例Flume采集程序的开发
10-13 18:26