我是Python的新手。我也是pysaprk的新手。我正在尝试运行一个代码,该代码采用看起来像这个(id , (span, mention))的元组的元组来执行.map(lambda (id, (span, text)): (id, text))

我正在使用的代码是:

 m = text\
            .map(lambda (id, (span, text)): (id, text))\
            .mapValues(lambda v: ngrams(v, self.max_ngram))\'''error triggered here'''
            .flatMap(lambda (target, tokens): (((target, t), 1) for t in tokens))\

这是原始数据的格式(id, source, span, text):
 {'_id': u'en.wikipedia.org/wiki/Cerambycidae',
  'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
  'span': (61, 73),
  'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
 {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
  'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
  'span': (20, 29),
  'text': u'It was described by Dru Drury in 1770.'}]

我收到此错误:
 for k, v in iterator:
TypeError: tuple indices must be integers, not str

我知道groupByKey可在pairwiseRDD上使用,所以我想知道如何正确执行groupByKey来解决此问题?

任何帮助或指导将不胜感激。

我正在使用python 2.7和pyspark 2.3.0。

先感谢您。

最佳答案

首先,您需要将数据映射到具有键和值的表单中,然后再映射到groupByKey

键和值的形式始终是元组(a,b),键为a和值b。 a和b本身可能是元组。

rdd = sc.parallelize([{'_id': u'en.wikipedia.org/wiki/Cerambycidae',
  'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
  'span': (61, 73),
  'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
 {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
  'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
  'span': (20, 29),
  'text': u'It was described by Dru Drury in 1770.'},
 {'_id': u'en.wikipedia.org/wiki/Dru_Drury',
  'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens2',
  'span': (20, 29, 2),
  'text': u'It was described by Dru Drury in 1770.2'}])

print rdd.map(lambda x: (x["_id"], (x["span"], x["text"]))).groupByKey()\
.map(lambda x: (x[0], list(x[1]))).collect()

关于python - 如何使用pyspark为非pairwiseRDD正确groupByKey,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50426245/

10-12 06:26