使用 .NET 的 BlockingCollection<T> 包装一个 ConcurrentQueue<T>,
即可实现类似 Go 语言 channel 的功能。
代码如下:
/// <summary>
/// A Go-style channel: a bounded FIFO queue backed by a
/// <see cref="BlockingCollection{T}"/> wrapping a <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the items passed through the channel.</typeparam>
public class Channel<T>
{
    private readonly BlockingCollection<T> _buffer;

    /// <summary>
    /// Creates a channel with a capacity of one item.
    /// </summary>
    /// <remarks>
    /// The original declaration chained to itself (<c>: this()</c>), which does not
    /// compile. <see cref="BlockingCollection{T}"/> requires a bounded capacity of at
    /// least 1, so 1 is the smallest buffer available for the default channel.
    /// </remarks>
    public Channel() : this(1) { }

    /// <summary>
    /// Creates a channel that buffers at most <paramref name="size"/> items.
    /// </summary>
    /// <param name="size">Maximum number of buffered items; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown by the underlying collection when <paramref name="size"/> is not positive.
    /// </exception>
    public Channel(int size)
    {
        _buffer = new BlockingCollection<T>(new ConcurrentQueue<T>(), size);
    }

    /// <summary>
    /// Adds an item to the channel, blocking while the buffer is full.
    /// </summary>
    /// <param name="t">The item to send.</param>
    /// <returns><c>true</c> if the item was added; <c>false</c> if the channel is closed.</returns>
    public bool Send(T t)
    {
        try
        {
            _buffer.Add(t);
        }
        catch (InvalidOperationException)
        {
            // Thrown when the collection has been marked as complete for adding.
            return false;
        }
        return true;
    }

    /// <summary>
    /// Removes the next item from the channel, blocking while the buffer is empty
    /// and the channel is still open.
    /// </summary>
    /// <param name="val">The received item, or <c>default(T)</c> when nothing was received.</param>
    /// <returns>
    /// <c>true</c> if an item was received; <c>false</c> if the channel is closed and drained.
    /// </returns>
    public bool Receive(out T val)
    {
        try
        {
            val = _buffer.Take();
        }
        catch (InvalidOperationException)
        {
            // Thrown when the collection is empty and has been closed.
            val = default(T);
            return false;
        }
        return true;
    }

    /// <summary>
    /// Closes the channel: subsequent sends fail, while already buffered items
    /// can still be received.
    /// </summary>
    public void Close()
    {
        _buffer.CompleteAdding();
    }

    /// <summary>
    /// Lazily yields received items until the channel is closed and drained.
    /// </summary>
    /// <returns>A sequence of the items taken from the channel, in FIFO order.</returns>
    public IEnumerable<T> Range()
    {
        T val;
        while (Receive(out val))
        {
            yield return val;
        }
    }
}
测试程序
/// <summary>
/// Single-producer / single-consumer throughput benchmark: one task sends
/// <c>numItems</c> integers through a fresh channel and closes it, while another
/// task drains the channel into a list. Prints the average time per iteration
/// and per item.
/// </summary>
[TestCase]
public void TestSPSC_Performance()
{
    // NOTE(review): the original numeric literals were missing from the source;
    // these are representative benchmark sizes — adjust as needed.
    int numItems = 1000000;
    int numIterations = 10;

    var stopWatch = new Stopwatch();
    stopWatch.Start();
    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>();

        // Producer: send 0..numItems-1, then close so the consumer's Range() terminates.
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var num in Enumerable.Range(0, numItems))
            {
                channel.Send(num);
            }
            channel.Close();
        });

        // Consumer: collect everything until the channel is closed and drained.
        var reader = Task.Factory.StartNew<List<int>>(() =>
        {
            var res = new List<int>(numItems);
            foreach (var num in channel.Range())
            {
                res.Add(num);
            }
            return res;
        });

        Task.WaitAll(writer, reader);
    }
    stopWatch.Stop();

    var elapsedMs = stopWatch.Elapsed.TotalMilliseconds;
    // 1 ms = 1,000,000 ns; the original multiplied by 1000.0, which yields
    // microseconds while the label says nanoseconds.
    Console.WriteLine("SPSC N = {0}: {1:.00}ms/iteration, {2:.00}ns/item (tx+rx)", numItems, elapsedMs / numIterations, elapsedMs * 1000000.0 / numItems / numIterations);
}