SuperSocket 2.0从入门到懵逼

1 使用SuperSocket 2.0在AspNetCore项目中搭建一个Socket服务器

1.1 引入SuperSocket 2.0

SuperSocket 2.0目前仍然处于Beta阶段, 但是功能基本可靠, 可以用于生产环境.

可以从Github源地址自行fork代码进行其它修改, 如需直接引用, 在默认的Nuget服务器是没有的, 需要添加作者自己的Nuget服务器https://www.myget.org/F/supersocket/api/v3/index.json, 获取预览版本.

1.2 在AspNetCore中搭建一个Socket服务器

使用SuperSocket 2.0快速搭建一个Socket服务器非常简单, 在框架中, 作者实现了一个SuperSocketHostBuilder进行构建, 我们要做的只是简单的配置一些参数和业务逻辑.

此外, SuperSocketHostBuilder是可以直接嵌入到AspNetCore项目的CreateHostBuilder上的, 但是并不推荐这么做...

  1. 配置数据包和过滤器/选择器

    • 数据包, 即我们进行Socket通信时的数据包, 可以是包对象, 也可以是byte[], 如果是包对象, 则需要在过滤器中配置解码器.
    • 过滤器, 作用是对数据包进行筛选然后截取一包数据, 可以将解码器挂载进来, 直接将二进制数据映射为对象.
    • 选择器, 我们可以使用选择器, 来实现一个端口服务于多种协议的情况, 此时, 一个选择器则会搭配多个过滤器进行使用.
  2. 配置IP和端口

    • 配置IP和端口可以选择两种方式, 一种方式是从程序写入, 另一种是从配置文件中写入.
    • 在本系列的介绍中, 我们大多数采用程序写入的方式, 从配置文件写入的方式, 后续会采用其它方式实现, 来扩展业务.
  3. 配置Session

    • 在程序中作者内置了IAppSessionAppSession供我们使用, 如果我们需要自定义Session, 则需要继承AppSession并且在程序中进行引用, 即可切换为我们定义的Session.
    • 自定义Session时, 由于在程序中大多数提供的参数都为IAppSession, 所以, 需要实现SuperSocket的更多其它接口进行重写, 来维持程序运转.
    • 自定义Session大多数时候是为了添加更多自定义属性, 作者在设计中提供了另外的方式提供我们选择:

      // AppSession源码
      public class AppSession : IAppSession, ILogger, ILoggerAccessor
      {
          //......
      
          private Dictionary<object, object> _items;
      
          public object this[object name]
          {
              get
              {
                  var items = _items;
      
                  if (items == null)
                      return null;
      
                  object value;
      
                  if (items.TryGetValue(name, out value))
                      return value;
      
                  return null;
              }
      
              set
              {
                  lock (this)
                  {
                      var items = _items;
      
                      if (items == null)
                          items = _items = new Dictionary<object, object>();
      
                      items[name] = value;
                  }
              }
          }
      
          //......
      }

      可以看到, 其中内置了一个字典, 我们可以将属性的Key-Value值直接存在字典中, 即session["prop"] = value;的形式.

  4. 配置SessionHandler

    • SessionHandler会在建立和断开Socket连接时触发, 用于处理连接和断开时的业务, 可以在SuperSocketHostBuilder中直接配置, 也可以通过重写相应的接口实现.
    • 连接时, 会将创建好的Session传入该方法.
    • 断开时, 会将断开的Session以及触发断开的事件传入该方法.
  5. 配置PackageHandler

    • PackageHandler会在接收到数据包时触发.
    • 该方法自动触发时我们将获取两个参数, 一个是Session, 一个是Package. 但是要注意的是, 这里的Session是IAppSession类型, 并不是我们自定义的Session.

      • Session即为当前Socket连接, 里面附带了各种连接信息以及状态等.
      • Package是通过过滤器后得到的数据包, 不过要注意的是, 例如:如果数据包为头尾标识符的数据包, 如果采用的是byte[]的形式, 得到的可能不是原包, 而是去除了包头尾标识后的数据体. 即:7E 01 02 03 7E, 去掉头尾的7E标识得到的是01 02 03.
  6. Build & Run

    • 构建这里同时提供了几种方式, 推荐采用BuildAsServer(), 然后通过StartAsync()进行启用.

