赞
踩
上一篇:[Python 爬虫] 使用 Scrapy 爬取新浪微博用户信息(三) —— 数据的持久化——使用MongoDB存储爬取的数据
最近项目有些忙,很多需求紧急上线,所以一直没能完善《 使用 Scrapy 爬取新浪微博用户信息》这一系列的博客,今天好不容易闲下来,就完成这一系列最后一节:选取 User-Agent、添加 IP代理池以及Cookies池。在上一篇博客中,我们介绍了如何对爬取的用户信息进行持久化处理,存入了 MongDB,但是并没有限制爬取速度,导致爬虫程序频繁出现 418 响应码,这是微博反爬的一种策略,这一篇博客我们就来介绍如何应对目标网页的反爬程序,需要注意的是,微博反爬策略是针对用户的,在只用单用户的情况下,只能降低爬取频率,当然,如果手里有一批账号,可以采用多账号的Cookies池,当出现 418 请求时,就切换 Cookies,由于我目前只有一个账号,因此我只能通过降低爬取频率来应对新浪微博的反爬策略,但是我会用单个账号获取多个 Cookies 来模拟多账户情况下的Cookies池。
User Agent 中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器能够识别客户使用的操作系统及版本、CPU 类型、浏览器及版本、浏览器渲染引擎、浏览器语言、浏览器插件等。(数据来自:百度百科 User-Agent)
User-Agent 可以通过浏览器调试模式,然后选择 Network,任意查看一个连接,就能找到,如下图:

在这里我们通过上网查找了几个 User-Agent,添加到 settings.py 文件内,代码如下:
- # User-Agent 列表,提供随机 User-Agent
- USER_AGENT_LIST = [
- "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
- "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
- "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
- "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
- "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
- "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
- "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
- "Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
- "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
- "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
- "Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
- "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
- "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
- "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)",
- "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E; LBBROWSER)",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 LBBROWSER",
- "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
- "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)",
- "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
- "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; 360SE)",
- "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
- "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
- "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
- "Mozilla/5.0 (iPad; U; CPU OS 4_2_1 like Mac OS X; zh-cn) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8C148 Safari/6533.18.5",
- "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:2.0b13pre) Gecko/20110307 Firefox/4.0b13pre",
- "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:16.0) Gecko/20100101 Firefox/16.0",
- "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
- "Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10",
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
- ]

添加了 User-Agent 列表之后,我们在 middleware.py 中使用它,我们在 middleware.py 定义一个名为 RandomUserAgentMiddleware 的类,该类继承了 UserAgentMiddleware 类,在开始构造一个爬虫请求前,会调用 RandomUserAgentMiddleware 类的 from_crawler() 方法,构造请求后,发送请求前,将会执行 RandomUserAgentMiddleware 类 的 from_crawler() 方法,代码如下:
- from scrapy.http.headers import Headers
- from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
-
- class RandomUserAgentMiddleware(UserAgentMiddleware):
- """
- 随机选取 代理(User-Agent)
- """
-
- def __init__(self, user_agent):
- self.user_agent = user_agent
- self.headers = Headers()
-
- @classmethod
- def from_crawler(cls, crawler):
- """
- 开始构造请求前执行的方法\n
- :param crawler:整个爬虫的全局对象\n
- :return:
- """
- # 从配置里获取 用户代理(User-Agent) 列表
- return cls(user_agent=crawler.settings.get('USER_AGENT_LIST'))
-
- def process_request(self, request, spider):
- """
- 发送请求前执行的方法\n
- :param request:请求\n
- :param spider:爬虫应用\n
- :return:
- """
- # 从 代理 列表中随机选取一个 代理
- agent = random.choice(self.user_agent)
- print('当前 User-Agent :', agent)
- self.headers['User-Agent'] = agent
- request.headers = self.headers

目前,我们定义了在中间件(middleware)中定义了随机选取 User-Agent 的类,但是如果要使用该类,还得在 settings.py 中启用该中间件,并设置优先级,代码如下:
- DOWNLOADER_MIDDLEWARES = {
- 'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10
- }
好了,现在打开控制台(Console)使用 scrapy crawl sina_user 命令启动爬虫,可以看到输出如下的日志信息,每次爬虫请求使用的 User-Agent 都是从 USER_AGENT_LIST 中随机获取的,如下图所示:

