本文介绍了C#-并发Foreach类线程系统的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

IObservable<Match> IObservableArray = new Regex("(.*):(.*)").Matches(file).OfType<Match>().ToList().ToObservable();
var query = IObservableArray.SelectMany(s => Observable.Start(() => {
    //do stuff
}));

上面的解释工作代码:上面的代码使用Observable with Reactive来执行并发多线程系统,同时将s保留为匹配项.

Working Code Above's Explanation: The code above uses Observable with Reactive to do a Concurrent Multi-Threading system while retaining s as a Match.

我的问题是,由于 IObservableArray 是一个很大的Matches数组,因此似乎甚至需要开始将//do 加载到内存中之前,所有内容都需要加载到内存中.很多内存导致它执行OutOfMemory异常.

My issue is that it seems to need to load everything into memory before even starting doing //do stuff since IObservableArray is a big array of Matches - this takes up a lot of the memory causing it to do a OutOfMemory Exception.

我已经研究了一个多月,我只能找到.Buffer(),如果将它放在.SelectMany()之前,然后foreach Match到s上,则无法将1000个Matches加载到内存中导致整体内存更好的时间.

I have been researching for more than a month and all I can find is .Buffer() which if I put it before the .SelectMany() and then foreach Match over the s, im able to load 1000 Matches into memory at a time causing the memory overall to be much better.

但是,由于我不得不诉诸于使用一次foreach一次遍历缓冲区中的所有1000,因此它不是并发的-意味着我基本上先检查另一个.

But, since I have to resort to using a foreach to go through all 1000 in the buffer at a time, it isnt concurrent - meaning im basically checking 1 after the other.

下面有没有办法做类似的代码,但是有并发/​​多线程的吗?(至少有150个并发运行,但现在不使用1000个将其全部加载到内存中.)

Is there a way to do similar code below, yet have it Concurrent/Multi-Threaded? (Have at least 150 running concurrently, but don't load all of it to memory, using 1000 at the moment.)

是的,我尝试使用thread.start等,使用它们使它更早地触发完成的代码,因为从技术上讲,它确实完成了,因为它完成了被告知将它们全部放入新线程的操作

IObservable<Match> IObservableArray = new Regex("(.*):(.*)").Matches(file).OfType<Match>().ToList().ToObservable();
var query = IObservableArray.Buffer(1000).SelectMany(s => Observable.Start(() => {
    //do stuff
}));
query.ObserveOn(ActiveForm).Subscribe(x =>
{
    //do finish stuff
});

推荐答案

对于这种工作, IEnumerable< T> IObservable< T> 更好.>.可枚举是您可以按需释放的东西,并在准备处理它们时取其值.相反,无论您是否能够处理负载,observable都会将其值强行推向您.

For this kind of work an IEnumerable<T> is a better match than an IObservable<T>. An enumerable is something that you can unwind on demand, and take its values when you are ready to process them. On the contrary an observable is something that forcefully pushes its values onto you, whether you are able to handle the load or not.

有多种方式可以并行处理具有特定并行度的 IEnumerable< T> .在提出任何建议之前,首先要问的是与每个 Match 相关的工作是同步的还是异步的.对于同步工作,最常用的工具是 Parallel 类, PLINQ TPL数据流库.下面是一个PLINQ示例:

There are numerous ways to process an IEnumerable<T> in parallel, with a specific degree of parallelism. Before suggesting anything, the first question to ask is whether the stuff you have to do with each Match is synchronous or asynchronous. For synchronous work the most commonly used tools are the Parallel class, the PLINQ, and the TPL Dataflow library. Below is a PLINQ example:

IEnumerable<Match> matches = RegexFindAllMatches(file, "(.*):(.*)");
Partitioner
    .Create(matches, EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(match =>
    {
        // Do stuff
    });
/// <summary>
/// Provides an enumerable whose elements are the successful matches found by
/// iteratively applying a regular expression pattern to the input string.
/// </summary>
public static IEnumerable<Match> RegexFindAllMatches(
    string input, string pattern, RegexOptions options = RegexOptions.None,
    TimeSpan matchTimeout = default)
{
    if (matchTimeout == default) matchTimeout = Regex.InfiniteMatchTimeout;
    var match = Regex.Match(input, pattern, options, matchTimeout);
    while (match.Success)
    {
        yield return match;
        match = match.NextMatch();
    }
}

以上实现避免了使用 Regex.Matches 方法以及随后的 MatchCollection 类,因为尽管该类在枚举期间懒惰地评估了下一个 Match ,但随后将找到的每个 Match 存储在内部 ArrayList 中(源代码).这可能会导致大量内存分配,与匹配总数成比例.

The above implementation avoids the use of the Regex.Matches method, and subsequently the MatchCollection class, because although this class evaluates lazily the next Match during the enumeration, it then stores each found Match in an internal ArrayList (source code). This could cause massive memory allocation, proportional to the total numbers of matches.

对于异步工作, Parallel 类和PLINQ并不是很好的选择(除非您愿意等待.NET 6 Parallel.ForEachAsync ),但是您仍然可以使用TPL Dataflow库.您还可以找到许多自定义选项此处此处.搜索 C#ForEachAsync 应该会显示更多选项.

For asynchronous work the Parallel class and the PLINQ are not good options (unless you are willing to wait for the .NET 6 Parallel.ForEachAsync), but you can still use the TPL Dataflow library. You can also find plenty of custom options here or here. Searching for C# ForEachAsync should reveal even more options.

这篇关于C#-并发Foreach类线程系统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 18:44