水槽版本:-1.6
Kafka版本:-1.0.0
Zookeeper版本:-3.4.10
我们很快就要求将Flume与Kafka和Hadoop连接,因此我们从Kafka Consumer中提取了事件,并将事件提取到了Hadoop中。一切都是使用conf文件配置的,直到这里一切都很好。
现在,我们需要检查是否可以使用自定义Java代码完成此操作。我尝试了互联网上提供的许多选项来设计kafka源和HDFS接收器。我在cloudera VM中尝试了此操作。
Kafka和zookeeper都已启动并正在运行。
代码正在运行,但是在生成消息时,HDFS中没有插入任何内容。
如果有人可以指出我所缺少的内容,那将非常有帮助。
我尝试过的代码是..
KafkaChannel channel = new KafkaChannel();
Map<String, String> channelParamters = new HashMap<String, String>();
channelParamters.put("brokerList", "localhost:9092");
channelParamters.put("zookeeperConnect","localhost:2181");
channelParamters.put("topic","integration");
channelParamters.put("groupId","channel");
channelParamters.put("batchSize", "15");
channelParamters.put("zookeeper.connect","localhost:2181");
channelParamters.put("clientId", "channel");
channelParamters.put("readSmallestOffset","true");
channelParamters.put("interceptors","i1");
channelParamters.put("interceptors.i1.type","host");
channelParamters.put("consumer.timeout.ms","1000");
channelParamters.put("parseAsFlumeEvent", "false");
channel.setName("KafkaSource");
Context channelContext = new Context(channelParamters);
final Map<String, String> properties = new HashMap<String, String>();
/** Sink Properties start **/
HDFSEventSink eventSink = new HDFSEventSink();
eventSink.setName("HDFSEventSink-" + "kafkaEventSink");
String hdfsBasePath = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events";
properties.put("hdfs.type", "hdfs");
properties.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
properties.put("hdfs.rollInterval ", "0");
properties.put("hdfs.rollSize ", "2048");
properties.put("hdfs.rollCount ", "0");
properties.put("hdfs.fileType ", " DataStream");
properties.put("channel", channel.getName());
properties.put("hdfs.maxOpenFiles", String.valueOf(1));
properties.put("hdfs.filePrefix ", " kafka_host");
properties.put("hdfs.fileSuffix ", " .txt");
properties.put("hdfs.idleTimeout ", "60");
/** Sink Properties end **/
Context sinkContext = new Context(properties);
eventSink.configure(sinkContext);
eventSink.setChannel(channel);
Configurables.configure(channel, channelContext);
eventSink.start();
channel.start();
最佳答案
我不清楚您要使用自定义Java代码实现的目标。
更好的方法是使用Kafka Connect(Apache Kafka的一部分)和开源HDFS connector。它确实可以完成您在这里所做的事情,只是要设置的配置文件,处理架构,扩展,执行自动故障转移等。