Environment
  apache-flume-1.6.0

I. Multi-agent connection

[Figure: two chained agents, node101's avro sink feeding node102's avro source]

1. node101 configuration: option2

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node101
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node102
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. node102 configuration: option1

############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node102
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
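
Note: a1.sinks.k1.hostname/port on node101 must match a1.sources.r1.bind/port on node102; the avro sink acts as the RPC client, and the avro source is the server it connects to.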

3. Startup order
Start the node102 agent first and the node101 agent second: the downstream avro source has to be listening before the upstream avro sink tries to connect. Flume's own startup sequence makes the dependency clear: an agent first creates its sinks, then its channels, and finally its sources; it then connects the source and the sink to the channel; last, it starts the channel, the sink, and the source.

[root@node102 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option1 -n a1 -Dflume.root.logger=INFO,console
[root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option2 -n a1 -Dflume.root.logger=INFO,console
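
Here -c points at Flume's conf directory, -f at the agent's properties file, and -n gives the agent name, which must match the a1 prefix used inside that file. Once the node102 agent is up, you can confirm that its avro source is listening before starting node101 (4141 is the port assumed in the configs above); a java process should show up in LISTEN state:

[root@node102 ~]# netstat -tlnp | grep 4141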

4. Test: send data with telnet on node101 and watch the log output on node102.
node101 telnet:

[root@node101 ~]# telnet node101 44444
Trying 192.168.118.101...
Connected to node101.
Escape character is '^]'.
hello world
OK
haha wjy
OK
hi xiaoming
OK
^]
telnet> quit
Connection closed.
[root@node101 ~]#
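
Each OK line is the netcat source acknowledging a received event; this per-event ack is enabled by default.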

node102 Flume log:

(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D hello world. }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 68 61 68 61 20 77 6A 79 0D haha wjy. }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 68 69 20 78 69 61 6F 6D 69 6E 67 0D hi xiaoming. }

II. Exec Source
The source type is set to exec: the source runs a given Unix command (here tail -F) and turns each line of its output into an event.
1. Configuration: option3

############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
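
Note that the exec source makes no delivery guarantee: if the agent or the tail process dies, events can be lost with no way to replay them. tail -F (unlike tail -f) keeps following the file across rotation or recreation, and simply retries if the file does not exist yet.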

2. Start

[root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option3 -n a1 -Dflume.root.logger=INFO,console

3. Test

[root@node101 home]# echo "wjy" >> flume.exec.log
[root@node101 home]# echo "hi" >> flume.exec.log
[root@node101 home]# echo "hello wjy" >> flume.exec.log

Flume output:

(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 77 6A 79 wjy }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 68 69 hi }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6A 79 hello wjy }

III. Spooling Directory Source

Watches the configured directory for new files and reads their contents out as events. Two constraints apply:
1) a file must not be opened and edited again once it has been copied into the spool directory;
2) the spool directory must not contain subdirectories.
A safe way to satisfy the first constraint is shown right after this list.
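
A common pattern is to write the file completely somewhere else and then mv it into the spool directory; within one filesystem mv is atomic, so the source can never pick up a half-written file (the file names here are illustrative):

[root@node101 home]# cp app.log /home/app.log.ready && mv /home/app.log.ready /home/logs/app.log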

1. Configuration: option4

############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

2. Start

[root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option4 -n a1 -Dflume.root.logger=INFO,console

3. Test

Log directory: /home/logs

[root@node101 home]# cat flume.exec.log
hello
hello
hello
wjy
hi
hello wjy
[root@node101 home]# mkdir logs && mv flume.exec.log ./logs && cd logs && ls
flume.exec.log.COMPLETED

Flume output:

(pool--thread-) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java)] Preparing to move file /home/logs/flume.exec.log to /home/logs/flume.exec.log.COMPLETED
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F hello }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F hello }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F hello }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 77 6A 79 wjy }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 69 hi }
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java)] Event: { headers:{file=/home/logs/flume.exec.log} body: 68 65 6C 6C 6F 20 77 6A 79 hello wjy }
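
Because fileHeader = true, each event carries a "file" header with the absolute path of the file it came from, which is why every event above shows headers:{file=/home/logs/flume.exec.log}.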

IV. Writing logs to HDFS

1. Configuration: option5

############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
# Compared with the previous spooldir config (option4), only the sink
# block changes: a1.sinks.k1.type = logger is replaced by an hdfs sink.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node101:8020/flume/%Y-%m-%d/%H%M

# Roll a new file every 60 s or when it exceeds 10 MB:
# number of events per file before rolling; 0 = do not roll by event count
a1.sinks.k1.hdfs.rollCount = 0
# seconds before rolling a new file; 0 = do not roll by time
a1.sinks.k1.hdfs.rollInterval = 60
# file size in bytes before rolling; 0 = do not roll by size
a1.sinks.k1.hdfs.rollSize = 10485760
# if the currently open .tmp file receives no data for this many seconds,
# close it and rename it to its final name
a1.sinks.k1.hdfs.idleTimeout = 3
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Generate a new directory every five minutes:
# whether to "round down" the timestamp; if enabled, this affects every
# time escape sequence except %t
a1.sinks.k1.hdfs.round = true
# the value to round down to
a1.sinks.k1.hdfs.roundValue = 5
# the unit of the rounding value: second, minute or hour
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
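
With round=true, roundValue=5 and roundUnit=minute as assumed above, the %H%M escape in hdfs.path is rounded down to the nearest five minutes, so all events arriving between 18:45:00 and 18:49:59 land in the same directory:

event at 18:45:10  ->  hdfs://node101:8020/flume/2019-07-01/1845/
event at 18:47:32  ->  hdfs://node101:8020/flume/2019-07-01/1845/
event at 18:50:01  ->  hdfs://node101:8020/flume/2019-07-01/1850/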

Create the HDFS directory

[root@node101 conf]# hdfs dfs -mkdir /flume

2. Start

[root@node101 conf]# flume-ng agent -c /usr/local/apache-flume-1.6.0-bin/conf -f /usr/local/apache-flume-1.6.0-bin/conf/option5 -n a1 -Dflume.root.logger=INFO,console

3. Test

Generate the test data:

[root@node101 home]# echo "hello wjy" >> test.log
[root@node101 home]# echo "hello xiaoming" >> test.log
[root@node101 home]# echo "hi xiaowang" >> test.log
[root@node101 home]# cp test.log ./logs

Flume log:

(pool--thread-) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
(pool--thread-) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java)] Preparing to move file /home/logs/test.log to /home/logs/test.log.COMPLETED
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java)] Serializer = TEXT, UseRawLocalFileSystem = false
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java)] Creating hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp
(hdfs-k1-roll-timer-) [INFO - org.apache.flume.sink.hdfs.BucketWriter$.call(BucketWriter.java)] Closing idle bucketWriter hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp at 1561978108285
(hdfs-k1-roll-timer-) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java)] Closing hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp
(hdfs-k1-call-runner-) [INFO - org.apache.flume.sink.hdfs.BucketWriter$.call(BucketWriter.java)] Renaming hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198.tmp to hdfs://node101:8020/flume/2019-07-01/1845/FlumeData.1561978100198
(hdfs-k1-roll-timer-) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$.run(HDFSEventSink.java)] Writer callback called.

HDFS files:

[Figure: the generated FlumeData file under /flume/2019-07-01/1845 in the HDFS file browser]
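
You can also check the file from the command line (the path comes from the log above; the three test lines should come back):

[root@node101 conf]# hdfs dfs -cat /flume/2019-07-01/1845/FlumeData.1561978100198
hello wjy
hello xiaoming
hi xiaowang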

V. Other topologies
Consolidation (fan-in): merging log flows from many agents into one

[Figure: consolidation topology, several leaf agents feeding one collector agent over avro]
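
A minimal sketch of the fan-in setup, reusing the pattern from section I (the host names and port 4141 are illustrative): every leaf agent points an avro sink at one collector, and the collector receives all flows through a single avro source.

# on every leaf agent (node101, node103, ...):
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node102
a1.sinks.k1.port = 4141

# on the collector (node102), one avro source receives from all leaves:
a1.sources.r1.type = avro
a1.sources.r1.bind = node102
a1.sources.r1.port = 4141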

Multiplexing (fan-out): writing one log flow to several destinations

[Figure: fan-out topology, one source replicated to multiple channels, each drained by its own sink]
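
A minimal fan-out sketch (component names are illustrative): one source is bound to two channels, and each channel is drained by its own sink. The replicating channel selector, which copies every event to all listed channels, is Flume's default, so the selector line is optional:

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2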

