当我运行写入Google Cloud Datastore的数据流作业时,有时我会看到指标显示我有一两个datastoreRpcErrors
:
由于这些数据存储区写入通常包含一批密钥,因此我想知道在RpcError的情况下是否会自动进行某些重试。如果没有,什么是处理这些情况的好方法?
最佳答案
tl;博士:默认情况下,datastoreRpcErrors
将自动使用5次重试。
我在Beam python sdk中研究了 datastoreio
的代码。看起来最终实体突变是通过DatastoreWriteFn()
批量刷新的。
# Flush the current batch of mutations to Cloud Datastore.
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
RPCError被
write_mutations
中helper
中的此代码块捕获;还有一个用于@retry.with_exponential_backoff
方法的装饰器commit
;且默认重试次数设置为5; retry_on_rpc_error
定义触发重试的具体RPCError
和SocketError
原因。for mutation in mutations:
commit_request.mutations.add().CopyFrom(mutation)
@retry.with_exponential_backoff(num_retries=5,
retry_filter=retry_on_rpc_error)
def commit(request):
# Client-side throttling.
while throttler.throttle_request(time.time()*1000):
try:
response = datastore.commit(request)
...
except (RPCError, SocketError):
if rpc_stats_callback:
rpc_stats_callback(errors=1)
raise
...