赞
踩
因为scrapy-redis和scrapy之间最直接的区别在于调度器;那么,在讲解scrapy-redis之前,我发现自己没有对scrapy的调度器这一块进行过什么总结; 那么这篇 <关于调度器的说明文> 需要写在正式`自定义scrapy-redis`的前面!!
-包在scrapy/core/scheduler.py 里面,自己可以对着源码看;
- from __future__ import annotations
-
- import json
- import logging
- from abc import abstractmethod
- from pathlib import Path
- from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, cast
-
- from twisted.internet.defer import Deferred
-
- from scrapy.crawler import Crawler
- from scrapy.dupefilters import BaseDupeFilter
- from scrapy.http.request import Request
- from scrapy.spiders import Spider
- from scrapy.statscollectors import StatsCollector
- from scrapy.utils.job import job_dir
- from scrapy.utils.misc import create_instance, load_object
-
- if TYPE_CHECKING:
- from typing_extensions import Self
-
-
- logger = logging.getLogger(__name__)
-
-
- class BaseSchedulerMeta(type):
- """
- 元类,用于检查调度器类是否符合必要的接口
- """
-
- def __instancecheck__(cls, instance: Any) -> bool:
- return cls.__subclasscheck__(type(instance))
-
- def __subclasscheck__(cls, subclass: type) -> bool:
- return (
- hasattr(subclass, "has_pending_requests")
- and callable(subclass.has_pending_requests)
- and hasattr(subclass, "enqueue_request")
- and callable(subclass.enqueue_request)
- and hasattr(subclass, "next_request")
- and callable(subclass.next_request)
- )
-
-
- class BaseScheduler(metaclass=BaseSchedulerMeta):
- """
- 调度器组件负责存储从引擎接收到的请求,并在需要时将其发送回引擎。
- 请求的原始来源包括:
- * Spider: ``start_requests`` 方法、为 ``start_urls`` 属性中的 URL 创建的请求、请求回调
- * Spider 中间件: ``process_spider_output`` 和 ``process_spider_exception`` 方法
- * Downloader 中间件: ``process_request``、``process_response`` 和 ``process_exception`` 方法
- 调度器返回存储的请求的顺序(通过 ``next_request`` 方法),在很大程度上决定了请求的下载顺序。
- 此类中定义的方法构成了 Scrapy 引擎与调度器交互的最小接口。
- """
-
- @classmethod
- def from_crawler(cls, crawler: Crawler) -> Self:
- """
- 工厂方法,接收当前 :class:`~scrapy.crawler.Crawler` 对象作为参数。
- """
- return cls()
-
- def open(self, spider: Spider) -> Optional[Deferred]:
- """
- 当引擎打开爬虫时调用该方法。它接收当前爬虫实例作为参数,
- 可用于执行初始化代码。
- :param spider: 当前爬虫的对象
- :type spider: :class:`~scrapy.spiders.Spider`
- """
- pass
-
- def close(self, reason: str) -> Optional[Deferred]:
- """
- 当引擎关闭爬虫时调用该方法。它接收爬取结束的原因作为参数,
- 可用于执行清理代码。
- :param reason: 描述爬虫关闭原因的字符串
- :type reason: :class:`str`
- """
- pass
-
- @abstractmethod
- def has_pending_requests(self) -> bool:
- """
- 返回``True``如果调度器中有待处理的请求,否则返回``False``。
- """
- raise NotImplementedError()
-
- @abstractmethod
- def enqueue_request(self, request: Request) -> bool:
- """
- 处理引擎接收到的请求。
- 如果请求成功存储,则返回``True``,否则返回``False``。
- 如果返回``False``,引擎将触发一个``request_dropped``信号,并且不会尝试以后再次调度该请求。
- 默认的 Scrapy 调度器在请求被去重过滤器拒绝时返回``False``。
- """
- raise NotImplementedError()
-
- @abstractmethod
- def next_request(self) -> Optional[Request]:
- """
- 返回下一个要处理的 :class:`~scrapy.http.Request`,如果当前没有准备好的请求则返回``None``。
- 返回``None``意味着当前的 reactor 循环中不会向下载器发送调度器中的请求。
- 引擎会不断调用``next_request``直到``has_pending_requests``为``False``。
- """
- raise NotImplementedError()
-
-
- SchedulerTV = TypeVar("SchedulerTV", bound="Scheduler")
-
-
- class Scheduler(BaseScheduler):
- """
- 默认的 Scrapy 调度器。此实现还通过 :setting:`DUPEFILTER_CLASS` 进行重复过滤。
- 该调度器将请求存储在多个优先级队列中(由 :setting:`SCHEDULER_PRIORITY_QUEUE` 设置定义)。
- 这些优先级队列由内存队列或磁盘队列(分别由 :setting:`SCHEDULER_MEMORY_QUEUE` 和 :setting:`SCHEDULER_DISK_QUEUE` 设置定义)支持。
- 请求的优先级几乎完全委托给优先级队列。此调度器执行的唯一优先级处理是,如果存在磁盘队列(即如果定义了 :setting:`JOBDIR` 设置),
- 则使用磁盘队列;如果序列化错误发生,则回退到使用内存队列。如果不存在磁盘队列,则直接使用内存队列。
- :param dupefilter: 负责检查和过滤重复请求的对象。
- 默认情况下使用 :setting:`DUPEFILTER_CLASS` 设置的值。
- :type dupefilter: :class:`scrapy.dupefilters.BaseDupeFilter` 实例或类似对象:
- 任何实现了 `BaseDupeFilter` 接口的类
- :param jobdir: 用于持久化爬取状态的目录路径。
- 默认使用 :setting:`JOBDIR` 设置的值。
- 有关详细信息,请参阅 :ref:`topics-jobs`。
- :type jobdir: :class:`str` 或 ``None``
- :param dqclass: 用作持久请求队列的类。
- 默认使用 :setting:`SCHEDULER_DISK_QUEUE` 设置的值。
- :type dqclass: 类
- :param mqclass: 用作非持久请求队列的类。
- 默认使用 :setting:`SCHEDULER_MEMORY_QUEUE` 设置的值。
- :type mqclass: 类
- :param logunser: 一个布尔值,指示是否应记录无法序列化的请求。
- 默认使用 :setting:`SCHEDULER_DEBUG` 设置的值。
- :type logunser: bool
- :param stats: 用于记录请求调度过程统计数据的统计收集器对象。
- 默认使用 :setting:`STATS_CLASS` 设置的值。
- :type stats: :class:`scrapy.statscollectors.StatsCollector` 实例或类似对象:
- 任何实现了 `StatsCollector` 接口的类
- :param pqclass: 用作请求优先级队列的类。
- 默认使用 :setting:`SCHEDULER_PRIORITY_QUEUE` 设置的值。
- :type pqclass: 类
- :param crawler: 当前爬虫对应的爬虫对象。
- :type crawler: :class:`scrapy.crawler.Crawler`
- """
-
- def __init__(
- self,
- dupefilter: BaseDupeFilter,
- jobdir: Optional[str] = None,
- dqclass=None,
- mqclass=None,
- logunser: bool = False,
- stats: Optional[StatsCollector] = None,
- pqclass=None,
- crawler: Optional[Crawler] = None,
- ):
- self.df: BaseDupeFilter = dupefilter
- self.dqdir: Optional[str] = self._dqdir(jobdir)
- self.pqclass = pqclass
- self.dqclass = dqclass
- self.mqclass = mqclass
- self.logunser: bool = logunser
- self.stats: Optional[StatsCollector] = stats
- self.crawler: Optional[Crawler] = crawler
-
- @classmethod
- def from_crawler(cls: Type[SchedulerTV], crawler: Crawler) -> SchedulerTV:
- """
- 工厂方法,使用来自爬虫设置的参数初始化调度器。
- """
- dupefilter_cls = load_object(crawler.settings["DUPEFILTER_CLASS"])
- return cls(
- dupefilter=create_instance(dupefilter_cls, crawler.settings, crawler),
- jobdir=job_dir(crawler.settings),
- dqclass=load_object(crawler.settings["SCHEDULER_DISK_QUEUE"]),
- mqclass=load_object(crawler.settings["SCHEDULER_MEMORY_QUEUE"]),
- logunser=crawler.settings.getbool("SCHEDULER_DEBUG"),
- stats=crawler.stats,
- pqclass=load_object(crawler.settings["SCHEDULER_PRIORITY_QUEUE"]),
- crawler=crawler,
- )
-
- def has_pending_requests(self) -> bool:
- return len(self) > 0
-
- def open(self, spider: Spider) -> Optional[Deferred]:
- """
- 1. 初始化内存队列
- 2. 初始化磁盘队列(如果 ``jobdir`` 属性是一个有效的目录)
- 3. 返回去重过滤器的 ``open`` 方法的结果
- """
- self.spider = spider
- self.mqs = self._mq()
- self.dqs = self._dq() if self.dqdir else None
- return self.df.open()
-
- def close(self, reason: str) -> Optional[Deferred]:
- """
- 1. 如果存在磁盘队列,将挂起的请求保存到磁盘
- 2. 返回去重过滤器的 ``close`` 方法的结果
- """
- if self.dqs is not None:
- state = self.dqs.close()
- assert isinstance(self.dqdir, str)
- self._write_dqs_state(self.dqdir, state)
- return self.df.close(reason)
-
- def enqueue_request(self, request: Request) -> bool:
- """
- 除非请求被去重过滤器过滤掉,否则尝试将请求推送到磁盘队列,
- 如果没有磁盘队列则将请求推送到内存队列。
- 增加适当的统计数据,如:``scheduler/enqueued``,
- ``scheduler/enqueued/disk``, ``scheduler/enqueued/memory``。
- 如果请求成功存储,则返回``True``,否则返回``False``。
- """
- if not request.dont_filter and self.df.request_seen(request):
- self.df.log(request, self.spider)
- return False
- dqok = self._dqpush(request)
- assert self.stats is not None
- if dqok:
- self.stats.inc_value("scheduler/enqueued/disk", spider=self.spider)
- else:
- self._mqpush(request)
- self.stats.inc_value("scheduler/enqueued/memory", spider=self.spider)
- self.stats.inc_value("scheduler/enqueued", spider=self.spider)
- return True
-
- def next_request(self) -> Optional[Request]:
- """
- 从内存队列返回一个 :class:`~scrapy.http.Request` 对象,如果内存队列为空,则返回磁盘队列。
- 如果没有更多的入队请求,则返回``None``。
- 增加适当的统计数据,如:``scheduler/dequeued``,
- ``scheduler/dequeued/disk``, ``scheduler/dequeued/memory``。
- """
- request: Optional[Request] = self.mqs.pop()
- assert self.stats is not None
- if request is not None:
- self.stats.inc_value("scheduler/dequeued/memory", spider=self.spider)
- else:
- request = self._dqpop()
- if request is not None:
- self.stats.inc_value("scheduler/dequeued/disk", spider=self.spider)
- if request is not None:
- self.stats.inc_value("scheduler/dequeued", spider=self.spider)
- return request
-
- def __len__(self) -> int:
- """
- 返回已入队的请求总数
- """
- return len(self.dqs) + len(self.mqs) if self.dqs is not None else len(self.mqs)
-
- def _dqpush(self, request: Request) -> bool:
- if self.dqs is None:
- return False
- try:
- self.dqs.push(request)
- except ValueError as e: # 非可序列化的请求
- if self.logunser:
- msg = (
- "无法序列化请求: %(request)s - 原因: "
- "%(reason)s - 不再记录更多无法序列化的请求(正在收集统计数据)"
- )
- logger.warning(
- msg,
- {"request": request, "reason": e},
- exc_info=True,
- extra={"spider": self.spider},
- )
- self.logunser = False
- assert self.stats is not None
- self.stats.inc_value("scheduler/unserializable", spider=self.spider)
- return False
- else:
- return True
-
- def _mqpush(self, request: Request) -> None:
- self.mqs.push(request)
-
- def _dqpop(self) -> Optional[Request]:
- if self.dqs is not None:
- return self.dqs.pop()
- return None
-
- def _mq(self):
- """创建一个新的内存存储的优先级队列实例"""
- return create_instance(
- self.pqclass,
- settings=None,
- crawler=self.crawler,
- downstream_queue_cls=self.mqclass,
- key="",
- )
-
- def _dq(self):
- """创建一个新的磁盘存储的优先级队列实例"""
- assert self.dqdir
- state = self._read_dqs_state(self.dqdir)
- q = create_instance(
- self.pqclass,
- settings=None,
- crawler=self.crawler,
- downstream_queue_cls=self.dqclass,
- key=self.dqdir,
- startprios=state,
- )
- if q:
- logger.info(
- "恢复爬取(%(queuesize)d 个请求已安排)",
- {"queuesize": len(q)},
- extra={"spider": self.spider},
- )
- return q
-
- def _dqdir(self, jobdir: Optional[str]) -> Optional[str]:
- """返回一个目录名称,用于保存磁盘队列的状态"""
- if jobdir is not None:
- dqdir = Path(jobdir, "requests.queue")
- if not dqdir.exists():
- dqdir.mkdir(parents=True)
- return str(dqdir)
- return None
-
- def _read_dqs_state(self, dqdir: str) -> list:
- path = Path(dqdir, "active.json")
- if not path.exists():
- return []
- with path.open(encoding="utf-8") as f:
- return cast(list, json.load(f))
-
- def _write_dqs_state(self, dqdir: str, state: list) -> None:
- with Path(dqdir, "active.json").open("w", encoding="utf-8") as f:
- json.dump(state, f)

-----除了这三个方法,还有一些其他方法和类变量也会影响调度器的行为,但它们的重要性可能不如上述三个方法。要实现一个完整的自定义调度器,还需要考虑其他方法,例如open和close方法用于初始化和清理,__len__方法用于获取当前队列中请求的数量等。
案例假设已经创建好了Scrapy项目并定义了爬虫(Spider)和项目文件结构后.
- from scrapy.core.scheduler import BaseScheduler
- from scrapy.http import Request
-
- class CustomScheduler(BaseScheduler):
- def __init__(self, dupefilter):
- super().__init__(dupefilter)
-
- def has_pending_requests(self) -> bool:
- # 自定义判断逻辑
- pass
-
- def enqueue_request(self, request: Request) -> bool:
- # 自定义存储逻辑
- pass
-
- def next_request(self) -> Optional[Request]:
- # 自定义调度逻辑
- pass

- from scrapy.spiders import Spider
- from custom_scheduler import CustomScheduler
-
- class MySpider(Spider):
- name = 'example'
- start_urls = ['http://example.com']
-
- custom_settings = {
- 'SCHEDULER': 'custom_scheduler.CustomScheduler',
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。