1. Amazon is friendly to overseas IPs and unfriendly to mainland-China IPs.
For small-scale collection, a Hong Kong VPN or a US IP is enough: plain requests calls work, and the robot-check (CAPTCHA) rate is roughly 15%.
With a mainland-China IP, however, the robot-check rate is around 90%.
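As a rough sketch of the small-scale setup (the proxy endpoint and product URL below are placeholders, not values from this post):

import requests

# Placeholder US proxy endpoint; substitute your own
proxies = {
    "http": "http://user:pass@us-proxy.example.com:8080",
    "https": "http://user:pass@us-proxy.example.com:8080",
}
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}

# Placeholder product page
resp = requests.get("https://www.amazon.com/dp/B000000000",
                    headers=headers, proxies=proxies, timeout=15)

# Amazon serves /errors/validateCaptcha when it suspects a robot
if "/errors/validateCaptcha" in resp.text:
    print("robot check triggered: rotate the IP or solve the CAPTCHA")
else:
    print("fetched:", resp.status_code)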
For how to handle CAPTCHA recognition, see:
How to build your own CAPTCHA-solving system, Part 1 (using Amazon CAPTCHAs as the example)
If you are collecting at large scale, this alone will not do.
To build such a crawler system, you need to consider the following aspects:
- High-performance crawler design: use aiohttp, or Scrapy on top of Twisted. Scrapy's companion service Scrapyd lets you deploy multiple spider instances to raise concurrency (a minimal aiohttp sketch follows this list).
- IP unlocking and proxy management.
- Data parsing and storage: use the Scrapy framework for crawling and parsing, leaning on its powerful XPath and CSS selectors. Store results in SQLite, choose MySQL or PostgreSQL as data volume grows, or use a NoSQL database such as MongoDB.
- OSS page mirroring: use Scrapy's FilesPipeline or ImagesPipeline to save page snapshots to object storage.
- Time and cost control.
- Code optimization.
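For the high-performance side, here is a minimal aiohttp sketch of bounded concurrent fetching (the URL and concurrency limit are illustrative, not from the original system):

import asyncio
import aiohttp

CONCURRENCY = 20  # illustrative cap; tune to your proxy pool

async def fetch(session, sem, url):
    # The semaphore bounds how many requests are in flight at once
    async with sem:
        async with session.get(url) as resp:
            return url, await resp.text()

async def crawl(urls):
    sem = asyncio.Semaphore(CONCURRENCY)
    timeout = aiohttp.ClientTimeout(total=15)
    async with aiohttp.ClientSession(timeout=timeout,
                                     headers={"User-Agent": "Mozilla/5.0"}) as session:
        return await asyncio.gather(*(fetch(session, sem, u) for u in urls))

# pages = asyncio.run(crawl(["https://www.amazon.com/dp/B000000000"]))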
Solutions:
1. Use a self-built CAPTCHA recognition system to cut the per-solve recognition fees.
2. Run Chrome-based dynamic collection and plain-request collection in parallel: Chrome + WebDriver handles CAPTCHA recognition and IP unlocking.
The detection code looks like this:
is_robot = "/errors/validateCaptcha" in response.text
if (is_robot):
# 机器人验证弹出 则标记被锁
# logger.info(response.url + "mmmm"+proxy_about+'+++++++++'+"#机器人验证弹出================================")
print(response.url + "mmmm" + proxy_about + '+++++++++' + "#机器人验证弹出================================")
#读取验证码图片地址
'div.a-text-center img'
#执行IP解锁
solution = get_amazon_code(code_img)
import base64
import json

def get_amazon_code(img_url):
    # Endpoint of the self-hosted CAPTCHA recognition service (host elided in the original)
    url = "http://xxxxxx:88/api/get/amazoncode?img_link="
    # Base64-encode the image URL so it can travel as a query parameter
    b64str = base64.b64encode(img_url.encode()).decode()
    # get_task() is the project's wrapper around an HTTP GET request
    code_json = get_task(url + b64str, payload={}, headers={})
    print(code_json.text)
    return json.loads(code_json.text)['code']
3. This keeps collection efficient: IPs are unlocked on one track while others are being locked on the other.
4. With this in place, the probability of an IP being locked drops from roughly 80% to about 40% (a WebDriver sketch of the unlock step follows).
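The post does not show the unlock step itself; here is a hedged Selenium sketch of how Chrome + WebDriver could submit the solved CAPTCHA. The input id "captchacharacters" is an assumption about Amazon's validateCaptcha form; verify it against the live page:

from selenium import webdriver
from selenium.webdriver.common.by import By

def unlock_ip(driver, solution):
    # Assumes the driver is already sitting on the /errors/validateCaptcha page.
    # "captchacharacters" is an assumed element id; check the live form.
    box = driver.find_element(By.ID, "captchacharacters")
    box.clear()
    box.send_keys(solution)
    box.submit()  # on success Amazon redirects back to the requested page

# Typical wiring with the code above:
# unlock_ip(driver, get_amazon_code(code_img))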
5. Parse the page data.
Here is a Python demo that parses the reviews on an Amazon review page:
def parseReview_element(self, element, countryCode):
    # Unique review identifier
    reviewId = element.css('div::attr(id)').extract_first()
    if reviewId is None:
        # Not a review for this marketplace; skip
        return None
    element_head = element.css("#customer_review-" + reviewId).extract_first()
    if element_head is None:
        # Not a review for this marketplace; skip
        return None
    # Amazon user id and display name
    amazonUserId = self.user(element)
    amazonUserName = str(element.css(".a-profile-name::text").extract_first())[0:90]
    # Star rating and title
    star = self.star(element)
    title = self.title(element)
    # Review date
    reviewDateElement = element.css('span[data-hook*="review-date"]').extract_first()
    reviewDate = ReviewDateParseHelper().reviewDate(html_clear(reviewDateElement))
    if reviewDate == ReviewDateParseHelper().LocalDateTimeUtils.parse(
            "2099-12-31", ReviewDateParseHelper().LocalDateTimeUtils.YYYY_MM_DD):
        logging.debug("failed to parse review date *********************************")
        logging.debug(element.extract())
        logging.debug("failed to parse review date *********************************")
    # Verified Purchase badge
    verifiedElement = element.css('span[data-hook*="avp-badge"]').extract_first()
    verified = 1 if verifiedElement else 0
    # Helpful-vote count
    helpful_num = element.css('span[data-hook*="helpful-vote-statement"]::text').extract_first()
    if helpful_num:
        helpful_num = "".join(list(filter(str.isdigit, helpful_num)))
        helpful_num = int(helpful_num) if helpful_num != "" else 1
    else:
        helpful_num = 0
    # Review body
    content = html_clear(element.css('span[data-hook*="review-body"]').extract_first())
    originReviewDate = html_clear(reviewDateElement)
    user_info_ext = ",".join(element.css('span[class*="c7yTopDownDashedStrike"]::text').extract())
    # Parse media: videos first, then images
    videos = []
    has_video = element.css("div[class*='cr-video-desktop']").extract_first()
    if has_video:
        video = {}
        video['videoUrl'] = element.css("input[value$='.mp4']::attr(value)").extract_first()
        video['videoSlateImgUrl'] = element.css(
            "div[class*='cr-video-desktop']::attr(data-thumbnail-url)").extract_first()
        videos.append(video)
    pics = []
    has_pics = element.css("div[class*=review-image-tile-section] span a img::attr(src)").extract()
    if has_pics:
        for pic in has_pics:
            pics.append(pic)
    if len(pics) == 0:
        has_pics2 = element.css("div[class*=cr-lightbox-image-thumbnails] img::attr(src)").extract()
        if has_pics2:
            for pic in has_pics2:
                pics.append(pic)
    if len(videos) > 0 or len(pics) > 0:
        media = {}
        media['videos'] = videos
        media['reviewImageUrls'] = list(set(pics))
        media = json.dumps(media, ensure_ascii=False)
    else:
        media = ""
    print("media:==============", media)
    content = content.replace('The media could not be loaded.', " ").strip().replace("\n", " ").replace("\r", " ")
    Review = {}
    Review['country_code'] = countryCode
    Review['review_id'] = reviewId
    Review['amazon_user_id'] = amazonUserId
    Review['amazon_user_name'] = amazonUserName
    Review['verified'] = verified
    Review['star'] = star
    Review['title'] = title
    Review['content'] = content
    Review['helpful_num'] = helpful_num
    Review['review_date'] = reviewDate
    Review['user_info_ext'] = user_info_ext
    Review['origin_review_date'] = originReviewDate
    Review['media'] = media
    Review['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
    Review['update_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
    return Review
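For context, this demo would typically be driven by a loop over the review cards of a review page; a minimal sketch follows (the data-hook="review" selector is an assumption about Amazon's markup, consistent with the data-hook selectors used above):

def parse(self, response):
    countryCode = "US"  # illustrative; derive from the marketplace being crawled
    reviews = []
    # Each review card on an Amazon review page carries data-hook="review"
    for element in response.css('div[data-hook="review"]'):
        review = self.parseReview_element(element, countryCode)
        if review is not None:
            reviews.append(review)
    return reviews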
6. After collection the data must be written to the database. There are two ways to write: synchronously, which runs into concurrency bottlenecks, or asynchronously, which in our tests improved throughput by about 700%.
The asynchronous approach: save the scraped data to local JSON files first, and let a separate process upload them asynchronously; the scraping process does nothing but scrape.
if sync_post:
    # Spool the payload to a local JSON file; a separate process uploads it later
    self.save_postinfo_json(type="review", id=task['id'], data_dict=payload)
    print(response.url, "queued for asynchronous upload")
    return ""
(To be continued...)