Python Web Scraping: Scrapy Extensions (Part 4)

Before reading this article, see:

https://blog.csdn.net/MinggeQingchun/article/details/145904572

In Scrapy, extensions are a plugin mechanism that lets you add extra functionality to a crawling project. Extensions can hook into different stages of a crawl, such as startup, shutdown, request processing, and response processing.

Official Extensions documentation: Extensions — Scrapy 2.12.0 documentation

Official Signals documentation:

In Scrapy, an extension is an ordinary Python class; it does not have to implement any special interface or inherit from a particular base class. By convention it exposes a from_crawler class method, through which it receives the crawler and gains access to the settings, signals, and stats.

I. Creating and Using an Extension

1. Define the extension

First, define an extension class. It does not need to inherit from any Scrapy base class; it only needs a from_crawler class method so Scrapy can instantiate it. For example, a simple extension that logs the number of URLs processed:

from scrapy import signals


class UrlLogExtension:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # Get the stats collector from the crawler
        stats = crawler.stats
        ext = cls(stats)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_opened(self, spider):
        self.stats.set_value('url_count', 0)

    def spider_closed(self, spider):
        url_count = self.stats.get_value('url_count')
        print(f"Total URLs processed: {url_count}")
2. Enable the extension in settings.py

In your Scrapy project's settings.py, add the extension to the EXTENSIONS setting:

EXTENSIONS = {
    'path.to.your.extension.UrlLogExtension': 500,  # the number sets the load order; since extensions usually do not depend on each other, its exact value rarely matters
}
3. Wire up signal-handling logic (if needed)

If your extension needs to react to specific signals (requests, responses, and so on), define the corresponding methods on the extension class and connect them with crawler.signals.connect. In the UrlLogExtension above we connected the spider_opened and spider_closed signals; note that as written nothing ever increments url_count, so one way to complete it is sketched below.
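As a sketch (an assumption, not part of the original example), the extension could also subscribe to the response_received signal and increment the counter there, so the total printed at close is meaningful:

from scrapy import signals


class UrlLogExtension:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        # assumed addition: count every received response as one processed URL
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        return ext

    def spider_opened(self, spider):
        self.stats.set_value('url_count', 0)

    def response_received(self, response, request, spider):
        self.stats.inc_value('url_count')

    def spider_closed(self, spider):
        print(f"Total URLs processed: {self.stats.get_value('url_count')}")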

Built-in extensions

Scrapy provides 9 built-in extensions:

  • scrapy.extensions.corestats.CoreStats: collects Scrapy's core statistics

  • scrapy.extensions.telnet.TelnetConsole: opens a TCP service while Scrapy runs, so you can telnet in and inspect the crawler's live state

  • scrapy.extensions.memusage.MemoryUsage: memory-usage monitoring and alerts; not available on Windows

  • scrapy.extensions.memdebug.MemoryDebugger: runs garbage collection (gc) and records the resulting statistics

  • scrapy.extensions.closespider.CloseSpider: closes the spider when limits on elapsed time, page count, item count, or error count are reached

  • scrapy.extensions.feedexport.FeedExporter: exports the scraped data to files; it supports multiple serialization formats (JSON, CSV, XML, ...) and storage backends (local filesystem, FTP, S3, ...), so data can be exported in the required format to the appropriate storage

  • scrapy.extensions.logstats.LogStats: periodically logs page and item counts and derives their per-minute rates

  • scrapy.extensions.spiderstate.SpiderState: saves and restores the spider's state

  • scrapy.extensions.throttle.AutoThrottle: adaptively adjusts the download delay

They are registered in Scrapy's default_settings.py file:

D:\xx\項目\env\Lib\site-packages\scrapy\settings\default_settings.py

EXTENSIONS = {}

EXTENSIONS_BASE = {
    "scrapy.extensions.corestats.CoreStats": 0,
    "scrapy.extensions.telnet.TelnetConsole": 0,
    "scrapy.extensions.memusage.MemoryUsage": 0,
    "scrapy.extensions.memdebug.MemoryDebugger": 0,
    "scrapy.extensions.closespider.CloseSpider": 0,
    "scrapy.extensions.feedexport.FeedExporter": 0,
    "scrapy.extensions.logstats.LogStats": 0,
    "scrapy.extensions.spiderstate.SpiderState": 0,
    "scrapy.extensions.throttle.AutoThrottle": 0,
}

You can configure these extensions in settings.py, for example:

EXTENSIONS = {
    'scrapy.extensions.logstats.LogStats': 500,      # log statistics
    'scrapy.extensions.telnet.TelnetConsole': 500,   # Telnet console
}

II. The Built-in Extensions in Detail

1. scrapy.extensions.corestats.CoreStats

"""
Extension for collecting core stats like items scraped and start/finish times
"""from __future__ import annotationsfrom datetime import datetime, timezone
from typing import TYPE_CHECKING, Anyfrom scrapy import Spider, signalsif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.statscollectors import StatsCollector[docs]class CoreStats:def __init__(self, stats: StatsCollector):self.stats: StatsCollector = statsself.start_time: datetime | None = None@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:assert crawler.statso = cls(crawler.stats)crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)crawler.signals.connect(o.response_received, signal=signals.response_received)return odef spider_opened(self, spider: Spider) -> None:self.start_time = datetime.now(tz=timezone.utc)self.stats.set_value("start_time", self.start_time, spider=spider)def spider_closed(self, spider: Spider, reason: str) -> None:assert self.start_time is not Nonefinish_time = datetime.now(tz=timezone.utc)elapsed_time = finish_time - self.start_timeelapsed_time_seconds = elapsed_time.total_seconds()self.stats.set_value("elapsed_time_seconds", elapsed_time_seconds, spider=spider)self.stats.set_value("finish_time", finish_time, spider=spider)self.stats.set_value("finish_reason", reason, spider=spider)def item_scraped(self, item: Any, spider: Spider) -> None:self.stats.inc_value("item_scraped_count", spider=spider)def response_received(self, spider: Spider) -> None:self.stats.inc_value("response_received_count", spider=spider)def item_dropped(self, item: Any, spider: Spider, exception: BaseException) -> None:reason = exception.__class__.__name__self.stats.inc_value("item_dropped_count", spider=spider)self.stats.inc_value(f"item_dropped_reasons_count/{reason}", spider=spider)

It listens for the spider_opened, spider_closed, item_scraped, item_dropped, and response_received signals and collects statistics from them.
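The stat keys it writes (start_time, finish_time, elapsed_time_seconds, finish_reason, item_scraped_count, response_received_count, item_dropped_count) can be read back by other components. A minimal sketch, assuming a hypothetical reporting extension of our own:

from scrapy import signals


class StatsReportExtension:
    """Illustrative extension that prints a few stats collected by CoreStats."""

    def __init__(self, crawler):
        self.crawler = crawler

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider, reason):
        stats = self.crawler.stats
        # Keys written by CoreStats (see the source above)
        print("items scraped:", stats.get_value("item_scraped_count", 0))
        print("responses received:", stats.get_value("response_received_count", 0))
        print("start time:", stats.get_value("start_time"))
        print("finish reason:", reason)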

2. scrapy.extensions.telnet.TelnetConsole

"""
Scrapy Telnet Console extensionSee documentation in docs/topics/telnetconsole.rst
"""from __future__ import annotationsimport binascii
import logging
import os
import pprint
from typing import TYPE_CHECKING, Anyfrom twisted.internet import protocol
from twisted.internet.tcp import Portfrom scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.decorators import defers
from scrapy.utils.engine import print_engine_status
from scrapy.utils.reactor import listen_tcp
from scrapy.utils.trackref import print_live_refsif TYPE_CHECKING:from twisted.conch import telnet# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerlogger = logging.getLogger(__name__)# signal to update telnet variables
# args: telnet_vars
update_telnet_vars = object()[docs]class TelnetConsole(protocol.ServerFactory):def __init__(self, crawler: Crawler):if not crawler.settings.getbool("TELNETCONSOLE_ENABLED"):raise NotConfiguredself.crawler: Crawler = crawlerself.noisy: bool = Falseself.portrange: list[int] = [int(x) for x in crawler.settings.getlist("TELNETCONSOLE_PORT")]self.host: str = crawler.settings["TELNETCONSOLE_HOST"]self.username: str = crawler.settings["TELNETCONSOLE_USERNAME"]self.password: str = crawler.settings["TELNETCONSOLE_PASSWORD"]if not self.password:self.password = binascii.hexlify(os.urandom(8)).decode("utf8")logger.info("Telnet Password: %s", self.password)self.crawler.signals.connect(self.start_listening, signals.engine_started)self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def start_listening(self) -> None:self.port: Port = listen_tcp(self.portrange, self.host, self)h = self.port.getHost()logger.info("Telnet console listening on %(host)s:%(port)d",{"host": h.host, "port": h.port},extra={"crawler": self.crawler},)def stop_listening(self) -> None:self.port.stopListening()def protocol(self) -> telnet.TelnetTransport:  # type: ignore[override]# these import twisted.internet.reactorfrom twisted.conch import manhole, telnetfrom twisted.conch.insults import insultsclass Portal:"""An implementation of IPortal"""@defersdef login(self_, credentials, mind, *interfaces):if not (credentials.username == self.username.encode("utf8")and credentials.checkPassword(self.password.encode("utf8"))):raise ValueError("Invalid credentials")protocol = telnet.TelnetBootstrapProtocol(insults.ServerProtocol, manhole.Manhole, self._get_telnet_vars())return (interfaces[0], protocol, lambda: None)return telnet.TelnetTransport(telnet.AuthenticatingTelnetProtocol, Portal())def _get_telnet_vars(self) -> dict[str, Any]:# Note: if you add entries here also update topics/telnetconsole.rstassert self.crawler.enginetelnet_vars: dict[str, Any] = {"engine": self.crawler.engine,"spider": self.crawler.engine.spider,"slot": self.crawler.engine.slot,"crawler": self.crawler,"extensions": self.crawler.extensions,"stats": self.crawler.stats,"settings": self.crawler.settings,"est": lambda: print_engine_status(self.crawler.engine),"p": pprint.pprint,"prefs": print_live_refs,"help": "This is Scrapy telnet console. For more info see: ""https://docs.scrapy.org/en/latest/topics/telnetconsole.html",}self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)return telnet_vars

Inside the telnet session the following local variables are available: engine, spider, slot, crawler, extensions, stats, settings, est, p, prefs, and help.
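A minimal configuration sketch (the values are illustrative assumptions; the setting names are the ones read in __init__ above):

# settings.py -- illustrative values
TELNETCONSOLE_ENABLED = True
TELNETCONSOLE_HOST = "127.0.0.1"
TELNETCONSOLE_PORT = [6023, 6073]  # port range; the first free port in it is used
TELNETCONSOLE_USERNAME = "scrapy"
TELNETCONSOLE_PASSWORD = ""        # empty -> a random password is generated and logged (see __init__ above)

You can then connect with a telnet client (for example telnet 127.0.0.1 6023) and call helpers such as est() to print the engine status or prefs() to print live object references.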

3. scrapy.extensions.memusage.MemoryUsage (memory usage)

"""
MemoryUsage extensionSee documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport logging
import socket
import sys
from importlib import import_module
from pprint import pformat
from typing import TYPE_CHECKINGfrom twisted.internet import taskfrom scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.mail import MailSender
from scrapy.utils.engine import get_engine_statusif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerlogger = logging.getLogger(__name__)[docs]class MemoryUsage:def __init__(self, crawler: Crawler):if not crawler.settings.getbool("MEMUSAGE_ENABLED"):raise NotConfiguredtry:# stdlib's resource module is only available on unix platforms.self.resource = import_module("resource")except ImportError:raise NotConfiguredself.crawler: Crawler = crawlerself.warned: bool = Falseself.notify_mails: list[str] = crawler.settings.getlist("MEMUSAGE_NOTIFY_MAIL")self.limit: int = crawler.settings.getint("MEMUSAGE_LIMIT_MB") * 1024 * 1024self.warning: int = crawler.settings.getint("MEMUSAGE_WARNING_MB") * 1024 * 1024self.check_interval: float = crawler.settings.getfloat("MEMUSAGE_CHECK_INTERVAL_SECONDS")self.mail: MailSender = MailSender.from_crawler(crawler)crawler.signals.connect(self.engine_started, signal=signals.engine_started)crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def get_virtual_size(self) -> int:size: int = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrssif sys.platform != "darwin":# on macOS ru_maxrss is in bytes, on Linux it is in KBsize *= 1024return sizedef engine_started(self) -> None:assert self.crawler.statsself.crawler.stats.set_value("memusage/startup", self.get_virtual_size())self.tasks: list[task.LoopingCall] = []tsk = task.LoopingCall(self.update)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)if self.limit:tsk = task.LoopingCall(self._check_limit)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)if self.warning:tsk = task.LoopingCall(self._check_warning)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)def engine_stopped(self) -> None:for tsk in self.tasks:if tsk.running:tsk.stop()def update(self) -> None:assert self.crawler.statsself.crawler.stats.max_value("memusage/max", self.get_virtual_size())def _check_limit(self) -> None:assert self.crawler.engineassert self.crawler.statspeak_mem_usage = self.get_virtual_size()if peak_mem_usage > self.limit:self.crawler.stats.set_value("memusage/limit_reached", 1)mem = self.limit / 1024 / 1024logger.error("Memory usage exceeded %(memusage)dMiB. 
Shutting down Scrapy...",{"memusage": mem},extra={"crawler": self.crawler},)if self.notify_mails:subj = (f"{self.crawler.settings['BOT_NAME']} terminated: "f"memory usage exceeded {mem}MiB at {socket.gethostname()}")self._send_report(self.notify_mails, subj)self.crawler.stats.set_value("memusage/limit_notified", 1)if self.crawler.engine.spider is not None:self.crawler.engine.close_spider(self.crawler.engine.spider, "memusage_exceeded")else:self.crawler.stop()else:logger.info("Peak memory usage is %(virtualsize)dMiB",{"virtualsize": peak_mem_usage / 1024 / 1024},)def _check_warning(self) -> None:if self.warned:  # warn only oncereturnassert self.crawler.statsif self.get_virtual_size() > self.warning:self.crawler.stats.set_value("memusage/warning_reached", 1)mem = self.warning / 1024 / 1024logger.warning("Memory usage reached %(memusage)dMiB",{"memusage": mem},extra={"crawler": self.crawler},)if self.notify_mails:subj = (f"{self.crawler.settings['BOT_NAME']} warning: "f"memory usage reached {mem}MiB at {socket.gethostname()}")self._send_report(self.notify_mails, subj)self.crawler.stats.set_value("memusage/warning_notified", 1)self.warned = Truedef _send_report(self, rcpts: list[str], subject: str) -> None:"""send notification mail with some additional useful info"""assert self.crawler.engineassert self.crawler.statsstats = self.crawler.statss = f"Memory usage at engine startup : {stats.get_value('memusage/startup') / 1024 / 1024}M\r\n"s += f"Maximum memory usage          : {stats.get_value('memusage/max') / 1024 / 1024}M\r\n"s += f"Current memory usage          : {self.get_virtual_size() / 1024 / 1024}M\r\n"s += ("ENGINE STATUS ------------------------------------------------------- \r\n")s += "\r\n"s += pformat(get_engine_status(self.crawler.engine))s += "\r\n"self.mail.send(rcpts, subject, s)

This extension only works on Unix-like systems (it relies on the stdlib resource module), and it can be configured to monitor memory usage and send warning emails.

Mail settings for the notification emails:

MAIL_HOST = 'localhost'        # mail server
MAIL_PORT = 25                 # mail server port
MAIL_FROM = 'scrapy@localhost' # sender address
MAIL_PASS = None               # mail account password
MAIL_USER = None               # mail account user

Memory-monitoring settings:

MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # check every 60 seconds
MEMUSAGE_ENABLED = True                 # enable memory monitoring
MEMUSAGE_LIMIT_MB = 0                   # hard memory limit (MB); 0 disables the limit check
MEMUSAGE_NOTIFY_MAIL = []               # addresses to notify
MEMUSAGE_WARNING_MB = 0                 # warning threshold (MB); 0 disables the warning check
When memory usage exceeds the limit or warning thresholds, the corresponding notification email is sent.
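A minimal sketch of wiring these settings together (the thresholds and addresses are illustrative assumptions):

# settings.py -- illustrative values
MEMUSAGE_ENABLED = True
MEMUSAGE_CHECK_INTERVAL_SECONDS = 30.0
MEMUSAGE_WARNING_MB = 512          # warn (and mail) once when RSS passes 512 MiB
MEMUSAGE_LIMIT_MB = 1024           # close the spider with reason "memusage_exceeded" above 1 GiB
MEMUSAGE_NOTIFY_MAIL = ["ops@example.com"]

MAIL_HOST = "smtp.example.com"
MAIL_PORT = 25
MAIL_FROM = "scrapy@example.com"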

4. scrapy.extensions.memdebug.MemoryDebugger

"""
MemoryDebugger extensionSee documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport gc
from typing import TYPE_CHECKINGfrom scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.trackref import live_refsif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.statscollectors import StatsCollector[docs]class MemoryDebugger:def __init__(self, stats: StatsCollector):self.stats: StatsCollector = stats@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:if not crawler.settings.getbool("MEMDEBUG_ENABLED"):raise NotConfiguredassert crawler.statso = cls(crawler.stats)crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)return odef spider_closed(self, spider: Spider, reason: str) -> None:gc.collect()self.stats.set_value("memdebug/gc_garbage_count", len(gc.garbage), spider=spider)for cls, wdict in live_refs.items():if not wdict:continueself.stats.set_value(f"memdebug/live_refs/{cls.__name__}", len(wdict), spider=spider)

Settings

MEMDEBUG_ENABLED = False  # enable memory debugging
MEMDEBUG_NOTIFY = []      # send memory debugging report by mail at engine shutdown
MEMDEBUG_NOTIFY is currently not used anywhere in the code.

Its main job is to run garbage collection (gc) when the spider closes and record the resulting counts in the stats; a minimal sketch of enabling it follows.
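A minimal sketch (the setting name is the one read in from_crawler above):

# settings.py
MEMDEBUG_ENABLED = True
# After the spider closes, the stats dump will contain keys such as
# "memdebug/gc_garbage_count" and "memdebug/live_refs/<ClassName>" (see spider_closed above).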

5. scrapy.extensions.closespider.CloseSpider

"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.See documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport logging
from collections import defaultdict
from typing import TYPE_CHECKING, Anyfrom scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfiguredif TYPE_CHECKING:from twisted.python.failure import Failure# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.http import Responselogger = logging.getLogger(__name__)[docs]class CloseSpider:def __init__(self, crawler: Crawler):self.crawler: Crawler = crawlerself.close_on: dict[str, Any] = {"timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"),"itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"),"pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"),"errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"),"timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"),"pagecount_no_item": crawler.settings.getint("CLOSESPIDER_PAGECOUNT_NO_ITEM"),}if not any(self.close_on.values()):raise NotConfiguredself.counter: defaultdict[str, int] = defaultdict(int)if self.close_on.get("errorcount"):crawler.signals.connect(self.error_count, signal=signals.spider_error)if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"):crawler.signals.connect(self.page_count, signal=signals.response_received)if self.close_on.get("timeout"):crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"):crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)if self.close_on.get("timeout_no_item"):self.timeout_no_item: int = self.close_on["timeout_no_item"]self.items_in_period: int = 0crawler.signals.connect(self.spider_opened_no_item, signal=signals.spider_opened)crawler.signals.connect(self.item_scraped_no_item, signal=signals.item_scraped)crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def error_count(self, failure: Failure, response: Response, spider: Spider) -> None:self.counter["errorcount"] += 1if self.counter["errorcount"] == self.close_on["errorcount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_errorcount")def page_count(self, response: Response, request: Request, spider: Spider) -> None:self.counter["pagecount"] += 1self.counter["pagecount_since_last_item"] += 1if self.counter["pagecount"] == self.close_on["pagecount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_pagecount")returnif self.close_on["pagecount_no_item"] and (self.counter["pagecount_since_last_item"]>= self.close_on["pagecount_no_item"]):assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_pagecount_no_item")def spider_opened(self, spider: Spider) -> None:from twisted.internet import reactorassert self.crawler.engineself.task = reactor.callLater(self.close_on["timeout"],self.crawler.engine.close_spider,spider,reason="closespider_timeout",)def item_scraped(self, item: Any, spider: Spider) -> None:self.counter["itemcount"] += 1self.counter["pagecount_since_last_item"] = 0if self.counter["itemcount"] == self.close_on["itemcount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_itemcount")def spider_closed(self, spider: Spider) -> None:task = getattr(self, "task", None)if task and task.active():task.cancel()task_no_item = getattr(self, "task_no_item", None)if task_no_item and task_no_item.running:task_no_item.stop()def spider_opened_no_item(self, spider: Spider) -> None:from twisted.internet import taskself.task_no_item = 
task.LoopingCall(self._count_items_produced, spider)self.task_no_item.start(self.timeout_no_item, now=False)logger.info(f"Spider will stop when no items are produced after "f"{self.timeout_no_item} seconds.")def item_scraped_no_item(self, item: Any, spider: Spider) -> None:self.items_in_period += 1def _count_items_produced(self, spider: Spider) -> None:if self.items_in_period >= 1:self.items_in_period = 0else:logger.info(f"Closing spider since no items were produced in the last "f"{self.timeout_no_item} seconds.")assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_timeout_no_item")

Settings

CLOSESPIDER_TIMEOUT = 0     # close the spider after this many seconds have elapsed; 0 disables it
CLOSESPIDER_PAGECOUNT = 0   # close the spider after this many responses have been received; 0 disables it
CLOSESPIDER_ITEMCOUNT = 0   # close the spider after this many items have been scraped; 0 disables it
CLOSESPIDER_ERRORCOUNT = 0  # close the spider after this many errors have occurred; 0 disables it

Its main purpose is to stop a crawl once limits on elapsed time, page count, item count, or error count are reached; a minimal configuration sketch follows.
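A minimal sketch, with illustrative values (each condition is disabled when its setting is 0):

# settings.py -- illustrative values
CLOSESPIDER_TIMEOUT = 3600     # stop after one hour...
CLOSESPIDER_ITEMCOUNT = 10000  # ...or after 10,000 items, whichever comes first
CLOSESPIDER_ERRORCOUNT = 50    # ...or after 50 spider errors

The same settings can also be passed per run on the command line, for example scrapy crawl myspider -s CLOSESPIDER_PAGECOUNT=100.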

6. scrapy.extensions.logstats.LogStats

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from twisted.internet import task

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector

logger = logging.getLogger(__name__)


class LogStats:
    """Log basic scraping stats periodically like:
    * RPM - Requests per Minute
    * IPM - Items per Minute
    """

    def __init__(self, stats: StatsCollector, interval: float = 60.0):
        self.stats: StatsCollector = stats
        self.interval: float = interval
        self.multiplier: float = 60.0 / self.interval
        self.task: task.LoopingCall | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        interval: float = crawler.settings.getfloat("LOGSTATS_INTERVAL")
        if not interval:
            raise NotConfigured
        assert crawler.stats
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider: Spider) -> None:
        self.pagesprev: int = 0
        self.itemsprev: int = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider: Spider) -> None:
        self.calculate_stats()

        msg = (
            "Crawled %(pages)d pages (at %(pagerate)d pages/min), "
            "scraped %(items)d items (at %(itemrate)d items/min)"
        )
        log_args = {
            "pages": self.pages,
            "pagerate": self.prate,
            "items": self.items,
            "itemrate": self.irate,
        }
        logger.info(msg, log_args, extra={"spider": spider})

    def calculate_stats(self) -> None:
        self.items: int = self.stats.get_value("item_scraped_count", 0)
        self.pages: int = self.stats.get_value("response_received_count", 0)
        self.irate: float = (self.items - self.itemsprev) * self.multiplier
        self.prate: float = (self.pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = self.pages, self.items

    def spider_closed(self, spider: Spider, reason: str) -> None:
        if self.task and self.task.running:
            self.task.stop()

        rpm_final, ipm_final = self.calculate_final_stats(spider)
        self.stats.set_value("responses_per_minute", rpm_final)
        self.stats.set_value("items_per_minute", ipm_final)

    def calculate_final_stats(self, spider: Spider) -> tuple[None, None] | tuple[float, float]:
        start_time = self.stats.get_value("start_time")
        finished_time = self.stats.get_value("finished_time")

        if not start_time or not finished_time:
            return None, None

        mins_elapsed = (finished_time - start_time).seconds / 60

        items = self.stats.get_value("item_scraped_count", 0)
        pages = self.stats.get_value("response_received_count", 0)

        return (pages / mins_elapsed), (items / mins_elapsed)

Settings

LOGSTATS_INTERVAL = 60.0  # log the stats every 60 seconds; when 0, the extension is disabled
It mainly tracks page and item counts and logs the corresponding per-minute rates.

7. scrapy.extensions.spiderstate.SpiderState

from __future__ import annotations

import pickle  # nosec
from pathlib import Path
from typing import TYPE_CHECKING

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.job import job_dir

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir: str | None = None):
        self.jobdir: str | None = jobdir

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider: Spider) -> None:
        if self.jobdir:
            with Path(self.statefn).open("wb") as f:
                assert hasattr(spider, "state")  # set in spider_opened
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider: Spider) -> None:
        if self.jobdir and Path(self.statefn).exists():
            with Path(self.statefn).open("rb") as f:
                spider.state = pickle.load(f)  # type: ignore[attr-defined]  # nosec
        else:
            spider.state = {}  # type: ignore[attr-defined]

    @property
    def statefn(self) -> str:
        assert self.jobdir
        return str(Path(self.jobdir, "spider.state"))

Settings

JOBDIR = ''  # directory where the spider state for the job is stored
When JOBDIR is set, the directory is created automatically and the spider state is pickled into it; it is unset by default. A usage sketch follows.

8. scrapy.extensions.throttle.AutoThrottle

# (imports added so the excerpt is self-contained)
import logging

from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class AutoThrottle:

    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))

    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {
                    'slot': key, 'concurrency': conc,
                    'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                    'latency': latency * 1000, 'size': size,
                },
                extra={'spider': spider},
            )

    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""

        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency

        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0

        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)

        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)

        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return

        slot.delay = new_delay

Settings

AUTOTHROTTLE_ENABLED = False           # enable the adaptive download delay
AUTOTHROTTLE_DEBUG = False             # log throttling stats for every received response
AUTOTHROTTLE_MAX_DELAY = 60.0          # maximum download delay, in seconds
AUTOTHROTTLE_START_DELAY = 5.0         # initial download delay, in seconds
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # average number of requests to keep in flight per remote site

This extension is disabled by default; a configuration sketch follows.
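A minimal sketch of enabling it (the values are illustrative assumptions; the setting names are the ones read by the code above):

# settings.py -- illustrative values
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 5.0
AUTOTHROTTLE_MAX_DELAY = 60.0
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0  # aim for ~2 requests in flight per remote site
AUTOTHROTTLE_DEBUG = True              # log the throttling decision for every response
DOWNLOAD_DELAY = 1.0                   # acts as the minimum delay (see _min_delay above)

With these values, if a site takes 3 seconds to respond the target delay becomes 3 / 2 = 1.5 s, and the slot delay is nudged toward that value on every downloaded response, clamped between DOWNLOAD_DELAY and AUTOTHROTTLE_MAX_DELAY (see _adjust_delay above).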

Reference

Scrapy 源碼分析 4 extensions middlewares詳解_scrapy.extensions.logstats-CSDN博客
