Cowboy.WebSockets 是一个托管在 GitHub 上的基于 .NET/C# 实现的开源 WebSocket 网络库,其完整地实现了 RFC 6455 (The WebSocket Protocol) 协议标准,并部分实现了 RFC 7692 (Compression Extensions for WebSocket) 协议标准。

WebSocket 可理解为建立在 TCP 连接通道上的更进一步的握手,并确定了消息封装格式。

Cowboy 开源 WebSocket 网络库-LMLPHP

WebSocket 通过定义控制帧 (Control Frame) 和数据帧 (Data Frame) 来控制通道内的通信和数据传输,下图使用 ABNF 格式描述了帧头部的格式。

Cowboy 开源 WebSocket 网络库-LMLPHP

Cowboy.WebSockets 中对于 WebSocket 的 Client/Server 分别做了实现,分别对应代码中的 AsyncWebSocketClient 和 AsyncWebSocketServer。

Cowboy.WebSockets 的内部实现是基于 Cowboy.Sockets 中 TAP (Task-based Asynchronous Pattern) 模式的 AsyncTcpSocketServer 和 AsyncTcpSocketClient。关于 Cowboy.Sockets 可以参考文章《C#高性能TCP服务的多种实现方式》。

可通过 NuGet 查找 Cowboy 来获取 NuGet 包。

Cowboy 开源 WebSocket 网络库-LMLPHP

WebSocket 服务端应用

实现 AsyncWebSocketServerModule 抽象类,其中 ModulePath 对应着 "ws://host:port/path" 中的 path 部分。可以实现多个 Module,将多个 Module 注入到 AsyncWebSocketServerModuleCatalog 中,或者采用反射机制等自动发现 Module。

  /// <summary>
  /// Sample server module registered under the "/test" path. It logs session
  /// start/close to the console and sends every received message back to the
  /// session it came from.
  /// </summary>
  public class TestWebSocketModule : AsyncWebSocketServerModule
  {
      /// <summary>Creates the module and passes "/test" to the base class as its path.</summary>
      public TestWebSocketModule()
          : base(@"/test")
      {
      }

      /// <summary>Logs the remote endpoint of the session that has started.</summary>
      public override async Task OnSessionStarted(AsyncWebSocketSession session)
      {
          var message = string.Format("WebSocket session [{0}] has connected.", session.RemoteEndPoint);
          Console.WriteLine(message);

          await Task.CompletedTask;
      }

      /// <summary>Logs the received text, then sends the same text back to the session.</summary>
      public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
      {
          var prefix = string.Format("WebSocket session [{0}] received Text --> ", session.RemoteEndPoint);
          Console.Write(prefix);
          Console.WriteLine(string.Format("{0}", text));

          await session.SendTextAsync(text);
      }

      /// <summary>
      /// Decodes the received bytes as UTF-8 for logging, then sends that decoded
      /// text back to the session re-encoded as UTF-8 bytes.
      /// </summary>
      /// <param name="session">Session the data arrived on.</param>
      /// <param name="data">Buffer holding the payload.</param>
      /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
      /// <param name="count">Number of payload bytes.</param>
      public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);

          var prefix = string.Format("WebSocket session [{0}] received Binary --> ", session.RemoteEndPoint);
          Console.Write(prefix);
          Console.WriteLine(string.Format("{0}", text));

          var reply = Encoding.UTF8.GetBytes(text);
          await session.SendBinaryAsync(reply);
      }

      /// <summary>Logs the remote endpoint of the session that has closed.</summary>
      public override async Task OnSessionClosed(AsyncWebSocketSession session)
      {
          var message = string.Format("WebSocket session [{0}] has disconnected.", session.RemoteEndPoint);
          Console.WriteLine(message);

          await Task.CompletedTask;
      }
  }

