当前位置:   article > 正文

爬虫工作量由小到大的思维转变---<第三十二章 Scrapy scheduler说明书)>

爬虫工作量由小到大的思维转变---<第三十二章 Scrapy scheduler说明书)>

前言:

因为scrapy-redis和scrapy之间最直接的区别在于调度器;那么,在讲解scrapy-redis之前,我发现自己没有对scrapy的调度器这一块进行过什么总结; 那么这篇 <关于调度器的说明文> 需要写在正式`自定义scrapy-redis`的前面!!

正文:

原版翻译scheduler:

        -包在scrapy/core/scheduler.py 里面,自己可以对着源码看;

  1. from __future__ import annotations
  2. import json
  3. import logging
  4. from abc import abstractmethod
  5. from pathlib import Path
  6. from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar, cast
  7. from twisted.internet.defer import Deferred
  8. from scrapy.crawler import Crawler
  9. from scrapy.dupefilters import BaseDupeFilter
  10. from scrapy.http.request import Request
  11. from scrapy.spiders import Spider
  12. from scrapy.statscollectors import StatsCollector
  13. from scrapy.utils.job import job_dir
  14. from scrapy.utils.misc import create_instance, load_object
  15. if TYPE_CHECKING:
  16. from typing_extensions import Self
  17. logger = logging.getLogger(__name__)
  18. class BaseSchedulerMeta(type):
  19. """
  20. 元类,用于检查调度器类是否符合必要的接口
  21. """
  22. def __instancecheck__(cls, instance: Any) -> bool:
  23. return cls.__subclasscheck__(type(instance))
  24. def __subclasscheck__(cls, subclass: type) -> bool:
  25. return (
  26. hasattr(subclass, "has_pending_requests")
  27. and callable(subclass.has_pending_requests)
  28. and hasattr(subclass, "enqueue_request")
  29. and callable(subclass.enqueue_request)
  30. and hasattr(subclass, "next_request")
  31. and callable(subclass.next_request)
  32. )
  33. class BaseScheduler(metaclass=BaseSchedulerMeta):
  34. """
  35. 调度器组件负责存储从引擎接收到的请求,并在需要时将其发送回引擎。
  36. 请求的原始来源包括:
  37. * Spider: ``start_requests`` 方法、为 ``start_urls`` 属性中的 URL 创建的请求、请求回调
  38. * Spider 中间件: ``process_spider_output`` 和 ``process_spider_exception`` 方法
  39. * Downloader 中间件: ``process_request``、``process_response`` 和 ``process_exception`` 方法
  40. 调度器返回存储的请求的顺序(通过 ``next_request`` 方法),在很大程度上决定了请求的下载顺序。
  41. 此类中定义的方法构成了 Scrapy 引擎与调度器交互的最小接口。
  42. """
  43. @classmethod
  44. def from_crawler(cls, crawler: Crawler) -> Self:
  45. """
  46. 工厂方法,接收当前 :class:`~scrapy.crawler.Crawler` 对象作为参数。
  47. """
  48. return cls()
  49. def open(self, spider: Spider) -> Optional[Deferred]:
  50. """
  51. 当引擎打开爬虫时调用该方法。它接收当前爬虫实例作为参数,
  52. 可用于执行初始化代码。
  53. :param spider: 当前爬虫的对象
  54. :type spider: :class:`~scrapy.spiders.Spider`
  55. """
  56. pass
  57. def close(self, reason: str) -> Optional[Deferred]:
  58. """
  59. 当引擎关闭爬虫时调用该方法。它接收爬取结束的原因作为参数,
  60. 可用于执行清理代码。
  61. :param reason: 描述爬虫关闭原因的字符串
  62. :type reason: :class:`str`
  63. """
  64. pass
  65. @abstractmethod
  66. def has_pending_requests(self) -> bool:
  67. """
  68. 返回``True``如果调度器中有待处理的请求,否则返回``False``。
  69. """
  70. raise NotImplementedError()
  71. @abstractmethod
  72. def enqueue_request(self, request: Request) -> bool:
  73. """
  74. 处理引擎接收到的请求。
  75. 如果请求成功存储,则返回``True``,否则返回``False``。
  76. 如果返回``False``,引擎将触发一个``request_dropped``信号,并且不会尝试以后再次调度该请求。
  77. 默认的 Scrapy 调度器在请求被去重过滤器拒绝时返回``False``。
  78. """
  79. raise NotImplementedError()
  80. @abstractmethod
  81. def next_request(self) -> Optional[Request]:
  82. """
  83. 返回下一个要处理的 :class:`~scrapy.http.Request`,如果当前没有准备好的请求则返回``None``。
  84. 返回``None``意味着当前的 reactor 循环中不会向下载器发送调度器中的请求。
  85. 引擎会不断调用``next_request``直到``has_pending_requests``为``False``。
  86. """
  87. raise NotImplementedError()
  88. SchedulerTV = TypeVar("SchedulerTV", bound="Scheduler")
  89. class Scheduler(BaseScheduler):
  90. """
  91. 默认的 Scrapy 调度器。此实现还通过 :setting:`DUPEFILTER_CLASS` 进行重复过滤。
  92. 该调度器将请求存储在多个优先级队列中(由 :setting:`SCHEDULER_PRIORITY_QUEUE` 设置定义)。
  93. 这些优先级队列由内存队列或磁盘队列(分别由 :setting:`SCHEDULER_MEMORY_QUEUE` 和 :setting:`SCHEDULER_DISK_QUEUE` 设置定义)支持。
  94. 请求的优先级几乎完全委托给优先级队列。此调度器执行的唯一优先级处理是,如果存在磁盘队列(即如果定义了 :setting:`JOBDIR` 设置),
  95. 则使用磁盘队列;如果序列化错误发生,则回退到使用内存队列。如果不存在磁盘队列,则直接使用内存队列。
  96. :param dupefilter: 负责检查和过滤重复请求的对象。
  97. 默认情况下使用 :setting:`DUPEFILTER_CLASS` 设置的值。
  98. :type dupefilter: :class:`scrapy.dupefilters.BaseDupeFilter` 实例或类似对象:
  99. 任何实现了 `BaseDupeFilter` 接口的类
  100. :param jobdir: 用于持久化爬取状态的目录路径。
  101. 默认使用 :setting:`JOBDIR` 设置的值。
  102. 有关详细信息,请参阅 :ref:`topics-jobs`。
  103. :type jobdir: :class:`str` 或 ``None``
  104. :param dqclass: 用作持久请求队列的类。
  105. 默认使用 :setting:`SCHEDULER_DISK_QUEUE` 设置的值。
  106. :type dqclass: 类
  107. :param mqclass: 用作非持久请求队列的类。
  108. 默认使用 :setting:`SCHEDULER_MEMORY_QUEUE` 设置的值。
  109. :type mqclass: 类
  110. :param logunser: 一个布尔值,指示是否应记录无法序列化的请求。
  111. 默认使用 :setting:`SCHEDULER_DEBUG` 设置的值。
  112. :type logunser: bool
  113. :param stats: 用于记录请求调度过程统计数据的统计收集器对象。
  114. 默认使用 :setting:`STATS_CLASS` 设置的值。
  115. :type stats: :class:`scrapy.statscollectors.StatsCollector` 实例或类似对象:
  116. 任何实现了 `StatsCollector` 接口的类
  117. :param pqclass: 用作请求优先级队列的类。
  118. 默认使用 :setting:`SCHEDULER_PRIORITY_QUEUE` 设置的值。
  119. :type pqclass: 类
  120. :param crawler: 当前爬虫对应的爬虫对象。
  121. :type crawler: :class:`scrapy.crawler.Crawler`
  122. """
  123. def __init__(
  124. self,
  125. dupefilter: BaseDupeFilter,
  126. jobdir: Optional[str] = None,
  127. dqclass=None,
  128. mqclass=None,
  129. logunser: bool = False,
  130. stats: Optional[StatsCollector] = None,
  131. pqclass=None,
  132. crawler: Optional[Crawler] = None,
  133. ):
  134. self.df: BaseDupeFilter = dupefilter
  135. self.dqdir: Optional[str] = self._dqdir(jobdir)
  136. self.pqclass = pqclass
  137. self.dqclass = dqclass
  138. self.mqclass = mqclass
  139. self.logunser: bool = logunser
  140. self.stats: Optional[StatsCollector] = stats
  141. self.crawler: Optional[Crawler] = crawler
  142. @classmethod
  143. def from_crawler(cls: Type[SchedulerTV], crawler: Crawler) -> SchedulerTV:
  144. """
  145. 工厂方法,使用来自爬虫设置的参数初始化调度器。
  146. """
  147. dupefilter_cls = load_object(crawler.settings["DUPEFILTER_CLASS"])
  148. return cls(
  149. dupefilter=create_instance(dupefilter_cls, crawler.settings, crawler),
  150. jobdir=job_dir(crawler.settings),
  151. dqclass=load_object(crawler.settings["SCHEDULER_DISK_QUEUE"]),
  152. mqclass=load_object(crawler.settings["SCHEDULER_MEMORY_QUEUE"]),
  153. logunser=crawler.settings.getbool("SCHEDULER_DEBUG"),
  154. stats=crawler.stats,
  155. pqclass=load_object(crawler.settings["SCHEDULER_PRIORITY_QUEUE"]),
  156. crawler=crawler,
  157. )
  158. def has_pending_requests(self) -> bool:
  159. return len(self) > 0
  160. def open(self, spider: Spider) -> Optional[Deferred]:
  161. """
  162. 1. 初始化内存队列
  163. 2. 初始化磁盘队列(如果 ``jobdir`` 属性是一个有效的目录)
  164. 3. 返回去重过滤器的 ``open`` 方法的结果
  165. """
  166. self.spider = spider
  167. self.mqs = self._mq()
  168. self.dqs = self._dq() if self.dqdir else None
  169. return self.df.open()
  170. def close(self, reason: str) -> Optional[Deferred]:
  171. """
  172. 1. 如果存在磁盘队列,将挂起的请求保存到磁盘
  173. 2. 返回去重过滤器的 ``close`` 方法的结果
  174. """
  175. if self.dqs is not None:
  176. state = self.dqs.close()
  177. assert isinstance(self.dqdir, str)
  178. self._write_dqs_state(self.dqdir, state)
  179. return self.df.close(reason)
  180. def enqueue_request(self, request: Request) -> bool:
  181. """
  182. 除非请求被去重过滤器过滤掉,否则尝试将请求推送到磁盘队列,
  183. 如果没有磁盘队列则将请求推送到内存队列。
  184. 增加适当的统计数据,如:``scheduler/enqueued``,
  185. ``scheduler/enqueued/disk``, ``scheduler/enqueued/memory``。
  186. 如果请求成功存储,则返回``True``,否则返回``False``。
  187. """
  188. if not request.dont_filter and self.df.request_seen(request):
  189. self.df.log(request, self.spider)
  190. return False
  191. dqok = self._dqpush(request)
  192. assert self.stats is not None
  193. if dqok:
  194. self.stats.inc_value("scheduler/enqueued/disk", spider=self.spider)
  195. else:
  196. self._mqpush(request)
  197. self.stats.inc_value("scheduler/enqueued/memory", spider=self.spider)
  198. self.stats.inc_value("scheduler/enqueued", spider=self.spider)
  199. return True
  200. def next_request(self) -> Optional[Request]:
  201. """
  202. 从内存队列返回一个 :class:`~scrapy.http.Request` 对象,如果内存队列为空,则返回磁盘队列。
  203. 如果没有更多的入队请求,则返回``None``。
  204. 增加适当的统计数据,如:``scheduler/dequeued``,
  205. ``scheduler/dequeued/disk``, ``scheduler/dequeued/memory``。
  206. """
  207. request: Optional[Request] = self.mqs.pop()
  208. assert self.stats is not None
  209. if request is not None:
  210. self.stats.inc_value("scheduler/dequeued/memory", spider=self.spider)
  211. else:
  212. request = self._dqpop()
  213. if request is not None:
  214. self.stats.inc_value("scheduler/dequeued/disk", spider=self.spider)
  215. if request is not None:
  216. self.stats.inc_value("scheduler/dequeued", spider=self.spider)
  217. return request
  218. def __len__(self) -> int:
  219. """
  220. 返回已入队的请求总数
  221. """
  222. return len(self.dqs) + len(self.mqs) if self.dqs is not None else len(self.mqs)
  223. def _dqpush(self, request: Request) -> bool:
  224. if self.dqs is None:
  225. return False
  226. try:
  227. self.dqs.push(request)
  228. except ValueError as e: # 非可序列化的请求
  229. if self.logunser:
  230. msg = (
  231. "无法序列化请求: %(request)s - 原因: "
  232. "%(reason)s - 不再记录更多无法序列化的请求(正在收集统计数据)"
  233. )
  234. logger.warning(
  235. msg,
  236. {"request": request, "reason": e},
  237. exc_info=True,
  238. extra={"spider": self.spider},
  239. )
  240. self.logunser = False
  241. assert self.stats is not None
  242. self.stats.inc_value("scheduler/unserializable", spider=self.spider)
  243. return False
  244. else:
  245. return True
  246. def _mqpush(self, request: Request) -> None:
  247. self.mqs.push(request)
  248. def _dqpop(self) -> Optional[Request]:
  249. if self.dqs is not None:
  250. return self.dqs.pop()
  251. return None
  252. def _mq(self):
  253. """创建一个新的内存存储的优先级队列实例"""
  254. return create_instance(
  255. self.pqclass,
  256. settings=None,
  257. crawler=self.crawler,
  258. downstream_queue_cls=self.mqclass,
  259. key="",
  260. )
  261. def _dq(self):
  262. """创建一个新的磁盘存储的优先级队列实例"""
  263. assert self.dqdir
  264. state = self._read_dqs_state(self.dqdir)
  265. q = create_instance(
  266. self.pqclass,
  267. settings=None,
  268. crawler=self.crawler,
  269. downstream_queue_cls=self.dqclass,
  270. key=self.dqdir,
  271. startprios=state,
  272. )
  273. if q:
  274. logger.info(
  275. "恢复爬取(%(queuesize)d 个请求已安排)",
  276. {"queuesize": len(q)},
  277. extra={"spider": self.spider},
  278. )
  279. return q
  280. def _dqdir(self, jobdir: Optional[str]) -> Optional[str]:
  281. """返回一个目录名称,用于保存磁盘队列的状态"""
  282. if jobdir is not None:
  283. dqdir = Path(jobdir, "requests.queue")
  284. if not dqdir.exists():
  285. dqdir.mkdir(parents=True)
  286. return str(dqdir)
  287. return None
  288. def _read_dqs_state(self, dqdir: str) -> list:
  289. path = Path(dqdir, "active.json")
  290. if not path.exists():
  291. return []
  292. with path.open(encoding="utf-8") as f:
  293. return cast(list, json.load(f))
  294. def _write_dqs_state(self, dqdir: str, state: list) -> None:
  295. with Path(dqdir, "active.json").open("w", encoding="utf-8") as f:
  296. json.dump(state, f)

