1. The project architecture is shown below:

[Figure: kafka + flume + HDFS log collection project architecture]

2. Implementation process

Start HDFS:

sbin/start-dfs.sh
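
A quick way to confirm the HDFS daemons are up is jps, which should list NameNode and DataNode (plus SecondaryNameNode on the master); bin/hdfs dfsadmin -report additionally shows the live DataNodes:

jps
bin/hdfs dfsadmin -report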

Start ZooKeeper (on all three nodes):

 bin/zkServer.sh start
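
Once all three are started, the quorum state can be checked on each node; one node should report Mode: leader and the other two Mode: follower:

bin/zkServer.sh status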

Start Kafka (on all three nodes):

root@Ubuntu-:/usr/local/kafka# bin/kafka-server-start.sh config/server.properties > logs/kafka3.log 2>&1 &
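
After all three brokers are up, their registration can be verified through ZooKeeper with Kafka's bundled shell (the ids returned depend on the broker.id set in each server.properties):

bin/zookeeper-shell.sh 192.168.22.131:2181 ls /brokers/ids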

Create the access topic on node 131 (2181 is the default ZooKeeper client port; a replication factor and partition count of 3 match the three brokers):

root@Ubuntu-:/usr/local/kafka# bin/kafka-topics.sh --create --topic access --zookeeper 192.168.22.131:2181,192.168.22.132:2181,192.168.22.135:2181 --replication-factor 3 --partitions 3
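
The partition assignment and in-sync replicas of the new topic can be inspected with --describe:

root@Ubuntu-:/usr/local/kafka# bin/kafka-topics.sh --describe --topic access --zookeeper localhost:2181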

List the topics to verify the creation:

root@Ubuntu-:/usr/local/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181

Start Flume on node 131:

bin/flume-ng agent --conf conf/ --conf-file conf/access.conf --name a1 -Dflume.root.logger=INFO,console &

The contents of conf/access.conf (the channel capacities and batch sizes below are typical values; tune them for real workloads):

# Name the components of agent a1
a1.sources = exec
a1.sinks = hdfs_sink kafka_sink
a1.channels = hdfs_channel kafka_channel

# Configure the exec source: tail the application log
a1.sources.exec.type = exec
a1.sources.exec.command = tail -F /usr/local/apache-flume/logs/hu.log

# Configure the interceptor (adds the timestamp header consumed by %Y%m%d in the HDFS path)
a1.sources.exec.interceptors = i1
a1.sources.exec.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Configure the channels
a1.channels.hdfs_channel.type = memory
a1.channels.hdfs_channel.capacity = 10000
a1.channels.hdfs_channel.transactionCapacity = 100
a1.channels.kafka_channel.type = memory
a1.channels.kafka_channel.capacity = 10000
a1.channels.kafka_channel.transactionCapacity = 100

# Configure the HDFS sink
a1.sinks.hdfs_sink.type = hdfs
a1.sinks.hdfs_sink.hdfs.path = hdfs://Ubuntu-1:9000/source/%{type}/%Y%m%d
a1.sinks.hdfs_sink.hdfs.filePrefix = events-
a1.sinks.hdfs_sink.hdfs.fileType = DataStream
#a1.sinks.hdfs_sink.hdfs.fileType = CompressedStream
#a1.sinks.hdfs_sink.hdfs.codeC = gzip
# Do not roll files based on event count
a1.sinks.hdfs_sink.hdfs.rollCount = 0
# Roll a new file when it reaches 64 MB on HDFS
a1.sinks.hdfs_sink.hdfs.rollSize = 67108864
# Do not roll files on a time interval
a1.sinks.hdfs_sink.hdfs.rollInterval = 0
a1.sinks.hdfs_sink.hdfs.batchSize = 100

# Configure the Kafka sink
a1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafka_sink.topic = access
a1.sinks.kafka_sink.brokerList = 192.168.22.131:9092,192.168.22.132:9092,192.168.22.135:9092
a1.sinks.kafka_sink.requiredAcks = 1
a1.sinks.kafka_sink.batchSize = 100

# Bind the source and sinks to their channels
a1.sources.exec.channels = hdfs_channel kafka_channel
a1.sinks.hdfs_sink.channel = hdfs_channel
a1.sinks.kafka_sink.channel = kafka_channel
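
One caveat: the hdfs.path above uses the %{type} escape, which is filled from an event header named type, while the timestamp interceptor only supplies the timestamp for %Y%m%d. If nothing sets that header, the path will not resolve; a static interceptor can add it. A minimal sketch, assuming the fixed header value access:

a1.sources.exec.interceptors = i1 i2
a1.sources.exec.interceptors.i2.type = static
a1.sources.exec.interceptors.i2.key = type
a1.sources.exec.interceptors.i2.value = access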

Start a Kafka console consumer on node 132 to read the access topic:

root@Ubuntu-:/usr/local/kafka# bin/kafka-console-consumer.sh --zookeeper 192.168.22.131:2181,192.168.22.132:2181,192.168.22.135:2181 --topic access
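
With everything running, the pipeline can be exercised end to end by appending a line to the tailed file on 131; it should show up in the console consumer above and, once the HDFS sink flushes, under /source. A sketch (the test string is arbitrary; run the hdfs command from the Hadoop directory):

echo "test access log line" >> /usr/local/apache-flume/logs/hu.log
bin/hdfs dfs -ls -R /source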