应用一:kafka数据同步到kudu

1 准备kafka topic

# bin/kafka-topics.sh --zookeeper $zk:2181/kafka -create --topic test_sync --partitions 2 --replication-factor 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_sync".
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka -describe --topic test_sync
Topic:test_sync PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test_sync        Partition: 0    Leader: 112     Replicas: 112,111       Isr: 112,111
        Topic: test_sync        Partition: 1    Leader: 110     Replicas: 110,112       Isr: 110,112

2 准备kudu表

impala-shell

CREATE TABLE test.test_sync (
id int,
name string,
description string,
create_time timestamp,
update_time timestamp,
primary key (id)
)
PARTITION BY HASH (id) PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='$kudu_master:7051');

3 准备flume kudu支持

3.1 下载jar

# wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-flume-sink/1.7.0-cdh5.16.1/kudu-flume-sink-1.7.0-cdh5.16.1.jar
# mv kudu-flume-sink-1.7.0-cdh5.16.1.jar $FLUME_HOME/lib/

# wget http://central.maven.org/maven2/org/json/json/20160810/json-20160810.jar
# mv json-20160810.jar $FLUME_HOME/lib/

3.2 开发

代码库:https://github.com/apache/kudu/tree/master/java/kudu-flume-sink

kudu-flume-sink默认使用的producer是

org.apache.kudu.flume.sink.SimpleKuduOperationsProducer

public List<Operation> getOperations(Event event) throws FlumeException {
    try {
      Insert insert = table.newInsert();
      PartialRow row = insert.getRow();
      row.addBinary(payloadColumn, event.getBody());

      return Collections.singletonList((Operation) insert);
    } catch (Exception e) {
      throw new FlumeException("Failed to create Kudu Insert object", e);
    }
  }

是将消息直接存放到一个payload列中

如果想要支持json格式数据,需要二次开发

package com.cloudera.kudu;
public class JsonKuduOperationsProducer implements KuduOperationsProducer {

代码详见:https://www.cnblogs.com/barneywill/p/10573221.html

打包放到$FLUME_HOME/lib下

4 准备flume conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.1:9092
a1.sources.r1.kafka.topics = test_sync
a1.sources.r1.kafka.consumer.group.id = flume-consumer

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
a1.sinks.k1.producer = com.cloudera.kudu.JsonKuduOperationsProducer
a1.sinks.k1.masterAddresses = 192.168.0.1:7051
a1.sinks.k1.tableName = impala::test.test_sync
a1.sinks.k1.batchSize = 50

5 启动flume

bin/flume-ng agent --conf conf --conf-file conf/order.properties --name a1

6 kudu确认

impala-shell

select * from test_sync limit 10;

参考:https://kudu.apache.org/2016/08/31/intro-flume-kudu-sink.html

03-13 22:22