重点关注:

  1. has_pending_requests(self) -> bool: 判断调度器中是否有待处理的请求。这个方法用于告诉引擎是否还有未处理的请求,以确定是否继续进行请求调度和下载。
  2. enqueue_request(self, request: Request) -> bool: 处理引擎接收到的请求。这个方法将传入的请求存储到调度器中,并返回一个布尔值,指示请求是否成功存储。如果返回值为True,则请求已成功存储;如果返回值为False,则请求被去重过滤器拒绝或无法序列化等原因导致存储失败。
  3. next_request(self) -> Optional[Request]: 返回下一个要处理的请求。这个方法从调度器中获取下一个要处理的请求,并返回一个Request对象。如果调度器没有准备好的请求,将返回None。

-----除了这三个方法,还有一些其他方法和类变量也会影响调度器的行为,但它们的重要性可能不如上述三个方法。要实现一个完整的自定义调度器,还需要考虑其他方法,例如open和close方法用于初始化和清理,__len__方法用于获取当前队列中请求的数量等。

实现自定义调度器:

步骤:

案例假设已经创建好了Scrapy项目并定义了爬虫(Spider)和项目文件结构后.

1. 在项目目录下创建一个新的Python文件,例如`custom_scheduler.py`,用于编写自定义调度器的代码。

