问题描述
如何清除 ReplaySubject
上的缓冲区?
我定期需要清除缓冲区(对于我来说,这是一天结束的事件),以防止 ReplaySubject
持续增长并最终耗尽所有内存.
Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject
continually growing and eventually eating all the memory.
理想情况下,我希望保持相同的 ReplaySubject
,因为客户端订阅仍然有效.
Ideally I want to keep the same ReplaySubject
as the client subscriptions are still good.
推荐答案
ReplaySubject
没有提供清除缓冲区的方法,但是有一些重载以不同的方式限制其缓冲区:
ReplaySubject
doesn't offer a means to clear the buffer, but there are several overloads to constrain its buffers in different ways:
- 保留项目的最大
TimeSpan
- 最大项目数
- 上述内容的组合,一旦满足任一条件,该物品就会掉落.
这是一个非常有趣的问题-我决定看看使用现有主题和运算符(如这些功能非常强大).事实证明,这相当简单.
This was quite an interesting problem - I decided to see how easy it would be to implement a variation of ReplaySubject
you can clear - using existing subjects and operators (as these are quite robust). Turns out it was reasonably straightforward.
我已经通过内存探查器运行它来检查它是否做对了.调用 Clear()
刷新缓冲区,否则它就像常规的无界 ReplaySubject
:
I've run this through a memory profiler to check it does the right thing. Call Clear()
to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject
:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
遵守常规规则(与任何 Subject
一样),不要同时在此类上调用方法-包括 Clear()
.如果需要,您可以轻松添加同步锁.
Respect usual rules (as with any Subject
) and don't call methods on this class concurrently - including Clear()
. You could add synchronization locks trivially if needed.
它通过在主ReplaySubject内嵌套一系列ReplaySubject来工作.外部ReplaySubject( _subjects
)容纳一个正好是内部ReplaySubject( _currentSubject
)的缓冲区,并且在构造时填充该缓冲区.
It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects
) holds a buffer of exactly one inner ReplaySubject (_currentSubject
), and it is populated on construction.
OnXXX
方法调用 _currentSubject
ReplaySubject.
The OnXXX
methods call through to the _currentSubject
ReplaySubject.
观察者订阅了嵌套的ReplaySubject的串联投影(保存在 _concatenatedSubjects
中).因为 _subjects
的缓冲区大小仅为1,所以新订阅者只能获取最近的 ReplaySubject
之后的事件.
Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects
). Because the buffer size of _subjects
is just 1, new subscribers acquire the events of only the most recent ReplaySubject
onwards.
每当我们需要清除缓冲区"时,现有的 _currentSubject
为 OnCompleted
,并且将新的ReplaySubject添加到 _subjects
并变为新的 _currentSubject
.
Whenever we need to "clear the buffer", the existing _currentSubject
is OnCompleted
and a new ReplaySubject is added to _subjects
and becomes the new _currentSubject
.
按照@Brandon的建议,我创建了 RollingReplaySubject
版本,该版本使用 TimeSpan
或输入流来发出缓冲区清除信号.我在这里为此创建了一个要点: https://gist.github.com/james-world/c46f09f32e2d4f338b07
Following @Brandon's suggestion, I created a version of RollingReplaySubject
that uses either a TimeSpan
or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07
这篇关于如何清除ReplaySubject上的缓冲区?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!