文章目錄
- 鏈接參數
- 設置logger 日志
- redis 鏈接
- mysql 鏈接
- emqx 鏈接
- mssql 鏈接
- tdengine 鏈接
- 采集OPCUA的點表的配置信息
- 設備
- 點表
- OPCUA 采集 數據程序
- 數據采集邏輯
鏈接參數
import random

# NOTE(review): credentials are stored in plain text here; consider loading
# them from the environment or a secrets store before deploying.

# TDengine (time-series archive) connection settings.
tdengine_connection_params = {
    'username': 'root',
    'password': 'taosdata',
    'host': '127.0.0.1',
    'port': 6030,
    'database': 'test',
}

# MySQL (alarm storage) connection settings.
mysql_connection_params = {
    'username': 'root',
    'password': 'root',
    'host': '127.0.0.1',
    'port': 3306,
    'database': 'th',
}

# SQL Server connection settings; `driver` is already URL-encoded.
mssql_connection_params = {
    'username': 'sa',
    'password': 'Z',
    'host': '127.0.0.1',
    'port': 1433,
    'database': 'SistarData',
    'driver': 'ODBC+Driver+17+for+SQL+Server',
}

redis_connection_params = {
    'host': 'localhost',  # Redis server address
    'port': 6379,  # Redis server port
    'db': 0,  # Redis database number to use
    'max_connections': 10,  # maximum connections in the pool
    'decode_responses': True,
}

emqx_connection_params = {
    'broker': '127.0.0.1',
    'port': 1883,
    # A broker drops the older session when two clients share an id, so the
    # suffix is drawn from a 32-bit space instead of only ten values.
    'client_id': f'python_mqtt_client_{random.getrandbits(32):08x}',
    'keep_alive_interval': 60,
    'password': None,
    'username': None,
    'topic_sub': None,
    'topic_pub': None,
}

logger_params = {
    "logger_name": "opcua_adapter_logger",
    "total_level": 20,
    "file_handler_level": 10,
    "control_handler_level": 20,
    "file_name": r"E:\TH\core\basic_service\opcua_adapter\logs\opcua_adapter_logger.txt",
    "mode": "a",
    "max_bytes": 10485760,
    "backup_count": 10,
    "encoding": "UTF-8",
    "format": "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s",
}

opcua_adapter_params = {
    'alarm_consumer_queue_length': 100,
    'archive_consumer_queue_length': 100,
    'emqx_consumer_queue_length': 100,
    'acquisition_frequency': 1,  # acquisition period
    'monitor_frequency': 2,  # monitoring period
    'alarm_table_name': 'alarm',  # table that receives alarm rows
    'archive_table_name': 'test',  # name of the archive database
    'emqx_worker': 1,
    'alarm_worker': 5,
    'archive_worker': 5,
}
設置logger 日志
import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler

from config import logger_params


def get_logger(logger_name, total_level, file_name, mode, max_bytes, backup_count, encoding, file_handler_level,
               control_handler_level, format) -> logging.Logger:
    """Build (or fetch) the named logger with a rotating file handler and a console handler.

    :param logger_name: name passed to ``logging.getLogger``
    :param total_level: level set on the logger itself
    :param file_name: path of the rotating log file
    :param mode: file open mode for the file handler
    :param max_bytes: size at which the log file rotates
    :param backup_count: number of rotated files to keep
    :param encoding: encoding of the log file
    :param file_handler_level: level of the file handler
    :param control_handler_level: level of the console handler
    :param format: ``logging.Formatter`` format string
    :return: the configured logger
    """
    import os

    logger = logging.getLogger(logger_name)
    logger.setLevel(total_level)  # set the overall log level

    # logging.getLogger returns the same object for the same name; attaching
    # handlers again would duplicate every record.
    if logger.handlers:
        return logger

    # The file handler cannot open its file when the directory is missing.
    log_dir = os.path.dirname(file_name)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    file_handler = ConcurrentRotatingFileHandler(
        filename=file_name,
        mode=mode,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding=encoding,
    )  # output file
    file_handler.setLevel(file_handler_level)

    control_handler = logging.StreamHandler()
    control_handler.setLevel(control_handler_level)

    formatter = logging.Formatter(format)
    file_handler.setFormatter(formatter)
    control_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(control_handler)
    return logger


logger = get_logger(**logger_params)
redis 鏈接
import logging
import time

import redis

from basic_service.opcua_adapter.config import redis_connection_params

_log = logging.getLogger(__name__)

