1. Collecting and Using Free Proxy IPs
Why Use Proxy IPs
First, a proxy IP helps protect user information. In today's big-data internet era, everyone leaves traces online, and those traces can be exploited by others; using a proxy IP largely removes this exposure. A high-anonymity proxy hides the user's real IP address, protecting personal data and making browsing safer.
Second, a proxy can improve access speed. Pages that sometimes load slowly can be served faster through a proxy: resources fetched via a proxy are kept in the proxy server's cache, so if you request something another user has already visited, the data is pulled straight from the cache. Sites that enforce strict per-IP checks also make it necessary to rotate IPs.
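As a concrete illustration, routing a request through a proxy with Python's requests library only takes a `proxies` mapping. A minimal sketch, assuming a proxy is listening at the placeholder address 127.0.0.1:8080:

```python
import requests

# Placeholder proxy address -- substitute a real proxy ip:port.
proxy = 'http://127.0.0.1:8080'
proxies = {'http': proxy, 'https': proxy}

# httpbin echoes back the IP it sees, so the response should show the
# proxy's exit IP instead of your real one.
response = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
print(response.text)
```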
Different Proxy Types
Open proxies: these come from scanning the entire internet. Someone stands up a proxy server, a scan discovers it, and others take it and use it; operators run scanning software such as nmap on cloud servers distributed around the world, scanning and validating 24/7, 365 days a year. Open proxies can die at any moment and suffer from poor stability, inconsistent speed, security risks, and low availability. Much of what is sold on the market today is this kind of open proxy; it is cheap, but we do not recommend it. Consider private proxies instead.
Private proxies: not scanned, but operated by IP providers who rent physical or dial-up servers across the country, run their own server-side proxy software and a high-availability scheduling system, and support protocols such as HTTP/HTTPS/SOCKS5 (see the SOCKS5 sketch after this list). They are highly anonymous, fast, and stable.
Dedicated proxies: a kind of private proxy for customers who need long-term, stable, long-lived IPs for exclusive use.
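A SOCKS5 private proxy can be used from requests as well; only the URL scheme changes. A minimal sketch, assuming a hypothetical authenticated SOCKS5 endpoint (requests needs the optional `requests[socks]` extra for SOCKS support):

```python
import requests

# Hypothetical SOCKS5 endpoint from a private-proxy provider; SOCKS support
# requires the optional dependency: pip install requests[socks]
socks_proxies = {
    'http': 'socks5://user:pass@127.0.0.1:1080',
    'https': 'socks5://user:pass@127.0.0.1:1080',
}
print(requests.get('http://httpbin.org/ip', proxies=socks_proxies, timeout=5).text)
```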
Free Proxy Websites
You can collect IP addresses from the free proxy sites listed below; availability is not guaranteed:
```txt
西刺代理: http://www.xicidaili.com
快代理: https://www.kuaidaili.com
云代理: http://www.ip3366.net
无忧代理: http://www.data5u.com/
360代理: http://www.swei360.com
66ip代理: http://www.66ip.cn
ip海代理: http://www.iphai.com
大象代理: http://www.daxiangdaili.com/
米扑代理: https://proxy.mimvp.com/freesecret.php
站大爷: http://ip.zdaye.com/
讯代理: http://www.xdaili.cn/
蚂蚁代理: http://www.mayidaili.com/free
89免费代理: http://www.89ip.cn/
全网代理: http://www.goubanjia.com/buy/high.html
开心代理: http://www.kxdaili.com/dailiip.html
猿人云: https://www.apeyun.com/
```
Free Proxy Collection Script
```python
import json

import requests
from lxml import etree


class FreeAgent:
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    }

    @classmethod
    def get_ip(cls, page):
        """Scrape one page of the kuaidaili free-proxy list and yield ip/port dicts."""
        url = f'https://www.kuaidaili.com/free/inha/{page}/'
        response = requests.get(url, headers=cls.headers).text
        tree = etree.HTML(response)
        tr_list = tree.xpath('//tbody/tr')
        for tr in tr_list:
            ip_dict = dict()
            ip_dict['ip'] = tr.xpath('./td[1]/text()')[0]
            ip_dict['port'] = tr.xpath('./td[2]/text()')[0]
            yield ip_dict

    def test_ip(self, max_page_num):
        """Validate each collected proxy against httpbin and persist the working ones."""
        for page_num in range(1, max_page_num + 1):
            for result in self.get_ip(page_num):
                proxies = {
                    'http': 'http://' + result['ip'] + ':' + result['port'],
                    'https': 'https://' + result['ip'] + ':' + result['port']
                }
                try:
                    response = requests.get('http://httpbin.org/ip',
                                            headers=self.headers, proxies=proxies, timeout=3)
                    if response.status_code == 200:
                        print(response.text)
                        # Append working proxies to a file for later use.
                        with open('success_ip.txt', 'a', encoding='utf-8') as f:
                            f.write(json.dumps(proxies, ensure_ascii=False, indent=4) + '\n')
                except Exception as e:
                    print('Request failed:', e)


free_agent = FreeAgent()
free_agent.test_ip(10)
```
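Checking proxies one by one is slow, since every dead proxy costs the full timeout. Below is a sketch of concurrent validation built on the FreeAgent class above, using the standard-library ThreadPoolExecutor; the page range and worker count are arbitrary choices:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def check_proxy(proxies):
    """Return the proxies dict if it reaches httpbin within the timeout, else None."""
    try:
        resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=3)
        return proxies if resp.status_code == 200 else None
    except requests.RequestException:
        return None


agent = FreeAgent()
candidates = [
    {'http': f"http://{r['ip']}:{r['port']}", 'https': f"https://{r['ip']}:{r['port']}"}
    for page in range(1, 4) for r in agent.get_ip(page)
]

# Validate up to 20 proxies at a time instead of sequentially.
with ThreadPoolExecutor(max_workers=20) as pool:
    futures = [pool.submit(check_proxy, p) for p in candidates]
    working = [f.result() for f in as_completed(futures) if f.result()]

print(f'{len(working)} of {len(candidates)} proxies are usable')
```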
2. Using Paid Proxies
Paid Proxy Platforms
Pick a proxy platform you like and purchase a plan; each platform has its own API documentation. You will need to register an account on the platform and complete real-name verification.
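As an illustration of the typical workflow, the sketch below pulls a batch of proxies from a provider's extraction API. The endpoint URL, the apikey/num/format parameters, and the JSON response shape are all hypothetical placeholders; consult your provider's API documentation for the real ones:

```python
import requests

# Hypothetical extraction endpoint -- every provider documents its own URL and parameters.
API_URL = 'https://example-proxy-provider.com/api/get_ip'
PARAMS = {'apikey': 'your-api-key', 'num': 10, 'format': 'json'}  # placeholder names


def fetch_proxies():
    """Pull a batch of ip:port strings from the (hypothetical) provider API."""
    data = requests.get(API_URL, params=PARAMS, timeout=5).json()
    # Assumed response shape: {"data": ["1.2.3.4:8080", ...]} -- adjust to your provider.
    return data['data']


for ip_port in fetch_proxies():
    proxies = {'http': f'http://{ip_port}', 'https': f'https://{ip_port}'}
    print(proxies)
```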
Hands-On Project - Scraping Amazon Product Data
- On the home page, click the "全部" (All) button while capturing network traffic to obtain the corresponding API URL:
```txt
https://www.amazon.cn/nav/ajax/hamburgerMainContent?ajaxTemplate=hamburgerMainContent&pageType=Gateway&hmDataAjaxHint=1&navDeviceType=desktop&isSmile=0&isPrime=0&isBackup=false&hashCustomerAndSessionId=c108bde04b677f19f2e5d7df74ff6ce0cad515fc&languageCode=zh_CN&environmentVFI=AmazonNavigationCards%2Fdevelopment%40B6122949553-AL2_x86_64&secondLayerTreeName=apparel_shoes%2Bcomputer_office%2Bhome_kitchen%2Bbeauty_pca%2Bkindle_ebook%2Bsports_outdoor%2Bgrocery%2Bbaby_toy%2Bphones_elec%2Bjewelry_watch%2Bhome_improvement%2Bvideo_game%2Bmusical_instrument%2Bcamera&customerCountryCode=null
```
- Analyze the data the API returns; it contains ad entries, which need to be filtered out
- Work out how many pages each category has and build the URL for every page
- Request each listing page and extract the detail-page URL of every product
- Extract the product data from each detail page and save it
The code example below has proxying disabled; to enable it, substitute your own proxy API and uncomment the relevant lines.
```python
import time
import pymysql
import requests
import retrying
import threading
from lxml import etree
from queue import Queue
from loguru import logger
from feapder.network.user_agent import get


class AmazonShop:
    def __init__(self):
        self.db = pymysql.connect(host='localhost', user='root', password='root', db='py_spider')
        self.cursor = self.db.cursor()
        self.amazon_api = 'https://www.amazon.cn/nav/ajax/hamburgerMainContent?ajaxTemplate=hamburgerMainContent&pageType=Gateway&hmDataAjaxHint=1&navDeviceType=desktop&isSmile=0&isPrime=0&isBackup=false&hashCustomerAndSessionId=284d1b5a4086d9d81cafe4e0cdb784184a5c9f92&languageCode=zh_CN&environmentVFI=AmazonNavigationCards%2Fdevelopment-nov13patch%40B6165608796-AL2_x86_64&secondLayerTreeName=apparel_shoes%2Bcomputer_office%2Bhome_kitchen%2Bbeauty_pca%2Bsports_outdoor%2Bgrocery%2Bbaby_toy%2Bphones_elec%2Bjewelry_watch%2Bhome_improvement%2Bvideo_game%2Bmusical_instrument%2Bcamera&customerCountryCode=null'
        self.headers = {
            "Referer": "https://www.amazon.cn",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
            "X-Requested-With": "XMLHttpRequest",
            "downlink": "10",
            "ect": "4g",
            "rtt": "200",
        }
        # Purchase a plan and substitute your own API endpoint here
        # self.ip_url = 'http://dev.qydailiip.com/api/?apikey=5c0cf0893210da4e59c65bb489741190bbee43e3&num=30&type=json&line=mac&proxy_type=putong&sort=rand&model=all&protocol=http&address=&kill_address=&port=&kill_port=&today=false&abroad=&isp=&anonymity='
        # Proxy IP queue
        # self.ip_queue = Queue()
        # Queue of category listing URLs
        self.classify_url_queue = Queue()
        # Queue of product detail-page URLs
        self.detail_url_queue = Queue()
        # Queue of scraped product data
        self.shop_info_queue = Queue()

    # Create the product table
    def create_table(self):
        sql = """
            create table if not exists amazon_shop(
                id int primary key auto_increment,
                price varchar(255) not null,
                title varchar(255) not null,
                goods_url varchar(255) not null
            );
        """
        try:
            self.cursor.execute(sql)
            print('Product table created successfully...')
        except Exception as e:
            print('Failed to create product table:', e)

    # Fetch proxy IPs and push them onto the IP queue
    # def get_ip(self):
    #     while True:
    #         # Refill the queue whenever it runs empty
    #         if self.ip_queue.empty():
    #             response = requests.get(self.ip_url).json()
    #             for ip in response:
    #                 self.ip_queue.put(ip)
    #         else:
    #             continue

    # Send a request and return the response; retried up to 3 times on failure
    @retrying.retry(stop_max_attempt_number=3)
    def get_shop_data(self, url):
        # ip = self.ip_queue.get()
        # proxies = {
        #     'http': 'http://' + ip,
        #     'https': 'https://' + ip
        # }
        self.headers['User-Agent'] = get()  # rotate the User-Agent on every request
        # response = requests.get(url, headers=self.headers, proxies=proxies, verify=False, timeout=2)
        response = requests.get(url, headers=self.headers)
        # Put a working proxy back onto the queue for reuse
        # if response.status_code == 200:
        #     self.ip_queue.put(ip)
        # else:
        #     print('Unexpected status code:', ip)
        return response

    # Parse the navigation API and enqueue every product category
    def get_classify_info(self):
        try:
            response = self.get_shop_data(self.amazon_api).json()['data']
            tree = etree.HTML(response)
            li_list = tree.xpath('//ul/li[position() > 2]')
            for li in li_list:
                item = dict()
                if li.xpath('./a/text()'):
                    # The "全部" (All) entry points at the category page itself
                    if '全部' in li.xpath('./a/text()')[0]:
                        continue
                    # Entries whose href is an absolute http link are ads
                    if 'http' in li.xpath('./a/@href')[0]:
                        continue
                    item['title'] = li.xpath('./a/text()')[0]
                    item['href'] = li.xpath('./a/@href')[0].split('=')[1].split('&')[0]
                    # print(item)
                    self.classify_url_queue.put(item)
        except Exception as e:
            print('Request failed:', e)
            logger.error(self.amazon_api)

    # Find each listing's maximum page count and extract every product's detail-page URL
    def get_detail_url(self):
        while True:
            info_url = self.classify_url_queue.get()
            try:
                response = self.get_shop_data(
                    'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true').text
            except Exception as e:
                print('Request failed:', e)
                logger.error('Listing page request failed: {}',
                             'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true')
                # Count the failed task as done so classify_url_queue.join() can finish
                self.classify_url_queue.task_done()
                continue
            tree = etree.HTML(response)
            if tree.xpath('//span[@class="s-pagination-strip"]/span[last()]/text()'):
                max_page = tree.xpath('//span[@class="s-pagination-strip"]/span[last()]/text()')[0]
                # Page through the listing up to the maximum page number found above
                for page in range(1, int(max_page) + 1):
                    new_page = 'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true&page=' + str(page)
                    try:
                        response = self.get_shop_data(new_page)
                    except Exception as e:
                        print('Request failed:', e)
                        logger.error('Listing page request failed: {}', new_page)
                        continue
                    tree = etree.HTML(response.text)
                    # Collect the detail-page URL of every product on this page
                    detail_href_list = tree.xpath(
                        '//div[@class="sg-col-inner"]/span/div[1]/div/div/div//h2/a/@href')
                    for detail_href in detail_href_list:
                        item = dict()
                        item['detail_href'] = detail_href
                        self.detail_url_queue.put(item)
            else:
                # No pagination strip means the category has a single page
                page = 'https://www.amazon.cn/s?rh=n%3A' + info_url['href'] + '&fs=true'
                response = self.get_shop_data(page)
                tree = etree.HTML(response.text)
                detail_href_list = tree.xpath('//div[@class="sg-col-inner"]/span/div[1]/div/div/div//h2/a/@href')
                for detail_href in detail_href_list:
                    item = dict()
                    item['detail_href'] = detail_href
                    self.detail_url_queue.put(item)
            self.classify_url_queue.task_done()

    # Parse product data from the detail pages
    def parse_shop_info(self):
        while True:
            goods_data = self.detail_url_queue.get()
            goods_url = 'https://www.amazon.cn' + goods_data['detail_href']
            try:
                response = self.get_shop_data(goods_url)
            except Exception as e:
                logger.error(goods_url)
                print('Request failed:', e)
                # Count the failed task as done so detail_url_queue.join() can finish
                self.detail_url_queue.task_done()
                continue
            tree = etree.HTML(response.text)
            # Product title: fall back to the page <title> when the h1 span is missing
            title = tree.xpath('//div[@id="centerCol"]//h1/span/text()')[0] if tree.xpath(
                '//div[@id="centerCol"]//h1/span/text()') else tree.xpath('//title/text()')[0]
            # Product price
            if tree.xpath('//div[@id="centerCol"]//div[@id="apex_desktop"]//span[@class="a-price-whole"]/text()'):
                price = "¥" + tree.xpath(
                    '//div[@id="centerCol"]//div[@id="apex_desktop"]//span[@class="a-price-whole"]/text()')[0]
            else:
                price = '-'.join(tree.xpath(
                    '//td[@class="a-span12"]//span[@class="a-offscreen"]/text()'))
            print((title.strip(), price, goods_url))
            # Push the product info onto the queue as a tuple
            self.shop_info_queue.put((title.strip(), price, goods_url))
            self.detail_url_queue.task_done()

    # Save product data in batches
    def save_shop_info(self):
        while True:
            # Buffer rows for a multi-row insert
            info_list = list()
            # Collect 30 rows per batch; note that a final batch smaller than 30
            # stays in the queue and is never written
            for _ in range(30):
                info = self.shop_info_queue.get()
                # id=0 lets MySQL's auto_increment assign the primary key
                info_list.append((0,) + info)
                self.shop_info_queue.task_done()
            # Column order matches the (id, title, price, goods_url) tuples built above
            sql = """
                insert into amazon_shop(id, title, price, goods_url) values (
                    %s, %s, %s, %s
                );
            """
            try:
                self.cursor.executemany(sql, info_list)
                self.db.commit()
                print('Rows inserted:', info_list)
            except Exception as e:
                print('Insert failed:', e)
                self.db.rollback()

    def main(self):
        self.create_table()
        # List of worker threads
        thread_list = list()
        # Category thread
        thread_list.append(threading.Thread(target=self.get_classify_info))
        # Detail-URL threads
        for _ in range(10):
            thread_list.append(threading.Thread(target=self.get_detail_url))
        # Parsing threads
        for _ in range(2):
            thread_list.append(threading.Thread(target=self.parse_shop_info))
        # Saving thread: use a single thread for database writes
        thread_list.append(threading.Thread(target=self.save_shop_info))
        for thread_obj in thread_list:
            thread_obj.daemon = True
            thread_obj.start()
        # Give the worker threads a moment to start filling the queues
        time.sleep(4)
        for queue in [self.classify_url_queue, self.detail_url_queue, self.shop_info_queue]:
            queue.join()


if __name__ == '__main__':
    # Logging: rotate to a new file once the current one exceeds 500 MB
    logger.add('runtime_{time}.log', rotation='500 MB')
    amazon_shop = AmazonShop()
    amazon_shop.main()
```
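A note on the shutdown pattern used in main(): every worker loops forever, so the threads are marked as daemons and the main thread instead blocks on queue.join(), which returns once task_done() has been called for every enqueued item. When all three queues drain, main() returns and the daemon threads are terminated with the process. This is also why a failed request must still call task_done(); otherwise join() would wait forever.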