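I'm writing a C# (.NET 4.5) application that aggregates time-based events for reporting. To make my query logic reusable for both real-time and historical data, I use Reactive Extensions (2.0) and its IScheduler infrastructure (HistoricalScheduler and friends).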

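For example, suppose we create a list of events (sorted chronologically, though some may coincide!) whose only payload is their timestamp, and we want to know how they are distributed across buffers of a fixed duration: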

const int num = 100000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();

var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

var stream = Observable.Generate<int, DateTimeOffset>(
    0,
    s => s < events.Count,
    s => s + 1,
    s => events[s],
    s => events[s],
    time);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));

Running this code produces a System.StackOverflowException with the following stack trace (the last three lines repeat all the way down):
mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes
...

OK, the problem seems to come from my use of Observable.Generate(). It depends on the list size (num) and occurs regardless of which scheduler I choose.

What am I doing wrong? Or, more generally, what is the preferred way to create an IObservable from an IEnumerable of events that provide their own timestamps?

Best answer

(Update: I realized I didn't provide an alternative; see the bottom of the answer.)

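The problem is in how Observable.Generate works: it unfolds a corecursive (think recursion turned inside out) generator from its arguments. If those arguments end up producing a very deeply nested corecursive generator, you will blow your stack.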
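To make "unfolding" concrete, here is a minimal pull-based sketch. Unfold is a hypothetical helper, not an Rx API; it takes the same four arguments as the simplest Observable.Generate overload (initial state, condition, iterate, result selector). It only shows what those arguments mean and says nothing about how Rx schedules the push-based version.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class UnfoldSketch
{
    // Hypothetical pull-based counterpart of Observable.Generate:
    // start from a seed, keep going while the condition holds,
    // and project each state to a result.
    public static IEnumerable<TResult> Unfold<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        for (var state = initialState; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }

    static void Main()
    {
        var squares = Unfold(0, s => s < 5, s => s + 1, s => s * s);
        Console.WriteLine(string.Join(", ", squares)); // prints "0, 1, 4, 9, 16"
    }
}
```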

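From this point on I'm speculating a lot (I don't have the Rx source in front of me, but see the edit below), but I'm willing to bet your definition ends up expanding into something like this: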

initial_state =>
generate_next(initial_state) =>
generate_next(generate_next(initial_state)) =>
generate_next(generate_next(generate_next(initial_state))) =>
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ...

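This goes on until your call stack gets big enough to overflow. At, say, a method signature plus your int counter, that would be something like 8-16 bytes per recursive call (more, depending on how the state machine generator is implemented), so 60,000 sounds about right (1M / 16 ~ 62,500 maximum depth).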
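The back-of-the-envelope estimate above can be reproduced as follows. The 1,000,000-byte stack and the 8 and 16 byte frame sizes are the answer's rough assumptions, not measured values, and StackDepthEstimate is a hypothetical name used only for this illustration.

```csharp
using System;

static class StackDepthEstimate
{
    // Rough estimate only: how many frames of a given size fit in the stack.
    public static long MaxDepth(long stackBytes, long bytesPerFrame)
    {
        return stackBytes / bytesPerFrame;
    }

    static void Main()
    {
        const long stackBytes = 1000000; // the "1M" assumed in the answer

        Console.WriteLine(MaxDepth(stackBytes, 16)); // prints 62500
        Console.WriteLine(MaxDepth(stackBytes, 8));  // prints 125000
    }
}
```

Real frames are usually larger than this, so the actual depth limit is likely lower than either figure.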

EDIT: I pulled up the source, and it confirms this. The "Run" method of Generate looks like this (note the nested calls to Generate):
protected override IDisposable Run(
    IObserver<TResult> observer,
    IDisposable cancel,
    Action<IDisposable> setSink)
{
    if (this._timeSelectorA != null)
    {
        Generate<TState, TResult>.α α =
                new Generate<TState, TResult>.α(
                     (Generate<TState, TResult>) this,
                     observer,
                     cancel);
        setSink(α);
        return α.Run();
    }
    if (this._timeSelectorR != null)
    {
        Generate<TState, TResult>.δ δ =
               new Generate<TState, TResult>.δ(
                   (Generate<TState, TResult>) this,
                   observer,
                   cancel);
        setSink(δ);
        return δ.Run();
    }
    Generate<TState, TResult>._ _ =
             new Generate<TState, TResult>._(
                  (Generate<TState, TResult>) this,
                  observer,
                  cancel);
    setSink(_);
    return _.Run();
}
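Judging only by the field names (an inference, not something stated in the source), the three branches seem to match the three families of Observable.Generate overloads: one with a DateTimeOffset time selector, one with a TimeSpan time selector, and one with no time selector. The sketch below calls one overload from each family on a HistoricalScheduler; GenerateOverloads and CountAll are hypothetical names for this illustration. The question's code uses the first form.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class GenerateOverloads
{
    // Runs one overload from each family and returns how many values each produced.
    public static int[] CountAll()
    {
        var start = new DateTimeOffset(2012, 11, 20, 0, 0, 0, TimeSpan.Zero);
        var time = new HistoricalScheduler(start);
        int absolute = 0, relative = 0, untimed = 0;

        // DateTimeOffset time selector: each value is due at an absolute time.
        Observable.Generate<int, int>(
                0, s => s < 3, s => s + 1, s => s,
                s => start.AddSeconds(s), time)
            .Subscribe(_ => absolute++);

        // TimeSpan time selector: each value is due after a relative delay.
        Observable.Generate<int, int>(
                0, s => s < 3, s => s + 1, s => s,
                s => TimeSpan.FromSeconds(1), time)
            .Subscribe(_ => relative++);

        // No time selector: values are produced as fast as the scheduler allows.
        Observable.Generate<int, int>(
                0, s => s < 3, s => s + 1, s => s, time)
            .Subscribe(_ => untimed++);

        // Nothing runs until virtual time is advanced.
        time.AdvanceBy(TimeSpan.FromSeconds(10));
        return new[] { absolute, relative, untimed };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", CountAll()));
    }
}
```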

EDIT: Derp, I didn't offer any alternatives. Here's one that might work:

(EDIT: fixed the Enumerable.Range call so the stream size isn't multiplied by chunkSize)
const int num = 160000;
const int dist = 10;

var events = new List<DateTimeOffset>();
var curr = DateTimeOffset.Now;
var gap = new Random();
var time = new HistoricalScheduler(curr);

for (int i = 0; i < num; i++)
{
    events.Add(curr);
    curr += TimeSpan.FromMilliseconds(gap.Next(dist));
}

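// Size too big? Fine, we'll chunk it up!
const int chunkSize = 10000;
// Round up so that a final partial chunk is still included
var numberOfChunks = (events.Count + chunkSize - 1) / chunkSize;

// Generate a whole mess of streams based on start/end indices.
// Enumerable.Range takes (start, count), so pass the chunk count itself;
// subtracting 1 here would silently drop the last chunk.
var streams =
    from chunkIndex in Enumerable.Range(0, numberOfChunks)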
    let startIdx = chunkIndex * chunkSize
    let endIdx = Math.Min(events.Count, startIdx + chunkSize)
    select Observable.Generate<int, DateTimeOffset>(
        startIdx,
        s => s < endIdx,
        s => s + 1,
        s => events[s],
        s => events[s],
        time);

// E pluribus streamum
var stream = Observable.Concat(streams);

stream.Buffer(TimeSpan.FromMilliseconds(num), time)
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count));

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
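Another option, sketched here under stated assumptions rather than taken from the original answer, is to skip Observable.Generate and queue one scheduler action per event in a plain loop. ReplaySketch, Replay and CountBuffered are hypothetical names. The sketch assumes the scheduler runs actions with equal due times in the order they were queued, and it is intended for a virtual-time scheduler such as HistoricalScheduler, where queuing every event up front is cheap.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ReplaySketch
{
    // Hypothetical helper (not part of Rx): on subscription, schedules each
    // event at its own timestamp, then schedules completion after the last one.
    public static IObservable<DateTimeOffset> Replay(
        IList<DateTimeOffset> events, IScheduler scheduler)
    {
        return Observable.Create<DateTimeOffset>(observer =>
        {
            var subscription = new CompositeDisposable();
            foreach (var e in events)
            {
                var stamp = e; // local copy keeps the closure safe on older compilers
                subscription.Add(
                    scheduler.Schedule(stamp, () => observer.OnNext(stamp)));
            }

            // Assumption: equal due times run in queue order, so completion
            // arrives after the final OnNext.
            var end = events.Count > 0 ? events[events.Count - 1] : scheduler.Now;
            subscription.Add(scheduler.Schedule(end, () => observer.OnCompleted()));
            return subscription;
        });
    }

    // Builds the same kind of event list as the question and returns how many
    // events reached the buffers.
    public static int CountBuffered(int num, int dist)
    {
        var curr = new DateTimeOffset(2012, 11, 20, 0, 0, 0, TimeSpan.Zero);
        var time = new HistoricalScheduler(curr);
        var gap = new Random(0);
        var events = new List<DateTimeOffset>();
        for (int i = 0; i < num; i++)
        {
            events.Add(curr);
            curr += TimeSpan.FromMilliseconds(gap.Next(dist));
        }

        var total = 0;
        Replay(events, time)
            .Buffer(TimeSpan.FromMilliseconds(num), time)
            .Subscribe(l => total += l.Count);

        time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist));
        return total;
    }

    static void Main()
    {
        Console.WriteLine(CountBuffered(100000, 10));
    }
}
```

Because every event is queued from a flat loop, nothing here schedules itself recursively, which is the pattern the stack trace in the question points at.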

Original question on Stack Overflow: "c# - Why does Observable.Generate() throw System.StackOverflowException?", https://stackoverflow.com/questions/13462713/
