代碼
主python文件
import serial
import serial.tools.list_ports
import time
import tkinter as tk
from tkinter import ttk
import numpy as np
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import threading
from queue import Queue
from PIL import Image, ImageTk
import tkinter as tk
import LightSwitch
import tkinter as tk
from datetime import datetime
import cv2 as cv
import cv2
import numpy as np
import os
'''
智能紅綠燈上位機和MCU通信協議:
MCU->上位機 0x01 0xfe ...(320*240*2)BYTE...0xfe 0x01
上位機->MCU 0x02 0xff 0xXX 0xXX 0xXX 0xXX 0xff 0x02
字節2:高5bit傳輸當前系統時間0h-23h;低3bit傳輸時間增減標志位(0非深夜和高峰模式 1高峰模式 2深夜模式)
字節3:按鈕值(0無按鍵 1按鍵+ 2按鍵- 3按鍵切換燈)
字節4:0空閑(識別關閉);如果無按鍵和深夜或者高峰模式(判斷標志位),當接收到圖像數據后進行識別,否則接收到的圖像數據丟棄,若圖片中方形白色>20個,高4bit發送綠燈增加5s標志位1;當白色方塊0<白色方塊數<20,正常計時標志2(也為默認開啟值);當白色方形==0,發送下次紅燈長紅標志位3。
字節5:識別到的車輛數量
'''
# 初始化窗口和控件
window = tk.Tk()
window.title('智能紅綠燈串口助手')
window.geometry('900x600+200+100')
serialName = []#串口名稱
ser = serial.Serial() # 串口實例對象def opencom(*args):if not ser.is_open:ser.open()setbaudrate()setPort()setstop()setShujv()setcheck()if ser.is_open:print(f"串口已打開")light.turn_on()def closecom(*args):if ser.is_open:ser.close()if not ser.is_open:print(f"串口已關閉")light.turn_off()def clearreceive(*args):t0.delete(1.0, 'end')def send(*args):passdef hex_string_to_byte_list(hex_str):if len(hex_str) % 2 != 0:raise ValueError("Hex string length must be even.")return [int(hex_str[i:i+2], 16) for i in range(0, len(hex_str), 2)]Str_Get_List = []#存儲接收到的Hex字符串轉成的十六進制字節列表
Get_Picture_List = [] #存儲圖像數據
PackHead = [1,254]
PackEnd = [254,1]
PackHeadGetFlag = False
PackGetFlag = False
Picture_ByteLen = 320*240*2+4
lock = threading.Lock()# 創建一個線程鎖
rgb_array = []#存儲RGB888轉換的數組數據,用于保存圖片
SendData = [0x02,0xff,0x00,0x00,0x00,0x00,0xff,0x02]#存儲發送數據包
IdentificationCount = 0 #圖像識別數量
def showdata(*args):global Str_GetStringglobal Str_Get_Listglobal PackHeadGetFlagglobal PackGetFlagwhile not stop_event.is_set():if ser.is_open:try:try:with lock: # 使用鎖確保線程安全s = ser.read_all() # 獲取字節對象if s:sHex = s.hex() # 獲取Hex字符串 ,2個字符表示一個十六進制1字節t0.delete("1.0", tk.END)t0.insert('end', sHex)t0.see('end') # 自動滾動到最新接收到的數據# Str_Get_List = Str_Get_List.append(hex_string_to_byte_list(sHex))# print(sHex,len(Str_Get))#獲取數據列表sHexList = hex_string_to_byte_list(sHex)for i in sHexList:Str_Get_List.append(i)#處理數據和發送相關數據包if Hour_CarFlag ==0:pictureDataDispose()else:print("特殊模式,攝像頭不啟用")#情況接收到的數據Str_Get_List = []except Exception as e:print(f"Unexpected error: {e}")except serial.SerialException as e:print(f"Error reading from serial port: {e}")except Exception as e:print(f"Unexpected error: {e}")else:passdef pictureDataDispose(*args):global PackHeadGetFlagglobal Get_Picture_Listglobal Picture_ByteLenglobal PackHeadglobal Str_Get_Listglobal rgb_array#print(Str_Get_List)if not PackHeadGetFlag:#未接收到包頭index_Str_Get_List = find_sublist(Str_Get_List,PackHead)if index_Str_Get_List!=-1:Get_Picture_List = Str_Get_List[index_Str_Get_List:]PackHeadGetFlag = Trueprint(f"Element {PackHead} found at index {index_Str_Get_List}")else:print(f"Element {PackHead} not found")else:Get_Picture_List.extend(Str_Get_List)#添加所有元素到尾部if len(Get_Picture_List) >= Picture_ByteLen:Get_Picture_List_PackEnd = Get_Picture_List[Picture_ByteLen-2:Picture_ByteLen]index_PackEnd = find_sublist(Get_Picture_List_PackEnd,PackEnd)if index_PackEnd==0:print("接收到一幀圖片數據",print(len(Get_Picture_List)))PictureData = Get_Picture_List[2:Picture_ByteLen-2]# 轉換圖像數據rgb565_values = [(PictureData[i] << 8 | PictureData[i + 1]) for i in range(0, len(PictureData), 2)]rgb888_values = [rgb565_to_rgb888(val) for val in rgb565_values]#print(rgb888_values, len(rgb888_values), len(rgb565_values))update_image(rgb888_values)# 將RGB數據轉換為numpy數組,并調整其形狀以匹配圖像尺寸和通道數(高度, 寬度, 通道)rgb_array = np.array(rgb888_values).reshape((240, 320, 3))#print(rgb_array)SavePicture(rgb_array)PictureIdentification()#---------------------處理第二幀圖片包頭Get_Picture_List = []PackHeadGetFlag = Falseelse:Get_Picture_List = []PackHeadGetFlag = Falsedef PictureIdentification(*args):global IdentificationCountimg = cv.imread('./image/2.jpg')# cv_show('img', img)#print(img)HSV = cv.cvtColor(img, cv.COLOR_BGR2HSV) # 轉換圖像lowerColor = np.array([0, 0, 200]) # 設置最低閾值 np.array([Hmin, Smin, Vmin])upperColor = np.array([180, 20, 255]) # 設置最高閾值# 因為要保留的就是白色區域,因此根據白色閾值填入,提取白色部分(指定區域變白,其他變黑)binary = cv.inRange(HSV, lowerColor, upperColor)#print(binary)# 運用中值濾波去除噪聲median = cv.medianBlur(binary, 9)# 顯示二值圖#cv_show('median', median) # 這里是我自己寫的cv_show()函數,函數聲明可放開頭contours, hierachy = cv.findContours(median, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE)# 后兩個為輪廓檢索模式和輪廓逼近模式res = cv.drawContours(img, contours, -1, (0, 0, 255), 4)#cv_show('res', res)# 4、原圖白色中心點L = len(contours) # contours輪廓數據是數組,因此用len()測數組長度,為了循環畫點使用IdentificationCount = L#識別白色塊的數量for i in range(L):cnt = contours[i] # cnt表示第i個白色快的輪廓信息(x, y), radius = cv.minEnclosingCircle(cnt) # 得到白色塊外接圓的圓心坐標和半徑center = (int(x), int(y)) # 畫center圓心時。x,y必須是整數# 標出中心點img2 = cv.circle(img, center, 3, (0, 0, 255), 5) # 傳入圓心信息,并畫在原圖上print(center) # 輸出各個中心點# 顯示有中心點的圖像#cv_show("frame", img2) # 展示花了中心點的魔方圖# 寫一個顯示函數
def cv_show(name, img):cv.imshow(name, img)cv.waitKey(0)cv.destroyAllWindows()def SavePicture(*args):global rgb_array# 檢查并創建保存圖像的目錄(如果不存在)output_dir = './image'os.makedirs(output_dir, exist_ok=True)# 圖像保存路徑output_path = os.path.join(output_dir, '2.jpg')# 使用OpenCV將RGB圖像保存為JPEG格式cv2.imwrite(output_path, rgb_array[:, :, ::-1]) # 注意這里對顏色通道進行了轉換(BGR到RGB)print(f"Image saved successfully at {output_path}")# def dispose_LastData(*args):
# global PackHeadGetFlag
# global Get_Picture_List
# global Picture_ByteLen
# global PackHead
#
# if len(Get_Picture_List[Picture_ByteLen:]) >= 2:
# Get_Picture_List_last = Get_Picture_List[Picture_ByteLen:] # 保留剩下的數據
# index_Str_Get_List = find_sublist(Get_Picture_List_last, PackHead) # 判斷剩下的數據
# if index_Str_Get_List != -1:
# Get_Picture_List = Get_Picture_List_last[index_Str_Get_List:]
# print(f"Element {PackHead} found at index {index_Str_Get_List}")
# else:
# print(f"Element {PackHead} not found")
# Get_Picture_List = []
# PackHeadGetFlag = False
# print(f"剩余非圖片數據")
# else :
# Get_Picture_List = []
# PackHeadGetFlag = Falsedef update_image(rgb888_data):"""更新Canvas顯示圖像"""image = Image.new('RGB', (320, 240))image.putdata(rgb888_data)tk_image = ImageTk.PhotoImage(image)# 保持圖像引用if hasattr(canvas, 'image'):canvas.image = tk_imageelse:canvas.image = tk_imagecanvas.create_image(0, 0, anchor="nw", image=tk_image)print("圖像已更新")def rgb565_to_rgb888(rgb565):"""將單個RGB565值轉換為RGB888元組"""# 提取紅色部分 (5位),并將其從5位擴展到8位r = (rgb565 & 0xF800) >> 11r = int(r * 255 / 31)# 提取綠色部分 (6位),并將其從6位擴展到8位g = (rgb565 & 0x07E0) >> 5g = int(g * 255 / 63)# 提取藍色部分 (5位),并將其從5位擴展到8位b = rgb565 & 0x001Fb = int(b * 255 / 31)return (r, g, b)# 設置串口參數
def setPort(*args):ser.port = comboxlist0.get()# 獲取可用串口列表
port_list = list(serial.tools.list_ports.comports())
for port in port_list:serialName.append(port[0])def closethread(*args):while ser.is_open:ser.close()if not ser.is_open:breakwhile not ser.is_open:print(f"串口已關閉")stop_event.set()stop_event1.set()stop_event2.set()if stop_event.is_set() and stop_event1.is_set() and stop_event2.is_set():breakwhile stop_event2.is_set():print(f"時間線程等待stop_event2事件關閉")th2.join()if not th2.is_alive():breakwhile stop_event1.is_set():print(f"時間線程等待stop_event1事件關閉")th1.join()if not th1.is_alive():breakwhile stop_event.is_set():print(f"串口線程等待stop_event事件關閉")th0.join()window.quit()if not th0.is_alive():breakdef printfData(*args):if not ser.is_open:print("串口未打開")returnser.write(SendData)def find_sublist(main_list, sub_list):"""查找子列表在主列表中的位置"""len_main, len_sub = len(main_list), len(sub_list)if len_sub == 0 or len_main < len_sub:return -1for i in range(len_main - len_sub + 1):if main_list[i:i+len_sub] == sub_list:return ireturn -1# 設置串口參數
def setPort(*args):ser.port = comboxlist0.get()def setbaudrate(*args):while True:baudrate = int(comboxlist1.get())ser.baudrate = baudrateif ser.baudrate == baudrate:breakdef setShujv(*args):ser.bytesize = int(comboxlist2.get())check = {"N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD}
def setcheck(*args):ser.parity = check[comboxlist3.get()]stop = {"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO}
def setstop(*args):ser.stopbits = stop[comboxlist4.get()]# 獲取當前系統時間的小時并更新 Label
def get_current_hour():global current_hourglobal Hour_CarFlag# 獲取當前時間now = datetime.now()# 提取小時(0-23)current_hour = now.hour# 更新 Label 的文本label6.config(text = f"{now}")label8.config(text=f"{IdentificationCount}")#當前時間對應模式和標志位判斷'''current_hour其他時間 : 0 正常模式(使用攝像頭功能)7-9 and 16-18 : 1 ,早晚高峰,綠燈增加10s22-23 and 0-3 : 2 ,深夜模式,紅燈時間變為20s'''if (current_hour>=7 and current_hour<=9)or(current_hour>=16 and current_hour<=18):Hour_CarFlag = 1elif (current_hour>=22 and current_hour<=23)or(current_hour>=0 and current_hour<=3):Hour_CarFlag = 2else:Hour_CarFlag = 0current_hour = 0 #當前h
Hour_CarFlag = 0 #模式標志位 0正常模式(使用攝像頭) 1早晚高峰模式 2深夜模式
#th1線程
def TimeShow(*args):while not stop_event1.is_set():get_current_hour()time.sleep(0.5)#th2線程
def SendDatathread(*args):global SendDataglobal current_hourglobal IdentificationCountwhile not stop_event2.is_set():time.sleep(1)#1s發送1次數據#修改發送的數據if Hour_CarFlag == 0:#正常模式if IdentificationCount==0:SendData[4] = 3 #下次紅燈停止計數標志elif IdentificationCount >= 20:SendData[4] = 1 #車多,綠燈增加5s標志位,正常計時elif IdentificationCount > 0 and IdentificationCount < 20:SendData[4] = 2 #正常計時標志位elif Hour_CarFlag == 1:#高峰模式SendData[4] = 0 #攝像頭失能 正常計時IdentificationCount = 0 #識別車輛數清零elif Hour_CarFlag == 2:#夜間模式SendData[4] = 0 #攝像頭失能 正常計時IdentificationCount = 0 #識別車輛數清零SendData[2] = ((current_hour<<3)&0xff)+Hour_CarFlagSendData[3] = 0 #按鍵清空,方式按鍵事件重復SendData[5] = IdentificationCount #備用 當前采集車輛#發送if not ser.is_open:print("串口未打開")else:ser.write(SendData)print(SendData)#信號燈時間增點擊事件
def LightTimeIncrease(*args):global SendData'''SendData[3]無按鍵:0當前燈時間增加:1 當前信號燈增加10s 最大99s當前燈時間減少:2 當前信號燈減少10s 最小5s切換燈:3 當前信號燈變為5s'''SendData[3] = 1 # 按鍵清空ser.write(SendData)SendData[3] = 0 # 按鍵清空#信號燈時間減點擊事件
def LightTimeDecrease(*args):SendData[3] = 2 # 按鍵清空ser.write(SendData)SendData[3] = 0 # 按鍵清空#信號燈切換點擊事件
def SwihchOver(*args):SendData[3] = 3 # 按鍵清空ser.write(SendData)SendData[3] = 0 # 按鍵清空# 創建文本控件和下拉列表控件
label0 = tk.Label(window, text='串口', font=10)
label0.place(y=40, x=50)
label1 = tk.Label(window, text='波特率', font=10)
label1.place(y=70, x=50)
label2 = tk.Label(window, text='數據位', font=10)
label2.place(y=100, x=50)
label3 = tk.Label(window, text='校驗位', font=10)
label3.place(y=130, x=50)
label4 = tk.Label(window, text='停止位', font=10)
label4.place(y=160, x=50)
label5 = tk.Label(window, text='時間:', font=10)
label5.place(y=5, x=350)
label6 = tk.Label(window, text='XXXX', font=10)
label6.place(y=5, x=420)
label7 = tk.Label(window, text='車輛:', font=10)
label7.place(y=5, x=700)
label8 = tk.Label(window, text='XXXX', font=10)
label8.place(y=5, x=770)comvalue0 = tk.StringVar()
comboxlist0 = ttk.Combobox(window, textvariable=comvalue0)
comboxlist0["values"] = tuple(serialName)
comboxlist0.bind("<<ComboboxSelected>>", setPort)
comboxlist0.place(y=40, x=150)#波特率下拉框
comvalue1 = tk.StringVar()
comboxlist1 = ttk.Combobox(window, textvariable=comvalue1)
comboxlist1["values"] = (1200, 2400, 4800, 9600, 14400, 19200, 115200, 256000)
comboxlist1.current(7)
comboxlist1.bind("<<ComboboxSelected>>", setbaudrate)
comboxlist1.place(y=70, x=150)#數據位下拉框
comvalue2 = tk.StringVar()
comboxlist2 = ttk.Combobox(window, textvariable=comvalue2)
comboxlist2["values"] = ("8", "7", "6", "5")
comboxlist2.current(0)
comboxlist2.bind("<<ComboboxSelected>>", setShujv)
comboxlist2.place(y=100, x=150)#校驗位下拉框
comvalue3 = tk.StringVar()
comboxlist3 = ttk.Combobox(window, textvariable=comvalue3)
comboxlist3["values"] = ("N", "E", "O")
comboxlist3.current(0)
comboxlist3.bind("<<ComboboxSelected>>", setcheck)
comboxlist3.place(y=130, x=150)#停止位下拉框
comvalue4 = tk.StringVar()
comboxlist4 = ttk.Combobox(window, textvariable=comvalue4)
comboxlist4["values"] = ("1", "1.5", "2")
comboxlist4.current(0)
comboxlist4.bind("<<ComboboxSelected>>", setstop)
comboxlist4.place(y=160, x=150)b4 = tk.Button(window, text='關閉線程', width=35, height=1, command=closethread)
b4.place(y=0, x=0)
b0 = tk.Button(window, text='打開串口', width=35, height=1, command=opencom)
b0.place(y=190, x=50)
light = LightSwitch.BooleanLight(window, size=35, x=10, y=190)#串口連接狀態顯示燈b1 = tk.Button(window, text='清除接收', width=35, height=1, command=clearreceive)
b1.place(y=250, x=50)
b2 = tk.Button(window, text='關閉串口', width=35, height=1, command=closecom)
b2.place(y=220, x=50)
e0 = tk.Entry(window, show=None, width=35)
e0.place(y=290, x=50)
b5 = tk.Button(window, text='Send', width=35, height=1, command=send)
b5.place(y=320, x=50)
t0 = tk.Text(window, width=65, height=3)
t0.place(y=50, x=350)
b6 = tk.Button(window, text='輸出接收到的數據-測試按鈕', width=35, height=1, command=printfData)
b6.place(y=350, x=50)
b6 = tk.Button(window, text='當前信號燈時間+', width=35, height=1, command=LightTimeIncrease)
b6.place(y=380, x=50)
b6 = tk.Button(window, text='當前信號燈時間-', width=35, height=1, command=LightTimeDecrease)
b6.place(y=410, x=50)
b6 = tk.Button(window, text='切換信號燈', width=35, height=1, command=SwihchOver)
b6.place(y=440, x=50)
label5 = tk.Label(window, text='Canvas', font=10)
label5.place(y=100, x=350)
# 圖像參數
canvas = tk.Canvas(window,width=320, height=240,bg='#{:02x}{:02x}{:02x}'.format(*(255,255,255)),bd=1,highlightthickness=1,relief="groove")
canvas.place(x=420, y=125)if __name__ == '__main__':stop_event = threading.Event() #等待線程th0停止事件stop_event1 = threading.Event() # 等待線程th1停止事件stop_event2 = threading.Event() # 等待線程th2停止事件th0 = threading.Thread(target=showdata)th0.start()th1 = threading.Thread(target=TimeShow)th1.start()th2 = threading.Thread(target=SendDatathread)th2.start()window.mainloop()
?從文件,同目錄下LightSwitch.py
import tkinter as tk
class BooleanLight(tk.Canvas):def __init__(self, master=None, size=50, x=0, y=0):super().__init__(master, width=size, height=size)self.pack()self.place(x=x, y=y)self.size = sizeself.is_light_on = False # 初始狀態為關閉# 創建初始的圓形表示燈的狀態self.light = self.create_oval(5, 5, size - 5, size - 5, fill="gray")def turn_on(self):"""打開燈"""self.itemconfig(self.light, fill="green")self.is_light_on = Truedef turn_off(self):"""關閉燈"""self.itemconfig(self.light, fill="gray")self.is_light_on = Falsedef toggle(self):"""切換燈的狀態"""if self.is_light_on:self.turn_off()else:self.turn_on()
?完成功能
- tk、ttk基礎的UI界面建立,并建立相關列表和按鈕的電機事件方法。
- 選擇串口參數,每次打開時自動獲取串口端口列表,可以選擇后在進行連接(顯示連接成功)。
- 通過多線程,串口接收數據,十六進制字節對象字符串顯示。按鈕發送輸入框內容(有函數但是目前沒寫,可以自己寫,比較簡單)。
- 建立3個線程(和三個線程關閉事件,防止線程異常不能正確結束導致的死循環),分別用來接收數據包、發送數據包、更新界面顯示內容。
- 根據接收的數據包,在canvas進行圖像顯示(先將獲取的RGB565數據轉換為RGB888,數據進行保存成圖片保存在文件夾中、然后從文件夾內讀出,進行HSV處理,閾值化處理,邊框圈出,計算白色內容數量)。
- 實時顯示系統時間和接收數據包圖像采集的車輛數量。
- 手動關閉線程,每次點擊關閉線程來關閉應用,確保線程徹底關閉。