此时, 一个Socket服务器就搭建完成了. 具体实现:

// SampleSession
public class SampleSession: AppSession
{

}
// Socket Server代码
var tcpHost = SuperSocketHostBuilder.Create<byte[], PipelineFilter>()
    .ConfigureSuperSocket(options =>
    {
        options.AddListener(new ListenOptions
        {
            Ip = "Any",
            Port = 4040
        })
        .AddListener(new ListenOptions()
        {
            Ip = "Any",
            Port = 8888
        });
    })
    .UseSession<JT808TcpSession>()
    .UseClearIdleSession()
    .UseSessionHandler(s =>
    {
    })
    .UsePackageHandler(async (s, p) =>
    {
        //解包/应答/转发
    })
    .ConfigureErrorHandler((s, v) =>
    {
    })
    .UseMiddleware<InProcSessionContainerMiddleware>()
    .UseInProcSessionContainer()
    .BuildAsServer();

await tcpHost.RunAsync();

.UseClearIdleSession()请务必调用, 在使用各类Socket框架时, 不可避免的我们的应用程序都会维持大量的僵尸连接, SuperSocket中提供了UseClearIdleSession()来自动复用已经闲置或者失去连接的资源.

2 基本的协议概念

2.1 基本协议种类

2.1.1 固定头格式协议

顾名思义, 这类协议的Header是固定的, 并且一般Header的长度是固定的, 但是也有例外情况, 此外, Header中也会包含数据体的长度等信息. 然后可以根据长度来截取一包数据.

2.1.2 固定头尾标识协议

这类协议的数据包, 前几字节和后几字节是固定的, 这样就可以通过头尾的标识来截取一包数据.

通常, 这类协议的数据包, 为了避免数据内容和协议头、尾的标识冲突, 通常会设置转义, 即将数据包中出现头尾标识的地方, 转义为其它数据, 避免识别时出现错误.

2.1.3 固定包大小协议

这类协议每一包数据的大小都是固定的, 所以可以直接根据长度进行读取.

2.1.4 命令行协议

这类协议通常以\r\n结尾, 采用字符串转为二进制流进行传输.

2.1.5 一些其它协议

PS: 关于协议的一些硬件厂商的私有协议比较奇葩, 他们的协议五花八门的...不过我们这里不做阐述, 有时间我会再讲

3 SuperSocket中的几个基本概念

3.1 Package Type

Package Type即包类型, 这里描述的是数据包的结构, 例如SuperSocket中就提供了一些基础的包类型TextPackageInfo等.

public class TextPackageInfo
{
    public string Text{get; set;}
}

这里的TextPackageInfo标识了这类类型的数据包中, 仅包含了一个字符串, 当然, 我们通常会有更复杂的网络数据包结构.例如, 我将在下列展示一个包含首尾标识的通信包, 它包含了首尾标识, 消息号, 终端Id, 以及消息体:

public class SamplePackage
{
    public byte Begin{get; set;}

    public MessageId MessageId{get; set;}

    public string TerminalId{get; set;}

    public SampleBody Body{get; set;}

    public byte End{get; set;}
}

当然, 在SuperSocket中也提供了一些接口供我们实现一些类似格式的包, 不过个人不太喜欢这种方式, 官方文档也举了一些例子, 例如,有的包会有一个特殊的字段来代表此包内容的类型. 我们将此字段命名为 "Key". 此字段也告诉我们用何种逻辑处理此类型的包. 这是在网络应用程序中非常常见的一种设计. 例如,你的 Key 字段是整数类型,你的包类型需要实现接口IKeyedPackageInfo

public class MyPackage : IKeyedPackageInfo<int>
{
    public int Key { get; set; }

    public short Sequence { get; set; }

    public string Body { get; set; }
}

3.2 PipelineFilter Type

这种类型在网络协议解析中作用重要. 它定义了如何将 IO 数据流解码成可以被应用程序理解的数据包. 换句话说, 就是把你的二进制流数据, 能够一包一包的识别出来, 同时可以解析成你构建的Package对象. 当然, 你也可以选择不构建, 然后将源数据直接返回.

这些是 PipelineFilter 的基本接口. 你的系统中至少需要一个实现这个接口的 PipelineFilter 类型.

public interface IPipelineFilter
{
    void Reset();

    object Context { get; set; }
}