在反爬技术中,很多目标网页会记录访问者的 ip,并通过计算单位时间内同一 ip 访问网站的次数,如果次数过高,则被视为爬虫程序,然后服务器将拦截该请求。对此,不难想到,采取使用多个 ip 来访问目标网站,可以有效的应对这种反爬机制。对于 ip 资源的来源,目前网上有很多代理,提供了很多可用的 ip,例如:西刺代理、快代理等。需要注意的是,代理网站提供的 ip 是有时效的,因此,我们需要动态地获取代理 ip,在这里,我只是采用最简单地爬虫爬取代理网站的 ip,我们并不能保证所有获取的 ip 的都是可用的,因此还需要增加校验机制,确定获取的 ip 是否有效,最直接的办法就是利用 ping 命令,去 ping ip,看该 ip 是否能够 ping 通,为此,我们新建一个包,名为 utils,然后在该包下新建 crawl_proxy.py 脚本,脚本如下:
- # -*-* encoding:UTF-8 -*-
- # author : mengy
- # date : 2019/9/1
- # python-version : Python 3.7.0
- # description :
-
- import re, subprocess as sp, time, json
-
- from urllib import request
- from bs4 import BeautifulSoup
- from sina_scrapy.utils.cache_utils import Cache
- from sina_scrapy.utils.thread_pool import ThreadPool
-
- executor = ThreadPool()
-
- cache = Cache()
-
- # 西刺代理 URL
- PROXY_IP_XICI_URL = 'https://www.xicidaili.com/nn/%s'
- # 快代理 URL
- PROXY_IP_QUICK_URL = 'https://www.kuaidaili.com/free/inha/%s/'
- # 模拟请求头
- PROXY_IP_XICI_HEADERS = {
- 'Host': 'www.xicidaili.com',
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
- 'Connection': 'keep-alive',
- 'Upgrade-Insecure-Requests': '1',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36',
- 'Accept - Encoding': 'gzip, deflate, br',
- 'Accept - Language': 'zh - CN, zh;q = 0.9, en;q = 0.8',
- 'Cookie': '_free_proxy_session = BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJTBhMGNlZjVlYjdjNDU5NjY3ZDNlOGU0YmQ4NTU0OTBhBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMVZpMzIrOVV3aFp5cnJXR3hTVUtFRy9ud0MxMGtyY2R3WjJzMjltSFNSeEE9BjsARg % 3D % 3D - -55779e702f4e95b04fa84eafbb70ccb4006cd839;Hm_lvt_0cf76c77469e965d2957f0553e6ecf59 = 1558427855, 1558427893, 1558427898, 1558427901;Hm_lpvt_0cf76c77469e965d2957f0553e6ecf59 = 1558428119'
- }
- PROXY_IP_QUICK_HEADERS = {
- 'Host': 'www.kuaidaili.com',
- 'Connection': ' keep-alive',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
- 'Cookie': 'channelid=bdtg_a10_a10a1; sid=1559283308913843; _ga=GA1.2.594886518.1559283655; _gid=GA1.2.578719903.1559283655; Hm_lvt_7ed65b1cc4b810e9fd37959c9bb51b31=1559283656; Hm_lpvt_7ed65b1cc4b810e9fd37959c9bb51b31=1559283719'
- }
- # 代理ip列表在缓存中的命名
- PROXY_IP_NAMESPACE = 'POOL_PROXY_IPS'
- # 缓存中代理 ip 失效时间(s)
- PROXY_IP_EXPIRE = 15 * 60
-
- # ping ip 最高丢包率(%)
- MAX_LOST = 75
- # ping ip 最大延迟时间(ms)
- MAX_TIMEOUT = 1000
-
-
- def get_ips(pages=1, refresh=False):
- """
- 获取代理ip,优先从缓存取,如果缓存为空,则爬取新的代理 ip,并更新缓存\n
- :param refresh: 是否强制爬取\n
- :return:
- """
- if refresh:
- return crawl_quick(pages)
- else:
- # 从缓存中查询代理ip
- data = cache.lrange(name=PROXY_IP_NAMESPACE, start=0, end=100)
- if not data:
- print(u'缓存数据为空!开始爬取高匿代理ip')
- return crawl_quick(pages)
- else:
- return data
-
-
- def sub_thread(ip_info):
- """
- 校验 ip 是否连通\n
- :param ip_info:
- :return:
- """
- if check_ip(ip_info.get('ip')):
- # 将可用的 ip 放入缓存
- cache.lpush(PROXY_IP_NAMESPACE, json.dumps(ip_info))
- # 如果 ip 可用,则返回 ip 的信息
- return json.dumps(ip_info)
- else:
- return None
-
- def crawl_quick(page=1):
- """
- 请求 快代理 爬取高匿代理 ip\n
- :param page:
- :return:
- """
- print(u'请求 快代理 爬取高匿代理 ip')
- assert 1 <= page <= 10, '页数有效范围为(1 - 10)'
- validate_ips = []
- for i in range(page):
- req = request.Request(url=PROXY_IP_QUICK_URL % str(i + 1), headers=PROXY_IP_QUICK_HEADERS)
- response = request.urlopen(req)
- if response.status == 200:
- # 解析页面元素
- soap = BeautifulSoup(str(response.read(), encoding='utf-8'), 'lxml')
- ip_table = soap.select('#list > table > tbody > tr')
- ips = []
- # 获取当前页的所有 ip 信息
- for data in ip_table:
- item = data.text.split('\n')
- info = {}
- ip, port, area, proxy_type, protocol, alive_time, check_time = item[1], item[2], item[5], item[3], item[
- 4], '', item[7]
- url = str.lower(protocol) + "://" + ip + ":" + port
- # 将 ip 信息封装成字典
- info.update(ip=ip, port=port, area=area, type=proxy_type, protocol=protocol, alive_time=alive_time,
- check_time=check_time, url=url, add_time=int(time.time()))
- ips.append(info)
- # 遍历爬取的 ip 信息,校验 ip 是否连通
- tasks = [executor.submit(sub_thread, (ip_info)) for ip_info in ips]
-
- # 轮询所有完成的线程,查询线程的执行结果
- for task in executor.completed_tasks(tasks):
- data = task.result()
- if data:
- # 将线程执行结果返回
- validate_ips.append(data)
- # 降低爬取频率
- time.sleep(2.5)
- # 当还没有子线程返回可用的 ip 时,再次查询缓存
- if not validate_ips:
- validate_ips = cache.lrange(name=PROXY_IP_NAMESPACE, start=0, end=100)
- # 设置缓存超时时间
- cache.expire(name=PROXY_IP_NAMESPACE, time=PROXY_IP_EXPIRE)
- print(u'本次爬取 ip :%d 条,有效:%d 条' % (15 * page, len(validate_ips)))
- return validate_ips
-
-
- def check_ip(ip):
- """
- 通过 ping ip 来验证 ip 是否有效\n
- :param ip: 待 ping 的 ip
- :return:
- """
- assert ip, 'ip 不能为空!'
- # CMD 命令(windows)
- cmd = 'ping -n 4 -w 4 %s' % ip
- # 参数 shell 设为 true,程序将通过 shell 来执行,subprocess.PIPE 可以初始化 stdin , stdout 或 stderr 参数。表示与子进程通信的标准流
- p = sp.Popen(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=True)
- out = p.stdout.read().decode('gbk')
- # 丢失率
- lost_ratio = re.compile(u'(\d+)% 丢失', re.IGNORECASE).findall(out)
- # 平均耗时
- avg_time = re.compile(u'平均 = (\d+)', re.IGNORECASE).findall(out)
- # 如果失败率高于最高丢包率则丢弃
- if lost_ratio[0] and int(lost_ratio[0]) > MAX_LOST:
- print('%s 失败率过高!丢弃' % ip)
- return False
- # 如果响应时间高于最大延迟时间则丢弃
- if avg_time and int(avg_time[0]) > MAX_TIMEOUT:
- print('%s 响应时间过长,网络不稳定,丢弃' % ip)
- return False
- return True

