🚀📊 OPC UA + ABP vNext 企業級實戰:高可用數據采集框架指南 🚀
📑 目錄
- 🚀📊 OPC UA + ABP vNext 企業級實戰:高可用數據采集框架指南 🚀
- 一、前言 🎯
- 二、系統架構 🏗?
- 三、配置與校驗 🔧
- `appsettings.json`
- 校驗示例
- 📜 配置校驗流程
- 四、OpcUaService 增強:線程安全 + Polly 重試 🔐🔄
- 🔒 OPC UA 會話重連流程
- 五、數據采集作業:異常隔離 + 告警上報 🚨
- 📥 數據采集 & 緩存流程
- 六、模塊注冊補全 🎛?
- `OpcUaHealthCheck` 示例
- 七、證書 & Kubernetes 部署 ??
- 1. 生成并信任證書(Linux)
- 📦 證書生成與掛載流程
- 2. Kubernetes Secret 示例
- 3. Pod 掛載
- 4. Liveness/Readiness Probes
- ?? K8s 探針配置流程
- 八、日志采集與可觀測性 🔍
- 九、結語 🎉
一、前言 🎯
本文基于企業級生產環境需求,全面重構 OPC UA 與 ABP vNext 集成框架,涵蓋:
- 配置集中化 & 校驗 ?
- 安全封裝 & Polly 重試 🔄
- 原生作業調度 (BackgroundWorkerBase) ??
- 分布式緩存 & 更新 冪等 🔒
- 健康檢查 & 告警事件 🚨
- OpenTelemetry 跟蹤 🕵?
- 證書管理 & Kubernetes 部署 ??
實現「即克隆、即運行、即監控」的工業數據平臺!?
二、系統架構 🏗?
三、配置與校驗 🔧
appsettings.json
"OpcUa": {"Endpoint": "opc.tcp://localhost:4840","NodeIds": ["ns=2;s=Device1", "ns=2;s=Device2"],"CacheDurationSeconds": 120,"AutoAcceptUntrusted": false,"Certificate": {"StorePath": "/etc/opcua/certs","SubjectName": "CN=OpcAbpIntegration"}
}
校驗示例
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{var config = context.ServiceProvider.GetRequiredService<IConfiguration>();var section = config.GetSection("OpcUa");if (!section.Exists())throw new ConfigurationErrorsException("🔴 OpcUa 配置節缺失!");var endpoint = section["Endpoint"];if (string.IsNullOrWhiteSpace(endpoint))throw new ConfigurationErrorsException("🔴 OpcUa.Endpoint 不能為空!");var nodeIds = section.GetSection("NodeIds").Get<string[]>();if (nodeIds == null || nodeIds.Length == 0)throw new ConfigurationErrorsException("🔴 OpcUa.NodeIds 至少配置一個!");
}
📜 配置校驗流程
四、OpcUaService 增強:線程安全 + Polly 重試 🔐🔄
public class OpcUaService : IOpcUaService, ISingletonDependency
{private readonly IOptions<OpcUaOptions> _options;private readonly ILogger<OpcUaService> _logger;private Session? _session;private readonly SemaphoreSlim _lock = new(1, 1);public OpcUaService(IOptions<OpcUaOptions> options, ILogger<OpcUaService> logger){_options = options;_logger = logger;}public async Task<Session> EnsureSessionAsync(){await _lock.WaitAsync();try{if (_session?.Connected == true) return _session;var config = new ApplicationConfiguration{ApplicationName = "OpcAbpIntegration",ApplicationUri = "urn:abp:opcua",ApplicationType = ApplicationType.Client,SecurityConfiguration = new SecurityConfiguration{ApplicationCertificate = new CertificateIdentifier{StoreType = "Directory",StorePath = _options.Value.Certificate.StorePath,SubjectName = _options.Value.Certificate.SubjectName},AutoAcceptUntrustedCertificates = _options.Value.AutoAcceptUntrusted},ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },TransportQuotas = new TransportQuotas { OperationTimeout = 15000, MaxMessageSize = 4_194_304 }};await config.Validate(ApplicationType.Client);var endpointDesc = CoreClientUtils.SelectEndpoint(_options.Value.Endpoint, false);var endpoint = new ConfiguredEndpoint(null, endpointDesc, EndpointConfiguration.Create(config));_session = await Session.Create(config, endpoint, false, "OPC UA", 60000, new UserIdentity(), null);_logger.LogInformation("? OPC UA 會話已連接:{Endpoint}", _options.Value.Endpoint);return _session;}finally{_lock.Release();}}public async Task<string> ReadNodeAsync(string nodeId){return await Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(1 << attempt),onRetry: (ex, delay) => _logger.LogWarning(ex, "重試讀取節點 {NodeId}", nodeId)).ExecuteAsync(async () =>{var session = await EnsureSessionAsync();_logger.LogDebug("📡 讀取節點 {NodeId}", nodeId);var node = new ReadValueId { NodeId = new NodeId(nodeId), AttributeId = Attributes.Value };var results = new DataValueCollection();await session.Read(null, 0, TimestampsToReturn.Both, new[] { node }, out results, out _);return results.FirstOrDefault()?.Value?.ToString() ?? "";});}
}
🔒 OPC UA 會話重連流程
五、數據采集作業:異常隔離 + 告警上報 🚨
public class OpcUaCollectorWorker : BackgroundWorkerBase
{private readonly IOpcUaService _opcUa;private readonly IDistributedCache<MyDeviceCacheItem> _cache;private readonly IMyDeviceRepository _repository;private readonly IDistributedEventBus _eventBus;private readonly IOptions<OpcUaOptions> _options;private readonly ILogger<OpcUaCollectorWorker> _logger;public override float DelayFactor => 1; // 可配置執行間隔public OpcUaCollectorWorker(IOpcUaService opcUa,IDistributedCache<MyDeviceCacheItem> cache,IMyDeviceRepository repository,IDistributedEventBus eventBus,IOptions<OpcUaOptions> options,ILogger<OpcUaCollectorWorker> logger){_opcUa = opcUa;_cache = cache;_repository = repository;_eventBus = eventBus;_options = options;_logger = logger;}[UnitOfWork]public override async Task ExecuteAsync(CancellationToken stoppingToken){var failedNodes = new List<string>();var sw = Stopwatch.StartNew();foreach (var nodeId in _options.Value.NodeIds){try{var value = await _opcUa.ReadNodeAsync(nodeId);await _repository.InsertOrUpdateAsync(new MyDeviceData(nodeId, value),existing => existing.Update(value));await _cache.SetAsync(nodeId,new MyDeviceCacheItem(value),new DistributedCacheEntryOptions {AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(_options.Value.CacheDurationSeconds)});_logger.LogInformation("📥 節點 {NodeId} 數據已更新", nodeId);}catch (Exception ex){_logger.LogError(ex, "? 讀取節點 {NodeId} 失敗", nodeId);failedNodes.Add(nodeId);}}sw.Stop();_logger.LogInformation("🔄 本次采集用時 {Elapsed} ms", sw.ElapsedMilliseconds);if (failedNodes.Count > 0){await _eventBus.PublishAsync(new NodeReadFailedEvent(failedNodes));_logger.LogWarning("?? 發布讀取失敗告警,節點:{Nodes}", string.Join(',', failedNodes));}}
}
📥 數據采集 & 緩存流程
六、模塊注冊補全 🎛?
[DependsOn(typeof(AbpEntityFrameworkCoreModule),typeof(AbpDistributedCacheModule),typeof(AbpBackgroundWorkersModule),typeof(AbpAutofacModule))]
public class OpcUaModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 1?? 配置綁定與校驗(見 OnApplicationInitialization)context.Services.Configure<OpcUaOptions>(context.Services.GetConfiguration().GetSection("OpcUa"));// 2?? 核心服務注冊context.Services.AddSingleton<IOpcUaService, OpcUaService>();// 3?? EF Core & 倉儲context.Services.AddAbpDbContext<MyDbContext>(opts =>{opts.AddDefaultRepositories(includeAllEntities: true);});// 4?? Background Workercontext.Services.AddBackgroundWorker<OpcUaCollectorWorker>();// 5?? 健康檢查context.Services.AddHealthChecks().AddCheck<OpcUaHealthCheck>("opcua").AddNpgSql("YourPostgreConnection").AddRedis("localhost");// 6?? OpenTelemetry 跟蹤context.Services.AddOpenTelemetryTracing(builder =>{builder.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddEntityFrameworkCoreInstrumentation().AddSource("OpcUaService").AddJaegerExporter();});}
}
OpcUaHealthCheck
示例
public class OpcUaHealthCheck : IHealthCheck
{private readonly IOpcUaService _opcUa;public OpcUaHealthCheck(IOpcUaService opcUa) => _opcUa = opcUa;public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken token = default){try{await _opcUa.EnsureSessionAsync();return HealthCheckResult.Healthy("OPC UA session is healthy");}catch (Exception ex){return HealthCheckResult.Unhealthy("OPC UA session failed", ex);}}
}
七、證書 & Kubernetes 部署 ??
1. 生成并信任證書(Linux)
openssl genrsa -out client.key 2048
openssl req -new -x509 -key client.key -out client.crt -days 365 \-subj "/CN=OpcAbpIntegration"
mkdir -p /etc/opcua/certs/trusted
cp client.crt /etc/opcua/certs/trusted/
📦 證書生成與掛載流程
2. Kubernetes Secret 示例
apiVersion: v1
kind: Secret
metadata:name: opcua-certsnamespace: your-ns
stringData:client.crt: |-----BEGIN CERTIFICATE-----...base64...-----END CERTIFICATE-----
3. Pod 掛載
volumeMounts:
- name: opcua-certsmountPath: /etc/opcua/certs
volumes:
- name: opcua-certssecret:secretName: opcua-certs
4. Liveness/Readiness Probes
readinessProbe:httpGet:path: /health/readyport: 5000initialDelaySeconds: 10periodSeconds: 30livenessProbe:httpGet:path: /health/liveport: 5000initialDelaySeconds: 30periodSeconds: 60
?? K8s 探針配置流程
八、日志采集與可觀測性 🔍
- 推薦安裝 NuGet 包:
OpenTelemetry.Extensions.Hosting
OpenTelemetry.Instrumentation.Http
,AspNetCore
,EntityFrameworkCore
- 日志平臺:Seq、ELK、Jaeger
- ABP 自帶日志面板可實時查看采集結果
九、結語 🎉
此版本已實現企業級「高可用、可復現、可維護」規范,覆蓋從 證書、配置、作業調度、緩存優化、健康檢查 到 可觀測 的全鏈路實踐。
📦 推薦 將此框架部署于 IoT Edge、Kubernetes,并結合 CI/CD 與 自動化證書腳本,打造工業物聯網的實時采集+可視化體系!