Problem description
So I asked this question and tried the ProcessPoolExecutor approach. I applied the suggested decorator in the following way:
import asyncio
import functools
from concurrent import futures

from app.exceptions.errors import ManipulationError

_pool = futures.ProcessPoolExecutor()


def executor(function):
    @functools.wraps(function)
    def decorator(*args, **kwargs):
        try:
            partial = functools.partial(function, *args, **kwargs)
            loop = asyncio.get_event_loop()
            return loop.run_in_executor(_pool, partial)
        except Exception as e:
            raise ManipulationError(str(e))
    return decorator
I then used it on a function like this:
@executor
@pil
def blur(image):
    frame = image.convert("RGBA")
    return frame.filter(ImageFilter.BLUR)
Note that @pil is another decorator I made:
def pil(function):
    @functools.wraps(function)
    def wrapper(image, *args, **kwargs) -> BytesIO:
        img = PILManip.pil_image(image)
        if img.format == "GIF":
            frames = []
            for frame in ImageSequence.Iterator(img):
                res_frame = function(frame, *args, **kwargs)
                frames.append(res_frame)
            return PILManip.pil_gif_save(frames), "gif"
        elif img.format in ["PNG", "JPEG"]:
            img = function(img, *args, **kwargs)
            return PILManip.pil_image_save(img), "png"
        else:
            raise BadImage("Bad Format")
    return wrapper
I called it in a FastAPI route like so:
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await blur(byt)
    return Response(img.read(), media_type=f"image/{image_format}")
I get an error about pickling:
500 Internal Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 391, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/app/middleware/timer.py", line 8, in add_process_time_header
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 56, in dispatch
raise e from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 52, in dispatch
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
raw_response = await run_endpoint_function(
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "/home/codespace/workspace/dagpi-image/app/routes/image_routes.py", line 107, in blur_image
img, image_format = await blur(byt)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function blur at 0x7f87524091f0>: it's not the same object as app.image.pil_manipulation.blur
Does someone know why this keeps happening? I was told the objects have to be serializable. I believe a BytesIO is the only thing going in and out of the image functions, and that should be serializable.
Recommended answer
Decorators typically produce wrapped functions that aren't easy to pickle (serialize). pickle stores a function by its module and qualified name, and after decoration that name (here app.image.pil_manipulation.blur) refers to the wrapper rather than to the original function, which is what the "it's not the same object as" message reports. When dealing with multiprocessing, you should avoid decorators and send ordinary module-level functions to run_in_executor. For example, you could rewrite your executor decorator as a utility function:
import asyncio
import concurrent.futures

_pool = concurrent.futures.ProcessPoolExecutor()

async def exec_async(fn, *args):
    loop = asyncio.get_running_loop()  # we are always inside a coroutine here
    return await loop.run_in_executor(_pool, fn, *args)
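The original decorator also tried to translate failures into ManipulationError, but its try/except could only catch errors raised while scheduling the call, because the future was returned without being awaited. A minimal sketch of a variant that awaits inside the try block follows. The name exec_async_checked and the local ManipulationError class are stand-ins invented for this example, and math.factorial merely plays the role of a CPU-bound module-level function.

```python
import asyncio
import concurrent.futures
import math


class ManipulationError(Exception):
    """Stand-in for app.exceptions.errors.ManipulationError (assumption)."""


_pool = concurrent.futures.ProcessPoolExecutor()


async def exec_async_checked(fn, *args):
    """Run fn(*args) in the process pool and translate worker errors."""
    loop = asyncio.get_running_loop()
    try:
        # Awaiting inside the try block is what lets errors raised in the
        # worker process be caught here.
        return await loop.run_in_executor(_pool, fn, *args)
    except Exception as e:
        raise ManipulationError(str(e)) from e


async def main():
    # A stdlib function is pickled by reference, so the worker can import it.
    print(await exec_async_checked(math.factorial, 5))
    try:
        # math.factorial rejects negative input with ValueError in the worker.
        await exec_async_checked(math.factorial, -1)
    except ManipulationError as e:
        print("translated:", e)


if __name__ == "__main__":
    asyncio.run(main())
```

Whether to translate every exception this way or to let some propagate unchanged is a design choice that depends on how the routes report errors.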
Instead of decorating a function with executor, you can just await it using await exec_async(some_function, arg1, arg2, ...). Likewise, you can rewrite the pil decorator into another utility:
def pil(image, transform):
    img = PILManip.pil_image(image)
    if img.format == "GIF":
        frames = []
        for frame in ImageSequence.Iterator(img):
            res_frame = transform(frame)
            frames.append(res_frame)
        return PILManip.pil_gif_save(frames), "gif"
    elif img.format in ["PNG", "JPEG"]:
        img = transform(img)
        return PILManip.pil_image_save(img), "png"
    else:
        raise BadImage("Bad Format")
The implementation of blur now becomes an ordinary function that calls pil and can be safely passed to exec_async:
def blur(image):
    def transform(frame):
        frame = frame.convert("RGBA")
        return frame.filter(ImageFilter.BLUR)
    return pil(image, transform)


@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await exec_async(blur, byt)
    return Response(img.read(), media_type=f"image/{image_format}")
Note: the above code is untested.
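The pickling failure itself can be reproduced without a process pool or PIL. The sketch below is only an illustration: passthrough, double, original_double and can_pickle are hypothetical names, with passthrough standing in for a decorator such as executor or pil. It rebinds a module-level name to a wrapper, as decoration does, and then asks pickle to serialize the undecorated function that the wrapper still holds.

```python
import functools
import pickle


def passthrough(function):
    """Hypothetical stand-in for a decorator such as executor or pil."""
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper


def double(x):
    return x * 2


# Keep a handle on the undecorated function, then rebind the name the same
# way writing "@passthrough" above "def double" would.
original_double = double
double = passthrough(double)


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


if __name__ == "__main__":
    # pickle looks functions up by module and qualified name. The name
    # "double" now resolves to the wrapper, so the original function can no
    # longer be found under its own name.
    print("original picklable:", can_pickle(original_double))
    print("wrapper picklable:", can_pickle(double))
```

In the question, the executor decorator hands the inner, already-rebound function to functools.partial, which puts it in the same position as original_double here.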