pool = redis.ConnectionPool(
    host=redis_connection_params['host'],
    port=redis_connection_params['port'],
    db=redis_connection_params['db'],
    max_connections=redis_connection_params['max_connections'],
    decode_responses=redis_connection_params['decode_responses'],
)


# Get a connection object
def get_redis_connection():
    """Return a Redis client backed by the shared pool once it answers PING.

    Retries forever, pausing one second between attempts so an unreachable
    server does not turn this into a busy loop.
    """
    while True:
        try:
            redis_conn = redis.Redis(connection_pool=pool)
            if redis_conn.ping():
                return redis_conn
        except Exception as e:
            # The original called print.error(), which raised AttributeError
            # from inside the handler and aborted the retry loop.
            _log.error(str(e))
        time.sleep(1)


conn = get_redis_connection()
mysql 鏈接
"""
用于連接mysql
"""
import time
from urllib.parse import quote_plus

from sqlalchemy import create_engine

from basic_service.opcua_adapter.config import mysql_connection_params
from basic_service.opcua_adapter.get_logger import logger

# Credentials are percent-encoded so characters such as '@', ':' or '#'
# in a password cannot corrupt the URL.
engine = create_engine(
    "mysql+pymysql://"
    f"{quote_plus(str(mysql_connection_params['username']))}:"
    f"{quote_plus(str(mysql_connection_params['password']))}"
    f"@{mysql_connection_params['host']}:{mysql_connection_params['port']}"
    f"/{mysql_connection_params['database']}"
)


def get_mysql_connection():
    """Return an open connection from the MySQL engine, retrying every 3 seconds until one is obtained."""
    while True:
        try:
            return engine.connect()
        except Exception:
            # exc_info attaches the underlying error, which was previously discarded.
            logger.error(
                f"MySQL連接建立失敗,請檢查網絡是否暢通(ping {mysql_connection_params['host']}),服務器是否正常,是否開啟遠程連接",
                exc_info=True,
            )
            time.sleep(3)


if __name__ == '__main__':
    # Connectivity smoke test. It used to run on every import, which leaked
    # the connection and blocked importers while the server was down.
    get_mysql_connection().close()
emqx 鏈接
import paho.mqtt.client as mqtt
import time

from basic_service.opcua_adapter.config import emqx_connection_params
from get_logger import logger


class MQTTClient:
    """Thin wrapper around a paho MQTT client that connects on construction and starts the network loop."""

    def __init__(self, broker, port, client_id, keep_alive_interval, password=None, username=None, topic_sub=None,
                 topic_pub=None):
        """
        :param broker: broker host name or address
        :param port: broker TCP port
        :param client_id: MQTT client identifier
        :param keep_alive_interval: keep-alive passed to ``connect``
        :param password: optional password
        :param username: optional user name
        :param topic_sub: topic subscribed to after a successful connect, if set
        :param topic_pub: stored on the instance; not used by this class
        """
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.keep_alive_interval = keep_alive_interval
        self.topic_sub = topic_sub
        self.topic_pub = topic_pub
        self.password = password
        self.username = username

        # Create the MQTT client
        self.client = mqtt.Client(client_id=self.client_id)
        # A user name without a password is valid, so only the user name gates this.
        if self.username:
            self.client.username_pw_set(username=self.username, password=self.password)

        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish

        self.connect()
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        """Log the connection result and subscribe to ``topic_sub`` on success."""
        if rc == 0:
            logger.info(f"Connected to MQTT Broker! Returned code={rc}")
            if self.topic_sub:
                client.subscribe(self.topic_sub)  # subscribe once connected
        else:
            logger.error(f"Failed to connect, return code {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Log the disconnect and reconnect only when it was unexpected."""
        logger.info("Disconnected from MQTT Broker with code: " + str(rc))
        # rc == 0 means disconnect() was called on purpose; reconnecting then
        # would undo a deliberate shutdown.
        if rc != 0:
            self.reconnect()

    def on_message(self, client, userdata, msg):
        """Message callback; intentionally a no-op."""
        # logger.info(f"Received message: {msg.topic} -> {msg.payload.decode()}")
        pass

    def on_publish(self, client, userdata, mid):
        """Publish callback; intentionally a no-op."""
        # print(f"Message {mid} has been published.")
        pass

    # Reconnect mechanism
    def reconnect(self):
        """Try a single reconnect; on failure log the error and wait 5 seconds."""
        try:
            logger.error("Attempting to reconnect...")
            self.client.reconnect()
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")
            time.sleep(5)  # delay before the caller may try again

    def connect(self):
        """Connect to the broker, falling back to ``reconnect`` when the first attempt fails."""
        try:
            self.client.connect(self.broker, self.port, self.keep_alive_interval)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            self.reconnect()


get_emqx_connection = MQTTClient(**emqx_connection_params)

# Main publishing loop (example, kept commented out)
# try:
# while True:
# message = "Hello MQTT"
# result = mqtt_client.client.publish('/test', message, qos=0)
# status = result[0]
# if status == 0:
# print(f"Sent `{message}` to topic /test")
# else:
# print(f"Failed to send message to topic /test")
# time.sleep(10) # 每10秒發布一次
# except KeyboardInterrupt:
# print("Interrupted by user, stopping...")
# finally:
# mqtt_client.client.loop_stop()
# mqtt_client.client.disconnect()
mssql 鏈接
"""
用于連接mssql
"""
import time
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text

from basic_service.opcua_adapter.config import mysql_connection_params, mssql_connection_params
# This import was commented out although `logger` is used below, so the
# first connection failure raised NameError instead of retrying.
from basic_service.opcua_adapter.get_logger import logger

# Built from mssql_connection_params, like the other connector modules, instead
# of a hard-coded URL. Credentials are percent-encoded: a raw '#' in the
# password would otherwise start a URL fragment.
engine = create_engine(
    "mssql+pyodbc://"
    f"{quote_plus(str(mssql_connection_params['username']))}:"
    f"{quote_plus(str(mssql_connection_params['password']))}"
    f"@{mssql_connection_params['host']}:{mssql_connection_params['port']}"
    f"/{mssql_connection_params['database']}"
    f"?driver={mssql_connection_params['driver']}"
)


def get_mssql_connection():
    """Return an open connection from the SQL Server engine, retrying every 3 seconds until one is obtained."""
    while True:
        try:
            return engine.connect()
        except Exception:
            logger.error(
                f"MSSQL連接建立失敗,請檢查網絡是否暢通(ping {mssql_connection_params['host']}),服務器是否正常,是否開啟遠程連接",
                exc_info=True,
            )
            time.sleep(3)


if __name__ == '__main__':
    # Demo query. It used to run on every import of this module.
    conn = get_mssql_connection()
    try:
        result = conn.execute(text(
            "SELECT TABLE_NAME,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'sistar_eng_areas';"))
        print(result.fetchall())
    finally:
        conn.close()
tdengine 鏈接
"""
用于連接tdengine
"""
import time
from urllib.parse import quote_plus

from sqlalchemy import create_engine

from basic_service.opcua_adapter.config import tdengine_connection_params
from basic_service.opcua_adapter.get_logger import logger

# Credentials are percent-encoded so special characters cannot corrupt the URL.
engine = create_engine(
    "taos://"
    f"{quote_plus(str(tdengine_connection_params['username']))}:"
    f"{quote_plus(str(tdengine_connection_params['password']))}"
    f"@{tdengine_connection_params['host']}:{tdengine_connection_params['port']}"
    f"/{tdengine_connection_params['database']}"
)


def get_tdengine_connection():
    """Return an open connection from the TDengine engine, retrying every 3 seconds until one is obtained."""
    while True:
        try:
            return engine.connect()
        except Exception:
            # exc_info attaches the underlying error, which was previously discarded.
            logger.error(
                f"TDEngine連接建立失敗,請檢查網絡是否暢通(ping {tdengine_connection_params['host']}),服務器是否正常,是否開啟遠程連接,是否安裝與服務器相同版本的客戶端",
                exc_info=True,
            )
            time.sleep(3)
采集OPCUA的點表的配置信息
設備
[{"No.": 1,"device_name": "device1","url": "opc.tcp:\/\/192.168.10.132:4862"},{"No.": 2,"device_name": "device2","url": "opc.tcp:\/\/192.168.10.132:4863"}
]
點表
[{"No": 1,"tag_uuid": "tag0001","node_id": "ns=1;s=t|tag1","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "add","scale_factor": 1,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea61"},{"No": 2,"tag_uuid": "tag0002","node_id": "ns=1;s=t|tag2","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "sub","scale_factor": 2,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea62"}]
系統支持用戶從 Excel 導入數據。
從excel 上傳數據到 系統中
"""
用于初始化,創建數據庫,創建數據庫表
"""
import random

import pandas as pd
from sqlalchemy import text

from connectors.tdengine import get_tdengine_connection
from get_logger import logger

xlsx = r'E:\TH\core\basic_service\opcua_adapter\static\opcua_template.xlsx'
connection = get_tdengine_connection()
db_name = 'test'
stable = 'meters'

df = pd.read_excel(xlsx, sheet_name='device1')
tag_uuid = df.loc[:, ['tag_uuid', 'unit']]

# Database and super table first, then one sub-table per tag row.
statements = [
    f"CREATE DATABASE IF NOT EXISTS {db_name} KEEP 3650;",
    f"use {db_name};",
    f"CREATE STABLE IF NOT EXISTS {db_name}.{stable} (ts TIMESTAMP, val FLOAT) "
    f"TAGS ( unit BINARY(20), s_type BINARY(20));",
]
statements.extend(
    f"CREATE TABLE IF NOT EXISTS {db_name}.{tag} USING {db_name}.{stable} "
    f"TAGS ('{unit}', '{random.randint(1, 10)}');"
    for tag, unit in tag_uuid.values
)

total_sql = '\n'.join(statements)
print(total_sql)

try:
    # Executed one statement at a time so the script does not depend on the
    # driver accepting several statements in one call.
    # NOTE(review): confirm whether the taos driver accepts multi-statement strings.
    for statement in statements:
        connection.execute(text(statement))
    connection.commit()
    logger.info('init success')
except Exception as e:
    logger.error(str(e))
    connection.rollback()
    logger.error('init failed')
finally:
    connection.close()
OPCUA 采集 數據程序
import datetime
import json
import os
import queue
import threading
import time
from threading import Thread
from typing import List, Any
import opcua.client.client
import psutil as psutil
from opcua.client.client import Client
from opcua.common.node import Node
from opcua.ua import UaStatusCodeError
from config import opcua_adapter_params
from connectors._redis import get_redis_connection
from connectors.emqx import get_emqx_connection
from connectors.mysql import get_mysql_connection
from connectors.tdengine import get_tdengine_connection
from get_logger import logger
from sqlalchemy import text

# paho client owned by the connector module; used by the EMQX consumer threads.
emqx_connection = get_emqx_connection.client

# Upper bound, in seconds, for the linearly growing reconnect delay.
_MAX_RECONNECT_DELAY = 60
# Rows collected per tag before one batched INSERT is sent to TDengine.
_ARCHIVE_BATCH_SIZE = 100
# Rows kept per tag while inserts are failing; the oldest rows are dropped beyond this.
_MAX_BACKLOG_ROWS = 10000


class OPCUAAdapter(object):
    """Poll an OPC UA server and hand value changes to the alarm, archive and MQTT consumers."""

    def __init__(self, device_info, tag_id_to_node_id_map, tag_id_to_detail_map, use_subscribe=True):
        """
        :param device_info: device description; must provide ``device_url`` (or ``url``) and ``device_name``
        :param tag_id_to_node_id_map: mapping whose keys are node ids and whose values are tag ids
                                      (that is how every method below uses it, despite the name)
        :param tag_id_to_detail_map: per-tag configuration keyed by tag id
        :param use_subscribe: accepted for compatibility; not used by this class
        """
        # The device template names this field "url"; "device_url" stays the primary key.
        self.url: str = device_info.get('device_url') or device_info.get('url')
        self._ua: opcua.client.client.Client = Client(url=self.url, timeout=5)
        self.connected: bool = False
        self.thread_list: List[Thread] = []
        # Latest poll result {node_id: value}; replaced wholesale on every poll.
        self.raw_dict = {}
        self.tag_id_to_node_id_map = tag_id_to_node_id_map
        self.tag_id_to_detail_map = tag_id_to_detail_map
        self.device_info = device_info
        self.alarm_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['alarm_consumer_queue_length'])
        self.archive_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['archive_consumer_queue_length'])
        self.emqx_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['emqx_consumer_queue_length'])

    # ------------------------------------------------------------------ connection

    def connect(self):
        """Connect once; fall back to the blocking ``reconnect`` loop when that fails."""
        if self.connected:
            return
        try:
            self._ua.connect()
        except Exception as e:
            self.disconnect()
            self.connected = False
            # The exception used to be passed as a stray logging argument with
            # no placeholder, which produced a formatting error instead of the reason.
            logger.error(f"初次連接失敗,失敗原因:{e}")
            # Start reconnecting
            self.reconnect()
        else:
            self.connected = True
            logger.info(f"初次連接{self.url}成功!")

    def disconnect(self):
        """Close the session if one is open; always leaves ``connected`` False afterwards."""
        if not (self._ua and self.connected):
            return
        try:
            self._ua.disconnect()
            logger.info("主動斷開連接成功")
        except Exception as e:
            logger.error(f"主動斷開連接失敗,失敗原因:{e}")
        finally:
            self.connected = False

    def reconnect(self):
        """Block until the server accepts a connection, waiting longer after each failure."""
        index = 0
        while True:
            try:
                self._ua.connect()
            except Exception as e:
                index += 1
                self.connected = False
                # ConnectionRefusedError is an OSError but keeps the generic message.
                if isinstance(e, OSError) and not isinstance(e, ConnectionRefusedError):
                    logger.error(f"與OPCUA服務器未能建立連接,失敗原因:{str(e)}!")
                else:
                    logger.error(f"第{index}次重連失敗,失敗原因:{str(e)}!")
                # Linear back-off, capped so a long outage does not stretch the retry gap indefinitely.
                time.sleep(min(index, _MAX_RECONNECT_DELAY))
            else:
                self.connected = True
                logger.info(f"重連{self.url}成功!")
                return

    def _reset_connection(self):
        """Drop the current session and block until a new one is established."""
        self.disconnect()
        self.reconnect()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _push_metrics(redis_connection, mapping, backoff=0):
        """Write ``mapping`` into the ``performance`` hash.

        On failure the client is replaced and, when ``backoff`` is non-zero,
        the caller is paused for that many seconds.

        :return: the Redis client to use from now on
        """
        try:
            redis_connection.hmset('performance', mapping=mapping)
        except Exception:
            if redis_connection:
                redis_connection.close()
            redis_connection = get_redis_connection()
            if backoff:
                time.sleep(backoff)
        return redis_connection

    @staticmethod
    def _alarm_breach(tag_detail, raw_value):
        """Return ``(alarm_message, alarm_limit)`` when the value violates a limit, else ``None``."""
        if not tag_detail or not tag_detail.get('active_alarm'):
            return None
        try:
            value = float(raw_value)
        except (TypeError, ValueError):
            # A non-numeric reading cannot be compared against the limits.
            return None
        up_limit = tag_detail.get('alarm_up')
        down_limit = tag_detail.get('alarm_down')
        if up_limit is not None and value > float(up_limit):
            return tag_detail.get('alarm_up_info'), up_limit
        if down_limit is not None and value < float(down_limit):
            return tag_detail.get('alarm_down_info'), down_limit
        return None

    @staticmethod
    def _build_insert_sql(db_name, tag_id, rows):
        """Render a multi-row INSERT for one tag table from ``(timestamp, value)`` tuples."""
        values = ', '.join(str(row) for row in rows)
        return f"""INSERT INTO {db_name}.{tag_id} VALUES {values};"""

    @staticmethod
    def _parse_mqtt_topics(mqtt_topic_str):
        """Parse ``"topic_name:<topic>,Qos:<n>;..."`` into a list of ``{'topic_name', 'qos'}`` dicts.

        Blank segments (e.g. after a trailing ``;`` or newline) are skipped.
        """
        topics = []
        for segment in (mqtt_topic_str or '').split(';'):
            segment = segment.strip()
            if not segment:
                continue
            topic_part, qos_part = segment.rsplit(',', 1)
            topics.append({
                'topic_name': topic_part.split(':', 1)[1].strip(),
                'qos': int(qos_part.split(':', 1)[1].strip()),
            })
        return topics

    # ------------------------------------------------------------------ acquisition

    def interval_read(self, interval: int) -> None:
        """Poll every configured node and publish the result as ``raw_dict``.

        :param interval: seconds to sleep between polls
        """
        connection = get_redis_connection()
        thread_name = threading.current_thread().name
        nodes = []
        while True:
            start_time = time.time()
            if not self.connected:
                # Not connected yet: block here until the server is reachable.
                self.reconnect()
            else:
                try:
                    nodes_str_list = list(self.tag_id_to_node_id_map.keys())
                    nodes = [self._ua.get_node(node) for node in nodes_str_list]
                    values = self._ua.get_values(nodes)
                    # Rebind instead of mutating so readers always see a complete snapshot.
                    self.raw_dict = dict(zip(nodes_str_list, values))
                except AttributeError as e:
                    logger.error(f"屬性讀取錯誤:{str(e)}!")
                except TimeoutError:
                    logger.error(f"接收服務端報文超時")
                except (UaStatusCodeError, OSError) as e:
                    # Log first: reconnect() can block for a long time and
                    # would otherwise delay the error report.
                    logger.error(f"數據獲取失敗,失敗原因:{str(e)}!")
                    self._reset_connection()
                except RuntimeError as e:
                    logger.error(f'運行錯誤,失敗原因:{str(e)}')
                    self._reset_connection()
                except Exception as e:
                    logger.error(f"未捕獲到的異常:{str(e)}")
                    self._reset_connection()
                finally:
                    end_time = time.time()
                    connection = self._push_metrics(connection, {
                        'nodes': len(nodes),
                        f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                    })
            time.sleep(interval)

    def node_write(self, nodes: List[Node], values: List[Any]):
        """Write ``values`` to ``nodes``; failures are logged, not raised."""
        try:
            self._ua.set_values(nodes, values)
        except Exception as e:
            logger.error(f"數據寫入失敗,失敗原因:{str(e)}")

    def monitor_thread(self):
        """Periodically publish process, thread and queue statistics to the ``performance`` hash."""
        redis_connection = get_redis_connection()
        while True:
            try:
                current_process_id = os.getpid()
                process = psutil.Process(current_process_id)
                # Basic process information
                process_name = process.name()
                cpu_usage = process.cpu_percent(interval=1)  # CPU usage sampled over one second
                memory_info = process.memory_info()
                io_counters = process.io_counters()
                disk_usage = psutil.disk_usage('/')
                thread_states = [
                    (thread.ident, thread.name, thread.is_alive()) for thread in threading.enumerate()
                ]
                # threading.enumerate() lists only live threads, so a dead poller
                # has to be looked up in the threads this adapter started.
                for worker in self.thread_list:
                    if worker.name == 'continuous_thread' and worker.ident is not None and not worker.is_alive():
                        logger.error(f'讀取線程出錯,請盡快聯系管理員處理!')
                redis_connection.hmset(name='performance', mapping={
                    'process_name': process_name,
                    'cpu_usage': cpu_usage,
                    'memory_info_RSS': f'{memory_info.rss / (1024 * 1024):.2f} MB',
                    'memory_info_VMS': f'{memory_info.vms / (1024 * 1024): .2f} MB',
                    'io_read': f"{io_counters.read_bytes / (1024 * 1024):.2f} MB",
                    'io_write': f"{io_counters.write_bytes / (1024 * 1024):.2f} MB",
                    'disk_usage': f'{disk_usage.percent}%',
                    'threads': json.dumps(thread_states),
                    'pid': current_process_id,
                    'archive_consumer_length': self.archive_consumer_queue.qsize(),
                    'alarm_consumer_length': self.alarm_consumer_queue.qsize(),
                    'emqx_consumer_length': self.emqx_consumer_queue.qsize(),
                })
            except Exception as e:
                logger.error(f'監控子線程出錯:{str(e)}')
                if redis_connection:
                    redis_connection.close()
                redis_connection = get_redis_connection()
            finally:
                time.sleep(int(opcua_adapter_params['monitor_frequency']))

    def change_data_notifier(self, timestamp, node_id, new_data, old_data):
        """Queue one change event for every consumer; unknown node ids are ignored.

        :param timestamp: time the change was detected
        :param node_id: OPC UA node id whose value changed
        :param new_data: current value
        :param old_data: previous value
        """
        try:
            tag_id = self.tag_id_to_node_id_map[node_id]
        except KeyError:
            return
        content = {
            'timestamp': timestamp,
            'tag_id': tag_id,
            'new_data': new_data,
            'old_data': old_data,
        }
        # Queue.put blocks while a queue is full.
        self.alarm_consumer_queue.put(content)
        self.emqx_consumer_queue.put(content)
        self.archive_consumer_queue.put(content)

    # ------------------------------------------------------------------ consumers

    def consumer_alarm_info(self):
        """Consume change events and insert an alarm row whenever a value violates its limits."""
        redis_connection = get_redis_connection()
        alarm_table_name = opcua_adapter_params['alarm_table_name']
        connection = get_mysql_connection()
        thread_name = threading.current_thread().name
        alarm_count = 0
        # Bound parameters instead of string interpolation: tag names and
        # messages may contain quotes.
        insert_stmt = text(
            f"insert into {alarm_table_name} "
            "(device_name, tag_uuid, tag_name, alarm_message, alarm_limit, value) "
            "values (:device_name, :tag_uuid, :tag_name, :alarm_message, :alarm_limit, :value)"
        )
        while True:
            start_time = time.time()
            try:
                content = self.alarm_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                breach = self._alarm_breach(tag_detail, content['new_data'])
                if breach is not None:
                    alarm_message, alarm_limit = breach
                    try:
                        connection.execute(insert_stmt, {
                            'device_name': self.device_info['device_name'],
                            'tag_uuid': content['tag_id'],
                            'tag_name': tag_detail.get('comments'),
                            'alarm_message': alarm_message,
                            'alarm_limit': str(alarm_limit),
                            'value': str(content['new_data']),
                        })
                        connection.commit()
                        alarm_count += 1
                    except Exception as e:
                        logger.error(f'數據插入錯誤,錯誤原因:{str(e)}!')
                        connection.rollback()
            except Exception as e:
                logger.error(str(e))
                if connection:
                    connection.close()
                connection = get_mysql_connection()
            finally:
                end_time = time.time()
                # NOTE(review): the source lost its indentation; the 2 s pause is
                # applied only after a failed metrics write. Confirm it was not
                # meant as a per-item throttle.
                redis_connection = self._push_metrics(redis_connection, {
                    f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                    'alarm_count': alarm_count,
                }, backoff=2)

    def consumer_archive_info(self):
        """Consume change events and archive them to TDengine in per-tag batches.

        Rows whose insert failed are kept in a per-tag backlog and replayed
        before new events are handled.
        """
        thread_name = threading.current_thread().name
        redis_connection = get_redis_connection()
        connection = get_tdengine_connection()
        db_name = opcua_adapter_params['archive_table_name']
        buffer = {tag_id: [] for tag_id in self.tag_id_to_node_id_map.values()}
        # Rows that could not be inserted are not discarded; they are replayed
        # once inserts work again.
        exception_buffer = {tag_id: [] for tag_id in buffer}
        exception_time = 0
        total_time = 0
        while True:
            start_time = time.time()
            try:
                for tag_id, rows in exception_buffer.items():
                    if not rows:
                        continue
                    try:
                        connection.execute(text(self._build_insert_sql(db_name, tag_id, rows)))
                        connection.commit()
                        # setdefault() was a no-op here, so replayed rows were
                        # never cleared and got re-inserted on every pass.
                        exception_buffer[tag_id] = []
                    except Exception:
                        exception_time += 1
                        connection.rollback()

                content = self.archive_consumer_queue.get()
                tag_id = content['tag_id']
                tag_detail = self.tag_id_to_detail_map.get(tag_id)
                if tag_detail and tag_detail.get('active_archive'):
                    timestamp = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
                    try:
                        data = float(content['new_data'])
                    except Exception:
                        # Unconvertible readings are archived as 0.0.
                        data = 0.0
                    pending = buffer.setdefault(tag_id, [])
                    # Append before checking the size so the row that
                    # completes a batch is not lost.
                    pending.append((timestamp, data))
                    if len(pending) >= _ARCHIVE_BATCH_SIZE:
                        sql = self._build_insert_sql(db_name, tag_id, pending)
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                        except Exception:
                            logger.error(f'insert error:{sql}')
                            connection.rollback()
                            exception_time += 1
                            backlog = exception_buffer.setdefault(tag_id, [])
                            backlog.extend(pending)
                            # Cap the backlog per tag by dropping the oldest rows.
                            overflow = len(backlog) - _MAX_BACKLOG_ROWS
                            if overflow > 0:
                                del backlog[:overflow]
                        buffer[tag_id] = []
                total_time += 1
            except Exception as e:
                logger.error(str(e))
                exception_time += 1
                if connection:
                    connection.close()
                connection = get_tdengine_connection()
            finally:
                end_time = time.time()
                # NOTE(review): same indentation ambiguity as in consumer_alarm_info.
                redis_connection = self._push_metrics(redis_connection, {
                    'buffer': len(buffer),
                    'exception_buffer': len(exception_buffer),
                    f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                    'total_time': total_time,
                    'exception_time': exception_time,
                }, backoff=2)

    def consumer_emqx_info(self):
        """Consume change events and publish ``{tag_id: new_data}`` to each MQTT topic of the tag."""
        while True:
            try:
                content = self.emqx_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                if not tag_detail:
                    continue
                payload = json.dumps({content['tag_id']: content['new_data']})
                for topic in self._parse_mqtt_topics(tag_detail.get('mqtt_topic_name')):
                    emqx_connection.publish(topic=topic['topic_name'], payload=payload, qos=topic['qos'])
            except Exception as e:
                logger.error(str(e))

    def subscribe_data_change(self):
        """Compare successive ``raw_dict`` snapshots every 0.5 s and report added, removed and changed values."""

        def stamp():
            return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        previous = self.raw_dict.copy()
        while True:
            try:
                # Read the attribute once: interval_read rebinds it, and mixing
                # two generations would give inconsistent results.
                current = self.raw_dict
                changed = False
                for key in current.keys() - previous.keys():
                    changed = True
                    self.change_data_notifier(stamp(), key, current[key], 0)
                for key in previous.keys() - current.keys():
                    changed = True
                    # The value of a removed key only exists in the previous snapshot.
                    self.change_data_notifier(stamp(), key, 0, previous[key])
                for key in current.keys() & previous.keys():
                    if previous[key] != current[key]:
                        changed = True
                        # The notifier takes (new_data, old_data); the arguments were reversed.
                        self.change_data_notifier(stamp(), key, current[key], previous[key])
                if changed:
                    previous = current.copy()
            except Exception as e:
                # Keep the watcher alive; one bad comparison must not stop change detection.
                logger.error(str(e))
            time.sleep(0.5)

    # ------------------------------------------------------------------ lifecycle

    def run(self):
        """Connect, start every worker thread and wait for them; always stops the MQTT client on exit."""
        try:
            self.connect()
            consumer_specs = (
                (self.consumer_alarm_info, 'consumer_alarm_info', opcua_adapter_params['alarm_worker']),
                (self.consumer_archive_info, 'consumer_archive_info', opcua_adapter_params['archive_worker']),
                (self.consumer_emqx_info, 'consumer_emqx_info', opcua_adapter_params['emqx_worker']),
            )
            for target, prefix, count in consumer_specs:
                for i in range(count):
                    self.thread_list.append(Thread(target=target, name=f'{prefix}_{i+1}'))

            # The poll period comes from configuration instead of a hard-coded 1.
            self.thread_list.append(Thread(target=self.interval_read, name='continuous_thread',
                                           args=(opcua_adapter_params['acquisition_frequency'],)))
            self.thread_list.append(Thread(target=self.monitor_thread, name='monitor_thread'))
            self.thread_list.append(Thread(target=self.subscribe_data_change, name='subscribe_thread'))

            for th in self.thread_list:
                th.start()
            for th in self.thread_list:
                th.join()
        except Exception as e:
            logger.error(str(e))
        finally:
            get_emqx_connection.client.loop_stop()
            get_emqx_connection.client.disconnect()


