Python 鏈接各種中間件[Mysql\redis\mssql\tdengine]

文章目錄

  • 鏈接參數
  • 設置logger 日志
  • redis 鏈接
  • mysql 鏈接
  • emqx 鏈接
  • mssql 鏈接
  • tdengine 鏈接
  • 采集OPCUA的點表的配置信息
    • 設備
    • 點表
  • OPCUA 采集 數據程序
  • 數據采集邏輯

鏈接參數

import random

# TDengine (time-series archive database) connection settings.
tdengine_connection_params = {
    'username': 'root',
    'password': 'taosdata',
    'host': '127.0.0.1',
    'port': 6030,
    'database': 'test',
}

# MySQL (alarm storage) connection settings.
mysql_connection_params = {
    'username': 'root',
    'password': 'root',
    'host': '127.0.0.1',
    'port': 3306,
    'database': 'th',
}

# Microsoft SQL Server connection settings.
mssql_connection_params = {
    'username': 'sa',
    'password': 'Z',
    'host': '127.0.0.1',
    'port': 1433,
    'database': 'SistarData',
    'driver': 'ODBC+Driver+17+for+SQL+Server',
}

# Redis connection-pool settings.
redis_connection_params = {
    'host': 'localhost',        # Redis server address
    'port': 6379,               # Redis server port
    'db': 0,                    # Redis logical database number
    'max_connections': 10,      # upper bound of the connection pool
    'decode_responses': True,   # return str instead of bytes
}

# EMQX / MQTT broker settings; client_id gets a random suffix so several
# instances can coexist on the same broker.
emqx_connection_params = {
    'broker': '127.0.0.1',
    'port': 1883,
    'client_id': f'python_mqtt_client_{random.randint(1, 10)}',
    'keep_alive_interval': 60,
    'password': None,
    'username': None,
    'topic_sub': None,
    'topic_pub': None,
}

# Keyword arguments consumed by get_logger(**logger_params).
logger_params = {
    "logger_name": "opcua_adapter_logger",
    "total_level": 20,            # logging.INFO
    "file_handler_level": 10,     # logging.DEBUG for the rotating file
    "control_handler_level": 20,  # logging.INFO for the console
    "file_name": r"E:\TH\core\basic_service\opcua_adapter\logs\opcua_adapter_logger.txt",
    "mode": "a",
    "max_bytes": 10485760,        # rotate after 10 MiB
    "backup_count": 10,
    "encoding": "UTF-8",
    "format": "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s",
}

# Tuning knobs for the OPC UA adapter service.
opcua_adapter_params = {
    'alarm_consumer_queue_length': 100,
    'archive_consumer_queue_length': 100,
    'emqx_consumer_queue_length': 100,
    'acquisition_frequency': 1,     # acquisition period (seconds)
    'monitor_frequency': 2,         # monitoring period (seconds)
    'alarm_table_name': 'alarm',    # MySQL table receiving alarms
    'archive_table_name': 'test',   # TDengine database receiving archives
    'emqx_worker': 1,
    'alarm_worker': 5,
    'archive_worker': 5,
}

設置logger 日志

import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler
from config import logger_params


def get_logger(logger_name, total_level, file_name, mode, max_bytes, backup_count, encoding,
               file_handler_level, control_handler_level, format) -> logging.Logger:
    """Build a logger that writes to a size-rotated file AND to the console.

    BUG FIX: the original return annotation was ``logging.getLogger()`` which
    *called* getLogger at definition time and annotated with a Logger instance;
    the class ``logging.Logger`` is the correct annotation.

    :param logger_name: name passed to logging.getLogger().
    :param total_level: logger-level threshold; handlers filter further.
    :param file_name: path of the rotating log file.
    :param mode: file open mode (e.g. "a").
    :param max_bytes: rotate the file once it exceeds this size.
    :param backup_count: number of rotated files to keep.
    :param encoding: encoding of the log file.
    :param file_handler_level: level threshold for the file handler.
    :param control_handler_level: level threshold for the console handler.
    :param format: logging.Formatter format string (name kept because
        callers pass it via **logger_params; it shadows the builtin).
    :return: the configured logger.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(total_level)  # overall threshold
    # Process-safe rotating file output (third-party concurrent_log_handler).
    file_handler = ConcurrentRotatingFileHandler(
        filename=file_name,
        mode=mode,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding=encoding,
    )
    file_handler.setLevel(file_handler_level)
    control_handler = logging.StreamHandler()
    control_handler.setLevel(control_handler_level)
    formatter = logging.Formatter(format)
    file_handler.setFormatter(formatter)
    control_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(control_handler)
    return logger


# Module-level singleton used throughout the adapter.
logger = get_logger(**logger_params)

redis 鏈接

import redis
from basic_service.opcua_adapter.config import redis_connection_params

# One shared connection pool for the whole module.
pool = redis.ConnectionPool(
    host=redis_connection_params['host'],
    port=redis_connection_params['port'],
    db=redis_connection_params['db'],
    max_connections=redis_connection_params['max_connections'],
    decode_responses=redis_connection_params['decode_responses'],
)


# Obtain a connection object.
def get_redis_connection():
    """Return a Redis client backed by the shared pool.

    Loops until a PING round-trip succeeds, so callers always receive a
    live connection (this blocks forever while Redis is unreachable).

    :return: a redis.Redis client whose PING succeeded.
    """
    while True:
        try:
            redis_conn = redis.Redis(connection_pool=pool)
            response = redis_conn.ping()
            if response:
                return redis_conn
            else:
                continue
        except Exception as e:
            # BUG FIX: the original called print.error(str(e)), which raises
            # AttributeError (print has no .error) and aborted the retry loop.
            print(str(e))


conn = get_redis_connection()

mysql 鏈接

"""
用于連接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"mysql+pymysql://{mysql_connection_params['username']}:{mysql_connection_params['password']}@{mysql_connection_params['host']}:{mysql_connection_params['port']}/{mysql_connection_params['database']}")def get_mysql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MySQL連接建立失敗,請檢查網絡是否暢通(ping {mysql_connection_params['host']}),服務器是否正常,是否開啟遠程連接")time.sleep(3)continueget_mysql_connection()

