Question
I recently found the dask module, which aims to be an easy-to-use Python parallel processing module. The big selling point for me is that it works with pandas.
After reading a bit on its manual page, I can't find a way to do this trivially parallelizable task:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
At the moment, to achieve this in dask, AFAIK,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
which is ugly syntax and is actually slower than outright
df.apply(func, axis = 1) # for pandas DF row apply
Any suggestions?
Thanks @MRocklin for the map function. It seems to be slower than plain pandas apply. Is this related to the pandas GIL-release issue, or am I doing it wrong?
import numpy as np
import pandas as pd
import dask.dataframe as dd

s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k)  # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func)             # 0.43 sec
ds.map(slow_func).compute()    # 2.04 sec
Answer
map_partitions
You can apply your function to all of the partitions of your dataframe with the map_partitions function.
df.map_partitions(func, columns=...)
Note that func will be given only part of the dataset at a time, not the entire dataset like with pandas apply (which presumably you wouldn't want if you want to do parallelism).
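As a rough end-to-end sketch (the frame, column name, and helper function below are illustrative, not from the original post):

import numpy as np
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': np.arange(12)})
ddf = dd.from_pandas(pdf, npartitions=3)   # split into 3 partitions

def per_partition(part):
    # `part` is an ordinary pandas DataFrame holding one partition
    return part.assign(y=part['x'] * 2)

result = ddf.map_partitions(per_partition).compute()   # back to a single pandas DataFrame

Each partition is handed to per_partition as a plain pandas DataFrame, which is why the function only ever sees part of the data.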
You can use map
df.mycolumn.map(func)
You can use apply
df.apply(func, axis=1)
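Using the ds series from the question, a minimal sketch of both calls might look like this (the to_frame step and the meta= hint are my additions, not part of the original answer):

# elementwise map over a dask Series
counts = ds.map(slow_func).compute()

# row-wise apply over a dask DataFrame; meta tells dask the output is an int64 Series
ddf = ds.to_frame(name='k')
applied = ddf.apply(lambda row: slow_func(row['k']), axis=1, meta=(None, 'int64')).compute()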
Threads vs Processes
As of version 0.6.0 dask.dataframes parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead:
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
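Applied to the question's setup, that might look roughly like the following (whether processes actually win depends on how expensive slow_func is relative to the cost of shipping data between workers):

import pandas as pd
import dask.dataframe as dd

s = pd.Series([10000] * 120)
ds = dd.from_pandas(s, npartitions=3)

# slow_func is the function from the question, defined at module level so that
# the process-based scheduler can pickle it and send it to worker processes.
if __name__ == '__main__':
    result = ds.map(slow_func).compute(scheduler='processes')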
But avoid apply
However, you should really avoid apply with custom Python functions, both in Pandas and in Dask. This is often a source of poor performance. If you find a way to do your operation in a vectorized manner, your Pandas code may well be 100x faster and you won't need dask.dataframe at all.
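To make that concrete for this question: slow_func only counts positive versus non-positive draws, which NumPy can do without a Python-level loop. A sketch of an equivalent vectorized version (my rewrite, not from the answer):

import numpy as np

def vectorized_func(k):
    A = np.random.normal(size=k)
    # +1 for each positive draw, -1 for each non-positive one:
    # (#positive) - (#non-positive) == 2 * (#positive) - k
    return 2 * int((A > 0).sum()) - k

s.apply(vectorized_func) should behave like s.apply(slow_func) but run dramatically faster, since the per-element loop disappears.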
For your particular problem you might consider numba. This significantly improves your performance.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k)  # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
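If you still want dask on top of numba, one possible combination (my sketch, not part of the answer) is to compile with nogil=True so the compiled loop releases the GIL and dask's default threaded scheduler can overlap partitions:

import numba
import numpy as np
import dask.dataframe as dd

@numba.jit(nopython=True, nogil=True)
def nogil_func(k):
    # scalar draws keep this in nopython mode; I am not assuming the size=
    # form of np.random.normal is supported inside compiled code
    s = 0
    for _ in range(k):
        if np.random.normal() > 0:
            s += 1
        else:
            s -= 1
    return s

ds = dd.from_pandas(s, npartitions=3)    # s is the pandas Series from the question
result = ds.map(nogil_func).compute()    # threaded scheduler by default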
Disclaimer, I work for the company that makes both numba and dask and employs many of the pandas developers.