问题描述
我的服务器端给我发邮件的批次。消息在分批和频率没有是任意的。有时我得到的消息间隔为1分钟,有时没有消息了一个小时。有时1消息,有时10.我的当前实现使用 Observable.Buffer(TimeSpan.FromSeconds(5))
组并发送消息给用户。
My server side sends me batches of messages. The no of messages in a batch and frequency is arbitrary. At times I get messages at 1 minute intervals and at times not messages for an hour. At times 1 message and at times 10. My current implementation uses Observable.Buffer(TimeSpan.FromSeconds(5))
to group and send the messages to subscriber.
而不必检查每5秒,有没有办法来配置观测说把你的缓冲消息到用户,如果有斧秒延时两个消息的。
Instead of having to check every 5 seconds, is there a way to configure the Observable to say send your buffered messages to subscriber if there's a x seconds delay between two message.
我在哪里得到的是避免不必要的计时器滴答每5秒。接受其他的建议,以优化批量处理。
Where I am getting at is to avoid a unnecessary timer ticking every 5 seconds. Open to other suggestions to optimize the batch processing.
推荐答案
decPL建议一个新的缓冲区。它产生的流,其第一 OnNext()
或 OnCompleted()
信号刷新当前缓冲区。 decPLs code是这样的:
Using a bufferClosingSelector factory method
decPL suggested using the overload of Buffer
that accepts a bufferClosingSelector
- a factory function that is called at the opening of a new buffer. It produces a stream whose first OnNext()
or OnCompleted()
signals flushing the current buffer. decPLs code looked like this:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
这使得建立一个解决方案长足的进步,但它有几个问题:
This makes considerable progress towards a solution, but it has a couple of problems:
- 服务器不会在这里的消息油门时间内持续公布活动期间发送消息。这可能会导致大的,很少公布名单。
- 有多种订阅源;如果是冷的,这可能会产生意想不到的副作用。在
bufferClosingSelector
工厂被称为的每个的缓冲结束,所以如果源是冷的,将被从最初的事件节流,而不是最新的
- The server will not send messages during periods of activity where messages are published consistently within the throttle duration. This could result in large, infrequently published lists.
- There are multiple subscriptions to the source; if it is cold this may have unintended side effects. The
bufferClosingSelector
factory is called after each buffer closing, so if the source is cold it would be throttling from the initial events, rather than the most recent.
我们需要使用额外的机制来限制缓冲区的长度和prevent不确定的限制。 缓存
的过载,使您可以指定最大长度,但不幸的是你不能用一个结束选择结合起来。
We need to use an additional mechanism to limit the buffer length and prevent indefinite throttling. Buffer
has an overload that allows you to specify a maximum length, but unfortunately you can't combine this with a closing selector.
让我们把所需要的缓冲区长度的限制的的 N 的。回想一下第一个 OnNext
闭幕选择就足以关闭缓冲,所以我们需要做的就是合并
的油门与发送 OnNext
之后的的 N 的事件从源头计数流。我们可以使用。取(N).LastAsync()
来做到这一点;拿第一的的 N 的事件,但忽略所有,但最后这一点。这是在接收一个非常有用的模式。
Let's call the desired buffer length limit n. Recall the first OnNext
of the closing selector is enough to close the buffer, so all we need to do is Merge
the throttle with a counting stream that sends OnNext
after n events from the source. We can use .Take(n).LastAsync()
to do this; take the first n events but ignore all but the last of this. This is a very useful pattern in Rx.
为了解决的 bufferClosingSelector
工厂resubscribing到源的问题,我们需要使用的常见模式.Publish()。引用计数()
在源上给我们只会发送最近的事件给用户流。这也是一个非常有用的模式记。
In order to address the issue of the bufferClosingSelector
factory resubscribing to the source, we need to use the common pattern of .Publish().RefCount()
on the source to give us a stream that will only send the most recent events to subscribers. This is also a very useful pattern to remember.
下面是返工code,其中油门持续时间合并计数器:
Here is the reworked code, where the throttle duration is merged with a counter:
var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;
// single subscription to source
var sourcePub = source.Publish().RefCount();
var output = sourcePub.Buffer(
() => sourcePub.Throttle(throttleDuration)
.Merge(sourcePub.Take(bufferSize).LastAsync()));
生产就绪code和;测试
下面是一个投入生产的实施与测试(使用的NuGet包RX-测试和放大器; NUnit的)。注意调度程序的参数,以支持测试。
Production Ready Code & Tests
Here is a production ready implementation with tests (use nuget packages rx-testing & nunit). Note the parameterization of the scheduler to support testing.
public static partial class ObservableExtensions
{
public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
this IObservable<TSource> source,
TimeSpan maxInterval,
int maxBufferSize,
IScheduler scheduler)
{
if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
if (maxBufferSize <= 0)
throw new ArgumentOutOfRangeException(
"maxBufferSize", "maxBufferSize must be positive");
var publishedSource = source.Publish().RefCount();
return publishedSource.Buffer(
() => publishedSource
.Throttle(maxInterval, scheduler)
.Merge(publishedSource.Take(maxBufferSize).LastAsync()));
}
}
public class BufferNearEventsTests : ReactiveTest
{
[Test]
public void CloseEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int> expectedBuffer = new [] {1, 2, 3};
var expectedTime = maxInterval.Ticks + 300;
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(1000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
}
[Test]
public void FarEventsAreUnbuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 1000;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(1000, 1),
OnNext(2000, 2),
OnNext(3000, 3));
IList<int>[] expectedBuffers =
{
new[] {1},
new[] {2},
new[] {3}
};
var expectedTimes = new[]
{
maxInterval.Ticks + 1000,
maxInterval.Ticks + 2000,
maxInterval.Ticks + 3000
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
}
[Test]
public void UpToMaxEventsAreBuffered()
{
TimeSpan maxInterval = TimeSpan.FromTicks(200);
const int maxBufferSize = 2;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
IList<int>[] expectedBuffers =
{
new[] {1,2},
new[] {3}
};
var expectedTimes = new[]
{
200, /* Buffer cap reached */
maxInterval.Ticks + 300
};
var results = scheduler.CreateObserver<IList<int>>();
source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
.Subscribe(results);
scheduler.AdvanceTo(10000);
results.Messages.AssertEqual(
OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
}
private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
{
CollectionAssert.AreEquivalent(expected, actual);
return true;
}
}
这篇关于使用的IObservable批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!