emqx 鏈接

import paho.mqtt.client as mqtt
import time
from basic_service.opcua_adapter.config import emqx_connection_params
from get_logger import logger


class MQTTClient:
    """Thin wrapper around paho-mqtt: connects at construction time, wires the
    four callbacks, starts the network loop in a background thread, and retries
    on connect/disconnect failures."""

    def __init__(self, broker, port, client_id, keep_alive_interval, password=None, username=None, topic_sub=None,
                 topic_pub=None):
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.keep_alive_interval = keep_alive_interval
        self.topic_sub = topic_sub  # topic auto-subscribed in on_connect (may be None)
        self.topic_pub = topic_pub  # stored only; never read in this class
        self.password = password
        self.username = username
        # Create the MQTT client.
        self.client = mqtt.Client(client_id=self.client_id)
        if self.username and self.password:
            self.client.username_pw_set(username=self.username, password=self.password)  # set credentials if provided
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.connect()
        self.client.loop_start()  # background network loop; stopped by the OPC UA adapter on shutdown

    def on_connect(self, client, userdata, flags, rc):
        # rc == 0 means the broker accepted the connection.
        if rc == 0:
            logger.info(f"Connected to MQTT Broker! Returned code={rc}")
            if self.topic_sub:
                client.subscribe(self.topic_sub)  # subscribe once the connection is up
        else:
            logger.error(f"Failed to connect, return code {rc}")

    def on_disconnect(self, client, userdata, rc):
        logger.info("Disconnected from MQTT Broker with code: " + str(rc))
        # Reconnect logic lives here: any disconnect triggers one reconnect attempt.
        self.reconnect()

    def on_message(self, client, userdata, msg):
        # Incoming messages are ignored; this client is publish-only in practice.
        # logger.info(f"Received message: {msg.topic} -> {msg.payload.decode()}")
        pass

    def on_publish(self, client, userdata, mid):
        # No per-publish bookkeeping needed.
        # print(f"Message {mid} has been published.")
        pass

    # Reconnection mechanism.
    def reconnect(self):
        try:
            logger.error("Attempting to reconnect...")
            self.client.reconnect()  # single reconnect attempt
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")
            time.sleep(5)  # back off before the next attempt
            # NOTE(review): there is no loop here — after the sleep the failure
            # is simply dropped until the next disconnect event fires.

    def connect(self):
        try:
            self.client.connect(self.broker, self.port, self.keep_alive_interval)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            self.reconnect()


# Module-level singleton. NOTE(review): despite the name, this is an
# MQTTClient *instance*, not a factory function; callers use
# get_emqx_connection.client to reach the underlying paho client.
get_emqx_connection = MQTTClient(**emqx_connection_params)

# Sample publish loop kept for reference (uses a `mqtt_client` name that does
# not exist in this module as written):
# try:
#     while True:
#         message = "Hello MQTT"
#         result = mqtt_client.client.publish('/test', message, qos=0)
#         status = result[0]
#         if status == 0:
#             print(f"Sent `{message}` to topic /test")
#         else:
#             print(f"Failed to send message to topic /test")
#         time.sleep(10)  # publish every 10 seconds
# except KeyboardInterrupt:
#     print("Interrupted by user, stopping...")
# finally:
#     mqtt_client.client.loop_stop()
#     mqtt_client.client.disconnect()

mssql 鏈接

"""
用于連接tdengine
"""
import time
#from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_params, mssql_connection_paramsfrom sqlalchemy import create_engine, textengine = create_engine(f'mssql+pyodbc://sa:Z#@127.0.0.1:1433/sistarData?driver=ODBC+Driver+17+for+SQL+Server')def get_mssql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MSSQL連接建立失敗,請檢查網絡是否暢通(ping {mssql_connection_params['host']}),服務器是否正常,是否開啟遠程連接")time.sleep(3)continueconn = get_mssql_connection()result = conn.execute(text("SELECT TABLE_NAME,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'sistar_eng_areas';"))print(result.fetchall())

tdengine 鏈接

