我只是read this introduction,但是在实现任何一个示例时都遇到了麻烦(注释的代码是第二个示例):
import asyncio
import pandas as pd
from openpyxl import load_workbook
async def loop_dfs(dfs):
async def clean_df(df):
df.drop(["column_1"], axis=1, inplace=True)
... a bunch of other inplace=True functions ...
return "Done"
# tasks = [clean_df(df) for (table, dfs) in dfs.items()]
# await asyncio.gather(*tasks)
tasks = [clean_df(df) for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)
def main():
dfs = {
sn: pd.read_excel("excel.xlsx", sheet_name=sn)
for sn in load_workbook("excel.xlsx").sheetnames
}
# loop = asyncio.get_event_loop()
# loop.run_until_complete(loop_dfs(dfs))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop_dfs(dfs))
finally:
loop.close()
main()
我还看到了其他几篇有关熊猫如何不支持asyncio的文章,也许我只是想念一幅大图,但是如果我执行就地操作就没关系了吗? I saw recommendations for Dask但没有立即支持阅读excel,我想先尝试一下,但我不断
RuntimeError: Event loop already running
最佳答案
我还看到了其他几篇有关熊猫如何不支持asyncio的文章,也许我只是想念一幅大图,但是如果我执行就地操作就没关系了吗?
就地操作是那些modify existing data的操作。那是效率问题,而您的目标似乎是并行化,这是完全不同的问题。
Pandas不支持asyncio不仅是因为尚未实现,而且还因为Pandas通常不执行asyncio支持的那种操作:网络和子进程IO。熊猫函数要么使用CPU要么等待磁盘访问,但这两种都不适合异步。 Asyncio允许使用看起来像普通同步代码的协程来表达网络通信。在协程内部,对每个阻止操作(例如,网络读取)进行await
操作,如果尚无可用数据,则会自动暂停整个任务。在每次此类暂停时,系统都会切换到下一个任务,从而有效地创建协作式多任务系统。
尝试调用不支持异步的库(例如pandas)时,表面上看起来会起作用,但是您不会获得任何好处,并且代码将串行运行。例如:
async def loop_dfs(dfs):
async def clean_df(df):
...
tasks = [clean_df(df) for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)
由于
clean_df
不包含await
的单个实例,因此它只是名称上的协程-它从不实际挂起其执行以允许其他协程运行。因此,await asyncio.wait(tasks)
将按顺序运行任务,就像您编写的一样:for table, df in dfs.items():
clean_df(df)
为了使事情并行运行(所提供的熊猫有时会在操作期间释放GIL),您应该将各个CPU绑定的函数移交给线程池:
async def loop_dfs(dfs):
def clean_df(df): # note: ordinary def
...
loop = asyncio.get_event_loop(0
tasks = [loop.run_in_executor(clean_df, df)
for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)
如果沿着这条路线走,则首先不需要asyncio,只需使用
concurrent.futures
。例如:def loop_dfs(dfs): # note: ordinary def
def clean_df(df):
...
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(clean_df, df)
for (table, df) in dfs.items()]
concurrent.futures.wait(futures)
想我会先尝试这个,但我不断得到
RuntimeError: Event loop already running
该错误通常表示您已经在已经为您运行asyncio的环境(例如jupyter笔记本)中启动了脚本。如果是这种情况,请确保使用股票
python
运行脚本,或查阅笔记本的文档,了解如何更改代码以将协程提交到已运行的事件循环。关于python - Asyncio Pandas 与就地,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52340476/