问题描述
我正在使用 SemaphoreSlim
FIFO行为,现在我想向其中添加一个 Starve(int amount)
方法以从池中删除线程,这与 Release()
相反
I'm using a SemaphoreSlim
with a FIFO behaviour and now I want to add to it a Starve(int amount)
method to remove threads from the pool, sort of the opposite to Release()
.
如果有任何正在运行的任务,它们当然会一直持续到完成为止,因为此刻信号灯无法跟踪实际运行的内容,并且欠"该信号灯一个释放调用.
If there are any running tasks, they will of course continue until they are done, since for the moment the semaphore is not keeping track of what is actually running and "owes" the semaphore a release call.
原因是用户将随时动态控制给定信号量所允许的进程数.
The reason is that the user will dynamically control how many processes are allowed at any time for a given semaphore.
我遵循的策略是:
- 如果有可用线程,即
CurrentCount>0
,然后在不释放回来的情况下在SemaphoreSlim上调用Await()
. - 如果没有更多的线程可用,因为可能正在运行任务,甚至可能排队,那么下次调用
Release()
时,请忽略它以防止线程被释放(一个int变量会保持计数)
- if there are threads available, i.e.,
CurrentCount > 0
, then callAwait()
on the SemaphoreSlim without releasing back. - if there are no more threads available, because presumably tasks are running and potentially even queuing, then next time that
Release()
is called ignore it to prevent threads being released (an int variable keeps count)
我已经添加了到目前为止的代码.我正在努力解决的主要问题是如何确保线程安全,没有死锁以及没有令人惊讶的竞争条件.
I have added the code I have so far below. The main issues I'm struggling with are how to ensure thread safety, no deadlocks and no surprising race conditions.
鉴于我无法访问信号量的private lock(),我创建了一个新对象,至少可以尝试防止多个线程同时(在包装器内)操纵新变量.
Given that I cannot access the private lock() of the semaphore, I created a new object to at least try and prevent several threads to manipulate the new variables (within the wrapper) at the same time.
但是,我担心SemaphoreSlim内的其他变量(例如 CurrentCount
)也可能会改变一半并将事情弄乱...我希望 Release()中有锁
方法来防止对 CurrentCount
进行更改,但是也许我也应该将该锁应用于Wait和WaitAsync(这也可能会更改CurrentCount)?这可能还会导致两次调用Wait(?)之间不必要的锁定
However, I fear that other variables like CurrentCount
which are within the SemaphoreSlim could also change half way through and mess things up... I would expect the lock in the Release()
method to prevent changes to CurrentCount
, but maybe I should also apply the lock to the Wait and WaitAsync (which potentially could also change CurrentCount)? That would probably also result in uneccessary locks between two calls to Wait (?)
在这种情况下,对 semaphore.Wait()
的调用比 await semaphore.WaitAsync()
吗?
The call to semaphore.Wait()
is in this situation any better or worse than await semaphore.WaitAsync()
?
是否有更好的方法来扩展诸如SemaphoreSlim之类的功能,该类包含许多可能需要的私有变量或对访问有用的私有变量?
Are there any better ways to extend the functionality of a class such as SemaphoreSlim, which contains many private variables that potentially are needed or that would be useful to have access to?
我简要地考虑过创建一个从SemaphoreSlim继承的新类,或者研究扩展方法,也许使用反射来访问私有变量,但是似乎没有一个明显的或有效的.
I briefly considered creating a new class which inherits from SemaphoreSlim, or looking at extension methods, maybe using reflection to access the private variables,... but none seem to be obvious or valid.
public class SemaphoreQueue
{
private SemaphoreSlim semaphore;
private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ConcurrentQueue<TaskCompletionSource<bool>>();
private int releasesToIgnore;
private object lockObj;
private const int NO_MAXIMUM = Int32.MaxValue; // cannot access SemaphoreSlim.NO_MAXIMUM
public SemaphoreQueue(int initialCount) : this(initialCount, NO_MAXIMUM) { }
public SemaphoreQueue(int initialCount, int maxCount)
{
semaphore = new SemaphoreSlim(initialCount, maxCount);
lockObj = new object();
releasesToIgnore = 0;
}
public void Starve(int amount)
{
lock (lockObj)
{
// a maximum of CurrentCount threads can be immediatelly starved by calling Wait without release
while ((semaphore.CurrentCount > 0) && (amount > 0))
{
semaphore.Wait();
amount -= 1;
}
// presumably there are still tasks running. The next Releases will be ignored.
if (amount > 0)
releasesToIgnore += amount;
}
}
public int Release()
{
return Release(1);
}
public int Release(int num)
{
lock (lockObj)
{
if (releasesToIgnore > num)
{
releasesToIgnore -= num;
return semaphore.CurrentCount;
}
else
{
int oldReleasesToIgnore = releasesToIgnore;
releasesToIgnore = 0;
return semaphore.Release(num - oldReleasesToIgnore);
}
}
}
public void Wait(CancellationToken token)
{
WaitAsync(token).Wait();
}
public Task WaitAsync(CancellationToken token)
{
var tcs = new TaskCompletionSource<bool>();
queue.Enqueue(tcs);
QueuedAwait(token);
return tcs.Task;
}
public int CurrentCount { get => this.semaphore.CurrentCount; }
private void QueuedAwait(CancellationToken token)
{
semaphore.WaitAsync(token).ContinueWith(t =>
{
TaskCompletionSource<bool> popped;
if (queue.TryDequeue(out popped))
popped.SetResult(true);
});
}
public void Dispose()
{
semaphore.Dispose();
}
}
推荐答案
我认为在 SemaphoreSlim
类存在问题,因为我们无法访问内置实现所使用的同步原语.因此,我建议仅使用 TaskCompletionSource
对象的队列来实现它.以下是缺少功能的基本实现. WaitAsync
方法缺少取消功能,而 Release
方法也缺少 releaseCount
参数.
I think that implementing a custom semaphore on top of the SemaphoreSlim
class is problematic, because we don't have access to the synchronization primitives used by the built-in implementation. So I would suggest to implement it using solely a queue of TaskCompletionSource
objects. Below is a basic implementation with missing features. The WaitAsync
method lacks cancellation, and the Release
method lacks the releaseCount
argument as well.
为简单起见,未使用 releasesToIgnore
计数器,而是允许现有的 currentCount
具有负值. Starve
方法只会减少此计数器.
For simplicity a releasesToIgnore
counter is not used, and instead the existing currentCount
is allowed to have negative values. The Starve
method just decreases this counter.
public class SemaphoreFifo
{
private readonly Queue<TaskCompletionSource<bool>> _queue
= new Queue<TaskCompletionSource<bool>>();
private readonly object _locker = new object();
private readonly int _maxCount;
private int _currentCount;
public SemaphoreFifo(int initialCount, int maxCount)
{
_currentCount = initialCount;
_maxCount = maxCount;
}
public SemaphoreFifo(int initialCount) : this(initialCount, Int32.MaxValue) { }
public int CurrentCount { get { lock (_locker) return _currentCount; } }
public async Task WaitAsync()
{
TaskCompletionSource<bool> tcs;
lock (_locker)
{
if (_currentCount > 0)
{
_currentCount--;
return;
}
tcs = new TaskCompletionSource<bool>();
_queue.Enqueue(tcs);
}
await tcs.Task;
}
public void Starve(int starveCount)
{
lock (_locker) _currentCount -= starveCount;
}
public void Release()
{
TaskCompletionSource<bool> tcs;
lock (_locker)
{
if (_currentCount < 0)
{
_currentCount++;
return;
}
if (_queue.Count == 0)
{
if (_currentCount >= _maxCount) throw new SemaphoreFullException();
_currentCount++;
return;
}
tcs = _queue.Dequeue();
}
tcs.SetResult(true);
}
}
这篇关于为SemaphoreSlim实现Starve方法("Unrelease"/"Hold")的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!