问题描述
财务和强化学习中的一个常用术语是基于原始奖励R[i]
的时间序列的折现累积奖励C[i]
.给定一个数组R
,我们想计算满足C[-1] = R[-1]
和C[-1] = R[-1]
递归C[i] = R[i] + discount * C[i+1]
的C[i]
(并返回完整的数组C
).
A common term in finance and reinforcement learning is the discounted cumulative reward C[i]
based on a time series of raw rewards R[i]
. Given an array R
, we'd like to calculate C[i]
satisfying the recurrence C[i] = R[i] + discount * C[i+1]
with C[-1] = R[-1]
(and return the full array C
).
使用numpy数组在python中计算数值的稳定方法可能是:
A numerically stable way of calculating this in python with numpy arrays might be:
import numpy as np
def cumulative_discount(rewards, discount):
future_cumulative_reward = 0
assert np.issubdtype(rewards.dtype, np.floating), rewards.dtype
cumulative_rewards = np.empty_like(rewards)
for i in range(len(rewards) - 1, -1, -1):
cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
future_cumulative_reward = cumulative_rewards[i]
return cumulative_rewards
但是,这依赖于python循环.鉴于这是一种常见的计算方法,因此肯定有一个现有的矢量化解决方案,它依赖于其他一些标准功能,而无需求助于cythonization.
However, this relies on a python loop. Given that this is such a common calculation, surely there's an existing vectorized solution relying on some other standard functions without resorting to cythonization.
请注意,任何使用np.power(discount, np.arange(len(rewards))
之类的解决方案都是不稳定的.
Note that any solution using something like np.power(discount, np.arange(len(rewards))
won't be stable.
推荐答案
您可以使用 scipy.signal.lfilter 来解决递归关系:
You could use scipy.signal.lfilter to solve the recurrence relation:
def alt(rewards, discount):
"""
C[i] = R[i] + discount * C[i+1]
signal.lfilter(b, a, x, axis=-1, zi=None)
a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
- a[1]*y[n-1] - ... - a[N]*y[n-N]
"""
r = rewards[::-1]
a = [1, -discount]
b = [1]
y = signal.lfilter(b, a, x=r)
return y[::-1]
此脚本测试结果是否相同:
This script tests that the result is the same:
import scipy.signal as signal
import numpy as np
def orig(rewards, discount):
future_cumulative_reward = 0
cumulative_rewards = np.empty_like(rewards, dtype=np.float64)
for i in range(len(rewards) - 1, -1, -1):
cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
future_cumulative_reward = cumulative_rewards[i]
return cumulative_rewards
def alt(rewards, discount):
"""
C[i] = R[i] + discount * C[i+1]
signal.lfilter(b, a, x, axis=-1, zi=None)
a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
- a[1]*y[n-1] - ... - a[N]*y[n-N]
"""
r = rewards[::-1]
a = [1, -discount]
b = [1]
y = signal.lfilter(b, a, x=r)
return y[::-1]
# test that the result is the same
np.random.seed(2017)
for i in range(100):
rewards = np.random.random(10000)
discount = 1.01
expected = orig(rewards, discount)
result = alt(rewards, discount)
if not np.allclose(expected,result):
print('FAIL: {}({}, {})'.format('alt', rewards, discount))
break
这篇关于向量化Numpy折扣计算的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!