我正在研究Luigi管道,该管道检查是否存在手动创建的文件,如果存在,则继续执行下一个任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

我想要的是luigi在创建手动文件并将其粘贴到路径后继续。当我这样做时,它没有查找文件并继续执行任务,而是每隔几秒钟重新检查一次新任务:
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

经过相当长的时间(15-20分钟左右)后,luigi将找到该文件,然后可以根据需要继续操作。我该如何防止这种延迟?我希望luigi在文件存在后立即继续。

最佳答案

请记住以下几点:

  • Luigi工作线程将在至少有一个任务正在运行之前退出(或者如果是keep_alive = True,在没有更多待处理任务的情况下它将退出)。
  • 对于失败的任务,有重试逻辑,默认重试间隔为15分钟。
  • 重试逻辑如下。在指定的重试间隔后,调度程序将忘记任务的失败(与单击UI中的“原谅失败”按钮相同),并将任务的状态更改为“待处理”。下次 worker 要求调度员进行工作时,可以将此任务分配给 worker 。
  • 不完整的外部任务将计为FAILED,但要重试逻辑。
  • 外部任务的重试行为由retry_external_tasks部分中的[worker]配置设置控制。

  • 我认为您正在观察的是这样的。您的管道正在运行,任务ProcessExternalFile失败,然后添加文件,该任务在retry_delay的持续时间内保持FAILED,然后最终变为PENDING,并再次为工作人员提供了此任务,这时它将发现文件和任务变得完整。

    这是否是所需的行为取决于您。如果您希望更快地找到文件,则可以更改重试间隔。或者,您可以在while方法中进行无限run循环,并定期检查文件,并在找到时退出循环。您也可以将Luigi配置为完全禁用重试逻辑。

    关于Python Luigi-满意时继续执行外部任务,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37988423/

    10-14 19:06
    查看更多