我有一个activejob,它应该通过http从外部系统加载数据。当该作业完成时,我希望将执行一些后处理的第二个作业排队,然后将数据提交到其他外部系统。
我不想让第一份工作知道第二份工作,因为
封装
可重用性
基本上不关第一份工作的事
同样,我不希望第一个作业关心如果数据加载失败,接下来会发生什么——也许用户会收到通知,也许我们会在超时后重试,也许我们只是记录它并举手——同样,它可能会根据异常的详细信息而变化,而且这个作业不需要包含这个逻辑,也不需要连接到其他系统来处理它。
在Java(这是我最有经验的地方),我可以使用一些类似番石榴的“AA>”来增加事实上的成功和失败回调:

MyDataLoader loader = new MyDataLoader(someDataSource)
ListenableFuture<Data> future = executor.submit(loader);
Futures.addCallback(future, new FutureCallback<Data>() {
    public void onSuccess(Data result) {
        processData(result);
    }
    public void onFailure(Throwable t) {
        handleFailure(t);
    }
});

不过,active job似乎并没有提供这种外部回调机制——正如我从“activejob basics”中的ListenableFuturerelevant所能理解的那样,after_performrescue_from只能从job类中调用。而after_peform并不是用来区分成功和失败的。
所以我能想到的最好的办法(我并不是说它很好)就是把一些lambdas传递到job的perform方法中,这样:
class MyRecordLoader < ActiveJob::Base

  # Loads data expensively (hopefully on a background queue) and passes
  # the result, or any exception, to the appropriate specified lambda.
  #
  # @param data_source [String] the URL to load data from
  # @param on_success [-> (String)] A lambda that will be passed the record
  #   data, if it's loaded successfully
  # @param on_failure [-> (Exception)] A lambda that will be passed any
  #   exception, if there is one
  def perform(data_source, on_success, on_failure)
    begin
      result = load_data_expensively_from data_source
      on_success.call(result)
    rescue => exception
      on_failure.call(exception)
    end
  end

end

(附带说明:我不知道将lambdas声明为参数的yardoc语法是什么。这看起来是正确的,还是说,如果没有,似乎是合理的?)
然后,调用者必须将这些信息传入:
MyRecordLoader.perform_later(
  some_data_source,
  method(:process_data),
  method(:handle_failure)
)

这并不可怕,至少在主叫端是这样,但看起来很笨拙,我不禁怀疑这有一个共同的模式,我只是没有找到。我有点担心,作为一个ruby/rails新手,我只是弯曲activejob去做一些本来就不该做的事情。我发现的所有activejob示例都是“fire and forget”--异步“返回”结果似乎不是activejob用例。
另外,我也不清楚这是否适用于像resque这样的后端,它在一个单独的进程中运行作业。
“鲁比的方式”是什么?
更新:由于dre hh的sections,activejob在这里不是正确的工具。它也不可靠,而且对这种情况来说过于复杂。我改为hinted at,它更适合用例,而且由于任务大多是io绑定的,即使在mri上也足够快,Concurrent Ruby

最佳答案

activejob不像future或promise那样是异步库。
它只是一个在后台执行任务的接口。当前线程/进程未收到此操作的结果。
例如,当使用sidekiq作为activejob队列时,它会将perform方法的参数序列化到redis存储中。在rails应用程序上下文中运行的另一个守护进程进程将监视redis队列并用序列化的数据实例化您的工作进程。
所以传递回调可能没问题,但是为什么要将它们作为另一个类上的方法。如果这些回调是动态的(在不同的调用中更改),则传递回调是有意义的。但是,当您在调用类上实现了这些方法时,请考虑将这些方法移到您的job worker类中。

10-07 18:13