public interface IPipelineFilter<TPackageInfo> : IPipelineFilter
    where TPackageInfo : class
{

    IPackageDecoder<TPackageInfo> Decoder { get; set; }

    TPackageInfo Filter(ref SequenceReader<byte> reader);

    IPipelineFilter<TPackageInfo> NextFilter { get; }

}

事实上,由于 SuperSocket 已经提供了一些内置的 PipelineFilter 模版,这些几乎可以覆盖 90% 的场景的模版极大的简化了你的开发工作. 所以你不需要完全从头开始实现 PipelineFilter. 即使这些内置的模版无法满足你的需求,完全自己实现PipelineFilter也不是难事.

3.3 使用PackageType和PipelineFilter Type创建SuperSocket

你定义好 Package 类型和 PipelineFilter 类型之后,你就可以使用 SuperSocketHostBuilder 创建 SuperSocket 宿主了。

var host = SuperSocketHostBuilder.Create<StringPackageInfo, CommandLinePipelineFilter>();

在某些情况下,你可能需要实现接口 IPipelineFilterFactory 来完全控制 PipelineFilter 的创建。

public class MyFilterFactory : PipelineFilterFactoryBase<TextPackageInfo>
{
    protected override IPipelineFilter<TPackageInfo> CreateCore(object client)
    {
        return new FixedSizePipelineFilter(10);
    }
}

然后在 SuperSocket 宿主被创建出来之后启用这个 PipelineFilterFactory:

var host = SuperSocketHostBuilder.Create<StringPackageInfo>();
host.UsePipelineFilterFactory<MyFilterFactory>();

4 SuperSocket中的PipelineFilter, 实现自己的PipelineFilter

4.1 内置的PipelineFilter模板

SuperSocket中内置了一些PipelineFilter模板, 这些模板几乎可以覆盖到90%的应用场景, 极大简化了开发工作, 所以不需要完全从头开始实现PipelineFilter. 即使这些内置的模板无法满足你的需求, 完全自己实现PipelineFilter.

SuperSocket提供了这些PipelineFilter模板:

  • TerminatorPipelineFilter (SuperSocket.ProtoBase.TerminatorPipelineFilter, SuperSocket.ProtoBase)
  • TerminatorTextPipelineFilter (SuperSocket.ProtoBase.TerminatorTextPipelineFilter, SuperSocket.ProtoBase)
  • LinePipelineFilter (SuperSocket.ProtoBase.LinePipelineFilter, SuperSocket.ProtoBase)
  • CommandLinePipelineFilter (SuperSocket.ProtoBase.CommandLinePipelineFilter, SuperSocket.ProtoBase)
  • BeginEndMarkPipelineFilter (SuperSocket.ProtoBase.BeginEndMarkPipelineFilter, SuperSocket.ProtoBase)
  • FixedSizePipelineFilter (SuperSocket.ProtoBase.FixedSizePipelineFilter, SuperSocket.ProtoBase)
  • FixedHeaderPipelineFilter (SuperSocket.ProtoBase.FixedHeaderPipelineFilter, SuperSocket.ProtoBase)

4.2 基于内置模板实现PipelineFilter

4.2.1 FixedHeaderPipelineFilter-头部格式固定并且包含内容长度的协议

这种协议讲请求定义为两大部分, 第一部分定义了包含第二部分长度等等基础信息, 我们通常称第一部分为头部.

例如, 我们有一个这样的协议: 头部包含 3 个字节, 第 1 个字节用于存储请求的类型, 后两个字节用于代表请求体的长度:

/// +-------+---+-------------------------------+
/// |request| l |                               |
/// | type  | e |    request body               |
/// |  (1)  | n |                               |
/// |       |(2)|                               |
/// +-------+---+-------------------------------+

根据此协议的规范, 我们可以使用如下代码定义包的类型:

public class MyPackage
{
    public byte Key { get; set; }

    public string Body { get; set; }
}

下一个是设计PipelineFilter:

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public MyPipelineFilter()
        : base(3) // 包头的大小是3字节,所以将3传如基类的构造方法中去
    {

    }

    // 从数据包的头部返回包体的大小
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // 将数据包解析成 MyPackage 的实例
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;
        reader.Advance(2); // skip the length
        package.Body = reader.ReadString();

        return package;
    }
}

