文本轉語音 RTSP 實時播放叫號系統的底層邏輯
發布以 Linux 的 Unix socket 和 Windows 的 Win32 命名管道作為音頻源、在 Python 3.10(FFmpeg 7.1)下可運行的版本。
這兩天在弄這個。前兩篇是通過虛擬聲卡實現了最簡單的邏輯:播放文本時就從聲卡發聲,不播放時無所謂,自動忙音。那個版本工作在 Windows 平臺,
而今天這個功能相似的代碼運行在 macOS 上,理論上也支持 Windows 和 Linux,依賴 FFmpeg 和 xiu(一個用 Rust 編寫的流媒體服務器)提供的 RTSP 服務。
今天的難點有點多:
- asyncio 任務 `async def _tts_worker(self, text: str)` 運行中出現各種錯誤:直接調用 engine 的 `runAndWait()` 不行,因為 pyttsx3 內部有它自己的事件循環,所以每次 `init()` 再 `endLoop()` 是暫時找到的解決辦法。同時,調用 FFmpeg 外部命令並直接讀取 `-` 所代表的 stdout 時,也會遇到各種問題。這些都做了捕獲和處理,但排查起來不太容易。
- `self._start_ffmpeg()` 需要在 socket 或管道創建完成以後才能運行。調試時我都是在外部手工啟動它。它的作用是把音頻推送到 RTSP 服務器,以備播放。
- `_input_handler` 等都是 AI 生成的,因為有好幾種循環,這樣比較省心。
- 最關鍵也最隱蔽的是 `async def _heartbeat(self)`:它需要計算播放靜音的時長,長了不行,短了也不行。最初的測試代碼只有幾個函數,然後 AI 生成了三個 threading、兩個 Queue 的版本,之後轉到了異步版本,明顯快了很多。
- 在 Windows 上使用 win32pipe(命名管道)可以達到與 Unix socket 很相似的效果;另外還有類 Unix 系統(Linux、macOS)上的 FIFO,當然還有 stdin 和 stdout。對於 FFmpeg 來說,這些都是程序之間傳送數據的機制。
- RTSP 需要一個後臺服務。xiu 是開源的 Rust 項目,可以使用;另外 Windows 上推薦 MediaMTX,雙擊運行即可,什麼都不用管。
音畫同步應該是另一個問題了。幾天前鼓搗了一下圖片,讓編輯後的圖片馬上在視頻中顯示,這是另外一個話題了。做這些就是為了讓報號和點單有個界面。
ffmpeg -re -framerate 30 -f image2 -loop 1 -i "image1.jpg" -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt yuv420p -f rtsp -rtsp_transport tcp rtsp://localhost:8554/live
合併的代碼就當成剩下的作業,有空再來做。
對於剛接觸的朋友,最好慢慢和 AI 一起調試,一些功能就能做出來。
語音推送使用獨立的 FFmpeg 進程,實現了前後端中斷後的自動重啟。
程序主體
可獨立運行,也可以結合 ffmg.py 管理推送進程。
- macOS 版,理論上也適用於 Linux,單文件可執行
main.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
import socket
from aioconsole import ainput
from contextlib import suppress
from typing import Optional


