目錄
config參數配置
setup_dirs創建訓練文件夾
?load_data加載數據
build_model創建模型
train訓練
記錄一下訓練代碼中不理解的地方
config參數配置
config = {'data_root': r"D:\project\megnetometer\datasets\WISDM_ar_latest\organized_dataset",'train_dir': 'train','test_dir': 'test','seq_length': 300, # 序列長度'batch_size': 32, # 可能需減小batch_size'epochs': 60,'initial_lr': 3e-4, # 初始學習率'max_lr': 5e-4,'patience': 20}
配置好需要用到的參數,比如數據集地址,訓練輪數,批次大小,學習率等
setup_dirs創建訓練文件夾
def setup_dirs(self):self.run_dir = os.path.join(self.config['data_root'], 'run') os.makedirs(self.run_dir, exist_ok=True)print('創建運行目錄run_dir = ', self.run_dir)# 創建帶時間戳的實驗目錄timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")print('時間戳 = ', timestamp)self.exp_dir = os.path.join(self.run_dir, f"exp_{timestamp}")os.makedirs(self.exp_dir, exist_ok=True)# 保存當前配置with open(os.path.join(self.exp_dir, 'config.json'), 'w') as f:json.dump(self.config, f, indent=2) # 兩個字符縮進,沒有則壓縮成一行,把config內容存在config.json里
os.path.join(self.config['data_root'], 'run')
用于拼接文件路徑data_root的路徑加上run,中間的連接符會根據系統自動調整
os.makedirs(self.exp_dir, exist_ok=True)
創建文件,exist_ok=True當文件夾存在的時候不報錯
創建的文件夾用于存放后續訓練生成的模型以及保存訓練參數等文件
?load_data加載數據
def load_data(self):"""從按行為分類的目錄加載數據(帶多級進度條)"""def load_activity_data(subset_dir):"""加載train或test子目錄下的數據"""data = []subset_path = os.path.join(self.config['data_root'], subset_dir) #在數據集路徑內讀取,由subset_dir決定讀取的是訓練集還是測試集# 獲取所有活動類別目錄activities = [d for d in os.listdir(subset_path)if os.path.isdir(os.path.join(subset_path, d))]#print('activities=',activities)#activities= ['Downstairs', 'Jogging', 'Sitting', 'Standing', 'Upstairs', 'Walking']# 第一層進度條:活動類別pbar_activities = tqdm(activities, desc=f"掃描{subset_dir}目錄", position=0)for activity in pbar_activities:activity_lower = activity.lower()if activity_lower not in self.label_map:continueactivity_dir = os.path.join(subset_path, activity)#當前活動的目錄# 獲取所有用戶文件user_files = [f for f in os.listdir(activity_dir)if f.endswith('.txt')]#獲取所有txt結尾的文件# 第二層進度條:用戶文件#pbar_users = tqdm(user_files, desc="讀取用戶文件", leave=False, position=1)#后面要close,但是已經把所有的進度注釋掉了只留下來一個總的第一層進度#print('pbar_users=',pbar_users)for user_file in user_files:file_path = os.path.join(activity_dir, user_file)# 獲取文件行數用于進度條with open(file_path, 'r') as f:num_lines = sum(1 for _ in f)# 第三層進度條:讀取文件內容with open(file_path, 'r') as f:for line in f:line = line.strip()if not line:continuetry:x, y, z = map(float, line.split(','))data.append({'x': x,'y': y,'z': z,'activity': activity_lower})except ValueError:continuepbar_activities.close()return data# 調用示例print("\n" + "=" * 50)print("開始加載數據集...")train_data = load_activity_data(self.config['train_dir'])#print(train_data)#{'x': 5.33, 'y': 8.73, 'z': -0.42, 'activity': 'walking'},test_data = load_activity_data(self.config['test_dir'])
pbar_activities = tqdm(activities, desc=f"掃描{subset_dir}目錄", position=0)
tqdm創建進度條,desc是進度條前面的描述,position用于多級進度條之間的嵌套,以免位置混亂,在運行完之后要關閉進度條
pbar_activities.close()with open('data.txt', 'r') as f:打開文件夾,r為只讀模式
# 轉換為模型輸入格式(帶優化進度條)def create_sequences(data, desc="生成序列"):seq_length = self.config['seq_length']features, labels = [], []total_windows = len(data) - seq_lengthpbar = tqdm(range(total_windows),desc=desc,position=0,bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [速度:{rate_fmt}]")for i in pbar:window = data[i:i + seq_length]# 檢查窗口內活動是否一致if len(set(d['activity'] for d in window)) != 1:continuefeatures.append([[d['x'], d['y'], d['z']] for d in window])labels.append(self.label_map[window[0]['activity']])# 每1000次更新一次進度信息if i % 1000 == 0:pbar.set_postfix({"有效窗口": len(features),"跳過窗口": i - len(features) + 1}, refresh=True)return np.array(features), np.array(labels)print("\n正在預處理訓練集...")X_train, y_train = create_sequences(train_data, "訓練集序列化")#返回的x是數據,y是標簽print("\n正在預處理測試集...")X_test, y_test = create_sequences(test_data, "測試集序列化")# 標準化(顯示進度)print("\n正在計算標準化參數...")self.mean = np.mean(X_train, axis=(0, 1))self.std = np.std(X_train, axis=(0, 1))print("應用標準化...")X_train = (X_train - self.mean) / (self.std + 1e-8)X_test = (X_test - self.mean) / (self.std + 1e-8)# One-hot編碼# 將 NumPy 數組轉為 PyTorch 張量,并指定類型為 int64(等價于 .long())y_train = torch.from_numpy(y_train).long() # 或 .to(torch.int64)y_train = torch.nn.functional.one_hot(y_train.long(), num_classes=len(self.label_map))y_test = torch.from_numpy(y_test).long() # 或 .to(torch.int64)y_test = torch.nn.functional.one_hot(y_test.long(), num_classes=len(self.label_map))print("\n" + "=" * 50)print("數據預處理完成!")print(f"訓練集形狀: X_train{X_train.shape}, y_train{y_train.shape}")print(f"測試集形狀: X_test{X_test.shape}, y_test{y_test.shape}")print("=" * 50 + "\n")return (X_train, y_train), (X_test, y_test)
滑動窗口開銷大,改用向量化滑動窗口(NumPy)
參數標準化全部使用訓練集數據
1e-8的作用:防止除零的小常數,特別適用于某些標準差接近0的特征
axis=(0,1):假設您的數據是3D張量(樣本×時間步/空間×特征),這樣計算每個特征通道的統計量
消除量綱影響:當特征的單位/量綱不同時(如年齡0-100 vs 工資0-100000),標準化使所有特征具有可比性
只使用訓練集統計量:測試集必須使用訓練集的mean/std,這是為了避免數據泄露(data leakage)
數據泄露:是機器學習中一個常見但嚴重的問題,指在模型訓練過程中意外地使用了測試集或未來數據的信息,導致模型評估結果被高估,無法反映真實性能。這種現象會使模型在實際應用中表現遠差于預期。
將分類標簽(整數形式)轉換為 One-hot 編碼,這是機器學習中處理分類任務的常見方法。
build_model創建模型
def build_model(self):"""構建改進的BiLSTM分類模型"""model = tf.keras.Sequential([tf.keras.layers.InputLayer(input_shape=(self.config['seq_length'], 3)),# 雙向LSTM層tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),tf.keras.layers.BatchNormalization(),tf.keras.layers.Dropout(0.2),tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)),tf.keras.layers.BatchNormalization(),# 全連接層tf.keras.layers.Dense(32, activation='relu'),tf.keras.layers.Dropout(0.3),tf.keras.layers.Dense(len(self.label_map), activation='softmax')])model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),loss='categorical_crossentropy',metrics=['accuracy'])return model
兩個模型框架TensorFlow更早,但PyTorch的初始設計更現代,以上是TensorFlow的模型。
計算圖(Computational Graph) 是描述數學運算和數據處理流程的抽象結構,而 靜態圖 和 動態圖 是兩種不同的計算圖構建和執行方式。
計算圖 是一個有向無環圖(DAG),用于表示計算過程:
節點(Node):代表運算(如加法、矩陣乘法)或數據(如張量、變量)。
邊(Edge):描述數據流動方向(如張量從一層傳遞到下一層)。
改用PyTorch模型需要注意
PyTorch更推薦類式構建,而且保存時僅保存模型的參數(權重和偏置),不包含模型結構。如果需要測試,加載時必須先實例化一個結構完全相同的模型,再加載參數。
先創建一個模型類,再去調用?
class BiLSTMModel(nn.Module):def __init__(self, input_size, hidden_size, num_layers, num_classes, bidirectional=True):super(BiLSTMModel, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layersself.num_directions = 2 if bidirectional else 1# 雙向LSTMself.lstm = nn.LSTM(input_size=input_size,hidden_size=hidden_size,num_layers=num_layers,batch_first=True,bidirectional=bidirectional)# 全連接層(雙向時hidden_size需*2)self.fc = nn.Linear(hidden_size * self.num_directions, num_classes)def forward(self, x):# 初始化隱藏狀態(可選,PyTorch默認全零)h0 = torch.zeros(self.num_layers * self.num_directions, x.size(0), self.hidden_size).to(x.device)c0 = torch.zeros(self.num_layers * self.num_directions, x.size(0), self.hidden_size).to(x.device)# LSTM前向傳播out, _ = self.lstm(x, (h0, c0)) # out形狀: (batch, seq_len, hidden_size * num_directions)# 取最后一個時間步的輸出out = out[:, -1, :] # 形狀: (batch, hidden_size * num_directions)# 分類層out = self.fc(out)return out
此處構建的就是雙向LSTM模型,然后再構建函數調用
def build_model(self):# 使用示例model = LSTMModel(input_size=3, # 對應x/y/z特征hidden_size=32,num_layers=2,num_classes=6, # 類別數bidirectional=True)return model
train訓練
def train(self):"""PyTorch版本訓練流程"""# 1. 數據加載與預處理(X_train, y_train), (X_test, y_test) = self.load_data()# 轉換為PyTorch張量并移至設備device = torch.device("cuda" if torch.cuda.is_available() else "cpu")X_train = torch.FloatTensor(X_train).to(device)y_train = torch.LongTensor(y_train.argmax(axis=1)).to(device) # 如果y是one-hotX_test = torch.FloatTensor(X_test).to(device)y_test = torch.LongTensor(y_test.argmax(axis=1)).to(device)# 創建DataLoadertrain_dataset = TensorDataset(X_train, y_train)# 類似zip(features, labels)train_loader = DataLoader(train_dataset,batch_size=self.config['batch_size'],shuffle=True)# 2. 模型初始化self.model = self.build_model().to(device)criterion = nn.CrossEntropyLoss()optimizer = optim.Adam(self.model.parameters(),lr=self.config.get('lr', 0.001))# 3. 回調函數設置"""# 早停early_stopping = EarlyStopping(patience=self.config['patience'],verbose=True,path=os.path.join(self.exp_dir, 'best_model.pth'))"""# 學習率調度scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,mode='min',factor=0.1,patience=5,verbose=True)# TensorBoard日志writer = SummaryWriter(log_dir=os.path.join(self.exp_dir, 'logs'))print("\n開始訓練...")print(f"實驗目錄: {self.exp_dir}")print(f"使用設備: {device}")# 4. 訓練循環for epoch in range(self.config['epochs']):self.model.train()train_loss = 0.0# 訓練批次for inputs, labels in train_loader:optimizer.zero_grad()outputs = self.model(inputs)loss = criterion(outputs, labels)loss.backward()optimizer.step()train_loss += loss.item()# 驗證階段self.model.eval()with torch.no_grad():test_outputs = self.model(X_test)test_loss = criterion(test_outputs, y_test)_, predicted = torch.max(test_outputs, 1)accuracy = (predicted == y_test).float().mean()# 記錄日志writer.add_scalar('Loss/train', train_loss / len(train_loader), epoch)writer.add_scalar('Loss/test', test_loss.item(), epoch)writer.add_scalar('Accuracy/test', accuracy.item(), epoch)# 打印進度print(f"Epoch {epoch + 1}/{self.config['epochs']} | "f"Train Loss: {train_loss / len(train_loader):.4f} | "f"Test Loss: {test_loss.item():.4f} | "f"Accuracy: {accuracy.item():.4f}")# 學習率調整scheduler.step(test_loss)"""# 早停檢查early_stopping(test_loss, self.model)if early_stopping.early_stop:print("Early stopping triggered")break"""# 5. 保存最終結果writer.close()self.save_results(X_test, y_test) # 需要適配PyTorch的保存方法
TensorDataset
和DataLoader
都是 PyTorch 官方庫中的核心組件,專門用于高效的數據加載和批處理。torch.utils.data.TensorDataset將多個張量(如特征張量和標簽張量)打包成一個數據集對象
dataset = TensorDataset(features, labels) # 類似zip(features, labels)
torch.utils.data.DataLoader將數據集按批次加載,支持自動批處理、打亂數據、多進程加載等
shuffle=True
代表打亂數據,此處是時序信號,但是由于從長序列中通過滑動窗口提取樣本每個窗口本身就是一個獨立樣本,此時打亂窗口順序是安全的損失函數
criterion = nn.CrossEntropyLoss()