"""
用于連接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import tdengine_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"taos://{tdengine_connection_params['username']}:{tdengine_connection_params['password']}@{tdengine_connection_params['host']}:{tdengine_connection_params['port']}/{tdengine_connection_params['database']}")def get_tdengine_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"TDEngine連接建立失敗,請檢查網絡是否暢通(ping {tdengine_connection_params['host']}),服務器是否正常,是否開啟遠程連接,是否安裝與服務器相同版本的客戶端")time.sleep(3)continue

采集OPCUA的點表的配置信息

設備

[{"No.": 1,"device_name": "device1","url": "opc.tcp:\/\/192.168.10.132:4862"},{"No.": 2,"device_name": "device2","url": "opc.tcp:\/\/192.168.10.132:4863"}
]

點表

[{"No": 1,"tag_uuid": "tag0001","node_id": "ns=1;s=t|tag1","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "add","scale_factor": 1,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea61"},{"No": 2,"tag_uuid": "tag0002","node_id": "ns=1;s=t|tag2","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "sub","scale_factor": 2,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea62"}]

在這里插入圖片描述

在這里插入圖片描述

用戶可以支持從excel導入數據。

從excel 上傳數據到 系統中

"""
用于初始化,創建數據庫,創建數據庫表
"""
import randomimport pandas as pd
from sqlalchemy import text
from connectors.tdengine import get_tdengine_connection
from get_logger import loggerxlsx = r'E:\TH\core\basic_service\opcua_adapter\static\opcua_template.xlsx'connection = get_tdengine_connection()db_name = 'test'
stable = 'meters'df = pd.read_excel(xlsx, sheet_name='device1')tag_uuid = df.loc[:, ['tag_uuid', 'unit']]create_table_sql = ''for item in tag_uuid.values:_sql = f"""CREATE TABLE IF NOT EXISTS {db_name}.{item[0]} USING {db_name}.{stable} TAGS ('{item[1]}', '{random.randint(1, 10)}');"""create_table_sql += _sqlcreate_db_stable_sql = f"""CREATE DATABASE IF NOT EXISTS {db_name} KEEP 3650;
use {db_name};
CREATE STABLE IF NOT EXISTS {db_name}.{stable} (ts TIMESTAMP, val FLOAT) TAGS ( unit BINARY(20),   s_type BINARY(20));
"""
total_sql = create_db_stable_sql + create_table_sql
print(total_sql)
try:connection.execute(text(total_sql))connection.commit()logger.info('init success')
except Exception as e:print(str(e))connection.rollback()logger.error('init failed')

OPCUA 采集 數據程序

import datetime
import json
import os
import queue
import threading
import time
from threading import Thread
from typing import List, Any
import opcua.client.client
import psutil as psutil
from opcua.client.client import Client
from opcua.common.node import Node
from opcua.ua import UaStatusCodeError
from config import opcua_adapter_params
from connectors._redis import get_redis_connection
from connectors.emqx import get_emqx_connection
from connectors.mysql import get_mysql_connection
from connectors.tdengine import get_tdengine_connection
from get_logger import logger
from sqlalchemy import text

# Underlying paho-mqtt client of the module-level MQTTClient instance.
emqx_connection = get_emqx_connection.client


class OPCUAAdapter(object):
    """OPC UA data-acquisition class: polls node values from one OPC UA server
    and fans change events out to alarm (MySQL), archive (TDengine) and MQTT
    consumer threads via bounded internal queues."""

    def __init__(self, device_info, tag_id_to_node_id_map, tag_id_to_detail_map, use_subscribe=True):
        # device_info: dict; only 'device_url' and 'device_name' are read here.
        # NOTE(review): use_subscribe is accepted but never used — confirm intent.
        self.url: str = device_info.get('device_url')
        self._ua: opcua.client.client.Client = Client(url=self.url, timeout=5)
        self.connected: bool = False
        self.thread_list: List[Thread] = []  # all worker threads started by run()
        self.raw_dict = {}  # latest snapshot {node_id: value}, written by interval_read
        # NOTE(review): despite its name this maps node_id -> tag_id
        # (keys are polled as nodes in interval_read; values key the buffers).
        self.tag_id_to_node_id_map = tag_id_to_node_id_map
        self.tag_id_to_detail_map = tag_id_to_detail_map  # tag_id -> per-tag config dict
        self.device_info = device_info
        # Bounded hand-off queues between the change detector and the consumers.
        self.alarm_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['alarm_consumer_queue_length'])
        self.archive_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['archive_consumer_queue_length'])
        self.emqx_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['emqx_consumer_queue_length'])

    def connect(self):
        """Initial connection; on failure falls into the blocking reconnect loop.

        :return: None
        """
        try:
            if self.connected:
                return
            else:
                self._ua.connect()
                self.connected = True
                logger.info(f"初次連接{self.url}成功!")
                return
        except Exception as e:
            self.disconnect()
            self.connected = False
            # NOTE(review): the extra positional arg is treated by logging as a
            # %-format argument with no placeholder, so `e` is not rendered.
            logger.error("初次連接失敗,失敗原因:", e)
            # Start reconnecting.
            self.reconnect()

    def disconnect(self):
        """Actively close the UA session (no-op when not connected).

        :return: None
        """
        try:
            if self._ua and self.connected:
                self._ua.disconnect()
                self.connected = False
                logger.info("主動斷開連接成功")
        except Exception as e:
            logger.error("主動斷開連接失敗,失敗原因:", str(e))

    def reconnect(self):
        """Block until the UA connection is re-established, backing off by
        `index` seconds per consecutive failure.

        :return: None
        """
        index = 0
        while True:
            try:
                self._ua.connect()
                self.connected = True
                index = 0
                logger.info(f"重連{self.url}成功!")
                return
            except AttributeError as e:
                index += 1
                logger.error(f"第{index}次重連失敗,失敗原因:{str(e)}!")
                self.connected = False
                time.sleep(index * 1)
                continue
            except ConnectionRefusedError as e:
                index += 1
                logger.error(f"第{index}次重連失敗,失敗原因:{str(e)}!")
                self.connected = False
                time.sleep(index * 1)
                continue
            except OSError as e:
                index += 1
                logger.error(f"與OPCUA服務器未能建立連接,失敗原因:{str(e)}!")
                self.connected = False
                time.sleep(index * 1)
                continue
            except Exception as e:
                index += 1
                logger.error(f"第{index}次重連失敗,失敗原因:{str(e)}!")
                self.connected = False
                time.sleep(index * 1)
                continue

    def interval_read(self, interval: int) -> None:
        """Poll all configured nodes every `interval` seconds and refresh
        self.raw_dict; publishes per-cycle timing to the Redis 'performance' hash.

        :param interval: seconds to sleep between polling cycles.
        :return: None
        """
        connection = get_redis_connection()
        thread_name = threading.current_thread().name
        nodes = []
        while True:
            # Per-cycle bookkeeping: node count and elapsed time go to Redis.
            start_time = time.time()
            if not self.connected:
                # Not connected yet — enter the blocking reconnect loop.
                self.reconnect()
            else:
                try:
                    nodes_str_list = self.tag_id_to_node_id_map.keys()
                    nodes = [self._ua.get_node(node) for node in nodes_str_list]
                    values = self._ua.get_values(nodes)
                    # Snapshot replaced wholesale; subscribe_data_change diffs it.
                    self.raw_dict = dict(zip(nodes_str_list, values))
                except AttributeError as e:
                    logger.error(f"屬性讀取錯誤:{str(e)}!")
                except TimeoutError:
                    logger.error(f"接收服務端報文超時")
                except ConnectionRefusedError as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"數據獲取失敗,失敗原因:{str(e)}!")
                except ConnectionAbortedError as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"數據獲取失敗,失敗原因:{str(e)}!")
                except UaStatusCodeError as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"數據獲取失敗,失敗原因:{str(e)}!")
                except OSError as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"數據獲取失敗,失敗原因:{str(e)}!")
                except RuntimeError as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f'運行錯誤,失敗原因:{str(e)}')
                except Exception as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"未捕獲到的異常:{str(e)}")
                finally:
                    end_time = time.time()
                    try:
                        connection.hmset('performance',
                                         mapping={'nodes': len(nodes),
                                                  f'{thread_name}_use_time': f'{(end_time - start_time):.2f}'})
                    except Exception:
                        # Redis hiccup: drop the connection and get a fresh one.
                        if connection:
                            connection.close()
                        connection = get_redis_connection()
            time.sleep(interval)

    def node_write(self, nodes: List[Node], values: List[Any]):
        """Write `values` to the given UA `nodes` in one batch call.

        :param nodes: UA Node objects to write.
        :param values: values matching `nodes` positionally.
        :return: None
        """
        try:
            self._ua.set_values(nodes, values)
        except Exception as e:
            logger.error(f"數據寫入失敗,失敗原因:{str(e)}")

    def monitor_thread(self):
        """Periodically publish process/thread/queue health metrics to the
        Redis 'performance' hash; flags a dead acquisition thread.

        :return: None
        """
        redis_connection = get_redis_connection()
        while True:
            try:
                current_process_id = os.getpid()
                process = psutil.Process(current_process_id)
                # Gather basic process statistics.
                process_name = process.name()
                cpu_usage = process.cpu_percent(interval=1)  # CPU usage sampled over 1 s
                memory_info = process.memory_info()  # RSS/VMS of this process
                io_counters = process.io_counters()  # cumulative process I/O
                disk_usage = psutil.disk_usage('/')  # disk usage of the root volume
                thread_list = []
                for thread in threading.enumerate():
                    thread_list.append((thread.ident, thread.name, thread.is_alive()))
                    # The acquisition thread dying is fatal for data collection.
                    if thread.name == 'continuous_thread' and thread.is_alive() == False:
                        logger.error(f'讀取線程出錯,請盡快聯系管理員處理!')
                redis_connection.hmset(name='performance',
                                       mapping={'process_name': process_name, 'cpu_usage': cpu_usage,
                                                'memory_info_RSS': f'{memory_info.rss / (1024 * 1024):.2f} MB',
                                                'memory_info_VMS': f'{memory_info.vms / (1024 * 1024): .2f} MB',
                                                'io_read': f"{io_counters.read_bytes / (1024 * 1024):.2f} MB",
                                                'io_write': f"{io_counters.write_bytes / (1024 * 1024):.2f} MB",
                                                'disk_usage': f'{disk_usage.percent}%',
                                                'threads': json.dumps(thread_list),
                                                'pid': current_process_id,
                                                'archive_consumer_length': self.archive_consumer_queue.qsize(),
                                                'alarm_consumer_length': self.alarm_consumer_queue.qsize(),
                                                'emqx_consumer_length': self.emqx_consumer_queue.qsize(),
                                                })
            except Exception as e:
                logger.error(f'監控子線程出錯:{str(e)}')
                if redis_connection:
                    redis_connection.close()
                redis_connection = get_redis_connection()
            finally:
                time.sleep(int(opcua_adapter_params['monitor_frequency']))

    def change_data_notifier(self, timestamp, node_id, new_data, old_data):
        """Translate a node-level change into a tag-level event and enqueue it
        for all three consumer families; unknown node_ids are silently dropped.

        :param timestamp: formatted time string of the change.
        :param node_id: UA node id whose value changed.
        :param new_data: new value.
        :param old_data: previous value.
        :return: None
        """
        try:
            tag_id = self.tag_id_to_node_id_map[node_id]
        except KeyError:
            pass
        else:
            content = {'timestamp': timestamp,
                       'tag_id': tag_id,
                       'new_data': new_data,
                       'old_data': old_data}
            self.alarm_consumer_queue.put(content)
            self.emqx_consumer_queue.put(content)
            self.archive_consumer_queue.put(content)

    def consumer_alarm_info(self):
        """Consume change events, compare against per-tag limits and insert
        limit violations into the MySQL alarm table.

        :return: None
        """
        redis_connection = get_redis_connection()
        alarm_table_name = opcua_adapter_params['alarm_table_name']
        connection = get_mysql_connection()
        thread_name = threading.current_thread().name
        alarm_count = 0
        while True:
            try:
                start_time = time.time()
                content = self.alarm_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                active_alarm = tag_detail.get('active_alarm')
                if active_alarm:
                    up_limit = tag_detail.get('alarm_up')
                    down_limit = tag_detail.get('alarm_down')
                    # NOTE(review): values are interpolated into SQL directly;
                    # they come from the tag config / UA server, not end users.
                    if float(content['new_data']) > float(up_limit):
                        sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ("{self.device_info['device_name']}","{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_up_info")}", "{tag_detail.get("alarm_up")}", "{content["new_data"]}")"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            alarm_count += 1
                        except Exception as e:
                            logger.error(f'數據插入錯誤,錯誤原因:{str(e)}!')
                            connection.rollback()
                    elif float(content['new_data']) < float(down_limit):
                        sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ("{self.device_info['device_name']}","{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_down_info")}", "{tag_detail.get("alarm_down")}", "{content["new_data"]}")"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            alarm_count += 1
                        except Exception as e:
                            logger.error(f'數據插入錯誤,錯誤原因:{str(e)}!')
                            connection.rollback()
            except Exception as e:
                logger.error(str(e))
                if connection:
                    connection.close()
                connection = get_mysql_connection()
            finally:
                end_time = time.time()
                try:
                    redis_connection.hmset('performance',
                                           mapping={f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                                                    'alarm_count': alarm_count})
                except Exception:
                    if redis_connection:
                        redis_connection.close()
                    redis_connection = get_redis_connection()
            time.sleep(2)

    def consumer_archive_info(self):
        """Consume change events and batch-insert them into TDengine, with a
        per-tag retry buffer for rows that failed to insert.

        :return: None
        """
        thread_name = threading.current_thread().name
        redis_connection = get_redis_connection()
        connection = get_tdengine_connection()
        exception_buffer = {}
        buffer = {}
        db_name = opcua_adapter_params['archive_table_name']
        exception_time = 0
        total_time = 0
        for k in self.tag_id_to_node_id_map.values():
            buffer.setdefault(k, [])
            # Retry buffer: rows that failed to insert are kept here instead of
            # being dropped, and replayed once the connection recovers.
            exception_buffer.setdefault(k, [])
        while True:
            try:
                start_time = time.time()
                # Replay any previously failed rows first.
                for k, v in exception_buffer.items():
                    if len(v) > 0:
                        sql = f"""INSERT INTO {db_name}.{k} VALUES {str(v).replace('[', '').replace(']', '')}"""
                        try:
                            # NOTE(review): unlike the insert below, this call
                            # passes the raw string without text() — verify it
                            # works with this SQLAlchemy version.
                            connection.execute(sql)
                            connection.commit()
                            # NOTE(review): setdefault does NOT clear an existing
                            # list, so replayed rows are never removed and will
                            # be re-inserted each cycle; likely meant
                            # exception_buffer[k] = [].
                            exception_buffer.setdefault(k, [])
                        except Exception:
                            exception_time += 1
                            connection.rollback()
                content = self.archive_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                if tag_detail.get('active_archive'):
                    timestamp = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
                    # Non-numeric payloads are archived as 0.0.
                    try:
                        data = float(content['new_data'])
                    except ValueError:
                        data = 0.0
                    except TypeError:
                        data = 0.0
                    except Exception:
                        data = 0.0
                    # Accumulate 100 rows per tag before flushing one INSERT.
                    if len(buffer[content['tag_id']]) < 100:
                        buffer[content['tag_id']].append((timestamp, data))
                    else:
                        sql = f"""INSERT INTO {db_name}.{content['tag_id']} VALUES {str(buffer[content["tag_id"]]).replace('[', '').replace(']', '')};"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            buffer[content['tag_id']] = []
                        except Exception:
                            logger.error(f'insert error:{sql}')
                            connection.rollback()
                            exception_time += 1
                            # NOTE(review): this compares the number of tags in
                            # exception_buffer, not the number of buffered rows.
                            if len(exception_buffer) < 10000:
                                exception_buffer[content['tag_id']].extend(buffer[content['tag_id']])
                                buffer[content['tag_id']] = []
                            else:
                                # Past the cap: drop the oldest buffered rows.
                                exception_buffer[content['tag_id']] = exception_buffer[content['tag_id']][100:]
                total_time += 1
            except Exception as e:
                logger.error(str(e))
                exception_time += 1
                if connection:
                    connection.close()
                connection = get_tdengine_connection()
            finally:
                end_time = time.time()
                try:
                    redis_connection.hmset('performance',
                                           mapping={'buffer': len(buffer), 'exception_buffer': len(exception_buffer),
                                                    f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                                                    'total_time': total_time,
                                                    'exception_time': exception_time})
                except Exception:
                    if redis_connection:
                        redis_connection.close()
                    redis_connection = get_redis_connection()
            time.sleep(2)

    def consumer_emqx_info(self):
        """Consume change events and publish {tag_id: new_data} as JSON to the
        MQTT topics configured per tag (format:
        'topic_name:<topic>,Qos:<n>;...' — parsed by split below).

        :return: None
        """
        while True:
            try:
                content = self.emqx_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                mqtt_topic_str = tag_detail.get('mqtt_topic_name')
                topic_list = []
                for topic in mqtt_topic_str.split(';'):
                    topic_name, qos = topic.split(',')
                    topic_list.append({'topic_name': topic_name.split(':')[1].strip().replace('\n', ''),
                                       'qos': qos.split(':')[1].strip().replace('\n', '')})
                payload = json.dumps({content['tag_id']: content['new_data']})
                for topic in topic_list:
                    emqx_connection.publish(topic=topic['topic_name'], payload=payload, qos=int(topic['qos']))
            except Exception as e:
                logger.error(str(e))

    def subscribe_data_change(self):
        """Diff the raw snapshot against the previous one every 0.5 s and emit
        change events for added, removed and modified node values.

        :return: None
        """
        copy_raw_dict = self.raw_dict.copy()
        flag = False
        while True:
            d1_keys = self.raw_dict.keys()
            d2_keys = copy_raw_dict.keys()
            # Keys newly present in the snapshot.
            if _ := d1_keys - d2_keys:
                flag = True
                for k in list(_):
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), k,
                                              self.raw_dict[k], 0)
            # Keys that disappeared from the snapshot.
            if _ := d2_keys - d1_keys:
                flag = True
                for k in list(_):
                    # NOTE(review): k is no longer in self.raw_dict here, so
                    # self.raw_dict[k] raises KeyError; the old value is in
                    # copy_raw_dict[k] — confirm and fix.
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), k, 0,
                                              self.raw_dict[k])
            commen_keys = d1_keys & d2_keys
            for key in commen_keys:
                if copy_raw_dict[key] != self.raw_dict[key]:
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), key,
                                              copy_raw_dict[key], self.raw_dict[key])
                    flag = True
            # Only refresh the reference snapshot when something changed.
            if flag:
                copy_raw_dict = self.raw_dict.copy()
                flag = False
            time.sleep(0.5)

    def run(self):
        """Start the service: connect, spawn all worker threads, join them,
        and stop the MQTT loop on exit.

        :return: None
        """
        try:
            self.connect()
            interval_acqusition_task = Thread(target=self.interval_read, name='continuous_thread', args=(1,))
            monitor_thread_task = Thread(target=self.monitor_thread, name='monitor_thread')
            subscribe_thread_task = Thread(target=self.subscribe_data_change, name='subscribe_thread')
            for i in range(opcua_adapter_params['alarm_worker']):
                consumer_alarm_info_task = Thread(target=self.consumer_alarm_info, name=f'consumer_alarm_info_{i+1}')
                self.thread_list.append(consumer_alarm_info_task)
            for i in range(opcua_adapter_params['archive_worker']):
                consumer_archive_info_task = Thread(target=self.consumer_archive_info,
                                                    name=f'consumer_archive_info_{i+1}')
                self.thread_list.append(consumer_archive_info_task)
            for i in range(opcua_adapter_params['emqx_worker']):
                consumer_emqx_info_task = Thread(target=self.consumer_emqx_info, name=f'consumer_emqx_info_{i+1}')
                self.thread_list.append(consumer_emqx_info_task)
            self.thread_list.append(interval_acqusition_task)
            self.thread_list.append(monitor_thread_task)
            self.thread_list.append(subscribe_thread_task)
            for th in self.thread_list:
                th.start()
            # All workers loop forever, so join() normally blocks indefinitely.
            for th in self.thread_list:
                th.join()
        except Exception as e:
            logger.error(str(e))
        finally:
            get_emqx_connection.client.loop_stop()
            get_emqx_connection.client.disconnect()