class AsyncTTSController:
    """Speak typed text and stream it, as raw PCM, to an FFmpeg RTSP publisher.

    This object is the server end of a Unix domain socket.  FFmpeg is started
    with ``-i unix:<socket_path>`` and republishes what it reads to
    ``rtsp://localhost:8554/mystream``.  While no speech is pending,
    ``_heartbeat`` writes silence in real time so the stream keeps flowing.
    """

    def __init__(self):
        # Unix domain socket that FFmpeg reads the PCM stream from.
        self.socket_path = "/tmp/tts_audio.sock"
        self.server_socket: Optional[socket.socket] = None
        self.client_socket: Optional[socket.socket] = None
        # Process control.
        self.ffmpeg_process: Optional[asyncio.subprocess.Process] = None
        # Strong reference to the stderr-draining task: the event loop keeps only
        # weak references to tasks, so an unreferenced one may vanish mid-run.
        self._monitor_task: Optional[asyncio.Task] = None
        self.running = False
        # TTS engine.
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', 180)
        self.engine.setProperty('volume', 1.0)
        # PCM format shared by the conversion step and the FFmpeg input options.
        self.sample_rate = 24000
        self.channels = 1
        self.bits_per_sample = 16
        self.silence = self._generate_silence(0.2)
        self.wav_header = self._generate_wav_header()
        # Connection / pacing state.
        self.connection_active = False
        self.last_heartbeat = 0.0  # currently unused
        self.heartbeat_interval = 2.0  # currently unused
        # Remaining playback time of the last utterance in tenths of a second;
        # _heartbeat only injects silence once this has dropped below zero.
        self.sending_audio = 0

    def _generate_wav_header(self) -> bytes:
        """Build a 44-byte PCM WAV (RIFF) header for the configured format.

        The RIFF and data sizes are placeholders (36 and 0) because the stream
        length is unknown.  NOTE(review): FFmpeg is started with ``-f s16le``
        (headerless PCM), so these 44 bytes are consumed as audio samples
        rather than parsed as a header.
        """
        byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8
        block_align = self.channels * self.bits_per_sample // 8
        return struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,
            self.sample_rate, byte_rate, block_align, self.bits_per_sample,
            b'data', 0,
        )

    def _generate_silence(self, duration: float) -> bytes:
        """Return *duration* seconds of zero-valued PCM in the configured format."""
        samples = int(self.sample_rate * duration)
        return bytes(samples * self.channels * (self.bits_per_sample // 8))

    def _pcm_tenths(self, num_bytes: int) -> int:
        """Return the playback length of *num_bytes* of PCM in whole tenths of a second."""
        bytes_per_second = self.sample_rate * self.channels * self.bits_per_sample // 8
        return int(num_bytes * 10 / bytes_per_second)

    async def _async_create_socket(self) -> None:
        """Listen on ``socket_path`` and wait until FFmpeg connects.

        Binding and listening happen before the first ``await``, so a task for
        this coroutine created ahead of the ``_start_ffmpeg`` task has the
        socket ready before FFmpeg is spawned.  The WAV header is sent right
        after a client connects.  Returns once connected or when ``running``
        becomes False.
        """
        # Release the listener from a previous (re)connect instead of leaking it.
        if self.server_socket is not None:
            with suppress(OSError):
                self.server_socket.close()
            self.server_socket = None
        # A stale socket file from an earlier run would make bind() fail.
        with suppress(FileNotFoundError):
            os.unlink(self.socket_path)
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.setblocking(False)
        self.server_socket.bind(self.socket_path)
        self.server_socket.listen(1)

        loop = asyncio.get_running_loop()
        while self.running and not self.connection_active:
            try:
                self.client_socket, _ = await loop.sock_accept(self.server_socket)
                self.connection_active = True
                print("客戶端已連接")
                await loop.sock_sendall(self.client_socket, self.wav_header)
            except (BlockingIOError, InterruptedError):
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"連接錯誤: {str(e)}")
                self.connection_active = False
                # Drop the half-open client so the next accept does not leak it.
                if self.client_socket is not None:
                    with suppress(OSError):
                        self.client_socket.close()
                    self.client_socket = None
                await asyncio.sleep(1)

    async def _start_ffmpeg(self) -> None:
        """(Re)start FFmpeg: raw s16le from the socket -> AAC -> RTSP over TCP."""
        # Stop a previous instance first so only one publisher exists.
        if self.ffmpeg_process is not None:
            with suppress(ProcessLookupError):
                self.ffmpeg_process.terminate()
            await self.ffmpeg_process.wait()
        socketid = 'unix:' + self.socket_path
        self.ffmpeg_process = await asyncio.create_subprocess_exec(
            'ffmpeg',
            '-f', 's16le',
            '-ar', str(self.sample_rate),
            '-ac', str(self.channels),
            '-i', socketid,  # connect to this object's Unix socket server
            '-c:a', 'aac',
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            'rtsp://localhost:8554/mystream',
            stdout=asyncio.subprocess.DEVNULL,
            stdin=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        self._monitor_task = asyncio.create_task(self._monitor_ffmpeg_errors())

    async def _monitor_ffmpeg_errors(self) -> None:
        """Drain stderr of the FFmpeg process that is current when this starts.

        stderr is a pipe, so it is read continuously to keep its buffer from
        filling up.  The process is captured locally so that, after a restart,
        this task never competes with the new process's monitor.
        """
        proc = self.ffmpeg_process
        if proc is None or proc.stderr is None:
            return
        while self.running:
            line = await proc.stderr.readline()
            if not line:
                break
            # print(f"[FFmpeg Error] {line.decode().strip()}")  # enable when debugging

    async def _async_write_socket(self, data: bytes) -> None:
        """Send *data* to FFmpeg; rebuild socket and FFmpeg if the peer went away.

        Does nothing while no client is connected.  Other errors are reported
        and mark the connection inactive.
        """
        if not (self.client_socket and self.connection_active):
            return
        loop = asyncio.get_running_loop()
        try:
            await loop.sock_sendall(self.client_socket, data)
        except (BrokenPipeError, ConnectionResetError):
            print("連接已斷開,嘗試重連...")
            await self._reconnect_pipeline()
        except Exception as e:
            print(f"寫入錯誤: {str(e)}")
            self.connection_active = False

    async def _reconnect_pipeline(self) -> None:
        """Drop the client connection, then rebuild the listener and FFmpeg together."""
        print("啟動重連流程...")
        self.connection_active = False
        if self.client_socket:
            with suppress(OSError):
                self.client_socket.close()
            self.client_socket = None
        # The socket task is created first so its bind/listen runs before
        # FFmpeg is spawned and tries to connect.
        socket_task = asyncio.create_task(self._async_create_socket())
        ffmpeg_task = asyncio.create_task(self._start_ffmpeg())
        await asyncio.gather(socket_task, ffmpeg_task)

    async def stop(self) -> None:
        """Stop FFmpeg, close sockets and remove the socket file.

        Every step is best-effort and independent, so one failure does not skip
        the rest.  Safe to call more than once.
        """
        self.running = False
        self.connection_active = False
        if self.ffmpeg_process is not None:
            with suppress(ProcessLookupError):
                self.ffmpeg_process.terminate()
            # wait() can fail if called from a different event loop than the
            # one that spawned the process.
            with suppress(Exception):
                await self.ffmpeg_process.wait()
            self.ffmpeg_process = None
        for sock in (self.client_socket, self.server_socket):
            if sock is not None:
                with suppress(OSError):
                    sock.close()
        self.client_socket = None
        self.server_socket = None
        with suppress(FileNotFoundError):
            os.unlink(self.socket_path)
        print("所有資源已釋放")

    async def _heartbeat(self) -> None:
        """Keep the stream fed in real time.

        Every 0.2 s: while an utterance is still "playing" (``sending_audio``
        >= 0) count it down by two tenths; once it has run out, send 0.2 s of
        silence.  Idles while no client is connected.
        """
        while self.running:
            if not self.connection_active:
                await asyncio.sleep(0.5)
                continue
            if self.sending_audio < 0:
                await self._async_write_socket(self.silence)
            else:
                self.sending_audio -= 2
            await asyncio.sleep(0.2)

    def _sync_tts(self, text, tmp_filename):
        """Render *text* into the audio file *tmp_filename* (blocking; run in an executor).

        Calls ``pyttsx3.init()`` and ends with ``endLoop()`` each time; this is
        a workaround for pyttsx3's own run loop when driven from a worker thread.
        """
        eng = pyttsx3.init()
        eng.save_to_file(text, tmp_filename)
        eng.runAndWait()
        eng.endLoop()

    async def _tts_worker(self, text: str) -> None:
        """Synthesize *text*, convert it to the stream's PCM format and send it.

        Speech is rendered into a private temporary file that is always removed.
        """
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            tmp_filename = tmp.name
        try:
            loop = asyncio.get_running_loop()
            # pyttsx3 blocks, so render off the event loop.
            await loop.run_in_executor(None, self._sync_tts, text, tmp_filename)
            proc = await asyncio.create_subprocess_exec(
                'ffmpeg', '-hide_banner', '-loglevel', 'error', '-y',
                '-i', tmp_filename,
                '-f', 's16le',  # force raw PCM output
                '-acodec', 'pcm_s16le',
                '-ar', str(self.sample_rate),
                '-ac', str(self.channels),
                '-',  # write to stdout
                stdout=asyncio.subprocess.PIPE,
            )
            # communicate() collects all output and also reaps the process.
            pcm, _ = await proc.communicate()
            # Set the playback length before writing so the heartbeat does not
            # splice silence into the utterance while the write is in progress.
            self.sending_audio = self._pcm_tenths(len(pcm))
            if pcm:
                await self._async_write_socket(pcm)
            print("write data x0.1s:", self.sending_audio)
        finally:
            with suppress(FileNotFoundError):
                os.unlink(tmp_filename)

    async def _input_handler(self) -> None:
        """Read console lines and speak each non-empty one; ``q`` (or EOF) quits."""
        while self.running:
            try:
                text = await ainput("請輸入文本(輸入q退出): ")
            except EOFError:
                # stdin closed: nothing more can be read, so stop instead of spinning.
                self.running = False
                break
            try:
                if text.lower() == 'q':
                    self.running = False
                    break
                if text.strip():
                    await self._tts_worker(text)
            except Exception as e:
                print(f"輸入錯誤: {str(e)}")

    async def run(self) -> None:
        """Run socket server, FFmpeg, console input and heartbeat; clean up on exit."""
        self.running = True
        # The socket task comes first so its bind/listen runs before FFmpeg is spawned.
        tasks = [
            asyncio.create_task(self._async_create_socket()),
            asyncio.create_task(self._start_ffmpeg()),
            asyncio.create_task(self._input_handler()),
            asyncio.create_task(self._heartbeat()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await self.stop()
if __name__ == "__main__":
    controller = AsyncTTSController()

    async def _serve() -> None:
        """Run the controller, then release its resources on the same event loop."""
        try:
            await controller.run()
        finally:
            await controller.stop()

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # On Ctrl+C asyncio.run() cancels _serve(), so stop() has already run
        # in its finally block on the loop that owns the FFmpeg process.
        pass
# Commands for exercising the pipeline by hand:
"""
ffmpeg -y -i temp.wav -f s16le -acodec pcm_s16le -ar 24000 -ac 1 audio.raw
ffmpeg -ar 24000 -ac 1 -f s16le -i unix:/tmp/tts_audio.sock -f rtsp rtsp://localhost:8554/mystream
"""
- Windows 10 系統、Python 3.10 下可運行的版本
主要讓 DeepSeek 把 socket 替換成了 win32pipe,因為本來就是從那邊換過來的,這一塊代碼完全沒有手工介入。唯一的改動是註釋掉了 `eng.endLoop()`,也不用每次都 `init()`,這應該是 pyttsx3 的跨平臺差異。異步運行在 Win32 下是穩定的。
def _sync_tts(self, text, tmp_filename):
    """Render *text* into the audio file *tmp_filename* using the shared engine.

    Blocking: call it from an executor, not directly on the event loop.
    Reuses ``self.engine`` instead of calling ``pyttsx3.init()`` each time,
    and intentionally does not call ``endLoop()``.
    """
    eng = self.engine
    eng.save_to_file(text, tmp_filename)
    eng.runAndWait()
main-win.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
from aioconsole import ainput
from contextlib import suppress
from typing import Optional
import win32pipe
import win32file
import pywintypes


class AsyncTTSController:
    """Windows variant: stream TTS speech as raw PCM through a named pipe to FFmpeg.

    This object creates the server end of ``pipe_name``; FFmpeg opens the pipe
    as its input and republishes the audio to
    ``rtsp://localhost:8554/mystream``.  While no speech is pending,
    ``_heartbeat`` writes silence in real time so the stream keeps flowing.
    """

    # Win32 error codes handled below.
    _ERROR_NO_DATA = 232          # "The pipe is being closed."
    _ERROR_PIPE_CONNECTED = 535   # client connected before ConnectNamedPipe ran

    def __init__(self):
        # Windows named pipe that FFmpeg opens as its input.
        self.pipe_name = r'\\.\pipe\tts_audio_pipe'
        self.pipe_handle = None
        # Process control.
        self.ffmpeg_process: Optional[asyncio.subprocess.Process] = None
        # Strong reference to the stderr-draining task (the loop keeps only weak refs).
        self._monitor_task = None
        self.running = False
        # TTS engine.
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', 180)
        self.engine.setProperty('volume', 1.0)
        # PCM format shared by the conversion step and the FFmpeg input options.
        self.sample_rate = 24000
        self.channels = 1
        self.bits_per_sample = 16
        self.silence = self._generate_silence(0.2)
        self.wav_header = self._generate_wav_header()
        # Connection / pacing state.
        self.connection_active = False
        self.last_heartbeat = 0.0  # currently unused
        self.heartbeat_interval = 2.0  # currently unused
        # Remaining playback time of the last utterance in tenths of a second;
        # _heartbeat only injects silence once this has dropped below zero.
        self.sending_audio = 0

    def _generate_wav_header(self) -> bytes:
        """Build a 44-byte PCM WAV (RIFF) header with placeholder sizes (36 / 0).

        NOTE(review): FFmpeg reads the pipe as ``-f s16le``, so these bytes are
        consumed as samples rather than parsed as a header.
        """
        byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8
        block_align = self.channels * self.bits_per_sample // 8
        return struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,
            self.sample_rate, byte_rate, block_align, self.bits_per_sample,
            b'data', 0,
        )

    def _generate_silence(self, duration: float) -> bytes:
        """Return *duration* seconds of zero-valued PCM in the configured format."""
        samples = int(self.sample_rate * duration)
        return bytes(samples * self.channels * (self.bits_per_sample // 8))

    def _pcm_tenths(self, num_bytes: int) -> int:
        """Return the playback length of *num_bytes* of PCM in whole tenths of a second."""
        bytes_per_second = self.sample_rate * self.channels * self.bits_per_sample // 8
        return int(num_bytes * 10 / bytes_per_second)

    def _close_pipe(self, handle) -> None:
        """Close *handle* (best effort) and forget it if it is the current pipe."""
        if handle is None:
            return
        with suppress(pywintypes.error):
            win32file.CloseHandle(handle)
        if self.pipe_handle is handle:
            self.pipe_handle = None

    async def _on_pipe_connected(self) -> None:
        """Mark the pipe connected and send the WAV header first."""
        self.connection_active = True
        print("客戶端已連接")
        await self._async_write_socket(self.wav_header)

    async def _async_create_pipe(self) -> None:
        """Create the named pipe and wait, in a worker thread, for FFmpeg to open it.

        ``CreateNamedPipe`` runs before the first ``await``, so a task for this
        coroutine created ahead of the ``_start_ffmpeg`` task has the pipe
        ready before FFmpeg is spawned.  Returns once connected or when
        ``running`` becomes False.
        """
        loop = asyncio.get_running_loop()
        while self.running and not self.connection_active:
            handle = None
            try:
                handle = win32pipe.CreateNamedPipe(
                    self.pipe_name,
                    win32pipe.PIPE_ACCESS_DUPLEX,
                    win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
                    1,  # max instances
                    65536, 65536,  # output / input buffer sizes
                    0,  # default timeout
                    None,  # default security attributes
                )
                self.pipe_handle = handle
                # ConnectNamedPipe blocks, so wait for the client off the event loop.
                await loop.run_in_executor(None, win32pipe.ConnectNamedPipe, handle, None)
                await self._on_pipe_connected()
            except pywintypes.error as e:
                if e.winerror == self._ERROR_PIPE_CONNECTED:
                    # The client opened the pipe before ConnectNamedPipe ran:
                    # that is a successful connection, not a failure.
                    await self._on_pipe_connected()
                    continue
                if e.winerror == self._ERROR_NO_DATA:
                    print("客戶端斷開連接")
                else:
                    print(f"管道錯誤: {e}")
                self.connection_active = False
                # Close the failed instance: only one instance is allowed, so a
                # leaked handle would make every later CreateNamedPipe fail.
                self._close_pipe(handle)
                await asyncio.sleep(1)
            except Exception as e:
                print(f"其他錯誤: {e}")
                self._close_pipe(handle)
                await asyncio.sleep(1)

    async def _start_ffmpeg(self) -> None:
        """(Re)start FFmpeg: raw s16le from the pipe -> AAC -> RTSP over TCP."""
        # Stop a previous instance first so only one publisher exists.
        if self.ffmpeg_process is not None:
            with suppress(ProcessLookupError):
                self.ffmpeg_process.terminate()
            await self.ffmpeg_process.wait()
        self.ffmpeg_process = await asyncio.create_subprocess_exec(
            'ffmpeg',
            '-f', 's16le',
            '-ar', str(self.sample_rate),
            '-ac', str(self.channels),
            '-i', self.pipe_name,
            '-c:a', 'aac',
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            'rtsp://localhost:8554/mystream',
            stdout=asyncio.subprocess.DEVNULL,
            stdin=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        self._monitor_task = asyncio.create_task(self._monitor_ffmpeg_errors())

    async def _monitor_ffmpeg_errors(self) -> None:
        """Drain stderr of the FFmpeg process that is current when this starts.

        The process is captured locally so that, after a restart, this task
        never competes with the new process's monitor for the same stream.
        """
        proc = self.ffmpeg_process
        if proc is None or proc.stderr is None:
            return
        while self.running:
            line = await proc.stderr.readline()
            if not line:
                break
            # print(f"[FFmpeg Error] {line.decode().strip()}")  # enable when debugging

    async def _async_write_socket(self, data: bytes) -> None:
        """Write *data* to the pipe in a worker thread; reconnect on pipe errors."""
        if not (self.connection_active and self.pipe_handle):
            return
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, win32file.WriteFile, self.pipe_handle, data)
        except pywintypes.error as e:
            print(f"寫入錯誤: {e}")
            self.connection_active = False
            await self._reconnect_pipeline()
        except Exception as e:
            print(f"其他寫入錯誤: {e}")
            self.connection_active = False

    async def _reconnect_pipeline(self) -> None:
        """Close the current pipe, then recreate it and restart FFmpeg together."""
        print("啟動重連流程...")
        self.connection_active = False
        self._close_pipe(self.pipe_handle)
        # gather() wraps these in tasks in argument order, so the pipe is
        # created before FFmpeg is spawned.
        await asyncio.gather(self._async_create_pipe(), self._start_ffmpeg())

    async def _heartbeat(self) -> None:
        """Keep the stream fed in real time.

        Every 0.2 s: while an utterance is still "playing" (``sending_audio``
        >= 0) count it down by two tenths; once it has run out, send 0.2 s of
        silence.  Idles while no client is connected.
        """
        while self.running:
            if not self.connection_active:
                await asyncio.sleep(0.5)
                continue
            if self.sending_audio < 0:
                await self._async_write_socket(self.silence)
            else:
                self.sending_audio -= 2
            await asyncio.sleep(0.2)

    def _sync_tts(self, text, tmp_filename):
        """Render *text* into the audio file *tmp_filename* (blocking; run in an executor).

        Reuses ``self.engine``; ``endLoop()`` is intentionally not called here.
        """
        eng = self.engine
        eng.save_to_file(text, tmp_filename)
        eng.runAndWait()

    async def _tts_worker(self, text: str) -> None:
        """Synthesize *text*, convert it to the stream's PCM format and send it.

        Speech is rendered into a private temporary file that is always removed.
        """
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            tmp_filename = tmp.name
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._sync_tts, text, tmp_filename)
            proc = await asyncio.create_subprocess_exec(
                'ffmpeg', '-hide_banner', '-loglevel', 'error', '-y',
                '-i', tmp_filename,
                '-f', 's16le',
                '-acodec', 'pcm_s16le',
                '-ar', str(self.sample_rate),
                '-ac', str(self.channels),
                '-',
                stdout=asyncio.subprocess.PIPE,
            )
            # communicate() collects all output and also reaps the process.
            pcm, _ = await proc.communicate()
            # Set the playback length before writing so the heartbeat does not
            # splice silence into the utterance while the write is in progress.
            self.sending_audio = self._pcm_tenths(len(pcm))
            if pcm:
                await self._async_write_socket(pcm)
            print(f"寫入數據 x0.1s: {self.sending_audio}")
        finally:
            with suppress(FileNotFoundError):
                os.remove(tmp_filename)

    async def _input_handler(self) -> None:
        """Read console lines and speak each non-empty one; ``q`` (or EOF) quits."""
        while self.running:
            try:
                text = await ainput("請輸入文本(輸入q退出): ")
            except EOFError:
                # stdin closed: nothing more can be read, so stop instead of spinning.
                self.running = False
                break
            try:
                if text.lower() == 'q':
                    self.running = False
                    break
                if text.strip():
                    await self._tts_worker(text)
            except Exception as e:
                print(f"輸入錯誤: {str(e)}")

    async def run(self) -> None:
        """Run pipe server, FFmpeg, console input and heartbeat; clean up on exit."""
        self.running = True
        tasks = [
            asyncio.create_task(self._async_create_pipe()),
            asyncio.create_task(self._start_ffmpeg()),
            asyncio.create_task(self._input_handler()),
            asyncio.create_task(self._heartbeat()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await self.stop()

    async def stop(self) -> None:
        """Stop FFmpeg and close the pipe; each step is independent and best-effort."""
        self.running = False
        self.connection_active = False
        if self.ffmpeg_process is not None:
            with suppress(ProcessLookupError):
                self.ffmpeg_process.terminate()
            # wait() can fail if called from a different event loop than the
            # one that spawned the process.
            with suppress(Exception):
                await self.ffmpeg_process.wait()
            self.ffmpeg_process = None
        if self.pipe_handle is not None:
            with suppress(pywintypes.error):
                win32pipe.DisconnectNamedPipe(self.pipe_handle)
            self._close_pipe(self.pipe_handle)
        print("所有資源已釋放")


if __name__ == "__main__":
    controller = AsyncTTSController()

    async def _serve() -> None:
        """Run the controller, then release its resources on the same event loop."""
        try:
            await controller.run()
        finally:
            await controller.stop()

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # On Ctrl+C asyncio.run() cancels _serve(), so stop() has already run.
        pass
獨立的 FFmpeg 啟動和監控代碼
驗證了 RTSP 斷線後重建連接,也驗證了上面 main.py 的 socket 服務端退出後,FFmpeg 會自動重啟並重新連接。要使用這個更穩健的程序,需要
註釋掉 main.py `run()` 中的 `asyncio.create_task(self._start_ffmpeg())`。
在 Windows 上其餘代碼不用修改,只要把管道名這一塊徹底改為 FFmpeg 認識的 Windows 樣式,就可以運行:
r'\\.\pipe\tts_audio_pipe'
ffmg.py
import asyncio
from contextlib import suppress
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class FFmpegManager:
    """Supervise an FFmpeg process relaying raw PCM from a Unix socket to RTSP.

    When the process exits on its own, ``_handle_retry`` restarts it after an
    exponential back-off (capped at 30 s).  The manager gives up after
    ``_max_retries`` attempts without a successful spawn.
    """

    def __init__(self):
        self.ffmpeg_process = None
        # Strong reference to the stderr monitor (the loop keeps only weak refs to tasks).
        self._monitor_task = None
        self._retry_count = 0
        self._max_retries = 5
        self._retry_lock = asyncio.Lock()
        # True while a process started here is expected to keep running; clear it
        # before terminating on purpose so the monitor does not restart FFmpeg.
        self._is_running = False
        self.sample_rate = 24000
        self.channels = 1
        self.socket_path = "/tmp/tts_audio.sock"

    async def _start_ffmpeg(self) -> None:
        """Start (or restart) FFmpeg; schedule a retry if it cannot be spawned."""
        spawned = False
        async with self._retry_lock:
            await self._safe_terminate()
            socketid = 'unix:' + self.socket_path
            try:
                self.ffmpeg_process = await asyncio.create_subprocess_exec(
                    'ffmpeg',
                    '-f', 's16le',
                    '-ar', str(self.sample_rate),
                    '-ac', str(self.channels),
                    '-i', socketid,  # read from the Unix socket
                    '-c:a', 'aac',
                    '-f', 'rtsp',
                    '-rtsp_transport', 'tcp',
                    'rtsp://localhost:8554/mystream',
                    stdout=asyncio.subprocess.DEVNULL,
                    stdin=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.PIPE,
                )
            except Exception as e:
                logging.error(f"FFmpeg啟動失敗: {str(e)}")
            else:
                spawned = True
                # NOTE(review): the counter resets as soon as the process spawns,
                # even if FFmpeg exits right afterwards (e.g. socket missing), so
                # the back-off only grows across consecutive spawn failures.
                self._retry_count = 0
                self._is_running = True
                self._monitor_task = asyncio.create_task(self._monitor_ffmpeg_errors())
        if not spawned:
            # Retry outside the lock: asyncio.Lock is not re-entrant and
            # _handle_retry() calls back into this method.
            await self._handle_retry()

    async def _monitor_ffmpeg_errors(self):
        """Log FFmpeg's stderr until EOF, then restart the process if it died unexpectedly."""
        proc = self.ffmpeg_process
        if proc is None or proc.stderr is None:
            return
        while True:
            line = await proc.stderr.readline()
            if not line:
                break
            logging.error(f"FFmpeg錯誤輸出: {line.decode(errors='replace').strip()}")
        # EOF on stderr does not mean the exit status is known yet; wait() lets
        # the event loop reap the process instead of spinning on returncode.
        return_code = await proc.wait()
        if not self._is_running or self.ffmpeg_process is not proc:
            # Deliberate shutdown, or this process was already replaced.
            return
        logging.warning(f"FFmpeg異常退出,返回碼: {return_code}")
        self._is_running = False
        await self._handle_retry()

    async def _handle_retry(self):
        """Wait with exponential back-off, then try to start FFmpeg again."""
        if self._retry_count >= self._max_retries:
            logging.critical("達到最大重試次數,放棄重啟")
            return
        # 1, 2, 4, 8, 16 ... seconds, capped at 30 s.
        delay = min(2 ** self._retry_count, 30)
        self._retry_count += 1
        logging.info(f"將在 {delay} 秒后嘗試第 {self._retry_count} 次重啟")
        await asyncio.sleep(delay)
        await self._start_ffmpeg()

    async def _safe_terminate(self):
        """Terminate and reap the current FFmpeg process, if any (best effort)."""
        # Detach first so its monitor sees that the process was superseded.
        proc, self.ffmpeg_process = self.ffmpeg_process, None
        if proc is None:
            return
        with suppress(ProcessLookupError):
            proc.terminate()
        with suppress(Exception):
            await proc.wait()
# 以下保持不變...
async def main():
    """Start FFmpeg under FFmpegManager supervision, keep it alive, then shut it down."""
    controller = FFmpegManager()
    try:
        await controller._start_ffmpeg()
        logging.info('rung')
        await asyncio.sleep(1160)
    finally:
        # Clear the flag first so the monitor treats the exit as deliberate
        # instead of restarting FFmpeg; runs on this loop (no nested asyncio.run).
        controller._is_running = False
        await controller._safe_terminate()
        logging.info("FFmpeg已停止")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # main() was cancelled by Ctrl+C; its finally block already stopped FFmpeg.
        pass