在以上脚本中,一次性从代理网站爬取了一页的 ip,每页 15 条数据,然后将爬取的 ip 存入缓存 Redis 中,并且我们设置了休眠时间为 2.5s ,避免爬取过快,被目标服务器拦截。另外,增加了线程池,用子线程来校验 ip 是否可用,并设定丢弃丢包率大于 75% 的ip。缓存工具类 cache_utils.py 以及线程池工具类 thread_pool,都在 utils 包下,完整代码如下:
cache_utils
- # -*-* encoding:UTF-8 -*-
- # author : mengy
- # date : 2019/5/21
- # python-version : Python 3.7.0
- # description : Redis 缓存相关操作
-
- import redis
-
- # Redis 主机地址
- CACHE_HOST = '127.0.0.1'
- # Redis 端口
- CACHE_PORT = '6379'
- # 设置写入的键值对中的value为str类型
- CACHE_DECODE_RESPONSES = True
-
-
- class Cache(object):
- __pool = redis.ConnectionPool(host=CACHE_HOST, port=CACHE_PORT, decode_responses=CACHE_DECODE_RESPONSES)
-
- def __init__(self):
- self.__redis = redis.Redis(connection_pool=self.__pool)
-
- def delete(self, *names):
- """
- 根据name删除redis中的任意数据类型\n
- :param names: key或者命名空间
- :return:
- """
- self.__redis.delete(*names)
-
- def exists(self, name):
- """
- 检测redis的name是否存在\n
- :param name: key或者命名空间
- :return:
- """
- return self.__redis.exists(name)
-
- def keys(self, pattern='*'):
- """
- 根据* ?等通配符匹配获取redis的name\n
- :param pattern: 通配符
- :return:
- """
- return self.__redis.keys(pattern)
-
- def expire(self, name, time):
- """
- 为某个name设置超时时间\n
- :param name: key或者命名空间\n
- :param time: 超时时间(s)
- :return:
- """
- if not self.exists(name):
- raise Exception(name + ' 不存在')
- self.__redis.expire(name, time)
-
- def type(self, name):
- """
- 获取name对应值的类型\n
- :param name: key或者命名空间\n
- :return:
- """
- return self.__redis.type(name)
-
- def rename(self, src, dst):
- """
- 重命名key或者命名空间\n\n
- :param src: 原key或者命名空间\n
- :param dst: 修改后的key或者命名空间\n
- :return:
- """
- if self.exists(dst):
- raise Exception(dst + ' 已存在')
- if not self.exists(src):
- raise Exception(src + ' 不存在')
- self.__redis.rename(src, dst)
-
- # ------------------------字符串-----------------------------
-
- def get(self, key):
- """
- 获取指定字符串值\n
- :param key:单个键\n
- :return:
- """
- return self.__redis.get(key)
-
- def mget(self, *keys):
- """
- 批量获取指定字符串值\n
- :param keys:多个键\n
- :return:
- """
- return self.__redis.mget(keys)
-
- def set(self, key, value, px=None):
- """
- 字符串设置值 \n
- :param key:键\n
- :param value:值\n
- :param px:过期时间(ms)\n
- :return:
- """
- self.__redis.set(name=key, value=value, px=px)
-
- def mset(self, **map):
- """
- 字符串批量设置值\n
- :param map:批量设置的键值字典\n
- :return:
- """
- self.__redis.mset(mapping=map)
-
- # -------------------------Hash-----------------------------
-
- def hget(self, name, key):
- """
- 在name对应的hash中根据key获取value \n
- :param name: 命名空间
- :param key: 命名空间下对应的键
- :return:
- """
- return self.__redis.hget(name=name, key=key)
-
- def hmget(self, name, *keys):
- """
- 在name对应的hash中获取多个key的值\n
- :param name: 命名空间\n
- :param keys: 命名空间下的多个键
- :return:
- """
- return self.__redis.hmget(name=name, keys=keys)
-
- def hgetall(self, name):
- """
- 获取name对应hash的所有键值 \n
- :param name:命名空间 \n
- :return:
- """
- return self.__redis.hgetall(name=name)
-
- def hset(self, name, key, value):
- """
- name对应的hash中设置一个键值对(不存在,则创建,否则,修改)\n
- :param name: 命名空间
- :param key: 命名空间下对应的键
- :param value: 命名空间下对应的值
- :return:
- """
- self.__redis.hset(name=name, key=key, value=value)
-
- def hmset(self, name, **map):
- """
- 在name对应的hash中批量设置键值对\n
- :param name:命名空间\n
- :param map:键值对\n
- :return:
- """
- self.__redis.hmset(name=name, mapping=map)
-
- def hexists(self, name, key):
- """
- 检查name对应的hash是否存在当前传入的key\n
- :param name: 命名空间\n
- :param key: 命名空间下对应的键
- :return:
- """
- return self.__redis.hexists(name=name, key=key)
-
- def hdel(self, name, keys):
- """
- 批量删除指定name对应的key所在的键值对\n
- :param name:命名空间\n
- :param keys:要删除的键\n
- :return:
- """
- self.__redis.hdel(name, keys)
-
- # -------------------------List-----------------------------
-
- def lpush(self, name, *values, left=True):
- """
- 在name对应的list中添加元素,每个新的元素都添加到列表的最左边\n
- :param name: 命名空间
- :param values: 值
- :param left: 是否添加到列表的最左边,True:最左边,False:最右边,默认为True
- :return:
- """
- if left:
- self.__redis.lpush(name, *values)
- else:
- self.__redis.rpush(name, *values)
-
- def lset(self, name, index, value):
- """
- 对list中的某一个索引位置重新赋值\n
- :param name: 命名空间
- :param index: 索引位置
- :param value: 要插入的值
- :return:
- """
- self.__redis.lset(name=name, index=index, value=value)
-
- def lrem(self, name, count, value):
- """
- 删除name对应的list中的指定值\n
- :param name:命名空间\n
- :param count:num=0 删除列表中所有的指定值;num=2 从前到后,删除2个;num=-2 从后向前,删除2个
- :param value:要删除的值
- :return:
- """
- self.__redis.lrem(name=name, count=count, value=value)
-
- def lpop(self, name):
- """
- 移除列表的左侧第一个元素,返回值则是第一个元素\n
- :param name: 命名空间\n
- :return: 第一个元素
- """
- return self.__redis.lpop(name=name)
-
- def lindex(self, name, index):
- """
- 根据索引获取列表内元素\n
- :param name: 命名空间\n
- :param index: 索引位置
- :return:
- """
- return self.__redis.lindex(name=name, index=index)
-
- def lrange(self, name, start, end):
- """
- 获取指定范围内的元素\n
- :param name: 命名空间\n
- :param start: 起始位置
- :param end: 结束位置
- :return:
- """
- return self.__redis.lrange(name=name, start=start, end=end)
-
- def ltrim(self, name, start, end):
- """
- 移除列表内没有在该索引之内的值\n
- :param name: 命名空间\n
- :param start: 起始位置
- :param end: 结束位置
- :return:
- """
- self.__redis.ltrim(name=name, start=start, end=end)
-
- # -------------------------Set-----------------------------
-
- def sadd(self, name, *values):
- """
- 给name对应的集合中添加元素\n
- :param name:命名空间\n
- :param values:集合
- :return:
- """
- self.__redis.sadd(name, *values)
-
- def smembers(self, name):
- """
- 获取name对应的集合的所有成员\n
- :param name: 命名空间\n
- :return:
- """
- return self.__redis.smembers(name=name)
-
- def sdiff(self, name, *others):
- """
- 在第一个name对应的集合中且不在其他name对应的集合的元素集合,即,name集合对于其他集合的差集\n
- :param name:主集合\n
- :param others:其他集合\n
- :return:
- """
- # print(*others)
- return self.__redis.sdiff(name, *others)
-
- def sinter(self, name, *names):
- """
- 获取多个name对应集合的交集\n
- :param name: 主集合\n
- :param names: 其他集合\n
- :return:
- """
- return self.__redis.sinter(name, *names)
-
- def sunion(self, name, *names):
- """
- 获取多个name对应集合的并集\n
- :param name: 主集合\n
- :param names: 其他集合\n
- :return:
- """
- return self.__redis.sunion(name, *names)
-
- def sismember(self, name, value):
- """
- 检查value是否是name对应的集合内的元素\n
- :param name:命名空间\n
- :param value:待检查的值\n
- :return:
- """
- return self.__redis.sismember(name=name, value=value)
-
- def smove(self, src, dst, value):
- """
- 将某个元素从一个集合中移动到另外一个集合\n
- :param src: 原集合\n
- :param dst: 目标集合\n
- :param value: 待移动的值
- :return:
- """
- self.__redis.smove(src=src, dst=dst, value=value)
-
- def spop(self, name):
- """
- 从集合的右侧移除一个元素,并将其返回\n
- :param name: 命名空间\n
- :return:
- """
- return self.__redis.spop(name=name)
-
- def srem(self, name, *values):
- """
- 删除name对应的集合中的某些值\n
- :param name: 命名空间\n
- :param values: 要删除的值
- :return:
- """
- self.__redis.srem(name, *values)

