问题描述
我想使用带有collect()的信号量来限制api调用.我认为我必须使用create_task(),但是会遇到一个运行时错误:"RuntimeError:await未与将来一起使用".我该如何解决?
I want to use a semaphore with a gather() to limit api calls. I think I have to use create_task() but I obtain a runtime error: "RuntimeError: await wasn't used with future". How can I fix it?
代码如下:
import asyncio
# pip install git+https://github.com/sammchardy/python-binance.git@00dc9a978590e79d4aa02e6c75106a3632990e8d
from binance import AsyncClient
async def catch_up_aggtrades(client, symbols):
tasks = asyncio.create_task(get_historical_aggtrades(client, symbol) for symbol in symbols)
sem = asyncio.Semaphore(1)
async with sem:
await asyncio.gather(*tasks)
async def get_historical_aggtrades(client, symbol):
async for trade in client.aggregate_trade_iter(symbol, '1 day ago UTC'):
print(f"symbol {symbol}")
async def main():
client = await AsyncClient.create()
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
await catch_up_aggtrades(client, symbols)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
推荐答案
限制资源使用的方法实际上是一个非常简单的概念.这类似于计算免费停车场.(汽车进入时为-1,离开汽车时为+1).当计数器降为零时,等待的汽车队列开始建立.
A sempahore limiting a resource usage is actually a very simple concept. It is similar to counting free parking lots. (-1 when a car enters, +1 when it leaves). When the counter drops to zero, a queue of waiting cars starts to build.
这意味着:
- 每个资源一个信号量
- 初始值=并发资源用户的上限
- 每种资源的使用由
与sem异步进行保护:
现有代码:
sem = asyncio.Semaphore(1)
async with sem:
await asyncio.gather(*tasks)
将 asyncio.gather
的使用限制为一次只能收集1个任务.它不限制任务,仅限制任务的收集.由于 gather
始终只被调用一次,因此该信号量不会改变任何内容.
limits the use of asyncio.gather
to 1 task gathering at a time. It does not limit the tasks, just their gathering. Since the gather
is called just once anyway, the semaphore does not change anything.
您的程序可能更改为(包括注释中解决的问题):
Your program might be changed to (including the issue resolved in comments):
LIMIT = 1
async def catch_up_aggtrades(client, symbols):
sem = asyncio.Semaphore(LIMIT)
tasks = [asyncio.create_task(get_historical_aggtrades(client, symbol, sem)) for symbol in symbols]
await asyncio.gather(*tasks)
async def get_historical_aggtrades(client, symbol, sem):
async with sem:
async for trade in client.aggregate_trade_iter(symbol, '1 day ago UTC'):
print(f"symbol {symbol}")
这篇关于python asyncio-RuntimeError:未来未使用await的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!