This article looks at Google App Engine task queue performance and how to approach the problem; it should serve as a useful reference for anyone facing the same issue.

Problem description


    I currently have an application running on appengine and I am executing a few jobs using the deferred library, some of these tasks run daily, while some of them are executed once a month. Most of these tasks query Datastore to retrieve documents and then store the entities in an index (Search API). Some of these tables are replaced monthly and I have to run these tasks on all entities (4~5M).

    One exemple of such a task is:

    import logging

    # App Engine (Python 2 standard environment) libraries used below.
    from google.appengine.api import search
    from google.appengine.ext import deferred

    # Company (an ndb model) and getCompanyDocument() are defined elsewhere in the app.
    def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
        #get index
        BATCH_SIZE = 200
        cps, next_cursor, more = Company.query().\
                                         fetch_page(BATCH_SIZE,
                                                    start_cursor=cursor)
    
        doc_list = []
    
        for cp in cps:
            #create a Index Document using the Datastore entity
            #this document has only about 5 text fields and one date field
            cp_doc = getCompanyDocument(cp)
            doc_list.append(cp_doc)
    
        index = search.Index(name='Company')
        index.put(doc_list)
    
        n_entities += len(doc_list)
    
        if more:
            logging.debug('Company: %d added to index', n_entities)
            #to_put[:] = []
            doc_list[:] = []
            deferred.defer(addCompaniesToIndex,
                           cursor=next_cursor,
                           n_entities=n_entities,
                           mindate=mindate)
        else:
            logging.debug('Finished Company index creation (%d processed)', n_entities)
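
    For reference, a job like this would typically be kicked off with a single deferred.defer() call (for example from a cron handler); the deferred library's _queue option can pin it to a dedicated push queue. The queue name below is only an illustration, not something from the original code:

    # Illustrative kick-off only; 'company-index' is an assumed queue name
    # that would need a matching entry in queue.yaml.
    deferred.defer(addCompaniesToIndex, _queue='company-index')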
    

    When I run one task only, the execution takes around 4-5s per deferred task, so indexing my 5M entities would take about 35 hours (roughly 5M entities / 200 per batch = 25,000 deferred tasks, at ~5 s each run back to back).

    Another thing is that when I run an update on another index (e.g., one of the daily updates) using a different deferred task on the same queue, both run much more slowly, and each deferred call starts taking about 10-15 seconds, which is just unbearable.

    My question is: is there a way to do this faster and scale the push queue to more than one job running each time? Or should I use a different approach for this problem?

    Thanks in advance,

    Solution

    I think I finally managed to get around this issue by using two queues and the idea proposed by the previous answer.

    • On the first queue we only query the main entities (with keys_only) and launch another task on a second queue for those keys. The first task then relaunches itself on queue 1 with the next_cursor.
    • The second queue gets the entity keys and does all the queries and inserts into Full Text Search/BigQuery/PubSub (this is the slow part, ~15 s per group of 100 keys).

    I tried using only one queue as well but the processing throughput was not as good. I believe that this might come from the fact that we have slow and fast tasks running on the same queue and the scheduler might not work as well in this case.
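
    Below is a minimal sketch of that two-queue layout, assuming the Company model and getCompanyDocument() helper from the question, plus two push queues with made-up names ('scan-queue' and 'work-queue') declared in queue.yaml; it illustrates the idea rather than the exact production code.

    # Sketch only: Company and getCompanyDocument() come from the question above;
    # the queue names are assumptions and must exist in queue.yaml.
    from google.appengine.api import search
    from google.appengine.ext import deferred, ndb

    SCAN_BATCH = 100

    def scanCompanies(cursor=None):
        # Queue 1: cheap keys_only pass over the main entities.
        keys, next_cursor, more = Company.query().fetch_page(
            SCAN_BATCH, start_cursor=cursor, keys_only=True)

        if keys:
            # Hand the heavy work to the second queue.
            deferred.defer(processCompanyKeys, keys, _queue='work-queue')

        if more:
            # Relaunch the scan on queue 1 with the next cursor.
            deferred.defer(scanCompanies, cursor=next_cursor, _queue='scan-queue')

    def processCompanyKeys(keys):
        # Queue 2: fetch the entities and do the slow inserts (Search API here;
        # the BigQuery/PubSub writes would sit in the same place).
        docs = [getCompanyDocument(cp) for cp in ndb.get_multi(keys) if cp]
        if docs:
            search.Index(name='Company').put(docs)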

That concludes this article on Google App Engine task queue performance. We hope the answer above is helpful, and thank you for your continued support!