def init():
    """Load device/tag configuration from Redis, retrying every 3 s until all
    three keys are present and non-empty.

    :return: (device_info, tag_id_to_node_id_map, tag_id_to_detail_map)
    """
    conn = get_redis_connection()
    while True:
        try:
            device_info = json.loads(conn.get('device_info'))
            tag_id_to_node_id_map = json.loads(conn.get('tag_id_to_node_id_map'))
            tag_id_to_detail_map = json.loads(conn.get('tag_id_to_detail_map'))
            if device_info and tag_id_to_detail_map and tag_id_to_node_id_map:
                return device_info, tag_id_to_node_id_map, tag_id_to_detail_map
            else:
                logger.error('init error')
                time.sleep(3)
                continue
        except Exception:
            logger.error('Init Failed!')
            time.sleep(3)
            continue


def main():
    """Entry point: pull config from Redis and run one adapter instance."""
    device_info, tag_id_to_node_id_map, tag_id_to_detail_map = init()
    opcua_adapter = OPCUAAdapter(device_info=device_info, tag_id_to_node_id_map=tag_id_to_node_id_map,
                                 tag_id_to_detail_map=tag_id_to_detail_map)
    opcua_adapter.run()


if __name__ == '__main__':
    main()

數據采集邏輯

# 斷線重連機制
多進程中# 多進程模型
一個設備一個進程# 多線程模型
不同的采集頻率一個線程,一個進程中的多個線程共享一個鏈接,進程# 客戶端采用訂閱的方式# 客戶端采用輪詢的方式# 字段的含義
字段名稱                字段含義            數值
No                     自增id
tag_uuid	           變量的唯一ID
node_id	               UA node_id
interval	           采集周期,以秒為單位
active	               是否激活,TRUE, FALSE 必須大寫
active_alarm           是否激活報警
alarm_up               報警上限
alarm_down             報警下限
alarm_up_info          報警上限值
alarm_down_info        報警下限值
alarm_up_change        達到報警上限的修改值
alarm_down_change      達到報警下限的修改值
active_archive         激活歸檔
archive_onchange       變化時歸檔 為FALSE archive_interval有效
archive_interval       輪詢歸檔周期
active_scale           激活變換
scale_sign             符號
scale_factor           因子
mqtt_topic_name
unit
comments

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916228.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916228.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916228.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C Primer Plus 第6版 編程練習——第11章(上)

