AlreadyBeingCreatedException when appending to a file in Hadoop 2.2.0

This post covers how to handle the AlreadyBeingCreatedException that occurs when appending to a file in Hadoop 2.2.0.

Problem description

I have run into a problem with the append operation in Hadoop 2.2.0. I append some bytes to an HDFS file through the HDFS Java API. First, I create the target file if it does not already exist, and then I append to it. The code looks like this:

String fileUri = "hdfs://hadoopmaster:9000/in/append_test.txt";
// create the hdfs file, if not exists
HdfsClient.createPathIfNotExist(fileUri);
// do 5 times append operation
for (int i=0; i<5; i++){
    HdfsClient.appendTo(fileUri, ("append content"+i).getBytes("UTF-8"));
}

The createPathIfNotExist function:

Path p = null;
FileSystem fs = null;
try {
    fs = FileSystem.get(URI.create(uri), conf);
    p = new Path(uri);
    if (!fs.exists(p)) {
        if (uri.charAt(uri.length() - 1) == '/') { // trailing slash: create a directory
            if (fs.mkdirs(p)) {
                // created successfully
            }
        } else { // otherwise create an empty file
            FSDataOutputStream fos = fs.create(p);
            fos.close();
        }
    } else {
        System.out.println(uri + " already exists");
    }
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (fs != null) {
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The appendTo function:

ByteArrayInputStream in = null;
OutputStream out = null;
FileSystem fs = null;
try {
    in = new ByteArrayInputStream(bytes);
    fs = FileSystem.get(URI.create(uri), conf);
    out = fs.append(new Path(uri)); // get the append output stream
    IOUtils.copyBytes(in, out, bufferSize, false);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (in != null) IOUtils.closeStream(in);
    if (out != null) IOUtils.closeStream(out);
    if (fs != null) {
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The result is that append_test.txt is created, but its content is only:

append content0

and the following exception is thrown:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to create file [/in/append_test.txt] for [DFSClient_NONMAPREDUCE_-1148656837_1] on client [192.168.141.1], because this file is already being created by [DFSClient_NONMAPREDUCE_2099912242_1] on [192.168.141.1]
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2320)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59572)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

        at org.apache.hadoop.ipc.Client.call(Client.java:1347)
        at org.apache.hadoop.ipc.Client.call(Client.java:1300)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
        at com.sun.proxy.$Proxy10.append(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy10.append(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245)
        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306)
        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160)
        at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:130)
        at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:1)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:356)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
        at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:121)
        at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:110)
        at org.lh.blog.message.test.HdfsClientTests.testCreateFileBeforeAppend(HdfsClientTests.java:26)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

In other words, only one append succeeded after the nonexistent file was created; the other 4 appends failed with the error above. I created the file before appending, yet the error is AlreadyBeingCreatedException, which confuses me.

I also experimented a bit. I found that none of the HDFS files created through the Java API can be appended to, whereas HDFS files created by an hdfs command (e.g. "hdfs dfs -put") can.

Can you help me with some suggestions?

Thanks and regards.

Recommended answer

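To solve the problem:

  1. Read the file's content and store it in a variable.
  2. Add the new content you want to append to this variable.
  3. Re-create the file and write the combined content back into it.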

This process worked fine for me and solved the issue.
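
The three steps can be sketched as follows. This is only an illustration, and it uses java.nio.file on the local file system as a stand-in so that it runs without a cluster; the class name RewriteInsteadOfAppend and the method appendByRewrite are hypothetical and do not come from the original post. On HDFS the read would presumably go through FileSystem.open and the rewrite through FileSystem.create(path, true), which overwrites an existing file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: emulates an append by rewriting the whole file.
// Local-file stand-in for the HDFS calls, used here only for illustration.
final class RewriteInsteadOfAppend {

    static void appendByRewrite(Path file, byte[] extra) throws IOException {
        // Step 1: read the current content (empty if the file does not exist yet).
        byte[] existing = Files.exists(file) ? Files.readAllBytes(file) : new byte[0];

        // Step 2: add the new content to the in-memory copy.
        byte[] combined = new byte[existing.length + extra.length];
        System.arraycopy(existing, 0, combined, 0, existing.length);
        System.arraycopy(extra, 0, combined, existing.length, extra.length);

        // Step 3: re-create the file (default options truncate it) and write everything back.
        Files.write(file, combined);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("append_test", ".txt");
        for (int i = 0; i < 5; i++) {
            appendByRewrite(file, ("append content" + i).getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

Note that this approach holds the entire file in memory and rewrites it on every call, so it is only reasonable for small files.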

The APPEND operation is expensive, and if you attempt it in parallel, this issue may arise. Hence, re-create the file and rewrite its contents rather than appending.
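
HDFS allows only one writer per file at a time and tracks that writer with a lease, which is why the stack trace above fails inside recoverLeaseInternal. If appends do come from several threads in one JVM, one possible way to keep them from overlapping is to funnel them through a lock. The sketch below is an assumption-laden illustration, not the poster's code: SerializedAppender is a hypothetical class, and a StringBuilder stands in for the HDFS file so that the example runs without a cluster.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: lets only one thread append at a time.
final class SerializedAppender {
    private final ReentrantLock lock = new ReentrantLock();
    private final StringBuilder target = new StringBuilder(); // stand-in for the HDFS file

    void append(String chunk) {
        lock.lock();
        try {
            // In real code, the open-append-close sequence would go here.
            target.append(chunk);
        } finally {
            lock.unlock();
        }
    }

    String contents() {
        lock.lock();
        try {
            return target.toString();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SerializedAppender appender = new SerializedAppender();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 5; i++) {
            final int n = i;
            pool.submit(() -> appender.append("append content" + n + "\n"));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.print(appender.contents());
    }
}
```

A lock like this only coordinates threads inside one process; writers in other processes or on other hosts would still compete for the same lease.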


08-24 05:26