Pausing an RxJS stream based on a value in the stream

Question

I have a simple component with a single button that starts and pauses a stream of numbers generated by an RxJS timer.

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}

This works perfectly, but I need to add one more feature!

I need to pause the stream automatically when a value in the stream satisfies a condition.

For example, pause the stream if the latest value is a multiple of 5.

Do you have any ideas on how to do this?

Here is a runnable example on StackBlitz: https://stackblitz.com/edit/angular-6hjznn

Recommended answer

You can either (1) extend your current bufferToggle / windowToggle approach or (2) use a custom buffer implementation.

1. Extending the bufferToggle / windowToggle approach

You can add an array to the operator chain after bufferToggle.

  1. When bufferToggle emits, append those values to the array.
  2. Take values from the array up to and including the first element that matches a halt condition.
  3. Emit those values and pause your stream.
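
The three steps above can be sketched without RxJS as a plain function over an array. This is only an illustration of the buffer handling, not the answer's operator: `drainUntilHalt`, `isMultipleOfFive` and `pending` are hypothetical names introduced here, and the multiple-of-5 condition is taken from the question.

```typescript
// Hypothetical helper (not part of the original answer): removes and returns
// the leading values of `buffer`, up to and including the first value that
// matches `haltCondition`. Values after the halt value stay in `buffer`.
function drainUntilHalt<T>(
  buffer: T[],
  haltCondition: (value: T) => boolean
): { toEmit: T[]; halted: boolean } {
  const haltIndex = buffer.findIndex(haltCondition);
  const count = haltIndex === -1 ? buffer.length : haltIndex + 1;
  const toEmit = buffer.splice(0, count); // mutates `buffer` in place
  return { toEmit, halted: haltIndex !== -1 };
}

// Assumed halt condition, following the example in the question.
const isMultipleOfFive = (n: number): boolean => n > 0 && n % 5 === 0;

const pending: number[] = [];

// Step 1: values released by bufferToggle are appended to the array.
pending.push(3, 4, 5, 6, 7);

// Steps 2 and 3: take values up to the halt value; the caller would then
// emit `toEmit` and pause the stream if `halted` is true.
const first = drainUntilHalt(pending, isMultipleOfFive);

// On the next resume, the leftover values are drained the same way.
const second = drainUntilHalt(pending, isMultipleOfFive);
```

Because `splice` removes the emitted values, anything that arrived after the halt value stays in the array and is considered again the next time the stream resumes.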

pausable (Demo)

The pausable operator emits the value that matches the halt condition and then pauses the stream immediately.

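import { defer, merge, from, of, EMPTY, Observable } from 'rxjs';
import { bufferToggle, windowToggle, tap, map, mergeMap } from 'rxjs/operators';

export function pausable<T, O>(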
  on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values
  off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
  haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
  pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
  spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    let buffer: T[] = [];
    return merge(
      source.pipe(
        bufferToggle(off$, () => on$),
        tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
        map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
        tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
        map(haltIndex => buffer.splice(0, haltIndex === -1 ? buffer.length : haltIndex + 1)), // take all values from your custom buffer up to and including the first one that meets the haltCondition
        mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
      ),
      source.pipe(
        windowToggle(on$, () => off$),
        mergeMap(x => x),
        tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
      ),
    );
  });
}

You can adjust this operator to your specific needs, e.g. use fewer input parameters and incorporate share into it; see this version with fewer parameters.

Usage

active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));

interval(500).pipe(
  share(),
  pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0


2. A fully custom buffer

You can go with a fully custom approach that uses only one input observable, similar to Brandon's approach.

bufferIf buffers incoming values while the latest value emitted by the given condition is true. While the condition is false, it emits all values from the buffer and passes new values straight through.
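
The behaviour described above can be modelled in plain TypeScript, without RxJS, to make the buffer and flag handling easier to follow. This is a simplified sketch under stated assumptions: `PauseGate`, `seen` and `gate` are hypothetical names, and the model leaves out stream completion and the setTimeout deferral that bufferIf uses.

```typescript
// Hypothetical plain-TypeScript model of the gate logic (no RxJS involved).
class PauseGate<T> {
  private readonly buffer: T[] = [];
  private paused = false;
  private readonly emit: (value: T) => void;

  constructor(emit: (value: T) => void) {
    this.emit = emit;
  }

  // Models a value arriving from the source.
  push(value: T): void {
    this.buffer.push(value);
    this.flush();
  }

  // Models the condition emitting true (pause) or false (resume).
  setPaused(paused: boolean): void {
    this.paused = paused;
    this.flush();
  }

  private flush(): void {
    // `paused` is re-checked before every value, so a consumer that pauses
    // the gate from inside `emit` stops the drain right after that value.
    while (this.buffer.length > 0 && !this.paused) {
      this.emit(this.buffer.shift() as T);
    }
  }
}

const seen: number[] = [];
const gate: PauseGate<number> = new PauseGate<number>(value => {
  seen.push(value);
  // Same pause rule as pauseOn in the usage examples: positive multiples of 10.
  if (value > 0 && value % 10 === 0) gate.setPaused(true);
});

for (const v of [8, 9, 10, 11, 12]) gate.push(v);
const seenWhilePaused = [...seen]; // 10 paused the gate; 11 and 12 are buffered

gate.setPaused(false); // resuming drains the buffered values in order
```

The key design point is that the paused flag is checked before each value is released, which is what lets a downstream consumer pause the stream on the very value that triggered the condition.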

import { defer, merge, Observable } from 'rxjs';
import { finalize, map, switchMap, takeWhile, tap } from 'rxjs/operators';

export function bufferIf<T>(condition: Observable<boolean>) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    const buffer: T[] = [];
    let paused = false;
    let sourceTerminated = false;
    return merge( // add a custom streamId to values from the source and the condition so that they can be differentiated later on
      source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
      condition.pipe(map(v => [v, 1]))
    ).pipe( // add values from the source to the buffer or set the paused variable
      tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean),
      switchMap(_ => new Observable<T>(s => {
        setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandon's answer & comments)
          while (buffer.length > 0 && !paused) s.next(buffer.shift() as T);
        }, 0);
      })), // complete the stream when the source terminated and the buffer is empty
      takeWhile(_ => !sourceTerminated || buffer.length > 0, true)
    );
  });
}

Usage

pause$ = new BehaviorSubject<boolean>(false);

interval(500).pipe(
  bufferIf(this.pause$),
  tap(value => this.pauseOn(value) ? this.pause$.next(true) : null)
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0
