C#实现数据采集系统-数据反写
实现步骤
- MQTT订阅,接收消息 链接-MQTT订阅接收消息
- 反写内容写入通信类,添加到写入队列中
- 实现Modbustcp通信写入
具体实现
2. 消息内容写入通信类,添加到写入队列中
在服务类DAqService
中添加通信集合_modbusTcps
用于存储每个设备的通信类,使用键值对Dictionary
存储设备ID和通信类,用于快速查找
然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl
public class DAqService
{
public static string MainTopic = "DTSDAQ/";
private Dictionary<string, ModbusTcp> _modbusTcps;
public DAqService(DAqOption option)
{
_modbusTcps = new Dictionary<string, ModbusTcp>();
//...
}
/// <summary>
/// 启动服务
/// </summary>
public void Start()
{
MqttControllor = new MqttControllor(_option.MqttConfig);
foreach (var item in _deviceLinks)
{
ModbusTcp modbusTcp = new ModbusTcp(item);
modbusTcp.DoMonitor();
modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
//将
_modbusTcps.Add(item.UID, modbusTcp);
MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);
}
if (_serviceConfig.IsPushScheduled)
{
timer.Start();
}
}
}
实现消息订阅方法-设备控制DeviceControl
处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法
<summary>
/// 设备控制,反写
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
private void DeviceControl(string topic, string msg)
{
var message = JsonSerializer.Deserialize<DeviceMessage>(msg);
//如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值
if (message != null)
{
var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象
if (link != null)
{
var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象
//循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值
foreach (var item in message.Data)
{
var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象
if (point != null)
{
var parseMethod = point.Type.GetMethod(
"Parse",
BindingFlags.Public | BindingFlags.Static,
new[] { typeof(string) }
);
point.WriteValue = parseMethod.Invoke(
null,
new object[] { item.Value.ToString() }
); //通过点位id找到对应的点位对象
}
modbusTcp.Write(point);
}
}
}
}
在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入
public class ModbusTcp
{
/// <summary>
/// 写入队列
/// </summary>
private Queue<RegisterPoint> _writeQueue = new Queue<RegisterPoint>();
·
//写入值先加入一个队列
public void Write(RegisterPoint point)
{
_writeQueue.Enqueue(point);
}
}
完整代码
public class DAqService
{
public static string MainTopic = "DTSDAQ/";
private MqttControllor MqttControllor;
private Dictionary<string, ModbusTcp> _modbusTcps;
private DAqOption _option;
private List<DeviceLink> _deviceLinks;
private ServiceConfig _serviceConfig;
private System.Timers.Timer timer;
public DAqService(DAqOption option)
{
_modbusTcps = new Dictionary<string, ModbusTcp>();
_option = option;
_deviceLinks = option.DeviceLinks;
_serviceConfig = option.ServiceConfig;
timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000);
timer.Elapsed += Timer_Elapsed;
}
/// <summary>
/// 启动服务
/// </summary>
public void Start()
{
MqttControllor = new MqttControllor(_option.MqttConfig);
foreach (var item in _deviceLinks)
{
ModbusTcp modbusTcp = new ModbusTcp(item);
modbusTcp.DoMonitor();
modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
_modbusTcps.Add(item.UID, modbusTcp);
MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl);
}
if (_serviceConfig.IsPushScheduled)
{
timer.Start();
}
}
/// <summary>
/// 设备控制,反写
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
private void DeviceControl(string topic, string msg)
{
var message = JsonSerializer.Deserialize<DeviceMessage>(msg);
//如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值
if (message != null)
{
var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象
if (link != null)
{
var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象
//循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值
foreach (var item in message.Data)
{
var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象
if (point != null)
{
var parseMethod = point.Type.GetMethod(
"Parse",
BindingFlags.Public | BindingFlags.Static,
new[] { typeof(string) }
);
point.WriteValue = parseMethod.Invoke(
null,
new object[] { item.Value.ToString() }
); //通过点位id找到对应的点位对象
}
modbusTcp.Write(point);
}
}
}
}
private void Timer_Elapsed(object? sender, ElapsedEventArgs e)
{
foreach (var link in _deviceLinks)
{
try
{
DeviceMessage device = new DeviceMessage { DeviceId = link.UID };
foreach (RegisterPoint point in link.Points)
{
// Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");
device.Data.Add(point.UID, point.Value);
}
var data = JsonSerializer.Serialize(device);
MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)
{
if (_serviceConfig.IsPushChanged)
{
try
{
DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };
device.Data.Add(point.UID, value);
var data = JsonSerializer.Serialize(device);
MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
Console.WriteLine($"Point:{point.UID}-->Value:{value}");
}
}