系列文章目錄
sigrokdecode 模塊學習指南 — 準備階段
通訊協議 - Uart
sigrokdecode 模塊
UART協議解碼器源碼解析
Uart協議解碼器源碼各個方法
`
文章目錄
- 系列文章目錄
- 引入庫
- parity_ok
- 注解類型枚舉
- options參數
- annotations 注解
- annotation_rows 注解分組
- 接收(RX)相關行
- 發送(TX)相關行
- put 輸出方法
- putx(self, rxtx, data)
- putx_packet(self, rxtx, data)
- putpx(self, rxtx, data)
- putg(self, data)
- putp(self, data)
- putgse(self, ss, es, data)
- 工作流程示例
- reset
- 示例場景
- start
- Python對象通道
- 定義與用途
- 數據形式
- 邏輯分析儀注釋通道
- **定義與用途**
- **數據形式**
- 示例場景
- wait_for_start_bit
- get_start_bit
- 記錄起始位信號
- 驗證起始位有效性
- handle_packet
- 確定配置參數
- 緩存數據并記錄起始樣本點
- 判斷數據包結束條件
- get_data_bits
- format_value
- get_parity_bit
- 保存校驗位值并更新位計數器
- 校驗位驗證
- 狀態轉移
- get_stop_bits
- 記錄停止位值并更新位計數器
- 驗證停止位合法性
- 輸出停止位信息
- 檢查停止位數量
- get_wait_cond
- 取當前狀態
- 處理「等待起始位」狀態
- 處理「數據/校驗/停止位」狀態
- get_idle_cond
- inspect_sample
- 信號反轉處理
- 狀態路由與解析
- 執行流程示例
- inspect_edge
- decode
- 初始校驗
- 信號反轉配置
- 幀參數計算
- 主事件循環
- 步驟分解
'''
輸出格式說明(OUTPUT_PYTHON):
每個數據包格式為 [<ptype>, <rxtx>, <pdata>]
<ptype> 類型包括:- 'STARTBIT': 起始位值(0/1)- 'DATA': 數據值和各比特位的起止樣本號- 'PARITYBIT': 校驗位值- 'STOPBIT': 停止位值- 'INVALID ...': 無效位錯誤- 'PARITY ERROR': 校驗錯誤- 'BREAK': 線路中斷- 'FRAME': 完整幀數據及有效性- 'IDLE': 空閑狀態
<rxtx> 表示方向:0(RX接收)或1(TX發送)
'''
''' DATA = (13, [(1, ss=200, es=250), (0, ss=150, es=200), (1, ss=100, es=150), (1, ss=50, es=100)])
LSB到MSB順序:傳輸順序為 1 (LSB) → 0 → 1 → 1 (MSB)。'''
引入庫
# 引入必要的庫
import sigrokdecode as srd # Sigrok 解碼器基礎庫
from common.srdhelper import bitpack # 輔助函數:將位列表轉為整數值
from math import floor, ceil # 數學函數
parity_ok
# 校驗位檢查函數
def parity_ok(parity_type, parity_bit, data, data_bits):# 忽略校驗:直接返回Trueif parity_type == 'ignore': return True# 根據校驗類型判斷校驗位是否正確if parity_type == 'zero': return parity_bit == 0elif parity_type == 'one': return parity_bit == 1# 計算1的個數(數據+校驗位)ones = bin(data).count('1') + parity_bit # 奇校驗:1的總數為奇數if parity_type == 'odd': return (ones % 2) == 1elif parity_type == 'even': return (ones % 2) == 0
參數 | 含義 | 取值范圍 | 作用 |
---|---|---|---|
parity_type | 校驗類型 | ignore /zero /one /odd /even | 決定校驗規則 |
parity_bit | 接收到的校驗位值 | 0 或 1 | 需根據校驗類型驗證合法性 |
data | 數據整數值 | 由data_bits 決定(如0~255) | 提供原始數據用于計算1的數量 |
data_bits | 數據位長度 | 5 ~9 | 確保正確解析數據的二進制位數 |
- 函數用于驗證 UART 協議中的奇偶校驗位是否正確。
- 忽略校驗:直接返回
True
。 - 固定校驗值:強制校驗位為
0
或1
。 - 奇偶校驗:統計數據位和校驗位中
1
的總數
注解類型枚舉
# 自定義異常類
class SamplerateError(Exception): pass # 采樣率未設置錯誤
class ChannelError(Exception): pass # 通道未配置錯誤# 注解類型枚舉(用于在GUI中高亮顯示UART幀的不同部分(如起始位、數據位)。)
class Ann:RX_DATA, TX_DATA, RX_START, TX_START, RX_PARITY_OK, TX_PARITY_OK, \RX_PARITY_ERR, TX_PARITY_ERR, RX_STOP, TX_STOP, RX_WARN, TX_WARN, \RX_DATA_BIT, TX_DATA_BIT, RX_BREAK, TX_BREAK, RX_PACKET, TX_PACKET = range(18)# 二進制輸出類型 原始數據的轉儲,便于后續分析
class Bin:RX, TX, RXTX = range(3) # 分別對應接收、發送、雙向數據
range(3)
:生成一個整數序列[0, 1, 2]
。
RX_DATA | 0 | 接收端完整數據字節 | 標注接收到的完整數據字節(如 0x55 ) |
---|---|---|---|
TX_DATA | 1 | 發送端完整數據字節 | 標注發送的完整數據字節 |
RX_START | 2 | 接收起始位(Start Bit) | 標注接收通道的起始位(低電平信號) |
TX_START | 3 | 發送起始位 | 標注發送通道的起始位 |
RX_PARITY_OK | 4 | 接收校驗正確 | 校驗位驗證通過時的標注(如偶校驗正確) |
TX_PARITY_OK | 5 | 發送校驗正確 | 發送端校驗正確 |
RX_PARITY_ERR | 6 | 接收校驗錯誤 | 校驗位驗證失敗時的錯誤標注 |
TX_PARITY_ERR | 7 | 發送校驗錯誤 | 發送端校驗錯誤 |
RX_STOP | 8 | 接收停止位(Stop Bit) | 標注接收通道的停止位(高電平信號) |
TX_STOP | 9 | 發送停止位 | 標注發送通道的停止位 |
RX_WARN | 10 | 接收警告 | 接收異常(如停止位電平錯誤) |
TX_WARN | 11 | 發送警告 | 發送端異常 |
RX_DATA_BIT | 12 | 接收的單個數據位 | 標注接收的每個二進制位(如 0 或 1 ) |
TX_DATA_BIT | 13 | 發送的單個數據位 | 標注發送的每個二進制位 |
RX_BREAK | 14 | 接收線路中斷(BREAK 狀態) | 檢測到線路長時間低電平(BREAK 信號) |
TX_BREAK | 15 | 發送線路中斷 | 發送端 BREAK 信號 |
RX_PACKET | 16 | 接收完整數據包 | 標注多字節組成的完整數據包(如 [0xAA, 0x55] ) |
TX_PACKET | 17 | 發送完整數據包 | 標注發送的完整數據包 |
options參數
# 用戶可配置參數options = ({'id': 'baudrate', 'desc': '波特率', 'default': 115200},{'id': 'data_bits', 'desc': '數據位', 'default': 8, 'values': (5, 6, 7, 8, 9)},{'id': 'parity', 'desc': '校驗類型', 'default': 'none', 'values': ('none', 'odd', 'even', 'zero', 'one', 'ignore')},{'id': 'stop_bits', 'desc': '停止位', 'default': 1.0, 'values': (0.0, 0.5, 1.0, 1.5, 2.0)},{'id': 'bit_order', 'desc': '位順序', 'default': 'lsb-first', 'values': ('lsb-first', 'msb-first')},{'id': 'format', 'desc': '數據格式', 'default': 'hex', 'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},# ... 其他選項(省略) ...)
參數ID (id ) | 描述 (desc ) | 默認值 (default ) | 可選值 (values ) | 說明 |
---|---|---|---|---|
baudrate | 波特率 | 115200 | - | 每秒傳輸的符號數(常用值如9600、115200)。 |
data_bits | 數據位 | 8 | (5, 6, 7, 8, 9) | 每個數據幀的位數(通常為8位)。 |
parity | 校驗類型 | ‘none’ | (‘none’, ‘odd’, ‘even’, ‘zero’, ‘one’, ‘ignore’) | 校驗位規則(見奇偶校驗詳解)。 |
stop_bits | 停止位 | 1.0 | (0.0, 0.5, 1.0, 1.5, 2.0) | 停止位的持續時間(通常為1或2位時間)。 |
bit_order | 位傳輸順序 | ‘lsb-first’ | (‘lsb-first’, ‘msb-first’) | LSB(最低有效位)先傳輸是UART標準。 |
format | 數據格式 | ‘hex’ | (‘ascii’, ‘dec’, ‘hex’, ‘oct’, ‘bin’) | 數據以ASCII、十進制、十六進制、八進制或二進制格式顯示。 |
invert_rx | 反轉RX信號電平 | ‘no’ | (‘yes’, ‘no’) | 若為’yes’,接收信號電平邏輯取反(高→低,低→高)。 |
invert_tx | 反轉TX信號電平 | ‘no’ | (‘yes’, ‘no’) | 若為’yes’,發送信號電平邏輯取反。 |
sample_point | 采樣點位置(百分比) | 50 | - | 在位的中間位置(50%)采樣以提高抗噪性。 |
rx_packet_delim | RX數據包分隔符(十進制) | -1 | - | 分隔符的ASCII值(如-1 表示無分隔符,10 表示換行符)。 |
tx_packet_delim | TX數據包分隔符(十進制) | -1 | - | 同RX分隔符,用于發送。 |
rx_packet_len | RX數據包長度 | -1 | - | 固定長度數據包(如-1 表示可變長度)。 |
tx_packet_len | TX數據包長度 | -1 | - | 同RX數據包長度。 |
annotations 注解
# 注解定義(界面顯示的文本標簽)annotations = (('rx-data', '接收數據'), ('tx-data', '發送數據'),('rx-start', '接收起始位'), ('tx-start', '發送起始位'),# ... 其他注解(省略) ...)
標識符 (ID) | 描述 (Description) | 含義與應用場景 |
---|---|---|
rx-data | RX data | 接收數據:標識接收到的數據位部分,通常對應數據幀的有效信息。 |
tx-data | TX data | 發送數據:標識發送的數據位部分,標記正在傳輸的有效信息。 |
rx-start | RX start bit | 接收起始位:標識接收到的起始位(低電平),用于同步數據接收。 |
tx-start | TX start bit | 發送起始位:標識發送的起始位(低電平),標志數據幀開始傳輸。 |
rx-parity-ok | RX parity OK bit | 接收校驗成功:校驗位驗證通過(奇偶校驗正確),數據無錯誤。 |
tx-parity-ok | TX parity OK bit | 發送校驗成功:發送端計算的校驗位符合規則,數據已正確添加校驗。 |
rx-parity-err | RX parity error bit | 接收校驗失敗:校驗位驗證失敗,數據可能存在傳輸錯誤。 |
tx-parity-err | TX parity error bit | 發送校驗失敗:發送端校驗計算異常(通常為配置錯誤或邏輯問題)。 |
rx-stop | RX stop bit | 接收停止位:標識接收到的停止位(高電平),標志數據幀結束。 |
tx-stop | TX stop bit | 發送停止位:標識發送的停止位(高電平),確保數據幀結束。 |
rx-warning | RX warning | 接收警告:指示接收過程中的非致命異常(如波特率輕微偏差、噪聲干擾)。 |
tx-warning | TX warning | 發送警告:指示發送過程中的潛在問題(如緩沖區溢出、發送超時)。 |
rx-data-bit | RX data bit | 接收數據位:細分到單個數據位的接收狀態,用于位級調試。 |
tx-data-bit | TX data bit | 發送數據位:細分到單個數據位的發送狀態,用于位級分析。 |
rx-break | RX break | 接收中斷:檢測到線路持續低電平超過一幀時間(BREAK條件),可能表示通信終止或復位信號。 |
tx-break | TX break | 發送中斷:主動發送BREAK信號,用于通知接收端通信中斷。 |
rx-packet | RX packet | 接收數據包:標識完整接收到的數據包(包含起始位、數據位、校驗位、停止位)。 |
tx-packet | TX packet | 發送數據包:標識完整發送的數據包,用于跟蹤完整幀的傳輸過程。 |
annotation_rows 注解分組
# 注解分組(在界面中歸類顯示)annotation_rows = (('rx-data-bits', '接收位', (Ann.RX_DATA_BIT,)),('rx-data-vals', '接收數據', (Ann.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP)),# ... 其他分組(省略) ...)
行分類 | 核心作用 | 典型使用場景 |
---|---|---|
數據位行 | 位級調試(時序、電平) | 硬件信號完整性分析 |
數據幀行 | 完整幀解析(起始、數據、校驗、停止) | 協議合規性驗證 |
警告/錯誤行 | 異常檢測與診斷 | 通信穩定性測試 |
BREAK行 | 中斷信號處理 | 設備復位或通信終止檢測 |
數據包行 | 聚合幀數據展示 | 應用層數據處理(如ASCII消息解析) |
接收(RX)相關行
行ID | 行描述 | 包含的注解常量 | 作用 |
---|---|---|---|
rx-data-bits | RX bits | Ann.RX_DATA_BIT | 顯示每個接收數據位的詳細信息(如位值、時序)。 |
rx-data-vals | RX data | Ann.RX_DATA, Ann.RX_START, Ann.RX_PARITY_OK, Ann.RX_PARITY_ERR, Ann.RX_STOP | 標記完整接收幀(起始位、數據、校驗、停止位)。 |
rx-warnings | RX warnings | Ann.RX_WARN | 接收過程中的警告(如波特率偏差、噪聲干擾)。 |
rx-breaks | RX breaks | Ann.RX_BREAK | 接收到的BREAK信號(持續低電平中斷)。 |
rx-packets | RX packets | Ann.RX_PACKET | 完整接收的數據包(包含所有位的聚合信息)。 |
發送(TX)相關行
行ID | 行描述 | 包含的注解常量 | 作用 |
---|---|---|---|
tx-data-bits | TX bits | Ann.TX_DATA_BIT | 顯示每個發送數據位的詳細信息。 |
tx-data-vals | TX data | Ann.TX_DATA, Ann.TX_START, Ann.TX_PARITY_OK, Ann.TX_PARITY_ERR, Ann.TX_STOP | 標記完整發送幀(起始位、數據、校驗、停止位)。 |
tx-warnings | TX warnings | Ann.TX_WARN | 發送過程中的警告(如緩沖區溢出、發送超時)。 |
tx-breaks | TX breaks | Ann.TX_BREAK | 主動發送的BREAK信號。 |
tx-packets | TX packets | Ann.TX_PACKET | 完整發送的數據包。 |
put 輸出方法
def putx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putx_packet(self, rxtx, data):s, halfbit = self.ss_packet[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)def putpx(self, rxtx, data):s, halfbit = self.startsample[rxtx], self.bit_width / 2.0self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_python, data)def putg(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)def putp(self, data):s, halfbit = self.samplenum, self.bit_width / 2.0self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)def putgse(self, ss, es, data):self.put(ss, es, self.out_ann, data)
方法名 | 核心用途 | 輸出通道 | 時間范圍計算方式 |
---|---|---|---|
putx | 標注方向性單一位事件 | 邏輯分析儀注釋 | 基于通道起始采樣點擴展半位寬 |
putx_packet | 標注完整數據包 | 邏輯分析儀注釋 | 基于數據包起始采樣點擴展半位寬 |
putpx | 輸出方向性結構化數據 | Python對象 | 同 putx |
putg | 標注當前瞬時事件 | 邏輯分析儀注釋 | 以當前采樣點為中心擴展半位寬 |
putp | 輸出當前瞬時結構化數據 | Python對象 | 同 putg |
putgse | 自定義時間范圍標注 | 邏輯分析儀注釋 | 直接指定起止采樣點 |
putx(self, rxtx, data)
- 功能:標注單個數據位或控制位(如起始位、停止位)的時間范圍,基于通道(RX/TX)的起始采樣點。
- 參數:
rxtx
:方向標識,取值為Bin.RX
(接收)或Bin.TX
(發送)。data
:要輸出的注解數據(如(Ann.RX_DATA_BIT, 'Bit: 1')
)。
- 時間計算:
- 起始時間:
startsample[rxtx] - floor(halfbit)
(從通道的起始采樣點向前擴展半位寬) - 結束時間:
self.samplenum + ceil(halfbit)
(當前采樣點向后擴展半位寬)
- 起始時間:
- 輸出目標:
self.out_ann
(邏輯分析儀注釋通道)。 - 應用場景:
標記接收或發送的單個數據位,確保標注覆蓋整個位的持續時間。
示例:在解析到第3個數據位時,調用putx(Bin.RX, (Ann.RX_DATA_BIT, 'Bit: 1'))
。
putx_packet(self, rxtx, data)
- 功能:標注完整數據包(如整個UART幀)的時間范圍,基于數據包的起始采樣點。
- 參數:
rxtx
:方向標識(同putx
)。data
:數據包相關注解(如(Ann.RX_PACKET, 'Packet: 0x41')
)。
- 時間計算:
- 起始時間:
ss_packet[rxtx] - floor(halfbit)
(數據包的起始采樣點向前擴展半位寬) - 結束時間:
self.samplenum + ceil(halfbit)
(當前采樣點向后擴展半位寬)
- 起始時間:
- 輸出目標:
self.out_ann
。 - 應用場景:
標記完整的接收或發送數據包(包含起始位、數據位、校驗位、停止位)。
示例:完成一幀解析后,調用putx_packet(Bin.TX, (Ann.TX_PACKET, 'Data: A'))
。
putpx(self, rxtx, data)
- 功能:與
putx
類似,但輸出到Python對象通道,用于生成結構化數據。 - 參數:同
putx
。 - 輸出目標:
self.out_python
(Python對象輸出通道)。 - 應用場景:
將解碼后的原始數據(如字節值、校驗結果)傳遞給其他Python模塊處理,而非可視化展示。
示例:發送數據時,調用putpx(Bin.TX, {'type': 'data', 'value': 0x41})
。
putg(self, data)
- 功能:以當前采樣點為中心,標注單個事件(如警告、校驗結果)的時間范圍。
- 參數:
data
:注解數據(如(Ann.RX_PARITY_ERR, 'Parity Error')
)。
- 時間計算:
- 起始時間:
self.samplenum - floor(halfbit)
(當前采樣點向前擴展半位寬) - 結束時間:
self.samplenum + ceil(halfbit)
(當前采樣點向后擴展半位寬)
- 起始時間:
- 輸出目標:
self.out_ann
。 - 應用場景:
標記瞬時事件(如校驗錯誤、警告),確保標注在波形中居中顯示。
示例:檢測到校驗錯誤時,調用putg((Ann.RX_PARITY_ERR, 'Parity Error'))
。
putp(self, data)
- 功能:類似
putg
,但輸出到Python對象通道。 - 參數:同
putg
。 - 輸出目標:
self.out_python
。 - 應用場景:
將瞬時事件的結構化數據傳遞給其他處理邏輯。
示例:發送BREAK信號時,調用putp({'event': 'break', 'direction': 'rx'})
。
putgse(self, ss, es, data)
-
功能:自定義時間范圍標注,直接指定起始和結束采樣點。
-
參數:
ss
:起始采樣點(整數)。es
:結束采樣點(整數)。data
:注解數據。
-
輸出目標:
self.out_ann
。 -
應用場景:
處理非標準時間范圍的事件(如跨越多個位的BREAK信號)。
示例:標注一個持續低電平的BREAK信號:self.putgse(ss=100, es=200, data=(Ann.RX_BREAK, 'BREAK Detected'))
工作流程示例
- 檢測起始位:當RX線路變為低電平時,記錄
startsample[Bin.RX] = 當前采樣點
。 - 解析數據位:逐個采樣點讀取數據位,調用
putx(Bin.RX, ...)
標注每個位。 - 校驗與停止位:校驗完成后,調用
putg(...)
標記校驗結果。 - 完成數據包:調用
putx_packet(Bin.RX, ...)
輸出完整幀信息。 - 輸出到Python:同時調用
putpx
或putp
傳遞結構化數據。
reset
def reset(self):self.samplerate = Noneself.frame_start = [-1, -1]self.frame_valid = [None, None]self.cur_frame_bit = [None, None]self.startbit = [-1, -1]self.cur_data_bit = [0, 0]self.datavalue = [0, 0]self.paritybit = [-1, -1]self.stopbits = [[], []]self.startsample = [-1, -1]self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']self.databits = [[], []]self.break_start = [None, None]self.packet_cache = [[], []]self.ss_packet, self.es_packet = [None, None], [None, None]self.idle_start = [None, None]
變量 | 初始值 | 用途 | 初始值設計邏輯 |
---|---|---|---|
samplerate | None | 存儲采樣率(單位:Hz) | 未配置前未知,需外部設置。 |
frame_start | [-1, -1] | 接收(RX)和發送(TX)方向幀的起始樣本點。 | -1 表示未檢測到幀起始。 |
frame_valid | [None, None] | 標記當前幀是否有效(校驗通過)。 | None 表示幀未解析完成或未驗證。 |
cur_frame_bit | [None, None] | 當前處理的幀中的位數(如起始位=0,數據位=1~N)。 | None 表示未開始解析幀。 |
startbit | [-1, -1] | 起始位的樣本點位置。 | -1 表示未檢測到起始位。 |
cur_data_bit | [0, 0] | 當前處理的數據位索引(從0開始,對應LSB)。 | 數據位從第0位開始解析。 |
datavalue | [0, 0] | 當前幀的整數值(通過數據位計算)。 | 初始值為0,解析時按位累加。 |
paritybit | [-1, -1] | 校驗位的值(0或1)。 | -1 表示未解析到校驗位。 |
stopbits | [[], []] | 停止位的樣本點范圍列表(可能包含多個停止位)。 | 空列表表示未檢測到停止位。 |
startsample | [-1, -1] | 幀的起始樣本點(含起始位)。 | -1 表示幀未開始。 |
state | ['WAIT FOR START BIT', 'WAIT FOR START BIT'] | 解析器的狀態機狀態(RX和TX方向)。 | 空閑時持續檢測起始位,符合UART協議邏輯。 |
databits | [[], []] | 解析出的數據位列表(如 [1, 0, 1] )。 | 空列表表示未解析到數據位。 |
break_start | [None, None] | BREAK信號(持續低電平)的起始樣本點。 | None 表示未檢測到BREAK。 |
packet_cache | [[], []] | 臨時存儲數據包內容(如多幀聚合)。 | 空列表表示無緩存數據。 |
ss_packet , es_packet | [None, None] , [None, None] | 數據包的起始和結束樣本點。 | None 表示數據包未開始或未完成。 |
idle_start | [None, None] | 進入空閑狀態的起始樣本點(用于超時檢測)。 | None 表示當前未處于空閑狀態。 |
示例場景
接收一幀8位數據(值 0x41
,校驗通過):
- 起始位:
startbit[0] = 100
,startsample[0] = 100
,state[0] = 'PROCESSING DATA BITS'
。 - 數據位:逐位解析,
cur_data_bit[0]
從0遞增到7,databits[0] = [1, 0, 0, 0, 0, 0, 1, 0]
,datavalue[0] = 0x41
。 - 校驗位:
paritybit[0] = 1
,校驗計算通過,frame_valid[0] = True
。 - 停止位:
stopbits[0] = [180, 200]
。 - 數據包記錄:
ss_packet[0] = 100
,es_packet[0] = 200
,packet_cache[0].append(0x41)
。
start
def start(self):self.out_python = self.register(srd.OUTPUT_PYTHON)self.out_binary = self.register(srd.OUTPUT_BINARY)self.out_ann = self.register(srd.OUTPUT_ANN)self.bw = (self.options['data_bits'] + 7) // 8
-
作用:向協議解析框架(如
sigrok
)注冊三類輸出通道,用于分發解析結果。 -
作用:根據配置的 數據位長度(
data_bits
),計算存儲數據所需的 最小字節數。
Python對象通道
定義與用途
- 作用:將協議解析后的結構化數據(如完整數據包、校驗結果、原始字節)傳遞給其他Python模塊,用于自動化處理、存儲或分析。
- 目標用戶:軟件開發工程師、自動化測試腳本,用于程序化數據消費。
數據形式
-
數據結構:通常為 字典(Dict) 或 自定義類實例,包含機器可讀的字段:
- 事件類型:如
'data'
、'error'
、'packet'
。 - 數據內容:如字節值、ASCII字符串、十六進制數值。
- 元數據:時間戳、方向(RX/TX)、校驗狀態等。
- 事件類型:如
-
示例:
# 發送一個解析后的數據包到Python對象通道 packet_data = {'direction': 'rx', # 數據方向'timestamp': 1630450000.5, # 時間戳'data': b'\x41', # 原始字節(ASCII 'A')'parity_ok': True, # 校驗結果'length': 8, # 數據位長度 } self.put(output=self.out_python, data=packet_data)
邏輯分析儀注釋通道
定義與用途
- 作用:將協議解析后的關鍵事件(如數據位、起始位、校驗結果等)以可視化注釋的形式標注在邏輯分析儀的波形界面上,幫助用戶直觀理解信號含義。
- 目標用戶:硬件工程師、調試人員,用于實時或離線分析信號時序和協議合規性。
數據形式
-
數據結構:通常為元組(Tuple) 或 特定對象,包含以下信息:
- 時間范圍:起始和結束的采樣點(或時間戳),確定注釋在波形上的顯示范圍。
- 注釋類型:預定義的常量(如
Ann.RX_DATA
表示接收數據),用于分類事件。 - 描述信息:可讀的文本或數值(如數據位的值
'Bit: 1'
)。
-
示例:
# 標注一個接收數據位(值1)的時間范圍和類型 annotation = (Ann.RX_DATA_BIT, # 注釋類型(常量)'Bit: 1', # 描述信息 ) self.put(ss=100, es=150, output=self.out_ann, data=annotation)
- 在波形界面中,從樣本點100到150的區間內顯示“Bit: 1”。
def metadata(self, key, value):if key == srd.SRD_CONF_SAMPLERATE:self.samplerate = value# The width of one UART bit in number of samples.self.bit_width = float(self.samplerate) / float(self.options['baudrate'])
-
self.samplerate
- 作用:存儲當前采樣率(單位:Hz),用于后續時間與樣本點轉換。
- 示例:
samplerate = 1_000_000
表示每秒采集 100 萬個樣本。
-
self.bit_width
-
作用:表示每個 UART 位占用的樣本數,直接影響解析精度和時序計算。
-
計算邏輯:
- 采樣率(
samplerate
)決定時間分辨率。 - 波特率(
baudrate
)決定每秒傳輸的位數。 - 二者的比值即為每個位的樣本數。
- 采樣率(
-
示例:
-
若
samplerate=12 MHz
,baudrate=115200
,則:bit_width=12?000?000/115?200≈104.17 個樣本/位
-
-
def get_sample_point(self, rxtx, bitnum):# Determine absolute sample number of a bit slot's sample point.# Counts for UART bits start from 0 (0 = start bit, 1..x = data,# x+1 = parity bit (if used) or the first stop bit, and so on).# Accept a position in the range of 1-99% of the full bit width.# Assume 50% for invalid input specs for backwards compatibility.perc = self.options['sample_point'] or 50if not perc or perc not in range(1, 100):perc = 50perc /= 100.0bitpos = (self.bit_width - 1) * percbitpos += self.frame_start[rxtx]bitpos += bitnum * self.bit_widthreturn bitpos
- 目的:根據配置的采樣點百分比,確定某一位(如起始位、數據位、校驗位)的采樣位置(絕對樣本數)。
rxtx | 枚舉 | 通信方向(RX 或 TX ),用于獲取對應方向的幀起始樣本點。 |
---|---|---|
bitnum | 整數 | 位編號(0 = 起始位,1~N = 數據位,N+1 = 校驗位/停止位)。 |
-
獲取采樣點百分比
perc = self.options['sample_point'] or 50 if not perc or perc not in range(1, 100):perc = 50 perc /= 100.0
- 默認值處理:若
sample_point
無效(非 1-99),默認使用 50%。 - 百分比轉換:將百分比轉換為小數(如 50% → 0.5)。
- 默認值處理:若
-
計算位內采樣偏移
bitpos = (self.bit_width - 1) * perc
- 位寬調整:
self.bit_width - 1
確保采樣點在位的時間范圍內(如位寬為 10 樣本時,采樣點范圍是 0~9)。
- 位寬調整:
-
確定絕對位置
bitpos += self.frame_start[rxtx] bitpos += bitnum * self.bit_width
- 幀起始偏移:從幀的起始樣本點開始。
- 位編號偏移:累加前序位的總樣本數(如
bitnum=2
表示跨越 2 個完整位)。
示例場景
配置參數:
- 波特率
115200
,采樣率1,000,000 Hz
→ 位寬8.68
樣本/位。 - 采樣點
70%
,方向RX
,幀起始樣本點1000
,位編號1
(第 1 個數據位)。
計算過程:
-
位內偏移:
(8.68?1)×0.7≈5.376(8.68?1)×0.7≈5.376
-
絕對位置:
1000+5.376+1×8.68≈1014.0561000+5.376+1×8.68≈1014.056
-
最終采樣點:約
1014
(取整)。
wait_for_start_bit
def wait_for_start_bit(self, rxtx, signal):# Save the sample number where the start bit begins.self.frame_start[rxtx] = self.samplenumself.frame_valid[rxtx] = Trueself.cur_frame_bit[rxtx] = 0
- 核心作用:當檢測到起始位(低電平信號)時,初始化幀解析所需的關鍵參數,并標記幀開始。
- 觸發條件:在 UART 空閑狀態(高電平)中檢測到信號跳變為低電平。
變量 | 類型 | 說明 |
---|---|---|
frame_start | 列表(int) | 存儲 RX/TX 方向幀的起始樣本點(如 [1000, -1] 表示 RX 幀從樣本 1000 開始)。 |
frame_valid | 列表(bool) | 標記幀是否有效(如校驗通過則為 True ,否則為 False )。 |
cur_frame_bit | 列表(int) | 當前處理的位索引(0 = 起始位,1~N = 數據位,N+1 = 校驗位/停止位)。 |
samplenum | 整數 | 當前解析器處理的樣本點編號(由框架自動更新)。 |
- 記錄起始位樣本點
self.frame_start[rxtx] = self.samplenum
將當前采樣點(self.samplenum
)保存到對應方向(RX/TX)的frame_start
數組中,用于后續計算數據位、校驗位和停止位的時間范圍。
- 標記幀有效性
self.frame_valid[rxtx] = True
假設幀在起始階段有效,后續可能根據校驗結果(如奇偶校驗、停止位驗證)更新此標記。
- 初始化位計數器
self.cur_frame_bit[rxtx] = 0
從0
開始計數,表示當前處理的是起始位。數據位通常從索引1
開始遞增。
get_start_bit
def get_start_bit(self, rxtx, signal):self.startbit[rxtx] = signalself.cur_frame_bit[rxtx] += 1# The startbit must be 0. If not, we report an error and wait# for the next start bit (assuming this one was spurious).if self.startbit[rxtx] != 0:self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falsees = self.samplenum + ceil(self.bit_width / 2.0)self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])self.advance_state(rxtx, signal, fatal = True, idle = es)return# Reset internal state for the pending UART frame.self.cur_data_bit[rxtx] = 0self.datavalue[rxtx] = 0self.paritybit[rxtx] = -1self.stopbits[rxtx].clear()self.startsample[rxtx] = -1self.databits[rxtx].clear()self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])self.advance_state(rxtx, signal)
startbit[rxtx] | 存儲起始位電平值(應為 0 )。 |
---|---|
cur_frame_bit[rxtx] | 跟蹤當前處理的幀位索引(起始位為 0 ,數據位從 1 開始)。 |
frame_valid[rxtx] | 標記幀有效性,初始為 True ,錯誤時置 False 。 |
putp / putg | 輸出日志和注釋,分別對應 Python 對象通道和邏輯分析儀注釋通道。 |
advance_state | 狀態機驅動方法,控制解析流程(如跳轉到數據位或空閑狀態)。 |
記錄起始位信號
self.startbit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
- 目的:
- 保存當前信號值到
startbit
數組,標記起始位的電平狀態。 - 遞增當前幀的位計數器(
cur_frame_bit
),表示開始處理起始位。
- 保存當前信號值到
驗證起始位有效性
if self.startbit[rxtx] != 0:# 錯誤處理:輸出日志和警告注釋self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])# 標記幀無效self.frame_valid[rxtx] = False# 計算錯誤幀的結束樣本點es = self.samplenum + ceil(self.bit_width / 2.0)# 記錄錯誤幀信息self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])# 推進狀態機到空閑狀態self.advance_state(rxtx, signal, fatal=True, idle=es)return
- 核心邏輯:python
- 若起始位非低電平(
signal != 0
),判定為無效起始位(可能由噪聲引起)。 - 錯誤處理步驟:
- 輸出日志:通過
putp
記錄錯誤類型(INVALID STARTBIT
)、方向和信號值。 - 可視化警告:通過
putg
在邏輯分析儀界面生成錯誤注釋(如Frame error
)。 - 標記幀無效:
frame_valid[rxtx] = False
,后續流程將忽略此幀。 - 記錄錯誤幀范圍:通過
putpse
標注錯誤幀的起止樣本點,供調試參考。 - 狀態轉移:調用
advance_state
強制進入空閑狀態(idle=es
),等待下一幀起始位。
- 輸出日志:通過
- 若起始位非低電平(
##有效起始位處理**
# 重置內部狀態
self.cur_data_bit[rxtx] = 0
self.datavalue[rxtx] = 0
self.paritybit[rxtx] = -1
self.stopbits[rxtx].clear()
self.startsample[rxtx] = -1
self.databits[rxtx].clear()# 輸出起始位信息
self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])# 推進狀態機到下一狀態
self.advance_state(rxtx, signal)
- 目的:
- 狀態初始化:清空或重置與幀解析相關的變量,確保無殘留數據干擾新幀。
- 日志與注釋:記錄有效起始位事件,支持后續調試和可視化。
- 狀態轉移:調用
advance_state
進入數據位解析階段。
handle_packet
def handle_packet(self, rxtx):d = 'rx' if (rxtx == RX) else 'tx'delim = self.options[d + '_packet_delim']plen = self.options[d + '_packet_len']if delim == -1 and plen == -1:return# Cache data values until we see the delimiter and/or the specified# packet length has been reached (whichever happens first).if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx]self.packet_cache[rxtx].append(self.datavalue[rxtx])if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:self.es_packet[rxtx] = self.samplenums = ''for b in self.packet_cache[rxtx]:s += self.format_value(b)if self.options['format'] != 'ascii':s += ' 'if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1] # Drop trailing space.self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = []
- 根據配置的分隔符或長度條件判斷是否結束當前數據包的解析,并在滿足條件時觸發數據包的最終處理。
packet_cache[rxtx] | 臨時存儲數據包內容(如 [0x41, 0x42] )。 |
---|---|
ss_packet[rxtx] | 數據包起始樣本點(第一個數據位的開始時間)。 |
es_packet[rxtx] | 數據包結束樣本點(觸發結束條件的當前樣本點)。 |
rx_packet_delim | 接收方向的分隔符(如 10 表示換行符)。 |
tx_packet_len | 發送方向的固定數據包長度(如 64 字節)。 |
format_value(b) | 根據 format 配置(ASCII/HEX等)將字節轉換為字符串。 |
確定配置參數
d = 'rx' if (rxtx == RX) else 'tx' # 生成配置鍵前綴(如 "rx_" 或 "tx_")
delim = self.options[d + '_packet_delim'] # 分隔符(十進制ASCII值)
plen = self.options[d + '_packet_len'] # 數據包固定長度
if delim == -1 and plen == -1:return # 未配置分隔符和長度,無需處理
- 分隔符:若配置為
-1
,表示不依賴分隔符結束數據包。 - 固定長度:若配置為
-1
,表示不依賴固定長度。 - 檢查有效性:若兩者均為
-1
,直接返回,數據包由其他邏輯(如停止位)觸發結束。
緩存數據并記錄起始樣本點
if len(self.packet_cache[rxtx]) == 0:self.ss_packet[rxtx] = self.startsample[rxtx] # 記錄數據包起始樣本點
self.packet_cache[rxtx].append(self.datavalue[rxtx]) # 緩存當前數據值
- 首次緩存:若緩存為空,記錄數據包的起始樣本點(即第一個數據位的起始位置)。
- 逐幀累積:將當前解析出的數據值(如
0x41
)存入緩存列表。
判斷數據包結束條件
if self.datavalue[rxtx] == delim or len(self.packet_cache[rxtx]) == plen:# 滿足結束條件(分隔符或長度)self.es_packet[rxtx] = self.samplenum # 記錄數據包結束樣本點# 格式化數據包內容s = ''for b in self.packet_cache[rxtx]:s += self.format_value(b) # 根據配置(ASCII/HEX等)格式化值if self.options['format'] != 'ascii':s += ' ' # 非ASCII格式添加分隔符(如十六進制值之間加空格)if self.options['format'] != 'ascii' and s[-1] == ' ':s = s[:-1] # 移除末尾多余空格# 輸出數據包注釋self.putx_packet(rxtx, [Ann.RX_PACKET + rxtx, [s]])self.packet_cache[rxtx] = [] # 清空緩存
- 結束條件:
- 分隔符觸發:當前數據值等于配置的分隔符(如
0x0A
表示換行符\n
)。 - 長度觸發:緩存數據長度達到配置的固定值(如
64
字節)。
- 分隔符觸發:當前數據值等于配置的分隔符(如
- 數據格式化:
- ASCII:直接拼接字符(如
b'\x41\x42'
→"AB"
)。 - 非ASCII(HEX/DEC/OCT/BIN):用空格分隔值(如
"41 42"
)。
- ASCII:直接拼接字符(如
- 輸出與清理:
- 調用
putx_packet
輸出數據包注釋(如Ann.RX_PACKET
)。 - 清空緩存,準備接收下一數據包。
- 調用
get_data_bits
def get_data_bits(self, rxtx, signal):# Save the sample number of the middle of the first data bit.if self.startsample[rxtx] == -1:self.startsample[rxtx] = self.samplenumself.putg([Ann.RX_DATA_BIT + rxtx, ['%d' % signal]])# Store individual data bits and their start/end samplenumbers.s, halfbit = self.samplenum, int(self.bit_width / 2)self.databits[rxtx].append([signal, s - halfbit, s + halfbit])self.cur_frame_bit[rxtx] += 1# Return here, unless we already received all data bits.self.cur_data_bit[rxtx] += 1if self.cur_data_bit[rxtx] < self.options['data_bits']:return# Convert accumulated data bits to a data value.bits = [b[0] for b in self.databits[rxtx]]if self.options['bit_order'] == 'msb-first':bits.reverse()self.datavalue[rxtx] = bitpack(bits)self.putpx(rxtx, ['DATA', rxtx,(self.datavalue[rxtx], self.databits[rxtx])])b = self.datavalue[rxtx]formatted = self.format_value(b)if formatted is not None:self.putx(rxtx, [rxtx, [formatted]])bdata = b.to_bytes(self.bw, byteorder='big')self.putbin(rxtx, [Bin.RX + rxtx, bdata])self.putbin(rxtx, [Bin.RXTX, bdata])self.handle_packet(rxtx)self.databits[rxtx] = []self.advance_state(rxtx, signal)
format_value
def format_value(self, v):# Format value 'v' according to configured options.# Reflects the user selected kind of representation, as well as# the number of data bits in the UART frames.fmt, bits = self.options['format'], self.options['data_bits']# Assume "is printable" for values from 32 to including 126,# below 32 is "control" and thus not printable, above 127 is# "not ASCII" in its strict sense, 127 (DEL) is not printable,# fall back to hex representation for non-printables.if fmt == 'ascii':if v in range(32, 126 + 1):return chr(v)hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"return hexfmt.format(v)# Mere number to text conversion without prefix and padding# for the "decimal" output format.if fmt == 'dec':return "{:d}".format(v)# Padding with leading zeroes for hex/oct/bin formats, but# without a prefix for density -- since the format is user# specified, there is no ambiguity.if fmt == 'hex':digits = (bits + 4 - 1) // 4fmtchar = "X"elif fmt == 'oct':digits = (bits + 3 - 1) // 3fmtchar = "o"elif fmt == 'bin':digits = bitsfmtchar = "b"else:fmtchar = Noneif fmtchar is not None:fmt = "{{:0{:d}{:s}}}".format(digits, fmtchar)return fmt.format(v)return None
- 逐位接收數據,將其累積并轉換為完整的數據值,同時生成可視化注釋和結構化輸出。其核心作用包括:
- 記錄數據位的時序信息(起始/結束樣本點)。
- 按配置的位順序組合數據位(LSB-first 或 MSB-first)。
- 格式化數據值(如十六進制、ASCII)。
- 觸發數據包處理邏輯(當數據位接收完成時)。
get_parity_bit
def get_parity_bit(self, rxtx, signal):self.paritybit[rxtx] = signalself.cur_frame_bit[rxtx] += 1if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])else:# TODO: Return expected/actual parity values.self.putp(['PARITY ERROR', rxtx, (0, 1)]) # FIXME: Dummy tuple...self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = Falseself.advance_state(rxtx, signal)
- 用于處理 UART 協議中的 奇偶校驗位,驗證其正確性并根據結果更新幀狀態。以下是分步解析:
- 記錄校驗位值:保存校驗位的電平值。
- 校驗驗證:根據配置的校驗類型(奇校驗、偶校驗等),驗證校驗位是否合法。
- 狀態更新與輸出:
- 若校驗通過:輸出校驗位信息至日志和可視化界面。
- 若校驗失敗:標記幀為無效,輸出錯誤信息。
- 狀態轉移:推進狀態機至下一階段(如停止位處理)
保存校驗位值并更新位計數器
self.paritybit[rxtx] = signal
self.cur_frame_bit[rxtx] += 1
paritybit[rxtx]
:存儲校驗位的實際電平值(0
或1
)。cur_frame_bit[rxtx]
:遞增幀內位計數器,表示已處理完校驗位。
校驗位驗證
if parity_ok(self.options['parity'], self.paritybit[rxtx],self.datavalue[rxtx], self.options['data_bits']):# 校驗成功self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])self.putg([Ann.RX_PARITY_OK + rxtx, ['Parity bit', 'Parity', 'P']])
else:# 校驗失敗self.putp(['PARITY ERROR', rxtx, (0, 1)]) # 虛擬元組,需完善self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])self.frame_valid[rxtx] = False
-
parity_ok()
函數:
根據校驗類型、校驗位值、數據值(datavalue
)和數據位數,返回校驗結果。
示例邏輯:def parity_ok(parity_type, parity_bit, data, data_bits):if parity_type == 'none':return True # 無校驗,始終通過# 計算數據位中1的個數ones = bin(data).count('1') if parity_type == 'even':return (ones + parity_bit) % 2 == 0elif parity_type == 'odd':return (ones + parity_bit) % 2 == 1# 其他校驗類型(如 'zero', 'one')...
-
校驗成功:
putp
:輸出校驗位信息到 Python 對象通道(如日志記錄)。putg
:在邏輯分析儀界面標注校驗位(如 “Parity bit”)。
-
校驗失敗:
- 輸出錯誤信息到 Python 和注釋通道。
- 標記幀無效(
frame_valid[rxtx] = False
),后續流程可能忽略此幀。 - 注意:代碼中使用了虛擬元組
(0, 1)
,需替換為實際期望值與實際值(如(expected_parity, actual_parity)
)。
狀態轉移
self.advance_state(rxtx, signal)
-
作用:推進狀態機至下一狀態(如處理停止位)。
-
實現邏輯(偽代碼):
def advance_state(self, rxtx, signal):if self.state[rxtx] == 'PARITY_CHECK':self.state[rxtx] = 'PROCESS_STOP_BITS'
get_stop_bits
def get_stop_bits(self, rxtx, signal):self.stopbits[rxtx].append(signal)self.cur_frame_bit[rxtx] += 1# Stop bits must be 1. If not, we report an error.if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = Falseself.putp(['STOPBIT', rxtx, signal])self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])# Postprocess the UART frame after all STOP bits were seen.if len(self.stopbits[rxtx]) < self.options['stop_bits']:returnself.advance_state(rxtx, signal)
get_stop_bits
負責 處理停止位,驗證其合法性并控制狀態機流轉。核心功能包括:
- 記錄停止位值:將每個停止位的電平值存入緩存。
- 停止位有效性檢查:驗證停止位是否為高電平(
1
),否則標記幀無效。 - 狀態機推進:當所有配置的停止位處理完成后,進入下一狀態(如空閑狀態)。
記錄停止位值并更新位計數器
self.stopbits[rxtx].append(signal)
self.cur_frame_bit[rxtx] += 1
stopbits[rxtx]
:列表存儲每個停止位的電平值(如[1, 1]
)。cur_frame_bit
:遞增幀內位計數器,表示已處理完當前停止位。
驗證停止位合法性
if signal != 1:self.putp(['INVALID STOPBIT', rxtx, signal])self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])self.frame_valid[rxtx] = False
- 停止位規則:UART 協議要求停止位為高電平(
1
)。 - 錯誤處理:
putp
:輸出錯誤信息到 Python 對象通道(如日志記錄)。putg
:在邏輯分析儀界面標注錯誤(如Frame error
)。- 標記幀無效(
frame_valid[rxtx] = False
)。
輸出停止位信息
self.putp(['STOPBIT', rxtx, signal])
self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])
- 日志與注釋:無論停止位是否有效,均記錄其存在(如
Stop bit
)。
檢查停止位數量
if len(self.stopbits[rxtx]) < self.options['stop_bits']:return
self.advance_state(rxtx, signal)
- 數量判斷:比較已接收的停止位數量與配置值(如
stop_bits=1.0
)。 - 潛在問題:
若stop_bits
為浮點數(如1.5
),len(stopbits[rxtx])
可能無法正確匹配(需特殊處理時間長度而非數量)。 - 狀態轉移:滿足數量后調用
advance_state
,進入空閑狀態或下一幀解析。
def handle_frame(self, rxtx, ss, es):# Pass the complete UART frame to upper layers.self.putpse(ss, es, ['FRAME', rxtx,(self.datavalue[rxtx], self.frame_valid[rxtx])])def handle_idle(self, rxtx, ss, es):self.putpse(ss, es, ['IDLE', rxtx, 0])def handle_break(self, rxtx, ss, es):self.putpse(ss, es, ['BREAK', rxtx, 0])self.putgse(ss, es, [Ann.RX_BREAK + rxtx,['Break condition', 'Break', 'Brk', 'B']])self.state[rxtx] = 'WAIT FOR START BIT'
get_wait_cond
def get_wait_cond(self, rxtx, inv):# Return condititions that are suitable for Decoder.wait(). Those# conditions either match the falling edge of the START bit, or# the sample point of the next bit time.state = self.state[rxtx]if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}if state in ('GET START BIT', 'GET DATA BITS','GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]# TODO: Currently does not support half STOP bits.want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
get_wait_cond
方法用于動態生成解碼器的等待條件,確保協議解析過程能夠精確同步到UART幀的各個階段。
根據當前解析狀態(如等待起始位、處理數據位等),返回以下兩種條件之一:
- 邊沿觸發條件:用于檢測起始位的下降沿(或上升沿,若信號反轉)。
- 采樣點跳轉條件:指示解碼器需要跳過的樣本數,以對齊到下一個位的采樣點。
取當前狀態
state = self.state[rxtx]
- 作用:
根據方向(RX/TX)獲取解析器的當前狀態,如'WAIT FOR START BIT'
或'GET DATA BITS'
。
處理「等待起始位」狀態
if state == 'WAIT FOR START BIT':return {rxtx: 'r' if inv else 'f'}
- 邏輯:
- 起始位是UART幀的開始,協議要求起始位為低電平(邏輯
0
)。 inv
參數的作用:- 若
inv = False
(默認):等待下降沿('f'
),即從空閑高電平跳變到起始位低電平。 - 若
inv = True
(信號反轉):等待上升沿('r'
),即反轉后的低電平變為高電平。
- 若
- 起始位是UART幀的開始,協議要求起始位為低電平(邏輯
- 返回值:
字典{RX: 'f'}
或{TX: 'r'}
,表示需要監測對應通道的邊沿事件。
處理「數據/校驗/停止位」狀態
if state in ('GET START BIT', 'GET DATA BITS', 'GET PARITY BIT', 'GET STOP BITS'):bitnum = self.cur_frame_bit[rxtx]want_num = ceil(self.get_sample_point(rxtx, bitnum))return {'skip': want_num - self.samplenum}
- 邏輯:
- 獲取當前位號:
bitnum
表示當前正在處理的幀內位索引(如起始位為0
,數據位從1
開始)。 - 計算目標樣本點:
調用get_sample_point(rxtx, bitnum)
計算當前位的采樣點(如位的中間位置)。- 示例:若位寬為
10
樣本,采樣點配置為50%
,則采樣點為起始樣本點 + 5
。 ceil
的作用:將浮點采樣點向上取整,確保跳轉到整數樣本位置。
- 示例:若位寬為
- 計算需跳過的樣本數:
want_num - self.samplenum
表示從當前樣本點 (self.samplenum
) 到目標樣本點 (want_num
) 需要跳過的樣本數。
- 獲取當前位號:
- 返回值:
字典{'skip': N}
,指示解碼器跳過N
個樣本后繼續解析。
get_idle_cond
def get_idle_cond(self, rxtx, inv):# Return a condition that corresponds to the (expected) end of# the next frame, assuming that it will be an "idle frame"# (constant high input level for the frame's length).if self.idle_start[rxtx] is None:return Noneend_of_frame = self.idle_start[rxtx] + self.frame_len_sample_countif end_of_frame < self.samplenum:return Nonereturn {'skip': end_of_frame - self.samplenum}
- 在 UART 協議解析中用于檢測空閑狀態(幀間持續高電平),其作用是根據當前空閑開始時間計算需要跳過的樣本數以確認空閑幀結束。
空閑狀態檢測條件
- 檢查空閑開始時間:若
self.idle_start[rxtx]
為None
,表示未開始檢測空閑,返回None
。 - 計算預期結束點:
end_of_frame = 空閑開始時間 + 一幀總樣本數
。 - 超時判斷:若當前樣本
samplenum
已超過end_of_frame
,返回None
(空閑已結束)。 - 返回跳過樣本數:若未超時,返回需跳過的樣本數 `{‘skip’:
inspect_sample
def inspect_sample(self, rxtx, signal, inv):# Inspect a sample returned by .wait() for the specified UART line.if inv:signal = not signalstate = self.state[rxtx]if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
- 是 UART 協議解析器的核心驅動方法,根據當前解析狀態(如等待起始位、處理數據位等),處理單個樣本點并調用相應的子方法推進解析流程。
信號反轉處理
if inv:signal = not signal
- 作用:根據
inv
配置調整電平邏輯,適配硬件差異。- 若
inv = True
:signal = 1
變為0
,signal = 0
變為1
。 - 應用場景:某些硬件(如 RS-232)使用負邏輯(高電平為
0
,低電平為1
),需反轉信號。
- 若
狀態路由與解析
state = self.state[rxtx]
if state == 'WAIT FOR START BIT':self.wait_for_start_bit(rxtx, signal)
elif state == 'GET START BIT':self.get_start_bit(rxtx, signal)
elif state == 'GET DATA BITS':self.get_data_bits(rxtx, signal)
elif state == 'GET PARITY BIT':self.get_parity_bit(rxtx, signal)
elif state == 'GET STOP BITS':self.get_stop_bits(rxtx, signal)
- 狀態機驅動:根據當前狀態調用對應的處理方法,各狀態含義如下:
狀態 | 調用的方法 | 作用 |
---|---|---|
WAIT FOR START BIT | wait_for_start_bit | 檢測起始位下降沿,初始化幀解析狀態。 |
GET START BIT | get_start_bit | 驗證起始位有效性,重置數據緩存,準備接收數據位。 |
GET DATA BITS | get_data_bits | 逐位接收數據,組合為整數值,格式化輸出。 |
GET PARITY BIT | get_parity_bit | 驗證校驗位合法性,標記幀有效性。 |
GET STOP BITS | get_stop_bits | 檢查停止位是否為高電平,完成幀解析后推進至空閑狀態。 |
執行流程示例
場景:接收一幀 8N1 數據(無校驗,1停止位)
- 初始狀態:
state = 'WAIT FOR START BIT'
- 檢測到下降沿(反轉后為低電平),調用
wait_for_start_bit
:- 記錄起始位樣本點,狀態變為
'GET START BIT'
。
- 記錄起始位樣本點,狀態變為
- 檢測到下降沿(反轉后為低電平),調用
- 處理起始位:
state = 'GET START BIT'
- 調用
get_start_bit
:- 驗證起始位為低電平,狀態變為
'GET DATA BITS'
。
- 驗證起始位為低電平,狀態變為
- 調用
- 接收數據位:
state = 'GET DATA BITS'
- 循環調用
get_data_bits
8次:- 每次讀取一位數據,存儲并更新數據值。
- 完成8位后,狀態變為
'GET STOP BITS'
。
- 循環調用
- 處理停止位:
state = 'GET STOP BITS'
- 調用
get_stop_bits
:- 驗證停止位為高電平,狀態回到
'WAIT FOR START BIT'
。
- 驗證停止位為高電平,狀態回到
- 調用
inspect_edge
def inspect_edge(self, rxtx, signal, inv):# Inspect edges, independently from traffic, to detect break conditions.if inv:signal = not signalif not signal:# Signal went low. Start another interval.self.break_start[rxtx] = self.samplenumreturn# Signal went high. Was there an extended period with low signal?if self.break_start[rxtx] is None:returndiff = self.samplenum - self.break_start[rxtx]if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es)self.break_start[rxtx] = Nonedef inspect_idle(self, rxtx, signal, inv):# Check each edge and each period of stable input (either level).# Can derive the "idle frame period has passed" condition.if inv:signal = not signalif not signal:# Low input, cease inspection.self.idle_start[rxtx] = Nonereturn# High input, either just reached, or still stable.if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenumdiff = self.samplenum - self.idle_start[rxtx]if diff < self.frame_len_sample_count:returnss, es = self.idle_start[rxtx], self.samplenumself.handle_idle(rxtx, ss, es)self.idle_start[rxtx] = es
- 通過檢測信號邊沿來識別 BREAK 條件(持續低電平超過一幀時間)
-
信號反轉:
if inv:signal = not signal
- 適配硬件電平邏輯(如 RS-232 使用負邏輯)。
-
檢測下降沿(低電平):
if not signal:self.break_start[rxtx] = self.samplenumreturn
- 當信號變為低電平時,記錄起始樣本點
break_start[rxtx]
。
- 當信號變為低電平時,記錄起始樣本點
-
檢測上升沿(高電平):
if self.break_start[rxtx] is None:return diff = self.samplenum - self.break_start[rxtx] if diff >= self.break_min_sample_count:ss, es = self.frame_start[rxtx], self.samplenumself.handle_break(rxtx, ss, es) self.break_start[rxtx] = None
-
當信號恢復高電平時,計算低電平持續時間
diff
。 -
若
diff
超過閾值break_min_sample_count
(通常為一幀時間的樣本數),調用handle_break
處理 BREAK 條件。 -
潛在問題:
ss
錯誤地使用frame_start[rxtx]
(應為break_start[rxtx]
),需修正為:ss, es = self.break_start[rxtx], self.samplenum
-
- 檢測 空閑狀態(持續高電平超過一幀時間),用于驗證幀間間隔的合規性。
-
信號反轉:
if inv:signal = not signal
-
低電平處理:
if not signal:self.idle_start[rxtx] = Nonereturn
- 檢測到低電平時,重置空閑起始點
idle_start[rxtx]
。
- 檢測到低電平時,重置空閑起始點
-
高電平處理:
if self.idle_start[rxtx] is None:self.idle_start[rxtx] = self.samplenum diff = self.samplenum - self.idle_start[rxtx] if diff < self.frame_len_sample_count:return ss, es = self.idle_start[rxtx], self.samplenum self.handle_idle(rxtx, ss, es) self.idle_start[rxtx] = es
- 持續高電平時,計算持續時間
diff
。 - 若超過閾值
frame_len_sample_count
(通常為一幀時間),調用handle_idle
處理空閑狀態。 - 潛在問題:更新
idle_start[rxtx] = es
可能導致后續空閑段被分割。建議改為idle_start[rxtx] = None
以重新檢測連續空閑。
- 持續高電平時,計算持續時間
decode
def decode(self):if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')opt = self.optionsinv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']cond_data_idx = [None] * len(has_pin)# Determine the number of samples for a complete frame's time span.# A period of low signal (at least) that long is a break condition.frame_samples = 1 # STARTframe_samples += self.options['data_bits']frame_samples += 0 if self.options['parity'] == 'none' else 1frame_samples += self.options['stop_bits']frame_samples *= self.bit_widthself.frame_len_sample_count = ceil(frame_samples)self.break_min_sample_count = self.frame_len_sample_countcond_edge_idx = [None] * len(has_pin)cond_idle_idx = [None] * len(has_pin)while True:conds = []if has_pin[RX]:cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)if has_pin[TX]:cond_data_idx[TX] = len(conds)conds.append(self.get_wait_cond(TX, inv[TX]))cond_edge_idx[TX] = len(conds)conds.append({TX: 'e'})cond_idle_idx[TX] = Noneidle_cond = self.get_idle_cond(TX, inv[TX])if idle_cond:cond_idle_idx[TX] = len(conds)conds.append(idle_cond)(rx, tx) = self.wait(conds)if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])if cond_data_idx[TX] is not None and self.matched[cond_data_idx[TX]]:self.inspect_sample(TX, tx, inv[TX])if cond_edge_idx[TX] is not None and self.matched[cond_edge_idx[TX]]:self.inspect_edge(TX, tx, inv[TX])self.inspect_idle(TX, tx, inv[TX])if cond_idle_idx[TX] is not None and self.matched[cond_idle_idx[TX]]:self.inspect_idle(TX, tx, inv[TX])
- UART 協議解析器的入口方法,負責協調整個解析流程。其核心任務包括:
- 初始化校驗:確保采樣率和通道配置有效。
- 計算幀參數:確定幀長度和 BREAK 條件的最小樣本數。
- 事件循環:監聽 RX/TX 通道的各類事件(數據、邊沿、空閑),并調用對應的處理方法。
初始校驗
if not self.samplerate:raise SamplerateError('Cannot decode without samplerate.')has_pin = [self.has_channel(ch) for ch in (RX, TX)]
if not True in has_pin:raise ChannelError('Need at least one of TX or RX pins.')
- 校驗采樣率:必須配置采樣率以計算時間相關參數。
- 校驗通道:至少啟用 RX 或 TX 通道。
信號反轉配置
opt = self.options
inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
inv
列表:標記 RX 和 TX 是否需要信號反轉。例如,inv[RX] = True
表示 RX 通道的電平邏輯需取反。
幀參數計算
frame_samples = 1 # START
frame_samples += self.options['data_bits']
frame_samples += 0 if self.options['parity'] == 'none' else 1
frame_samples += self.options['stop_bits']
frame_samples *= self.bit_width
self.frame_len_sample_count = ceil(frame_samples)
self.break_min_sample_count = self.frame_len_sample_count
-
幀長度計算:
- 起始位:固定 1 位。
- 數據位:
data_bits
位。 - 校驗位:根據配置(
parity
)決定是否添加。 - 停止位:直接累加
stop_bits
(可能為浮點數,如 1.5)。 - 總樣本數:總位數 × 位寬(
bit_width
)。
-
問題:浮點停止位直接累加到整數
frame_samples
會導致精度丟失。
改進建議:frame_samples = (1 + # STARTself.options['data_bits'] +(0 if self.options['parity'] == 'none' else 1) +self.options['stop_bits'] ) * self.bit_width self.frame_len_sample_count = ceil(frame_samples)
主事件循環
while True:conds = []# 為 RX 和 TX 通道構建等待條件if has_pin[RX]:# 數據條件(如等待起始位或采樣點)cond_data_idx[RX] = len(conds)conds.append(self.get_wait_cond(RX, inv[RX]))# 邊沿條件(檢測任何邊沿)cond_edge_idx[RX] = len(conds)conds.append({RX: 'e'})# 空閑條件(持續高電平)cond_idle_idx[RX] = Noneidle_cond = self.get_idle_cond(RX, inv[RX])if idle_cond:cond_idle_idx[RX] = len(conds)conds.append(idle_cond)# 同理處理 TX 通道...# 等待任一條件觸發(rx, tx) = self.wait(conds)# 處理 RX 通道觸發的事件if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:self.inspect_sample(RX, rx, inv[RX])if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:self.inspect_edge(RX, rx, inv[RX])self.inspect_idle(RX, rx, inv[RX])if cond_idle_idx[RX] is not None and self.matched[cond_idle_idx[RX]]:self.inspect_idle(RX, rx, inv[RX])# 同理處理 TX 通道觸發的事件...
步驟分解
- 構建等待條件列表
conds
:- 數據條件:由
get_wait_cond
返回,如等待起始位下降沿或數據采樣點。 - 邊沿條件:監聽通道的任何邊沿事件(
'e'
)。 - 空閑條件:由
get_idle_cond
返回,如持續高電平超時。
- 數據條件:由
- 等待事件觸發:
self.wait(conds)
阻塞直到任一條件滿足,返回當前 RX/TX 信號值。
- 處理觸發事件:
- 數據條件:調用
inspect_sample
解析數據位。 - 邊沿條件:調用
inspect_edge
檢測 BREAK 條件,并檢查空閑狀態。 - 空閑條件:直接調用
inspect_idle
。
- 數據條件:調用
方法 | 作用 |
---|---|
get_wait_cond(rxtx, inv) | 返回當前狀態下的等待條件(如邊沿或采樣點跳轉)。 |
get_idle_cond(rxtx, inv) | 返回檢測空閑狀態的條件(如持續高電平超過閾值)。 |
inspect_sample | 解析數據位、校驗位或停止位。 |
inspect_edge | 檢測 BREAK 條件(長低電平)。 |
inspect_idle | 處理空閑狀態(持續高電平)。 |