def init():
    """Block until the device description and both tag maps can be loaded from Redis.

    :return: ``(device_info, tag_id_to_node_id_map, tag_id_to_detail_map)``
    """
    conn = get_redis_connection()
    while True:
        try:
            device_info = json.loads(conn.get('device_info'))
            tag_id_to_node_id_map = json.loads(conn.get('tag_id_to_node_id_map'))
            tag_id_to_detail_map = json.loads(conn.get('tag_id_to_detail_map'))
        except Exception:
            # A missing key makes json.loads raise on None; exc_info shows which step failed.
            logger.error('Init Failed!', exc_info=True)
            time.sleep(3)
            continue
        if device_info and tag_id_to_detail_map and tag_id_to_node_id_map:
            return device_info, tag_id_to_node_id_map, tag_id_to_detail_map
        logger.error('init error')
        time.sleep(3)


def main():
    """Load the configuration from Redis and run one adapter for the configured device."""
    device_info, tag_id_to_node_id_map, tag_id_to_detail_map = init()
    opcua_adapter = OPCUAAdapter(device_info=device_info, tag_id_to_node_id_map=tag_id_to_node_id_map,
                                 tag_id_to_detail_map=tag_id_to_detail_map)
    opcua_adapter.run()


if __name__ == '__main__':
    main()
數據采集邏輯
# 斷線重連機制
多進程中
# 多進程模型
一個設備一個進程
# 多線程模型
不同的采集頻率一個線程,一個進程中的多個線程共享一個鏈接,進程
# 客戶端采用訂閱的方式
# 客戶端采用輪詢的方式
# 字段的含義
字段名稱 字段含義 數值
No 自增id
tag_uuid 變量的唯一ID
node_id UA node_id
interval 采集周期,以秒為單位
active 是否激活,TRUE, FALSE 必須大寫
active_alarm 是否激活報警
alarm_up 報警上限
alarm_down 報警下限
alarm_up_info 報警上限提示信息
alarm_down_info 報警下限提示信息
alarm_up_change 達到報警上限的修改值
alarm_down_change 達到報警下限的修改值
active_archive 激活歸檔
archive_onchange 變化時歸檔;為 FALSE 時 archive_interval 生效
archive_interval 輪詢歸檔周期
active_scale 激活變換
scale_sign 符號
scale_factor 因子
mqtt_topic_name
unit
comments