本文首發于知乎
本文使用多線程實現一個簡易爬蟲框架,讓我們只需要關注網頁的解析,不用自己設置多線程、隊列等事情。調用形式類似scrapy,而諸多功能還不完善,因此稱為簡易爬蟲框架。
這個框架實現了Spider
類,讓我們只需要寫出下面代碼,即可多線程運行爬蟲
class DouBan(Spider):def __init__(self):super(DouBan, self).__init__()self.start_url = 'https://movie.douban.com/top250'self.filename = 'douban.json' # 覆蓋默認值self.output_result = False self.thread_num = 10def start_requests(self): # 覆蓋默認函數yield (self.start_url, self.parse_first)def parse_first(self, url): # 只需要yield待爬url和回調函數r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')movies = soup.find_all('div', class_ = 'info')[:5]for movie in movies:url = movie.find('div', class_ = 'hd').a['href']yield (url, self.parse_second)nextpage = soup.find('span', class_ = 'next').aif nextpage:nexturl = self.start_url + nextpage['href']yield (nexturl, self.parse_first)else:self.running = False # 表明運行到這里則不會繼續添加待爬URL隊列def parse_second(self, url):r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')mydict = {}title = soup.find('span', property = 'v:itemreviewed')mydict['title'] = title.text if title else Noneduration = soup.find('span', property = 'v:runtime')mydict['duration'] = duration.text if duration else Nonetime = soup.find('span', property = 'v:initialReleaseDate')mydict['time'] = time.text if time else Noneyield mydictif __name__ == '__main__':douban = DouBan()douban.run()
復制代碼
可以看到這個使用方式和scrapy非常相似
- 繼承類,只需要寫解析函數(因為是簡易框架,因此還需要寫請求函數)
- 用yield返回數據或者新的請求及回調函數
- 自動多線程(scrapy是異步)
- 運行都一樣只要
run
- 可以設置是否存儲到文件等,只是沒有考慮可擴展性(數據庫等)
下面我們來說一說它是怎么實現的
我們可以對比下面兩個版本,一個是上一篇文章中的使用方法,另一個是進行了一些修改,將一些功能抽象出來,以便擴展功能。
上一篇文章版本代碼請讀者自行點擊鏈接去看,下面是修改后的版本代碼。
import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoupdef run_time(func):def wrapper(*args, **kw):start = time.time()func(*args, **kw)end = time.time()print('running', end-start, 's')return wrapperclass Spider():def __init__(self):self.start_url = 'https://movie.douban.com/top250'self.qtasks = Queue()self.data = list()self.thread_num = 5self.running = Truedef start_requests(self):yield (self.start_url, self.parse_first)def parse_first(self, url):r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')movies = soup.find_all('div', class_ = 'info')[:5]for movie in movies:url = movie.find('div', class_ = 'hd').a['href']yield (url, self.parse_second)nextpage = soup.find('span', class_ = 'next').aif nextpage:nexturl = self.start_url + nextpage['href']yield (nexturl, self.parse_first)else:self.running = Falsedef parse_second(self, url):r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')mydict = {}title = soup.find('span', property = 'v:itemreviewed')mydict['title'] = title.text if title else Noneduration = soup.find('span', property = 'v:runtime')mydict['duration'] = duration.text if duration else Nonetime = soup.find('span', property = 'v:initialReleaseDate')mydict['time'] = time.text if time else Noneyield mydictdef start_req(self):for task in self.start_requests():self.qtasks.put(task)def parses(self):while self.running or not self.qtasks.empty():try:url, func = self.qtasks.get(timeout=3)print('crawling', url)for task in func(url):if isinstance(task, tuple):self.qtasks.put(task)elif isinstance(task, dict):self.data.append(task)else:raise TypeError('parse functions have to yield url-function tuple or data dict')except Empty:print('{}: Timeout occurred'.format(threading.current_thread().name))print(threading.current_thread().name, 'finished')@run_timedef run(self, filename=False):ths = []th1 = threading.Thread(target=self.start_req)th1.start()ths.append(th1)for _ in range(self.thread_num):th = threading.Thread(target=self.parses)th.start()ths.append(th)for th in ths:th.join()if filename:s = json.dumps(self.data, ensure_ascii=False, indent=4)with open(filename, 'w', encoding='utf-8') as f:f.write(s)print('Data crawling is finished.')if __name__ == '__main__':Spider().run(filename='frame.json')
復制代碼
這個改進主要思路如下
- 我們希望寫解析函數時,像scrapy一樣,用yield返回待抓取的URL和它對應的解析函數,于是就做了一個包含(URL,解析函數)的元組隊列,之后只要不斷從隊列中獲取元素,用函數解析url即可,這個提取的過程使用多線程
yield
可以返回兩種類型數據,一種是元組(URL,解析函數),一種是字典(即我們要的數據),通過判斷分別加入不同隊列中。元組隊列是不斷消耗和增添的過程,而字典隊列是一只增加,最后再一起輸出到文件中- 在
queue.get
時,加入了timeout
參數并做異常處理,保證每一個線程都能結束
這里其實沒有特別的知識,也不需要解釋很多,讀者自己復制代碼到文本文件里對比就知道了
然后框架的形式就是從第二種中,剝離一些通用的設定,讓用戶自定義每個爬蟲獨特的部分,完整代碼如下(本文開頭的代碼就是下面這塊代碼的后半部分)
import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoupdef run_time(func):def wrapper(*args, **kw):start = time.time()func(*args, **kw)end = time.time()print('running', end-start, 's')return wrapperclass Spider():def __init__(self):self.qtasks = Queue()self.data = list()self.thread_num = 5self.running = Trueself.filename = Falseself.output_result = Truedef start_requests(self):yield (self.start_url, self.parse)def start_req(self):for task in self.start_requests():self.qtasks.put(task)def parses(self):while self.running or not self.qtasks.empty():try:url, func = self.qtasks.get(timeout=3)print('crawling', url)for task in func(url):if isinstance(task, tuple):self.qtasks.put(task)elif isinstance(task, dict):if self.output_result:print(task)self.data.append(task)else:raise TypeError('parse functions have to yield url-function tuple or data dict')except Empty:print('{}: Timeout occurred'.format(threading.current_thread().name))print(threading.current_thread().name, 'finished')@run_timedef run(self):ths = []th1 = threading.Thread(target=self.start_req)th1.start()ths.append(th1)for _ in range(self.thread_num):th = threading.Thread(target=self.parses)th.start()ths.append(th)for th in ths:th.join()if self.filename:s = json.dumps(self.data, ensure_ascii=False, indent=4)with open(self.filename, 'w', encoding='utf-8') as f:f.write(s)print('Data crawling is finished.')class DouBan(Spider):def __init__(self):super(DouBan, self).__init__()self.start_url = 'https://movie.douban.com/top250'self.filename = 'douban.json' # 覆蓋默認值self.output_result = False self.thread_num = 10def start_requests(self): # 覆蓋默認函數yield (self.start_url, self.parse_first)def parse_first(self, url): # 只需要yield待爬url和回調函數r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')movies = soup.find_all('div', class_ = 'info')[:5]for movie in movies:url = movie.find('div', class_ = 'hd').a['href']yield (url, self.parse_second)nextpage = soup.find('span', class_ = 'next').aif nextpage:nexturl = self.start_url + nextpage['href']yield (nexturl, self.parse_first)else:self.running = False # 表明運行到這里則不會繼續添加待爬URL隊列def parse_second(self, url):r = requests.get(url)soup = BeautifulSoup(r.content, 'lxml')mydict = {}title = soup.find('span', property = 'v:itemreviewed')mydict['title'] = title.text if title else Noneduration = soup.find('span', property = 'v:runtime')mydict['duration'] = duration.text if duration else Nonetime = soup.find('span', property = 'v:initialReleaseDate')mydict['time'] = time.text if time else Noneyield mydictif __name__ == '__main__':douban = DouBan()douban.run()
復制代碼
我們這樣剝離之后,就只需要寫后半部分的代碼,只關心網頁的解析,不用考慮多線程的實現了。
歡迎關注我的知乎專欄
專欄主頁:python編程
專欄目錄:目錄
版本說明:軟件及包版本說明