当前位置:   article > 正文

【地表最强】亚马逊Amazon高性能爬虫_亚马逊爬虫机器人检测

亚马逊爬虫机器人检测

亚马逊爬虫经验总结:

1亚马逊对于国外的IP友好,对于国内IP不友好。
小规模采集可以使用香港VPN或者美国IP直接通过 request请求即可。robot几率大概是15%
但是如果你使用国内IP robot几率就有90%

在这里插入图片描述
验证码识别如何搞定移步
如何打造自己的打码系统-验证码识别系统 【上】(亚马孙amazon验证码识别为例)

如果你是大规模的用户这样
要实现这样的爬虫系统,你需要考虑以下几个方面:

  1. 高性能爬虫设计:

    • 使用高效的异步网络库,如aiohttpScrapy配合Twisted
    • 利用分布式爬虫系统,如Scrapy的分布式组件Scrapyd,并部署多个实例以提高并发能力。
  2. IP解锁与代理管理:

    • 国内代理池的构建和管理,可以自建或使用第三方服务。
    • 实现自动切换IP的中间件,一旦检测到IP被封,自动更换代理。
    • 定期验证代理池中IP的有效性,确保代理IP的质量。
  3. 数据解析与存储:

    • 使用Scrapy框架进行数据抓取和解析,利用其强大的Xpath和CSS选择器。
    • 数据存储可以考虑使用轻量级数据库如SQLite,或者根据数据量选择MySQLPostgreSQL
    • 对于大数据量存储,可以考虑使用NoSQL数据库如MongoDB
  4. OSS镜像保存:

    • 对于关键词搜索页面,可以使用ScrapyFilesPipelineImagesPipeline来保存页面镜像。
    • 将抓取的页面存储到对象存储服务(OSS)中,如阿里云OSS、腾讯云COS等。
  5. 时间与成本控制:

    • 优化爬虫的抓取策略,比如合理安排爬取时间、避免重复爬取等。
    • 成本控制方面,可以通过提高代理IP的复用率、减少无效请求、优化数据处理流程等方式来降低成本。
  6. 代码优化:

    • 对爬虫代码进行性能分析和优化,减少不必要的计算和网络请求。
    • 使用缓存策略,减少对同一URL的重复访问。

解决方案:
1采用自建验证码识别系统,减少识别费用产生成本
2采用chrome 动态采集+request 采集同时进行。chrome +webdriver 用于识别和解锁ip
具体执行代码如下

is_robot = "/errors/validateCaptcha" in response.text
if (is_robot):
            # 机器人验证弹出 则标记被锁
            # logger.info(response.url + "mmmm"+proxy_about+'+++++++++'+"#机器人验证弹出================================")
            print(response.url + "mmmm" + proxy_about + '+++++++++' + "#机器人验证弹出================================")
            #读取验证码图片地址
            'div.a-text-center img'
            #执行IP解锁
            solution = get_amazon_code(code_img)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
def get_amazon_code(img_url):
    import base64
    url="http://xxxxxx:88/api/get/amazoncode?img_link=" 
    str = img_url.encode()
    # 对字符串进行编码
    bs4str = base64.b64encode(str)
    c=bytes.decode(bs4str)
    url=url+c
    code_json=get_task(url, payload={}, headers={})
    print(code_json.text)
    json1 = json.loads(code_json.text)
    return json1['code']
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
'
运行

3这样就能实现数据的高效采集,一边解锁IP,一边上锁IP
4这样IP被锁的概率由原先的80%可以降低到40%
5解析页面数据,
这里是亚马逊amazon 解析页面评论的python demo

