Question
I am struggling to convert a dask.bag of dictionaries into dask.delayed pandas.DataFrames and, finally, into a dask.dataframe.
I have one function (make_dict) that reads files into a rather complex nested dictionary structure and another function (make_df) that turns these dictionaries into a pandas.DataFrame (the resulting dataframe is around 100 MB per file). I would like to append all dataframes into a single dask.dataframe for further analysis.
Up to now I was using dask.delayed objects to load, convert and append all data, which works fine (see example below). However, for future work I would like to store the loaded dictionaries in a dask.bag using dask.persist().
I managed to load the data into a dask.bag, resulting in a list of dicts or a list of pandas.DataFrames that I can use locally after calling compute(). When I tried turning the dask.bag into a dask.dataframe using to_delayed(), however, I got stuck with an error (see below).
It feels like I am missing something rather simple here, or maybe my approach to dask.bag is wrong?
The below example shows my approach using simplified functions and throws the same error. Any advice on how to tackle this is appreciated.
import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag
print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2
def make_dict(n=1):
    return {"name": "dictionary", "data": {'A': np.arange(n), 'B': np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])
k = [1,2,3]
# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected
# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)
df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails
# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [ A B
# 0 0 0]
What I ultimately would like to do using the distributed scheduler:
b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())
Answer
In the bag case the delayed objects point to lists of elements, so you have a list of lists of pandas dataframes, which is not quite what you want. Two recommendations:
- Just stick with dask.delayed. It seems to work well for you.
- Use the Bag.to_dataframe method, which expects a bag of dicts and does the dataframe conversion itself.