Use case 1: syncing Kafka data to Kudu
1 Prepare the Kafka topic
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --create --topic test_sync --partitions 2 --replication-factor 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_sync".
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --describe --topic test_sync
Topic:test_sync PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test_sync Partition: 0 Leader: 112 Replicas: 112,111 Isr: 112,111
Topic: test_sync Partition: 1 Leader: 110 Replicas: 110,112 Isr: 110,112
2 Prepare the Kudu table
impala-shell
CREATE TABLE test.test_sync (
id int,
name string,
description string,
create_time timestamp,
update_time timestamp,
primary key (id)
)
PARTITION BY HASH (id) PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='$kudu_master:7051');
3 Add Kudu support to Flume
3.1 Download the jars
# wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-flume-sink/1.7.0-cdh5.16.1/kudu-flume-sink-1.7.0-cdh5.16.1.jar
# mv kudu-flume-sink-1.7.0-cdh5.16.1.jar $FLUME_HOME/lib/
# wget http://central.maven.org/maven2/org/json/json/20160810/json-20160810.jar
# mv json-20160810.jar $FLUME_HOME/lib/
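Before moving on, it can help to confirm that both jars actually landed in the Flume lib directory. The following is a minimal sketch; the helper name missing_jars is hypothetical, and the jar file names are the ones downloaded above.

```shell
# Print the name of every expected jar that is missing from the given lib directory.
# missing_jars is a hypothetical helper name, not part of Flume or Kudu.
missing_jars() {
  lib_dir="$1"
  for jar in kudu-flume-sink-1.7.0-cdh5.16.1.jar json-20160810.jar; do
    [ -f "$lib_dir/$jar" ] || echo "$jar"
  done
}

# Usage (FLUME_HOME comes from the environment, as in the commands above):
# missing_jars "$FLUME_HOME/lib"
```

An empty result means both files are in place; any printed name still needs to be downloaded or moved.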
3.2 Development
Source repository: https://github.com/apache/kudu/tree/master/java/kudu-flume-sink
The default producer used by kudu-flume-sink is
org.apache.kudu.flume.sink.SimpleKuduOperationsProducer
public List<Operation> getOperations(Event event) throws FlumeException {
try {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addBinary(payloadColumn, event.getBody());
return Collections.singletonList((Operation) insert);
} catch (Exception e) {
throw new FlumeException("Failed to create Kudu Insert object", e);
}
}
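This producer writes the raw message body as-is into a single payload column.
To handle JSON-formatted data, you need to develop a custom producer: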
package com.cloudera.kudu;
public class JsonKuduOperationsProducer implements KuduOperationsProducer {
    // method implementations omitted here
}
For the full code, see: https://www.cnblogs.com/barneywill/p/10573221.html
Package it as a jar and put the jar under $FLUME_HOME/lib.
4 Prepare the Flume configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.1:9092
a1.sources.r1.kafka.topics = test_sync
a1.sources.r1.kafka.consumer.group.id = flume-consumer
# Describe the sink (the Kudu sink is configured at the end of this file)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
a1.sinks.k1.producer = com.cloudera.kudu.JsonKuduOperationsProducer
a1.sinks.k1.masterAddresses = 192.168.0.1:7051
a1.sinks.k1.tableName = impala::test.test_sync
a1.sinks.k1.batchSize = 50
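Assuming the agent file is loaded as standard Java properties, a key that is assigned twice keeps only its last value, so a leftover sink type from an earlier experiment can go unnoticed. The sketch below lists keys that appear more than once; duplicate_keys is a hypothetical helper name, and conf/order.properties is the file name used in the start command.

```shell
# Print every property key that is assigned more than once in the given file.
# duplicate_keys is a hypothetical helper, not a Flume tool.
duplicate_keys() {
  grep -v '^[[:space:]]*#' "$1" \
    | grep '=' \
    | sed 's/[[:space:]]*=.*$//; s/^[[:space:]]*//' \
    | sort \
    | uniq -d
}

# Usage:
# duplicate_keys conf/order.properties
```

No output means every key is set exactly once.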
5 Start Flume
bin/flume-ng agent --conf conf --conf-file conf/order.properties --name a1
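With the agent running, a test record has to reach the topic before anything shows up in Kudu. The sketch below builds one JSON line whose keys match the columns of test.test_sync. The flat-object layout and the timestamp format are assumptions about what the custom JsonKuduOperationsProducer accepts, and make_record is a hypothetical helper name; adjust both to the producer you actually deploy.

```shell
# Build one JSON record whose keys match the columns of test.test_sync.
# Assumption: the custom producer expects a flat JSON object keyed by column name.
# Values are not escaped, so they must not contain double quotes or backslashes.
make_record() {
  id="$1"; name="$2"; description="$3"
  now="$(date '+%Y-%m-%d %H:%M:%S')"
  printf '{"id":%s,"name":"%s","description":"%s","create_time":"%s","update_time":"%s"}\n' \
    "$id" "$name" "$description" "$now" "$now"
}

# Usage, assuming the broker address from the Flume configuration:
# make_record 1 alice "first row" | bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic test_sync
```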
6 Verify in Kudu
impala-shell
select * from test.test_sync limit 10;
Reference: https://kudu.apache.org/2016/08/31/intro-flume-kudu-sink.html