本章共16題&#xff0c;分上中下三篇1.設計并測試一個函數&#xff0c;從輸入中獲取n個字符&#xff08;包括空白、制表符、換行符)&#xff0c;把結果存儲在一個數組里&#xff0c;它的地址被傳遞作為一個參數。int get_n_char(char arr[], int n) {int i 0;char ch;while (i…

Java開發崗面試記錄合集

一、Java 核心1. 基礎語法final關鍵字的作用修飾類&#xff1a;類不可被繼承&#xff08;如String類&#xff09;&#xff0c;保證類的穩定性和安全性。修飾方法&#xff1a;方法不可被重寫&#xff08;防止子類篡改父類核心邏輯&#xff0c;如工具類方法&#xff09;。修飾變量…

Linux 系統時間設置(date 和 ntpdate)-linux028

date 命令&#xff1a;查看或設置系統時間1. 查看當前時間date示例輸出&#xff1a;Tue Mar 4 01:36:45 CST 20142. 設置時間&#xff08;不設置日期&#xff09;date -s 09:38:40設置后輸出&#xff1a;Tue Mar 4 09:38:40 CST 20143. 設置完整日期和時間&#xff08;推薦格…

iOS上使用WebRTC推拉流的案例

一、庫集成 首先&#xff0c;確保在你的 Podfile 中添加依賴&#xff1a; pod GoogleWebRTC然后執行 pod install 安裝庫。 二、代碼示例 2.1、權限配置&#xff1a;在 Info.plist 中添加攝像頭、麥克風權限 <!-- 需要在 Info.plist 中添加以下權限 --> <key>NSCam…