最后,你可通过数据包的类型和 PipelineFilter 的类型来创建宿主:

var host = SuperSocketHostBuilder.Create<MyPackage, MyPipelineFilter>()
    .UsePackageHandler(async (s, p) =>
    {
        // handle your package over here
    }).Build();

你也可以通过将解析包的代码从 PipelineFilter 移到 你的包解码器中来获得更大的灵活性:

public class MyPackageDecoder : IPackageDecoder<MyPackage>
{
    public MyPackage Decode(ref ReadOnlySequence<byte> buffer, object context)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;
        reader.Advance(2); // skip the length
        package.Body = reader.ReadString();

        return package;
    }
}

通过 host builder 的 UsePackageDecoder 方法来在SuperSocket中启用它:

builder.UsePackageDecoder<MyPackageDecoder>();

4.2.3 另一种挂载解析器的方式

在Asp.Net Core Application我们可以new(), 直接注入或者采用工厂模式等方式, 向Host中注入协议解析器, 然后在过滤波器中进行使用.

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public readonly PacketConvert _packageConvert;
    public MyPipelineFilter()
        : base(3) // 包头的大小是3字节,所以将3传如基类的构造方法中去
    {
        _packageConvert = new PackageConvert();
    }

    // 从数据包的头部返回包体的大小
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // 将数据包解析成 MyPackage 的实例
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = _packageConvert.Deserialize<Package>(buffer);

        return package;
    }
}

PS: 过滤器中DecodePackage返回的buffer可能不是完整的包, 例如固定头尾结构的包中, 返回的buffer可能是去掉头尾的格式

7 扩展AppSession和SuperSocketService

7.1 扩展AppSession

在SuperSocket关于Socket的管理提供了SessionContainer供大家获取程序中的Session实例, 只需在构建中调用.UseMiddleware<InProcSessionContainerMiddleware>()UseInProcSessionContainer()即可通过AppSession.Server.SessionContainer()获取.

但是为了方便管理, 个人角色还是实现一个另外的SessionManager比较好, 这样可以更方便的集成到我们的Asp.Net Core Application中. 使用ConcurrentDictionary原子字典来存储, 可以避免一些读写上的死锁问题.

public class SessionManager<TSession> where TSession : IAppSession
{
    /// <summary>
    /// 存储的Session
    /// </summary>
    public ConcurrentDictionary<string, TSession> Sessions { get; private set; } = new();

    /// <summary>
    /// Session的数量
    /// </summary>
    public int Count => Sessions.Count;

    /// <summary>
    /// </summary>
    public SessionManager()
    {
    }

    public ConcurrentDictionary<string, TSession> GetAllSessions()
    {
        return Sessions;
    }

    /// <summary>
    /// 获取一个Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task<TSession> TryGet(string key)
    {
        return await Task.Run(() =>
        {
            Sessions.TryGetValue(key, out var session);
            return session;
        });
    }

    /// <summary>
    /// 添加或者更新一个Session
    /// </summary>
    /// <param name="key">     </param>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task TryAddOrUpdate(string key, TSession session)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryGetValue(key, out var oldSession))
            {
                Sessions.TryUpdate(key, session, oldSession);
            }
            else
            {
                Sessions.TryAdd(key, session);
            }
        });
    }

    /// <summary>
    /// 移除一个Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemove(string key)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryRemove(key, out var session))
            {
            }
            else
            {
            }
        });
    }

    /// <summary>
    /// 通过Session移除Session
    /// </summary>
    /// <param name="sessionId"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemoveBySessionId(string sessionId)
    {
        await Task.Run(() =>
        {
            foreach (var session in Sessions)
            {
                if (session.Value.SessionID == sessionId)
                {
                    Sessions.TryRemove(session);
                    return;
                }
            }
        });
    }

    /// <summary>
    /// 删除僵尸链接
    /// </summary>
    /// <returns> </returns>
    [Obsolete("该方法丢弃", true)]
    public virtual async Task TryRemoveZombieSessions()
    {
        await Task.Run(() =>
        {
        });
    }

    /// <summary>
    /// 移除所有Session
    /// </summary>
    /// <returns> </returns>
    public virtual async Task TryRemoveAll()
    {
        await Task.Run(() =>
        {
            Sessions.Clear();
        });
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="buffer">  </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(TSession session, ReadOnlyMemory<byte> buffer)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        await session.SendAsync(buffer);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="message"> </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(ClientSession session, string message)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        // ReSharper disable once PossibleNullReferenceException
        await session.SendAsync(message);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task<Guid> FindIdBySession(TSession session)
    {
        return await Task.Run(() =>
        {
            return Guid.Parse(Sessions.First(x => x.Value.SessionID.Equals(session.SessionID)).Key);
        });
    }
}

