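I have a dictionary of values that I want to write to GCS as a valid .CSV file using the Python SDK. I can write the dictionary out as a newline-delimited text file, but I can't find an example of converting the dictionary to a valid .CSV. Can anyone suggest the best way to generate CSVs in a Dataflow pipeline? This question addresses reading from CSV files, but doesn't really address writing to CSV files. I know that CSV files are just text files with rules, but I'm still struggling to convert the dictionary of data into a CSV that can be written with WriteToText.
Here is a simple example dictionary that I would like to turn into a CSV: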
test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]
test_input | beam.io.WriteToText(path_to_gcs)
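The above produces a text file in which each dictionary sits on its own line. Is there any functionality in Apache Beam that I can take advantage of (similar to csv.DictWriter)?
Best answer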
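Generally you will want to write a function that converts your original dict data elements into a csv-formatted string representation.
That function can be written as a DoFn that you apply to your Beam PCollection of data, which converts each element of the collection into the desired format; you do this by applying the DoFn to the PCollection via ParDo. You can also wrap this DoFn in a more user-friendly PTransform.
You can learn more about this process in the Beam Programming Guide.
Here is a simple, translatable non-Beam example: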
# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]

def convert_my_dict_to_csv_record(input_dict):
    """ Turns dictionary values into a comma-separated value formatted string """
    return ','.join(map(str, input_dict.values()))

# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]
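When this answer was written, converted_test_input looked like this: ['Here is a sentence,1', 'Another sentence goes here,2']
The column order simply follows the dictionary's own ordering, so on Python 3.7+, where dicts preserve insertion order, the label comes first: ['1,Here is a sentence', '2,Another sentence goes here']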
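One caveat with the plain ','.join approach: it does no quoting, so a value that itself contains a comma or a double quote produces a row that a CSV reader will split incorrectly. Here is a minimal sketch of the difference, using only the standard csv module. The helper names naive_record and quoted_record are made up for this illustration.

```python
import csv
import io


def naive_record(values):
    """Join values with commas, with no quoting or escaping."""
    return ','.join(map(str, values))


def quoted_record(values):
    """Format values as one CSV row, letting csv.writer handle quoting."""
    buf = io.StringIO()
    csv.writer(buf).writerow(values)
    # Drop the line terminator; a text sink would normally add its own newline.
    return buf.getvalue().rstrip('\r\n')


row = [1, 'Hello, world']
naive = naive_record(row)
quoted = quoted_record(row)

# Parse both back: the naive row comes back as three fields, not two.
naive_fields = next(csv.reader([naive]))
quoted_fields = next(csv.reader([quoted]))
```

This is why the DictWriter-based version is the safer choice for free-text columns.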
Beam DictToCSV DoFn and PTransform example using DictWriter

from csv import DictWriter
from csv import excel
from io import StringIO

from apache_beam import DoFn, ParDo, PTransform
...

def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
    """ Additional properties for delimiters, escape chars, etc via an instance of csv.Dialect
        Note: The original Python 2 version of this function used cStringIO, which does not support unicode;
        io.StringIO on Python 3 works with str (unicode) text.
    """
    buf = StringIO()
    writer = DictWriter(buf,
                        fieldnames=column_order,
                        restval=missing_val,
                        extrasaction=('ignore' if discard_extras else 'raise'),
                        dialect=dialect)
    writer.writerow(element)
    # Strip the trailing line terminator; WriteToText appends its own newline
    return buf.getvalue().rstrip(dialect.lineterminator)

class _DictToCSVFn(DoFn):
    """ Converts a Dictionary to a CSV-formatted String
        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in the input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect
    """
    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def process(self, element, *args, **kwargs):
        result = _dict_to_csv(element,
                              column_order=self._column_order,
                              missing_val=self._missing_val,
                              discard_extras=self._discard_extras,
                              dialect=self._dialect)
        return [result,]

class DictToCSV(PTransform):
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings
        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in an input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect
    """
    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def expand(self, pcoll):
        return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
                                          missing_val=self._missing_val,
                                          discard_extras=self._discard_extras,
                                          dialect=self._dialect)
                             )
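The missing_val and discard_extras options map directly onto csv.DictWriter's restval and extrasaction arguments. Here is a small standalone sketch of how they behave outside Beam. format_row is a hypothetical helper that mirrors _dict_to_csv above, not part of any library.

```python
import csv
import io


def format_row(element, column_order, missing_val='', discard_extras=True):
    """Format one dict as a CSV line using csv.DictWriter."""
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf,
        fieldnames=column_order,
        restval=missing_val,
        extrasaction='ignore' if discard_extras else 'raise',
    )
    writer.writerow(element)
    return buf.getvalue().rstrip('\r\n')


columns = ['label', 'text']

# A key missing from the element is filled with missing_val.
missing = format_row({'label': 1}, columns, missing_val='N/A')

# An unexpected key is silently dropped when discard_extras is True...
extra = format_row({'label': 2, 'text': 'hi', 'source': 'web'}, columns)

# ...and raises ValueError when discard_extras is False.
try:
    format_row({'label': 2, 'text': 'hi', 'source': 'web'}, columns,
               discard_extras=False)
    raised = False
except ValueError:
    raised = True
```

In a pipeline, an exception raised for one element typically fails the whole bundle, so discard_extras=False is best kept for cases where an unexpected field really should stop the job.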
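To use the example, you would put your test_input into a PCollection and apply the DictToCSV PTransform to that PCollection; you can then use the resulting converted PCollection as the input to WriteToText. Note that you must provide a list or tuple of column names, via the column_order argument, corresponding to the keys of your dictionary input elements; the columns of the resulting CSV-formatted strings will appear in the order of the column names provided. Also note that the original Python 2 implementation of this example, built on cStringIO, does not support unicode.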
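Putting the pieces together, a pipeline might look roughly like the sketch below. It is an illustration under stated assumptions rather than the answer's own code: to_csv_line, header_line, run and the output prefix are hypothetical names, and beam.Map stands in for the DictToCSV transform so that the block is self-contained. Beam is imported inside run so the two formatting helpers can be tried without it installed.

```python
import csv
import io

COLUMNS = ['label', 'text']


def to_csv_line(element, column_order=COLUMNS):
    """Format one dict as a CSV line (no trailing newline)."""
    buf = io.StringIO()
    csv.DictWriter(buf, fieldnames=column_order,
                   extrasaction='ignore').writerow(element)
    return buf.getvalue().rstrip('\r\n')


def header_line(column_order=COLUMNS):
    """Format the column names as a CSV header line."""
    buf = io.StringIO()
    csv.writer(buf).writerow(column_order)
    return buf.getvalue().rstrip('\r\n')


def run(output_prefix):
    """Write the example dicts as CSV; output_prefix may be a gs:// path."""
    import apache_beam as beam  # imported lazily; requires apache-beam

    test_input = [{'label': 1, 'text': 'Here is a sentence'},
                  {'label': 2, 'text': 'Another sentence goes here'}]
    with beam.Pipeline() as pipeline:
        (pipeline
         | 'CreateInput' >> beam.Create(test_input)
         | 'DictToCSV' >> beam.Map(to_csv_line)
         | 'WriteCSV' >> beam.io.WriteToText(output_prefix,
                                             file_name_suffix='.csv',
                                             header=header_line()))
```

WriteToText's header argument writes the column names at the top of each output shard, so a job that produces several shards yields several files that each carry the header. Writing to a gs:// prefix additionally needs the GCP extras of the Beam package.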