前言
通过我前面的一篇文件,我们已经能够搭建一个OPC-UA服务端了,并且也拥有了一些基础功能。这一次咱们就来了解一下OPC-UA的服务注册与发现,如果对服务注册与发现这个概念不理解的朋友,可以先百度一下,由于近年来微服务架构的兴起,服务注册与发现已经成为一个很时髦的概念,它的主要功能可分为三点:
1、服务注册;
2、服务发现;
3、心跳检测。
如果运行过OPC-UA源码的朋友们应该已经发现了,OPC-UA服务端启动之后,每隔一会就会输出一行错误提示信息,大致内容是"服务端注册失败,xxx毫秒之后重试",通过查看源码我们可以知道,这是因为OPC-UA服务端启动之后,会自动调用"opc.tcp://localhost:4840/"的RegisterServer2方法注册自己,如果注册失败,则会立即调用RegisterServer方法再次进行服务注册,而由于我们没有"opc.tcp://localhost:4840/"这个服务,所以每隔一会儿就会提示服务注册失败。
现在我们就动手来搭建一个"opc.tcp://localhost:4840/"服务,在OPC-UA标准中,它叫Discovery Server。
一、服务配置
Discovery Server的服务配置与普通的OPC-UA服务配置差不多,只需要注意几点:
1、服务的类型ApplicationType是DiscoveryServer而不是Server;
2、服务启动时application.Start()传入的实例化对象需要实现IDiscoveryServer接口。
配置代码如下:
var config = new ApplicationConfiguration() { ApplicationName = "Axiu UA Discovery", ApplicationUri = Utils.Format(@"urn:{0}:AxiuUADiscovery", System.Net.Dns.GetHostName()), ApplicationType = ApplicationType.DiscoveryServer, ServerConfiguration = new ServerConfiguration() { BaseAddresses = { "opc.tcp://localhost:4840/" }, MinRequestThreadCount = 5, MaxRequestThreadCount = 100, MaxQueuedRequestCount = 200 }, DiscoveryServerConfiguration = new DiscoveryServerConfiguration() { BaseAddresses = { "opc.tcp://localhost:4840/" }, ServerNames = { "OpcuaDiscovery" } }, SecurityConfiguration = new SecurityConfiguration { ApplicationCertificate = new CertificateIdentifier { StoreType = @"Directory", StorePath = @"%CommonApplicationData%\OPC Foundation\CertificateStores\MachineDefault", SubjectName = Utils.Format(@"CN={0}, DC={1}", "AxiuOpcua", System.Net.Dns.GetHostName()) }, TrustedIssuerCertificates = new CertificateTrustList { StoreType = @"Directory", StorePath = @"%CommonApplicationData%\OPC Foundation\CertificateStores\UA Certificate Authorities" }, TrustedPeerCertificates = new CertificateTrustList { StoreType = @"Directory", StorePath = @"%CommonApplicationData%\OPC Foundation\CertificateStores\UA Applications" }, RejectedCertificateStore = new CertificateTrustList { StoreType = @"Directory", StorePath = @"%CommonApplicationData%\OPC Foundation\CertificateStores\RejectedCertificates" }, AutoAcceptUntrustedCertificates = true, AddAppCertToTrustedStore = true }, TransportConfigurations = new TransportConfigurationCollection(), TransportQuotas = new TransportQuotas { OperationTimeout = 15000 }, ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 }, TraceConfiguration = new TraceConfiguration() }; config.Validate(ApplicationType.DiscoveryServer).GetAwaiter().GetResult(); if (config.SecurityConfiguration.AutoAcceptUntrustedCertificates) { config.CertificateValidator.CertificateValidation += (s, e) => { e.Accept = (e.Error.StatusCode == StatusCodes.BadCertificateUntrusted); }; } var application = new ApplicationInstance { ApplicationName = "Axiu UA Discovery", ApplicationType = ApplicationType.DiscoveryServer, ApplicationConfiguration = config }; //application.CheckApplicationInstanceCertificate(false, 2048).GetAwaiter().GetResult(); bool certOk = application.CheckApplicationInstanceCertificate(false, 0).Result; if (!certOk) { Console.WriteLine("证书验证失败!"); } var server = new DiscoveryServer(); // start the server. application.Start(server).Wait();
二、实现IDiscoveryServer接口
下面我们就来看看前面Discovery服务启动时传入的实例化对象与普通服务启动时传入的对象有什么不一样,在我们启动一个普通OPC-UA服务时,我们可以直接使用StandardServer的对象,程序不会报错,只不过是没有任何节点和内容而已,而现在,如果我们直接使用DiscoveryServerBase类的对象,启动Discovery服务时会报错。哪怕是我们实现了IDiscoveryServer接口仍然会报错。为了能启动Discovery服务我们还必须重写ServerBase中的两个方法:
1、EndpointBase GetEndpointInstance(ServerBase server),默认的GetEndpointInstance方法返回的类型是SessionEndpoint对象,而Discovery服务应该返回的是DiscoveryEndpoint;
protected override EndpointBase GetEndpointInstance(ServerBase server) { return new DiscoveryEndpoint(server);//SessionEndpoint }
2、void StartApplication(ApplicationConfiguration configuration),默认的StartApplication方法没有执行任何操作,而我们需要去启动一系列与Discovery服务相关的操作。
protected override void StartApplication(ApplicationConfiguration configuration) { lock (m_lock) { try { // create the datastore for the instance. m_serverInternal = new ServerInternalData( ServerProperties, configuration, MessageContext, new CertificateValidator(), InstanceCertificate); // create the manager responsible for providing localized string resources. ResourceManager resourceManager = CreateResourceManager(m_serverInternal, configuration); // create the manager responsible for incoming requests. RequestManager requestManager = new RequestManager(m_serverInternal); // create the master node manager. MasterNodeManager masterNodeManager = new MasterNodeManager(m_serverInternal, configuration, null); // add the node manager to the datastore. m_serverInternal.SetNodeManager(masterNodeManager); // put the node manager into a state that allows it to be used by other objects. masterNodeManager.Startup(); // create the manager responsible for handling events. EventManager eventManager = new EventManager(m_serverInternal, (uint)configuration.ServerConfiguration.MaxEventQueueSize); // creates the server object. m_serverInternal.CreateServerObject( eventManager, resourceManager, requestManager); // create the manager responsible for aggregates. m_serverInternal.AggregateManager = CreateAggregateManager(m_serverInternal, configuration); // start the session manager. SessionManager sessionManager = new SessionManager(m_serverInternal, configuration); sessionManager.Startup(); // start the subscription manager. SubscriptionManager subscriptionManager = new SubscriptionManager(m_serverInternal, configuration); subscriptionManager.Startup(); // add the session manager to the datastore. m_serverInternal.SetSessionManager(sessionManager, subscriptionManager); ServerError = null; // set the server status as running. SetServerState(ServerState.Running); // monitor the configuration file. if (!String.IsNullOrEmpty(configuration.SourceFilePath)) { var m_configurationWatcher = new ConfigurationWatcher(configuration); m_configurationWatcher.Changed += new EventHandler<ConfigurationWatcherEventArgs>(this.OnConfigurationChanged); } CertificateValidator.CertificateUpdate += OnCertificateUpdate; //60s后开始清理过期服务列表,此后每60s检查一次 m_timer = new Timer(ClearNoliveServer, null, 60000, 60000); Console.WriteLine("Discovery服务已启动完成,请勿退出程序!!!"); } catch (Exception e) { Utils.Trace(e, "Unexpected error starting application"); m_serverInternal = null; ServiceResult error = ServiceResult.Create(e, StatusCodes.BadInternalError, "Unexpected error starting application"); ServerError = error; throw new ServiceResultException(error); } } }
三、注册与发现服务
服务注册之后,就涉及到服务信息如何保存,OPC-UA标准里面好像是没有固定要的要求,应该是没有,至少我没有发现...傲娇.jpg。
1.注册服务
这里我就直接使用一个集合来保存服务信息,这种方式存在一个问题:如果Discovery服务重启了,那么在服务重新注册之前这段时间内,所有已注册的服务信息都丢失了(因为OPC-UA服务的心跳间隔是30s,也就是最大可能会有30s的时间服务信息丢失)。所以如果对服务状态信息敏感的情况,请自行使用其他方式,可以存储到数据库,也可以用其他分布式缓存来保存。这些就不在我们的讨论范围内了,我们先看看服务注册的代码。
public virtual ResponseHeader RegisterServer2( RequestHeader requestHeader, RegisteredServer server, ExtensionObjectCollection discoveryConfiguration, out StatusCodeCollection configurationResults, out DiagnosticInfoCollection diagnosticInfos) { configurationResults = null; diagnosticInfos = null; ValidateRequest(requestHeader); // Insert implementation. try { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ":服务注册:" + server.DiscoveryUrls.FirstOrDefault()); RegisteredServerTable model = _serverTable.Where(d => d.ServerUri == server.ServerUri).FirstOrDefault(); if (model != null) { model.LastRegistered = DateTime.Now; } else { model = new RegisteredServerTable() { DiscoveryUrls = server.DiscoveryUrls, GatewayServerUri = server.GatewayServerUri, IsOnline = server.IsOnline, LastRegistered = DateTime.Now, ProductUri = server.ProductUri, SemaphoreFilePath = server.SemaphoreFilePath, ServerNames = server.ServerNames, ServerType = server.ServerType, ServerUri = server.ServerUri }; _serverTable.Add(model); } configurationResults = new StatusCodeCollection() { StatusCodes.Good }; return CreateResponse(requestHeader, StatusCodes.Good); } catch (Exception ex) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("客户端调用RegisterServer2()注册服务时触发异常:" + ex.Message); Console.ResetColor(); } return CreateResponse(requestHeader, StatusCodes.BadUnexpectedError); }
前面有说到,OPC-UA普通服务启动后会先调用RegisterServer2方法注册自己,如果注册失败,则会立即调用RegisterServer方法再次进行服务注册。所以,为防万一。RegisterServer2和RegisterServer我们都需要实现,但是他们的内容其实是一样的,毕竟都是干一样的活--接收服务信息,然后把服务信息保存起来。
public virtual ResponseHeader RegisterServer( RequestHeader requestHeader, RegisteredServer server) { ValidateRequest(requestHeader); // Insert implementation. try { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ":服务注册:" + server.DiscoveryUrls.FirstOrDefault()); RegisteredServerTable model = _serverTable.Where(d => d.ServerUri == server.ServerUri).FirstOrDefault(); if (model != null) { model.LastRegistered = DateTime.Now; } else { model = new RegisteredServerTable() { DiscoveryUrls = server.DiscoveryUrls, GatewayServerUri = server.GatewayServerUri, IsOnline = server.IsOnline, LastRegistered = DateTime.Now, ProductUri = server.ProductUri, SemaphoreFilePath = server.SemaphoreFilePath, ServerNames = server.ServerNames, ServerType = server.ServerType, ServerUri = server.ServerUri }; _serverTable.Add(model); } return CreateResponse(requestHeader, StatusCodes.Good); } catch (Exception ex) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("客户端调用RegisterServer()注册服务时触发异常:" + ex.Message); Console.ResetColor(); } return CreateResponse(requestHeader, StatusCodes.BadUnexpectedError); }
2.发现服务
服务注册之后,我们的Discovery服务就知道有哪些OPC-UA服务已经启动了,所以我们还需要一个方法来告诉客户端这些已启动的服务信息。FindServers()方法就是来干这件事的。
public override ResponseHeader FindServers( RequestHeader requestHeader, string endpointUrl, StringCollection localeIds, StringCollection serverUris, out ApplicationDescriptionCollection servers) { servers = new ApplicationDescriptionCollection(); ValidateRequest(requestHeader); Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ":请求查找服务..."); string hostName = Dns.GetHostName(); lock (_serverTable) { foreach (var item in _serverTable) { StringCollection urls = new StringCollection(); foreach (var url in item.DiscoveryUrls) { if (url.Contains("localhost")) { string str = url.Replace("localhost", hostName); urls.Add(str); } else { urls.Add(url); } } servers.Add(new ApplicationDescription() { ApplicationName = item.ServerNames.FirstOrDefault(), ApplicationType = item.ServerType, ApplicationUri = item.ServerUri, DiscoveryProfileUri = item.SemaphoreFilePath, DiscoveryUrls = urls, ProductUri = item.ProductUri, GatewayServerUri = item.GatewayServerUri }); } } return CreateResponse(requestHeader, StatusCodes.Good); }
3.心跳检测
需要注意一点,在OPC-UA标准中并没有提供单独的心跳方法,它采用的心跳方式就是再次向Discovery服务注册自己,这也就是为什么服务注册失败之后会重试;服务注册成功了,它也还是会重试。所以在服务注册时,我们需要判断一下服务信息是否已经存在了,如果已经存在了,那么就执行心跳的操作。
至此,我们已经实现的服务的注册与发现,IDiscoveryServer接口要求的内容我们也都实现了,但是有没有发现我们还少了一样东西,就是如果我们的某个普通服务关闭了或是掉线了,我们的Discovery服务还是保存着它的信息,这个时候理论上来讲,已离线的服务信息就应该删掉,不应该给客户端返回了。所以这就需要一个方法来清理那些已经离线的服务。
private void ClearNoliveServer(object obj) { try { var tmpList = _serverTable.Where(d => d.LastRegistered < DateTime.Now.AddMinutes(-1) || !d.IsOnline).ToList(); if (tmpList.Count > 0) { lock (_serverTable) { foreach (var item in tmpList) { Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + ":清理服务:" + item.DiscoveryUrls.FirstOrDefault()); _serverTable.Remove(item); } } } } catch (Exception ex) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("清理掉线服务ClearNoliveServer()时触发异常:" + ex.Message); Console.ResetColor(); } }
我这里以一分钟为限,如果一分钟内都没有心跳的服务,我就当它是离线了。关于这个一分钟需要根据自身情况来调整。
补充说明
OPC-UA服务默认是向localhost注册自己,当然,也可以调整配置信息,把服务注册到其他地方去,只需在ApplicationConfiguration对象中修改ServerConfiguration属性如下:
ServerConfiguration = new ServerConfiguration() { BaseAddresses = { "opc.tcp://localhost:8020/", "https://localhost:8021/" }, MinRequestThreadCount = 5, MaxRequestThreadCount = 100, MaxQueuedRequestCount = 200, RegistrationEndpoint = new EndpointDescription() { EndpointUrl = "opc.tcp://172.17.4.68:4840", SecurityLevel = ServerSecurityPolicy.CalculateSecurityLevel(MessageSecurityMode.SignAndEncrypt, SecurityPolicies.Basic256Sha256), SecurityMode = MessageSecurityMode.SignAndEncrypt, SecurityPolicyUri = SecurityPolicies.Basic256Sha256, Server = new ApplicationDescription() { ApplicationType = ApplicationType.DiscoveryServer }, } },
最新的Discovery Server代码在我的GitHub上已经上传,地址:
https://github.com/axiu233/AxiuOpcua.ServerDemo
代码文件为:
Axiu.Opcua.Demo.Service.DiscoveryManagement;
Axiu.Opcua.Demo.Service.DiscoveryServer。