目錄
前置:
代碼:
視頻:
前置:
1 本系列將以 “PyQt6實例_批量下載pdf工具”開頭,放在 【PyQt6實例】 專欄
2 本系列涉及到的PyQt6知識點:
線程池:QThreadPool,QRunnable;
信號與槽:pyqtSignal,pyqtSlot;
界面:QTextEdit,QLabel,QLineEdit,QPushButton,QMainWindow,QWidget;
布局:QHBoxLayout,QVBoxLayout;
彈框:QFileDialog,QMessageBox。
3 本系列後續會在B站錄製視頻,到時會在文末貼出鏈接。本人還是建議先看博文,不懂的再看視頻,這樣效率高,節約時間。
代碼:
def _set_inputs_disabled(self, disabled: bool):
    """Enable or disable every input widget that must not change during a run."""
    for widget in (
        self.txtdir_lineedit,
        self.savedir_lineedit,
        self.choicedir_btn,
        self.savedir_btn,
        self.execute_btn,
    ):
        widget.setDisabled(disabled)


def execute_btn_clicked(self):
    """Validate the chosen directories, ask for confirmation and start the workers.

    The txt files found in the source directory (minus the ones recorded in
    ``self.last_time_executed_tickers``) are split into
    ``self.max_thread_count`` contiguous slices; one ``Worker`` per slice is
    submitted to ``self.threadpool``.
    """
    txt_dir = self.txtdir_lineedit.text()
    if not txt_dir or not txt_dir.strip():
        self.information_dialog('請先選擇txt所在目錄')
        return
    # A typed-in path that is not a directory would make os.listdir raise,
    # so treat it like "nothing selected".
    if not os.path.isdir(txt_dir):
        self.information_dialog('請先選擇txt所在目錄')
        return
    txt_list = os.listdir(txt_dir)
    if not txt_list:
        self.information_dialog('txt所在目錄為空')
        return

    pdf_dir = self.savedir_lineedit.text()
    if not pdf_dir or not pdf_dir.strip():
        self.information_dialog('請設置pdf存儲目錄')
        return

    answer = QMessageBox.question(
        self,
        '確認啟動?',
        f'如果確定啟動,程序將把任務分成 {self.max_thread_count} 個線程執行。執行過程將占用設備資源。',
        QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
    )
    if answer != QMessageBox.StandardButton.Yes:
        return

    self.this_time_start_yeah = True
    self.thread_finished_count = 0
    self._set_inputs_disabled(True)

    if self.last_time_executed_tickers is not None:
        # Filter instead of list.remove(): a txt file that was processed last
        # time but has since disappeared from the directory must not raise.
        already_done = {f'{one}.txt' for one in self.last_time_executed_tickers}
        txt_list = [name for name in txt_list if name not in already_done]

    # Distribute the tasks
    interval = len(txt_list) // self.max_thread_count
    if interval == 0:
        # NOTE(review): this lowers max_thread_count for every later run as
        # well; kept because thread_finished_fn compares the finished-worker
        # count against this attribute.
        self.max_thread_count = 1
        self.insert_executelog('需要執行的內容很少,只開啟一個線程')

    last_index = self.max_thread_count - 1
    for i in range(self.max_thread_count):
        if i == last_index:
            # The last worker also takes the remainder of the division.
            node_txt_list = txt_list[i * interval:]
        else:
            node_txt_list = txt_list[i * interval:(i + 1) * interval]
        task_data = {
            'txt_dir': txt_dir,
            'pdf_dir': pdf_dir,
            'txt_list': node_txt_list,
            'temp_dict': self.last_time_data,
        }
        worker = Worker(i, task_data)
        worker.signals.result.connect(self.thread_result_fn)
        worker.signals.finished.connect(self.thread_finished_fn)
        worker.signals.error.connect(self.thread_error_fn)
        self.runner_list.append(worker)
        self.insert_otherlog(f'線程 {i} 啟動。')
        self.threadpool.start(worker)


def thread_result_fn(self, res: tuple):
    """Handle the result tuple reported by a worker.

    ``res`` has one of two layouts:
    ``(thread_num, 'stoped', ticker, executed_url_list, executed_ticker_list)``
    ``(thread_num, 'success', None, None, executed_ticker_list)``

    For a stopped worker the partially processed ticker (if any) and the list
    of fully processed tickers are remembered so they can be persisted once
    all workers have finished.
    """
    thread_num = res[0]
    status = res[1]
    # 'stoped' (sic) is compared verbatim; presumably it is the exact value
    # the Worker reports - verify against the Worker class.
    if status == 'stoped':
        self.insert_otherlog(f'線程 {thread_num} 停止.')
        if res[2] is not None:
            self.pre_last_time_data[res[2]] = res[3]
        # Recorded even when no ticker was in progress at stop time; the guard
        # keeps a missing/empty list from raising.
        if res[4]:
            self.pre_last_time_executed_tickers.extend(res[4])
    else:
        self.insert_otherlog(f'線程 {thread_num} 正常結束。')


def thread_finished_fn(self, res: int):
    """Count a finished worker; after the last one, persist state and unlock the UI.

    ``res`` is the number of the worker that finished.  Once every worker has
    finished, the resume data, the processed tickers and the run parameters
    are written below ``<basedir>/data``.  Then the window is closed if a
    close was pending, otherwise the inputs are re-enabled.
    """
    self.thread_finished_count += 1
    self.insert_otherlog(f'線程 {res} 結束.')
    if self.thread_finished_count != self.max_thread_count:
        return

    data_dir = os.path.join(basedir, 'data')
    # Without the folder the writes below would raise and the inputs would
    # stay disabled forever.
    os.makedirs(data_dir, exist_ok=True)

    temp_str = '上次執行正常結束'
    if self.pre_last_time_data:
        temp_str = '上次被強制停止'
        with open(os.path.join(data_dir, 'temp.json'), 'w', encoding='utf-8') as fw:
            json.dump(self.pre_last_time_data, fw)
    if self.pre_last_time_executed_tickers:
        temp_str = '上次被強制停止'
        tickers_str = '\n'.join(self.pre_last_time_executed_tickers)
        with open(os.path.join(data_dir, 'executed.txt'), 'w', encoding='utf-8') as fw:
            fw.write(tickers_str)

    pre_str = f"{self.txtdir_lineedit.text()};{self.savedir_lineedit.text()};{temp_str}"
    with open(os.path.join(data_dir, 'params.txt'), 'w', encoding='utf-8') as fw:
        fw.write(pre_str)

    if self.waitting_close:
        self.close()
    else:
        self._set_inputs_disabled(False)
        # Same dialog helper as in execute_btn_clicked (the original spelling
        # "infomation_dialog" did not match it).
        self.information_dialog('所有工作線程停止完畢')


def thread_error_fn(self, res: tuple):
    """Write a worker's error report to the execution log.

    ``res`` is read as ``(thread_num, error_type, error_value, traceback)``.
    """
    error_str = f"線程 {res[0]} 報錯。報錯類型:{res[1]}。值:{res[2]}。異常棧:{res[3]}"
    self.insert_executelog(error_str)
視頻:
https://www.bilibili.com/video/BV1zeZcYQEax/
https://www.bilibili.com/video/BV1BeZcYQEZq/
https://www.bilibili.com/video/BV1VSZAYJEUf/
https://www.bilibili.com/video/BV1KKZPYBEJV/
https://www.bilibili.com/video/BV1KKZPYBEG2/