文章目录
助记提要
- Lua脚本载入的参数;
- Lua脚本返回值的转换;
- Lua脚本是原子操作;
- Lua脚本可以提升锁性能的原理;
- Lua脚本替代事务的原因;
- 分片列表的构成;
- 分片列表的推入、弹出操作;
11章 Lua脚本编程
Lua脚本扩展Redis + 提升Redis性能
11.1 不编写C的情况下添加新功能
当需要添加Redis没有的高级功能时,只能通过客户端编写代码,或修改Redis的C源代码实现。
Lua脚本载入Redis
脚本载入Redis需要用到SCRIPT LOAD
命令。该命令接收字符串格式的Lua脚本为,然后将脚本存储,返回被存储脚本的SHA1校验和。
之后只要调用EVALSHA
命令,输入脚本的SHA1校验和,就能调用存储的脚本。
def script_load(script):
# 已载入脚本的SHA1校验和,便于后续调用
sha = [None]
def call(conn, keys=[], args=[], force_eval=False):
if not force_eval:
if not sha[0]:
# 如果未载入,就载入脚本
sha[0] = conn.execute_command("SCRIPT", "LOAD", script, parse="LOAD")
try:
# 使用缓存的SHA1校验和执行命令
return conn.execute_command("EVALSHA", sha[0], len(keys), *(keys+args))
except redis.exceptions.ResponseError as msg:
# 抛出脚本缺失之外的异常
if not msg.args[0].startswith("NOSCRIPT"):
raise
# 脚本错误或要求强制执行时,使用EVAL命令执行指定脚本。
# EVAl也会把脚本缓存起来,并且产生的SHA1校验和跟EVALSHA相同
return conn.execute_command("EVAL", script, len(keys), *(keys+args))
# call函数在被调用时会自动载入和执行脚本
return call
-
call的参数
keys表示脚本可能会读取或写入的键。软件层在有需要的时候,可能会检查这些键是否位于同一分片里。有自动分片功能的Redis集群在执行脚本前,也会对脚本将要访问的键进行检查,如果它们不是位于同一个服务器里面,Redis就会返回错误。脚本在尝试对不可用的键进行读取或写入时,Redis集群可以拒绝。
args是脚本内部使用的其他参数组成的列表。
force_eval当需要在流水线和事务中执行脚本时,会很有用。 -
脚本缺失
脚本缺失是指函数存了SHA1校验和,但是服务器却没有存储对应的脚本。
在服务器重启或使用SCRIPT FLUSH
命令清空脚本缓存,或提供不同的Redis服务器连接时都会出现脚本缺失。
Python对Lua脚本返回值的转换
Lua脚本有些数据类型在传给Python后会做相应的修改:
Lua脚本是原子操作
Redis一次只会执行一个命令,每个单独的命令都是原子的。
MULTI/EXEC命令组成的事务是原子的,EVAL和EVALSHA两个命令也是原子的,执行时不会受到其他命令的干扰。
- 停止正在运行的Lua脚本
EVAL和EVALSHA执行的Lua脚本可能永远不会返回值,导致其他客户端无法正常执行命令。
对于不执行写命令的脚本,用户可以在脚本运行时间超过lua-time-limit
配置项指定的时间后,执行SCRIPT KILL
命令杀死正在运行的脚本。
但是已经执行了写入的Lua脚本,杀死脚本可能会造成数据不一致的状态。用户只能使用SHUTDOWN NOSAVE
命令停止Redis服务器,Redis丢掉最近一次创建快照后的变化。
因此,Lua脚本必须测试后才可以进入生产环境。
使用Lua脚本创建新的状态消息
之前实现的状态发布操作:
def create_status(conn, uid, message, **data):
pipeline = conn.pipeline(True)
pipeline.hget('user:%s' % uid, 'login')
# 新建消息id
pipeline.incr('status:id:')
login, id = pipeline.execute()
# 验证账号存在
if not login:
return None
data.update({
'message': message, 'posted': time.time(),
'id': id, 'uid': uid, 'login': login,
})
pipeline.hmset('status:%s', % id, data)
pipeline.hincrby('user:%s' % uid, 'posts')
pipeline.execute()
return id
使用Lua脚本改写之前的创建消息的函数:
def create_status(conn, uid, message, **data):
args = [
'message': message,
'posted': time.time(),
'uid': uid,
]
for key, value in data.iteritems():
args.append(key)
args.append(value)
return create_status_lua(conn, ['user:%s' % uid, 'status:id:'], args)
create_status_lua = script_load("""
-- 根据用户ID获取用户名。Lua的表格(列表)索引是从1开始的。
local login = redis.call('hget', keys[1], 'login')
-- 未登录不执行后续操作
if not login then
return false
end
-- 获取新的状态消息ID
local id = redis.call('incr', KEYS[2])
local key = string.format('status:%s', id)
-- 设置状态消息的数据
redis.call('hmset', key, 'login', login, 'id', id, unpack(ARGV))
redis.call('hincrby', KEYS[1], 'posts', 1)
return id
""")
新的消息发布操作第一次执行前需要载入Lua脚本,之后直接调用载入的脚本即可。
新的函数和之前操作是完全一样的,但是每次发布状态的通信次数从两次变为1次。对于大多程序来说,多次通信会花费不必要的时间,甚至造成WATCH/MULTI/EXEC事务冲突。
11.2 用Lua重写锁和信号量
高流量场景下,使用锁和信号量可以减少WATCH/MULTI/EXEC事务带来的竞争问题。
但是获取和释放锁至少会做两次的通信,并且锁本身也可能出现冲突。
使用Lua实现锁的原因
- 脚本如果对未包含在KEYS参数中的键进行了读取或写入,可能会在程序迁移到Redis集群时不兼容。
- 处理Redis数据时,程序可能会需要一些无法在最开始的调用中取到的数据。不加锁的话,多个同时取数到Redis的操作会有更多额外消耗,还可能导致新数据被旧数据覆盖。
Lua重写锁
之前的获取锁的操作。粗腰处理各种失败和重试的情况。
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
identifier = str(uuid.uuid4())
lockname = 'lock:' + lockname
lock_timeout = int(math.cell(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
if conn.setnx(lockname, identifier):
# 获取锁并设置过期时间
conn.expire(lockname, lock_timeout)
return identifier
elif not conn.ttl(lockname):
# 检查过期时间,有需要就更新
conn.expire(lockname, lock_timeout)
time.sleep(.001)
return False
改写获取锁的操作
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
identifier = str(uuid.uuid4())
lockname = 'lock:' + lockname
lock_timeout = int(math.cell(lock_timeout))
acquired = False
end = time.time() + acquire_timeout
while time.time() < end and not acquired:
# 实际的锁获取操作
acquired = acquire_lock_with_timeout_lua(conn, [lockname], [lock_timeout, identifier]) == 'OK'
time.sleep(.001 * (not acquired))
return acquired and identifier
acquire_lock_with_timeout_lua = script_load("""
-- 检测锁是否已经存在
if redis.call('exists', KEYS[1]) == 0 then
-- 使用过期时间和标识符设置键
return redis.call('setex', KEYS[1], unpack(ARGV))
end
""")
def release_lock(conn, lockname, identifier):
lockname = 'lock:' + lockname
return release_lock_lua(conn, [lockname], [identifier])
release_lock_lua = script_load("""
-- 检查锁是否匹配
if redis.call('get', KEYS[1]) == ARGV[1] then
-- 删除锁,并确保脚本总是返回真值
return redis.call('del', KEYS[1]) or True
end
""")
Lua版本的加锁和释放锁的操作不需要执行WATCH/MULTI/EXEC步骤,并且减少了通信往返次数,因此实际性能比原版的要好。并且代码更加简洁。
Lua实现计数信号量
之前实现的获取信号量的函数
def acquire_semaphore(conn, semname, limit, timeout=10):
identifier = str(uuid,uuid4())
now - time.time()
pipeline = conn.pipeline(True)
# 清理过期信号量
pipeline.zremrangebyscore(semname, '-inf', now-timeout)
# 尝试获取信号量
pipeline.zadd(semname, identifier, now)
# 检查是否成功获取
pipeline.zrank(semname, identifier)
if pipeline.execute()[-1] < limit:
return identifier
# 获取失败,删除添加的标识符
conn.zrem(semname, identifier)
return None
Lua重写获取信号量的函数
def acquire_semaphore(conn, semname, limit, timeout=10):
# 取当前时间,用来处理超时信号量
now = time.time()
return acquire_semaphore_lua(conn, [semname], [now-timeout, limit, now, str(uuid.uuid4())])
acquire_semaphore_lua = script_load("""
-- 清除过期的信号量
redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1])
-- 检查是否有剩余信号量可用
if redis.call('zcard', KEYS[1] < tonumber(ARGV[2])) then
-- 时间戳记录在超时有序集合
redis.call('zadd', KEYS[1], ARGV[3], ARGV[4])
return ARGV[4]
end
""")
Lua实现的信号量获取操作,不需要计数器和信号量拥有者集合,因为第一个执行脚本的客户端就是获取信号量的客户端。也不需要使用锁、ZINTERSTORE、ZRANGEBYRANK,所以运行速度变快了很多。
Lua实现信号量刷新函数
def refresh_semaphore(conn, semname, identifier):
# 信号量没被刷新,Lua脚本返回空值,Python会将它转为None
return refresh_semaphore_lua(conn, [semaname], [identifier, time.time()]) != None
refresh_semaphore_lua = script_load("""
-- 信号量存在的话,更新时间戳
if redis.call('zscore', KEYS[1], ARGV[1]) then
return redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) or true
end
""")
11.3 移除WATCH/MULTI/EXEC事务
由WATCH、MULTI和EXEC组成的事务,在只有少数客户端尝试修改被WATCH监视的数据时,事务可以没有冲突或重试的情况下完成。
但是如果操作需要多次通信往返、或冲突几率高,或网络延迟影响的话,客户端可能要重试很多次。
改进自动补全程序
之前的自动补全程序需要使用大量的代码来处理重试的情况
def autocomplete_on_prefix(conn, guild, prefix):
start, end = find_prefix_range(prefix)
identifier = str(uuid.uuid4())
# 计算查找范围的起点和终点
start += identifier
end += identifier
zset_name = 'members:' + guild
# 范围的起始元素和结束元素加到有序集合
conn.zadd(zset_name, start, 0, end, 0)
pipeline = conn.pipeline(True)
while 1:
try:
pipeline.watch(zset_name)
# 获取被插入元素在有序集合的排名
sindex = pipeline.zrank(zset_name, start)
eindex = pipeline.zrank(zset_name, end)
erange = min(sindex + 9, eindex - 2)
pipeline.multi()
# 获取范围内的值,删除之前插入的起始元素和结束元素
pipeline.zrem(zset_name, start, end)
pipeline.zrange(zset_name, sindex, erange)
items = pipeline.execute()[-1]
break
except redis.exceptions.WatchError:
# 自动补全集合被别的客户端修改了,重试
continue
# 如果其他自动补全操作正在执行,就从获取到的元素里面移除起始元素和结束元素
return [item for item in items if '{' not in item]
使用Lua脚本实现自动补全。减少通信往返次数,并且无须担心竞争引发的WATCH错误,更适应高并发的场景。
def autocomplete_on_prefix(conn, guild, prefix):
start, end = find_prefix_range(prefix)
identifier = str(uuid.uuid4())
items = autocomplete_on_prefix_lua(conn, ['members:' + guild], [start + identifier, end + identifier])
return [item for item in items if '{' not in item]
autocomplete_on_prefix_lua = script_load("""
-- 标记起点终点的标识符加到有序集合
redis.call('zadd', KEYS[1], 0, ARGV[1], 0, ARGV[2])
-- 在有序集合找范围元素的位置
local sindex = redis.call('zrank', KEYS[1], ARGV[1])
local eindex = redis.call('zrank', KEYS[1], ARGV[2])
-- 计算想获取的元素所处的范围
eindex = math.min(sindex + 9, eindex - 2)
-- 移除范围元素
redis.call('zrem', KEYS[1], unpack(ARGV))
return redis.call('zrange', KEYS[1], sindex, eindex)
""")
改进商品买卖程序
之前的商品买卖程序,使用了锁代替事务,并且通过调整锁的粒度减少冲突。
def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
buyer = "users:%s" % buyerid
seller = "users:%s" % sellerid
item = "%s.%s" % (itemid, sellerid)
inventory = "inventory:%s" % buyerid
locked = acquire_lock(conn, 'market:')
if not locked:
return False
pipe = conn.pipeline(True)
try:
# 检查商品是否还在,买家是否钱够
pipe.zscore("market:", item)
pipe.hget(buyer, 'funds')
price, funds = pipe.execute()
if price is None or price > funds:
return None
# 买家付钱拿货,卖家收钱
pipe.hincrby(seller, 'funds', int(price))
pipe.hincrby(buyer, 'funds', int(-price))
pipe.sadd(inventory, itemid)
pipe.zrem("market:", item)
pipe.execute()
return True
finally:
# 释放锁
release_lock(conn, 'market:', locked)
使用Lua脚本实现商品买卖,不需要锁,降低了通信次数。
def purchase_item(conn, buyerid, itemid, sellerid):
buyer = "users:%s" % buyerid
seller = "users:%s" % sellerid
item = "%s.%s" % (itemid, sellerid)
inventory = "inventory:%s" % buyerid
return purchase_item_lua(conn, ['market:', buyer, seller, inventory], [item, itemid])
purchase_item_lua = script_load("""
-- 商品价格和买家钱数
local price = tonumber(redis.call('zscore', KEYS[1], ARGV[1]))
local funds = tonumber(redis.call('hget', KEYS[2], 'funds'))
-- 商品在售,且买家钱够
if price and funds and funds >= price then
redis.call('hincrby', KEYS[3], 'funds', price)
redis.call('hincrby', KEYS[2], 'funds', -price)
redis.call('sadd', KEYS[4], ARGV[2])
redis.call('zrem', KEYS[1], ARGV[1])
return true
end
""")
11.4 使用Lua对列表分片
散列、集合和字符串做分片可以降低内存占用(分片后可以使用短结构)。
有序集合做分片能扩展搜索索引的大小,提升搜索操作的性能。
列表可以通过Lua脚本实现分片。分片列表支持两端的推入操作,和阻塞、非阻塞的弹出操作。
分片列表的所有操作命令都能使用WATCH/MULTI/EXEC事务实现,但是由于这些列表操作不仅需要同时对多个键进行处理,还需要对一些事务相关的结构进行处理。所以不适合在事务冲突较多出现的情况。这种情况加锁只能一定程度上减轻问题,只有Lua才能显著提升性能。
分片列表的构成
为了能对分片列表的两端执行推入操作和弹出操作,除了需要存储各个分片外,还需要记录第一个分片和最后一个分片的ID。分片列表为空时,这两个字符串存储的ID是相同的。
分片ID按顺序进行分配。总是左边小,右边大。如果是右进左出,新的分片ID会越来越大,如果是左进右出,新的分片ID会越来越小。
位于分片列表两端的列表可能是未被填满的,但是位于两端之间的列表总是被填满的。这样可以快速地计算分片列表的总长度。
元素推入
def sharded_push_helper(conn, key, *items, **kwargs):
# 元素转列表
items = list(items)
total = 0
# 存在元素需要推入,则使用Lua脚本把元素推入分片列表
while items:
# 每次推入64个元素,可以根据压缩列表的最大长度调整这个值
pushed = sharded_push_lua(conn, [key+':', key+':first', key+':last'], [kwargs['cmd']] + items[:64])
# 统计被推入的元素数量
total += pushed
# 移除已推入的元素
del items[:pushed]
return total
def sharded_lpush(conn, key, *items):
return sharded_push_helper(conn, key, *items, cmd='lpush')
def sharded_rpush(conn, key, *items):
return sharded_push_helper(conn, key, *items, cmd='rpush')
sharded_push_lua = script_load("""
-- 确定每个列表分片的最大长度
local max = tonumber(redis.call('config', 'get', 'list-max-ziplist-entries')[2])
-- 没有元素需要推入或压缩列表的最大程度太小
if #ARGV < 2 or max < 2 then return 0 end
local skey = ARGV[1] == 'lpush' and KEYS[2] or KEYS[3]
local shard = redis.call('get', skey) or '0'
while 1 do
-- 取分片的当前长度
local current = tonumber(redis.call('llen', KEYS[1]..shard))
-- 不超过上限的情况下,这个分片允许推入的元素数量
-- 减1,是保留一个节点的空间,方便后续的阻塞弹出操作
local topush = math.min(#ARGV - 1, max - current - 1)
if topush > 0 then
-- 满足限制条件时,向列表推入尽可能多的元素
redis.call(ARGV[1], KEYS[1]..shard, UNPACK(ARGV, 2, topush+1))
return topush
end
-- 当前分片已满,生成新分片,继续推入
shard = redis.call(ARGV[1] == 'lpush' and 'decr' or 'incr', skey)
end
""")
元素推入的时候,并不清楚是否有客户端在进行阻塞弹出操作,因此推入大量数据时,需要将数据分拆,然后进行多次推入。可以根据自身的压缩列表最大长度来调整每次推入元素的数量。
由于Lua脚本不能提前知道元素会被推入到哪个分片,所以无法在KEYS中记录修改的键。因此上述实现只能在单个Redis服务器上使用。
Lua脚本中的While循环,最多会进行两次,第一次发现分片被填满,第二次把元素推到新的分片里面。
元素弹出
Lua脚本实现元素弹出,需要处理弹出端分片为空的情况,先判断是当前端的分片为空还是整个列表都为空。如果是当前端为空,需要调整弹出分片的位置。
def sharded_lpop(conn, key):
return sharded_list_pop_lua(conn, [key+':', key+':first', key+':last'], ['lpop'])
def sharded_rpop(conn, key):
return sharded_list_pop_lua(conn, [key+':', key+':first', key+':last'], ['rpop'])
sharded_list_pop_lua = script_load("""
-- 需要执行弹出操作的分片
local skey = ARGV[1] == 'lpop' and KEYS[2] or KEYS[3]
-- 不需要执行弹出的分片
local okey = ARGV[1] == 'lpop' and KEYS[2] or KEYS[3]
-- 需要执行弹出操作的分片ID
local shard = redis.call('get', skey) or '0'
-- 弹出一个元素
local ret = redis.call(ARGV[1], KEYS[1]..shard)
-- 空分片没有弹出元素,或者弹出后分片变空
if not ret or redis.call('llen', KEYS[1]...shard) == '0' then
-- 不需要执行弹出的分片ID
local oshard = redis.call('get', okey) or '0'
-- 分片列表的两端ID相同,说明整个列表是空的
if shard == oshard then
return ret
end
-- 根据弹出元素来自左端还是右端,确定分片的ID该增加还是减少
local cmd = ARGV[1] == 'lpop' and 'incr' or 'decr'
-- 调整分片端点
shard = redis.call(cmd, skey)
-- 没有取出元素,则对新分片进行弹出
if not ret then
ret = redis.call(ARGV[1], KEYS[1]..shard)
end
end
return ret
""")
阻塞式弹出
阻塞式弹出表示一直重试直到获取到元素。
-
程序流程
先通过非阻塞弹出操作获取元素,如果成功获取,就完成操作。如果获取失败,就循环执行一些步骤,直到成功取得元素或达到用户指定的时限为止。 -
端点发生变化的处理
由于通信往返存在延迟,程序获取分片列表的两个端点,到程序尝试执行弹出操作期间,列表的端点可能已经变化。
为了处理这个问题,在阻塞弹出之前,先在流水线里进行一次EVAL脚本调用。这个脚本会检查程序是否在从正确的分片中弹出元素。如果是错误的分片,脚本会向那个列表推入一个额外的伪元素,它会被之后的阻塞弹出操作弹出。 -
处理竞争条件
Lua脚本被执行后,阻塞弹出操作被执行之前,如果有另一个客户端也处于这个状态,并同一个分片做了推入或弹出操作,程序就可能获取不正确的数据(新推入),或者阻塞在错误的分片上(刚变空取不到)。为了防止程序被阻塞在错误的分片上,最好限定最大阻塞时间。
对于阻塞弹出取到的数据并非来自列表两端分片的问题,程序会基于这个假设进行操作:数据在两个非事务流水线调用之间到达,就当其是正确的。
# 预先定义的伪元素
DUMMY = str(uuid.uuid4())
# 负责循环尝试获取元素的辅助函数
def sharded_bpop_helper(conn, key, timeout, pop, bpop, endp, push):
# 流水线对象和超时信息
pipe = conn.pipeline(False)
timeout = max(timeout, 0) or 2**64
end = time.time() + timeout
while time.time() < end:
# 执行一次非阻塞式弹出,如果成功取得弹出值并且这个值不是伪元素,就返回
result = pop(conn, key)
if result not in (None, DUMMY):
return result
# 需要执行弹出操作的分片
shard = conn.get(key + endp) or '0'
# Lua脚本会在程序尝试从错误的分片里弹出元素时,把伪元素推入这个分片
sharded_bpop_helper_lua(pipe, [key+':', key + endp], [shard, push, DUMMY], force_eval=True)
# 使用用户传入的BLPOP或BRPOP,执行阻塞式弹出操作
getattr(pipe, bpop)(key + ':' + shard, 1)
# 返回元素则执行完毕,否则重试
result = (pipe.execute()[-1] or [None])[-1]
if result not in (None, DUMMY):
return result
# 用户实际使用的API
def sharded_blpop(conn, key, timeout=0):
return sharded_bpop_helper(conn, key, timeout, sharded_lpop, 'blpop', ':first', 'lpush')
def sharded_brpop(conn, key, timeout=0):
return sharded_bpop_helper(conn, key, timeout, sharded_lpop, 'brpop', ':first', 'rpush')
# 处理阻塞
sharded_bpop_helper_lua = script_load("""
-- 找到需要执行弹出操作的列表端,取得这一端的分片
local shard = redis.call('get', KEYS[2]) or '0'
-- 如果程序传入的分片不是当前的端点分片
if shard ~= ARGV[1] then
-- 把伪元素推入这个分片
redis.call(ARGV[2], KEYS[1]..ARGV[1], ARGV[3])
end
""")