实例化 AsyncWebSocketServer,并将 AsyncWebSocketServerModuleCatalog 实例注入,即可启动 WebSocket 的服务端监听。

  /// <summary>
  /// Console host for the demo WebSocket server: registers
  /// <see cref="TestWebSocketModule"/>, starts listening, and broadcasts every
  /// console line to the connected clients as a UTF-8 binary message until
  /// "quit" is entered.
  /// </summary>
  class Program
  {
      static AsyncWebSocketServer _server;

      static void Main(string[] args)
      {
          NLogLogger.Use();

          try
          {
              var catalog = new AsyncWebSocketServerModuleCatalog();
              catalog.RegisterModule(new TestWebSocketModule());

              var config = new AsyncWebSocketServerConfiguration();
              //config.SslEnabled = true;
              //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy");
              //config.SslPolicyErrorsBypassed = true;

              // The server must be constructed before Listen(); the listening port
              // 22222 matches the ws://127.0.0.1:22222/test URI the client demo connects to.
              _server = new AsyncWebSocketServer(22222, catalog, config);
              _server.Listen();

              Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
              Console.WriteLine("Type something to send to clients...");
              while (true)
              {
                  try
                  {
                      string text = Console.ReadLine();
                      if (text == "quit")
                          break;

                      // Fire-and-forget so the console loop keeps reading while the broadcast runs.
                      Task.Run(async () =>
                      {
                          //await _server.BroadcastText(text);
                          //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                          await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                          Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                      });
                  }
                  catch (Exception ex)
                  {
                      Console.WriteLine(ex.Message);
                  }
              }

              _server.Shutdown();
              Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
          }
          catch (Exception ex)
          {
              Logger.Get<Program>().Error(ex.Message, ex);
          }

          // Keep the console window open until a key is pressed.
          Console.ReadKey();
      }
  }

WebSocket 客户端应用

客户端侧在实例化 AsyncWebSocketClient 时有两种方式:

  1. 实现 IAsyncWebSocketClientMessageDispatcher 接口;
  2. 直接构造函数注入接受各种事件的 Func<> 实现;
  /// <summary>
  /// Callback contract an application implements to receive events from an
  /// <see cref="AsyncWebSocketClient"/>. Every callback returns a <see cref="Task"/>.
  /// NOTE(review): when each callback fires is inferred from its name; confirm
  /// against the client implementation.
  /// </summary>
  public interface IAsyncWebSocketClientMessageDispatcher
  {
      /// <summary>Presumably invoked once the connection to the server is established.</summary>
      Task OnServerConnected(AsyncWebSocketClient client);

      /// <summary>Presumably invoked when a text message arrives from the server.</summary>
      Task OnServerTextReceived(AsyncWebSocketClient client, string text);

      /// <summary>
      /// Presumably invoked when a binary message arrives from the server; the payload
      /// is the <paramref name="count"/> bytes of <paramref name="data"/> starting at
      /// <paramref name="offset"/>.
      /// </summary>
      Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);

      /// <summary>Presumably invoked when the connection to the server is closed.</summary>
      Task OnServerDisconnected(AsyncWebSocketClient client);

      /// <summary>Presumably invoked for the first frame of a fragmented message.</summary>
      Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);

      /// <summary>Presumably invoked for each intermediate frame of a fragmented message.</summary>
      Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);

      /// <summary>Presumably invoked for the final frame of a fragmented message.</summary>
      Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
  }

下面的 DEMO 采用了方式二。

  /// <summary>
  /// Console host for the demo WebSocket client: connects to
  /// ws://127.0.0.1:22222/test, sends every console line to the server as a
  /// UTF-8 binary message until "quit" is entered or the connection is no
  /// longer open, and logs what the server sends back.
  /// </summary>
  class Program
  {
      static AsyncWebSocketClient _client;

      static void Main(string[] args)
      {
          NLogLogger.Use();

          // The whole session runs on a worker task; Main blocks until it finishes.
          var session = Task.Run(async () =>
          {
              try
              {
                  var config = new AsyncWebSocketClientConfiguration();
                  //config.SslTargetHost = "Cowboy";
                  //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.cer"));
                  //config.SslPolicyErrorsBypassed = true;

                  //var uri = new Uri("ws://echo.websocket.org/");
                  //var uri = new Uri("wss://127.0.0.1:22222/test");
                  var uri = new Uri("ws://127.0.0.1:22222/test");

                  // The event handlers are passed to the constructor as method groups.
                  _client = new AsyncWebSocketClient(
                      uri,
                      OnServerTextReceived,
                      OnServerBinaryReceived,
                      OnServerConnected,
                      OnServerDisconnected,
                      config);
                  await _client.Connect();

                  Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                  Console.WriteLine("Type something to send to server...");
                  while (_client.State == WebSocketState.Open)
                  {
                      try
                      {
                          string text = Console.ReadLine();
                          if (text == "quit")
                              break;

                          // Fire-and-forget so the console loop keeps reading while the send runs.
                          Task.Run(async () =>
                          {
                              //await _client.SendText(text);
                              //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                              var payload = Encoding.UTF8.GetBytes(text);
                              await _client.SendBinaryAsync(payload);
                              Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                          }).Forget();
                      }
                      catch (Exception ex)
                      {
                          Console.WriteLine(ex.Message);
                      }
                  }

                  await _client.Close(WebSocketCloseCode.NormalClosure);
                  Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
              }
              catch (Exception ex)
              {
                  Logger.Get<Program>().Error(ex.Message, ex);
              }
          });
          session.Wait();

          // Keep the console window open until a key is pressed.
          Console.ReadKey();
      }

      /// <summary>Logs the remote endpoint of the server that has been connected.</summary>
      private static async Task OnServerConnected(AsyncWebSocketClient client)
      {
          var message = string.Format("WebSocket server [{0}] has connected.", client.RemoteEndPoint);
          Console.WriteLine(message);

          await Task.CompletedTask;
      }

      /// <summary>Logs a text message received from the server.</summary>
      private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
      {
          var prefix = string.Format("WebSocket server [{0}] received Text --> ", client.RemoteEndPoint);
          Console.Write(prefix);
          Console.WriteLine(string.Format("{0}", text));

          await Task.CompletedTask;
      }

      /// <summary>Decodes a binary message received from the server as UTF-8 and logs it.</summary>
      private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);

          var prefix = string.Format("WebSocket server [{0}] received Binary --> ", client.RemoteEndPoint);
          Console.Write(prefix);
          Console.WriteLine(string.Format("{0}", text));

          await Task.CompletedTask;
      }

      /// <summary>Logs the remote endpoint of the server that has been disconnected.</summary>
      private static async Task OnServerDisconnected(AsyncWebSocketClient client)
      {
          var message = string.Format("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint);
          Console.WriteLine(message);

          await Task.CompletedTask;
      }
  }

相关资料

本篇文章《Cowboy 开源 WebSocket 网络库》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。

04-23 21:29