thread_pool
- # -*-* encoding:UTF-8 -*-
- # author : mengy
- # date : 2019/5/23
- # python-version : Python 3.7.0
- # description : 线程池
-
- from concurrent.futures import ThreadPoolExecutor, as_completed
-
- # 最大线程数
- MAX_WORKERS = 50
-
-
- class ThreadPool:
- __instance = None
-
- def __init__(self):
- self.__executor = ThreadPoolExecutor(MAX_WORKERS)
-
- def __new__(cls, *args, **kwargs):
- """
- 使用单例模式\n
- :param args:
- :param kwargs:
- :return:
- """
- if cls.__instance is None:
- cls.__instance = object.__new__(cls)
- return cls.__instance
-
- def submit(self, func, *args, **kwargs):
- return self.__executor.submit(func, *args, **kwargs)
-
- def batch_submit(self, func, *args, **kwargs):
- return [self.submit(func, *item, **kwargs) for item in args]
-
- @staticmethod
- def completed_tasks(tasks):
- return as_completed(tasks)

至此,我们已经创建了自己的 ip 池,接下来就需要运用到我们的爬虫程序中。在 middleware.py 中新建 IPProxyMiddleware 类,在该类的构造方法中,首先从我们的 ip 池获取一页的 ip ,然后再请求之前,对请求对象 Request 设置代理的 url,然后在请求完成之后,根据目标网页的返回码进行判断,如果请求发生异常,则从 ip 池中随机取出一条 ip ,重新构造 Request,待下一次请求,另外,我们在 settings.py 文件中指定了失败的最大次数为 5 次,意味着,如果一个请求失败超过 5 次,则放弃该请求。该逻辑实现是在 process_response () 方法中。该方法如果返回值是 request,则表示重新将该请求发送到 Scheduler ,该请求将再次被执行;如果返回是 response ,则表示该请求已经完成,不会将对该请求返回的数据进行处理。具体代码如下:
-
- class IPProxyMiddleware(object):
- """
- IP 代理池中间件
- """
-
- def __init__(self):
- # 爬取有效 ip
- self.ip_list = crawl_proxy.get_ips(pages=3)
- # 请求已经失败的次数
- self.retry_time = 0
- self.index = random.randint(0, len(self.ip_list) - 1)
-
- def process_request(self, request, spider):
- """
- 处理将要请求的 Request
- :param request:
- :param spider:
- :return:
- """
- # 失败重试次数
- self.retry_time = 0
- #
- # if len(self.ip_list) < 5:
- # self.ip_list.extend(crawl_proxy.get_ips(refresh=True))
- # 随机选取 ip
- proxy = json.loads(self.ip_list[self.index])
- print('选取的 ip:' + proxy.get('url'))
- # 设置代理
- request.meta['Proxy'] = proxy.get('url')
-
- def process_response(self, request, response, spider):
- """
- 处理返回的 Response
- :param request:
- :param response:
- :param spider:
- :return:
- """
- # 针对4**、和5** 响应码,重新选取 ip
- if re.findall('[45]\d+', str(response.status)):
- print(u'[%s] 响应状态码:%s' % (response.url, response.status))
- if self.retry_time > settings.get('MAX_RETRY', 5):
- return response
- if response.status == 418:
- sec = random.randrange(30, 35)
- print(u'休眠 %s 秒后重试' % sec)
- # time.sleep(sec)
- self.retry_time += 1
- proxy = json.loads(random.choice(self.ip_list))
- print('失败 %s 次后,重新选取的 ip:%s' % (self.retry_time, proxy.get('url')))
- request.meta['Proxy'] = proxy.get('url')
- return request
- return response

