1.yield返回數據的原理?
為什么要用yield返回數據給管道?
- 遍歷這個函數的返回值的時候,挨個把數據讀到內存,不會造成內存的瞬間占用過高,
Python3
中的range
和python2
中的xrange
同理。 scrapy
是異步爬取,所以通過yield
能夠將運行權限教給其他的協程任務去執行,這樣整個程序運行效果會更高。
注意點:解析函數中的yield
能夠傳遞的對象只能是:BaseItem
、Request
、dict
、None
前置知識,關于數據的存儲
可以參考我另一篇文章python爬蟲之數據存儲_爬蟲代碼w是什么意思-CSDN博客
2.Mysql存儲管道封裝
第一步創建一個管道文件夾
咱們要用多管道,所以咱們不用自帶的pipelines.py文件,雖然可以寫在一個文件,但是如果寫多個導致代碼冗余。不好查看。
類名修改如下:
?第二步在setting.py設置開啟管道文件
按你的路徑修改
?數據越小,優先級越大
第四步設置管道
先下載? pip install pymsql
以豆瓣為例,這三個字段
?初始數據
def __init__(self):# 數據庫連接信息self.host = '127.0.0.1'self.user = 'root'self.passwd = '12345'self.db = 'spider2'self.charset = 'utf8mb4'
鏈接數據庫
def open_spider(self, spider):# 連接數據庫self.conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)self.cursor = self.conn.cursor()
注意open_spider是爬蟲開啟時自動運行,當前啟動的爬蟲對象,可以通過它獲取爬蟲的名稱、設置等信息。
建表,以爬蟲名稱為表名
self.cursor = self.conn.cursor()# 根據爬蟲名字創建表table_name = spider.namecreate_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (id INT AUTO_INCREMENT PRIMARY KEY,title VARCHAR(255),rating VARCHAR(50),url VARCHAR(255))"""self.cursor.execute(create_table_sql)self.conn.commit()
?插入數據
def process_item(self, item, spider):# 獲取爬蟲名字作為表名table_name = spider.name# 插入數據insert_sql = f"""INSERT INTO {table_name} (title, rating, url)VALUES (%s, %s, %s)"""self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))self.conn.commit()return item
process_item
?方法
作用:每當爬蟲抓取到一個數據項(item
)時,該方法會被調用來處理這個數據項。這是管道的核心方法,用于對數據項進行清洗、驗證、存儲等操作。
return item 作用:返回處理后的 item
,以便后續的管道可以繼續處理。如果拋出 DropItem
異常
?,則丟棄該數據項,不再傳遞給后續的管道。
def close_spider(self, spider):# 關閉數據庫連接self.cursor.close()self.conn.close()
close_spider()在爬蟲關閉時被調用,主要用于清理資源,如關閉數據庫連接、文件等操作。
完整代碼如下:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html# useful for handling different item types with a single interface
from itemadapter import ItemAdapterimport pymysql
class Mysql_Pipeline:def __init__(self):# 數據庫連接信息self.host = '127.0.0.1'self.user = 'root'self.passwd = '12345'self.db = 'spider2'self.charset = 'utf8mb4'def open_spider(self, spider):# 連接數據庫self.conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)self.cursor = self.conn.cursor()# 根據爬蟲名字創建表table_name = spider.namecreate_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (id INT AUTO_INCREMENT PRIMARY KEY,title VARCHAR(255),rating VARCHAR(50),url VARCHAR(255))"""self.cursor.execute(create_table_sql)self.conn.commit()def process_item(self, item, spider):# 獲取爬蟲名字作為表名table_name = spider.name# 插入數據insert_sql = f"""INSERT INTO {table_name} (title, rating, url)VALUES (%s, %s, %s)"""self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))self.conn.commit()print('mysql插入成功')return itemdef close_spider(self, spider):# 關閉數據庫連接self.cursor.close()self.conn.close()
?結果如下:
?
3.
MongoDB存儲管道封裝
直接創建管道
開啟管道
ITEM_PIPELINES = {'myspider.pipelines.Mysql_pipelines.Mysql_Pipeline': 300,'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 200,
}
還是以豆瓣三個字段為例
pip insatll pymongo
import pymongo
from scrapy import signalsclass MongoDBPipeline:def __init__(self):self.mongo_uri = 'mongodb://localhost:27017/' # MongoDB 的 URI 地址self.mongo_db = 'spider' # MongoDB 的數據庫名稱def open_spider(self, spider):# 在爬蟲啟動時執行,用于初始化操作,如建立 MongoDB 連接self.client = pymongo.MongoClient(self.mongo_uri)#自動創建數據庫self.db = self.client[self.mongo_db]def close_spider(self, spider):# 在爬蟲關閉時執行,用于清理操作,如關閉 MongoDB 連接self.client.close()def process_item(self, item, spider):# 處理每個數據項,將數據存儲到 MongoDB 中collection_name = spider.name # 使用爬蟲名字作為集合名#自動創建表self.db[collection_name].insert_one(item) # 將數據插入到集合中print('MongoDB插入成功!')#字典字典插入return item
?原理差不多一樣,鏈接數據庫,插入數據庫,關閉數據庫。
雙數據庫插入
結果如下:
mongodb?
?
mysql?
?
但如果只想插入其中一個數據庫
要么注釋return item,優先級設置高,這樣會拋出錯誤
要么注釋管道開啟
?最好的還是在爬蟲里重寫配置
custom_settings = {'ITEM_PIPELINES': {'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 300,}}
這樣管道設置以爬蟲類為主,只插入mongodb
?
?4.文件管道封裝
圖片存儲
class FilePipeline:def process_item(self, item, spider):# getcwd(): 用于獲取當前工作目錄(Current Working Directory)的路徑#圖片下載if item.get("f_type"):if item.get("f_type") == 'img':download_path = os.getcwd() + f'/img/'if not os.path.exists(download_path):os.mkdir(download_path)# 圖片保存image_name = item.get("title")image_content = item.get("content")if image_content:with open(download_path+f'{image_name}.jpg', "wb") as f:f.write(image_content)print("圖片保存成功: ", image_name)
一般用item.get("f_type")區分文件下載用os生成文件夾
圖片存儲一共就兩個參數,響應體和名字,這樣配置,
在爬蟲里重寫設置
custom_settings = {'ITEM_PIPELINES': {'myspider.pipelines.File_piplines.FilePipeline': 300,}}
二次請求圖片url
以上有個小錯誤,把?item_img['title'] =item['title']改為item_img['title'] =item
?結果如下:
利用item.get("f_type")區分文件下載,文件管道可以封裝如下:
import os
import picklefrom itemadapter import ItemAdapterimport json
class FilePipeline:def process_item(self, item, spider):# getcwd(): 用于獲取當前工作目錄(Current Working Directory)的路徑#圖片下載if item.get("f_type"):if item.get("f_type") == 'img':download_path = os.getcwd() + f'/img/'if not os.path.exists(download_path):os.mkdir(download_path)# 圖片保存image_name = item.get("title")image_content = item.get("content")if image_content:with open(download_path+f'{image_name}.jpg', "wb") as f:f.write(image_content)print("圖片保存成功: ", image_name)elif item.get("f_type") == 'txt':download_path = os.getcwd() + '/txt/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存txt_name = item.get("title")txt_content = item.get("content")with open(download_path+f'{txt_name}.txt', 'a', encoding='utf-8') as f:f.write(txt_content + '\n')print('文本存儲成功')elif item.get("f_type") == 'json':download_path = os.getcwd() + '/json/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存json_name = item.get("title")json_obj = itemwith open(download_path+f'{json_name}.json', 'a', encoding='utf-8') as file:file.write(json.dumps(json_obj, indent=2, ensure_ascii=False), )print('json存儲成功')elif item.get("f_type") == 'music':download_path = os.getcwd() + '/music/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存music_name = item.get("title")music_content = item.get("content")with open(download_path+f'{music_name}.mp3', 'a', encoding='utf-8') as f:f.write(music_content + '\n')print('MP3存儲成功')else:print('無事發生')
包括mp3,文本,json,圖片等文件下載。?
?但是事實上,一個文件可以進行多次存儲,這也是用yield返回給管道的主要原因
def parse(self, response, **kwargs):# scrapy的response對象可以直接進行xpathol_list = response.xpath('//ol[@class="grid_view"]/li')for ol in ol_list:# 創建一個數據字典item = {}# 利用scrapy封裝好的xpath選擇器定位元素,并通過extract()或extract_first()來獲取結果item['title'] = ol.xpath('.//div[@class="hd"]/a/span[1]/text()').extract_first()#標題item['content'] = ol.xpath('.//div[@class="bd"]/div/span[2]/text()').extract_first()#評分item['f_type'] = 'txt'yield itemitem['url'] = ol.xpath('.//a/@href').extract_first()#鏈接item['img_url'] = ol.xpath('.//img/@src').extract_first()item['f_type'] = 'json'yield item# yield itemyield scrapy.Request(url= item['img_url'] , headers=self.headers, callback=self.img_parse,cb_kwargs={"item":item['title']},meta={"meta":item})#自定義一個回調方法def img_parse(self, response,item):item_img = {'f_type':"img"}item_img['content'] = response.bodyitem_img['title'] =itemyield item_imgif __name__ == '__main__':cmdline.execute('scrapy crawl douban'.split())
以上三個yield,返回三次 ,分別是文本保存,json保存,圖片保存。
運行結果如下:
文本存儲評分
管道封裝到此一段落了。?