本质上,用Google Cloud Datastore计算滚动余额以确定何时该补充用户钱包的最佳方法是什么?
付款与交易
我维护一个付款平台,我们的用户可以在该平台上向第三方代理商付款。不幸的是,我这个行业的本质是这些付款事件不是实时发送给我们的,而是捆绑在一起,在几小时到几周后才发送给我们。
这些是影响用户钱包余额的主要对象:
class Transaction(ndb.model):
user = ndb.KeyProperty(User, required=True)
amount = ndb.FloatProperty(required=True)
# ... other fields
class Payment(ndb.model):
user = ndb.KeyProperty(User, required=True)
amount = ndb.FloatProperty(required=True)
# ... other fields
@classmethod
def charge(cls, user, amount):
# ... make a call to braintree/stripe & save result if successful
(未显示退款,“商店抵免额”,调整项等)
钱包
但是,很大一部分交易金额
他们可以加载$ 10- $ 200,交易会从该余额中扣除,而当他们的余额低(少于$ 2)时,我们会从他们的卡中扣除充值的费用。
这就是我设想钱包活动模型的方式
class WalletActivity(ndb.Model):
user = ndb.KeyProperty(User, required=True)
post_date = ndb.DateTimeProperty(required=True)
balance_increment = ndb.FloatProperty(required=True)
balance_result = ndb.FloatProperty(required=True)
# the key to the Transaction or Payment object that this is for
object_key = ndb.KeyProperty(required=True)
@classmethod
def create(cls, obj, previous_balance):
return WalletActivity(
user_key=obj.user,
post_date=datetime.datetime.now(),
balance_increment=obj.amount,
balance_result=previous_balance+obj.amount,
object_key=obj.key)
@classmethod
def fetch_last_wallet_activity(cls, user_key):
return cls.query(cls.user == user_key).order(-cls.post_date).get()
计算余额
为了确定平衡,频谱的两端似乎是:
快速计算,总结一个账户的全部钱包历史
存储预先计算的值(
WalletActivity.fetch_last_wallet_activity().balance_result
)正确的答案听起来像是2的组合。
每天结束时在每个帐户上存储某种BalanceUpdate / WalletDaySummary对象。
然后,您仅汇总今天的活动,并将其添加到昨天的BalanceUpdate。
https://stackoverflow.com/a/4376221/4458510
class BalanceUpdate(ndb.model):
user = ndb.KeyProperty(User)
cut_off_date = ndb.DateTimeProperty()
balance = ndb.IntegerProperty()
@classmethod
def current_balance(cls, user_key):
last_balance_update = cls.query(cls.user == user_key).order(
-cls.cut_off_date).get()
recent_wallet_activity = WalletActivity.query(cls.user == user_key,
cls.post_date > last_balance_update.cut_off_date).fetch()
return (last_balance_update.balance +
sum([i.balance_increment for i in recent_wallet_activity]))
但是,这可能不适用于在一天之内产生大量交易的公司帐户。
使用最新
balance_result
的WalletActivity
可能更好如何处理交易
选项1
为了处理一批交易,我们将
获取用户的余额
如果现有余额不足,请补充其帐户
将交易添加到他们的钱包
码:
def _process_transactions(user, transactions, last_wallet_activity):
transactions_amount = sum([i.amount for i in transactions])
# 2. Replenish their account if the existing balance is low
if last_wallet_activity.balance_result - transactions_amount < user.wallet_bottom_threshold:
payment = Payment.charge(
user=user,
amount=user.wallet_replenish_amount + transactions_amount)
payment.put()
last_wallet_activity = WalletActivity.create(
obj=payment,
previous_balance=last_wallet_activity.balance_result)
last_wallet_activity.put()
# 3. Add the transactions to their wallet
new_objects = []
for transaction in transactions:
last_wallet_activity = WalletActivity.create(
obj=transaction,
previous_balance=last_wallet_activity.balance_result)
new_objects.append(last_wallet_activity)
ndb.put_multi(new_objects)
return new_objects
def process_transactions_1(user, transactions):
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = WalletActivity.fetch_last_wallet_activity(user_key=user.key)
return _process_transactions(user, transactions, last_wallet_activity)
WalletActivity.fetch_last_wallet_activity().balance_result
和的问题BalanceUpdate.current_balance()
是数据存储区查询最终是一致的。我考虑过使用实体组和祖先查询,但是听起来您将面临争用错误:
Is there an Entity Group Max Size?
Google Datastore Strong consistency and Entity Group max size
选项2-通过密钥获取最后的WalletActivity
我们可以跟踪最后一个
WalletActivity
的密钥,因为通过密钥进行的提取是高度一致的:class LastWalletActivity(ndb.Model):
last_wallet_activity = ndb.KeyProperty(WalletActivity, required=True)
@classmethod
def get_for_user(cls, user_key):
# LastWalletActivity has the same key as the user it is for
return ndb.Key(cls, user_key.id()).get(use_cache=False, use_memcache=False)
def process_transactions_2(user, transactions):
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
new_objects = _process_transactions(user, transactions, last_wallet_activity.last_wallet_activity)
# update LastWalletActivity
last_wallet_activity.last_wallet_activity = new_objects[-1].key
last_wallet_activity.put()
return new_objects
另外,我可以将
last_wallet_activity
存储在User
对象上,但是我不想担心竞争条件用户在其中更新其电子邮件并清除我的
last_wallet_activity
新值选项3-付款锁定
但是在2个作业试图同时处理同一用户的事务的竞争条件下呢?
我们可以添加另一个对象来“锁定”帐户。
class UserPaymentLock(ndb.Model):
lock_time = ndb.DateTimeProperty(auto_now_add=True)
@classmethod
@ndb.transactional()
def lock_user(cls, user_key):
# UserPaymentLock has the same key as the user it is for
key = ndb.Key(cls, user_key.id())
lock = key.get(use_cache=False, use_memcache=False)
if lock:
# If the lock is older than a minute, still return False, but delete it
# There are situations where the instance can crash and a user may never get unlocked
if datetime.datetime.now() - lock.lock_time > datetime.timedelta(seconds=60):
lock.key.delete()
return False
key.put()
return True
@classmethod
def unlock_user(cls, user_key):
ndb.Key(cls, user_key.id()).delete()
def process_transactions_3(user, transactions):
# Attempt to lock the account, abort & try again if already locked
if not UserPaymentLock.lock_user(user_key=user.key):
raise Exception("Unable to acquire payment lock")
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
new_objects = _process_transactions(user, transactions, last_wallet_activity.last_wallet_activity)
# update LastWalletActivity
last_wallet_activity.last_wallet_activity = new_objects[-1].key
last_wallet_activity.put()
# unlock the account
UserPaymentLock.unlock_user(user_key=user.key)
return new_objects
我曾想过将整个事情包装成一个事务,但我需要防止将2 http改为braintree / stripe。
我倾向于选择3,但是随着我引入的每个新模型,系统都变得越来越脆弱。
最佳答案
有关竞争的设计注意事项
通常,对于您的Dan's answer,我完全同意original question的观点,尽管在您的特定用例中,使用大型实体组可能是合理的。
争用错误可能会以每秒超过1个写入op的速度发生在同一实体组上,例如一个特定的钱包。此限制还适用于从数据存储事务中的数据存储读取的实体,而无需显式地写回它们(我的理解是它们是written together with the modified entities for serializability)。
虽然1秒规则不是强制性的限制,但根据我的经验,Cloud Datastore通常可以处理略高于该限制的短脉冲,但不能保证,因此通常建议并采取最佳做法来避免大型实体组,尤其是在写操作不是来自同一用户。相反,将用户发布的所有评论存储在同一实体组(作者=父级)中可能是安全的,因为用户每秒发布多个评论的可能性很小,甚至是不可取的。另一个示例可能是对时间不敏感且不面向用户的后台任务,其中对实体组的写操作是按每个实体组编排的,或者至少在发生争用时可以大大减少写操作。
在突然以非常高的速率添加新实体的情况下,争用错误也可能是由单调增加的键/ ID或索引属性和复合索引(索引值彼此之间过于接近)(例如时间戳)引起的。建议让数据存储区自动创建新实体的ID(例如用户ID),因为数据存储区会充分分散密钥。并且要么避免在可能出现单调递增值的位置建立索引属性,要么在值前面加上哈希值。
Cloud Datastore文章Best Practices包含有关Designing for scale的部分,该部分提供了非常有用的建议。
就是说,与编写试图模仿事务方面的应用程序逻辑相比,设计一种应用程序以使其在写限制内安全地工作并依靠数据存储区(或其他数据库)的事务性和强大的一致性支持可能更容易。竞争状况,死锁等等可能导致错误,并使系统更脆弱,更容易出错。
(A)小团体的钱包活动
在最初的问题中,您提到公司帐户和付款,这建议了一些实时付款解决方案。方案:该公司中的成千上万个用户可以为一个帐户提交新交易,但很多人可能同时提交新交易,因此风险很高。如果每笔交易都属于同一实体组(公司帐户),则很容易导致争用错误。如果在写操作中执行重试,则这些操作会导致延迟延长,直到用户得到对其事务请求的响应为止。但是即使重试,写操作也很可能经常失败,并且用户经常会遇到服务器错误。这将带来糟糕的用户体验。
我倾向于选择(2),但使用Wallet
类型。您对在何处存储last_wallet_activity
表示担忧。可能有自己的Wallet
类型,该ID始终与User
相同。在这种情况下,您可以有两个单独的实体组,而不关心用户触发的User
对象上的中间更改。我还将使用数据存储事务。在同一笔交易中最多可以允许25个不同的实体组,即一批中可能有23个事件。使用当前的设计,这也将是每个钱包的最大写入率。不确定这是您的应用可接受的限制。
但是,如果某个钱包确实有很多交易(通过频繁更新last_wallet_activity
),您可能会再次面临争用的风险。为了获得每个钱包更高的写入率并避免这种争用,您还可以将选项(1)与某种sharding结合使用。
选项(3)尝试实现事务方面(请参见前面)。选项(1)确实遭受这样的事实,即查询仅最终是一致的。
(B)单个大型实体组中的钱包活动
但是,这里的问题提到所有这些钱包活动都是在后台处理的(不是通过用户请求),并且事件不是实时处理的,而是要延迟几个小时或几周。这将允许通过后台任务分批处理它们。假设您的应用将位于其他Cloud Datastore limits之内,例如如果交易的最大大小为10 MiB,则您的应用可以将每个公司钱包(实体组)每秒最多进行500次钱包活动批量写入单个写入操作。这相当于每个公司帐户每小时180万个钱包活动。即使发生重试和中断,这对于最大的公司帐户也足够了吗?如果是,并且您的产品从不更改为面向用户的实时钱包活动,那么我不明白为什么不应该将钱包活动放入每个钱包的实体组中。当然,另一种方法可能是每个公司帐户只有多个钱包。
在这种情况下,您的选项(1)应该可以使用,因为祖先查询(WalletActivity
查询中的钱包是祖先)是高度一致的。
使用ndb.put_multi()
我已经看到您在同一请求中使用了多个put()
调用,您可以在其中完美地将实体收集在某个toPut
列表中,然后将它们全部一起编写。这样,您可以节省实例运行时,还可以减少对同一实体组的写操作次数。
如何避免重复的付款请求
关于向Braintree或Stripe的付款请求:
在将钱包活动添加到将写入数据存储的下一批之前,请检查钱包的余额是否足够。
如果余额不足,请停止添加更多活动。
在将批处理写入数据存储区的数据存储区事务内,添加transactional push task(如果事务失败,则不会创建)。我相信,GAE / NDB每个HTTP请求最多接受5个事务任务。
该任务将负责将请求发送到Braintree / Stripe,并更新钱包的余额。并且这也应该在数据存储事务中,并具有事务任务以继续处理它之前离开的事件。
您需要处理Braintree / Stripe拒绝付款请求的情况。
我也不知道您的事件如何到达应用程序,因此我不确定安排每个钱包的批处理任务的最佳方法。上面的模式表明,对于每个钱包来说,只有一个任务正在运行(即,对于同一个钱包,没有并行的多个任务/批次)。但是,您可以通过不同的方式执行此操作,具体取决于事件到达应用程序的方式。
关于python - 用户的Google NDB数据存储支票帐户/电子钱包。如何计算余额,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48738119/