文章目錄
- Python多線程與線程池
- 一、Python多線程
- 1.1 線程簡介
- 1.2 Python中的多線程
- 1.3 GIL限制
- 二、線程池
- 2.1 Python中的線程池
- 三、代碼分析
- 四、參考資料
Python多線程與線程池
一、Python多線程
在進行復雜的計算或處理大量數據時,可以通過創建多個線程來同時執行多個任務,從而提高程序的執行效率。這種技術稱為多線程編程。
1.1 線程簡介
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務。
1.2 Python中的多線程
Python中的threading
模塊提供了對線程的支持。使用threading
模塊創建線程,直接從threading.Thread
繼承,然后重寫__init__
方法和run
方法。
import threadingclass MyThread(threading.Thread):def __init__(self, n):super(MyThread, self).__init__()self.n = ndef run(self):print('running task', self.n)t1 = MyThread(1)
t2 = MyThread(2)t1.start()
t2.start()
1.3 GIL限制
由于Python解釋器設計中的全局解釋器鎖(Global Interpreter Lock,GIL)的存在,使得Python的多線程并不能利用多核優勢。GIL是計算機程序設計語言解釋器用于同步線程的工具,使得任何時刻只有一個線程在執行,即使在多核CPU平臺上,Python的線程也無法同時執行。
二、線程池
線程池是一種基于池化思想管理線程的工具。在開始任務時不再重新創建新的線程,而是直接從線程池中獲取一個空閑線程來執行。如果線程池中沒有空閑線程,新的任務就會等待(排隊),直到有線程空閑。當任務執行完畢后,線程并不立即銷毀,而是返回線程池等待下次被利用。
2.1 Python中的線程池
Python的concurrent.futures
模塊提供了高級別的異步執行封裝,包括線程池ThreadPoolExecutor和進程池ProcessPoolExecutor,它們都是Executor的子類。
from concurrent.futures import ThreadPoolExecutordef func(n):print(n)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(func, range(1,5))
其中max_workers
參數表示線程池中最多可以同時運行的線程數量。
三、代碼分析
import requests
from requests.models import PreparedRequest
import json
import concurrent.futuresdef get_score_models(url):url_score = "https://bizapi.csdn.net/trends/api/v1/get-article-score"headers = {"accept": "application/json, text/plain, */*","x-ca-key": "203930474","x-ca-nonce": "22cd11a0-760a-45c1-8089-14e53123a852","x-ca-signature": "RaEczPkQ22Ep/k9/AI737gCtn8qX67CV/uGdhQiPIdQ=","x-ca-signature-headers": "x-ca-key,x-ca-nonce","x-ca-signed-content-type": "multipart/form-data"}data = {"url": url}response = send_request(url_score, data, headers)data1 = response.json()score_model = data1["data"]return score_modeldef send_request(url, data, headers):session = requests.Session()prepared_request = PreparedRequest()prepared_request.prepare(method='POST', url=url,headers=headers, data=data)return session.send(prepared_request)def process_article_json(article):# score_model = get_score_models(article['article_url'])score_model = get_score_models(article['url'])article['article_score'] = score_model['score']print(article["url"])return articleif __name__ == '__main__':# 讀取articles.json文件with open('articles.json', 'r') as f:articles = json.load(f)# 創建一個 ThreadPoolExecutor 實例,max_workers 表示線程池中最多可以同時運行的線程數量with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# 使用 map 函數將 process_article_json 應用到每個元素,并在多線程環境下并行處理processed_articles = list(executor.map(process_article_json, articles))# 保存處理后的結果到新的JSON文件output_file = 'processed_articles.json'with open(output_file, 'w') as f:json.dump(processed_articles, f, ensure_ascii=False, indent=4)
import requests
from requests.models import PreparedRequest
import json
import concurrent.futures
def get_score_models(url):
url_score = “https://bizapi.csdn.net/trends/api/v1/get-article-score”
headers = {
“accept”: “application/json, text/plain, /”,
“x-ca-key”: “203930474”,
“x-ca-nonce”: “22cd11a0-760a-45c1-8089-14e53123a852”,
“x-ca-signature”: “RaEczPkQ22Ep/k9/AI737gCtn8qX67CV/uGdhQiPIdQ=”,
“x-ca-signature-headers”: “x-ca-key,x-ca-nonce”,
“x-ca-signed-content-type”: “multipart/form-data”
}
data = {“url”: url}
response = send_request(url_score, data, headers)
data1 = response.json()
score_model = data1[“data”]
return score_model
def send_request(url, data, headers):
session = requests.Session()
prepared_request = PreparedRequest()
prepared_request.prepare(method=‘POST’, url=url,
headers=headers, data=data)
return session.send(prepared_request)
def process_article_json(article):
# score_model = get_score_models(article[‘article_url’])
score_model = get_score_models(article[‘url’])
article[‘article_score’] = score_model[‘score’]
print(article[“url”])
return article
if name == ‘main’:
# 讀取articles.json文件
with open(‘articles.json’, ‘r’) as f:
articles = json.load(f)
# 創建一個 ThreadPoolExecutor 實例,max_workers 表示線程池中最多可以同時運行的線程數量
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 使用 map 函數將 process_article_json 應用到每個元素,并在多線程環境下并行處理
processed_articles = list(executor.map(process_article_json, articles))
# 保存處理后的結果到新的JSON文件
output_file = ‘processed_articles.json’
with open(output_file, ‘w’) as f:
json.dump(processed_articles, f, ensure_ascii=False, indent=4)
以上給出的代碼片段主要涉及到的是線程池的使用。具體來說,首先從一個名為articles.json
的文件中讀取文章信息,然后利用線程池并發地獲取每篇文章的評分,并將評分添加到文章信息中,最后將處理后的文章信息保存到新的JSON文件。
代碼的主要部分如下:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:processed_articles = list(executor.map(process_article_json, articles))
這里,首先創建了一個ThreadPoolExecutor
實例,并設置最大并發線程數為5。然后使用executor.map()
函數將process_article_json
函數應用到articles
列表的每個元素上,這樣就可以在多線程環境下并行處理每篇文章了。由于executor.map()
函數返回的是一個迭代器,因此需要用list()
函數將其轉換為列表。
這種方式可以有效地提高處理大量文章信息的效率,特別是當獲取文章評分的過程涉及到網絡請求等I/O操作時,通過線程池并發處理可以顯著減少總的處理時間。
四、參考資料
- Python官方文檔:threading — Thread-based parallelism
- Python官方文檔:concurrent.futures — Launching parallel tasks
- Python線程池使用示例
- Python多線程與GIL
- 3. 爬取自己CSDN博客列表(分頁查詢)(網站反爬蟲策略,需要在代碼中添加合適的請求頭User-Agent,否則response返回空)