API: return response as HTML table

想要把response table變成HTML的table&#xff0c;即想達到下面這種的話<table boarder"1" style"width:100%; boarder-collapse: collapse; text-align:left"><tr><th>Customer</th><th>Date</th><th>Debit Am…

OneNote 當前無法同步筆記。將繼續嘗試。 (錯誤代碼: 0xE00009C8 bb0ur)問題解決

之前因為同步錯誤&#xff0c;導致OneNote一個筆記本內容全部消失&#xff0c;筆記本內容如下圖同步狀態和錯誤如下&#xff1a;提醒錯誤為&#xff1a;OneNote 當前無法同步筆記。將繼續嘗試。 (錯誤代碼: 0xE00009C8 bb0ur)當時心態有點崩&#xff0c;也是查了好些資料&#…

OneCode3.0 Gallery 組件前后端映射機制:從注解配置到前端渲染的完整鏈路

一、注解體系與前端組件的映射基礎 ? OneCode Gallery 組件實現了 Java 注解與前端 UI 組件的深度綁定&#xff0c;通過GalleryAnnotation、GalleryItemAnnotation和GalleryViewAnnotation三個核心注解&#xff0c;構建了從后端配置到前端渲染的完整鏈路。這種映射機制的核心價…

規則分配腳本

需求&#xff1a; 1.根據用戶編寫的要報規則,去mysql庫里SysManage_Rule表獲取已經啟用的規則作為條件&#xff08;例如[{“field”: “關鍵詞”, “logic”: “AND”, “value”: “阿爾法”, “operator”: “”&#xff0c;, “assign_user”: “user222”}]&#xff09;條…

