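Original article: [Repost] Flume NG Configuration Explained
(Terminology: the three component types discussed below are Source, Sink, and Channel.)
An agent needs to know which components to load and how they are connected to form a flow. This is done by listing the names of the sources, sinks, and channels, and then specifying the connecting channel for each source and each sink. For example, a flow might carry events from an Avro source named avroWeb to an HDFS sink named hdfs-cluster1 through a JDBC channel named jdbc-channel. The configuration file lists these components, with jdbc-channel shared by the avroWeb source and the hdfs-cluster1 sink.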
flume-ng command-line options

    Usage: ./flume-ng <command> [options]...

    commands:
      help                      display this help text
      agent                     run a Flume agent
      avro-client               run an avro Flume client

    global options:
      --conf,-c <conf>          use configs in <conf> directory
      --classpath,-C <cp>       append to the classpath
      --dryrun,-d               do not actually start Flume, just print the command
      -Dproperty=value          sets a JDK system property value

    agent options:
      --conf-file,-f            specify a config file (required)
      --name,-n                 the name of this agent (required)
      --help,-h                 display help text

    avro-client options:
      --host,-H <host>          hostname to which events will be sent (required)
      --port,-p <port>          port of the avro source (required)
      --filename,-F <file>      text file to stream to avro source [default: std input]
      --headerFile,-R <file>    headerFile containing headers as key/value pairs on each new line
      --help,-h                 display help text

    Note that if <conf> directory is specified, then it is always included first
    in the classpath.

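    $ bin/flume-ng agent -n foo -f conf/flume-conf.properties.template

An Avro client is included in the Flume distribution. It can send a given file to a Flume Avro source using the Avro RPC mechanism.

    $ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

The command above sends the contents of /usr/logs/log.10 to the Flume source listening on port 41414.
There is also an exec source, which runs a given command and consumes its output. A single unit of output, a "line", is text terminated by a carriage return ('\r'), a line feed ('\n'), or both. Flume has no dedicated tail source, but the same effect can be achieved by running tail through the exec source.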
    # List the sources, sinks and channels for the agent
    <agent>.sources = <Source>
    <agent>.sinks = <Sink>
    <agent>.channels = <Channel1> <Channel2>

    # Set channel for source
    <agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

    # Set channel for sink
    <agent>.sinks.<Sink>.channel = <Channel1>

    weblog-agent.sources = avro-AppSrv-source
    weblog-agent.sinks = hdfs-Cluster1-sink
    weblog-agent.channels = mem-channel-1

    # Set channel for source
    weblog-agent.sources.avro-AppSrv-source.channels = mem-channel-1

    # Set channel for sink
    weblog-agent.sinks.hdfs-Cluster1-sink.channel = mem-channel-1

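The wiring rule in the example above (every source and sink must name a channel that the agent declares) can be checked mechanically. The following is a minimal Python sketch of such a check, not part of Flume itself; the helper names `parse_properties` and `check_wiring` are hypothetical, and the parser assumes simple `key = value` lines with `#` comments.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems: channels that are referenced but not declared."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    # A source may list several channels, separated by spaces.
    for name in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{name}.channels", "").split():
            if ch not in declared:
                problems.append(f"source {name}: unknown channel {ch!r}")
    # A sink reads from exactly one channel.
    for name in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{name}.channel", "")
        if ch not in declared:
            problems.append(f"sink {name}: unknown channel {ch!r}")
    return problems


CONFIG = """
weblog-agent.sources = avro-AppSrv-source
weblog-agent.sinks = hdfs-Cluster1-sink
weblog-agent.channels = mem-channel-1
weblog-agent.sources.avro-AppSrv-source.channels = mem-channel-1
weblog-agent.sinks.hdfs-Cluster1-sink.channel = mem-channel-1
"""

print(check_wiring(parse_properties(CONFIG), "weblog-agent"))  # prints []
```

An empty list means every referenced channel is declared; a misspelled channel name shows up as one entry per offending source or sink.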
    # Properties for sources
    <agent>.sources.<Source>.<someProperty> = <someValue>
    ..

    # Properties for channels
    <agent>.channels.<Channel>.<someProperty> = <someValue>
    ..

    # Properties for sinks
    <agent>.sinks.<Sink>.<someProperty> = <someValue>

    weblog-agent.sources = avro-AppSrv-source
    weblog-agent.sinks = hdfs-Cluster1-sink
    weblog-agent.channels = mem-channel-1

    # Set channel for sources, sinks
    ..

    # Properties of avro-AppSrv-source
    weblog-agent.sources.avro-AppSrv-source.type = avro
    weblog-agent.sources.avro-AppSrv-source.bind = localhost
    weblog-agent.sources.avro-AppSrv-source.port = 10000

    # Properties of mem-channel-1
    weblog-agent.channels.mem-channel-1.type = memory
    weblog-agent.channels.mem-channel-1.capacity = 1000
    weblog-agent.channels.mem-channel-1.transactionCapacity = 100

    # Properties of hdfs-Cluster1-sink
    weblog-agent.sinks.hdfs-Cluster1-sink.type = hdfs
    weblog-agent.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata/
    …

    # List the sources, sinks and channels for the agent
    <agent>.sources = <Source1> <Source2>
    <agent>.sinks = <Sink1> <Sink2>
    <agent>.channels = <Channel1> <Channel2>

    # List the sources, sinks and channels in the agent
    weblog-agent.sources = avro-AppSrv-source1 exec-tail-source2
    weblog-agent.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
    weblog-agent.channels = mem-channel-1 jdbc-channel-2

    ## Flow-1 configuration
    weblog-agent.sources.avro-AppSrv-source1.channels = mem-channel-1
    weblog-agent.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

    ## Flow-2 configuration
    weblog-agent.sources.exec-tail-source2.channels = jdbc-channel-2
    weblog-agent.sinks.avro-forward-sink2.channel = jdbc-channel-2

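To set up a multi-tier flow, the Avro sink of the first hop must point to the Avro source of the next hop. The first Flume agent then forwards events to the next Flume agent. For example, if you periodically send files (one file per event) with the Avro client to a local Flume agent, that local agent can forward them to another agent that has storage attached.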
    ## weblog agent config
    # List sources, sinks and channels in the agent
    weblog-agent.sources = avro-AppSrv-source
    weblog-agent.sinks = avro-forward-sink
    weblog-agent.channels = jdbc-channel

    # Define the flow
    weblog-agent.sources.avro-AppSrv-source.channels = jdbc-channel
    weblog-agent.sinks.avro-forward-sink.channel = jdbc-channel

    # Avro sink properties (these belong under "sinks", not "sources")
    weblog-agent.sinks.avro-forward-sink.type = avro
    weblog-agent.sinks.avro-forward-sink.hostname =
    weblog-agent.sinks.avro-forward-sink.port = 10000

    # Configure other pieces
    ...

    ## hdfs-agent config
    # List sources, sinks and channels in the agent
    hdfs-agent.sources = avro-collection-source
    hdfs-agent.sinks = hdfs-sink
    hdfs-agent.channels = mem-channel

    # Define the flow
    hdfs-agent.sources.avro-collection-source.channels = mem-channel
    hdfs-agent.sinks.hdfs-sink.channel = mem-channel

    # Avro source properties
    hdfs-agent.sources.avro-collection-source.type = avro
    hdfs-agent.sources.avro-collection-source.bind =
    hdfs-agent.sources.avro-collection-source.port = 10000

    # Configure other pieces
    ...

Here the avro-forward-sink of weblog-agent is connected to the avro-collection-source of hdfs-agent. As a result, events coming from the external appserver source end up stored in HDFS.
    # List the sources, sinks and channels for the agent
    <agent>.sources = <Source1>
    <agent>.sinks = <Sink1> <Sink2>
    <agent>.channels = <Channel1> <Channel2>

    # Set list of channels for source (separated by space)
    <agent>.sources.<Source1>.channels = <Channel1> <Channel2>

    # Set channel for sinks
    <agent>.sinks.<Sink1>.channel = <Channel1>
    <agent>.sinks.<Sink2>.channel = <Channel2>

    <agent>.sources.<Source1>.selector.type = replicating

    # Mapping for multiplexing selector
    <agent>.sources.<Source1>.selector.type = multiplexing
    <agent>.sources.<Source1>.selector.header = <someHeader>
    <agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
    <agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
    <agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
    ...
    <agent>.sources.<Source1>.selector.default = <Channel2>

    # List the sources, sinks and channels in the agent
    weblog-agent.sources = avro-AppSrv-source1
    weblog-agent.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
    weblog-agent.channels = mem-channel-1 jdbc-channel-2

    # Set channels for source
    weblog-agent.sources.avro-AppSrv-source1.channels = mem-channel-1 jdbc-channel-2

    # Set channel for sinks
    weblog-agent.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
    weblog-agent.sinks.avro-forward-sink2.channel = jdbc-channel-2

    # Multiplexing selector on the "State" header
    weblog-agent.sources.avro-AppSrv-source1.selector.type = multiplexing
    weblog-agent.sources.avro-AppSrv-source1.selector.header = State
    weblog-agent.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
    weblog-agent.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
    weblog-agent.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
    weblog-agent.sources.avro-AppSrv-source1.selector.default = mem-channel-1

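To make the selector settings in the last example concrete, here is a small Python sketch of how a replicating or multiplexing selector could pick channels for one event. It only models the routing rule described by the configuration; the function name `select_channels` and the dictionary layout are assumptions for illustration, not Flume's API.

```python
def select_channels(headers, all_channels, selector):
    """Return the channels one event should be written to."""
    # A replicating selector copies every event to all of the source's channels.
    if selector.get("type", "replicating") == "replicating":
        return list(all_channels)
    # A multiplexing selector looks up the value of one header in a mapping
    # and falls back to the default channels when no mapping matches.
    value = headers.get(selector["header"])
    return selector["mapping"].get(value, selector["default"])


# Mirrors the weblog-agent selector configuration shown above.
SELECTOR = {
    "type": "multiplexing",
    "header": "State",
    "mapping": {
        "CA": ["mem-channel-1"],
        "AZ": ["jdbc-channel-2"],
        "NY": ["mem-channel-1", "jdbc-channel-2"],
    },
    "default": ["mem-channel-1"],
}
CHANNELS = ["mem-channel-1", "jdbc-channel-2"]

print(select_channels({"State": "NY"}, CHANNELS, SELECTOR))
print(select_channels({"State": "TX"}, CHANNELS, SELECTOR))
```

With this configuration an event whose State header is NY goes to both channels, while an unmapped value such as TX falls through to the default channel.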
Flume Sources
Avro source
Property Name | Default | Description |
type | - | The component type name, needs to be avro |
bind | - | hostname or IP address to listen on |
port | - | Port # to bind to |
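Exec source
This source runs a given Unix command on start-up and expects that process to continuously produce data on standard output (stderr is simply discarded unless logStdErr is set to true). If the process exits for any reason, the source also exits and produces no further data.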
Property Name | Default | Description |
type | - | The component type name, needs to be exec |
command | - | The command to execute |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
Note: ExecSource cannot guarantee that the client learns of it when an event fails to be put into the channel. In that case, the data is lost.

    exec-agent.sources = tail
    exec-agent.channels = memoryChannel-1
    exec-agent.sinks = logger
    exec-agent.sources.tail.type = exec
    exec-agent.sources.tail.command = tail -f /var/log/secure

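As a rough illustration of what the exec source does with a command's output, the Python sketch below runs a child process, discards its stderr, and turns each line of stdout into one event body. The function name `read_events` is hypothetical, and the demo command is a short-lived Python one-liner rather than `tail -f`, which never terminates.

```python
import subprocess
import sys


def read_events(command):
    """Run `command` and return one event body per line of its standard output."""
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # like logStdErr = false: stderr is dropped
        text=True,
    )
    # Strip the line terminator ('\r', '\n', or both) from each line.
    events = [line.rstrip("\r\n") for line in proc.stdout]
    proc.wait()
    return events


# Stand-in for a real command such as "tail -f /var/log/secure".
DEMO = [sys.executable, "-c", "print('first'); print('second')"]
print(read_events(DEMO))
```

A real source would stream lines as they arrive instead of collecting them into a list, since a command like `tail -f` keeps producing output indefinitely.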
NetCat source
Property Name | Default | Description |
type | - | The component type name, needs to be netcat |
bind | - | Host name or IP address to bind to |
port | - | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
Sequence generator source
Property Name | Default | Description |
type | - | The component type name, needs to be seq |
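Syslog sources
These sources read syslog data and generate Flume events. The UDP source treats an entire message as a single event. The TCP source creates a new event for each string of characters separated by a newline ('\n').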
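The difference between the two framing rules can be shown in a few lines of Python. This is only a sketch of the splitting behaviour described above, with hypothetical function names; it ignores syslog header parsing and assumes the TCP data has already been read in full.

```python
def udp_events(datagram: bytes):
    """UDP: the whole datagram becomes a single event body."""
    return [datagram]


def tcp_events(stream: bytes):
    """TCP: each newline-terminated string becomes its own event body."""
    return [part for part in stream.split(b"\n") if part]


print(udp_events(b"<13>first line\nsecond line"))
print(tcp_events(b"<13>first message\n<13>second message\n"))
```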
Syslog TCP
Property Name | Default | Description |
type | - | The component type name, needs to be syslogtcp |
host | - | Host name or IP address to bind to |
port | - | Port # to bind to |
For example, a syslog TCP source:

    syslog-agent.sources = syslog
    syslog-agent.channels = memoryChannel-1
    syslog-agent.sinks = logger
    syslog-agent.sources.syslog.type = syslogtcp
    syslog-agent.sources.syslog.port = 5140
    syslog-agent.sources.syslog.host = localhost

Syslog UDP
Property Name | Default | Description |
type | - | The component type name, needs to be syslogudp |
host | - | Host name or IP address to bind to |
port | - | Port # to bind to |
For example, a syslog UDP source:

    syslog-agent.sources = syslog
    syslog-agent.channels = memoryChannel-1
    syslog-agent.sinks = logger
    syslog-agent.sources.syslog.type = syslogudp
    syslog-agent.sources.syslog.port = 5140
    syslog-agent.sources.syslog.host = localhost

Avro Legacy
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | - | The hostname or IP address to bind to |
port | - | The port # to listen on |
Thrift Legacy
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | - | The hostname or IP address to bind to |
port | - | The port # to listen on |
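Note: The reliability semantics of Flume 1.x differ from those of 0.9.x. The end-to-end and DFO modes of a 0.9.x agent are not supported by the legacy sources. The only supported 0.9.x mode is best effort.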
HDFS sink
The HDFS sink supports the following escape sequences in its path:
%{host} | host name stored in event header |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, …) |
%A | locale’s full weekday name (Monday, Tuesday, …) |
%b | locale’s short month name (Jan, Feb,…) |
%B | locale’s long month name (January, February,…) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%P | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
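The escape sequences in the table are substituted per event when the sink builds a path. The Python sketch below expands a small subset of them to show the idea. It is not Flume's implementation: the function name `expand_path` is hypothetical, the event timestamp is passed in explicitly in milliseconds, and UTC is assumed so that the result does not depend on the local time zone.

```python
import re
from datetime import datetime, timezone

# Subset of the table whose meaning matches Python's strftime codes.
_STRFTIME_CODES = {"Y", "y", "m", "d", "H", "M", "S", "j"}


def expand_path(template, headers, timestamp_ms):
    """Replace %{header} and a subset of %x sequences in `template`."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)

    def replace(match):
        header_name, code = match.group(1), match.group(2)
        if header_name is not None:       # %{host}: value from the event header
            return headers.get(header_name, "")
        if code == "t":                   # Unix time in milliseconds
            return str(timestamp_ms)
        if code == "s":                   # seconds since the epoch
            return str(timestamp_ms // 1000)
        if code in _STRFTIME_CODES:
            return moment.strftime("%" + code)
        return match.group(0)             # leave unsupported sequences as-is

    return re.sub(r"%\{([^}]+)\}|%([A-Za-z])", replace, template)


print(expand_path("/flume/%{host}/%Y-%m-%d/%H", {"host": "web01"}, 1_000_000_000_000))
# prints /flume/web01/2001-09-09/01
```

Bucketing by date and hour in this way keeps each HDFS directory to a bounded set of files, which is the usual reason for putting escape sequences in the path.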
Property Name | Default | Description |
type | - | The component type name, needs to be hdfs |
hdfs.path | - | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file |
hdfs.rollSize | 1024 | File size to trigger roll (in bytes) |
hdfs.rollCount | 10 | Number of events written to file before it is rolled |
hdfs.batchSize | 1 | Number of events written to file before it is flushed to HDFS |
hdfs.txnEventMax | 100 | |
hdfs.codeC | - | Compression codec. one of following : gzip, bzip2, lzo, snappy |
hdfs.fileType | SequenceFile | File format - currently SequenceFile or DataStream |
hdfs.maxOpenFiles | 5000 | |
hdfs.writeFormat | - | “Text” or “Writable” |
hdfs.appendTimeout | 1000 | |
hdfs.callTimeout | 5000 | |
hdfs.threadsPoolSize | 10 | |
hdfs.kerberosPrincipal | “” | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | “” | Kerberos keytab for accessing secure HDFS |
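The three roll settings in the table (hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount) act as independent triggers: the current file is rolled as soon as any one of them is reached. The Python sketch below expresses that rule using the defaults from the table. It assumes, as the Flume user guide describes, that a value of 0 disables the corresponding trigger; the function name `should_roll` is hypothetical.

```python
def should_roll(age_seconds, size_bytes, event_count,
                roll_interval=30, roll_size=1024, roll_count=10):
    """Return True when any enabled trigger has been reached (0 disables one)."""
    return ((roll_interval > 0 and age_seconds >= roll_interval)
            or (roll_size > 0 and size_bytes >= roll_size)
            or (roll_count > 0 and event_count >= roll_count))


# With the defaults, ten events are enough to close the file,
# even if it is only five seconds old and 100 bytes long.
print(should_roll(age_seconds=5, size_bytes=100, event_count=10))  # prints True
```

Because the defaults are small, an unmodified sink tends to produce many tiny files; raising or disabling individual triggers is the usual way to get larger HDFS files.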
Logger sink
Property Name | Default | Description |
type | - | The component type name, needs to be logger |
Avro sink
Property Name | Default | Description |
type | - | The component type name, needs to be avro |
hostname | - | The hostname or IP address to connect to |
port | - | The port # to connect to |
batch-size | 100 | Number of events to batch together for sending |
IRC sink
The IRC sink takes messages from the attached channel and relays them to the configured IRC destinations.
Property Name | Default | Description |
type | - | The component type name, needs to be irc |
hostname | - | The hostname or IP address to connect to |
port | 6667 | The port number of remote host to connect |
nick | - | Nick name |
user | - | User name |
password | - | User password |
chan | - | channel |
name | | |
splitlines | - | (boolean) |
splitchars | \n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: \\n) |
File Roll sink
Property Name | Default | Description |
type | - | The component type name, needs to be file_roll |
sink.directory | - | |
sink.rollInterval | 30 | |
Null sink
Property Name | Default | Description |
type | - | The component type name, needs to be null |
Memory channel
Property Name | Default | Description |
type | - | The component type name, needs to be memory |
capacity | 100 | The max number of events stored in the channel |
transactionCapacity | 100 | The max number of events stored in the channel per transaction |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
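To show how capacity and transactionCapacity interact, here is a toy Python model of a bounded in-memory channel. It is a sketch under simplifying assumptions, not Flume's implementation: the class name `MemoryChannelSketch` is hypothetical, and where a real channel would wait up to keep-alive seconds for space before failing, this model raises immediately.

```python
from collections import deque


class MemoryChannelSketch:
    """Toy model of a bounded channel; not Flume's implementation."""

    def __init__(self, capacity=100, transaction_capacity=100):
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.queue = deque()

    def put_batch(self, events):
        """Commit one put transaction, or raise without storing anything."""
        if len(events) > self.transaction_capacity:
            raise ValueError("batch exceeds transactionCapacity")
        if len(self.queue) + len(events) > self.capacity:
            raise OverflowError("channel is full")
        self.queue.extend(events)

    def take_batch(self, max_events):
        """Remove and return up to max_events events in arrival order."""
        count = min(max_events, self.transaction_capacity, len(self.queue))
        return [self.queue.popleft() for _ in range(count)]


channel = MemoryChannelSketch(capacity=1000, transaction_capacity=100)
channel.put_batch(["event-1", "event-2"])
print(channel.take_batch(10))  # prints ['event-1', 'event-2']
```

The model makes one sizing rule visible: a source or sink batch larger than transactionCapacity can never succeed, so batch sizes should be chosen at or below that value.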
JDBC channel
Property Name | Default | Description |
type | - | The component type name, needs to be jdbc |
db.type | DERBY | Database vendor, needs to be DERBY. |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendors JDBC driver |
driver.url | (constructed from other properties) | JDBC connection URL |
db.username | sa | User id for db connection |
db.password | | Password for db connection |
connection.properties.file | - | JDBC Connection property file path |
create.schema | true | If true, then creates db schema if not there |
create.index | true | Create indexes to speed up lookups |
create.foreignkey | true | |
transaction.isolation | READ_COMMITTED | Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
maximum.connections | 10 | Max connections allowed to db |
maximum.capacity | 0 (unlimited) | Max number of events in the channel |
sysprop.* | | DB vendor-specific properties |
sysprop.user.home | | Home path to store embedded Derby database |
Recoverable memory channel
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel |
wal.dataDir | ${user.home}/.flume/recoverable-memory-channel | |
wal.rollSize | (0x04000000) | Max size (in bytes) of a single file before we roll |
wal.minRetentionPeriod | 300000 | Min amount of time (in millis) to keep a log |
wal.workerInterval | 60000 | How often (in millis) the background worker checks for old logs |
wal.maxLogsSize | (0x20000000) | Total amt (in bytes) of logs to keep, excluding the current log |
File channel
NOTE: Not yet available.
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.file.FileChannel |
Pseudo transaction channel
Note: For testing purposes only; not meant for use in production.
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | The max number of events stored in the channel |
keep-alive | 3 | Timeout in seconds for adding or removing an event |