最后,不要忘记在 settings.py 中启用 IPProxyMiddleware 中间件:
- DOWNLOADER_MIDDLEWARES = {
- 'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10,
- 'sina_scrapy.middlewares.IPProxyMiddleware': 30,
- }
再次启动爬虫程序,我们可以看到,已经从我们的 ip 池中获取到 ip 并进行请求,当有异常发生时,将会再次从 ip 池中获取 ip,并重新请求。

在本篇博客的开头就提到,新浪微博的反爬机制是针对账号的,但是我只有一个账号,因此这一小节提到的 Cookies 池对于我们 新浪微博爬虫程序来说,也许并不能起到太大的左右,但是如果你有多个微博账号,Cookies 池的方法将能够极大地提高你程序的爬取效率。方法都是一样的。Cookies 池和前面提到的 User-Agent 池、ip 池的原理是一样的,都是获取一组的数据,存入 Redis 中,等到需要使用的时候,在随机从 Redis 中取出数据,进行处理。
我们在 utils 包下新建一个文件 simulate_login.py 用于编写模拟登录新浪微博的脚本,在这里,我分别实现了微博移动网页版模拟登录(https://weibo.cn/)以及新浪微博 PC 网页版(https://weibo.com)。登录的难度不一致,可以根据需要自己选择。其中 PC 网页版的登录逻辑,可以参考《模拟新浪微博登录(Python+RSA加密算法)》这篇博客,里面有详细的分析,在这里就不一一赘述了,只是需要注意相应的 js 版本可能已经过时了。具体实现如下:
- # -*-* encoding:UTF-8 -*-
- # author : mengy
- # date : 2019/6/26
- # python-version : Python 3.7.0
- # description : 模拟登录新浪微博
-
- import base64
- import urllib
- import rsa
- import binascii
- import json
- import re
- import http.cookiejar
- import urllib.request
- from sina_scrapy.utils.cache_utils import Cache
-
- # cookies 在缓存中的有效期(s)
- COOKIES_EXPIRES = 3 * 24 * 60 * 60
-
-
- def urlopen(url, callback=None, data=None, timeout=5):
- """
- 重写 urllib 的 urlopen 方法,该方法能够将 cookies 作为参数传给回调函数\n
- :param url:请求的地址或者 url.request.Request() 对象\n
- :param callback:回调函数\n
- :param data:请求数据\n
- :param timeout:超时时间(s),默认为 5s\n
- :return:
- """
- cookie = http.cookiejar.CookieJar()
- handler = urllib.request.HTTPCookieProcessor(cookie)
- opener = urllib.request.build_opener(handler)
- response = opener.open(url, data=data, timeout=timeout)
- if callback:
- callback(cookie)
- return response
-
-
- class LoginBase(object):
- """
- 微博模拟登录基类,实现了新浪移动微博网页版(https://weibo.cn/)的模拟登录\n
- """
- # 缓存工具
- __cache = Cache()
-
- # 移动网页版 cookies 在缓存中的命名
- COOKIES_NAMESPACE = 'MOBILE_WEB_POOL_COOKIES'
-
- def __init__(self, username, password):
- # 微博账号
- self.__username = username
- # 微博密码
- self.__password = password
- # 记录 cookies,按照 domain 分组
- self.cookies = {}
-
- @property
- def username(self) -> str:
- return self.__username
-
- @property
- def password(self) -> str:
- return self.__password
-
- def save_cookies(self, cookie: http.cookiejar.CookieJar):
- """
- 保存 Cookies\n
- :param cookie:
- :return:
- """
- # 按照 domain 分组记录所访问过 url 的 cookies
- for item in cookie:
- tmp = self.cookies.get(item.domain)
- if tmp:
- tmp.update({item.name: item.value})
- else:
- self.cookies.update({item.domain: {item.name: item.value}})
-
- def login(self):
- """
- 微博移动网页版模拟登录(https://weibo.cn/),代码实现逻辑根据网页版 js,有一定的时效性\n
- :return:
- """
- print(u'微博移动网页版模拟登录(https://weibo.cn/)开始...')
- # 登录地址
- url = 'https://passport.weibo.cn/sso/login'
- # 默认 Headers
- headers = {
- 'Referer': 'https://passport.weibo.cn/signin/login?entry=mweibo&r=https%3A%2F%2Fweibo.cn&page=9.com&uid=1260427471&_T_WM=c6e864f47316ecbaf8607a214d4bb3fa',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'
- }
- # 构造模拟登录请求的表单数据
- data = {
- 'username': self.__username,
- 'password': self.__password,
- 'savestate': 1,
- 'r': 'https://weibo.cn',
- 'ec': 0,
- 'pagerefer': '',
- 'entry': 'mweibo',
- 'wentry': '',
- 'loginfrom': '',
- 'client_id': '',
- 'code': '',
- 'qq': '',
- 'mainpageflag': 1,
- 'hff': '',
- 'hfp': ''
- }
- try:
- # 格式化 请求数据
- post_data = urllib.parse.urlencode(data).encode('gbk')
- # 构造请求
- req = urllib.request.Request(url=url, data=post_data, headers=headers, method='POST')
- # 使用自定义的请求方法,保存请求的 cookies
- response = urlopen(url=req, callback=self.save_cookies, timeout=10)
- # 将返回的数据转化成 dict
- result = json.loads(response.read().decode('gbk'))
- if result.get('retcode') == 20000000:
- print(u'登录成功!')
- # 登录成功后返回的 url
- crossdomainlist = result.get('data').get('crossdomainlist')
- # 依次访问 url,获取 cookies 并保存
- if crossdomainlist:
- for item in dict(crossdomainlist).values():
- urlopen(item, self.save_cookies)
- else:
- print(u'登录失败!')
- # 将 cookies 放入缓存 redis
- self.push_cache()
- return True
- except Exception as e:
- print(u'解析失败', e)
- return False
-
- def push_cache(self):
- assert self.cookies, u'请先模拟登录'
- # self.__cache.lpush(self.COOKIES_NAMESPACE, json.dumps(self.cookies))
- self.__cache.hset(self.COOKIES_NAMESPACE, self.username, json.dumps(self.cookies))
- # 设置 cookies 的有效时间(三天)
- self.__cache.expire(self.COOKIES_NAMESPACE, COOKIES_EXPIRES)
-
- def get_cookies(self, domain=None, is_force_login=False):
- """
- 获取 cookies\n
- :param domain:域名
- :param is_force_login:是否强制登录(默认为 False)\n
- :return:
- """
- # 从 redis 获取 cookies
- # data = self.__cache.lrange(namespace, 0, 1)
- data = self.__cache.hget(self.COOKIES_NAMESPACE, self.username)
- if is_force_login or not data:
- # 如果 redis 中没有 cookies,则模拟登录,重新获取 cookies
- if self.login():
- cookies = self.cookies
- else:
- raise Exception(u'获取 Cookies 失败!')
- else:
- print(u'从缓存中获取 cookies')
- cookies = json.loads(data)
- if domain:
- return cookies.get(domain)
- return cookies
-
-
- class LoginForSinaCom(LoginBase):
- """
- 模拟新浪微博 PC 网页版(https://weibo.com)登录,登录后,将 cookies 保存到 redis 缓存中,并提供获取 cookies 的方法
- """
-
- # PC 网页版 cookies 在缓存中的命名
- COOKIES_NAMESPACE = 'PC_WEB_POOL_COOKIES'
-
- def __init__(self, username, password):
- LoginBase.__init__(self, username, password)
-
- def encrypt_name(self) -> str:
- """
- 用 base64 加密用户名 \n
- :return:
- """
- return base64.encodebytes(bytes(urllib.request.quote(self.username), 'utf-8'))[:-1].decode('utf-8')
-
- def encrypt_passwd(self, **kwargs) -> str:
- """
- 使用 rsa 加密密码\n
- :param kwargs:
- :return:
- """
- try:
- # 拼接明文
- message = str(kwargs['servertime']) + '\t' + str(kwargs['nonce']) + '\n' + str(self.password)
- # 10001 为 js 加密文件中的加密因子,16进制
- key = rsa.PublicKey(int(kwargs['pubkey'], 16), 0x10001)
- # 使用 rsa 加密拼接后的密码
- encrypt_pwd = rsa.encrypt(message.encode('utf-8'), key)
- # 将加密后的密文转化成 AscII 码
- final_pwd = binascii.b2a_hex(encrypt_pwd)
- return final_pwd
- except Exception as e:
- print(e)
- return None
-
- def pre_login(self) -> dict:
- """
- 预登录,请求 prelogin_url 链接地址 获取 servertime,nonce,pubkey 和 rsakv \n
- :return:
- """
- # 预登录地址
- pre_login_url = 'http://login.sina.com.cn/sso/prelogin.php?entry=sso&callback=sinaSSOController.preloginCallBack&su=%s&rsakt=mod&client=ssologin.js(v1.4.19)' % self.encrypt_name()
- try:
- response = urlopen(pre_login_url, callback=self.save_cookies, timeout=5)
- # 提取响应结果
- preloginCallBack = re.compile('\((.*)\)').search(str(response.read(), 'UTF-8'))
- if preloginCallBack:
- result = json.loads(preloginCallBack.group(1))
- else:
- raise Exception(u'解析响应结果失败!')
- return result
- except Exception as e:
- print(e)
- return None
-
- def login(self):
- """
- 登录新浪微博 PC 网页版(https://weibo.com)\n
- :return:
- """
- print(u'新浪微博 PC 网页版(https://weibo.com)登录开始...')
- # 预登录
- result = self.pre_login()
- # 加密用户账号
- encodedUserName = self.encrypt_name()
- serverTime = result.get('servicetime')
- nonce = result.get('nonce')
- rsakv = result.get('rsakv')
- # 加密密码
- encodedPassWord = self.encrypt_passwd(**result)
- # 构造请求数据
- post_data = {
- "entry": "weibo",
- "gateway": "1",
- "from": "",
- "savestate": "7",
- "qrcode_flag": 'false',
- "useticket": "1",
- "pagerefer": "https://login.sina.com.cn/crossdomain2.php?action=logout&r=https%3A%2F%2Fweibo.com%2Flogout.php%3Fbackurl%3D%252F",
- "vsnf": "1",
- "su": encodedUserName,
- "service": "miniblog",
- "servertime": serverTime,
- "nonce": nonce,
- "pwencode": "rsa2",
- "rsakv": rsakv,
- "sp": encodedPassWord,
- "sr": "1680*1050",
- "encoding": "UTF-8",
- "prelt": "194",
- "url": "https://weibo.com/ajaxlogin.php?framelogin=1&callback=parent.sinaSSOController.feedBackUrlCallBack",
- "returntype": "META"
- }
- # 登录地址
- url = 'https://login.sina.com.cn/sso/login.php?client=ssologin.js(v1.4.19)'
- # 打包请求数据
- data = urllib.parse.urlencode(post_data).encode('GBK')
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
- }
- try:
- # 请求登录
- req = urllib.request.Request(url=url, data=data, headers=headers)
- response = urlopen(req, callback=self.save_cookies)
- text = response.read().decode('GBK')
- except Exception as e:
- print(e)
- try:
- # 获取第一次重定向地址
- login_url = re.compile('location\.replace\("(.*)"\)').search(text).group(1)
- # 第一次重定向
- response = urlopen(login_url, callback=self.save_cookies)
- data = response.read().decode('GBK')
- # 获取第二次重定向地址
- jump_url = re.compile("location\.replace\('(.*)'\)").search(data).group(1)
- # 第二次重定向
- response = urlopen(jump_url, callback=self.save_cookies)
- data = response.read().decode('utf-8')
- # 获取服务器返回的加密的 用户名
- name = re.compile('"userdomain":"(.*)"').search(data).group(1)
- index = 'http://weibo.com/' + name
- # 第三次跳转到首页
- urlopen(index, callback=self.save_cookies)
- print(u'登录成功!')
- # 将 cookies 放入缓存 redis
- self.push_cache()
- return True
- except Exception as e:
- print(u'登录失败!,异常:', e)
- return False
-

