本文將帶你從零實現一個基于音頻Token化的角色語音識別系統,完整復現原神角色語音分類任務,包含數據處理、模型訓練和推理全流程。
音頻波形通過滑動窗口轉換為數值Token序列的過程
一、為什么需要音頻Token化?
傳統音頻處理通常依賴MFCC、頻譜圖等特征,但這些方法:
- 丟失原始波形細節
- 難以與現代Transformer架構無縫集成
- 特征工程復雜度高
創新思路:將原始音頻波形直接轉換為離散Token序列,使音頻能像文本一樣被語言模型處理。這種方法:
- 保留完整波形信息
- 兼容預訓練語言模型架構
- 簡化特征工程流程
二、核心數據處理:wav_to_token函數詳解
def wav_to_token(path):# 1. 音頻標準化:零均值單位方差sr, audio = wavfile.read(path)left = (audio - np.mean(audio)) / np.std(audio)# 2. 生成參考信號(關鍵創新點)right = np.linspace(0.1, sr*np.pi, left.size)# 3. 滑動窗口特征提取sim_list = []slope_list = []for i in range(0, len(left) - 400, 400):x, y = left[i:i + 1200], right[i:i + 1200]# 確保窗口長度一致if len(x) > len(y): x = x[:len(y)]# 線性回歸擬合波形趨勢slope, intercept = linear_regression(x, y)# 計算擬合質量(余弦相似度)sim_list.append(cosine_similarity(slope * x + intercept, y))slope_list.append(slope)# 4. Token量化sim_list = min_max_normalize(sim_list) * 2**6 # 64級量化slope_list = min_max_normalize(slope_list) * 2**7 # 128級量化sim_list = sim_list.astype(np.int16)slope_list = slope_list.astype(np.int16)# 5. 生成最終Tokenres = sim_list * slope_list # 特征融合return res[res != 0] # 去除零值
關鍵設計解析:
-
雙特征融合機制:
sim_list
:衡量原始波形與線性趨勢的匹配度(0-63)slope_list
:表征波形局部斜率變化(0-127)- 通過乘法融合創建64×128=8192種獨特Token
-
參考信號設計:
right = np.linspace(0.1, sr*np.pi, left.size)
生成線性增長的參考信號,使回歸斜率能反映波形變化速率,避免絕對數值干擾
-
滑動窗口參數:
- 窗口大小:1200樣本(400步長)
- 采樣率影響:44.1kHz下窗口≈27ms(語音處理常用幀長)
可視化Token生成過程
# 在wav_to_token函數中添加
plt.figure(figsize=(12, 8))
plt.subplot(2,1,1)
plt.plot(left[:2400], 'b', label='原始波形')
plt.plot(right[:2400], 'r--', label='參考信號')
plt.legend()plt.subplot(2,1,2)
plt.plot(sim_list, 'g', label='相似度特征')
plt.plot(slope_list, 'm', label='斜率特征')
plt.legend()
plt.savefig('token_generation.png', dpi=150)
三、詞匯表構建:音頻Token與文本標記的融合
# 特殊標記
voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>"]# 添加角色名稱(從目錄名提取)
voc += [i.split("\\")[-1] for i in dirs] # 如"胡桃", "鐘離"# 添加音頻Token空間(0-8197)
voc += [str(i) for i in range(8198)]# 創建ID映射
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
詞匯表示例:
Token類型 | 示例 | ID | 用途 |
---|---|---|---|
特殊標記 | `< | wav | >` |
角色名稱 | 胡桃 | 12 | 分類目標 |
音頻Token | 1245 | 20 | 波形特征 |
四、數據集構建:序列化音頻樣本
# 樣本結構:[起始標記] + [音頻Token序列] + [角色ID] + [結束標記]
data_set = []
for path in paths:tokens = wav_to_token(path)token_idx = [voc_x2id[str(i)] for i in tokens]role_id = voc_x2id[path.split("\\")[-2]] # 從路徑提取角色名# 完整序列sample = [voc_x2id["<|im_start|>"]] + token_idx + [role_id] + [voc_x2id["<|im_end|>"]]data_set.append(sample)
序列化示例:
[3, 1245, 78, 309, ..., 8192, 12, 4] ↑ ↑ ↑ ↑| | | |
起始 音頻特征 胡桃 結束
未來展望:
- 擴展到語音識別任務(添加字符級Token)
- 結合對比學習提升特征表示
- 部署到移動端實現實時角色識別
技術啟示:當我們將音頻視為“可讀的文本”時,語音處理就變成了自然語言處理——這是通往統一多模態模型的重要一步。
附錄:關鍵函數完整實現
import numpy as np
import pandas as pd
from tqdm import tqdmfrom shua2 import wav_to_token
from glob import glob
from model3 import SamOut
# from system_model import SamOut
import torch
from torch import nn, optim
import timetorch.manual_seed(42)
np.random.seed(42)
dirs=glob("C:/Users/dfy918/Downloads/yuanshen_zip/yuanshen_zip/*")paths = np.hstack([glob(dir+"/wav/*.wav") for dir in dirs])
voc = set([str(i) for i in range(8198)])# voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>", "<|cat|>", "<|cow|>", "<|dog|>", "<|pig|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc = ["<|pad|>","<|im_start|>","<|im_end|>", "<|wav|>"]+[i.split("\\")[-1] for i in dirs] + list(voc)
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
voc_size = len(voc)
# data_set = pd.read_pickle("wav.pkl")
data_set = []
for path in tqdm(paths):tokens = wav_to_token(path)token_idx = []for i in tokens.astype("str").tolist():token_idx.append(voc_x2id[i])data_set.append([1] + token_idx + [voc_x2id[path.split("\\")[-2].split("/")[0]]]+[2])
pd.to_pickle(data_set, "wav.pkl")
np.random.shuffle(data_set)train_data_set = data_set[:int(len(data_set) * 0.8)]
val_data_set = data_set[int(len(data_set) * 0.8):]
num_layers = 2
hidden_size = 2 ** 6 * num_layers
num_heads = num_layers
learning_rate = 0.001
batch_size = 32
num_epochs = 1000model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)
params = 0
for i in model.parameters():if i.shape != torch.Size([]):params += i.numel()
print(f"Total parameters: {params}")criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)start_time = time.time()
for epoch in range(num_epochs):# 訓練階段np.random.shuffle(train_data_set)model.train()bar=tqdm(range(0, len(train_data_set), batch_size))for i in bar:batch_data = train_data_set[i:i + batch_size]# 填充批次使所有序列長度相同max_len = max(len(seq) for seq in batch_data)padded_batch = []for seq in batch_data:padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)進行填充padded_batch.append(padded_seq)data = torch.tensor(padded_batch, dtype=torch.long)input_tensor = data[:, :-1]target_tensor = data[:, 1:]output, _ = model(input_tensor)output = output.reshape(-1, voc_size)target_tensor = target_tensor.reshape(-1)loss = criterion(output, target_tensor)optimizer.zero_grad()loss.backward()optimizer.step()bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')# 驗證階段model.eval()val_loss = 0correct = []total = 0with torch.no_grad():for i in range(0, len(val_data_set), batch_size):batch_data = val_data_set[i:i + batch_size]max_len = max(len(seq) for seq in batch_data)padded_batch = []for seq in batch_data:padded_seq = seq + [0] * (max_len - len(seq)) # 使用0(<|pad|>)進行填充padded_batch.append(padded_seq)data = torch.tensor(padded_batch, dtype=torch.long)input_tensor = data[:, :-1]target_tensor = data[:, 1:]output, _ = model(input_tensor)acc=np.mean((torch.argmax(output,-1)==target_tensor).numpy())correct.append(acc)output = output.reshape(-1, voc_size)target_tensor_flat = target_tensor.reshape(-1)# 計算驗證損失val_loss += criterion(output, target_tensor_flat).item()# 計算準確率 - 只關注倒數第二個token(類別標記)# 獲取每個序列的倒數第二個位置avg_val_loss = val_loss / (len(val_data_set) / batch_size)acc = np.mean(correct)print(f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, Val Acc: {acc:.4f}, Time: {time.time() - start_time:.2f}s')print("Training complete.")
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile# 中文顯示支持
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = Falsedef cosine_similarity(a, b):"""計算余弦相似度"""return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))def linear_regression(x, y):"""最小二乘法線性回歸"""n = len(x)sum_x, sum_y = np.sum(x), np.sum(y)sum_xy = np.sum(x * y)sum_x2 = np.sum(x ** 2)slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)intercept = (sum_y - slope * sum_x) / nreturn slope, interceptdef min_max_normalize(data):min_val = np.min(data)max_val = np.max(data)return (data - min_val) / (max_val - min_val + 10 ** -8)def wav_to_token(path):# 數據讀取sr, audio = wavfile.read(path)left = (audio - np.mean(audio)) / np.std(audio)right = np.linspace(0.1,sr*np.pi,left.size)# 滑動窗口分析sim_list = []slope_list = []for i in range(0, len(left) - 400, 400):x, y = left[i:i + 1200], right[i:i + 1200]if len(x) > len(y): x = x[:len(y)]slope, intercept = linear_regression(x, y)sim_list.append(cosine_similarity(slope * x + intercept, y))slope_list.append(slope)# 可視化sim_list = np.array(sim_list)slope_list = np.array(slope_list)# token 化sim_list = min_max_normalize(sim_list) * 2 ** 6slope_list = min_max_normalize(slope_list) * 2 ** 7sim_list = sim_list.astype(np.int16)slope_list = slope_list.astype(np.int16)res = sim_list * slope_listreturn res[res != 0]# 歸一化 的最大值最小值應該是 全局考慮而不是 一段考慮 最好是精度極限