本文介绍了GAE:单元测试taskqueue与测试平台的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述



当我在一个单元中向任务队列提交一个任务时测试,看起来任务在队列中,但任务不执行。



在单元测试期间如何获得要执行的任务?

解决方案

开发应用程序服务器是单线程的,所以无法在前台线程运行测试时在后台运行任务。



我修改了taskqueue.py中的TaskQueueTestCase,添加了以下函数:

  def execute_tasks(self,application):

执行所有当前排队的任务,并将它们从
队列中移除
任务执行提供的Web应用程序


#设置webtest使用的应用程序(如果以前使用了
#不同的应用程序,请重置_app)。
self._app =无
self.APPLICATION = application

#获取所有任务,然后清除它们。
tasks = self.get_tasks()
self.clear_task_queue()

#运行每个任务,检查它们是否成功。
用于任务中的任务:
response = self.post(task ['url'],task ['params'])
self.assertOK(response)

为此,我还必须将TaskQueueTestCase的基类从BaseTestCase更改为WebTestCase。



我的测试然后做这样的事情:

 #做一些使任务排队的东西。 

#检查任务是否已排队,然后执行它。
self.assertTrue(len(self.get_tasks()),1)
self.execute_tasks(some_module.application)

#现在测试该任务是否符合预期。

这因此直接从前台单元测试执行任务。这与生产中的不完全相同(即,任务将在一段时间后在单独的请求中执行),但它对我来说足够好。


I'm using testbed to unit test my google app engine app, and my app uses a taskqueue.

When I submit a task to a taskqueue during a unit test, it appears that the task is in the queue, but the task does not execute.

How do I get the task to execute during a unit test?

解决方案

The dev app server is single-threaded, so it can't run tasks in the background while the foreground thread is running the tests.

I modified TaskQueueTestCase in taskqueue.py in gaetestbed to add the following function:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before).
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

For this to work, I also had to change the base class of TaskQueueTestCase from BaseTestCase to WebTestCase.

My tests then do something like this:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

This therefore executes the task directly from the foreground unit test. This is not quite the same as in production (ie, the task will get executed 'some time later' on a separate request), but it works well enough for me.

这篇关于GAE:单元测试taskqueue与测试平台的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-03 00:07