2. 在`custom_scheduler.py`文件中,导入必要的模块和类,并创建一个自定义的调度器类,并继承自`BaseScheduler`:

  1. from scrapy.core.scheduler import BaseScheduler
  2. from scrapy.http import Request
  3. class CustomScheduler(BaseScheduler):
  4.     def __init__(self, dupefilter):
  5.         super().__init__(dupefilter)
  6.     def has_pending_requests(self) -> bool:
  7.         # 自定义判断逻辑
  8.         pass
  9.     def enqueue_request(self, request: Request) -> bool:
  10.         # 自定义存储逻辑
  11.         pass
  12.     def next_request(self) -> Optional[Request]:
  13.         # 自定义调度逻辑
  14.         pass

3. 在自定义调度器的方法中,根据自己的需求实现相应的逻辑。例如,可以根据特定的调度算法重新定义`enqueue_request`和`next_request`方法。

5. 在项目的爬虫文件(Spider)中,导入自定义调度器类,并在`custom_settings`中指定使用自定义调度器:

  1. from scrapy.spiders import Spider
  2. from custom_scheduler import CustomScheduler
  3. class MySpider(Spider):
  4.     name = 'example'
  5.     start_urls = ['http://example.com']
  6.     custom_settings = {
  7.         'SCHEDULER': 'custom_scheduler.CustomScheduler',
  8.     }

6. 运行Scrapy爬虫,Scrapy会使用你自定义调度器进行爬取的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43116
推荐阅读
相关标签
  

闽ICP备14008679号