我对rxjs印象深刻,并开始着手于此。但是,我下面的nodejs代码并没有按预期工作,至少对我来说是这样。

let events = new EventEmitter();
let source = Rx.Observable.fromEvent( events, 'data' );

source
    .groupBy( event => event.type )
    .flatMap( group => group.reduce( ( acc, cur ) => _.merge( acc, cur ), [] ) )
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 2, msg: 'Test 3' } );

我希望subscribe会产生一些输出

最佳答案

如其他人所建议的,可观察的永远不会完成,因此任何要求前面可观察的完成的运算符永远不会发出任何信号:
但是,groupBy()操作符会为每个组发出一个GroupedObservable的实例,以便您可以订阅它。我知道这会产生一个不同于你预期的结果,但也许你可以用它:

const Rx = require('rxjs/Rx');
const EventEmitter = require('events');

let events = new EventEmitter();
let source = Rx.Observable.fromEvent(events, 'data');

source
    .groupBy( event => event.type )
    .subscribe( ( groupedObservable ) => {
        groupedObservable.subscribe(val => {
            console.log(groupedObservable.key, val);
        });
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 5, msg: 'Test 3' } );

每个组都有自己的GroupedObservable
这将打印到控制台:
1 { type: 1, msg: 'Test 1' }
1 { type: 1, msg: 'Test 2' }
5 { type: 5, msg: 'Test 3' }

09-20 05:48