
1. Collecting and Using Free Proxy IPs

Why Use Proxy IPs

First, a proxy IP helps protect user information. In today's big-data internet era, everything you do online leaves traces that others may exploit, and a proxy IP is an effective way to mitigate that. A high-anonymity proxy hides your real IP address, protecting your personal data and making browsing safer.

Second, a proxy can improve access speed. Pages that occasionally load slowly when requested directly may come back faster through a proxy: content that other users have already fetched can be cached on the proxy server, so repeat requests are served straight from the cache. You also need to switch IPs when a site checks visitor IPs strictly.
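As a quick illustration of routing a request through a proxy, here is a minimal sketch using the requests library. The proxy address 127.0.0.1:7890 is a placeholder; http://httpbin.org/ip simply echoes the IP it sees, so you can confirm the proxy is in effect.

python
import requests

# Placeholder proxy address: replace with a real proxy host:port.
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}

# httpbin echoes the caller's IP; through a working proxy it should show the proxy's IP.
response = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
print(response.json())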

Types of Proxies

Open proxies: these come from scanning the entire internet. Someone sets up a proxy server, a scanner finds it, and other people start using it. Operators run scanning software (built around the nmap tool) on cloud servers spread around the world, scanning and validating addresses around the clock, year round. Open proxies can fail at any moment and suffer from poor stability, erratic speed, weak security, and a low usable rate. Much of what is sold on the market today is this kind of open proxy; it is cheap, but we do not recommend it and suggest private proxies instead.

Private proxies: not obtained by scanning. The IP provider rents physical servers or dial-up servers across the country, runs its own proxy server software with a highly available scheduling system, and supports HTTP/HTTPS/SOCKS5. Private proxies are high-anonymity, fast, and stable.

Dedicated proxies: a kind of private proxy for customers who need stable, long-lived IPs over a long period.
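The protocol a provider supports (HTTP/HTTPS/SOCKS5) determines how the proxies dict is written for requests. Below is a hedged sketch with placeholder addresses and credentials; SOCKS5 support requires the optional dependency installed via pip install requests[socks].

python
import requests

# Placeholder credentials and addresses: substitute the values from your provider.
http_proxies = {
    'http': 'http://user:pass@1.2.3.4:8000',
    'https': 'http://user:pass@1.2.3.4:8000',
}

# SOCKS5 proxies use the socks5:// scheme and need: pip install requests[socks]
socks5_proxies = {
    'http': 'socks5://user:pass@1.2.3.4:1080',
    'https': 'socks5://user:pass@1.2.3.4:1080',
}

for proxies in (http_proxies, socks5_proxies):
    try:
        r = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
        print(r.json())
    except requests.RequestException as e:
        print('Proxy failed:', e)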

Free Proxy Websites

You can harvest IP addresses from the free proxy sites below; availability is not guaranteed:

txt
西刺代理: http://www.xicidaili.com
快代理: https://www.kuaidaili.com
云代理: http://www.ip3366.net
无忧代理: http://www.data5u.com/
360代理: http://www.swei360.com
66ip代理: http://www.66ip.cn
ip海代理: http://www.iphai.com
大象代理: http://www.daxiangdaili.com/
米扑代理: https://proxy.mimvp.com/freesecret.php
站大爷: http://ip.zdaye.com/
讯代理: http://www.xdaili.cn/
蚂蚁代理: http://www.mayidaili.com/free
89免费代理: http://www.89ip.cn/
全网代理: http://www.goubanjia.com/buy/high.html
开心代理: http://www.kxdaili.com/dailiip.html
猿人云: https://www.apeyun.com/
Free Proxy Collection Script
python
import json
import requests
from lxml import etree


# Collect free proxies from kuaidaili and test which ones still respond.
class FreeAgent:
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    }

    @classmethod
    def get_ip(cls, page):
        # Fetch one page of the kuaidaili free-proxy list and yield ip/port pairs.
        url = f'https://www.kuaidaili.com/free/inha/{page}/'
        response = requests.get(url, headers=cls.headers).text
        tree = etree.HTML(response)

        tr_list = tree.xpath('//tbody/tr')
        for tr in tr_list:
            ip_dict = dict()
            ip_dict['ip'] = tr.xpath('./td[1]/text()')[0]
            ip_dict['port'] = tr.xpath('./td[2]/text()')[0]
            yield ip_dict

    def test_ip(self, max_page_num):
        # Verify each collected proxy against httpbin and save the working ones to success_ip.txt.
        for page_num in range(1, max_page_num + 1):
            for result in self.get_ip(page_num):
                proxies = {
                    'http': 'http://' + result['ip'] + ':' + result['port'],
                    'https': 'https://' + result['ip'] + ':' + result['port']
                }

                try:
                    response = requests.get('http://httpbin.org/ip',
                                            headers=self.headers, proxies=proxies, timeout=3)
                    if response.status_code == 200:
                        print(response.text)
                        with open('success_ip.txt', 'a', encoding='utf-8') as f:
                            f.write(json.dumps(proxies, ensure_ascii=False, indent=4) + '\n')
                except Exception as e:
                    print('Request timed out:', e)


free_agent = FreeAgent()
free_agent.test_ip(10)
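The working proxies end up in success_ip.txt as back-to-back pretty-printed JSON objects. Below is a sketch of one way to read them back and pick one at random; the load_proxies helper is introduced here only for illustration and is not part of the script above.

python
import json
import random

def load_proxies(path='success_ip.txt'):
    # The file holds concatenated pretty-printed JSON objects, so decode them one by one.
    decoder = json.JSONDecoder()
    with open(path, encoding='utf-8') as f:
        text = f.read()
    proxies_list, idx = [], 0
    while idx < len(text):
        # Skip whitespace between objects, then decode the next object.
        while idx < len(text) and text[idx].isspace():
            idx += 1
        if idx >= len(text):
            break
        obj, idx = decoder.raw_decode(text, idx)
        proxies_list.append(obj)
    return proxies_list

proxies_list = load_proxies()
if proxies_list:
    print(random.choice(proxies_list))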

2. Using Paid Proxies

Paid Proxy Platforms

Pick whichever proxy platform you prefer and purchase a plan yourself. Each platform has its own API documentation, and you will need to register an account and complete real-name verification on the platform.
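Most paid platforms expose an extraction API that returns a batch of IPs as JSON. The sketch below is only illustrative: the endpoint URL and the response fields ('ip' and 'port') are assumptions, so substitute the URL and parsing described in your provider's API documentation.

python
import requests

# Hypothetical extraction endpoint: replace with the URL from your provider's API docs.
API_URL = 'https://example-proxy-provider.com/api/get_ip?num=10&format=json'

def fetch_proxies():
    data = requests.get(API_URL, timeout=5).json()
    # Assumed response shape: a list of {"ip": ..., "port": ...} objects.
    return [
        {
            'http': f"http://{item['ip']}:{item['port']}",
            'https': f"http://{item['ip']}:{item['port']}",
        }
        for item in data
    ]

print(fetch_proxies())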

Hands-On Project: Scraping Amazon Product Data

Target site: https://www.amazon.cn/

  1. On the home page, click the "全部" (All) button and capture the traffic to find the corresponding API URL
txt
https://www.amazon.cn/nav/ajax/hamburgerMainContent?ajaxTemplate=hamburgerMainContent&pageType=Gateway&hmDataAjaxHint=1&navDeviceType=desktop&isSmile=0&isPrime=0&isBackup=false&hashCustomerAndSessionId=c108bde04b677f19f2e5d7df74ff6ce0cad515fc&languageCode=zh_CN&environmentVFI=AmazonNavigationCards%2Fdevelopment%40B6122949553-AL2_x86_64&secondLayerTreeName=apparel_shoes%2Bcomputer_office%2Bhome_kitchen%2Bbeauty_pca%2Bkindle_ebook%2Bsports_outdoor%2Bgrocery%2Bbaby_toy%2Bphones_elec%2Bjewelry_watch%2Bhome_improvement%2Bvideo_game%2Bmusical_instrument%2Bcamera&customerCountryCode=null
  2. Inspect the data returned by the API; it contains ad entries, which need to be filtered out
  3. Work out how many pages each category has and build the URL of every page
  4. Request those URLs and extract each product's detail-page URL
  5. Extract the product data from each detail page and save it

The code below has the proxy calls disabled. To enable them, substitute your own proxy API and uncomment the relevant lines.

python
import time
import pymysql
import requests
import retrying
import threading
from lxml import etree
from queue import Queue
from loguru import logger
from feapder.network.user_agent import get


# Multithreaded Amazon.cn scraper: categories -> listing pages -> product details -> MySQL.
class AmazonShop:
    def __init__(self):
        self.db = pymysql.connect(host='localhost', user='root', password='root', db='py_spider')
        self.cursor = self.db.cursor()
        self.amazon_api = 'https://www.amazon.cn/nav/ajax/hamburgerMainContent?ajaxTemplate=hamburgerMainContent&pageType=Gateway&hmDataAjaxHint=1&navDeviceType=desktop&isSmile=0&isPrime=0&isBackup=false&hashCustomerAndSessionId=284d1b5a4086d9d81cafe4e0cdb784184a5c9f92&languageCode=zh_CN&environmentVFI=AmazonNavigationCards%2Fdevelopment-nov13patch%40B6165608796-AL2_x86_64&secondLayerTreeName=apparel_shoes%2Bcomputer_office%2Bhome_kitchen%2Bbeauty_pca%2Bsports_outdoor%2Bgrocery%2Bbaby_toy%2Bphones_elec%2Bjewelry_watch%2Bhome_improvement%2Bvideo_game%2Bmusical_instrument%2Bcamera&customerCountryCode=null'
        self.headers = {
            "Referer": "https://www.amazon.cn",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
            "X-Requested-With": "XMLHttpRequest",
            "downlink": "10",
            "ect": "4g",
            "rtt": "200",
        }

        # Purchase your own proxy plan and replace this with your own API endpoint
        # self.ip_url = 'http://dev.qydailiip.com/api/?apikey=5c0cf0893210da4e59c65bb489741190bbee43e3&num=30&type=json&line=mac&proxy_type=putong&sort=rand&model=all&protocol=http&address=&kill_address=&port=&kill_port=&today=false&abroad=&isp=&anonymity='

        # Proxy IP queue
        # self.ip_queue = Queue()
        # Category URL queue
        self.classify_url_queue = Queue()
        # Product detail URL queue
        self.detail_url_queue = Queue()
        # Product data queue
        self.shop_info_queue = Queue()

    # Create the data table
    def create_table(self):
        sql = """
            create table if not exists amazon_shop(
                id int primary key auto_increment,
                price varchar(255) not null,
                title varchar(255) not null,
                goods_url varchar(255) not null
            );
        """
        try:
            self.cursor.execute(sql)
            print('Product table created successfully...')
        except Exception as e:
            print('Failed to create product table:', e)

    # Fetch proxy IPs from the paid API and push them onto the IP queue
    # def get_ip(self):
    #     while True:
    #         # If the queue is empty, fetch a new batch of IPs
    #         if self.ip_queue.empty():
    #             response = requests.get(self.ip_url).json()
    #             for ip in response:
    #                 self.ip_queue.put(ip)
    #         else:
    #             continue

    # Send a request (optionally through a proxy) and return the response
    @retrying.retry(stop_max_attempt_number=3)
    def get_shop_data(self, url):
        # ip = self.ip_queue.get()
        # proxies = {
        #     'http': 'http://' + ip,
        #     'https': 'https://' + ip
        # }

        self.headers['User-Agent'] = get()
        # response = requests.get(url, headers=self.headers, proxies=proxies, verify=False, timeout=2)
        response = requests.get(url, headers=self.headers)
        # Put the proxy back on the queue so it can be reused
        # if response.status_code == 200:
        #     self.ip_queue.put(ip)
        # else:
        #     print('Unexpected status code:', ip)
        return response

    def get_classify_info(self):
        try:
            response = self.get_shop_data(self.amazon_api).json()['data']
            tree = etree.HTML(response)
            li_list = tree.xpath('//ul/li[position() > 2]')
            for li in li_list:
                item = dict()
                if li.xpath('./a/text()'):
                    # The '全部' (All) entry links to the category index page; skip it
                    if '全部' in li.xpath('./a/text()')[0]:
                        continue
                    # Links containing a full http(s) URL are ads; skip them
                    if 'http' in li.xpath('./a/@href')[0]:
                        continue

                    item['title'] = li.xpath('./a/text()')[0]
                    item['href'] = li.xpath('./a/@href')[0].split('=')[1].split('&')[0]
                    # print(item)
                    self.classify_url_queue.put(item)
        except Exception as e:
            print('Request failed:', e)
            logger.error(self.amazon_api)

    # Get the maximum page count of each listing and extract every product's detail-page URL
    def get_detail_url(self):
        while True:
            info_url = self.classify_url_queue.get()
            try:
                response = self.get_shop_data(
                    'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true').text
            except Exception as e:
                print('Request failed:', e)
                logger.error('Listing page request failed: {}',
                             'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true')
                continue

            tree = etree.HTML(response)
            if tree.xpath('//span[@class="s-pagination-strip"]/span[last()]/text()'):
                max_page = tree.xpath('//span[@class="s-pagination-strip"]/span[last()]/text()')[0]
                # Page through the listing using the maximum page number extracted above
                for page in range(1, int(max_page) + 1):
                    new_page = 'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true&page=' + str(page)
                    try:
                        response = self.get_shop_data(new_page)
                    except Exception as e:
                        print('Request failed:', e)
                        logger.error('Listing page request failed: {}', new_page)
                        continue

                    tree = etree.HTML(response.text)
                    # Collect the detail-page URLs of every product on this page
                    detail_href_list = tree.xpath(
                        '//div[@class="sg-col-inner"]/span/div[1]/div/div/div//h2/a/@href')
                    for detail_href in detail_href_list:
                        item = dict()
                        item['detail_href'] = detail_href
                        # print('with pagination:', item)
                        self.detail_url_queue.put(item)
            else:
                page = 'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true'
                response = self.get_shop_data(page)
                tree = etree.HTML(response.text)
                # Collect the detail-page URLs of every product on this page
                detail_href_list = tree.xpath('//div[@class="sg-col-inner"]/span/div[1]/div/div/div//h2/a/@href')
                for detail_href in detail_href_list:
                    item = dict()
                    item['detail_href'] = detail_href
                    # print('no pagination:', item)
                    self.detail_url_queue.put(item)

            self.classify_url_queue.task_done()

    # Parse product data
    def parse_shop_info(self):
        while True:
            goods_data = self.detail_url_queue.get()
            goods_url = 'https://www.amazon.cn' + goods_data['detail_href']
            try:
                response = self.get_shop_data(goods_url)
            except Exception as e:
                logger.error(goods_url)
                print('Request failed:', e)
                continue

            tree = etree.HTML(response.text)

            # Extract the product title (fall back to the page <title> if the main heading is missing)
            title = tree.xpath('//div[@id="centerCol"]//h1/span/text()')[0] if tree.xpath(
                '//div[@id="centerCol"]//h1/span/text()') else tree.xpath('//title/text()')[0]

            # Extract the product price
            if tree.xpath('//div[@id="centerCol"]//div[@id="apex_desktop"]//span[@class="a-price-whole"]/text()'):
                price = "¥" + tree.xpath(
                    '//div[@id="centerCol"]//div[@id="apex_desktop"]//span[@class="a-price-whole"]/text()')[0]
            else:
                price = '-'.join(tree.xpath(
                    '//td[@class="a-span12"]//span[@class="a-offscreen"]/text()'))

            print((title.strip(), price, goods_url))

            # Push the extracted product info onto the queue as a tuple
            self.shop_info_queue.put((title.strip(), price, goods_url))
            self.detail_url_queue.task_done()

    # Save product data to MySQL
    def save_shop_info(self):
        while True:
            # Buffer rows for a batched insert
            info_list = list()

            # Take 30 records off the queue for one multi-row insert (a final batch smaller than 30 keeps waiting here)
            for _ in range(30):
                info = self.shop_info_queue.get()
                info_list.append((0,) + info)
                self.shop_info_queue.task_done()

            sql = """
                    insert into amazon_shop(id, price, title, goods_url) values (
                        %s, %s, %s, %s
                    );
                """

            try:
                self.cursor.executemany(sql, info_list)
                self.db.commit()
                print('Rows inserted:', info_list)
            except Exception as e:
                print('Insert failed:', e)
                self.db.rollback()

    def main(self):
        self.create_table()

        # List of thread objects
        thread_list = list()

        # Category thread
        thread_list.append(threading.Thread(target=self.get_classify_info))
        # Product detail-URL threads
        for _ in range(10):
            thread_list.append(threading.Thread(target=self.get_detail_url))
        # Data parsing threads
        for _ in range(2):
            thread_list.append(threading.Thread(target=self.parse_shop_info))
        # Data saving: a single thread is recommended for writing to the database
        thread_list.append(threading.Thread(target=self.save_shop_info))

        for thread_obj in thread_list:
            thread_obj.daemon = True
            thread_obj.start()

        # Give the threads a moment to start
        time.sleep(4)

        for queue in [self.classify_url_queue, self.detail_url_queue, self.shop_info_queue]:
            queue.join()


if __name__ == '__main__':
    # Logging: a new log file is started once the current one exceeds 500 MB
    logger.add('runtime_{time}.log', rotation='500 MB')
    amazon_shop = AmazonShop()
    amazon_shop.main()
