1. Chapter Introduction
Because of the GIL (Global Interpreter Lock), Python cannot exploit multiple cores with multiple threads, and its performance has long been criticized for it. In IO-bound network programming, however, asynchronous processing can be orders of magnitude more efficient than synchronous processing, which makes up for this shortcoming.
Python 3.4 added asyncio to the standard library, and Python 3.5 added the async/await syntax.
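As a quick standalone refresher (a minimal sketch, not part of the crawler code that follows), a coroutine is declared with async def and driven by the event loop:
import asyncio


async def say_hello():
    # await hands control back to the event loop instead of blocking the thread
    await asyncio.sleep(1)
    print('hello, asyncio')


if __name__ == '__main__':
    asyncio.run(say_hello())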
Next, we will use the coroutines we learned earlier to run crawler tasks concurrently.
2. Completing a Crawler Task with asyncio and requests
import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

loop = asyncio.get_event_loop()


async def get_movie_info(page):
    # run_in_executor does not forward keyword arguments such as headers,
    # so we bind them with functools.partial
    response = await loop.run_in_executor(None, partial(requests.get, url.format(page * 25), headers=headers))
    # print(response)
    # print(response.request.headers)
    soup = BeautifulSoup(response.text, 'lxml')
    div_list = soup.find_all('div', class_='hd')
    for title in div_list:
        print(title.get_text())


if __name__ == '__main__':
    # Tasks are scheduled concurrently, so the titles are not printed in page order
    tasks = [loop.create_task(get_movie_info(page)) for page in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
3. Completing a Crawler Task with aiohttp
The requests library does not support asyncio natively, so inside asyncio it has to be run in a thread pool, which is somewhat awkward. To avoid this, we switch to aiohttp, a library with native async support, to complete the crawler task.
Introduction and Installation
Introduction
aiohttp is an asynchronous networking library that can act both as an HTTP client and as an HTTP server. For crawling we only use its HTTP client functionality.
Official site: https://docs.aiohttp.org/en/stable/
Client documentation: https://docs.aiohttp.org/en/stable/client.html#aiohttp-client
Installation
pip install aiohttp -i https://pypi.tuna.tsinghua.edu.cn/simple
Basic Usage
Example Code
import asyncio
from aiohttp import ClientSession

url = "https://www.baidu.com"
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


async def get_baidu():
    async with ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            response = await response.text()
            print(response)


if __name__ == '__main__':
    asyncio.run(get_baidu())
Concurrent Requests
Example Code
import asyncio
import aiohttp


def download_completed_callback(task_obj):
    print("Downloaded content:", task_obj.result())


async def baidu_spider():
    print("---baidu spider---")
    url = "https://www.baidu.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def sogou_spider():
    print("---sogou spider---")
    url = "https://www.sogou.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def jingdong_spider():
    print("---jingdong spider---")
    url = "https://www.jd.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def main():
    # Create several Tasks and attach a done callback to each of them
    task_baidu = asyncio.create_task(baidu_spider())
    task_baidu.add_done_callback(download_completed_callback)

    task_sogou = asyncio.create_task(sogou_spider())
    task_sogou.add_done_callback(download_completed_callback)

    task_jingdong = asyncio.create_task(jingdong_spider())
    task_jingdong.add_done_callback(download_completed_callback)

    tasks = [task_baidu, task_sogou, task_jingdong]
    # Wait for all downloads to finish
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
Exercise: Scraping Douban Movie Titles with aiohttp
Example Code
import asyncio
import aiohttp
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


async def get_movie_info(page):
    async with aiohttp.ClientSession() as session:
        async with session.get(url.format(page * 25), headers=headers) as response:
            soup = BeautifulSoup(await response.text(), 'lxml')
            div_list = soup.find_all('div', class_='hd')
            for title in div_list:
                print(title.get_text())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(get_movie_info(page)) for page in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
4. Using aiomysql
Installation
pip install aiomysql
Python 3's async/await keywords let us replace blocking operations with asynchronous ones: aiohttp replaces requests for network requests, and motor replaces the synchronous pymongo library for MongoDB. Likewise, where a synchronous Python program would use PyMySQL to operate a MySQL database, we use aiomysql to operate MySQL asynchronously.
Usage
import asyncio
import aiomysql

loop = asyncio.get_event_loop()


async def test_example():
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='root', db='py_spider',
                                  loop=loop)
    cursor = await conn.cursor()
    await cursor.execute("SELECT * from ali_work")
    # Print the column metadata of the current table
    print(cursor.description)
    result = await cursor.fetchall()
    print(result)
    await cursor.close()
    conn.close()


loop.run_until_complete(test_example())
Storing Data with an Asynchronous Crawler
Use asyncio to collect car parameter information from Autohome's used-car site (che168) and save it to a MySQL database.
Target URL: https://www.che168.com/china/a0_0msdgscncgpi1ltocsp7exf4x0/?pvareaid=102179#currengpostion
Analysis:
- The listing page is static; when paging, the sp1 part of the url changes to sp2, so the listing page can be parsed with xpath.
- The detail page reached from the listing contains the car's configuration information, which is loaded dynamically, so we capture the traffic to find the api.
- The captured api link contains a query-string parameter: specid.
- Back on the listing page, the li elements carry the car's specid value, so we extract that id and use it to build the api link.
- Send requests to the constructed api links to fetch the data.
Notes:
The data returned by the api endpoint is not plain json, so the response has to be processed first. There are two ways to handle it (a minimal sketch of the first follows this list):
- Slice the returned string and keep only the json part.
- Remove the callback=configTitle query-string parameter from the api link.
The page encoding on the site changes at random, so we use the third-party chardet package to detect the encoding at runtime; when the page comes back as UTF-8-SIG, the specid data is missing.
pip install chardet
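A minimal sketch of the first approach, assuming the response is wrapped as configTitle({...}) (the sample string below is illustrative, not real API output). The full implementation after this simply requests an API URL without the callback parameter, i.e. the second approach.
import json


def strip_jsonp(text):
    # keep only the JSON part between the first '(' and the last ')'
    start = text.find('(') + 1
    end = text.rfind(')')
    return json.loads(text[start:end])


# hypothetical wrapped response, for illustration only
raw = 'configTitle({"result": {"paramtypeitems": []}})'
print(strip_jsonp(raw))  # -> {'result': {'paramtypeitems': []}}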
Implementation:
"""
分析思路:
1.在首页中获取汽车id
2.将获取到的汽车id拼接到api数据接口中
3.保存数据
"""
import redis
import chardet
import hashlib
import asyncio
import aiohttp
import aiomysql
from lxml import etree
class CarSpider:
redis_client = redis.Redis()
def __init__(self):
self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
self.headers = {
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
}
# 获取汽车id
async def get_car_id(self, page, session, pool):
async with session.get(self.url.format(page), headers=self.headers) as response:
content = await response.read()
encoding = chardet.detect(content)['encoding'] # 汽车之家会检测是否频繁请求, 如果频繁请求则将页面替换成UTF8编码格式并无法获取汽车id
# print(encoding)
if encoding == 'GB2312' or encoding == 'ISO-8859-1':
result = content.decode('gbk')
else:
result = content.decode(encoding)
print('被反爬了...')
tree = etree.HTML(result)
id_list = tree.xpath('//ul[@class="viewlist_ul"]/li/@specid')
if id_list:
print(id_list)
tasks = [asyncio.create_task(self.get_car_info(spec_id, session, pool)) for spec_id in id_list]
await asyncio.wait(tasks)
# 当获取到页面中所有的汽车id之后要进行api连接的拼接并获取api数据
async def get_car_info(self, spec_id, session, pool):
async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
result = await response.json()
if result['result'].get('paramtypeitems'):
item = dict()
item['name'] = result['result']['paramtypeitems'][0]['paramitems'][0]['value']
item['price'] = result['result']['paramtypeitems'][0]['paramitems'][1]['value']
item['brand'] = result['result']['paramtypeitems'][0]['paramitems'][2]['value']
item['altitude'] = result['result']['paramtypeitems'][1]['paramitems'][2]['value']
item['breadth'] = result['result']['paramtypeitems'][1]['paramitems'][1]['value']
item['length'] = result['result']['paramtypeitems'][1]['paramitems'][0]['value']
await self.save_car_info(item, pool)
else:
print('数据不存在...')
@staticmethod
def get_md5(dict_item):
md5 = hashlib.md5()
md5.update(str(dict_item).encode('utf-8'))
return md5.hexdigest()
async def save_car_info(self, item, pool):
print(item)
# 使用异步上下文管理器创建链接对象以及游标对象
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
val_md5 = self.get_md5(item)
# 保存成功返回1, 保存失败返回0
redis_result = self.redis_client.sadd('car:filter', val_md5)
if redis_result:
sql = """
insert into car_info(
id, name, price, brand, altitude, breadth, length) values (
%s, %s, %s, %s, %s, %s, %s
);
"""
try:
await cursor.execute(sql, (0,
item['name'],
item['price'],
item['brand'],
item['altitude'],
item['breadth'],
item['length']
))
await conn.commit()
print('插入成功...')
except Exception as e:
print('数据插入失败:', e)
await conn.rollback()
else:
print('数据重复...')
# 启动函数
async def main(self):
# 创建数据库连接池并获取游标对象
async with aiomysql.create_pool(user='root', password='root', db='py_spider') as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
# 创建表
create_table_sql = """
create table car_info(
id int primary key auto_increment,
name varchar(100),
price varchar(100),
brand varchar(100),
altitude varchar(100),
breadth varchar(100),
length varchar(100)
);
"""
# 在异步代码中必须先要检查表是否存在, 直接使用if not语句无效
check_table_query = "show tables like 'car_info'"
result = await cursor.execute(check_table_query) # 如果表存在返回1 不存在返回0
if not result:
await cursor.execute(create_table_sql)
# 创建请求对象
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self.get_car_id(page, session, pool)) for page in range(1, 16)]
await asyncio.wait(tasks)
if __name__ == '__main__':
car = CarSpider()
asyncio.run(car.main())
5. Concurrent Crawling with Multiple Threads
In the previous section we built a concurrent crawler with asyncio, but in practice multithreading is still the most common way to meet crawling requirements, so it is worth reviewing the threading material we studied earlier.
Crawler Requirement
Rewrite the Douban coroutine crawler above in a multithreaded style.
import requests
import threading
from lxml import etree

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    print(result)


if __name__ == '__main__':
    thread_obj_list = [threading.Thread(target=get_movie_info, args=(page,)) for page in range(10)]
    for thread_obj in thread_obj_list:
        thread_obj.start()
6. Concurrent Crawling with a Thread Pool
Staying with the Douban crawler, rewrite the code above to use a thread pool.
Example Code
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    print(result)


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5) as pool:
        for page_num in range(10):
            pool.submit(get_movie_info, page_num)
7. Concurrent Crawling with Multiple Processes
Because of the GIL, a single Python process cannot make full use of multiple cores, so to improve throughput we can also use multiple processes to meet the requirement.
Process Code Recap
from multiprocessing import Process

# Create a process object (func and arg are placeholders here)
p = Process(target=func, args=(arg,))
# Mark it as a daemon process
p.daemon = True
# Start the process
p.start()
Queues Between Processes
With multiple processes, the ordinary queue module blocks because its data is not shared between processes; instead we use the JoinableQueue provided by multiprocessing, which is used the same way as the queue module in threads.
Next, we use a Tencent recruitment case study to learn how to use JoinableQueue between processes.
Page URL: https://careers.tencent.com/search.html?keyword=python
Example Code
import time
import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue

url = 'https://careers.tencent.com/tencentcareer/api/post/Query'
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_work_info_json(page_num, queue):
    params = {
        'timestamp': 1696774900608,
        'countryId': '',
        'cityId': '',
        'bgIds': '',
        'productId': '',
        'categoryId': '',
        'parentCategoryId': '',
        'attrId': '',
        'keyword': 'python',
        'pageIndex': page_num,
        'pageSize': 10,
        'language': 'zh-cn',
        'area': 'cn'
    }
    response = requests.get(url, headers=headers, params=params).json()
    for info in response['Data']['Posts']:
        work_info_dict = dict()
        work_info_dict['recruit_post_name'] = jsonpath.jsonpath(info, '$..RecruitPostName')[0]
        work_info_dict['country_name'] = jsonpath.jsonpath(info, '$..CountryName')[0]
        work_info_dict['location_name'] = jsonpath.jsonpath(info, '$..LocationName')[0]
        work_info_dict['category_name'] = jsonpath.jsonpath(info, '$..CategoryName')[0]
        work_info_dict['responsibility'] = jsonpath.jsonpath(info, '$..Responsibility')[0]
        work_info_dict['last_update_time'] = jsonpath.jsonpath(info, '$..LastUpdateTime')[0]
        # print(work_info_dict)
        queue.put(work_info_dict)


def save_work_info(queue):
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['tx_work']
    while True:
        dict_data = queue.get()
        print(dict_data)
        collection.insert_one(dict_data)
        # Decrement the unfinished-task counter; queue.join() unblocks when it reaches 0
        queue.task_done()


if __name__ == '__main__':
    dict_data_queue = Queue()
    # List of process objects
    process_list = list()
    for page in range(1, 50):
        p_get_info = Process(target=get_work_info_json, args=(page, dict_data_queue))
        process_list.append(p_get_info)

    p_save_work = Process(target=save_work_info, args=(dict_data_queue,))
    process_list.append(p_save_work)

    for process_obj in process_list:
        process_obj.daemon = True
        process_obj.start()

    time.sleep(2)  # give the operating system enough time to start the processes
    dict_data_queue.join()
    print('Crawling finished...')
8. Crawler Practice
Scraping iQIYI Video Information with Multiple Threads
Target URL: https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC
import pymongo
import requests
import threading
from queue import Queue


class AiQiYi:
    def __init__(self):
        self.mongo_client = pymongo.MongoClient(host='localhost', port=27017)
        self.collection = self.mongo_client['py_spider']['Thread_AiQiYi']
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
            'Referer': 'https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC'
        }
        self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=85dd981b69cead4b60f6d980438a5664&three_category_id=15;must'

        # Create the queues
        self.url_queue = Queue()           # request URL queue
        self.json_queue = Queue()          # JSON data queue
        self.content_dict_queue = Queue()  # parsed-item queue

    def get_url(self):
        for page in range(1, 6):
            self.url_queue.put(self.api_url.format(page))  # push the request URLs onto the URL queue

    def get_api_json(self):
        while True:
            url = self.url_queue.get()
            response = requests.get(url, headers=self.headers)
            self.json_queue.put(response.json())  # push the fetched JSON onto the JSON queue
            self.url_queue.task_done()  # decrement the URL queue's unfinished-task counter

    def parse_movie_info(self):
        while True:
            json_data = self.json_queue.get()
            category_movies = json_data['data']['list']
            for movie in category_movies:
                item = dict()
                item['title'] = movie['title']
                item['playUrl'] = movie['playUrl']
                item['description'] = movie['description']
                self.content_dict_queue.put(item)  # push each parsed item onto the item queue
            self.json_queue.task_done()  # decrement the JSON queue's counter after all items are pushed

    def save_movie_info(self):
        while True:
            item = self.content_dict_queue.get()
            self.collection.insert_one(item)
            print('Inserted:', item)
            self.content_dict_queue.task_done()  # decrement the item queue's counter for each saved item

    def main(self):
        # List of thread objects
        thread_obj_list = list()

        # Thread that produces the request URLs
        t_url = threading.Thread(target=self.get_url)
        thread_obj_list.append(t_url)

        # Threads that send the requests
        for _ in range(3):
            t_get_json = threading.Thread(target=self.get_api_json)
            thread_obj_list.append(t_get_json)

        # Thread that parses the data
        t_parse_info = threading.Thread(target=self.parse_movie_info)
        thread_obj_list.append(t_parse_info)

        # Thread that saves the data
        t_save_info = threading.Thread(target=self.save_movie_info)
        thread_obj_list.append(t_save_info)

        # Mark every thread as a daemon and start it
        for t_obj in thread_obj_list:
            t_obj.daemon = True
            t_obj.start()

        # Block the main thread until every queue's unfinished-task counter drops to zero
        for q in [self.url_queue, self.json_queue, self.content_dict_queue]:
            q.join()
        print('Main thread finished...')


if __name__ == '__main__':
    aqy = AiQiYi()
    aqy.main()
Notes:
- put increments a queue's unfinished-task counter by 1, but get alone does not decrement it; the counter only drops by 1 when task_done is also called (see the sketch below).
- task_done must not be called before the put into the next queue, otherwise the program may exit while data is still unprocessed.
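A minimal standalone sketch of this counter behaviour (unrelated to the crawler above):
from queue import Queue

q = Queue()
q.put('a')      # unfinished-task counter: 1
q.put('b')      # unfinished-task counter: 2

q.get()         # get alone does not change the counter
q.task_done()   # counter: 1

q.get()
q.task_done()   # counter: 0

q.join()        # returns immediately because the counter is zero
print('all items processed')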
Scraping Baidu Recruitment Information with a Thread Pool
Target URL: https://talent.baidu.com/jobs/social-list?search=python
Note: this site's api uses POST requests.
import time
import pymysql
import requests
from concurrent.futures import ThreadPoolExecutor


class BaiDuWork:
    def __init__(self):
        self.db = pymysql.connect(host="localhost", user="root", password="root", db="py_spider")
        self.cursor = self.db.cursor()
        self.api_url = 'https://talent.baidu.com/httservice/getPostListNew'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
            'Cookie': 'BIDUPSID=79ED59B3DF405E7BE0B2F089BF5636C0; PSTM=1697716565; BAIDUID=79ED59B3DF405E7B87EFE83B3F670F21:FG=1; BAIDUID_BFESS=79ED59B3DF405E7B87EFE83B3F670F21:FG=1; ZFY=E8UL64u1CWxtvnkjGKUmcr39lCirPsWnNcY4Ojzc6Ts:C; H_WISE_SIDS=234020_110085_264353_268593_269904_271171_270102_275171_276572_276589_277162_277356_277636_277639_275732_259642_278054_278390_278574_274779_278791_278388_279020_279039_279610_279711_279998_280266_280304_280373_278414_276438_280619_279201_277759_280809_280902_280557_280873_280636_280926_281043_281153_277970_281148; H_WISE_SIDS_BFESS=234020_110085_264353_268593_269904_271171_270102_275171_276572_276589_277162_277356_277636_277639_275732_259642_278054_278390_278574_274779_278791_278388_279020_279039_279610_279711_279998_280266_280304_280373_278414_276438_280619_279201_277759_280809_280902_280557_280873_280636_280926_281043_281153_277970_281148; Hm_lvt_50e85ccdd6c1e538eb1290bc92327926=1699171013; Hm_lpvt_50e85ccdd6c1e538eb1290bc92327926=1699173864; RT="z=1&dm=baidu.com&si=439a22e1-0524-47fc-94cc-717583dbaefa&ss=lol6js62&sl=0&tt=0&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf"',
            'Referer': 'https://talent.baidu.com/jobs/social-list?search=python'
        }

    def get_work_info(self, page):
        post_data = {
            'recruitType': 'SOCIAL',
            'pageSize': 10,
            'keyWord': '',
            'curPage': page,
            'projectType': '',
        }
        response = requests.post(url=self.api_url, headers=self.headers, data=post_data)
        return response.json()

    def parse_work_info(self, response):
        works = response['data']['list']
        for work_info in works:
            education = work_info['education'] if work_info['education'] else 'N/A'
            name = work_info['name']
            service_condition = work_info['serviceCondition']
            self.save_work_info(education, name, service_condition)

    def create_table(self):
        sql = """
            create table if not exists baiduWork_threadPool(
                id int primary key auto_increment,
                education varchar(200),
                name varchar(100),
                service_condition text
            );
        """
        try:
            self.cursor.execute(sql)
            print('Table created...')
        except Exception as e:
            print('Failed to create table:', e)

    def save_work_info(self, education, name, service_condition):
        sql = """
            insert into baiduWork_threadPool(id, education, name, service_condition) values (
                %s, %s, %s, %s
            )
        """
        try:
            self.cursor.execute(sql, (0, education, name, service_condition))
            self.db.commit()
            print('Data saved...')
        except Exception as e:
            print('Failed to save data:', e)
            self.db.rollback()

    def main(self):
        self.create_table()
        with ThreadPoolExecutor(max_workers=5) as pool:
            for page in range(1, 6):
                response = pool.submit(self.get_work_info, page)
                self.parse_work_info(response.result())

    def __del__(self):
        self.cursor.close()
        self.db.close()


if __name__ == '__main__':
    baidu_work = BaiDuWork()
    baidu_work.main()
Scraping Mango TV Video Information with Multiple Processes
import time
import redis
import pymongo
import hashlib
import requests
from multiprocessing import Process, JoinableQueue as Queue


class MovieInfo:
    # The connections live in class attributes so that the instance can be pickled (see the note below)
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['mg_movie']
    redis_client = redis.Redis()

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36'
        }
        self.url = 'https://pianku.api.mgtv.com/rider/list/pcweb/v3'
        self.params_queue = Queue()
        self.json_queue = Queue()
        self.content_queue = Queue()

    def get_params(self):
        for page in range(1, 3):
            params = {
                "allowedRC": "1",
                "platform": "pcweb",
                "channelId": "2",
                "pn": page,
                "pc": "80",
                "hudong": "1",
                "_support": "10000000",
                "kind": "19",
                "area": "10",
                "year": "all",
                "chargeInfo": "a1",
                "sort": "c2"
            }
            self.params_queue.put(params)

    # Fetch the data
    def get_movie_info(self):
        while True:
            params = self.params_queue.get()
            response = requests.get(self.url, headers=self.headers, params=params).json()
            self.json_queue.put(response)
            self.params_queue.task_done()

    # Clean the data and reshape it
    def parse_data(self):
        while True:
            response = self.json_queue.get()
            movie_list = response['data']['hitDocs']
            for movie in movie_list:
                item = dict()
                item['title'] = movie['title']
                item['subtitle'] = movie['subtitle']
                item['story'] = movie['story']
                self.content_queue.put(item)
            self.json_queue.task_done()

    @staticmethod
    def get_md5(value):
        md5_hash = hashlib.md5(str(value).encode('utf-8')).hexdigest()
        return md5_hash

    def save_data(self):
        while True:
            item = self.content_queue.get()
            value = self.get_md5(item)
            result = self.redis_client.sadd('movie:filter', value)
            if result:
                self.collection.insert_one(item)
                print(item)
                print('Saved...')
            else:
                print('Duplicate data...')
            self.content_queue.task_done()

    def main(self):
        # List of process objects
        process_obj_list = list()

        # One process builds the request parameters
        p_params = Process(target=self.get_params)
        process_obj_list.append(p_params)

        # Five processes send the requests
        for _ in range(5):
            p_get_movie = Process(target=self.get_movie_info)
            process_obj_list.append(p_get_movie)

        # One process cleans the data
        p_parse = Process(target=self.parse_data)
        process_obj_list.append(p_parse)

        # One process saves the data
        p_save = Process(target=self.save_data)
        process_obj_list.append(p_save)

        for p in process_obj_list:
            p.daemon = True
            p.start()

        time.sleep(0.2)
        for q in [self.params_queue, self.json_queue, self.content_queue]:
            q.join()
        print('Main process finished...')


if __name__ == '__main__':
    movie_info = MovieInfo()
    movie_info.main()
Notes:
- The database connection objects cannot be created in the __init__ method: the instance would fail to serialize (pickle) when it is handed to a child process, so the connections are placed in class attributes instead (a minimal sketch follows).
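A minimal sketch of the difference, assuming a MongoDB instance on localhost (SaveWorker and its run method are illustrative names, not part of the crawler above):
import pymongo
from multiprocessing import Process


class SaveWorker:
    # class attribute: not pickled with the instance; it is rebuilt when the
    # child process imports the class, so spawn-based start methods work
    mongo_client = pymongo.MongoClient()

    # def __init__(self):
    #     # instance attribute: it would be pickled together with self when the
    #     # bound method is handed to the child process, which raises an error
    #     self.mongo_client = pymongo.MongoClient()

    def run(self):
        print(self.mongo_client.list_database_names())


if __name__ == '__main__':
    worker = SaveWorker()
    p = Process(target=worker.run)
    p.start()
    p.join()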
Downloading Honor of Kings Hero Skins with Coroutines
Collect the skin images of every hero from the Honor of Kings official site.
import os
import aiofile
import aiohttp
import asyncio


class HeroSkin:
    def __init__(self):
        self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
        self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
        self.headers = {
            'User-Agent':
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }

    async def get_image_content(self, session, e_name, c_name):
        # The exact number of skins per hero is unknown, so try more ids than any hero has,
        # and use the status code to detect failure: stop and move on to the next hero
        for skin_id in range(1, 30):
            async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
                if response.status == 200:
                    content = await response.read()
                    async with aiofile.async_open('./images/' + c_name + '-' + str(skin_id) + '.jpg', 'wb') as f:
                        await f.write(content)
                        print('Saved...')
                else:
                    break

    async def main(self):
        tasks = list()
        async with aiohttp.ClientSession() as session:
            async with session.get(self.json_url, headers=self.headers) as response:
                result = await response.json(content_type=None)
                for item in result:
                    e_name = item['ename']
                    c_name = item['cname']
                    # print(e_name, c_name)
                    coro_obj = self.get_image_content(session, e_name, c_name)
                    tasks.append(asyncio.create_task(coro_obj))
            await asyncio.wait(tasks)


if __name__ == '__main__':
    if not os.path.exists('./images'):
        os.mkdir('./images')

    hero_skin = HeroSkin()
    asyncio.run(hero_skin.main())