SEO實戰派白楊SEO:SEO中說的框計算、知心搜索(知識圖譜)是什么?有什么用處?

SEO里框計算是什么&#xff1f;有什么用處&#xff1f;SEO里框計劃算是百度2010年提出的&#xff0c;指當用戶搜索某些關鍵詞查詢時&#xff0c;搜索引擎在結果頁直接展示答案的技術&#xff08;如天氣、匯率等&#xff09;&#xff0c;用戶無需點擊網站即可獲取信息&#xff0…

軟件工程:軟件需求

簡介本篇博客記錄了我在軟件工程學習過程中關于軟件需求與面向對象基礎知識的學習體會和要點總結。博客共分為三個關卡內容&#xff1a;第1關圍繞“軟件需求”的定義、分類及分析過程展開&#xff0c;讓我清晰地理解了功能性需求、非功能性需求與約束條件的區別&#xff1b;第2…

MES系統是什么,有哪些特性?

MES系統是一套面向制造企業車間執行層的生產信息化管理系統。它能夠為操作人員和管理人員提供計劃的執行、跟蹤以及所有資源&#xff08;包括人、設備、物料、客戶需求等&#xff09;的當前狀態。通過MES系統可以對從訂單下達到產品完成的整個生產過程進行優化管理。當工廠發生…

