问题描述
我对Hadoop和MapReduce完全陌生,正试图通过它来工作。
我想在Python中开发一个mapreduce应用程序,其中使用了2个.CSV文件中的数据。我只是读取mapper中的两个文件,然后将键值对从文件打印到sys.stdout中。
当我在单个程序中使用它时,程序运行正常机器,但与Hadoop流,我得到一个错误。我认为我在读取Hadoop上的映射器中的文件时犯了一些错误。请帮助我解决代码问题,并告诉我如何在Hadoop Streaming中使用文件处理。 mapper.py代码如下。 (您可以从注释中了解代码):
#!/ usr / bin / env python
import sys
from numpy import genfromtxt
def read_input(inVal):
inVal中的行:
#将行分割成单词
yield line.strip( )
def main(separator ='\t'):
#输入来自STDIN(标准输入)
labels = []
data = []
incoming = read_input(sys.stdin)
传入vals:
#将结果写入STDOUT(标准输出);
#这里我们输出的是
#Reduce步骤的输入,即reducer.py的输入
#
#tab-delimited;
if len(vals)> 10:
data.append(vals)
else:
labels.append(vals)
在范围内(0,len(标签)):
打印%s%s%s\\\
%(labels [i],separator,data [i])
if __name__ ==__main__:
main()
从两个.csv文件输入到此映射器的记录有60000条如下所示(在单机上,不是hadoop集群):
cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
我能解决在搜索解决方案后3天后发布。
问题出在Hadoop的新版本(在我的情况下是2.2.0)。当从文件读取值时,映射器代码在某个时间点给出非零的退出代码(可能是因为它一次读取了大量值(784))。 Hadoop 2.2.0中有一个设置,它告诉Hadoop系统给出一个通用错误(子进程失败,代码为1)。该设置默认设置为True。我只需将此属性的值设置为False,并使代码运行时没有任何错误。
设置为: stream.non.zero。 exit.is.failure 即可。流式传输时将其设置为false。因此,流式命令会有点像:
** hadoop jar ... -D stream.non.zero.exit。 is.failure = false ... **
希望它可以帮助某人,并节省3天.. 。;)
I am completely new to Hadoop and MapReduce and am trying to work my way through it.I am trying to develop a mapreduce application in python, in which I use data from 2 .CSV files. I am just reading the two files in mapper and then printing the key value pair from the files to the sys.stdout
The program runs fine when I use it on a single machine, but with the Hadoop Streaming, I get an error. I think I am making some mistake in reading files in the mapper on Hadoop. Please help me out with the code, and tell me how to use file-handling in Hadoop Streaming. The mapper.py code is as below. (You can understand the code from the comments):
#!/usr/bin/env python
import sys
from numpy import genfromtxt
def read_input(inVal):
for line in inVal:
# split the line into words
yield line.strip()
def main(separator='\t'):
# input comes from STDIN (standard input)
labels=[]
data=[]
incoming = read_input(sys.stdin)
for vals in incoming:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited;
if len(vals) > 10:
data.append(vals)
else:
labels.append(vals)
for i in range(0,len(labels)):
print "%s%s%s\n" % (labels[i], separator, data[i])
if __name__ == "__main__":
main()
There are 60000 records which are entered to this mapper from two .csv files as follows (on single machine, not hadoop cluster):
cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
I was able to resolve the issue after searching a solution for like 3 days.
The problem is with the newer version of Hadoop (2.2.0 in my case). The mapper code, when reading values from files was giving an exit code of non-zero at some point (maybe because it was reading a huge list of values(784) at a time). There is a setting in the Hadoop 2.2.0, which tells the Hadoop System to give a general error (subprocess failed with code 1). This setting is set to True by default. I just had to set the value of this property to False, and it made my code run without any errors.
Setting is: stream.non.zero.exit.is.failure. Just set it to false when streaming. So the streaming command would be somewhat like:
**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**
Hope it helps someone, and saves 3 days... ;)
这篇关于在Python中使用Hadoop Streaming中的文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!