文檔:
https://pymodbus.readthedocs.io/en/latest/
源碼:
https://github.com/riptideio/pymodbus/
文章目錄
- Python PyModbus庫使用教程:以Modbus RTU為例
- 介紹
- 安裝PyModbus
- 配置串行連接
- 導入必要的模塊
- 創建Modbus客戶端實例
- 建立連接
- 連接到Modbus設備
- 讀取數據
- 讀取寄存器
- 寫入數據
- 寫入單個寄存器
- 寫入多個寄存器
- 處理異常
- 斷開連接
- 高級主題
- 異步客戶端(使用Twisted或Asyncio框架)
- 自定義消息
- 日志記錄
- 實例
- 1. 使用RS485 modbus協議讀取溫度濕度傳感器數據
Python PyModbus庫使用教程:以Modbus RTU為例
介紹
Modbus是一種工業領域廣泛使用的通信協議,而PyModbus是一個在Python中實現Modbus通信的庫。它支持多種Modbus模式,包括RTU(通過串行線路),ASCII和TCP/IP。本教程將重點介紹如何使用PyModbus庫進行Modbus RTU通信。
安裝PyModbus
在開始編寫代碼之前,需要確保已經安裝了PyModbus庫。可以使用pip命令輕松地安裝:
pip install pymodbus
配置串行連接
導入必要的模塊
首先,需要從pymodbus
庫中導入必要的模塊:
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
不知道是不是改版了,我用上面導入報錯,用下面的可以:
from pymodbus.client import ModbusSerialClient as ModbusClient
創建Modbus客戶端實例
接下來,創建一個ModbusClient
實例,用于建立與Modbus設備的RTU通信:
client = ModbusClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600)
在這里,method
參數設置為'rtu'
以指定通信協議,port
參數根據實際連接的串行端口進行設置(例如Linux系統中可能是'/dev/ttyUSB0'
,Windows中可能是'COM3'
),baudrate
參數設置傳輸速率,這些參數應與設備文檔或配置相匹配。
建立連接
連接到Modbus設備
在配置好客戶端實例后,嘗試連接到Modbus設備:
if client.connect():print("Modbus RTU Client Connected")
else:print("Failed to connect to Modbus RTU Client")
讀取數據
讀取寄存器
Modbus協議定義了幾種類型的寄存器,最常見的是保持寄存器和輸入寄存器。以下示例展示了如何讀取保持寄存器:
response = client.read_holding_registers(address=1, count=10, unit=1)
if not response.isError():print("Register Values: ", response.registers)
else:print("Failed to read registers")
在這個例子中,read_holding_registers
方法用于讀取地址為1的起始位置、數量為10的連續寄存器。unit
參數表示從哪個單元(即設備ID)讀取數據。
注意:pymodbus某個版本已將unit字段改為slave。使用時即使寫錯也不會報錯,注意查看你的pymodbus文檔。
寫入數據
寫入單個寄存器
要向設備的單個寄存器寫入數據,可以使用以下代碼:
write_response = client.write_register(address=1, value=25, unit=1)
if not write_response.isError():print("Written successfully")
else:print("Failed to write register")
這里使用了write_register
方法,它接受地址、要寫入的值以及目標設備的單元ID。
寫入多個寄存器
如果要寫入多個寄存器,可以使用write_registers
方法:
values = [20, 40, 60, 80, 100]
write_response = client.write_registers(address=1, values=values, unit=1)
if not write_response.isError():print("Multiple registers written successfully")
else:print("Failed to write multiple registers")
這里values
列表包含了要寫入寄存器的值序列。
處理異常
處理Modbus通信過程中可能出現的異常非常重要,可以使用try-except語句捕獲這些異常:
from pymodbus.exceptions import ModbusExceptiontry:# 嘗試執行Modbus操作response = client.read_holding_registers(address=1, count=10, unit=1)
except ModbusException as ex:print("An error occurred:", str(ex))
斷開連接
在完成所有Modbus通信后,應該關閉與設備的連接:
client.close()
print("Modbus RTU Client Connection Closed")
調用close()
方法將關閉串行端口,并釋放相關資源。
高級主題
異步客戶端(使用Twisted或Asyncio框架)
除了同步客戶端,PyModbus還提供了異步客戶端選項,可以使用Twisted或Asyncio框架。異步客戶端允許程序在等待響應時執行其他任務,對于需要同時處理多個Modbus請求的應用非常有用。
自定義消息
有時候,標準的Modbus函數不足以滿足特定的需求。PyModbus允許創建自定義消息和事務處理器,使得可以擴展協議以適應特殊用例。
日志記錄
調試和監控Modbus通信過程中,日志記錄功能至關重要。PyModbus提供了詳細的日志記錄機制,可以幫助分析問題所在。
實例
1. 使用RS485 modbus協議讀取溫度濕度傳感器數據
# 請先安裝 pymodbus 和 pyserial
# pip install pymodbus
# pip install pyserial# from pymodbus.client.sync import ModbusSerialClient as ModbusClient # 報錯,說找不到 sync
from pymodbus.client import ModbusSerialClient as ModbusClient
from pymodbus.exceptions import ModbusException, ConnectionException
import logging# 配置日志記錄
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)# 初始化Modbus串行客戶端
# client = ModbusClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600, timeout=3) # [Errno 2] could not open port /dev/ttyUSB0: [Errno 2] No such file or directory: '/dev/ttyUSB0'
# client = ModbusClient(method='rtu', port='/dev/ttyTHS1', baudrate=9600, timeout=3)
# ttyTHS4 ttyS0 ttyS1 ttyS2 ttyS4
# client = ModbusClient(method='rtu', port='/dev/ttyTHS1', baudrate=9600, timeout=3)
# client = ModbusClient(method='rtu', port='/dev/ttyTHS0', baudrate=9600, timeout=3, stopbits=1, bytesize=8, parity='N')
client = ModbusClient(port='/dev/ttyTHS0', baudrate=9600, timeout=3,stopbits=1, bytesize=8, parity='N') # 看文檔,method='rtu'貌似沒用def read_temperature_and_humidity(client):try:# 讀取寄存器地址0和1上的4個字節(兩個寄存器)# result = client.read_input_registers(address=0, count=3, unit=1) # 這個錯了,這是讀取輸入寄存器的)0x04# result = client.read_holding_registers(address=0, count=3, unit=1) # 這個才是讀取輸入寄存器的0x03 # unit參數錯了,當前pymodbus版本沒有這個參數,搞烏龍了,要不是用filelocator搜索函數用法,還真不知道- -result = client.read_holding_registers(address=0, count=2, slave=1) # 讀取輸入寄存器的0x03 # 讀兩個寄存器就ok,賣家說第三個寄存器是預留的,不用讀if result.isError():# 處理錯誤print("讀取錯誤:", result)return None, None# 將讀取到的結果轉換為溫度和濕度registers = result.registerstemperature_reg = registers[0]humidity_reg = registers[1]# 檢查是否有探頭錯誤if temperature_reg == 0x8000 or humidity_reg == 0x8000:print("探頭錯誤")return None, None# 計算實際的溫度和濕度值temperature = temperature_reg * 0.1humidity = humidity_reg * 0.1# 格式化溫度和濕度值,保留一位小數temperature = round(temperature, 1)humidity = round(humidity, 1)return temperature, humidityexcept ModbusException as e:print("Modbus異常:", e)return None, Noneexcept Exception as e:# 捕獲除ModbusException之外的所有異常print(f"An error occurred: {e}")return None, Nonedef main():try:if client.connect(): # 嘗試連接到Modbus服務器/設備temperature, humidity = read_temperature_and_humidity(client)if temperature is not None and humidity is not None:print(f"溫度: {temperature}°C, 濕度: {humidity}%RH")client.close() # 關閉連接else:print("無法連接到Modbus設備")except ConnectionException as e:print("連接異常:", e)if __name__ == "__main__":main()
運行結果: