我在数据帧中有一些时间序列数据,其中时间作为索引。对索引进行排序,并将数据存储在多个实木复合地板文件中,每个文件中包含一天的数据。我使用dask 2.9.1
当我从一个实木复合地板文件加载数据时,分区设置正确。
当我从多个文件加载数据时,在得到的dask数据帧中没有得到定义。
下面的示例说明了该问题:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet" )
print(ddf.npartitions, ddf.divisions)
在这里我得到2个分区和
(None, None, None)
作为分区我可以让dd.read_parquet将分区设置为实际值吗?
更新资料
在我的实际数据中,我有一个实木复合地板文件。
通过保存来自使用时间戳作为索引的数据帧中的数据来创建文件。索引已排序。每个文件的大小为100-150MB,当加载到内存中时,它使用的应用程序为2.5GB RAM,激活索引非常重要,因为重新创建索引确实很繁琐。
我没有设法在read_parquet上找到参数或引擎的组合,从而无法在加载时创建分区。
数据文件被命名为“ yyyy-mm-dd.parquet”,因此我不得不根据该信息创建分区:
from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [ pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions
这无法使用索引,并且在某些情况下,它失败并显示“ TypeError:只能将元组(而非“列表”)连接到元组”
然后我尝试将除法设置为元组
ddf.divisions = tuple(divisions)
,然后它起作用了。当索引设置正确时,速度很快更新2
更好的方法是分别读取dask数据帧,然后将它们串联:
from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)
通过这种方式设置分隔,还解决了另一个问题,即随着时间的推移处理列的增加。
最佳答案
下面我重写了使用concat的原始问题,这解决了我的问题
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
# create two example parquet files
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq" )
# read the files and concatenate
ddf = dd.concat([dd.read_parquet( d ) for d in ["df1d.parq", "df2d.parq"] ], axis=0)
print(ddf.npartitions, ddf.divisions)
我仍然可以获得预期的2个分区,但是现在这些分区是
(Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00'))
关于python - 加载多个Parquet文件时保留dask数据帧划分,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/59570546/