我对rxjs印象深刻,并开始着手于此。但是,我下面的nodejs代码并没有按预期工作,至少对我来说是这样。
let events = new EventEmitter();
let source = Rx.Observable.fromEvent( events, 'data' );
source
.groupBy( event => event.type )
.flatMap( group => group.reduce( ( acc, cur ) => _.merge( acc, cur ), [] ) )
.subscribe( ( data ) => {
console.log( data );
} );
events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 2, msg: 'Test 3' } );
我希望
subscribe
会产生一些输出 最佳答案
如其他人所建议的,可观察的永远不会完成,因此任何要求前面可观察的完成的运算符永远不会发出任何信号:
但是,groupBy()
操作符会为每个组发出一个GroupedObservable
的实例,以便您可以订阅它。我知道这会产生一个不同于你预期的结果,但也许你可以用它:
const Rx = require('rxjs/Rx');
const EventEmitter = require('events');
let events = new EventEmitter();
let source = Rx.Observable.fromEvent(events, 'data');
source
.groupBy( event => event.type )
.subscribe( ( groupedObservable ) => {
groupedObservable.subscribe(val => {
console.log(groupedObservable.key, val);
});
} );
events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 5, msg: 'Test 3' } );
每个组都有自己的
GroupedObservable
。这将打印到控制台:
1 { type: 1, msg: 'Test 1' }
1 { type: 1, msg: 'Test 2' }
5 { type: 5, msg: 'Test 3' }