I get a NullPointerException when launching a MapReduce job. It is thrown by SerializationFactory's getSerializer() method. I am using a custom InputSplit, InputFormat, RecordReader, and MapReduce value class.
I know the error is raised some time after the splits are created by my InputFormat class but before the RecordReader is created. As far as I can tell, it occurs directly after the "cleaning up the staging area" message.
Checking the Hadoop source at the locations indicated by the stack trace, it looks like the error occurs when getSerialization() receives a null Class&lt;T&gt; pointer. The JobClient's writeNewSplits() calls that method like so:
Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
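For reference, the lookup that call goes through, as best I can paraphrase it from the SerializationFactory source (a sketch, not a verbatim copy of the Hadoop code), looks like this:

// Sketch of SerializationFactory's lookup (paraphrased, not verbatim Hadoop source).
public <T> Serializer<T> getSerializer(Class<T> c) {
    // Roughly the line the stack trace points at (SerializationFactory.java:73).
    return getSerialization(c).getSerializer(c);
}

public <T> Serialization<T> getSerialization(Class<T> c) {
    // Returns the first registered Serialization that accepts the class;
    // by default, WritableSerialization accepts classes implementing Writable.
    for (Serialization serialization : serializations) {
        if (serialization.accept(c)) {
            return (Serialization<T>) serialization;
        }
    }
    return null;
}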
Thus, I assume that when getClass() is called on my custom InputSplit objects, it is returning a null pointer, but that is just baffling. Any ideas? The full stack trace from the error follows:
12/06/24 14:26:49 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:123)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:74)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:968)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:979)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
    at edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start(CuratorJob.java:94)
    at edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main(HadoopInterface.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Thanks!
EDIT: My code for the custom InputSplit follows:
import . . .
/**
* A document directory within the input directory.
* Returned by DirectoryInputFormat.getSplits()
* and passed to DirectoryInputFormat.createRecordReader().
*
* Represents the data to be processed by an individual Map process.
*/
public class DirectorySplit extends InputSplit {
    /**
     * Constructs a DirectorySplit object
     * @param docDirectoryInHDFS The location (in HDFS) of this
     *            document's directory, complete with all annotations.
     * @param fs The filesystem associated with this job
     */
    public DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        hash = FileSystemHandler.getFileNameFromPath( inputPath );
        this.fs = fs;
    }

    /**
     * Get the size of the split so that the input splits can be sorted by
     * size. Here, we calculate the size to be the number of bytes in the
     * original document (i.e., ignoring all annotations).
     *
     * @return The number of characters in the original document
     */
    @Override
    public long getLength() throws IOException, InterruptedException {
        Path origTxt = new Path( inputPath, "original.txt" );
        String msg = "Getting length of original document at " + origTxt;
        HadoopInterface.logger.log( msg );
        return FileSystemHandler.getFileSizeInBytes( origTxt, fs );
    }

    /**
     * Get the list of nodes where the data for this split would be local.
     * This list includes all nodes that contain any of the required data---
     * it's up to Hadoop to decide which one to use.
     *
     * @return An array of the nodes for whom the split is local
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        FileStatus status = fs.getFileStatus( inputPath );
        BlockLocation[] blockLocs = fs.getFileBlockLocations( status, 0,
                                                              status.getLen() );

        // Collect the hosts of every block backing this split's directory
        HashSet<String> allBlockHosts = new HashSet<String>();
        for( BlockLocation blockLoc : blockLocs ) {
            allBlockHosts.addAll( Arrays.asList( blockLoc.getHosts() ) );
        }

        // Copy into a typed String[] (a plain cast of toArray() would fail)
        return allBlockHosts.toArray( new String[0] );
    }

    /**
     * @return The hash of the document that this split handles
     */
    @Override
    public String toString() {
        return hash;
    }

    private Path inputPath;
    private String hash;
    private FileSystem fs;
}
Best Answer
InputSplit does not extend Writable. You will need to explicitly declare that your input split implements Writable.
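A minimal sketch of that change, reusing the fields from the DirectorySplit in the question; the no-argument constructor and the way the FileSystem handle is rebuilt in readFields() are assumptions about the surrounding project, not something stated in the original post:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class DirectorySplit extends InputSplit implements Writable {

    private Path inputPath;
    private String hash;
    private FileSystem fs;

    // Writable deserialization instantiates the split reflectively before
    // calling readFields(), so a no-argument constructor is required.
    public DirectorySplit() { }

    public DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        this.hash = FileSystemHandler.getFileNameFromPath( inputPath );
        this.fs = fs;
    }

    @Override
    public void write( DataOutput out ) throws IOException {
        // Serialize just enough state to rebuild the split on the task side.
        out.writeUTF( inputPath.toString() );
    }

    @Override
    public void readFields( DataInput in ) throws IOException {
        inputPath = new Path( in.readUTF() );
        hash = FileSystemHandler.getFileNameFromPath( inputPath );
        // Rebuilding the handle from a default Configuration is an assumption;
        // adjust to however the job normally obtains its FileSystem.
        fs = inputPath.getFileSystem( new Configuration() );
    }

    // getLength(), getLocations(), and toString() stay as in the question.
}

If the split really just wraps a path, another option is to extend FileSplit from the new API, which already handles Writable serialization, instead of writing it by hand.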
Regarding java - NullPointerException from Hadoop's JobSplitWriter / SerializationFactory when calling InputSplit's getClass(): a similar question can be found on Stack Overflow at https://stackoverflow.com/questions/11180776/