我正在尝试扩展StreamConsumer
,这是我实现它的方式:
import 'dart:async';
class Consumer extends StreamConsumer {
List list = new List();
StreamSubscription subscription;
Future addStream(Stream stream) {
Completer c = new Completer();
subscription = stream.listen((d) => list.add(d),
onError: c.completeError,
onDone: () => c.complete(this));
return c.future;
}
Future close() {
subscription.cancel();
return new Future.value(this);
}
}
void main() {
List data = [1,2,3,4,5];
Stream stream = new Stream.fromIterable(data);
Consumer streamConsumer = new Consumer();
stream.pipe(streamConsumer).then((Consumer consumer) {
print(consumer.list);
});
}
正如预期的那样,它会打印
[1,2,3,4,5]
,但是如果我将Stream
的实现更改为以下形式:StreamController controller = new StreamController();
Stream stream = controller.stream;
controller.add(1);
controller.add(2);
controller.add(3);
stream.pipe(streamConsumer).then((Consumer consumer) {
print(consumer.list);
});
controller.add(4);
controller.add(5);
它什么也不打印。可以得出结论,根本没有调用
onDone
回调(我认为)。有什么想法吗? 最佳答案
您需要致电StreamController.close()
new Stream.fromIterable(data)
基本上会创建一个StreamController,一次添加一个可迭代元素,然后在迭代完成时关闭 Controller 。在第二个代码块中,您正在手动执行此操作,但没有关闭 Controller 。
以下是相关文档:http://api.dartlang.org/docs/releases/latest/dart_async/StreamController.html#close
关于dart - Dart有任何方法可以从stream.listen()调用onDone回调,其中流来自StreamController,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/17897217/