16:实时计算需求及技术方案

  • 目标了解实时计算需求及技术方案

  • 路径

    • step1:实时计算需求
    • step2:技术方案
  • 实施

    • 实时计算需求

      • 实时统计消息总量

        select count(*) from tbname;
        
      • 实时统计各个地区发送消息的总量

        select sender_area,count(*) from tbname group by sender_area;
        
      • 实时统计各个地区接收消息的总量

        select receiver_area,count(*) from tbname group by receiver_area;
        
      • 实时统计每个用户发送消息的总量

        select sender_account,count(*) from tbname group by sender_account;
        
      • 实时统计每个用户接收消息的总量

        select receiver_account,count(*) from tbname group by receiver_account;
        
      • |

      • 构建实时统计报表

    • 技术方案

      • 实时采集:Flume
      • 实时存储:Kafka
      • 实时计算:Flink
      • 实时结果:MySQL / Redis
      • 实时报表:FineBI / JavaWeb可视化

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

  • 小结

    • 了解实时计算需求及技术选型

17:Flink的基本介绍

  • 目标了解Flink的功能、特点及应用场景

  • 路径

    • step1:功能
    • step2:特点
    • step3:应用
  • 实施

    基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

    Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
    
    • 功能:可以基于任何普通的集群平台,对有界的数据流或者无界的数据流实现高性能的有状态的分布式实时计算

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

      • Flink DataSet:对有界数据进行批处理操作
      • Flink DataStream:对无界数据进行实时处理操作
      • Flink Table:基于DSL实现结构化数据处理
      • Flink SQL:基于SQL实现结构化数据处理
      • Flink Gelly:Flink的图计算库
      • Flink ML:Flink的机器学习库
    • 特点

      • 支持高吞吐、低延迟、高性能的流处理
      • 支持带有事件时间的窗口(Window)操作
      • 支持有状态计算的Exactly-once语义
      • 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
      • 支持具有Backpressure功能的持续流模型
      • 支持基于轻量级分布式快照(Snapshot)实现的容错
      • 一个运行时同时支持Batch on Streaming处理和Streaming处理
      • Flink在JVM内部实现了自己的内存管理
      • 支持迭代计算
      • 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
    • 应用:所有实时及离线数据计算场景

      • 数据分析应用:实时数据仓库
        • Batch analytics(批处理分析)
        • Streaming analytics(流处理分析)
      • 事件驱动类应用
        • 欺诈检测(Fraud detection)
        • 异常检测(Anomaly detection)
        • 基于规则的告警(Rule-based alerting)
        • 业务流程监控(Business process monitoring)
        • Web应用程序(社交网络)
      • 数据管道ETL
        • Periodic ETL和Data Pipeline
      • 公司
        • 阿里:https://developer.aliyun.com/article/72242
        • 腾讯:https://mp.weixin.qq.com/s/tyq6ZdwsgiuXYGi-VR_6KA
        • 美团:https://tech.meituan.com/2021/08/26/data-warehouse-in-meituan-waimai.html
        • 有赞:https://tech.youzan.com/flink-practice/
        • Oppo、爱奇艺、唯品会……
  • 小结

    • 了解Flink的功能、特点及应用场景

18:代码模块构建

  • 目标实现开发环境的代码模块构建

  • 实施

    • 导入包中代码到IDEA中

      基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

    • flink包:应用类包,用于存放实际的应用类

      • MoMoFlinkCount:用于实现对每个需求的统计计算

      • MySQLSink:用于将计算的结果写入MySQL

    • pojo包:实体类包,用于存放所有实体类

      • MoMoCountBean:用于封装统计分析的结果

        private Integer id ;				//结果id
        private Long moMoTotalCount ;		//总消息数
        private String moMoProvince ;       //省份
        private String moMoUsername ;       //用户
        private Long moMo_MsgCount ;        //消息数
        //结果类型:1-总消息数 2-各省份发送消息数 3-各省份接受消息数 4-每个用户发送消息数 5-每个用户接受消息数
        private String groupType ;          
        
    • utils包:工具类包,用于存放所有工具类

      • HttpClientUtils:用于实现将经纬度地址解析为省份的工具类

        public static String findByLatAndLng(String lat , String lng)
        
        • 参数:经度,维度
        • 返回值:省份
  • 小结

    • 实现开发环境的代码模块构建