有了 Cookies 池以后,就需要编写中间件类使用 Cookies 池的数据了,在 middleware.py 中添加如下类:
- class CookiesMiddleware(object):
- """
- 登录 Cookies 中间件
- """
-
- def __init__(self):
- self.cookies = simulate_login.LoginBase(settings.get('SINA_ACCOUNT'), settings.get('SINA_PASSWD')).get_cookies()
-
- def process_request(self, request, spider):
- cookies = self.cookies.get('.weibo.cn')
- request.cookies = cookies
判断代码实现很简单,只是在请求之前,从 Cookies 中获取一个 Cookies,然后赋给 Request。另外,在前面的使用 ip 池的时候,我们有根据返回码来确定是不是要重新从 ip 池中拉取 ip,同样的,我们可以增加判断,如果响应码为 418 ,就重新从 Cookies 池中获取 Cookies,另外,我们的微博账号密码是配置在 settings.py 中的 SINA_ACCOUNT 以及 SINA_PASSWD,需要替换成自己的账号密码。代码如下:
- if response.status == 418:
- # 出现 418 重新获取 cookies
- request.cookies = simulate_login.LoginBase(settings.get('SINA_ACCOUNT'),
- settings.get('SINA_PASSWD')).get_cookies('.weibo.cn')
- sec = random.randrange(30, 35)
- print(u'休眠 %s 秒后重试' % sec)
- # time.sleep(sec)
好了,接着,我们在 settings.py 中启用 CookiesMiddleware 中间件,代码如下:
-
- DOWNLOADER_MIDDLEWARES = {
- 'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10,
- 'sina_scrapy.middlewares.CookiesMiddleware': 20,
- 'sina_scrapy.middlewares.IPProxyMiddleware': 30,
- }
-
- SINA_ACCOUNT = 'XXXXXXXXXXX'
- SINA_PASSWD = 'XXXXXXXXXXXX'
再次运行爬虫程序,将会从 Cookies 池中获取 Cookies。
本系列博客代码均已提交至我的 GitHub,有需要的可以移步下载,如需更详细的代码(数据分析,使用 pyecharts2.0 和 pyplot 绘制图表,包括:日爬取量统计图、微博用户性别分布统计图、微博用户年龄分布统计图、新浪微博用户活跃度分布图、新浪微博用户粉丝和关注数分布图、新浪微博用户分布地图等),请从 CSDN下载。pyecharts2.0 数据分析效果预览:
日爬取量统计图

微博用户性别分布统计图

微博用户年龄分布统计图

新浪微博用户活跃度分布图

新浪微博用户粉丝和关注数分布图

新浪微博用户分布地图

到这里,使用 Scrapy 爬取新浪微博用户信息系列博客就已经结束了,在这一系列博客中,我们学习了 Scrapy 框架的基本原理以及基本用法,当然 Scrapy 的强大之处远不止如此,例如:下载器中间件(DownloaderMiddleware)、基于 Redis 的分布式 Scrapy等。学无止境,希望大家不要止步于此。想要掌握 Scrapy 的全部功能,需要自己慢慢探索、实践。大家加油!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。