Problem solved eventually; check my solution at the bottom.
Recently I have been trying to run the recommender example from chapter 6 (listings 6.1 ~ 6.4) of Mahout in Action, but I ran into a problem, and googling around has not turned up a solution.
Here is the problem: I have a mapper and reducer pair:
// Mapper: parses a text line whose first number is the user ID and whose
// remaining numbers are item IDs, emitting one (userID, itemID) pair per item.
public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

  private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
      itemID.set(Long.parseLong(m.group()));
      context.write(userID, itemID);
    }
  }
}

// Reducer: collects all item IDs seen for a user into one sparse preference vector.
public class WikipediaToUserVectorReducer extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

  public void reduce(VarLongWritable userID,
      Iterable<VarLongWritable> itemPrefs, Context context)
      throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
  }
}
The reducer outputs a userID and a userVector, which looks like this:
98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
Then I want to use another mapper-reducer pair to process this data:
// Mapper: splits each user vector into per-item records keyed by item index.
public class UserVectorSplitterMapper extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

  public void map(VarLongWritable key, VectorWritable value, Context context)
      throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      int itemIndex = e.index();
      float preferenceValue = (float) e.get();
      itemIndexWritable.set(itemIndex);
      context.write(itemIndexWritable,
          new VectorOrPrefWritable(userID, preferenceValue));
    }
  }
}
When I try to run the job, it fails with a cast error.
The first mapper-reducer pair writes its output to HDFS, and the second pair tries to read that output. Its mapper can cast 98955 to VarLongWritable, but it cannot convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable. So I am wondering whether there is a way to make the first mapper-reducer pair send its output directly to the second pair, so that no data conversion is needed. I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there does not seem to be a way to do that. Any suggestions?
Problem solved
Solution: By using SequenceFileOutputFormat, we can output and save the reduce result of the first MapReduce job on the DFS; the second MapReduce job can then read that temporary file as its input by passing the SequenceFileInputFormat class as a parameter when setting up its mapper. Since the vector is saved in a binary sequence file with a specific format, SequenceFileInputFormat can read it and transform it back into vector form.
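As a quick sanity check, the binary output of the first job can be read back with Hadoop's SequenceFile.Reader and printed, which confirms that the keys and values really do come back as VarLongWritable and VectorWritable. This is only a rough sketch: the class name is made up, and the part-file path is an example that has to match wherever the first job actually writes its output.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class DumpUserVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Example path: one part file of the first job's reduce output.
    Path path = new Path("/mahout/output.txt/part-r-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    VarLongWritable userID = new VarLongWritable();
    VectorWritable userVector = new VectorWritable();
    // Each record is deserialized back into the same Writable types the reducer wrote.
    while (reader.next(userID, userVector)) {
      System.out.println(userID.get() + "\t" + userVector.get());
    }
    reader.close();
  }
}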
Here is the example code for the whole workflow:
confFactory ToItemPrefsWorkFlow = new confFactory(
    new Path("/dbout"),              // input file path
    new Path("/mahout/output.txt"),  // output file path
    TextInputFormat.class,           // input format
    VarLongWritable.class,           // mapper key format
    Item_Score_Writable.class,       // mapper value format
    VarLongWritable.class,           // reducer key format
    VectorWritable.class,            // reducer value format
    SequenceFileOutputFormat.class   // the reducer output format
);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();

confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
    new Path("/mahout/output.txt"),
    new Path("/mahout/UserVectorToCooccurrence"),
    SequenceFileInputFormat.class,   // notice that the input format of the second workflow's mapper is now SequenceFileInputFormat.class
    //UserVectorToCooccurrenceMapper.class,
    IntWritable.class,
    IntWritable.class,
    IntWritable.class,
    VectorWritable.class,
    SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

JobClient.runJob(conf1);
JobClient.runJob(conf2);
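Note that confFactory is not a standard Hadoop class, just a small helper used here, so for reference this is a minimal sketch of the same two-job chain written directly against the org.apache.hadoop.mapreduce API that the mapper and reducer listings above use (JobConf/JobClient in the snippet above belong to the older mapred API). The driver class name is made up, the paths are the ones from the snippet above, the second job's reducer is left out because it is not shown in this question, and the Mahout package names assume the 0.x Mahout release that Mahout in Action targets, so treat it as a sketch rather than a drop-in driver.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class TwoJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Job 1: text lines in, user vectors out, stored as a binary sequence file.
    Job job1 = new Job(conf, "WikipediaToUserVector");
    job1.setJarByClass(TwoJobDriver.class);
    job1.setMapperClass(WikipediaToItemPrefsMapper.class);
    job1.setReducerClass(WikipediaToUserVectorReducer.class);
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class); // key point
    job1.setMapOutputKeyClass(VarLongWritable.class);
    job1.setMapOutputValueClass(VarLongWritable.class);
    job1.setOutputKeyClass(VarLongWritable.class);
    job1.setOutputValueClass(VectorWritable.class);
    FileInputFormat.addInputPath(job1, new Path("/dbout"));
    FileOutputFormat.setOutputPath(job1, new Path("/mahout/output.txt"));
    if (!job1.waitForCompletion(true)) {
      System.exit(1);
    }

    // Job 2: reads the sequence file back as <VarLongWritable, VectorWritable>.
    Job job2 = new Job(conf, "UserVectorSplitter");
    job2.setJarByClass(TwoJobDriver.class);
    job2.setMapperClass(UserVectorSplitterMapper.class);
    // job2.setReducerClass(...): the second job's reducer is not shown above, so it is omitted here.
    job2.setInputFormatClass(SequenceFileInputFormat.class); // key point
    job2.setOutputKeyClass(IntWritable.class);
    job2.setOutputValueClass(VectorOrPrefWritable.class);
    FileInputFormat.addInputPath(job2, new Path("/mahout/output.txt"));
    FileOutputFormat.setOutputPath(job2, new Path("/mahout/UserVectorToCooccurrence"));
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
  }
}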
If you have any problem with this, please feel free to contact me.
Solution: You need to explicitly configure the output of the first job to use SequenceFileOutputFormat and define the output key and value classes:
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputValueClass(VectorWritable.class);
Without seeing your driver code, I'm guessing you're using TextOutputFormat for the output of the first job and TextInputFormat as the input to the second. That input format delivers <LongWritable, Text> pairs (byte offset and line of text) to the second mapper, not the <VarLongWritable, VectorWritable> pairs it expects.