本文介绍了如何清除ReplaySubject上的缓冲区?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何清除 ReplaySubject 上的缓冲区?

我定期需要清除缓冲区(对于我来说,这是一天结束的事件),以防止 ReplaySubject 持续增长并最终耗尽所有内存.

Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject continually growing and eventually eating all the memory.

理想情况下,我希望保持相同的 ReplaySubject ,因为客户端订阅仍然有效.

Ideally I want to keep the same ReplaySubject as the client subscriptions are still good.

推荐答案

ReplaySubject 没有提供清除缓冲区的方法,但是有一些重载以不同的方式限制其缓冲区:

ReplaySubject doesn't offer a means to clear the buffer, but there are several overloads to constrain its buffers in different ways:

  • 保留项目的最大 TimeSpan
  • 最大项目数
  • 上述内容的组合,一旦满足任一条件,该物品就会掉落.

这是一个非常有趣的问题-我决定看看使用现有主题和运算符(如这些功能非常强大).事实证明,这相当简单.

This was quite an interesting problem - I decided to see how easy it would be to implement a variation of ReplaySubject you can clear - using existing subjects and operators (as these are quite robust). Turns out it was reasonably straightforward.

我已经通过内存探查器运行它来检查它是否做对了.调用 Clear()刷新缓冲区,否则它就像常规的无界 ReplaySubject :

I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject:

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

遵守常规规则(与任何 Subject 一样),不要同时在此类上调用方法-包括 Clear().如果需要,您可以轻松添加同步锁.

Respect usual rules (as with any Subject) and don't call methods on this class concurrently - including Clear(). You could add synchronization locks trivially if needed.

它通过在主ReplaySubject内嵌套一系列ReplaySubject来工作.外部ReplaySubject( _subjects )容纳一个正好是内部ReplaySubject( _currentSubject )的缓冲区,并且在构造时填充该缓冲区.

It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.

OnXXX 方法调用 _currentSubject ReplaySubject.

The OnXXX methods call through to the _currentSubject ReplaySubject.

观察者订阅了嵌套的ReplaySubject的串联投影(保存在 _concatenatedSubjects 中).因为 _subjects 的缓冲区大小仅为1,所以新订阅者只能获取最近的 ReplaySubject 之后的事件.

Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.

每当我们需要清除缓冲区"时,现有的 _currentSubject OnCompleted ,并且将新的ReplaySubject添加到 _subjects 并变为新的 _currentSubject .

Whenever we need to "clear the buffer", the existing _currentSubject is OnCompleted and a new ReplaySubject is added to _subjects and becomes the new _currentSubject.

按照@Brandon的建议,我创建了 RollingReplaySubject 版本,该版本使用 TimeSpan 或输入流来发出缓冲区清除信号.我在这里为此创建了一个要点: https://gist.github.com/james-world/c46f09f32e2d4f338b07

Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07

这篇关于如何清除ReplaySubject上的缓冲区?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-14 20:10