Problem description
I have run into a problem with the append operation in Hadoop 2.2.0. I append some bytes to an HDFS file through the HDFS Java API. Before appending, I first create the target file if it does not already exist. The code looks like this:
String fileUri = "hdfs://hadoopmaster:9000/in/append_test.txt";
// create the hdfs file, if not exists
HdfsClient.createPathIfNotExist(fileUri);
// do 5 times append operation
for (int i = 0; i < 5; i++) {
    HdfsClient.appendTo(fileUri, ("append content" + i).getBytes("UTF-8"));
}
The createPathIfNotExist function:
Path p = null;
FileSystem fs = null;
try {
    fs = FileSystem.get(URI.create(uri), conf);
    p = new Path(uri);
    if (!fs.exists(p)) {
        if (uri.charAt(uri.length() - 1) == '/') { // create a directory
            if (fs.mkdirs(p)) {
                // create successfully
            }
        } else { // create a file
            FSDataOutputStream fos = fs.create(p);
            fos.close();
        }
    } else {
        System.out.println(uri + "existing");
    }
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (fs != null)
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
}
The appendTo function:
ByteArrayInputStream in = null;
OutputStream out = null;
FileSystem fs = null;
try {
    in = new ByteArrayInputStream(bytes);
    fs = FileSystem.get(URI.create(uri), conf);
    out = fs.append(new Path(uri)); // get append outputstream
    IOUtils.copyBytes(in, out, bufferSize, false);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (in != null) IOUtils.closeStream(in);
    if (out != null) IOUtils.closeStream(out);
    if (fs != null) {
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The result is that append_test.txt is created, but its content is only:
append content0
And the following exception occurs:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to create file [/in/append_test.txt] for [DFSClient_NONMAPREDUCE_-1148656837_1] on client [192.168.141.1], because this file is already being created by [DFSClient_NONMAPREDUCE_2099912242_1] on [192.168.141.1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2320)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59572)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)
at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy10.append(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.append(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245)
at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160)
at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:130)
at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:1)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:356)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:121)
at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:110)
at org.lh.blog.message.test.HdfsClientTests.testCreateFileBeforeAppend(HdfsClientTests.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
In other words, only one append operation succeeded after the nonexistent file was created; the other 4 append operations failed with the error above. I created the file before appending, yet the error is AlreadyBeingCreatedException, which confuses me.
I also tried a few other things. I found that HDFS files created through the Java API cannot be appended to at all, while HDFS files created with an hdfs command (e.g. "hdfs dfs -put") can be appended to.
Can you help me with some suggestions?
Thanks & Regards.
Solution
You can set speculative execution to false:
job.setSpeculativeExecution(false);
Alternatively, you can write each result to a file in a temporary location, with a timestamp or some other varying component in the path, and merge the results into the base path later.
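The second suggestion can be sketched roughly as follows. This is only a minimal illustration under stated assumptions: it uses the local filesystem through java.nio.file as a stand-in for HDFS, and the class name TempThenMerge, the helper names writePart and merge, and the part-file naming scheme are all hypothetical and not taken from the question. On HDFS the same pattern would go through the FileSystem API instead.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class TempThenMerge {

    /** Writes one chunk to its own part file instead of appending to a shared file. */
    public static Path writePart(Path tempDir, int seq, byte[] bytes) throws IOException {
        Files.createDirectories(tempDir);
        // Zero-padded sequence number first (so names sort in write order),
        // then a timestamp as the varying component suggested in the answer.
        String name = String.format("part-%05d-%d", seq, System.currentTimeMillis());
        return Files.write(tempDir.resolve(name), bytes);
    }

    /** Concatenates all part files, in name order, into the base path. */
    public static void merge(Path tempDir, Path basePath) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (Stream<Path> listing = Files.list(tempDir)) {
            listing.forEach(parts::add);
        }
        Collections.sort(parts);
        // With no options given, newOutputStream creates or truncates the target.
        try (OutputStream out = Files.newOutputStream(basePath)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path tempDir = Files.createTempDirectory("append_parts");
        Path basePath = Files.createTempFile("append_test", ".txt");
        for (int i = 0; i < 5; i++) {
            byte[] chunk = ("append content" + i + "\n").getBytes(StandardCharsets.UTF_8);
            writePart(tempDir, i, chunk);
        }
        merge(tempDir, basePath);
        System.out.print(new String(Files.readAllBytes(basePath), StandardCharsets.UTF_8));
    }
}
```

Because every chunk goes to a distinct file, no two writers ever open the same path at the same time; the base path is written only once, during the merge step.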