7.2 如何自己实现SuperSocketService?

我们在使用SuperSocket时需要在Program.cs中来构建, 这样会导致一个问题, 这样我们的SuperSocket服务就会变得难以控制, 那么有没有一种写法来将这部分代码抽离出来呢?

答案是有的, 我们可以采用.Net Core中的BackgroundService或者IHostedService来实现后台服务, 甚至将这些服务管理起来, 根据需要随时创建, 随时启动, 随时停止. 这样做的好处还有, 我们可以随时获取依赖注入的服务来做一些更多的操作, 例如读取配置, 管理Session, 配置编解码器, 日志, 应答器, MQ等等.

public class TcpSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly IOptions<KafkaOption> _kafkaOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly ILogger<TcpSocketServerHostedService> _logger;
    private readonly IGeneralRepository _generalRepository;
    private readonly NbazhGpsSerializer _nbazhGpsSerializer = new NbazhGpsSerializer();

    private static EV26MsgIdProducer _provider = null;

    /// <summary>
    /// Tcp Server服务
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="kafkaOptions">             </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="logger">                   </param>
    /// <param name="factory">                  </param>
    public TcpSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        IOptions<KafkaOption> kafkaOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        ILogger<TcpSocketServerHostedService> logger,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
        _kafkaOptions = kafkaOptions;
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = SuperSocketHostBuilder.Create<NbazhGpsPackage, ProtocolPipelineSwitcher>()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.TcpListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<GpsTrackerSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
                {
                    try
                    {
                        // Session管理
                        await _gpsTrackerSessionManager.TryRemoveBySessionId(s.SessionID);
                    }
                    catch
                    {
                        // ignored
                    }
                })
            .UsePackageHandler(async (s, packet) =>
            {
                // 处理包
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _gpsTrackerSessionManager.TryRemoveAll();
        }
        catch
        {
            // ignored
        }

        await Task.CompletedTask;
    }
}

实现服务后, 我们可以写扩展方法将服务注入.

/// <summary>
/// 服务器扩展
/// </summary>
public static class ServerBuilderExtensions
{
    /// <summary>
    /// 添加Tcp服务器
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddTcpServer(
        this IServiceCollection services)
    {
        services.AddSingleton<TerminalSessionManager>();
        services.AddHostedService<TcpSocketServerHostedService>();
        return services;
    }

    /// <summary>
    /// 添加Ws服务器
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddWsServer(
        this IServiceCollection services)
    {
        services.AddSingleton<ClientSessionManagers>();
        services.AddHostedService<WebSocketServerHostedService>();
        return services;
    }
}

8 扩展SuperSocket的功能

8.1 多协议切换

我们有时候会面对一种需求, 就是同一个接口, 需要接收不同的终端的协议包, 这样我们通常会根据协议的不同点来区分协议. PS: 协议最好是同类型协议, 并且有明显不同的特征!

具体的实现方式就是实现一个特殊的PipelineFilter, 在下列代码中, 我们将读取该包数据的第一个字节来分辨该协议为0x78 0x78类型开头的协议还是0x79 0x79开头的协议, 然后将标记移回改包开头, 然后将这一包数据交给对应的过滤器来进行解析:

// NbazhGpsPackage: 包编解码器
public class ProtocolPipelineSwitcher : PipelineFilterBase<NbazhGpsPackage>
{
    private IPipelineFilter<NbazhGpsPackage> _filter7878;
    private byte _beginMarkA = 0x78;

    private IPipelineFilter<NbazhGpsPackage> _filter7979;
    private byte _beginMarkB = 0x79;

    public ProtocolPipelineSwitcher()
    {
        _filter7878 = new EV26PipelineFilter7878(this);
        _filter7979 = new EV26PipelineFilter7979(this);
    }

