I. Problem: on Windows, a Flume agent configured with the following TAILDIR source fails at startup:

agent.sources.seqGenSrc.type = TAILDIR
agent.sources.seqGenSrc.positionFile = .\\taildir_mongodb_position.json
agent.sources.seqGenSrc.filegroups = seqGenSrc
agent.sources.seqGenSrc.filegroups.seqGenSrc = D:\\bigdata-tax-crawler-python\\results\\jiangsu.log
agent.sources.seqGenSrc.fileHeader = false

Error log:

java.lang.UnsupportedOperationException: View 'unix' not available
at sun.nio.fs.AbstractFileSystemProvider.readAttributes(AbstractFileSystemProvider.java:91)
at java.nio.file.Files.readAttributes(Files.java:1964)
at java.nio.file.Files.getAttribute(Files.java:1869)
at org.apache.flume.source.taildir.ReliableTaildirEventReader.getInode(ReliableTaildirEventReader.java:284)
at org.apache.flume.source.taildir.ReliableTaildirEventReader.updateTailFiles(ReliableTaildirEventReader.java:248)
at org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:93)
at org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:49)
at org.apache.flume.source.taildir.ReliableTaildirEventReader$Builder.build(ReliableTaildirEventReader.java:355)
at org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:105)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

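II. Cause: in the Flume source code (version 1.9 is used as the example here), ReliableTaildirEventReader.java obtains the inode with inode = (long) Files.getAttribute(file.toPath(), "unix:ino");. The "unix" attribute view exists only on Unix-like systems and is not available on Windows, hence the error.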
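Whether the "unix" attribute view exists on a given machine can be checked directly with the JDK. The following is a minimal sketch and not part of Flume; the class name UnixViewCheck and its two methods are made up for illustration. It asks the default file system which attribute views it supports and returns -1 when "unix:ino" is unavailable, which is the situation behind the stack trace above.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, for illustration only (not a Flume class).
final class UnixViewCheck {

    /** True when the default file system exposes the "unix" attribute view. */
    static boolean hasUnixView() {
        return FileSystems.getDefault().supportedFileAttributeViews().contains("unix");
    }

    /** Returns the inode read via "unix:ino", or -1 when the view is unavailable. */
    static long inodeOrMinusOne(Path path) throws IOException {
        try {
            return ((Number) Files.getAttribute(path, "unix:ino")).longValue();
        } catch (UnsupportedOperationException e) {
            // Same exception type as in the error log: View 'unix' not available
            return -1L;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("ino-check", ".tmp");
        try {
            System.out.println("unix view supported: " + hasUnixView());
            System.out.println("unix:ino = " + inodeOrMinusOne(tmp));
        } finally {
            Files.delete(tmp);
        }
    }
}
```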

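TaildirSource tracks file changes by obtaining each file's inode and keeping a one-to-one mapping between inode and file. It reads the file with RandomAccessFile and persists the inode, the read position, and the file path to a JSON file so that tailing can resume later. The inode is a Linux (Unix) file-system concept, and it is obtained in the getInode method of ReliableTaildirEventReader, which does not support Windows. The idea behind TaildirSource is to obtain an identifier for a file and record the corresponding file path (on Linux the inode can serve as that identifier; when the system reads a file, it resolves the path to the inode and operates on that). Windows has a similar concept, the file ID, which depends on the file system: on FAT, the file ID may change if a file is renamed to a name longer than the old one, whereas on NTFS the file ID is stable until the file is deleted. So when the system is Windows and the file system is NTFS, we use the file ID in place of the inode.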
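The position tracking described above can be sketched roughly as follows. This is a simplified illustration and not Flume's code: the class TailPositionSketch, its readNew method, and the use of an arbitrary long as the file identifier are assumptions. It keeps one byte offset per identifier and uses RandomAccessFile to read only what was appended since the previous call, restarting from 0 if the file has shrunk.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified tracker (not the Flume implementation).
final class TailPositionSketch {
    // file identifier (inode or file ID) -> number of bytes already consumed
    private final Map<Long, Long> positions = new HashMap<>();

    /** Returns the text appended since the last call for this identifier. */
    String readNew(long fileId, Path path) throws IOException {
        long start = positions.getOrDefault(fileId, 0L);
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "r")) {
            long length = raf.length();
            if (length < start) {
                start = 0L; // file is shorter than the stored offset: start over
            }
            raf.seek(start);
            byte[] buf = new byte[(int) (length - start)];
            raf.readFully(buf);
            positions.put(fileId, length);
            return new String(buf, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("tail-sketch", ".log");
        TailPositionSketch tracker = new TailPositionSketch();
        Files.write(log, "first\n".getBytes(StandardCharsets.UTF_8));
        System.out.print(tracker.readNew(42L, log));   // prints "first"
        Files.write(log, "second\n".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
        System.out.print(tracker.readNew(42L, log));   // prints "second"
        Files.delete(log);
    }
}
```

Because the offset is keyed by the identifier rather than by the path, a renamed file keeps its read position as long as its identifier stays the same.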

Author: 张永清 (Zhang Yongqing). When reposting, please credit the source on cnblogs: https://www.cnblogs.com/laoqing/p/12836826.html

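The complete source of ReliableTaildirEventReader.java in flume-taildir-source 1.9 follows (the method at line 284 of ReliableTaildirEventReader.java only works on Unix-like systems). Note that this listing already contains the WinFileUtil import and the OS_NAME constant that the fix described later relies on: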

 /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source.taildir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.stream.JsonReader;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.source.taildir.util.WinFileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableTaildirEventReader implements ReliableEventReader {
private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);

private final List<TaildirMatcher> taildirCache;
private final Table<String, String, String> headerTable;

private TailFile currentFile = null;
private Map<Long, TailFile> tailFiles = Maps.newHashMap();
private long updateTime;
private boolean addByteOffset;
private boolean cachePatternMatching;
private boolean committed = true;
private final boolean annotateFileName;
private final String fileNameHeader;
public static final String OS_NAME = System.getProperty("os.name").toLowerCase();
/**
* Create a ReliableTaildirEventReader to watch the given directory.
*/
private ReliableTaildirEventReader(Map<String, String> filePaths,
Table<String, String, String> headerTable, String positionFilePath,
boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
boolean annotateFileName, String fileNameHeader) throws IOException {
// Sanity checks
Preconditions.checkNotNull(filePaths);
Preconditions.checkNotNull(positionFilePath);

if (logger.isDebugEnabled()) {
logger.debug("Initializing {} with directory={}, metaDir={}",
new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths });
}

List<TaildirMatcher> taildirCache = Lists.newArrayList();
for (Entry<String, String> e : filePaths.entrySet()) {
taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching));
}
logger.info("taildirCache: " + taildirCache.toString());
logger.info("headerTable: " + headerTable.toString());

this.taildirCache = taildirCache;
this.headerTable = headerTable;
this.addByteOffset = addByteOffset;
this.cachePatternMatching = cachePatternMatching;
this.annotateFileName = annotateFileName;
this.fileNameHeader = fileNameHeader;
updateTailFiles(skipToEnd);

logger.info("Updating position from position file: " + positionFilePath);
loadPositionFile(positionFilePath);
}

/**
* Load a position file which has the last read position of each file.
* If the position file exists, update tailFiles mapping.
*/
public void loadPositionFile(String filePath) {
Long inode, pos;
String path;
FileReader fr = null;
JsonReader jr = null;
try {
fr = new FileReader(filePath);
jr = new JsonReader(fr);
jr.beginArray();
while (jr.hasNext()) {
inode = null;
pos = null;
path = null;
jr.beginObject();
while (jr.hasNext()) {
switch (jr.nextName()) {
case "inode":
inode = jr.nextLong();
break;
case "pos":
pos = jr.nextLong();
break;
case "file":
path = jr.nextString();
break;
}
}
jr.endObject();

for (Object v : Arrays.asList(inode, pos, path)) {
Preconditions.checkNotNull(v, "Detected missing value in position file. "
+ "inode: " + inode + ", pos: " + pos + ", path: " + path);
}
TailFile tf = tailFiles.get(inode);
if (tf != null && tf.updatePos(path, inode, pos)) {
tailFiles.put(inode, tf);
} else {
logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
}
}
jr.endArray();
} catch (FileNotFoundException e) {
logger.info("File not found: " + filePath + ", not updating position");
} catch (IOException e) {
logger.error("Failed loading positionFile: " + filePath, e);
} finally {
try {
if (fr != null) fr.close();
if (jr != null) jr.close();
} catch (IOException e) {
logger.error("Error: " + e.getMessage(), e);
}
}
}

public Map<Long, TailFile> getTailFiles() {
return tailFiles;
}

public void setCurrentFile(TailFile currentFile) {
this.currentFile = currentFile;
}

@Override
public Event readEvent() throws IOException {
List<Event> events = readEvents(1);
if (events.isEmpty()) {
return null;
}
return events.get(0);
}

@Override
public List<Event> readEvents(int numEvents) throws IOException {
return readEvents(numEvents, false);
}

@VisibleForTesting
public List<Event> readEvents(TailFile tf, int numEvents) throws IOException {
setCurrentFile(tf);
return readEvents(numEvents, true);
}

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
throws IOException {
if (!committed) {
if (currentFile == null) {
throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
}
logger.info("Last read was never committed - resetting position");
long lastPos = currentFile.getPos();
currentFile.updateFilePos(lastPos);
}
List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
if (events.isEmpty()) {
return events;
}

Map<String, String> headers = currentFile.getHeaders();
if (annotateFileName || (headers != null && !headers.isEmpty())) {
for (Event event : events) {
if (headers != null && !headers.isEmpty()) {
event.getHeaders().putAll(headers);
}
if (annotateFileName) {
event.getHeaders().put(fileNameHeader, currentFile.getPath());
}
}
}
committed = false;
return events;
}

@Override
public void close() throws IOException {
for (TailFile tf : tailFiles.values()) {
if (tf.getRaf() != null) tf.getRaf().close();
}
}

/** Commit the last lines which were read. */
@Override
public void commit() throws IOException {
if (!committed && currentFile != null) {
long pos = currentFile.getLineReadPos();
currentFile.setPos(pos);
currentFile.setLastUpdated(updateTime);
committed = true;
}
}

/**
* Update tailFiles mapping if a new file is created or appends are detected
* to the existing file.
*/
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
updateTime = System.currentTimeMillis();
List<Long> updatedInodes = Lists.newArrayList();

for (TaildirMatcher taildir : taildirCache) {
Map<String, String> headers = headerTable.row(taildir.getFileGroup());

for (File f : taildir.getMatchingFiles()) {
long inode;
try {
inode = getInode(f);
} catch (NoSuchFileException e) {
logger.info("File has been deleted in the meantime: " + e.getMessage());
continue;
}
TailFile tf = tailFiles.get(inode);
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
long startPos = skipToEnd ? f.length() : 0;
tf = openFile(f, headers, inode, startPos);
} else {
boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
if (updated) {
if (tf.getRaf() == null) {
tf = openFile(f, headers, inode, tf.getPos());
}
if (f.length() < tf.getPos()) {
logger.info("Pos " + tf.getPos() + " is larger than file size! "
+ "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
tf.updatePos(tf.getPath(), inode, 0);
}
}
tf.setNeedTail(updated);
}
tailFiles.put(inode, tf);
updatedInodes.add(inode);
}
}
return updatedInodes;
}

public List<Long> updateTailFiles() throws IOException {
return updateTailFiles(false);
}

private long getInode(File file) throws IOException {
long inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
return inode;
}

private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
try {
logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
return new TailFile(file, headers, inode, pos);
} catch (IOException e) {
throw new FlumeException("Failed opening file: " + file, e);
}
}

/**
* Special builder class for ReliableTaildirEventReader
*/
public static class Builder {
private Map<String, String> filePaths;
private Table<String, String, String> headerTable;
private String positionFilePath;
private boolean skipToEnd;
private boolean addByteOffset;
private boolean cachePatternMatching;
private Boolean annotateFileName =
TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;

public Builder filePaths(Map<String, String> filePaths) {
this.filePaths = filePaths;
return this;
}

public Builder headerTable(Table<String, String, String> headerTable) {
this.headerTable = headerTable;
return this;
}

public Builder positionFilePath(String positionFilePath) {
this.positionFilePath = positionFilePath;
return this;
}

public Builder skipToEnd(boolean skipToEnd) {
this.skipToEnd = skipToEnd;
return this;
}

public Builder addByteOffset(boolean addByteOffset) {
this.addByteOffset = addByteOffset;
return this;
}

public Builder cachePatternMatching(boolean cachePatternMatching) {
this.cachePatternMatching = cachePatternMatching;
return this;
}

public Builder annotateFileName(boolean annotateFileName) {
this.annotateFileName = annotateFileName;
return this;
}

public Builder fileNameHeader(String fileNameHeader) {
this.fileNameHeader = fileNameHeader;
return this;
}

public ReliableTaildirEventReader build() throws IOException {
return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
addByteOffset, cachePatternMatching,
annotateFileName, fileNameHeader);
}
}

}

III. Solution (how to support tail and taildir on Windows):

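1. Add tail command support. Windows has no tail command; one can be downloaded from https://files.cnblogs.com/files/laoqing/tail.zip and placed in the "windows32" directory (presumably C:\Windows\System32; any directory on the PATH should work).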

2. Modify the source code to support taildir.

Add the following dependencies to Flume's flume-taildir-source project:

<dependency>
  <groupId>net.java.dev.jna</groupId>
  <artifactId>jna</artifactId>
  <version>4.2.2</version>
</dependency>
<dependency>
  <groupId>net.java.dev.jna</groupId>
  <artifactId>jna-platform</artifactId>
  <version>4.2.2</version>
</dependency>

1) Add a new file, Kernel32.java:

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source.taildir.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.platform.win32.WinBase.FILETIME;
import com.sun.jna.platform.win32.WinDef.DWORD;
import com.sun.jna.platform.win32.WinNT.HANDLE;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIFunctionMapper;
import com.sun.jna.win32.W32APITypeMapper;

/**
 * Created by zhangyongqing on 2020-05-06.
 */
public interface Kernel32 extends StdCallLibrary {
  final static Map WIN32API_OPTIONS = new HashMap() {
    private static final long serialVersionUID = 1L;

    {
      put(Library.OPTION_FUNCTION_MAPPER, W32APIFunctionMapper.UNICODE);
      put(Library.OPTION_TYPE_MAPPER, W32APITypeMapper.UNICODE);
    }
  };

  Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("Kernel32",
      Kernel32.class, WIN32API_OPTIONS);

  int GetLastError();

  class BY_HANDLE_FILE_INFORMATION extends Structure {
    public DWORD dwFileAttributes;
    public FILETIME ftCreationTime;
    public FILETIME ftLastAccessTime;
    public FILETIME ftLastWriteTime;
    public DWORD dwVolumeSerialNumber;
    public DWORD nFileSizeHigh;
    public DWORD nFileSizeLow;
    public DWORD nNumberOfLinks;
    public DWORD nFileIndexHigh;
    public DWORD nFileIndexLow;

    public static class ByReference extends BY_HANDLE_FILE_INFORMATION
        implements Structure.ByReference {
    }

    public static class ByValue extends BY_HANDLE_FILE_INFORMATION
        implements Structure.ByValue {
    }

    @Override
    protected List getFieldOrder() {
      List fields = new ArrayList();
      fields.addAll(Arrays.asList(new String[]{"dwFileAttributes",
          "ftCreationTime", "ftLastAccessTime", "ftLastWriteTime",
          "dwVolumeSerialNumber", "nFileSizeHigh", "nFileSizeLow",
          "nNumberOfLinks", "nFileIndexHigh", "nFileIndexLow"}));
      return fields;
    }
  }

  boolean GetFileInformationByHandle(HANDLE hFile,
      BY_HANDLE_FILE_INFORMATION lpFileInformation);
}

2) Add a new file, WinFileUtil.java:

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source.taildir.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT.HANDLE;

import java.io.File;
import java.nio.file.Files;

/**
 * Created by zhangyongqing on 2020-05-06.
 */
public class WinFileUtil {

  public static WinFileUtil getWinFile() {
    return new WinFileUtil();
  }

  private static Logger logger = LoggerFactory.getLogger(WinFileUtil.class);

  public static String getFileId(String filepath) {
    final int FILE_SHARE_READ = (0x00000001);
    final int OPEN_EXISTING = (3);
    final int GENERIC_READ = (0x80000000);
    final int FILE_ATTRIBUTE_ARCHIVE = (0x20);

    WinBase.SECURITY_ATTRIBUTES attr = null;
    org.apache.flume.source.taildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION lpFileInformation =
        new org.apache.flume.source.taildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION();
    HANDLE hFile = null;

    hFile = Kernel32.INSTANCE.CreateFile(filepath, 0,
        FILE_SHARE_READ, attr, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE,
        null);
    String ret = "0";
    if (Kernel32.INSTANCE.GetLastError() == 0) {
      org.apache.flume.source.taildir.util.Kernel32.INSTANCE
          .GetFileInformationByHandle(hFile, lpFileInformation);

      ret = lpFileInformation.dwVolumeSerialNumber.toString()
          + lpFileInformation.nFileIndexLow.toString();

      Kernel32.INSTANCE.CloseHandle(hFile);

      if (Kernel32.INSTANCE.GetLastError() == 0) {
        logger.debug("inode:" + ret);
        return ret;
      } else {
        logger.error("close file:{} cause exception", filepath);
        throw new RuntimeException("close file:" + filepath+" cause Exception");
      }
    } else {
      if (hFile != null) {
        Kernel32.INSTANCE.CloseHandle(hFile);
      }
      logger.error("open file:{} cause Exception", filepath);
      throw new RuntimeException("open file :" + filepath+" cause Exception");
    }
  }
}
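One point worth checking in getFileId: it concatenates the decimal forms of dwVolumeSerialNumber and nFileIndexLow, and the caller later passes the result to Long.parseLong. Assuming both values render as unsigned 32-bit decimals, the concatenation can reach 20 digits, which does not fit in a long, and different pairs can produce the same string. The sketch below is not the author's method; FileIdSketch, combine, and concat are hypothetical names. It contrasts the concatenation with packing nFileIndexHigh and nFileIndexLow into a single 64-bit value.

```java
// Hypothetical comparison of two ways to build a numeric file identifier.
final class FileIdSketch {

    /** Packs two unsigned 32-bit halves (index high, index low) into one long. */
    static long combine(int high, int low) {
        return ((high & 0xFFFFFFFFL) << 32) | (low & 0xFFFFFFFFL);
    }

    /** Mirrors the decimal string concatenation, for comparison only. */
    static String concat(long volumeSerial, long indexLow) {
        return Long.toString(volumeSerial) + Long.toString(indexLow);
    }

    public static void main(String[] args) {
        System.out.println(combine(1, 2));      // 4294967298
        // Two different pairs, same concatenated string:
        System.out.println(concat(12L, 345L));  // 12345
        System.out.println(concat(123L, 45L));  // 12345
        // Two maximal unsigned 32-bit values give 20 digits, too long for a long:
        try {
            Long.parseLong(concat(4294967295L, 4294967295L));
        } catch (NumberFormatException e) {
            System.out.println("does not fit in a long");
        }
    }
}
```

The packed index identifies a file only within one volume, so this variant assumes all tailed files live on the same volume.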

3) In ReliableTaildirEventReader.java, replace the method private long getInode(File file) throws IOException with the following code:

private long getInode(File file) throws IOException {
  long inode;
  if (OS_NAME.contains("windows")) {
    inode = Long.parseLong(WinFileUtil.getFileId(file.toPath().toString()));
  } else {
    inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
  }
  return inode;
}
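To keep the Windows-specific call isolated and easy to test, the same dispatch can be written with the Windows lookup passed in as a function. This is a sketch under assumptions: InodeResolverSketch is a hypothetical class, and the ToLongFunction stands in for a call such as Long.parseLong(WinFileUtil.getFileId(...)).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;
import java.util.function.ToLongFunction;

// Hypothetical wrapper around the OS check used in getInode.
final class InodeResolverSketch {
    private final boolean windows;
    private final ToLongFunction<Path> windowsFileId;

    InodeResolverSketch(String osName, ToLongFunction<Path> windowsFileId) {
        // Same test as OS_NAME.contains("windows"), with a fixed locale
        this.windows = osName.toLowerCase(Locale.ROOT).contains("windows");
        this.windowsFileId = windowsFileId;
    }

    /** Uses the injected lookup on Windows, "unix:ino" everywhere else. */
    long resolve(Path path) throws IOException {
        if (windows) {
            return windowsFileId.applyAsLong(path);
        }
        return ((Number) Files.getAttribute(path, "unix:ino")).longValue();
    }

    public static void main(String[] args) throws IOException {
        // The lambda is a stand-in for the real Windows file ID lookup.
        InodeResolverSketch resolver =
                new InodeResolverSketch("Windows 10", p -> 7L);
        System.out.println(resolver.resolve(Paths.get("jiangsu.log"))); // 7
    }
}
```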

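4) Rebuild and package the flume-taildir-source project, replace flume-taildir-source-1.9.0.jar in Flume's lib directory with the newly built jar, and copy jna-platform-4.2.2.jar and jna-4.2.2.jar into Flume's lib directory as well. Restart the taildir collection and the problem is resolved.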

05-11 21:45