我正在尝试将JSON文件转换为字典并应用键/值对,因此我可以使用groupbykey()基本上对键/值对进行重复数据删除。

这是文件的原始内容:

{"tax_pd":"200003","ein":"720378282"}{"tax_pd":"200012","ein":"274027765"}{"tax_pd":"200012","ein":"042746989"}{"tax_pd":"200012","ein":"205993971"}

我将其格式化为:

(u'201208', u'010620100')(u'201208', u'860785769')(u'201208', u'371650138')(u'201208', u'237253410')

我想将它们变成键/值对,因此可以在Dataflow Pipeline中应用GroupByKey。我相信我需要先将其变成字典?

我是python和google cloud应用程序的新手,那么一些帮助将非常有用!

编辑:代码段

with beam.Pipeline(options=pipeline_options) as p: (p | 'ReadInputText' >> beam.io.ReadFromText(known_args.input) | 'YieldWords' >> beam.ParDo(ExtractWordsFn()) # | 'GroupByKey' >> beam.GroupByKey() | 'WriteInputText' >> beam.io.WriteToText(known_args.output))

class ExtractWordsFn(beam.DoFn): def process(self, element): words = re.findall(r'[0-9]+', element) yield tuple(words)

最佳答案

一个快速的纯Python解决方案是:

import json

with open('path/to/my/file.json','rb') as fh:
    lines = [json.loads(l) for l in fh.readlines()]

# [{'tax_pd': '200003', 'ein': '720378282'}, {'tax_pd': '200012', 'ein': '274027765'}, {'tax_pd': '200012', 'ein': '042746989'}, {'tax_pd': '200012', 'ein': '205993971'}]


查看数据,您没有唯一的键来执行key:value by tax_pdein。假设会有冲突,您可以执行以下操作:

myresults = {}

for line in lines:
    # I'm assuming we want to use tax_pd as the key, and ein as the value, but this can be extended to other keys

    # This will return None if the tax_pd is not already found
    if not myresults.get(line.get('tax_pd')):
        myresults[line.get('tax_pd')] = [line.get('ein')]
    else:
        myresults[line.get('tax_pd')] = list(set([line.get('ein'), *myresults[line.get('tax_pd')]))

#results
#{'200003': ['720378282'], '200012': ['205993971', '042746989', '274027765']}


这样,您将拥有唯一的键,以及相应的唯一ein值的列表。不完全确定这是否是您要的。 set将自动删除列表,包装的list将转换数据类型

然后,您可以通过tax_id显式查找:

myresults.get('200012')
# ['205993971', '042746989', '274027765']


编辑:要从云存储中读取,将代码段here转换为更易于使用:

with gcs.open(filename) as fh:
    lines = fh.read().split('\n')


您可以使用其api文档设置gcs对象

08-16 23:22