C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息 链接-MQTT订阅接收消息
  2. 反写内容写入通信类,添加到写入队列中
  3. 实现Modbustcp通信写入

具体实现

2. 消息内容写入通信类,添加到写入队列中

在服务类DAqService中添加通信集合_modbusTcps用于存储每个设备的通信类,使用键值对Dictionary存储设备ID和通信类,用于快速查找

然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl

    public class DAqService
    {
         public static string MainTopic = "DTSDAQ/";
         
         private Dictionary<string, ModbusTcp> _modbusTcps;
         
         public DAqService(DAqOption option)
        {
            _modbusTcps = new Dictionary<string, ModbusTcp>();
           //...
        }

        /// <summary>
        /// 启动服务
        /// </summary>
        public void Start()
        {
            MqttControllor = new MqttControllor(_option.MqttConfig);

            foreach (var item in _deviceLinks)
            {
                ModbusTcp modbusTcp = new ModbusTcp(item);
                modbusTcp.DoMonitor();
                modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
                //将
                _modbusTcps.Add(item.UID, modbusTcp);
                MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);
            }

            if (_serviceConfig.IsPushScheduled)
            {
                timer.Start();
            }
        }
    }

实现消息订阅方法-设备控制DeviceControl

处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法

 <summary>
/// 设备控制,反写
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
private void DeviceControl(string topic, string msg)
{
    var message = JsonSerializer.Deserialize<DeviceMessage>(msg);
    //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值
    if (message != null)
    {
        var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象
        if (link != null)
        {
            var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象
            //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值
            foreach (var item in message.Data)
            {
                var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象
                if (point != null)
                {
                    var parseMethod = point.Type.GetMethod(
                        "Parse",
                        BindingFlags.Public | BindingFlags.Static,
                        new[] { typeof(string) }
                    );
                    point.WriteValue = parseMethod.Invoke(
                        null,
                        new object[] { item.Value.ToString() }
                    ); //通过点位id找到对应的点位对象
                }
                modbusTcp.Write(point);
            }
        }
    }
}

在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入

 public class ModbusTcp
 {
         /// <summary>
         /// 写入队列
         /// </summary>
         private Queue<RegisterPoint> _writeQueue = new Queue<RegisterPoint>();
         
         ·
         //写入值先加入一个队列
          public void Write(RegisterPoint point)
          {
              _writeQueue.Enqueue(point);
          }
 }

完整代码

public class DAqService
{
    public static string MainTopic = "DTSDAQ/";

    private MqttControllor MqttControllor;
    private Dictionary<string, ModbusTcp> _modbusTcps;

    private DAqOption _option;
    private List<DeviceLink> _deviceLinks;
    private ServiceConfig _serviceConfig;

    private System.Timers.Timer timer;

    public DAqService(DAqOption option)
    {
        _modbusTcps = new Dictionary<string, ModbusTcp>();
        _option = option;
        _deviceLinks = option.DeviceLinks;
        _serviceConfig = option.ServiceConfig;
        timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);
        timer.Elapsed += Timer_Elapsed;
    }

    /// <summary>
    /// 启动服务
    /// </summary>
    public void Start()
    {
        MqttControllor = new MqttControllor(_option.MqttConfig);

        foreach (var item in _deviceLinks)
        {
            ModbusTcp modbusTcp = new ModbusTcp(item);
            modbusTcp.DoMonitor();
            modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
            _modbusTcps.Add(item.UID, modbusTcp);
            MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);
        }

        if (_serviceConfig.IsPushScheduled)
        {
            timer.Start();
        }
    }

    /// <summary>
    /// 设备控制,反写
    /// </summary>
    /// <param name="topic"></param>
    /// <param name="msg"></param>
    private void DeviceControl(string topic, string msg)
    {
        var message = JsonSerializer.Deserialize<DeviceMessage>(msg);
        //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值
        if (message != null)
        {
            var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象
            if (link != null)
            {
                var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象
                //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值
                foreach (var item in message.Data)
                {
                    var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象
                    if (point != null)
                    {
                        var parseMethod = point.Type.GetMethod(
                            "Parse",
                            BindingFlags.Public | BindingFlags.Static,
                            new[] { typeof(string) }
                        );
                        point.WriteValue = parseMethod.Invoke(
                            null,
                            new object[] { item.Value.ToString() }
                        ); //通过点位id找到对应的点位对象
                    }
                    modbusTcp.Write(point);
                }
            }
        }
    }

    private void Timer_Elapsed(object? sender, ElapsedEventArgs e)
    {
        foreach (var link in _deviceLinks)
        {
            try
            {
                DeviceMessage device = new DeviceMessage { DeviceId = link.UID };
                foreach (RegisterPoint point in link.Points)
                {
                    // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");
                    device.Data.Add(point.UID, point.Value);
                }
                var data = JsonSerializer.Serialize(device);
                MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }

    private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)
    {
        if (_serviceConfig.IsPushChanged)
        {
            try
            {
                DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };
                device.Data.Add(point.UID, value);
                var data = JsonSerializer.Serialize(device);
                MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        Console.WriteLine($"Point:{point.UID}-->Value:{value}");
    }
}
08-23 04:48