    public override NbazhGpsPackage Filter(ref SequenceReader<byte> reader)
    {
        if (!reader.TryRead(out byte flag))
        {
            throw new ProtocolException(@"flag byte cannot be read");
        }

        if (flag == _beginMarkA)
        {
            NextFilter = _filter7878;
        }
        else if (flag == _beginMarkB)
        {
            NextFilter = _filter7979;
        }
        else
        {
            return null;
            //throw new ProtocolException($"首字节未知 {flag}");
        }

        // 将标记移回开头
        reader.Rewind(1);
        return null;
    }
}

9 搭建WebSocket服务器

WebSocket Server的实现方式与之前的Socket Server实现方式大致相同, 其中不同的地方主要为: WebSocket Server不需要配置编解码器, 采用String作为消息格式等.

/// <summary>
/// </summary>
public class WebSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly IGeneralRepository _generalRepository;

    /// <summary>
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="factory">                  </param>
    public WebSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(_serverOptions));
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// WebSocketServer
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = WebSocketHostBuilder.Create()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.WsListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<ClientSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
            {
                await _clientSessionManager.TryRemoveBySessionId(s.SessionID);
            })
            .UseWebSocketMessageHandler(async (s, p) =>
            {
                var package = p.Message.ToObject<ClientPackage>();

                if (package.PackageType == PackageType.Heart)
                {

                    return;
                }

                if (package.PackageType == PackageType.Login)
                {
                    var client = _generalRepository.FindAsync<User>(x => x.Id.Equals(Guid.Parse(package.ClientId)));

                    if (client is null)
                    {
                        await s.CloseAsync(CloseReason.ProtocolError, "ClientId不存在");
                    }

                    var verifyCode = Guid.NewGuid().ToString();
                    var loginPacket = new ClientPackage()
                    {
                        PackageType = PackageType.Login,
                        ClientId = package.ClientId,
                        VerifyCode = verifyCode,
                    };
                    s["VerifyCode"] = verifyCode;

                    var msg = loginPacket.ToJson();
                    await s.SendAsync(msg);
                }

                // 追踪
                if (package.PackageType == PackageType.Trace)
                {
                    return;
                }
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
    }
}

9.1 番外: WebSocket传参

WebSocket的第一次请求是基于Http进行建立链接的, 所以WebSocket是可以在url中或者请求体中携带token等参数的. 当然后端并不像写Api时那么简单的就可以获取, 需要截取当前请求的Url或者携带的信息, 然后进行读取, 进而进行验证等操作. 这部分的代码之后再补充...

//.Net Core从Url中读取参数

10 多服务器以及不同服务间的协同

得益于我们实现的SessionManager, 我们将不用ServerSessionManger注入DI后, 我们可以在任意ServerService中做到跨Service进行消息传递, 验证等等操作.

More 1 协议的编解码器开发预览

协议编解码器样例:

EV26 Gps通信协议(使用方法在xUnit测试中):

  1. Github
  2. Gitee

简单样例:

public class Nbazh0X01Test
{
    private readonly ITestOutputHelper _testOutputHelper;
    private NbazhGpsSerializer NbazhGpsSerializer = new NbazhGpsSerializer();

    public Nbazh0X01Test(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Test1()
    {
        //78 78 11 01 07 52 53 36 78 90 02 42 70 00 32 01 00 05 12 79 0D 0A

        var hex = "7878 11 01 07 52 53 36 78 90 02 42 7000 3201 0005 1279 0D0A".ToHexBytes();

        // ----协议解析部分----//
        var packet = NbazhGpsSerializer.Deserialize(hex);
        Nbazh0X01 body = (Nbazh0X01)packet.Bodies;
        // ----协议解析部分----//

        Assert.Equal(0x11, packet.Header.Length);
        Assert.Equal(0x01, packet.Header.MsgId);

        Assert.Equal("7 52 53 36 78 90 02 42".Replace(" ", ""), body.TerminalId);
        Assert.Equal(0x7000, body.TerminalType);
        //Assert.Equal(0x3201, body.TimeZoneLanguage.Serialize());

        Assert.Equal(0x0005, packet.Header.MsgNum);
        Assert.Equal(0x1279, packet.Header.Crc);

        // 时区 0011 001000000001
    }
}

More 2 DotNetty

以后我们会探究DotNetty与SuperSocket的异同.

03-05 15:10