问题描述
为了使资源安全,我只是资助自己实施基于计时器的将事件列表作为一堆处理"版本-再次-我想知道是否存在一种不错的通用pythonic方法.
I just fund myself implementing a timer-based version of "handle a list of events as a bunch" in order to safe resources - again - and I'm wondering whether there is a nice common pythonic approach.
您可能知道这一点:您正在处理重复发生的事件,例如鼠标移动,文件系统更改等,并且必须对这些事件做出一些反应,但是如果您可以稍稍休息一下,那将是很好的一堆事件来处理它们.可能是因为较旧的事件因较新的事件而失效(足以处理最旧的事件),或者因为可以以某种方式将事件压缩在一起.
You probably know this: you're handling recurring events like mouse movements, file system changes etc. and you have to do some calculation as a reaction to those events but it would be great if you could use a little break in the stream of events to handle them in a bunch. Maybe because older events get invalidated by newer events (and it's enough to handle the oldest ones) or because events can somehow be squashed together.
示例包括:鼠标移动(仅绘制最新位置),编辑器中的自动保存"或文件系统上的自动同步,或者(在我的示例中)监视文件系统的更改并重新编译某些内容.
Examples are: mouse movements (draw only latest position), "auto save" in editors or auto-sync on file systems, or (in my example) monitoring file system changes and re-compile something.
通常,我查找如何使用Timer
并思考如何避免多余的线程,并提出一些半成品但复杂的解决方案,以解决一个非常简单的问题.出现了很多问题:
Usually I look up how to use a Timer
and think about how I could avoid an extra thread and come up with some semi-finished but complex solution for a - in my eyes - very simple problem.Lot of questions arise:
- 如何避免并发处理(例如,如果我使用
threading.Timer
并启动执行工作的线程) - 如何确保有时间限制要处理事件(连续不断地输入事件而不会中断)
- 如果可能的话如何避免线程
- 如何避免创建过于复杂的框架
- (您命名)
- how to avoid concurrent handling (e.g. if I use a
threading.Timer
and start a thread doing the work) - how to make sure there is a time limit for events to be handled (in case of a continuous incoming of events without break)
- how to avoid threads if possible
- how to avoid creating an overly-complex framework
- (you name it)
我想要的是这样的东西:
What I'd like to have is something which works like this:
timer = SomeContinuousTimer()
new_events = []
while True:
event = wait_for(inotify_adapter.event_gen(), timer.timeout())
if event == timer.TIMEOUT:
my_handler_func(new_events)
else:
new_events.append(event)
timer.restart(1500)
但是wait_for
必须像select
一样工作,为此,我需要文件描述符,并且上面的代码已经超出了我的实际预期.
But wait_for
would have to act like select
and for this I'd need file descriptors and the above code is already a bit more than I would actually expect it to be.
我真的很高兴拥有这样的东西:
What I would be really glad about to have would be used like this:
bunch_handler = BunchHandler()
new_events = []
def read_events():
for event in inotify_adapter.event_gen():
new_events.append(event)
while True:
# will run `read_events` asynchronously until 1.5sec have passed since the
# last event
bunch_handler.read(read_fn=read_events, bunch_wait=1500)
handle_events(new_events)
这是我应该使用async
/await
的典型情况吗?如果没有async
选项,是否有框架?是否有用于此确切场景的异步框架?
Is this a typical scenario I should use async
/ await
for? Are there frameworks for the case where async
is not an option? Is there an async framework for this exact scenario?
推荐答案
这不是很好,但是它可以满足我的要求,并且可以作为示例说明我在说什么:)
This is not nice but it does what I want and might act as an example which shows, what I'm talking about :)
import asyncio
import time
async def event_collector(*, listener_fn, bunch_wait=1.0, max_wait=2.0):
"""Wait for (but don't handle) events and wait for a maximum of @bunch_wait seconds after the
last event before returning. Force return after @max_wait seconds"""
max_time_task = asyncio.Task(asyncio.sleep(max_wait))
while True:
resetable = asyncio.Task(asyncio.sleep(bunch_wait))
done, _ = await asyncio.wait(
{listener_fn.__anext__(), resetable, max_time_task},
return_when=asyncio.FIRST_COMPLETED)
if resetable in done or max_time_task in done:
return
resetable.cancel()
async def generate_events(events):
"""Simulates bursts of events with side-effects"""
while True:
for i in range(5):
await asyncio.sleep(.01)
events.append(i)
print("*" * len(events))
yield
await asyncio.sleep(3.200)
def handle_events(events):
"""Simulates an event handler operating on a given structure"""
print("Handle %d events" % len(events))
events.clear()
async def main():
new_events = []
t = time.time()
while True:
await event_collector(listener_fn=generate_events(new_events), bunch_wait=1.1, max_wait=2.2)
now = time.time()
print("%.2f" % (now - t))
t = now
handle_events(new_events)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这种方法有一些缺点:*您需要使用async
异步侦听事件* event_collector将在max_wait
秒后返回,无论是否已经看到任何事件(因此,如果没有事件发生,它就像超时)*代替重置计时器,每次都会创建一个新计时器
This approach has some shortcomings:* you need to listen for events asynchronously using async
* event_collector will return after max_wait
seconds regardless whether any events have been seen yet (so it acts like a timeout if no events occur)* instead of resetting a timer, a new one gets created every time
这篇关于最佳实践是“一堆收集并执行".在Python中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!