我想在纯Python中执行ETL任务,我想收集所考虑的每个原始输入文件的错误度量以及元数据(错误度量是根据文件数据部分中提供的错误代码计算的,而元数据存储在 header 中)。这是整个过程的伪代码:
import pandas as pd
import dask
from dask import delayed
from dask import dataframe as dd
META_DATA = {} # shared resource
ERRORS = {} # shared resource
def read_file(file_name):
global META_DATA, ERRORS
# step 1: process headers
headers = read_header(file_name)
errors = {}
data_bfr = []
# step 2: process data section
for line in data_section:
content_id, data = parse_line(line)
if contains_errors(data):
errors[content_id] = get_error_code(data)
else:
data_bfr.append(content_id, data)
# ---- Part relevant for question 1 ----
# step 3: acquire lock for shared resource and write metadata
with lock.acquire():
write_metadata(file_name, headers) # stores metadata in META_DATA[file_name]
write_errors(file_name, errors) # stores error metrics in ERRORS[file_name]
return pd.DataFrame(data=data_bfr,...)
with set_options(get=dask.multiprocessing.get):
df = dd.from_delayed([delayed(read_file)(file_name) \
for file_name in os.listdir(wd)])
# ---- Part relevant for question 2 ----
df.to_hdf('data.hdf', '/data', 'w', complevel=9, \
complib='blosc',..., metadata=(META_DATA, ERRORS))
对于每个输入文件,
read_file
返回一个pd.DataFrame
,进一步将相关的元数据和错误度量写入共享资源。我正在使用dask
的多处理调度程序从延迟的dask.dataframe
操作列表中计算read_file
。read_file
操作都写入共享资源META_DATA
和ERRORS
。要实现适用于dask.multiprocessing.get
的适当锁定策略,我该怎么做?从with locket.lock_file('.lock'):
-context内将元数据和错误信息写入集合就足够了吗? multiprocessing.RLock
有效吗?我必须怎么做才能初始化与dask
一起使用的锁?更根本地讲,如何在META_DATA
中将ERRORS
和dask
声明为共享资源? dask
目前不支持向数据帧添加元数据,但是是否有可能将信息写入HDF?如果是这样,在这种情况下如何处理对共享资源的访问? 最佳答案
不要依赖全局
Dask与pure functions一起使用效果最好。
特别是,您的情况是Python的局限性,因为它(正确地)不会在进程之间共享全局数据。相反,我建议您从函数显式返回数据:
def read_file(file_name):
...
return df, metadata, errors
values = [delayed(read_file)(fn) for fn in filenames]
dfs = [v[0] for v in values]
metadata = [v[1] for v in values]
errors = [v[2] for v in values]
df = dd.from_delayed(dfs)
import toolz
metadata = delayed(toolz.merge)(metadata)
errors = delayed(toolz.merge)(errors)