在度过了令人沮丧和无聊的一天之后,我在这里发布以寻求帮助。
我正在使用第三方库,该库以未知方式启动网络连接(我确实知道这是非托管库的托管包装器)。通过调用事件StatusChanged(status)
,您可以了解连接状态。
因为显然调用网络是昂贵的,并且我的Service
可能不需要它,所以我注入了AsyncLazy<Connection>
,然后在必要时调用它。服务由ParallelForEachAsync
访问,这是我基于this post并发处理Tasks
的扩展。
如果顺序访问,一切都很好。任何并发,甚至两个并行任务都将在90%的时间内导致死锁。我知道这绝对与第三方库如何与我的代码交互有关,因为a)我无法使用相同的结构来重现效果,但不调用它,并且b)事件StatusChanged(Connecting)
正常接收,此时我假设网络操作已启动,并且我从未收到StatusChanged(Connected)
的回调。
这是代码结构的尽可能真实的再现,不幸的是不会重现死锁。
关于如何解决这个问题的任何想法?
class Program
{
static void Main(string[] args)
{
AsyncContext.Run(() => MainAsync(args));
}
static async Task MainAsync(string[] args)
{
var lazy = new AsyncLazy<Connection>(() => ConnectionFactory.Create());
var service = new Service(lazy);
await Enumerable.Range(0, 100)
.ParallelForEachAsync(10, async i =>
{
await service.DoWork();
Console.WriteLine("did some work");
}, CancellationToken.None);
}
}
class ConnectionFactory
{
public static Task<Connection> Create()
{
var tcs = new TaskCompletionSource<Connection>();
var session = new Session();
session.Connected += (sender, args) =>
{
Console.WriteLine("connected");
tcs.SetResult(new Connection());
};
session.Connect();
return tcs.Task;
}
}
class Connection
{
public async Task DoSomethinElse()
{
await Task.Delay(1000);
}
}
class Session
{
public event EventHandler Connected;
public void Connect()
{
Console.WriteLine("Simulate network operation with unknown scheduling");
Task.Delay(100).Wait();
Connected(this, EventArgs.Empty);
}
}
class Service
{
private static Random r = new Random();
private readonly AsyncLazy<Connection> lazy;
public Service(AsyncLazy<Connection> lazy)
{
this.lazy = lazy;
}
public async Task DoWork()
{
Console.WriteLine("Trying to do some work, will connect");
await Task.Delay(r.Next(0, 100));
var connection = await lazy;
await connection.DoSomethinElse();
}
}
public static class AsyncExtensions
{
public static async Task<AsyncParallelLoopResult> ParallelForEachAsync<T>(
this IEnumerable<T> source,
int degreeOfParallelism,
Func<T, Task> body,
CancellationToken cancellationToken)
{
var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
bool wasBroken = false;
var tasks =
from partition in partitions
select Task.Run(async () =>
{
using (partition)
{
while (partition.MoveNext())
{
if (cancellationToken.IsCancellationRequested)
{
Volatile.Write(ref wasBroken, true);
break;
}
await body(partition.Current);
}
}
});
await Task.WhenAll(tasks)
.ConfigureAwait(false);
return new AsyncParallelLoopResult(Volatile.Read(ref wasBroken));
}
}
public class AsyncParallelLoopResult
{
public bool IsCompleted { get; private set; }
internal AsyncParallelLoopResult(bool isCompleted)
{
IsCompleted = isCompleted;
}
}
编辑
我想我知道为什么会发生,但不确定如何解决。当上下文在等待
DoWork
时,DoWork
在等待惰性连接。这个丑陋的hack似乎可以解决它:
Connection WaitForConnection()
{
connectionLazy.Start();
var awaiter = connectionLazy.GetAwaiter();
while (!awaiter.IsCompleted)
Thread.Sleep(50);
return awaiter.GetResult();
}
还有更好的解决方案吗?
最佳答案
我怀疑第三方库需要某种形式的STA抽水。这在旧式异步代码中很常见。
我有一个可以尝试的类型AsyncContextThread
,将true
传递给构造函数以启用手动STA泵送。 AsyncContextThread
就像AsyncContext
一样,只不过它在新线程(在本例中为STA线程)中运行上下文。
static void Main(string[] args)
{
using (var thread = new AsyncContextThread(true))
{
thread.Factory.Run(() => MainAsync(args)).Wait();
}
}
要么
static void Main(string[] args)
{
AsyncContext.Run(() => async
{
using (var thread = new AsyncContextThread(true))
{
await thread.Factory.Run(() => MainAsync(args));
}
}
}
请注意,
AsyncContextThread
并非在所有STA方案中都适用。我在做(有些扭曲的)COM互操作时遇到了问题,需要真正的UI线程(WPF或WinForms线程)。由于某些原因,STA抽水不足以容纳那些COM对象。关于c# - 与第三方lib aka野鹅追逐异步TPL僵局,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/25177560/