圖像畸變-徑向切向畸變實時圖像RTSP推流

實驗環境

注意:ffmpeg進程stdin寫入兩張圖片的時間間隔不能太長,否則mediamtx會出現對應的推流session超時退出。

在這里插入圖片描述
在這里插入圖片描述

實驗效果

在這里插入圖片描述

全部代碼

my_util.py

#進度條
import os
import sys
import time
import shutil
import logging
import time
from datetime import datetimedef print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', print_end="\r"):"""調用在Python終端中打印自定義進度條的函數iteration - 當前迭代(Int)total - 總迭代(Int)prefix - 前綴字符串(Str)suffix - 后綴字符串(Str)decimals - 正數的小數位數(Int)length - 進度條的長度(Int)fill - 進度條填充字符(Str)print_end - 行尾字符(Str)"""percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))filled_length = int(length * iteration // total)bar = fill * filled_length + '-' * (length - filled_length)print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)# 打印新行,完成進度條if iteration == total:print()class Logger(object):"""終端打印不同顏色的日志"""ch = logging.StreamHandler()  # 創建日志處理器對象,在__init__外創建,是類當中的靜態屬性,不是__init__中的實例屬性# #創建靜態的日志處理器可以減少內存消耗# # 創建 FileHandler 實例,指定日志文件路徑# ch = logging.FileHandler(filename='app1.log')def __init__(self):self.logger = logging.getLogger()  # 創建日志記錄對象self.logger.setLevel(logging.DEBUG)  # 設置日志等級info,其他低于此等級的不打印def debug(self, message):self.fontColor('\033[0;37m%s\033[0m')self.logger.debug(message)def info(self, message):self.fontColor('\033[0;32m%s\033[0m')self.logger.info(message)def warning(self, message):self.fontColor('\033[0;33m%s\033[0m')self.logger.warning(message)def error(self, message):self.fontColor('\033[0;31m%s\033[0m')self.logger.error(message)def fontColor(self, color):formatter = logging.Formatter(color % '%(asctime)s - %(name)s - %(levelname)s - %(message)s')  # 控制日志輸出顏色self.ch.setFormatter(formatter)self.logger.addHandler(self.ch)  # 向日志記錄對象中加入日志處理器對象def delete_files(folder_path, max_files):"""監控指定文件夾中的文件數量,并在超過max_files時刪除最舊的文件。"""print("進入刪除圖片文件夾"+folder_path)print("需要刪除文件數量")print(max_files)if True:# 獲取文件夾中的文件列表files = os.listdir(folder_path)file_count = len(files)print(f"當前文件夾 {folder_path} 中的文件數量: {file_count}")# 如果文件數量超過max_files,則刪除最舊的文件if file_count > max_files:# 獲取文件夾中所有文件的完整路徑,并帶上修改時間file_paths_with_mtime = [(os.path.join(folder_path, f), os.path.getmtime(os.path.join(folder_path, f))) forf in files]# 按修改時間排序sorted_files = sorted(file_paths_with_mtime, key=lambda x: x[1])# 刪除最舊的文件,直到文件數量在閾值以下for file_path, mtime in sorted_files[:file_count - max_files]:try:os.remove(file_path)print(f"已刪除文件: {file_path}")except OSError as e:print(f"刪除文件時出錯: {e.strerror}")def copy_file(src, dst):shutil.copy2(src, dst)  # copy2會嘗試保留文件的元數據def end_sentence(text, max_length):'''保證在max_length長度前以句號或點號結束文本:param text: 文本:param max_length: 最大長度:return:'''# 如果文本長度已經超過最大長度,則直接截斷if len(text) > max_length:text = text[:max_length]# print("結果長度 {}".format(len(text)))# 查找句號的位置(en)period_index = max(text.rfind('.'), text.rfind(','),text.rfind(':'), text.rfind(';'),text.rfind('!'), text.rfind('?'))  # 從后往前找,找到最后一個句號# 如果找到了句號且它在最大長度內if period_index != -1 and (period_index + 1 < max_length ormax_length == -1):# 如果需要替換,則替換句號text = text[:period_index] + '.'# 查找句號的位置(cn)period_index = max(text.rfind('。'), text.rfind(','),text.rfind(':'), text.rfind(';'),text.rfind('!'), text.rfind('?'))  # 從后往前找,找到最后一個句號# 如果找到了句號且它在最大長度內if period_index != -1 and (period_index + 1 < max_length ormax_length == -1):# 如果需要替換,則替換句號text = text[:period_index] + '。'return textimport base64def encode_base64(input_string):"""對字符串進行Base64編碼"""encoded_bytes = base64.b64encode(input_string.encode('utf-8'))encoded_string = encoded_bytes.decode('utf-8')return encoded_stringdef decode_base64(input_string):"""對Base64編碼的字符串進行解碼"""decoded_bytes = base64.b64decode(input_string.encode('utf-8'))decoded_string = decoded_bytes.decode('utf-8')return decoded_stringimport socketdef get_local_ip():try:# 創建一個 UDP 套接字s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)# 連接到一個公共的 IP 地址和端口s.connect(("8.8.8.8", 80))# 獲取本地 IP 地址local_ip = s.getsockname()[0]s.close()return local_ipexcept Exception as e:print(f"獲取本地 IP 地址時出錯: {e}")return None

make_pics.py

import numpy as np
import cv2
import math
import time
from PIL import Image, ImageDraw, ImageFontdef distort_image(image, k1, k2, p1, p2):"""對圖像應用徑向和切向畸變:param image: 輸入圖像:param k1: 徑向畸變系數:param k2: 徑向畸變系數:param p1: 切向畸變系數:param p2: 切向畸變系數:return: 畸變后的圖像"""h, w = image.shape[:2]camera_matrix = np.array([[w, 0, w / 2],[0, h, h / 2],[0, 0, 1]], dtype=np.float32)distort_coeffs = np.array([k1, k2, p1, p2, 0], dtype=np.float32)# 生成畸變映射map1, map2 = cv2.initUndistortRectifyMap(camera_matrix, distort_coeffs, np.eye(3), camera_matrix, (w, h), cv2.CV_32FC1)# 應用畸變映射distorted_img = cv2.remap(image, map1, map2, cv2.INTER_LINEAR)return distorted_imgdef put_chinese_text(img, text, position, font_path, font_size, color):"""在圖像上添加中文文字:param img: 輸入圖像:param text: 要添加的文字:param position: 文字位置:param font_path: 字體文件路徑:param font_size: 字體大小:param color: 文字顏色:return: 添加文字后的圖像"""img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))draw = ImageDraw.Draw(img_pil)font = ImageFont.truetype(font_path, font_size)draw.text(position, text, font=font, fill=color)return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)def make_pics(pic_path="picture.jpg", k1=-0.5, k2=0.0, p1=0.0, p2=0.0):# 生成棋盤圖像chessboard = np.zeros((400, 400, 3), dtype=np.uint8)for i in range(0, 400, 40):for j in range(0, 400, 40):if (i // 40 + j // 40) % 2 == 0:chessboard[i:i + 40, j:j + 40] = [255, 255, 255]# 生成雷達圖radar = chessboard.copy()x0, y0 = radar.shape[1] // 2, radar.shape[0] // 2for radius in range(0, 400, 40):cv2.circle(radar, (x0, y0), radius, (0, 0, 255), 1)# 繪制徑向線for angle in range(0, 360, 40):# 使用最大半徑 400 計算徑向線的終點坐標x = int(x0 + 400 * math.cos(math.radians(angle)))y = int(y0 + 400 * math.sin(math.radians(angle)))cv2.line(radar, (x0, y0), (x, y), (0, 0, 255), 1)font_size = 15font_color = (250, 100, 0)combined_distorted_chessboard = distort_image(radar, k1, k2, p1, p2)text1 = "k1={:.2f},k2={:.2f},p1={:.2f},p2={:.2f}".format(k1,k2,p1,p2)text2 = "圖像畸變"combined_distorted_chessboard = put_chinese_text(combined_distorted_chessboard, text1, (10, 30), 'simhei.ttf', font_size, font_color)combined_distorted_chessboard = put_chinese_text(combined_distorted_chessboard, text2, (10, 60), 'simhei.ttf', font_size, font_color)# 保存圖像cv2.imwrite(pic_path, combined_distorted_chessboard)# cv2.imshow(pic_path, combined_distorted_chessboard)# cv2.waitKey(0)# cv2.destroyAllWindows()returnif False:for k1 in np.arange(-100,100,0.1):for k2 in np.arange(-100, 100, 0.1):for p1 in np.arange(-100, 100, 0.1):for p2 in np.arange(-100, 100, 0.1):make_pics("picture.jpg", k1, k2, p1, p2)

pic_2_rtsp.py

import numpy as np
import make_pics
import sys
import msvcrt
import subprocess
import time
import shlex
import my_util
from PIL import Image, ImageDraw
import random
import oslog = my_util.Logger()
# RTSP_DEF_IP = "192.168.31.185"
RTSP_DEF_IP = my_util.get_local_ip()
RTSP_PORT = 8554
local_ip = my_util.get_local_ip()
if local_ip:RTSP_URL = "rtsp://{}:{}/live".format(local_ip, RTSP_PORT)
else:RTSP_URL = "rtsp://{}:{}/live".format(RTSP_DEF_IP, RTSP_PORT)
frame_duration = 1/25  # 每張圖片顯示的時長(秒),process.stdin.wirte寫入速度需要夠快,否則可能接收端接受數據不足無法獲取解碼信息(抓包看看)
frame_num = 0old_picname = "past.jpg"
new_picname = "now.jpg"
k1 = k2 = p1 = p2 = 0.0def generate_orig(old_picname):"""生成默認圖片"""image = Image.new('RGB', (640, 480), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))draw = ImageDraw.Draw(image)draw.text((100, 100), 'No Signal', fill=(255, 255, 255))image.save(old_picname)returndef generate_image(new_picname, k1, k2, p1, p2):"""生成圖片"""make_pics.make_pics(new_picname, k1, k2, p1, p2)return# 構建 ffmpeg 命令使用圖片序列
command_line = 'ffmpeg -loglevel error -re -i - -c:v libx264 -pix_fmt yuv420p -r {} -f rtsp {}'.format(1/frame_duration, RTSP_URL)
command_list = shlex.split(command_line)log.debug(command_list)def start_process():global processprocess = subprocess.Popen(command_list, stdin=subprocess.PIPE, text=False)log.info("Process started.")start_time = time.time()class QuitException(Exception):passtry:# 默認先生成初始圖step_length = 0.125generate_orig(old_picname)start_process()while True:for p1 in np.arange(-1 * step_length, step_length, step_length):for p2 in np.arange(-1 * step_length, step_length, step_length):for k1 in np.arange(-1, 1, step_length):for k2 in np.arange(-1, 1, step_length):log.debug("畸變系數 k1={}, k2={}, p1={}, p2={}".format(k1, k2, p1, p2))generate_image(new_picname, k1, k2, p1, p2)if msvcrt.kbhit():  # 檢查是否有鍵盤輸入input_char = msvcrt.getch().decode('utf-8')if input_char == 'q' or input_char == 'Q':try:# 向進程的標準輸入發送 'q' 并換行if process.stdin:process.stdin.write('q\n'.encode())process.stdin.flush()except Exception as e:passraise QuitException()# 持續生成新圖片替換舊圖片try:if os.path.exists(new_picname):with open(new_picname, 'rb') as f:process.stdin.write(f.read())else:with open(old_picname, 'rb') as f:process.stdin.write(f.read())except Exception as e:log.error(f"Error writing to process stdin: {e}")log.info("Restarting process...")process.terminate()try:process.wait(timeout=1)except subprocess.TimeoutExpired:process.kill()start_process()time.sleep(frame_duration)except QuitException:pass
finally:try:process.terminate()try:process.wait(timeout=1)except subprocess.TimeoutExpired:process.kill()except Exception:passtry:if os.path.exists(new_picname):os.remove(new_picname)except Exception as e:log.error(f"Error removing {new_picname}: {e}")end_time = time.time()time_cnt = end_time - start_timelog.info("FFmpeg進程已執行{}秒并通過輸入 'q' 退出。".format(round(time_cnt)))

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/78825.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/78825.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/78825.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis Sentinel 和 Redis Cluster 各自的原理、優缺點及適用場景是什么?

我們來詳細分析下 Redis Sentinel (哨兵) 和 Redis Cluster (集群) 這兩種方案的原理和使用場景。 Redis Sentinel (哨兵) 原理: Sentinel 本身是一個或一組獨立于 Redis 數據節點的進程。它的核心職責是監控一個 Redis 主從復制 (Master-Slave) 架構。多個 Sentinel 進程協同…

基于機器學習的電影票房預測

目錄 摘 要(完整下載鏈接附在文末) Abstract 1 緒 論 1.1 研究背景概述 1.2 國內外相關領域研究進展 1.3 電影票房預測技術概覽 1.3.1 利用人口統計學特征的方法 1.3.2 基于機器學習的預測模型 2 機器學習相關理論介紹與分析 2.1 機器學習算法理論 2.1.1卷積…

SVMSPro平臺獲取HTTP-FLV規則

SVMSPro平臺獲取HTTP-FLV規則 HTTP-FLV的服務端口為&#xff1a;53372&#xff0c;如需要公網訪問需要開啟這個端口 這里講的是如何獲取長效URL&#xff0c;短效&#xff08;時效性&#xff09;URL也支持&#xff0c;下回講 一、如何獲取HTTP-FLV實時流視頻 http://host:po…

ARM架構的微控制器總線矩陣

在 ARM 架構的微控制器&#xff08;MCU&#xff09;中&#xff0c;總線矩陣&#xff08;Bus Matrix&#xff09; 是總線系統的核心互連結構&#xff0c;負責協調多個主設備&#xff08;如 CPU、DMA、以太網控制器等&#xff09;對多個從設備&#xff08;如 Flash、SRAM、外設等…

AI賦能金融:智能投顧、風控與反欺詐的未來

AI賦能金融&#xff1a;智能投顧、風控與反欺詐的未來 系統化學習人工智能網站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目錄 AI賦能金融&#xff1a;智能投顧、風控與反欺詐的未來摘要引言一、智能投顧&#xff1a;從經驗驅動到人機協同…

【機器學習】樸素貝葉斯

目錄 一、樸素貝葉斯的算法原理 1.1 定義 1.2 貝葉斯定理 1.3 條件獨立性假設 二、樸素貝葉斯算法的幾種常見類型 2.1 高斯樸素貝葉斯 (Gaussian Naive Bayes) 【訓練階段】 - 從數據中學習模型參數 【預測階段】 - 對新樣本 Xnew? 進行分類 2. 2 多項式樸素貝葉斯 (…

鴻蒙 ArkTS 組件 通用事件 通用屬性 速查表

ArkTS 組件 組件 通用事件 速查表 通用事件事件名稱簡要說明點擊事件onClick(event: Callback<ClickEvent>, distanceThreshold: number): T相較于原有 onClick 接口&#xff0c;新增 distanceThreshold 參數作為點擊事件移動閾值&#xff0c;當手指的移動距離超出所設…

Java云原生+quarkus

一、Java如何實現云原生應用&#xff1f; 傳統的 Java 框架&#xff08;如 Spring Boot&#xff09;雖然功能強大&#xff0c;但在云原生場景下可能顯得笨重。以下是一些更適合云原生的輕量級框架&#xff1a; Quarkus(推薦) 專為云原生和 Kubernetes 設計的 Java 框架。支持…

C語言教程(二十三):C 語言強制類型轉換詳解

一、強制類型轉換的概念 強制類型轉換是指在程序中手動將一個數據類型的值轉換為另一種數據類型。在某些情況下,編譯器可能不會自動進行類型轉換,或者自動轉換的結果不符合我們的預期,這時就需要使用強制類型轉換來明確指定要進行的類型轉換。 二、強制類型轉換的語法 強制類…

Spring Boot × K8s 監控實戰-集成 Prometheus 與 Grafana

在微服務架構中&#xff0c;應用的可觀測性至關重要。Kubernetes 已成為容器化部署的標準&#xff0c;但其自身的監控能力有限&#xff0c;需要與其他工具集成才能實現詳細的運行數據采集與分析。 本文將通過 Spring Boot Kubernetes Prometheus Grafana 實戰&#xff0c;打…

phpstudy修改Apache端口號

1. 修改Listen.conf文件 本地phpstudy安裝目錄&#xff1a; 2.其他問題 ① 修改httpd.conf不起作用 ② 直接通過控制面板配置好像有延遲緩存

(done) 吳恩達版提示詞工程 6. 轉換 (翻譯,通用翻譯,語氣風格變換,文本格式轉換,拼寫檢查和語法檢查)

視頻&#xff1a;https://www.bilibili.com/video/BV1Z14y1Z7LJ/?spm_id_from333.337.search-card.all.click&vd_source7a1a0bc74158c6993c7355c5490fc600 別人的筆記&#xff1a;https://zhuanlan.zhihu.com/p/626966526 6. 轉換任務&#xff08;Transforming&#xff0…

什么是靜態住宅ip,跨境電商為什么要用靜態住宅ip

在數字時代&#xff0c;IP地址不僅是設備聯網的“ID”&#xff0c;更是跨境電商運營中的關鍵工具。尤其對于需要長期穩定、安全操作的場景&#xff0c;靜態住宅IP逐漸成為行業首選。 一、什么是靜態住宅IP&#xff1f; 靜態住宅IP&#xff08;Static Residential IP&#xff0…

Qemu-STM32(十七):STM32F103加入AFIO控制器

概述 本文主要描述了在Qemu平臺中&#xff0c;如何添加STM32F103的AFIO控制器模擬代碼&#xff0c;AFIO是屬于GPIO引腳復用配置的功能。 參考資料 STM32F1XX TRM手冊&#xff0c;手冊編號&#xff1a;RM0008 添加步驟 1、在hw/arm/Kconfig文件中添加STM32F1XX_AFIO&#x…

QuecPython+audio:實現音頻的錄制與播放

概述 QuecPython 作為專為物聯網設計的開發框架&#xff0c;通過高度封裝的 Python 接口為嵌入式設備提供了完整的音頻處理能力。本文主要介紹如何利用 QuecPython 快速實現音頻功能的開發。 核心優勢 極簡開發&#xff1a;3行代碼完成基礎音頻錄制與播放。快速上手&#xf…

企業架構之旅(3):TOGAF ADM架構愿景的核心價值

一、引言&#xff1a;為什么架構愿景是企業架構的「導航圖」 在企業數字化轉型的浪潮中&#xff0c;TOGAF ADM&#xff08;架構開發方法&#xff09;作為公認的企業架構「方法論圣經」&#xff0c;其首個關鍵階段 —— 架構愿景&#xff08;Architecture Vision&#xff09;&a…

C++:Lambda表達式

C&#xff1a;Lambda表達式 C中lambda的基本語法1. 捕獲列表&#xff08;Capture List&#xff09;2. 示例代碼示例 1&#xff1a;簡單的lambda示例 2&#xff1a;捕獲變量示例 3&#xff1a;按引用捕獲示例 4&#xff1a;捕獲所有變量示例 5&#xff1a;作為函數參數 3. lambd…

被關在idea小黑屏里寫spark程序

一、先在idea中添加Scala插件 二、使用Maven創建新項目 1.啟動idea,選擇新建項目。之后的設置如下&#xff1a; 2.將Scala添加到全局庫中&#xff08;注意&#xff1a;Scala的版本不宜太高&#xff0c;最好是2-12.否則后面會報下面這個錯誤 E:\tool接口\SparkCore_01\src\mai…

自動化立庫/AGV物流仿真詳細步驟

以下是一種可以在預算和周期內實現自動化立庫及AGV 方案仿真分析的方法&#xff1a; 一、工具選擇 軟件工具FlexSim&#xff1a;這是一款流行的離散事件仿真軟件。它具有直觀的圖形用戶界面&#xff0c;通過簡單的拖拽操作就可以構建自動化立庫和 AGV 的模型。其內置的豐富的…

使用springboot+easyexcel實現導出excel并合并指定單元格

1&#xff1a;準備一個單元格合并策略類代碼&#xff1a; import com.alibaba.excel.metadata.Head; import com.alibaba.excel.metadata.data.WriteCellData; import com.alibaba.excel.write.handler.CellWriteHandler; import com.alibaba.excel.write.metadata.holder.Writ…