TCP Server是使用SocketAsyncEventArgs开发的,它是Windows服务的异步方法。我在Main的开头有以下两行代码:
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
两者都返回true(记录返回的值)。现在,从2000到3000个客户端开始向该服务器发送消息,并且它开始接受连接(我计算了连接数,这与预期的一样-有一个连接池)。服务器进程的线程数将增长到〜2050至〜3050。到现在为止还挺好!
现在有一个Received方法,该方法将在ReceiveAsync返回true之后或由SocketAsyncEventArgs的Completed事件调用。
问题就从这里开始了:无论连接了多少客户端以及它们发送了多少消息,“接收”一秒钟最多被调用20次!随着客户数量的增加,该数量(20)下降到约10。
环境:在同一台计算机上模拟TCP Server和客户端。我已经在2台计算机上测试了代码,其中一台具有2核CPU和4GB RAM,另一台具有8核CPU和12GB RAM。目前还没有数据丢失,有时在每个接收操作中我会收到多于一条消息。没关系。但是如何增加接收操作的数量呢?
有关实现的其他说明:该代码很大,并且包含许多不同的逻辑。总体描述如下:我有一个SocketAsyncEventArgs用于接受新连接。效果很好。现在,对于每个新的接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据。我把这个(为接收而创建的SocketAsyncEventArgs)放在一个池中。它不会被重用,但是它的UserToken被用于跟踪连接。例如,那些断开连接或7分钟未发送任何数据的连接将被关闭和处置(SocketAsyncEventArgs的AcceptSocket将被关闭,关闭和处置,并且SocketAsyncEventArgs对象本身也将被处置)。这是一个执行这些任务的Sudo类,但是所有其他逻辑,日志记录和错误检查以及所有其他内容都已删除,以使其变得简单明了(也许这样就更容易发现有问题的代码):
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}
最佳答案
之所以变慢,是因为这些线程中的每个线程都需要进行上下文切换,这是一个相对昂贵的操作。添加的线程越多,仅用于上下文切换而不是实际代码中的CPU所占的比例就越高。
您已经以一种非常奇怪的方式解决了每个客户端一个线程的瓶颈。服务器端异步的全部要点是减少线程数-每个客户端不具有一个线程,但理想情况下,系统中的每个逻辑处理器只需要一个或两个线程。
您发布的异步代码看起来不错,因此我只能猜测您的Process
方法可以轻松忽略非异步,从而阻止其中的I/O,即数据库或文件访问。当I/O阻塞时,.NET线程池会检测到此情况并自动启动一个新线程-基本上,这种情况在螺旋式增长中失去了控制,Process
中的I/O成为了瓶颈。
异步管道实际上需要100%异步才能从中获得任何重大 yield 。 Half-in可以让您编写复杂的代码,其性能与简单的同步代码一样差。
如果您绝对不能完全使Process
方法异步,则可能有些伪造它。让事情在队列中等待由有限大小的小型线程池进行处理。