我看到了很多代码片段,其中在Spout.nextTuple()内使用了一个循环(例如,读取整个文件并为每一行发出一个元组):

public void nextTuple() {
    // do other stuff here

    // reader might be BufferedReader that is initialized in open()
    String str;
    while((str = reader.readLine()) != null) {
        _collector.emit(new Values(str));
    }

    // do some more stuff here
}

这段代码似乎很简单,但是,有人告诉我,不应在nextTuple()内循环。问题是为什么?

最佳答案

执行Spout时,它将在单个线程中运行。该线程“永远”循环并具有多个职责:

  • 调用Spout.nextTuple()
  • 检索“acks”并对其进行处理
  • 检索“失败”并对其进行处理
  • 超时元组

  • 为了做到这一点,至关重要的是,不要“永远”停留在nextTuple()中(即循环或阻塞),而是在将元组发送给系统后返回(或者如果没有元组可以被发送,则仅返回,但是不要这样做)阻止)。否则,Spout无法正常工作。 Storm将在循环中调用nextTuple()。因此,在处理了确认/失败消息等之后,对nextTuple()的下一次调用很快发生。

    因此,在单个调用nextTuple()的过程中发出多个元组也被认为是不好的做法。只要代码停留在nextTuple()中,喷口线程就无法(例如)对传入的ack使用react。这可能导致不必要的超时,因为无法及时处理停机。

    最佳实践是为每次调用nextTuple()发出一个元组。如果没有可用的元组发出,则应返回(不进行发射),而不要等到可用的元组为止。

    10-07 20:01