Vue2下

六&#xff1a;vue-router &#xff08;重要&#xff09; &#xff08;一&#xff09;. 對路由的理解 1.什么是路由 路由&#xff08;Router&#xff09; 是管理頁面跳轉和 URL 與視圖映射關系的機制&#xff0c;核心作用是&#xff1a;根據不同的 URL 路徑&#xff0c;展示對…

在 Windows 上安裝設置 MongoDB及常見問題

介紹 MongoDB 是一個開源的 NoSQL 數據庫系統&#xff0c;它以一種靈活的類似 JSON 的格式&#xff08;稱為 BSON&#xff08;二進制 JSON&#xff09;&#xff09;存儲數據。它使用動態模式&#xff0c;這意味著與關系型數據庫不同&#xff0c;MongoDB 不需要在向數據庫添加數…

Effective C++ 條款01:視 C++ 為一個語言聯邦

Effective C 條款01&#xff1a;視 C 為一個語言聯邦核心思想&#xff1a;C 是由多個子語言組成的聯邦&#xff0c;每個子語言有自己的編程范式。理解這些子語言及其規則切換&#xff0c;是寫出高效 C 代碼的關鍵。 四個子語言及其規則&#xff1a; C 語言 基礎&#xff1a;過程…

云效CI/CD教程(PHP項目)

參考文檔 參考云效的官方文檔https://help.aliyun.com/zh/yunxiao/ 一、新建代碼庫 這是第一步&#xff0c;和碼云的差不多 二、配SSH密鑰 這個和碼云&#xff0c;github上類似&#xff0c;都需要&#xff0c;云效的SSH密鑰證書不是采用 RSA算法&#xff0c;而是采用了ED2…

單片機是怎么控制的

單片機作為電子系統的控制核心&#xff0c;通過接收外部信號、執行預設程序、驅動外部設備的方式實現控制功能&#xff0c;其控制過程涉及信號輸入、數據處理和指令輸出三個關鍵環節&#xff0c;每個環節的協同配合決定了整體控制效果。 信號輸入&#xff1a;獲取外部信息 單片…

deepseek本地部署,輕松實現編程自由

小伙伴們&#xff0c;大家好&#xff0c;今天我們來實現deepseek本地部署&#xff0c;輕松實現編程自由&#xff01;安裝ollama 安裝ollama 首先我們安裝ollama 打開ollama官網&#xff0c;下載安裝符合自己系統的版本。 找到要安裝的模型deepseek-r1開始-運行 輸入cmd出現…

基礎NLP | 常用工具

編輯器 PycharmVSCodeSpyderPython 自帶 ideVim 機器學習相關python框架 Pytorch 學術界寵兒&#xff0c;調試方便&#xff0c;目前的主流Tensorflow 大名鼎鼎&#xff0c;工程配套完善Keras 高級封裝&#xff0c;簡單好用&#xff0c;現已和Tensorflow合體Gensim 訓練詞向…

Unity3D + VR頭顯 × RTSP|RTMP播放器:構建沉浸式遠程診療系統的技術實踐

一、背景&#xff1a;遠程醫療邁入“沉浸式協同”的新階段 過去&#xff0c;遠程醫療主要依賴視頻會議系統&#xff0c;實現基礎的遠程問診、會診或術中指導。雖然初步解決了地域限制問題&#xff0c;但其單視角、平面化、缺乏沉浸感與交互性的特征&#xff0c;已無法滿足臨床…

海云安斬獲“智能金融創新應用“標桿案例 彰顯AI安全左移技術創新實力

近日&#xff0c;由中國人民銀行廣東省分行、廣東省金融管理局、廣東省政務服務和數據管理局指導&#xff0c;廣東省金融科技協會主辦的“智能金融 創新應用”優秀案例名單最終揭曉&#xff0c;海云安開發者安全助手系統項目憑借其創新的"AI安全左移"技術架構&#x…