19:省份解析工具类测试

  • 目标:了解省份解析的实现

  • 路径

    • step1:基本设计
    • step2:注册百度开发者
    • step3:测试省份解析
  • 实施

    • 基本设计

      • 业务场景:根据IP或者经纬度解析得到用户的国家、省份、城市信息
      • 方案一:离线解析库【本地解析,快,不精准】
      • 方案二:在线解析库【远程解析,并发限制,精准】
  • 注册百度开发者

    • 百度地图开放平台:https://lbsyun.baidu.com/

    • 逆地理编码:https://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding-abroad

        https://api.map.baidu.com/reverse_geocoding/v3/?ak=您的ak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194  //GET请求
      
    • 注册开放平台,获取AK码:参考《附录三》

  • 测试省份解析

    • 注意:将代码中的AK码更改为自己的
      package bigdata.itcast.cn.momo.online.utils;
      
      import com.alibaba.fastjson.JSONObject;
      import org.apache.http.HttpEntity;
      import org.apache.http.client.methods.CloseableHttpResponse;
      import org.apache.http.client.methods.HttpGet;
      import org.apache.http.impl.client.CloseableHttpClient;
      import org.apache.http.impl.client.HttpClients;
      import org.apache.http.util.EntityUtils;
      
      import javax.swing.text.html.parser.Entity;
      import java.io.IOException;
      import java.util.Map;
      
      public class HttpClientUtils {
      
          //传入经纬度, 返回查询的地区
          public static String findByLatAndLng(String lat , String lng){
              try {
                  CloseableHttpClient httpClient = HttpClients.createDefault();
                  String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location="+lat+","+lng;
                  System.out.println(url);
                  //请求解析
                  HttpGet httpGet = new HttpGet(url);
                  //得到结果
                  CloseableHttpResponse response = httpClient.execute(httpGet);
                  //获取数据
                  HttpEntity httpEntity = response.getEntity();
                  //转换成JSON
                  String json = EntityUtils.toString(httpEntity);
                  //从JSON中返回省份
                  Map<String,Object> result = JSONObject.parseObject(json, Map.class);
                  if(result.get("status").equals(0)){
                      Map<String,Object> resultMap =  (Map<String,Object>)result.get("result");
                      resultMap = (Map<String, Object>) resultMap.get("addressComponent");
                      String province = (String) resultMap.get("province");
                      return province;
                  }
      
              } catch (IOException e) {
                  e.printStackTrace();
              }
              return null;
          }
      
          public static void main(String[] args) {
              //测试
              String sf = findByLatAndLng("43.921297","124.655376");
              System.out.println(sf);
      
          }
      
      }
      
    

    基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

  • 小结

    • 了解省份解析的实现

20:Flink代码解读

  • 目标了解Flink代码的基本实现

  • 路径

    • step1:消费Kafka
    • step2:实时统计分析
    • step3:实时更新结果到MySQL
  • 实施

    • 消费Kafka

      //构建Kafka配置
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
      props.setProperty("group.id", "momo2");
      //构建消费者
      FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(),props);
      //Flink加载消费者
      DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
      
    • 实时统计分析

      //todo:3. 进行转换统计操作:
      //3.1: 统计总消息量
      countTotalMsg(streamSource);
      //3.2: 基于经纬度, 统计各省份发送消息量
      countProvinceSenderMsg(streamSource);
      //3.3: 基于经纬度, 统计各省份接收消息量
      countProvinceReceiverMsg(streamSource);
      //3.4: 统计各个用户, 发送消息量
      countUserNameSenderMsg(streamSource);
      //3.5: 统计各个用户, 接收消息量
      countUserNameReceiverMsg(streamSource);
      //5. 执行flink操作
      env.execute("momoFlinkCount");
      
    • 实时更新结果到MySQL

      streamOperator.addSink(new MysqlSink("2"));
      
      if (status.equals("2")){
          String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
          ResultSet resultSet = stat.executeQuery(sql);
          boolean flag = resultSet.next();
          if(flag) {
              sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
          }else {
              sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"+value.getMoMoProvince()+"',"+value.getMoMo_MsgCount()+",'2') ";
          }
          stat.executeUpdate(sql);
      }
      
  • 小结

    • 了解Flink代码的基本实现

21:Flink实时计算测试

  • 目标实现Flink实时分析测试

  • 路径

    • step1:MySQL准备
    • step2:运行测试
  • 实施

    • MySQL准备

      • 找到SQL文件

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

      • 运行SQL文件创建结果数据库、表

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

    • 运行测试

      • 启动Flink程序:运行MoMoFlinkCount

      • 启动Flume程序

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
      • 启动模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        100
        
      • 观察MySQL结果

        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案-LMLPHP

  • 小结

    • 实现Flink实时分析测试
10-20 09:03