Problem description
I have a method that calls a WCF service multiple times in parallel. To avoid overloading the target system, I want to use PLINQ's ability to limit the number of parallel executions. How could I rewrite my method to do this efficiently?
Here's my current implementation:
private async Task RunFullImport(IProgress<float> progress) {
    var dataEntryCache = new ConcurrentHashSet<int>();
    using var client = new ESBClient(); // WCF

    // Progress counters helpers
    float totalSteps = 1f / companyGroup.Count();
    int currentStep = 0;

    // Iterate over all resources
    await Task.WhenAll(companyGroup.Select(async res => {
        getWorkOrderForResourceDataSetResponse worResp = await client.getWorkOrderForResourceDataSetAsync(
            new getWorkOrderForResourceDataSetRequest(
                "?",
                res.Company,
                res.ResourceNumber,
                res.ResourceType,
                res.CLSName,
                fromDate,
                toDate,
                "D"
            )
        );

        // Iterate over all work orders and add them to the list
        foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
            dataEntryCache.Add(workOrder.DataEntryNumber.Value);
        }

        // Update progress
        progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
    }));

    // Do some more stuff with the result
}
I have already tried a few approaches using PLINQ instead, but I haven't come up with a proper solution yet. This is my current state:
var p = companyGroup.Select(res => client.getWorkOrderForResourceDataSetAsync(
    new getWorkOrderForResourceDataSetRequest(
        "?",
        res.Company,
        res.ResourceNumber,
        res.ResourceType,
        res.CLSName,
        fromDate,
        toDate,
        "D"
    )
).ContinueWith(worResp => {
    // Iterate over all work orders and add them to the list
    foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.Result.dsyWorkOrder02) {
        dataEntryCache.Add(workOrder.DataEntryNumber.Value);
    }

    // Update progress
    progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}));

await Task.WhenAll(p.AsParallel().ToArray());
This, quite obviously, doesn't work properly. Do you have any suggestions for making it work properly and efficiently, and for limiting the calls to the server to a maximum of 8 in parallel?
PLINQ only works with synchronous code. It has some nice built-in knobs for controlling the number of concurrent parallel operations.
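For reference, the main PLINQ knob for this is `WithDegreeOfParallelism`. The sketch below is only an illustration for blocking code: `CallServiceSync` is a hypothetical stand-in for a synchronous call (it is not part of the question's WCF client), and the counters exist solely to observe how many calls overlap.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for a blocking (synchronous) service call.
int CallServiceSync(int id)
{
    Thread.Sleep(50);
    return id * 2;
}

int running = 0;      // calls currently executing
int maxObserved = 0;  // highest number of simultaneous calls seen
object gate = new object();

int[] results = Enumerable.Range(1, 40)
    .AsParallel()
    .WithDegreeOfParallelism(8)   // PLINQ runs at most 8 workers concurrently
    .Select(id =>
    {
        int now = Interlocked.Increment(ref running);
        lock (gate) { if (now > maxObserved) maxObserved = now; }
        try { return CallServiceSync(id); }
        finally { Interlocked.Decrement(ref running); }
    })
    .ToArray();

Console.WriteLine($"results: {results.Length}, max concurrent calls: {maxObserved}");
```

This limit applies to the synchronous delegate only. If the delegate merely starts a task and returns, as in the question's attempt, PLINQ throttles the starting of the tasks, not the calls themselves.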
To control the number of concurrent asynchronous operations, use SemaphoreSlim:
private async Task RunFullImport(IProgress<float> progress) {
    var dataEntryCache = new ConcurrentHashSet<int>();
    using var client = new ESBClient(); // WCF
    var limiter = new SemaphoreSlim(10); // or however many you want to limit to.

    // Progress counters helpers
    float totalSteps = 1f / companyGroup.Count();
    int currentStep = 0;

    // Iterate over all resources
    await Task.WhenAll(companyGroup.Select(async res => {
        await limiter.WaitAsync();
        try {
            getWorkOrderForResourceDataSetResponse worResp = ...

            // Iterate over all work orders and add them to the list
            foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
                dataEntryCache.Add(workOrder.DataEntryNumber.Value);
            }

            // Update progress
            progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
        }
        finally {
            limiter.Release();
        }
    }));

    // Do some more stuff with the result
}
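To check that the semaphore really caps the number of calls in flight, the same pattern can be run in isolation. This is a minimal sketch under stated assumptions: `CallServiceAsync` is a hypothetical placeholder for the WCF call, the limit is set to 8 to match the question, and the `running` / `maxObserved` counters are there only for measurement.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the asynchronous WCF call.
async Task<int> CallServiceAsync(int id)
{
    await Task.Delay(50);
    return id * 2;
}

var limiter = new SemaphoreSlim(8); // at most 8 calls in flight
int running = 0;      // calls currently in flight
int maxObserved = 0;  // highest number of simultaneous calls seen
object gate = new object();

int[] results = await Task.WhenAll(Enumerable.Range(1, 40).Select(async id =>
{
    await limiter.WaitAsync();       // wait for a free slot
    try
    {
        int now = Interlocked.Increment(ref running);
        lock (gate) { if (now > maxObserved) maxObserved = now; }
        try { return await CallServiceAsync(id); }
        finally { Interlocked.Decrement(ref running); }
    }
    finally
    {
        limiter.Release();           // free the slot even if the call throws
    }
}));

Console.WriteLine($"results: {results.Length}, max concurrent calls: {maxObserved}");
```

All 40 tasks are created immediately, but each one waits on the semaphore before calling the service, so `maxObserved` should never exceed the semaphore's initial count.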
For more information, see recipe 12.5 in my book, which covers several different solutions for throttling.
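One further option, assuming .NET 6 or later is available (and not necessarily one of the solutions the recipe covers), is `Parallel.ForEachAsync` with `ParallelOptions.MaxDegreeOfParallelism`. The sketch below again uses a hypothetical `CallServiceAsync` in place of the WCF call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the asynchronous WCF call.
async Task<int> CallServiceAsync(int id, CancellationToken ct)
{
    await Task.Delay(50, ct);
    return id * 2;
}

var results = new ConcurrentBag<int>();   // thread-safe result collection
var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };

// Runs the async body for every item, with at most 8 bodies active at a time.
await Parallel.ForEachAsync(Enumerable.Range(1, 40), options, async (id, ct) =>
{
    results.Add(await CallServiceAsync(id, ct));
});

Console.WriteLine($"collected {results.Count} results");
```

Compared with the semaphore approach, this starts work items lazily instead of creating one task per item up front, which may matter for very large inputs.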