Flume configuration to upload files with the same name


Problem description

I have 10 files whose data varies in length. I would like to store the corresponding data in the same files, with the same file names, but Flume is splitting the data up and saving it as FlumeData.timestamp. I am using the configuration below:

a1.sources = r1
a1.sinks = k2
a1.channels = c1

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
a1.channels.c1.trackerDir = /mnt/flume/track
a1.channels.c1.transactionCapacity = 10000000
a1.channels.c1.capacity = 500000000
a1.channels.c1.maxFileSize = 10000000
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /mnt/flume/backup
a1.channels.c1.checkpointInterval = 2000
a1.channels.c1.minimumRequiredSpace = 9000000

a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/netlog/
a1.sources.r1.fileHeader = true
a1.sources.r1.bufferMaxLineLength = 500
a1.sources.r1.bufferMaxLines = 10000
a1.sources.r1.batchSize = 100000
#a1.sources.r1.deletePolicy = immediate

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1

a1.sinks.k2.hdfs.filePrefix = %{file}
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.batchSize = 100000
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.path = /user/flume

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1

Kindly suggest how I can store the same 10 files, with the same file names and the same data within them. File sizes can vary from 2 MB to 15 MB.

The error I see in the logs is:

lib/native org.apache.flume.node.Application --conf-file conf/flume-conf.properties --name a1
2014-12-03 20:49:47,545 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2014-12-03 20:49:47,550 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2014-12-03 20:49:47,555 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-conf.properties for changes
2014-12-03 20:49:47,555 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/flume-conf.properties
2014-12-03 20:49:47,571 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,571 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for k2: hdfs.batchSize
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k2 Agent: a1
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,575 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,576 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: a1, initial-configuration: AgentConfiguration[a1]
SOURCES: {r1={ parameters:{bufferMaxLineLength=500, channels=c1, spoolDir=/usr/local/netlog/, bufferMaxLines=10000, fileHeader=true, batchSize=100000, type=spooldir} }}
CHANNELS: {c1={ parameters:{trackerDir=/mnt/flume/track, maxFileSize=10000000, dataDirs=/mnt/flume/data, type=file, transactionCapacity=10000000, capacity=500000000, checkpointDir=/mnt/flume/checkpoint} }}
SINKS: {k2={ parameters:{hdfs.batchSize=100000, hdfs.idleTimeout=0, hdfs.filePrefix=%{file}, hdfs.path=/user/flume, hdfs.writeFormat=Text, hdfs.rollSize=0, hdfs.rollCount=0, channel=c1, hdfs.rollInterval=0, hdfs.fileType=DataStream, type=hdfs} }}

2014-12-03 20:49:47,583 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel c1
2014-12-03 20:49:47,593 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: k2 using HDFS
2014-12-03 20:49:47,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for a1
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[a1]
SOURCES: {r1={ parameters:{bufferMaxLineLength=500, channels=c1, spoolDir=/usr/local/netlog/, bufferMaxLines=10000, fileHeader=true, batchSize=100000, type=spooldir} }}
CHANNELS: {c1={ parameters:{trackerDir=/mnt/flume/track, maxFileSize=10000000, dataDirs=/mnt/flume/data, type=file, transactionCapacity=10000000, capacity=500000000, checkpointDir=/mnt/flume/checkpoint} }}
SINKS: {k2={ parameters:{hdfs.batchSize=100000, hdfs.idleTimeout=0, hdfs.filePrefix=%{file}, hdfs.path=/user/flume, hdfs.writeFormat=Text, hdfs.rollSize=0, hdfs.rollCount=0, channel=c1, hdfs.rollInterval=0, hdfs.fileType=DataStream, type=hdfs} }}

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:c1

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks k2

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources r1

2014-12-03 20:49:47,597 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
2014-12-03 20:49:47,598 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2014-12-03 20:49:47,629 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel c1 type file
2014-12-03 20:49:47,635 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2014-12-03 20:49:47,636 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source r1, type spooldir
2014-12-03 20:49:47,654 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: k2, type: hdfs
2014-12-03 20:49:48,108 (conf-file-poller-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:555)] Hadoop Security enabled: false
2014-12-03 20:49:48,111 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel c1 connected to [r1, k2]
2014-12-03 20:49:48,125 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Spool Directory source r1: { spoolDir: /usr/local/netlog/ } }} sinkRunners:{k2=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1f87c88 counterGroup:{ name:null counters:{} } }} channels:{c1=FileChannel c1 { dataDirs: [/mnt/flume/data] }} }
2014-12-03 20:49:48,130 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
2014-12-03 20:49:48,130 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:259)] Starting FileChannel c1 { dataDirs: [/mnt/flume/data] }...
2014-12-03 20:49:48,147 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:328)] Encryption is not enabled
2014-12-03 20:49:48,149 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:373)] Replay started
2014-12-03 20:49:48,150 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:385)] Found NextFileID 0, from []
2014-12-03 20:49:48,155 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.channel.file.Log.replay(Log.java:481)] Failed to initialize Log on [channel=c1]
java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2014-12-03 20:49:48,160 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:290)] Failed to start the file channel [channel=c1]
java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2014-12-03 20:49:48,162 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k2
2014-12-03 20:49:48,163 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2014-12-03 20:49:48,163 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /usr/local/netlog/
2014-12-03 20:49:48,185 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/usr/local/netlog, metaDir=.flumespool, deserializer=LINE
2014-12-03 20:49:48,204 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /usr/local/netlog/flume-spooldir-perm-check-5019906964160509405.canary
2014-12-03 20:49:48,218 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2014-12-03 20:49:48,343 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2014-12-03 20:49:48,343 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2014-12-03 20:49:48,344 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2014-12-03 20:49:48,347 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2014-12-03 20:49:48,356 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2014-12-03 20:49:48,357 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
2014-12-03 20:49:48,659 (pool-4-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source r1: { spoolDir: /usr/local/netlog/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:181)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    ... 7 more
2014-12-03 20:49:53,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Thanks in advance.

Answer

OK, you need a few more properties for the HDFS sink:

a1.sinks.k2.hdfs.filePrefix = [your prefix]
a1.sinks.k2.hdfs.fileSuffix = .[your suffix]

The suffix would be .tsv or .csv, for instance, while the prefix can be anything. You can also use variables for the date and time in the file name; this requires the timestamp interceptor. You can even write your own interceptor and inject your own variables into the file name. If you omit these, Flume inserts its own sequence number between the prefix and the suffix.
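As a sketch of the timestamp interceptor mentioned above (agent and component names taken from the question's config), attaching it to the source makes time escape sequences such as %Y%m%d resolvable in the sink's path and prefix:

```properties
# Attach the built-in timestamp interceptor to source r1,
# so every event carries a "timestamp" header.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# The header then lets time escapes resolve in the HDFS sink, e.g.:
a1.sinks.k2.hdfs.filePrefix = %{file}.%Y%m%d
```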

In addition to our previous comments, the properties that disable rollover are the following:

a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0

To access the file name of the original file from your source, append the following to your HDFS sink config:

a1.sinks.k2.hdfs.filePrefix = %{file}
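Note that %{file} resolves the header set by fileHeader = true, which holds the file's absolute path, so the prefix may contain directory separators. If only the base file name is wanted, the spooling directory source also offers a basenameHeader option (a sketch; property names as documented in the Flume user guide):

```properties
# Have the spooldir source add a "basename" header with just the
# file name (no directory part) instead of the absolute path.
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = basename

# Then reference that header in the sink:
a1.sinks.k2.hdfs.filePrefix = %{basename}
```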

To simplify your channel config, do the following:

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
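Putting the pieces together, a minimal end-to-end sketch of the suggested agent might look like this (untested; paths and names taken from the question, with the file channel swapped for the memory channel as suggested above):

```properties
a1.sources = r1
a1.channels = c1
a1.sinks = k2

# In-memory channel instead of the failing file channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Spooling directory source; fileHeader makes %{file} available
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/netlog/
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

# HDFS sink: one output file per input file, rolling disabled
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.path = /user/flume
a1.sinks.k2.hdfs.filePrefix = %{file}
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
```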
