


I'm trying to learn to use Yelp's Python API for MapReduce, MRJob. Their simple word counter example makes sense, but I'm curious how one would handle an application involving multiple inputs. For instance, rather than simply counting the words in a document, multiplying a vector by a matrix. I came up with this solution, which functions, but feels silly:

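from mrjob.job import MRJob
from mrjob.step import MRStep

class MatrixVectMultiplyTask(MRJob):
    def multiply(self, key, line):
        # Each input line holds one matrix column followed by the matching vector entry.
        values = [float(x) for x in line.split(" ")]
        v, col = values[-1], values[:-1]

        # Emit the partial product for every row index i.
        for i, entry in enumerate(col):
            yield i, entry * v

    def sum(self, i, occurrences):
        # Add up the partial products that belong to row i.
        yield i, sum(occurrences)

    def steps(self):
        # MRStep is the step constructor in current mrjob releases (older ones used self.mr).
        return [MRStep(mapper=self.multiply, reducer=self.sum)]

if __name__ == "__main__":
    MatrixVectMultiplyTask.run()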


This code is run as ./matrix.py < input.txt. It works because the matrix is stored in input.txt by columns, with the corresponding vector value at the end of each line.


So, the following matrix and vector:


are represented in input.txt as:
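As a purely hypothetical illustration of this layout (the numbers below are made up and are not the data from the post), take the matrix [[1, 2], [3, 4]] and the vector [5, 6]. Stored by columns with the vector entry appended, the two input lines would be "1 3 5" and "2 4 6". The sketch below mimics the map and reduce steps in plain Python, without mrjob, so the arithmetic can be checked by hand; `multiply` and `run_job` are names chosen for this sketch.

```python
from collections import defaultdict

def multiply(line):
    # Last number on the line is the vector entry; the rest is one matrix column.
    values = [float(x) for x in line.split(" ")]
    v, col = values[-1], values[:-1]
    for i, entry in enumerate(col):
        yield i, entry * v

def run_job(lines):
    # Stand-in for the shuffle and reduce phases: group by row index, then sum.
    grouped = defaultdict(list)
    for line in lines:
        for i, product in multiply(line):
            grouped[i].append(product)
    return {i: sum(products) for i, products in grouped.items()}

# Hypothetical input: columns (1, 3) and (2, 4), vector entries 5 and 6.
input_lines = ["1 3 5", "2 4 6"]
result = run_job(input_lines)
print(result)  # {0: 17.0, 1: 39.0}
```

Row 0 gives 1*5 + 2*6 = 17 and row 1 gives 3*5 + 4*6 = 39, which matches the ordinary matrix-vector product.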


In short, how would I go about storing the matrix and vector more naturally in separate files and passing them both into MRJob?



If you need to process your raw data against another data set (or against the same one, e.g. comparing row_i with row_j), you can either:


1) Create an S3 bucket to store a copy of your data. Pass the location of this copy to your task class, e.g. as self.options.bucket and self.options.my_datafile_copy_location in the code below. Caveat: it seems that the whole file must be downloaded to the task machines before it can be processed. If the connection falters or the download takes too long, the job may fail. Here is some Python/MRJob code to do this.


Put this in your mapper function:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or col1 and col2) here:
    yield <your output key, value pairs>
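The parsing and pairing logic in that mapper can be tried out locally without S3. In the rough sketch below, the string `side_data` stands in for whatever the S3 download returns, and the helper names `parse_record` and `pair_with_side_data` are invented for illustration. It assumes every line has the form key, tab, rest, as the mapper above does.

```python
def parse_record(line):
    # Split "key<TAB>rest" into its two parts, as the mapper above does.
    key, rest = line.split('\t', 1)
    return key, rest

def pair_with_side_data(line1, side_data):
    # Pair one input record with every record of the side data set.
    v1, col1 = parse_record(line1)
    for line2 in side_data.rstrip().split('\n'):
        v2, col2 = parse_record(line2)
        # A real job would combine col1 and col2 here; this sketch just emits both.
        yield (v1, v2), (col1, col2)

# Hypothetical side file contents and one hypothetical input line.
side_data = "a\t1 2\nb\t3 4\n"
pairs = list(pair_with_side_data("x\t9 9", side_data))
for key, value in pairs:
    print(key, value)
```

Since the mapper runs once per input line, fetching the side file inside it repeats the download for every line. Doing the fetch once per task (for example in mrjob's mapper_init hook) and keeping the contents on self would probably reduce that cost.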


2) Create a SimpleDB domain and store all of your data in there. Read about boto and SimpleDB here: http://code.google.com/p/boto/wiki/SimpleDbIntro


dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>


This second option may perform better if you have very large amounts of data, since it can request one row of data at a time rather than fetching the whole data set at once. Keep in mind that SimpleDB values are limited to 1024 characters, so you may need to compress/decompress them via some method if your data values are longer than that.
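One possible way to handle the compress/decompress step is zlib followed by base64, so that the stored value is still plain text. This is only a sketch: `pack_value`, `unpack_value` and `MAX_SDB_VALUE_LEN` are hypothetical names, and nothing here talks to SimpleDB itself.

```python
import base64
import zlib

MAX_SDB_VALUE_LEN = 1024  # the value size limit mentioned above

def pack_value(text):
    # Compress, then base64-encode so the result is an ASCII string.
    compressed = zlib.compress(text.encode('utf-8'))
    packed = base64.b64encode(compressed).decode('ascii')
    if len(packed) > MAX_SDB_VALUE_LEN:
        raise ValueError("value is still too long after compression")
    return packed

def unpack_value(packed):
    # Reverse of pack_value: base64-decode, then decompress.
    return zlib.decompress(base64.b64decode(packed)).decode('utf-8')

# Hypothetical row: 1000 repeated numbers, far longer than 1024 characters.
row = " ".join(["0.5"] * 1000)
packed = pack_value(row)
print(len(row), len(packed))
```

Base64 adds roughly a third to the compressed size, and data that does not compress well may still exceed the limit, in which case splitting a row across several attributes or items would be another route.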

