08: Offline Analysis: HBase Table Design and Creation

Goal: Master the design of the HBase table and the implementation of table creation

Path
- step1: Basic design
- step2: Rowkey design
- step3: Partition design
- step4: Table creation

Implementation

Basic design
- Namespace: MOMO_CHAT
- Table: MOMO_MSG
- Family: C1
- Qualifier: identical to the field names in the data
Rowkey design

Query requirement: look up chat history by sender id + receiver id + message date
- Sender account
- Receiver account
- Time

Design rules: business relevance, uniqueness, length, hashing, combination

Design implementation
- Salting options: CRC, Hash, MD5, MUR => 8, 16, or 32 characters
- MD5Hash[sender account_receiver account_message time => 8 characters]_sender account_receiver account_message time
- Sample rowkey (the record queried later in section 14): 1f300e5d_13280256412_15260978785_1632888342000
Partition design
- Rowkey prefix: MD5-encoded, composed of letters and digits
- Data concurrency: high
- Partition design: use HexStringSplit to pre-split the table into multiple regions by hexadecimal ranges

Table creation
- Start HBase: start-hbase.sh
- Enter the client: hbase shell
# Create the namespace
create_namespace 'MOMO_CHAT'
# Create the table
create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, {NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
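For reference, the same table can also be created programmatically; below is a minimal sketch using the HBase 2.x Java Admin API (an illustration added here, not part of the original course material — the class name CreateMomoTable is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class CreateMomoTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Create the namespace MOMO_CHAT (throws if it already exists)
            admin.createNamespace(NamespaceDescriptor.create("MOMO_CHAT").build());
            // Column family C1 with GZ compression
            ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("C1"))
                    .setCompressionType(Compression.Algorithm.GZ)
                    .build();
            TableDescriptor td = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("MOMO_CHAT:MOMO_MSG"))
                    .setColumnFamily(cf)
                    .build();
            // Pre-split into 6 regions with HexStringSplit, matching the shell command
            byte[][] splits = new RegionSplitter.HexStringSplit().split(6);
            admin.createTable(td, splits);
        }
    }
}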
Summary
- Master the design of the HBase table and the implementation of table creation
09: Offline Analysis: Building the Kafka Consumer

Goal: Implement the development of the offline consumer

Path

Overall implementation path
// Entry point: consume from Kafka and write the data into HBase
public void main() {
    // step1: consume from Kafka
    consumerKafka();
}

// Consume the Kafka data
public void consumerKafka() {
    prop = new Properties()
    KafkaConsumer consumer = new KafkaConsumer(prop)
    consumer.subscribe("MOMO_MSG")
    ConsumerRecords records = consumer.poll
    // Consume and process the data partition by partition
    record: Topic, Partition, Offset, Key, Value
    // step2: write into HBase
    writeToHbase(value)
    // Commit the offset of this partition
    commitSync(offset + 1)
}

// Write the value into HBase
public void writeToHbase() {
    // step1: build the connection
    // step2: build the Table object
    // step3: build the Put object
    // get the rowkey
    rowkey = getRowkey(value)
    Put put = new Put(rowkey)
    put.addColumn for each column
    table.put()
}

public String getRowkey() {
    value.getSender
    value.getReceiver
    value.getTime
    rowkey = MD5 + sender + receiverId + time
    return rowkey
}
Implementation
/**
 * Consume the Kafka data and write valid records into HBase
 */
private static void consumerKafkaToHbase() throws Exception {
    // Build the configuration object
    Properties props = new Properties();
    // Broker addresses
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
    // Consumer group id
    props.setProperty("group.id", "momo");
    // Disable auto commit
    props.setProperty("enable.auto.commit", "false");
    // Key and value deserializer types
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Build the consumer connection
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to the topics
    consumer.subscribe(Arrays.asList("MOMO_MSG"));
    // Pull data continuously
    while (true) {
        // Request data from Kafka and wait up to 100 ms for a response;
        // if nothing arrives within 100 ms, issue the next request
        // All records pulled in one poll are held in a ConsumerRecords object, similar to a collection
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // todo: process the pulled data: print
        // Process the records partition by partition
        Set<TopicPartition> partitions = records.partitions(); // all partitions in this batch
        for (TopicPartition partition : partitions) {
            List<ConsumerRecord<String, String>> partRecords = records.records(partition); // all records of this partition
            // Process the records of this partition
            long offset = 0;
            for (ConsumerRecord<String, String> record : partRecords) {
                // Topic
                String topic = record.topic();
                // Partition
                int part = record.partition();
                // Offset
                offset = record.offset();
                // Key
                String key = record.key();
                // Value
                String value = record.value();
                System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                // Write the value into HBase if it is valid (non-empty, 20 fields)
                if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                    writeToHbase(value);
                }
            }
            // Manually commit the offset of this partition
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
            consumer.commitSync(offsets);
        }
    }
}
Summary
- Implement the development of the offline consumer
10: Offline Analysis: Building the HBase Connection

Goal: Implement the construction of the HBase connection

Implementation
private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static Connection conn;
private static Table table;
private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
private static byte[] family = Bytes.toBytes("C1"); // column family

// Static block: runs once when the class is loaded, so only one connection
// is built, avoiding the performance cost of multiple connections
static {
    try {
        // Build the configuration object
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // Build the connection
        conn = ConnectionFactory.createConnection(conf);
        // Get the table object
        table = conn.getTable(tableName);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
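The snippet above never releases the connection; as an addition not in the original course code, a minimal sketch of closing the table and connection on JVM exit via a shutdown hook:

// Hypothetical cleanup, not part of the original code:
// close the Table and Connection when the JVM shuts down
static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            if (table != null) table.close();
            if (conn != null) conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }));
}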
Summary
- Implement the construction of the HBase connection
11: Offline Analysis: Building the Rowkey

Goal: Implement the construction of the rowkey

Implementation
private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
    // Convert the time string into a timestamp
    long time = format.parse(stime).getTime();
    String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
    // Build the 8-character MD5 prefix
    String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
    // Concatenate and return
    return prefix + "_" + suffix;
}
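A quick usage sketch: the account numbers come from the sample query in section 14, while the timestamp string is an illustrative value; the actual MD5 prefix depends on the hash of the suffix.

// Hypothetical call for illustration
String rowkey = getMomoRowkey("2021-09-29 10:25:42", "13280256412", "15260978785");
System.out.println(rowkey);
// Shape: <8-char MD5 prefix>_13280256412_15260978785_<epoch millis>
// e.g. the record queried later has rowkey 1f300e5d_13280256412_15260978785_1632888342000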
Summary
- Implement the construction of the rowkey
12: Offline Analysis: Building the Put Data Columns

Goal: Implement the construction of the Put data columns

Implementation
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
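Putting sections 10 through 12 together, here is a minimal sketch of the writeToHbase method the consumer calls; the assembly follows the pseudocode in section 09, and the field indexes 0, 2, and 11 match the addColumn mapping above.

// Sketch of writeToHbase, assembled from the pieces in sections 10-12:
// split the value, build the rowkey, fill the Put, write to the table
private static void writeToHbase(String value) throws Exception {
    // Fields are separated by \001; the consumer already checked there are 20
    String[] items = value.split("\001");
    // Rowkey from msg_time (items[0]), sender_account (items[2]), receiver_account (items[11])
    String rowkey = getMomoRowkey(items[0], items[2], items[11]);
    Put put = new Put(Bytes.toBytes(rowkey));
    // ... the 20 put.addColumn calls listed above ...
    table.put(put);
}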
Summary
- Implement the construction of the Put data columns
13: Offline Analysis: Storage Test Run

Goal: Test consuming the Kafka data and writing it into HBase on the fly

Implementation

Start the consumer program

Start the Flume agent
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
Start the data simulator
java -jar /export/data/momo_init/MoMo_DataGen.jar \
  /export/data/momo_init/MoMo_Data.xlsx \
  /export/data/momo_data/ \
  10
Check the results in HBase

Summary
- Test consuming the Kafka data and writing it into HBase on the fly
14: Offline Analysis: Hive Integration Test

Goal: Use Hive mapped onto HBase for offline analysis

Path
- step1: Create the mapping
- step2: Query

Implementation

Start Hive and YARN
start-yarn.sh
hive-daemon.sh metastore
hive-daemon.sh hiveserver2
start-beeline.sh
Create the mapping
create database MOMO_CHAT;
use MOMO_CHAT;
create external table if not exists MOMO_CHAT.MOMO_MSG (
  id                  string,
  msg_time            string,
  sender_nickyname    string,
  sender_account      string,
  sender_sex          string,
  sender_ip           string,
  sender_os           string,
  sender_phone_type   string,
  sender_network      string,
  sender_gps          string,
  receiver_nickyname  string,
  receiver_ip         string,
  receiver_account    string,
  receiver_os         string,
  receiver_phone_type string,
  receiver_network    string,
  receiver_gps        string,
  receiver_sex        string,
  msg_type            string,
  distance            string,
  message             string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,C1:msg_type,C1:distance,C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
Analysis queries
-- Basic query
select msg_time, sender_nickyname, receiver_nickyname, distance from momo_msg limit 10;
-- Query chat history: sender id + receiver id + date: 1f300e5d_13280256412_15260978785_1632888342000
select * from momo_msg
where sender_account = '13280256412'
  and receiver_account = '15260978785'
  and substr(msg_time, 0, 10) = '2021-09-29';
-- Count the number of messages per hour
select substr(msg_time, 0, 13) as hour, count(*) as cnt
from momo_msg
group by substr(msg_time, 0, 13);
Summary
- Use Hive mapped onto HBase for offline analysis
15: Offline Analysis: Phoenix Integration Test

Goal: Use Phoenix mapped onto HBase for ad hoc queries

Path
- step1: Create the mapping
- step2: Query

Implementation

Start Phoenix
cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
bin/sqlline.py node1:2181
Create the mapping
create view if not exists MOMO_CHAT.MOMO_MSG (
  "id" varchar primary key,
  C1."msg_time" varchar,
  C1."sender_nickyname" varchar,
  C1."sender_account" varchar,
  C1."sender_sex" varchar,
  C1."sender_ip" varchar,
  C1."sender_os" varchar,
  C1."sender_phone_type" varchar,
  C1."sender_network" varchar,
  C1."sender_gps" varchar,
  C1."receiver_nickyname" varchar,
  C1."receiver_ip" varchar,
  C1."receiver_account" varchar,
  C1."receiver_os" varchar,
  C1."receiver_phone_type" varchar,
  C1."receiver_network" varchar,
  C1."receiver_gps" varchar,
  C1."receiver_sex" varchar,
  C1."msg_type" varchar,
  C1."distance" varchar,
  C1."message" varchar
);
Ad hoc queries
-- Basic query
select "id", c1."sender_account", c1."receiver_account" from momo_chat.momo_msg limit 10;
-- Number of messages sent by each sender
select c1."sender_account", count(*) as cnt
from momo_chat.momo_msg
group by c1."sender_account";
-- Number of distinct people each sender chats with
select c1."sender_account", count(distinct c1."receiver_account") as cnt
from momo_chat.momo_msg
group by c1."sender_account"
order by cnt desc;
Summary
- Use Phoenix mapped onto HBase for ad hoc queries