函数装饰器(function decorator)可以对函数进行“标注”,给函数提供更多的特性。
在理解装饰器之前需要理解闭包(closure)。Python3.0 引入了保留关键字 nonlocal,使用闭包同样也离不开 nonlocal。顺便说一句,闭包除了用在装饰器上,对于异步编程也是很重要的概念。
装饰器(decorator)是一个可调用的装饰函数,它接收另一个函数作为参数。
假设已经定义好了一个装饰器 decorate(decorate 实际上是一个接收函数并且返回函数的函数),那么以下两段代码是等价的。
@decorate
def target():
print('running target()')
和
def target():
print('running target()') target = decorate(target)
可以看到,@标注这种语法实际上是一个语法糖。target 经过标注后已经成为了另一个函数 decorate(target)。
我们再看看一个实际定义 decorate 的例子:
def decorator(func):
def inner():
print('running inner()')
return inner @decorator
def target1():
print('running target1()') def target2():
print('running target2()') inner_func1 = target1()
print(inner_func1)
print('-' * 10)
print(target1)
print('*' * 10)
inner_func2 = decorator(target2)()
print(inner_func2)
print('-' * 10)
print(decorator(target2))
输出:
running inner()
None
----------
<function decorator.<locals>.inner at 0x10ae3f598>
**********
running inner()
None
----------
<function decorator.<locals>.inner at 0x10aee7510>
根据代码和结果进一步验证我们的理解。通过装饰器函数的标注,一个函数可以变为另一个函数。至于怎么转换的,是根据装饰器函数本身定义的。装饰器函数的输入和输出都是函数,它定义了函数的变换。
装饰器的执行顺序
当定义一个函数 A 时,如果它用了装饰标注 B,那么 A 在定义时就已经执行了装饰器 B 的代码,而不是在调用函数 A 时执行。即:
def decorator(func):
print('running decorator(func)')
def inner():
print('running inner()')
return inner @decorator
def target():
print('target()') print('before calling target.')
target()
等价于
def decorator(func):
print('running decorator(func)')
def inner():
print('running inner()')
return inner def target():
print('target()') target = decorator(target)
print('before calling target.')
target()
其结果都是:
running decorator(func)
before calling target.
running inner()
因为这个原因,装饰器往往在导入模块的时候就会执行(import time),而被装饰函数(装饰器返回的函数)是在显式调用的时候执行(runtime)。
不改变原函数的装饰器
大多数装饰器往往都会改变原函数,但也有一些应用场景不会改变原函数。例如:
registry = [] def register(func):
registry.append(func)
return func
这种装饰器会收集使用过它的函数。
变量范围规则
以下代码会因为变量 b 没有定义而报错:
def f1(a):
print(a)
print(b) f1(3)
3
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-28-25d665eb58d1> in <module>
3 print(b)
4
----> 5 f1(3) <ipython-input-28-25d665eb58d1> in f1(a)
1 def f1(a):
2 print(a)
----> 3 print(b)
4
5 f1(3) NameError: name 'b' is not defined
而下面代码因为全局变量 b 的存在而不会报错。
b = 6
f1(3)
3
6
接下来才是重点,以下代码会报错:
b = 6
def f2(a):
print(a)
print(b)
b = 9 f2(3)
3
---------------------------------------------------------------------------
UnboundLocalError Traceback (most recent call last)
<ipython-input-30-55c0dd1a1ffb> in <module>
5 b = 9
6
----> 7 f2(3) <ipython-input-30-55c0dd1a1ffb> in f2(a)
2 def f2(a):
3 print(a)
----> 4 print(b)
5 b = 9
6 UnboundLocalError: local variable 'b' referenced before assignment
这是因为 Python 在编译函数时,发现了 b 在函数内进行了赋值,因此它认为 b 是一个局部变量。于是生成的字节码会认为 b 是局部变量,会试图在局部环境中找到 b,于是报错了。
如果需要修复这个问题,需要进行显式地全局声明:
b = 6
def f3(a):
global b
print(a)
print(b)
b = 9 f3(3)
3
6
闭包
由于匿名函数经常在函数内定义一个函数,而闭包也用到了嵌套函数,所以两者经常混淆。
但是实际上,闭包的关注点不在于匿名与否。闭包是这样一个函数,它在函数内绑定了一个函数外的非全局变量。
我们看看这样一个计算平均数的闭包。
def make_averager():
series = [] def averager(new_value):
series.append(new_value)
total = sum(series)
return total/len(series) return averager avg = make_averager()
avg(10)
avg(15)
avg(20)
输出:
10.0
12.5
15.0
这里的 series 称之为自由变量(free variable),尽管 make_averager 已经调用完了,但是 series 依然包含在闭包中而没有销毁,averager 函数依然可以使用它。
查看 avg 的变量:
avg.__code__.co_varnames
avg.__code__.co_freevars
输出:
('new_value', 'total')
('series',)
所以,闭包就是包含了自由变量的一个函数。
为了便于类比,我们再看看一个基于类的实现:
class Averager(): def __init__(self):
self.series = [] def __call__(self, new_value):
self.series.append(new_value)
total = sum(self.series)
return total/len(self.series) avg = Averager()
avg(10)
avg(15)
avg(20)
输出:
10.0
12.5
15.0
nonlocal
一个更优雅的闭包,避免存储一个列表:
def make_averager():
count = 0
total = 0 def averager(new_value):
nonlocal count, total
count += 1
total += new_value
return total / count return averager avg = make_averager()
avg(10)
avg(15)
avg(20)
这里使用了 nonlocal,表示该变量不是局部变量。这个关键字是用于处理前面提到的变量范围规则:当函数有赋值语句时,Python 会认为这个变量是局部变量。显然我们应该让 count 和 total 作为 averager 外的自由变量,于是需要加上 nonlocal 关键字。
实现一个简单的装饰器
以下是一个计时装饰器的示例:
import time def clock(func):
def clocked(*args, **kwargs):
"""
clocked doc
"""
t0 = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_str = ', '.join(repr(arg) for arg in args)
print('[%0.8fs] %s(%s) -> %r' % (elapsed, name, arg_str, result))
return result
return clocked @clock
def snooze(seconds):
"""
snooze doc
"""
time.sleep(seconds) @clock
def factorial(n):
"""
factorial doc
"""
return 1 if n < 2 else n*factorial(n-1) print('*' * 40, 'Calling snooze(.123)')
snooze(.123)
print('*' * 40, 'Calling factorial(6)')
print('6! =', factorial(6))
输出:
**************************************** Calling snooze(.123)
[0.12775655s] snooze(0.123) -> None
**************************************** Calling factorial(6)
[0.00000100s] factorial(1) -> 1
[0.00006883s] factorial(2) -> 2
[0.00012012s] factorial(3) -> 6
[0.00016687s] factorial(4) -> 24
[0.00022555s] factorial(5) -> 120
[0.00030625s] factorial(6) -> 720
6! = 720
其结果是不言而喻的,装饰器对原函数进行了包装,变为一个新函数,增加了计时信息。以上的装饰器有一些小缺陷:
snooze.__name__
snooze.__doc__
factorial.__name__
factorial.__doc__
'clocked'
'\n clocked doc\n '
'clocked'
'\n clocked doc\n
可以看到装饰器“污染”了被装饰函数的一些属性。
使用 functools.wrap:一种更优雅的做法:
import time
import functools def clock(func):
@functools.wraps(func)
def clocked(*args, **kwargs):
t0 = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['%s=%r' % (k, w) for k, w in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))
arg_str = ', '.join(arg_lst)
print('[%0.8fs] %s(%s) -> %r ' % (elapsed, name, arg_str, result))
return result
return clocked @clock
def snooze(seconds):
"""
snooze doc
"""
time.sleep(seconds) @clock
def factorial(n):
"""
factorial doc
"""
return 1 if n < 2 else n*factorial(n-1) print('*' * 40, 'Calling snooze(.123)')
snooze(.123)
print('*' * 40, 'Calling factorial(6)')
print('6! =', factorial(6))
snooze.__name__
snooze.__doc__
factorial.__name__
factorial.__doc__
输出:
**************************************** Calling snooze(.123)
[0.12614298s] snooze(0.123) -> None
**************************************** Calling factorial(6)
[0.00000119s] factorial(1) -> 1
[0.00012970s] factorial(2) -> 2
[0.00022101s] factorial(3) -> 6
[0.00031495s] factorial(4) -> 24
[0.00039506s] factorial(5) -> 120
[0.00047684s] factorial(6) -> 720
6! = 720
'snooze'
'\n snooze doc\n '
'factorial'
'\n factorial doc\n
标准库中的装饰器
Python 有 3 种用于装饰方法的内置函数:
- property
- classmathod
- staticmethod
另一种常见的装饰器是 functools.wraps。
标准库还有两个有意思的装饰器(在 functools 中定义)是:
- lru_cache
- singledispatch
functools.lru_cache
functools.lru_cache 实现了记忆功能。LRU 表示 Least Recently Used。我们看看这个装饰器如何加速 fibonacci 的递归。
普通方法:
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-2) + fibonacci(n-1) print(fibonacci(6))
输出:
[0.00000000s] fibonacci(0) -> 0
[0.00000310s] fibonacci(1) -> 1
[0.00028276s] fibonacci(2) -> 1
[0.00000095s] fibonacci(1) -> 1
[0.00000000s] fibonacci(0) -> 0
[0.00000167s] fibonacci(1) -> 1
[0.00007701s] fibonacci(2) -> 1
[0.00015092s] fibonacci(3) -> 2
[0.00051212s] fibonacci(4) -> 3
[0.00000095s] fibonacci(1) -> 1
[0.00000000s] fibonacci(0) -> 0
[0.00000095s] fibonacci(1) -> 1
[0.00007415s] fibonacci(2) -> 1
[0.00014782s] fibonacci(3) -> 2
[0.00000095s] fibonacci(0) -> 0
[0.00000095s] fibonacci(1) -> 1
[0.00007510s] fibonacci(2) -> 1
[0.00000119s] fibonacci(1) -> 1
[0.00000095s] fibonacci(0) -> 0
[0.00000000s] fibonacci(1) -> 1
[0.00007606s] fibonacci(2) -> 1
[0.00015116s] fibonacci(3) -> 2
[0.00030208s] fibonacci(4) -> 3
[0.00052595s] fibonacci(5) -> 5
[0.00111508s] fibonacci(6) -> 8
8
使用 lru_cache:
@functools.lru_cache()
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-2) + fibonacci(n-1) print(fibonacci(6))
[0.00000095s] fibonacci(0) -> 0
[0.00000191s] fibonacci(1) -> 1
[0.00041223s] fibonacci(2) -> 1
[0.00000215s] fibonacci(3) -> 2
[0.00048995s] fibonacci(4) -> 3
[0.00000215s] fibonacci(5) -> 5
[0.00056982s] fibonacci(6) -> 8
8
泛型函数:singledispatch
泛型函数:一组用不同方式(依据第一个参数的类型)执行相似操作的函数。
代码感受一下:
from functools import singledispatch
from collections import abc
import numbers
import html @singledispatch
def htmlize(obj):
content = html.escape(repr(obj))
return '<pre>{}</pre>'.format(content) @htmlize.register(str)
def _(text):
content = html.escape(text).replace('\n', '<br>\n')
return '<p>{0}</p>'.format(content) @htmlize.register(numbers.Integral)
def _(n):
return '<pre>{0} (0x{0:x})</pre>'.format(n) @htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
inner = '</li>\n<li>'.join(htmlize(item) for item in seq)
return '<ul>\n<li>' + inner + '</li>\n</ul>' htmlize({1, 2, 3})
htmlize(abs)
htmlize('Heimlich & Co.\n- a game')
htmlize(42)
print(htmlize(['alpha', 66, {3, 2, 1}]))
输出:
'<pre>{1, 2, 3}</pre>'
'<pre><built-in function abs></pre>'
'<p>Heimlich & Co.<br>\n- a game</p>'
'<pre>42 (0x2a)</pre>'
<ul>
<li><p>alpha</p></li>
<li><pre>66 (0x42)</pre></li>
<li><pre>{1, 2, 3}</pre></li>
</ul>
装饰器的串联
顾名思义,一个函数可以被多个装饰器修饰。
@d1
@d2
def f():
print('f')
等价于
def f():
print('f') f = d1(d2(f))
含参的装饰器
我们知道,当函数被装饰器装饰的时候,实际上是作为参数传入给了装饰器。要实现含参装饰器,需要构建一个装饰器工厂函数,这个函数接收参数,返回一个装饰器。说白了,就是又多了一层函数嵌套,写一个返回装饰器的函数。
下面提供一些例子作为参考。
注册器
registry = set() def register(active=True):
def decorate(func):
print('running register(active=%s)->decorate(%s)'
% (active, func))
if active:
registry.add(func)
else:
registry.discard(func) return func
return decorate @register(active=False)
def f1():
print('running f1()') @register()
def f2():
print('running f2()') def f3():
print('running f3()') registry
register()(f3)
registry
register(active=False)(f2)
registry
输出:
running register(active=False)->decorate(<function f1 at 0x10aef8510>)
running register(active=True)->decorate(<function f2 at 0x10aef8950>)
{<function __main__.f2()>}
running register(active=True)->decorate(<function f3 at 0x10ad791e0>)
<function __main__.f3()>
{<function __main__.f2()>, <function __main__.f3()>}
running register(active=False)->decorate(<function f2 at 0x10aef8950>)
<function __main__.f2()>
{<function __main__.f3()>}
计时器
import time DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}) -> {result}' def clock(fmt=DEFAULT_FMT):
def decorate(func):
def clocked(*_args):
t0 = time.time()
_result = func(*_args)
elapsed = time.time() - t0
name = func.__name__
args = ', '.join(repr(arg) for arg in _args)
result = repr(_result)
print(fmt.format(**locals()))
return _result
return clocked
return decorate @clock()
def snooze(seconds):
time.sleep(seconds) for i in range(3):
snooze(.123)
[0.12674093s] snooze(0.123) -> None
[0.12725592s] snooze(0.123) -> None
[0.12320995s] snooze(0.123) -> None
UDP 客户端/服务器
"""
UDP client/server decorator UDP client: to send data.
UDP server: to perform operation on the frame it receives.
"""
import functools
import socket
import json
import time def process_udp_server(ip='0.0.0.0', port=8999, data_size=1024 * 10):
"""
UDP server decorator
:param ip:
:param port:
:param data_size:
:return:
"""
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind((ip, port))
print(f'UDP server started at {str(ip) + ":" + str(port)}.') def start_server(func):
@functools.wraps(func)
def processed(*args, **kwargs):
while True:
data = server.recv(data_size)
data = json.loads(data.decode())
res = func(data, *args, **kwargs)
if res == -1:
break return processed return start_server def camera_udp_client(ip, port):
"""
UDP client decorator
:param ip:
:param port:
:return:
"""
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def start_client(func):
@functools.wraps(func)
def send_data(*args, **kwargs):
data = func(*args, **kwargs)
client.sendto(str.encode(json.dumps(data)), (ip, port)) return send_data return start_client if __name__ == '__main__':
import argparse parser = argparse.ArgumentParser()
parser.add_argument('-m', '--mode', type=str, help='server/client mode', default='server')
parser.add_argument('-i', '--ip', type=str, help='IP address', default='0.0.0.0')
parser.add_argument('-p', '--port', type=int, help='UDP port', default=8999)
parser.add_argument('-c', '--camera', type=int, help='camera number', default=0)
args = parser.parse_args() if args.mode == 'server':
@process_udp_server(args.ip, args.port, 1024 * 1024)
def multiply(x):
time.sleep(1)
print(x * 2) multiply()
elif args.mode == 'client':
@camera_udp_client(args.ip, args.port)
def send_single_data(x):
return x while True:
send_single_data(8)
time.sleep(1)
else:
print('python udp_decorator.py -m [server|client]')
参考
- 《Fluent Python》by Luciano Ramalho