文章目錄
- 🌴通訊支持
- 🌴 功能完成情況
- 服務端架構設計
- 一、核心模塊劃分
- 二、數據層定義
- 三、協議解析層
- 四、通信業務層(以DLT645服務端為例)
- 五、通信層(以TCP為例)
- 使用例子
🌴通訊支持
功能 | 狀態 |
---|---|
TCP客戶端(方便通訊測試) 🐾 | ? |
TCP服務端(方便通訊測試) 🐾 | ? |
RTU主站 🐾 | ? |
RTU從站 🐾 | ? |
🌴 功能完成情況
功能 | 狀態 |
---|---|
讀、寫通訊地址 🐾 | ? |
廣播校時 🐾 | ? |
電能量 🐾 | ? |
最大需量及發生時間 🐾 | ? |
變量 🐾 | ? |
事件記錄 🐾 | ? |
參變量 🐾 | ? |
凍結量 🐾 | ? |
負荷紀錄 🐾 | ? |
項目地址:https://gitee.com/chen-dongyu123/dlt645
服務端架構設計
一、核心模塊劃分
- 數據層:模擬電表數據,存儲數據標識與值的映射關系。
- 協議解析層:負責幀的組裝、拆卸、校驗和轉義。
- 業務邏輯層:根據解析出的控制碼和DI,執行相應操作并組織回復數據。
- 通信層:負責底層的字節流收發(串口/TCP)。
代碼結構
├── config # 測點json,用于初始化導入
├── src # 源文件夾
│ ├── common # 存放通用函數
│ ├── model # 數據模型
│ │ ├── data
│ │ │ └── define
│ │ └── types # dlt645數據類型
│ ├── protocol # 協議解析層
│ ├── service
│ │ ├── clientsvc # dlt645客戶端api及實現
│ │ └── serversvc # dlt645服務端api及實現
│ └── transport
│ ├── client # 客戶端通訊接口,支持TCP客戶端、RTU主站
│ └── server # 服務端通訊接口,支持TCP服務端、RTU從站
└── test # 測試文件
流程圖:啟動 -> 監聽連接 -> 接收數據 -> 解析幀 -> 處理請求 -> 組織回復幀 -> 發送數據
二、數據層定義
-
通過讀取config文件夾里的json導入測點,json示例如下
[{"Di": "02010100","Name": "A相電壓","Unit": "V","DataFormat": "XXX.X"},{"Di": "02010200","Name": "B相電壓","Unit": "V","DataFormat": "XXX.X"},... ]
-
定義數據類型和映射map
class DataItem:def __init__(self, di: int, name: str, data_format: str, value: float = 0, unit: str = '', timestamp: int = 0):self.di = diself.name = nameself.data_format = data_formatself.value = valueself.unit = unitself.timestamp = timestampdef __repr__(self):return (f"DataItem(name={self.name}, di={format(self.di, '#x')}, value={self.value}, "f"unit={self.unit},data_format={self.data_format}, timestamp={self.timestamp})")DIMap: Dict[int, DataItem] = {}
-
定義設置DataItem值接口和獲取DataItem接口
def get_data_item(di: int) -> Optional[DataItem]:"""根據 di 獲取數據項"""item = DIMap.get(di)if item is None:log.info(f"未通過di {hex(di)} 找到映射")return Nonereturn itemdef set_data_item(di: int, data: Any) -> bool:"""設置指定 di 的數據項"""if data is None:log.info("data is nil")return Falseif di in DIMap:DIMap[di].value = datalog.info(f"設置數據項 {hex(di)} 成功, 值 {DIMap[di]}")return Truereturn False
-
初始化數據標識map
def init_variable_def(VariableTypes: List[DataItem]):for date_type in VariableTypes:DIMap[date_type.di] = DataItem(di=date_type.di,name=date_type.name,data_format=date_type.data_format,unit=date_type.unit)
-
初始化DLT645相關測點數據
EnergyTypes = [] DemandTypes = [] VariableTypes = []def init():global EnergyTypesEnergyTypes = initDataTypeFromJson(os.path.join(conf_path, 'energy_types.json'))DemandTypes = initDataTypeFromJson(os.path.join(conf_path, 'demand_types.json'))VariableTypes = initDataTypeFromJson(os.path.join(conf_path, 'variable_types.json'))init_energy_def(EnergyTypes)init_demand_def(DemandTypes)init_variable_def(VariableTypes)# 執行初始化 init()
三、協議解析層
-
幀類型
# 常量定義 FRAME_START_BYTE = 0x68 FRAME_END_BYTE = 0x16 BROADCAST_ADDR = 0xAAclass Frame:def __init__(self, preamble: List[int] = None, start_flag: int = 0, addr: List[int] = None,ctrl_code: int = 0, data_len: int = 0, data: List[int] = None,check_sum: int = 0, end_flag: int = 0):self.preamble = preamble if preamble is not None else [] # 前導字節self.start_flag = start_flag # 起始字節self.addr = addr if addr is not None else [0] * 6 # 地址字節self.ctrl_code = ctrl_code # 控制字節self.data_len = data_len # 數據長度字節self.data = data if data is not None else [] # 數據字節self.check_sum = check_sum # 校驗字節self.end_flag = end_flag # 結束字節
-
協議解析
-
數據域解碼
@classmethoddef decode_data(cls, data: bytes) -> bytes:"""數據域解碼(±33H轉換)"""return bytes([b - 0x33 for b in data])
-
數據域編碼
@classmethoddef encode_data(cls, data: bytes) -> bytes:"""數據域編碼"""return bytes([b + 0x33 for b in data])
-
計算校驗和
@classmethoddef calculate_checksum(cls, data: bytes) -> int:"""校驗和計算(模256求和)"""return sum(data) % 256
-
構建幀
@classmethoddef build_frame(cls, addr: bytes, ctrl_code: int, data: bytes) -> bytearray:"""幀構建(支持廣播和單播)"""if len(addr) != 6:raise ValueError("地址長度必須為6字節")buf = []buf.append(FRAME_START_BYTE)buf.extend(addr)buf.append(FRAME_START_BYTE)buf.append(ctrl_code)# 數據域編碼encoded_data = DLT645Protocol.encode_data(data)buf.append(len(encoded_data))buf.extend(encoded_data)# 計算校驗和check_sum = DLT645Protocol.calculate_checksum(bytes(buf))buf.append(check_sum)buf.append(FRAME_END_BYTE)return bytearray(buf)
-
字節數組反序列化Frame結構體
@classmethoddef deserialize(cls, raw: bytes) -> Optional[Frame]:"""將字節切片反序列化為 Frame 結構體"""# 基礎校驗if len(raw) < 12:raise Exception(f"frame too short: {raw}")# 幀邊界檢查(需考慮前導FE)try:start_idx = raw.index(FRAME_START_BYTE)except ValueError:log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx == -1 or start_idx + 10 >= len(raw):log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx + 7 >= len(raw) or raw[start_idx + 7] != FRAME_START_BYTE:log.error(f"missing second start flag: {raw}")raise Exception("missing second start flag")# 構建幀結構frame = Frame()frame.start_flag = raw[start_idx]frame.addr = raw[start_idx + 1:start_idx + 7]frame.ctrl_code = raw[start_idx + 8]frame.data_len = raw[start_idx + 9]# 數據域提取(嚴格按協議1.2.5節處理)data_start = start_idx + 10data_end = data_start + frame.data_lenif data_end > len(raw) - 2:log.error(f"invalid data length {frame.data_len}")raise Exception(f"invalid data length {frame.data_len}")# 數據域解碼(需處理加33H/減33H)frame.data = DLT645Protocol.decode_data(raw[data_start:data_end])# 校驗和驗證(從第一個68H到校驗碼前)checksum_start = start_idxchecksum_end = data_endif checksum_end >= len(raw):log.error(f"frame truncated: {raw}")raise Exception(f"frame truncated: {raw}")calculated_sum = DLT645Protocol.calculate_checksum(raw[checksum_start:checksum_end])if calculated_sum != raw[checksum_end]:log.error(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")raise Exception(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")# 結束符驗證if checksum_end + 1 >= len(raw) or raw[checksum_end + 1] != FRAME_END_BYTE:log.error(f"invalid end flag: {raw[checksum_end + 1]}")raise Exception(f"invalid end flag: {raw[checksum_end + 1]}")# 轉換為帶縮進的JSONlog.info(f"frame: {frame}")return frame
-
Frame 結構體序列化為字節數組
@classmethoddef serialize(cls, frame: Frame) -> Optional[bytes]:"""將 Frame 結構體序列化為字節切片"""if frame.start_flag != FRAME_START_BYTE or frame.end_flag != FRAME_END_BYTE:log.error(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")raise Exception(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")buf = []# 寫入前導字節buf.extend(frame.preamble)# 寫入起始符buf.append(frame.start_flag)# 寫入地址buf.extend(frame.addr)# 寫入第二個起始符buf.append(frame.start_flag)# 寫入控制碼buf.append(frame.ctrl_code)# 數據域編碼encoded_data = DLT645Protocol.encode_data(frame.data)# 寫入數據長度buf.append(len(encoded_data))# 寫入編碼后的數據buf.extend(encoded_data)# 計算并寫入校驗和check_sum = DLT645Protocol.calculate_checksum(bytearray(buf))buf.append(check_sum)# 寫入結束符buf.append(frame.end_flag)return bytearray(buf)
四、通信業務層(以DLT645服務端為例)
-
定義電表Service
class MeterServerService:def __init__(self,server: Union[TcpServer, RtuServer],address: bytearray = bytearray([0x00] * 6),password: bytearray = bytearray([0x00] * 4)):self.server = server self.address = addressself.password = password
-
設置值接口,以電能為例
# 寫電能量def set_00(self, di: int, value: float) -> bool:ok = set_data_item(di, value)if not ok:log.error(f"寫電能量失敗")return ok
-
處理數據函數
# 處理讀數據請求(協議與業務分離)def handle_request(self, frame):# 1. 驗證設備if not self.validate_device(frame.addr):log.info(f"驗證設備地址: {bytes_to_spaced_hex(frame.addr)} 失敗")raise Exception("unauthorized device")# 2. 根據控制碼判斷請求類型if frame.ctrl_code == CtrlCode.BroadcastTimeSync: # 廣播校時log.info(f"廣播校時: {frame.Data.hex(' ')}")self.set_time(frame.Data)return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, frame.data)elif frame.ctrl_code == CtrlCode.CtrlReadData:# 解析數據標識di = frame.datadi3 = di[3]if di3 == 0x00: # 讀取電能# 構建響應幀res_data = bytearray(8)# 解析數據標識為 32 位無符號整數data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4] # 僅復制前 4 字節數據標識value = data_item.value# 轉換為 BCD 碼bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x01: # 讀取最大需量及發生時間res_data = bytearray(12)data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4] # 返回數據標識value = data_item.value# 轉換為 BCD 碼bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:7] = bcd_value[:3]# 需量發生時間res_data[7:12] = time_to_bcd(time.time())log.info(f"讀取最大需量及發生時間: {res_data}")return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x02: # 讀變量data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")# 變量數據長度data_len = 4data_len += (len(data_item.data_format) - 1) // 2 # (數據格式長度 - 1 位小數點)/2# 構建響應幀res_data = bytearray(data_len)res_data[:4] = frame.data[:4] # 僅復制前 4 字節value = data_item.value# 轉換為 BCD 碼(小端序)bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:data_len] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))else:log.info(f"unknown: {hex(di3)}")return Exception("unknown di3")elif frame.ctrl_code == CtrlCode.ReadAddress:# 構建響應幀res_data = self.address[:6]return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)elif frame.ctrl_code == CtrlCode.WriteAddress:res_data = b'' # 寫通訊地址不需要返回數據# 解析數據addr = frame.data[:6]self.set_address(addr) # 設置通訊地址return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)else:log.info(f"unknown control code: {hex(frame.ctrl_code)}")raise Exception("unknown control code")
-
注入通訊協議到Service
def new_tcp_server(ip: str, port: int, timeout: int) -> MeterServerService:# 1. 先創建 TcpServer(不依賴 Service)tcp_server = TcpServer(ip, port, timeout, None)# 2. 創建 MeterServerService,注入 TcpServer(作為 Server 接口)meter_service = MeterServerService(tcp_server)# 3. 將 MeterServerService 注入回 TcpServertcp_server.service = meter_servicereturn meter_servicedef new_rtu_server(port: str, dataBits: int, stopBits: int, baudRate: int, parity: str,timeout: float) -> MeterServerService:rtu_server = RtuServer(port, dataBits, stopBits, baudRate, parity, timeout)# 2. 創建 MeterServerService,注入 RtuServer(作為 Server 接口)meter_service = MeterServerService(rtu_server)# 3. 將 MeterServerService 注入回 RtuServerrtu_server.service = meter_servicereturn meter_service
五、通信層(以TCP為例)
-
定義TCP服務端
class TcpServer:def __init__(self, ip: str, port: int, timeout: float, service):self.ip = ipself.port = portself.timeout = timeoutself.ln = Noneself.service = service # Dlt645業務
-
處理數據
def handle_connection(self, conn):try:while True:try:# 接收數據buf = conn.recv(256)if not buf:breaklog.info(f"Received data: {bytes_to_spaced_hex(buf)}")# 協議解析try:frame = DLT645Protocol.deserialize(buf)except Exception as e:log.error(f"Error parsing frame: {e}")continue# 業務處理try:resp = self.service.handle_request(frame)except Exception as e:log.error(f"Error handling request: {e}")continue# 響應if resp:try:conn.sendall(resp)log.info(f"Sent response: {bytes_to_spaced_hex(resp)}")except Exception as e:log.error(f"Error writing response: {e}")except socket.timeout:breakexcept Exception as e:log.error(f"Error handling connection: {e}")finally:try:conn.close()except Exception as e:log.error(f"Error closing connection: {e}")
-
啟動服務端
def start(self):try:# 創建 TCP 套接字self.ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 設置地址可重用self.ln.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 綁定地址和端口self.ln.bind((self.ip, self.port))# 開始監聽self.ln.listen(5)log.info(f"TCP server started on port {self.port}")while True:try:# 接受連接conn, addr = self.ln.accept()log.info(f"Accepted connection from {addr}")# 設置超時時間conn.settimeout(self.timeout)# 啟動新線程處理連接import threadingthreading.Thread(target=self.handle_connection, args=(conn,)).start()except socket.error as e:if isinstance(e, socket.timeout):continuelog.error(f"Failed to accept connection: {e}")if not e.errno == 10038: # 非套接字關閉錯誤return eexcept Exception as e:log.error(f"Failed to start TCP server: {e}")return e
-
關閉服務端
def stop(self):if self.ln:log.info("Shutting down TCP server...")try:self.ln.close()except Exception as e:log.error(f"Error closing server: {e}")return ereturn None
使用例子
-
啟動服務端
if __name__ == '__main__':server_svc = new_tcp_server("127.0.0.1", 8021, 3000)# server_svc = new_rtu_server("COM4", 8, 1, 9600, "N", 1000)server_svc.set_00(0x00000000, 100.0)server_svc.set_02(0x02010100, 86.0)server_svc.server.start()
-
使用模擬DLT645客戶端軟件測試
-