


I need the functionality of a hysteresis filter in Rx. It should emit a value from the source stream only when the previously emitted value and the current input value differ by a certain amount. As a generic extension method, it could have the following signature:

public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)
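
To make the intended behaviour concrete, here is a minimal sketch that applies the same rule to a plain sequence instead of an observable. HysteresisDemo and Apply are hypothetical names used only for illustration, and the sketch assumes the first value always passes because there is nothing to compare it against.

```csharp
using System;
using System.Collections.Generic;

public static class HysteresisDemo
{
    // Hypothetical helper: keeps a value only if it is the first one, or if
    // filter(lastEmitted, value) returns true.
    public static List<T> Apply<T>(IEnumerable<T> input, Func<T, T, bool> filter)
    {
        var output = new List<T>();
        T lastEmitted = default;
        bool emitted = false;

        foreach (var value in input)
        {
            if (!emitted || filter(lastEmitted, value))
            {
                output.Add(value);
                lastEmitted = value; // the comparison baseline moves only on emission
                emitted = true;
            }
        }

        return output;
    }

    public static void Main()
    {
        var input = new[] { 0.0, 0.4, 1.5, 1.8, 3.0, 2.5 };
        var result = Apply(input, (p, c) => Math.Abs(p - c) > 1d);
        Console.WriteLine(string.Join(" ", result));
    }
}
```

With a threshold of 1, only the first, third and fifth inputs pass: 0.4 and 1.8 are within 1 of the value emitted before them, and 2.5 is within 1 of 3.0.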

I was not able to figure out how to implement this with existing operators. I was looking for something like lift from RxJava, or any other way to create my own operator. I have seen this checklist, but I haven't found any examples on the web.

The following two approaches (which are essentially the same) work, but they feel like workarounds to me. Is there a more idiomatic Rx way to do this, for example without wrapping a subject or actually implementing an operator?

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

async Task Main()
{
    // Stop all subscriptions after 5 seconds.
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    // Random walk sampled every 10 ms.
    var rnd = new Random();
    var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Scan(0d, (a, _) => a + rnd.NextDouble() - 0.5);

    s.Subscribe(Console.WriteLine, cts.Token);

    s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
    s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);

    await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_ => _, TaskContinuationOptions.OnlyOnCanceled);
}

public static class ReactiveOperators
{
    // Variant 1: a helper class that observes the source and forwards to a subject.
    public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
    {
        return new InternalHysteresisFilter<T>(source, filter).AsObservable;
    }

    // Variant 2: the same logic inline, with the state captured in closures.
    public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
    {
        var subject = new Subject<T>();
        T lastEmitted = default;
        bool emitted = false;

        source.Subscribe(
            value =>
            {
                if (!emitted || filter(lastEmitted, value))
                {
                    subject.OnNext(value);
                    lastEmitted = value;
                    emitted = true;
                }
            }
            , ex => subject.OnError(ex)
            , () => subject.OnCompleted()
        );

        return subject;
    }

    private class InternalHysteresisFilter<T> : IObserver<T>
    {
        Func<T, T, bool> filter;
        T lastEmitted;
        bool emitted;

        private readonly Subject<T> subject = new Subject<T>();

        public IObservable<T> AsObservable => subject;

        public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
        {
            this.filter = filter;
            source.Subscribe(this);
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }

        public void OnNext(T value)
        {
            // Forward the value only if it is the first one or the filter accepts it.
            if (!emitted || filter(lastEmitted, value))
            {
                subject.OnNext(value);
                lastEmitted = value;
                emitted = true;
            }
        }

        public void OnError(Exception error)
        {
            subject.OnError(error);
        }

        public void OnCompleted()
        {
            subject.OnCompleted();
        }
    }
}

Sidenote: There will be several thousand such filters applied to as many streams. I need throughput over latency, so I am looking for the solution with the least overhead in both CPU and memory, even if others look fancier.


Most examples I've seen in the book Introduction to Rx use the Observable.Create method to create new operators.

public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
    Func<T, T, bool> predicate)
{
    return Observable.Create<T>(observer =>
    {
        // State lives inside the Create delegate, so each subscriber gets its own copy.
        T lastEmitted = default;
        bool emitted = false;
        return source.Subscribe(value =>
        {
            if (!emitted || predicate(lastEmitted, value))
            {
                observer.OnNext(value);
                lastEmitted = value;
                emitted = true;
            }
        }, observer.OnError, observer.OnCompleted);
    });
}
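
As for whether this can be done with existing operators only, one possibility is to carry the last emitted value through Scan and then pick out the accepted items with Where and Select. This is a sketch rather than a recommendation: HysteresisFilterScan and the tuple element names are hypothetical, and since it chains three operators it probably has more per-item overhead than the Observable.Create version, which matters given the throughput requirement in the question.

```csharp
using System;
using System.Reactive.Linq;

public static class ScanBasedOperators
{
    // Hypothetical name; the same hysteresis rule built from Scan, Where and Select.
    public static IObservable<T> HysteresisFilterScan<T>(this IObservable<T> source,
        Func<T, T, bool> predicate)
    {
        return source
            // State: last emitted value, whether anything was emitted yet,
            // and whether the current item should be passed on.
            .Scan(
                (Last: default(T), HasLast: false, Emit: false),
                (state, value) => !state.HasLast || predicate(state.Last, value)
                    ? (value, true, true)        // accept: the value becomes the new baseline
                    : (state.Last, true, false)) // reject: keep the old baseline
            .Where(state => state.Emit)
            .Select(state => state.Last);
    }
}
```

Scan keeps its accumulator per subscription, so no Subject or shared mutable field is needed here either.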

