Problem description
I currently have an application running on App Engine, and I am executing a few jobs using the deferred library. Some of these tasks run daily, while others run once a month. Most of them query Datastore to retrieve documents and then store the entities in an index (Search API). Some of these tables are replaced monthly, and I have to run these tasks on all entities (4-5M).
One example of such a task is:
import logging

from google.appengine.api import search
from google.appengine.ext import deferred

# Company (the ndb model) and getCompanyDocument are defined elsewhere.


def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
    BATCH_SIZE = 200
    # Fetch one page of entities, resuming from the cursor if there is one.
    cps, next_cursor, more = Company.query().fetch_page(
        BATCH_SIZE, start_cursor=cursor)

    doc_list = []
    for cp in cps:
        # Create an index document from the Datastore entity.
        # This document has only about 5 text fields and one date field.
        cp_doc = getCompanyDocument(cp)
        doc_list.append(cp_doc)

    # Write the whole batch to the index in a single call.
    index = search.Index(name='Company')
    index.put(doc_list)
    n_entities += len(doc_list)

    if more:
        logging.debug('Company: %d added to index', n_entities)
        doc_list[:] = []
        # Chain the next batch as a new deferred task.
        deferred.defer(addCompaniesToIndex,
                       cursor=next_cursor,
                       n_entities=n_entities,
                       mindate=mindate)
    else:
        logging.debug('Finished Company index creation (%d processed)', n_entities)
When I run only one task, each deferred task takes around 4-5 s to execute, so indexing my 5M entities would take about 35 hours.
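As a rough check of that estimate, the arithmetic can be sketched as below. It assumes each task handles one batch of 200 entities (the BATCH_SIZE in the code above) and that tasks run strictly one after another. The function name and the concurrency parameter are illustrative and not part of the original code; the concurrency figure assumes ideal linear scaling, which a real queue may not deliver.

```python
def estimate_hours(n_entities, batch_size, seconds_per_task, concurrency=1):
    """Rough wall-clock estimate for processing every entity in batches."""
    # Number of tasks needed, rounded up to cover a final partial batch.
    n_tasks = -(-n_entities // batch_size)
    # Idealized: assumes `concurrency` tasks run in parallel with no slowdown.
    return n_tasks * seconds_per_task / float(concurrency) / 3600.0


low = estimate_hours(5000000, 200, 4)
high = estimate_hours(5000000, 200, 5)
print('%.1f to %.1f hours' % (low, high))  # prints "27.8 to 34.7 hours"
```

With 25,000 sequential tasks, the 35-hour figure corresponds to the 5 s end of the range, so any real speed-up has to come from running batches in parallel or making each batch cheaper.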
Another thing is that when I run an update on another index (e.g., one of the daily updates) using a different deferred task on the same queue, both run a lot slower. Each deferred call starts taking about 10-15 seconds, which is just unbearable.
My question is: is there a way to do this faster and scale the push queue so that more than one job runs at a time? Or should I use a different approach for this problem?
Thanks in advance,
Recommended answer
I think I finally managed to get around this issue by using two queues and the idea proposed in the previous answer.
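- On the first queue, we only query the main entities (using keys_only) and launch another task on the second queue for those keys. The first task then relaunches itself on queue 1 with next_cursor.
- The second queue takes the entity keys and performs all the queries and inserts on Full Text Search/BigQuery/PubSub. (This is slow: about 15 seconds per group of 100 keys.)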
I also tried using only one queue, but the processing throughput was not as good. I believe this may be because slow and fast tasks were running on the same queue, and the scheduler might not work as well in that case.
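The two-queue pattern described above can be sketched roughly as follows. This is an in-memory simulation so that it runs anywhere, not the code I actually deployed. The queue names, the enqueue and fetch_key_page helpers, and the fake key list are all assumptions for illustration. On App Engine, enqueue(queue, fn, *args) would correspond to deferred.defer(fn, *args, _queue=queue), and fetch_key_page to Company.query().fetch_page(page_size, keys_only=True, start_cursor=cursor).

```python
from collections import deque

# In-memory stand-ins (hypothetical, for illustration only).
QUEUES = {'scan-queue': deque(), 'index-queue': deque()}
DATASTORE_KEYS = list(range(450))  # pretend entity keys
INDEXED = []                       # pretend search index


def enqueue(queue_name, fn, *args):
    """Stand-in for deferred.defer(fn, *args, _queue=queue_name)."""
    QUEUES[queue_name].append((fn, args))


def fetch_key_page(page_size, cursor):
    """Stand-in for a keys_only fetch_page call; the cursor is a list offset."""
    start = cursor or 0
    keys = DATASTORE_KEYS[start:start + page_size]
    next_cursor = start + len(keys)
    more = next_cursor < len(DATASTORE_KEYS)
    return keys, next_cursor, more


def scan_keys(cursor=None, page_size=100):
    """Fast task (queue 1): fetch one page of keys and hand it to queue 2."""
    keys, next_cursor, more = fetch_key_page(page_size, cursor)
    if keys:
        enqueue('index-queue', index_keys, keys)
    if more:
        # Relaunch the scan on queue 1 from where this page ended.
        enqueue('scan-queue', scan_keys, next_cursor, page_size)


def index_keys(keys):
    """Slow task (queue 2): real code would load the entities and index them."""
    INDEXED.extend(keys)


def drain():
    """Run queued tasks until both queues are empty (simulates the scheduler)."""
    while any(QUEUES.values()):
        for queue in QUEUES.values():
            if queue:
                fn, args = queue.popleft()
                fn(*args)


scan_keys()
drain()
```

Keeping the cheap key scan apart from the slow indexing work means the scan can fan out pages quickly, while the second queue's own rate and concurrency settings control how many slow batches run at once.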