def parseReview_element(self, element, countryCode):

        # review唯一标识
        reviewId = element.css('div::attr(id)').extract_first()
        if (reviewId == None):
            # 不是本国的review,跳过
            return None
        element_head = element.css("#customer_review-" + reviewId).extract_first()
        if (element_head == None):
            # 不是本国的review,跳过
            return None

        # 亚马逊用户id与名称

        amazonUserId = self.user(element)

        amazonUserName = str(element.css(".a-profile-name::text").extract_first())[0: 90]

        # 星级与title
        star = self.star(element)
        title = self.title(element)
        # review date
        reviewDateElement = element.css('span[data-hook*="review-date"]').extract_first()

        reviewDate = ReviewDateParseHelper().reviewDate(html_clear(reviewDateElement))
        if (reviewDate == ReviewDateParseHelper().LocalDateTimeUtils.parse("2099-12-31",
                                                                           ReviewDateParseHelper().LocalDateTimeUtils.YYYY_MM_DD)):
            logging.debug("解析评论日期出错*********************************")
            logging.debug(element.extract())
            logging.debug("解析评论日期出错*********************************")
        # Verified_Purchase 购买认证
        verifiedElement = element.css('span[data-hook*="avp-badge"]').extract_first()
        verified = 1 if verifiedElement else 0

        # 点赞数
        helpful_num = element.css('span[data-hook*="helpful-vote-statement"]::text').extract_first()
        if (helpful_num):
            helpful_num = "".join(list(filter(str.isdigit, helpful_num)))
            helpful_num = int(helpful_num) if (helpful_num != "") else 1
        else:
            helpful_num = 0

        # *review 内容
        content = html_clear(element.css('span[data-hook*="review-body"]').extract_first())
        originReviewDate = html_clear(reviewDateElement)
        user_info_ext = ",".join(element.css('span[class*="c7yTopDownDashedStrike"]::text').extract())

        # 解析media

        videos = []
        has_video = element.css("div[class*='cr-video-desktop']").extract_first()
        if (has_video):
            video = {}
            video['videoUrl'] = element.css("input[value$='.mp4']::attr(value)").extract_first()
            video['videoSlateImgUrl'] = element.css(
                "div[class*='cr-video-desktop']::attr(data-thumbnail-url)").extract_first()
            videos.append(video)

        pics = []
        has_pics = element.css("div[class*=review-image-tile-section] span a img::attr(src)").extract()
        if (has_pics):
            for pic in has_pics:
                pics.append(pic)

        if(len(pics)==0):
            has_pics2 = element.css("div[class*=cr-lightbox-image-thumbnails] img::attr(src)").extract()
            if (has_pics2):
                for pic in has_pics2:
                    pics.append(pic)

        if (len(videos) > 0 or len(pics) > 0):
            media = {}
            media['videos'] = videos
            media['reviewImageUrls'] = list(set(pics))
            media = json.dumps(media, ensure_ascii=False)
        else:
            media = ""
        print("media:==============",media)
        content = content.replace('The media could not be loaded.', " ").strip().replace("\n", " ").replace("\r", " ")

        Review = {}
        Review['country_code'] = countryCode
        Review['review_id'] = reviewId
        Review['amazon_user_id'] = amazonUserId
        Review['amazon_user_name'] = amazonUserName
        Review['verified'] = verified
        Review['star'] = star
        Review['title'] = title
        Review['content'] = content
        Review['helpful_num'] = helpful_num  # helpful_num
        Review['review_date'] = reviewDate
        Review['user_info_ext'] = user_info_ext
        Review['origin_review_date'] = originReviewDate
        Review['media'] = media
        Review['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
        Review['update_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
        return Review
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
'
运行

6采集完后需要写入数据库中,这里有两种方式写入一种是同步写入,会有并发性能问题,二异步写入,性能提高700%
以下是异步写入数据库的方法
将采集好的数据,先保存为本地json,然后通过其他的进程异步上传,本进程只做采集。进过测试,改造成异步后性能提升700%

if (sync_post):
            self.save_postinfo_json(type="review", id=task['id'], data_dict=payload)
            print(response.url, "异步上传数据")
            return ""
  • 1
  • 2
  • 3
  • 4

联系作者

(未完待续。。。。。。。。。。。。。。。)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/916069
推荐阅读
相关标签
  

闽ICP备14008679号