Flume
一切尽在官网
flume的配置
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
flume代码示例
flume主要组成是agent,agent的组成分为Source(数据进入端口),Channel(数据管道),Sink(数据输出端)
# example.conf: A single-node Flume configuration #对agent的组件其名称 # Name the components on this agent //定义agent的名称,对agent中的三个组件进行命名 //sources,sinks,channels 后加S所以可以同时定义多个,来适应不同的业务场景 a1.sources = r1 a1.sinks = k1 a1.channels = c1 //设置source端 source的数据来源是什么? 根据不同的数据来源,设置source的内容 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 //设置sinks端 sink端输出的数据类型,根据不同的业务场景,来设置 # Describe the sink a1.sinks.k1.type = logger //设置channels channels存在的类型,大小为1000,每次传输的大小为100 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 //规定source、channel对应关系和channnel、sink对应的关系 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
模型简介
不同的模型对应不同的业务逻辑
flume与flume之间连接使用的数据类型是AVRO 为了将数据通过多个代理或跳数据流,前一代理和当前跳转源的接收器需要是 avro 类型,该接收器指向主机名(或 IP 地址)和源的端口。 多个flume连接
代码示例:在两台机器上
//从第一台webService上获取数据,传输送到第二台机器上,写入到第二台机器的磁盘上 //第一台上的配置文件 //对agent组件进行命名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source //source端的配置 a1.sources.r1.type = exec a1.sources.r1.command = tail -f /home/bigdata/webapp.log # Describe the sink //sink短的配置 a1.sinks.k1.type = avro a1.sinks.k1.hostname = bigdata a1.sinks.k1.port = 8888 # Use a channel which buffers events in memory a1.channels.c1.type = memory # Bind the source and sink to the channel //channel的配置 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 //第二台的配置文件 # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = bigdata a2.sources.r1.port = 8888 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://bigdata:9000/logDir # Use a channel which buffers events in memory a2.channels.c1.type = memory # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
多个flume汇聚到一个flume在输出
A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster. 日志集合中一个非常常见的场景是大量的日志生成客户端将数据发送给附属于存储子系统的一些消费者代理。 例如,从数百个网络服务器收集的日志发送给十几个写给 HDFS 集群的代理 This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination 通过使用 avro 接收器配置一些一级代理可以在 Flume 中实现,这些代理都指向单个代理的 avro 源(在这种情况下,你可以使用节约源 / 汇 / 客户端)。 第二层代理的这个源将接收到的事件合并到一个通道中,这个通道被一个接收器消耗到它的最终目的地
Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels. A fan-out flow using a (multiplexing) channel selector The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file. 支持将事件流向一个或多个目的地。 这是通过定义一个可以复制或选择性地将事件路由到一个或多个通道的流多路器来实现的。 一种使用(多路复用)通道选择器的扇形流 上面的例子显示了来自"foo"的源代码,将流程分散到三个不同的通道。 这个风扇可以复制或复用。 在复制流程的情况下,每个事件被发送到所有三个通道。 对于多路复用情况,当事件的属性与预配置值匹配时,将事件传递给可用的子集通道。 例如,如果一个被称为"txnType"的事件属性设置为"customer",那么它应该被引导到信道1和信道3,如果它是"供应商",那么它应该被引导到信道2,否则是信道3。 映射可以设置在代理的配置文件。
flume中有多个sink输出到不同的位置
第一台机器收集flume日志信息
第二台机器实时显示第一台的日志信息
第三台机器将日志信息保存到hdfs上
Flume通过文件来读取数据 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #source端的根据数据类型来确定type a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/app/hive-0.13.1-cdh5.3.6/logs/hive.log a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname =hh4 a1.sinks.k1.port = 4141 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = hh4 a2.sources.r1.port = 44444 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hh4 a2.sinks.k1.port = 4141 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 # Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hh4 a3.sources.r1.port = 4141 # Describe the sink a3.sinks.k1.type = hdfs a3.sinks.k1.hdfs.path = hdfs://hh4:8020/flume3/%Y%m%d/%H #上传文件的前缀 a3.sinks.k1.hdfs.filePrefix = flume3- #是否按照时间滚动文件夹 a3.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是 128M a3.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k1.hdfs.rollCount = 0 #最小冗余数 a3.sinks.k1.hdfs.minBlockReplicas = 1 # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1