
1. Introduction

Because of the GIL (Global Interpreter Lock), Python cannot exploit multiple cores with multithreading, and its performance has long been criticized for it. In IO-bound network programming, however, asynchronous processing can be orders of magnitude more efficient than synchronous processing, which makes up for this shortcoming.

  • Python 3.4 added the asyncio module to the standard library

  • Python 3.5 added the async/await syntax

Next, we will use the coroutine techniques covered earlier to run crawler tasks concurrently.
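
As a quick refresher, here is a minimal sketch of the async/await syntax listed above (the coroutine names and the sleep time are placeholders, not part of the crawler code that follows):

python
import asyncio


async def fetch(n):
    # await suspends this coroutine and hands control back to the event loop
    await asyncio.sleep(1)
    return f'task {n} done'


async def main():
    # run three coroutines concurrently; the total time is about 1 second instead of 3
    results = await asyncio.gather(*(fetch(i) for i in range(3)))
    print(results)


if __name__ == '__main__':
    asyncio.run(main())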

2. Crawling with asyncio and requests

python
import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

loop = asyncio.get_event_loop()


async def get_movie_info(page):
    # run_in_executor does not forward keyword arguments such as headers, so we wrap the call with functools.partial
    response = await loop.run_in_executor(None, partial(requests.get, url.format(page * 25), headers=headers))
    # print(response)
    # print(response.request.headers)
    soup = BeautifulSoup(response.text, 'lxml')
    div_list = soup.find_all('div', class_='hd')
    for title in div_list:
        print(title.get_text())


if __name__ == '__main__':
    # because of the event loop's scheduling, the titles are not printed in page order
    tasks = [loop.create_task(get_movie_info(page)) for page in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))

3. Crawling with aiohttp

The requests library does not support asynchronous IO, so inside asyncio it has to run in a thread pool executor, which is somewhat awkward. To avoid this, we use aiohttp, which supports asynchronous requests natively.
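
As an aside, on Python 3.9+ the thread-pool workaround can be written more simply with asyncio.to_thread, which forwards keyword arguments directly, so the functools.partial trick from the previous section is unnecessary. A minimal sketch (the URLs are placeholders; the aiohttp approach below is still the cleaner solution):

python
import asyncio
import requests


async def fetch(url):
    # run the blocking requests call in the default thread pool executor;
    # keyword arguments such as timeout are passed straight through
    return await asyncio.to_thread(requests.get, url, timeout=10)


async def main():
    responses = await asyncio.gather(fetch('https://www.baidu.com'),
                                     fetch('https://www.sogou.com'))
    for response in responses:
        print(response.status_code)


if __name__ == '__main__':
    asyncio.run(main())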

Introduction and Installation

Introduction

aiohttp is an asynchronous networking library that can act as both an HTTP client and an HTTP server. For crawling we only use its HTTP client functionality.

Official site: https://docs.aiohttp.org/en/stable/

Client documentation: https://docs.aiohttp.org/en/stable/client.html#aiohttp-client

Installation

shell
pip install aiohttp -i https://pypi.tuna.tsinghua.edu.cn/simple
Basic Usage

Example code

python
import asyncio
from aiohttp import ClientSession

url = "https://www.baidu.com"

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


async def get_baidu():
    async with ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            response = await response.text()
            print(response)


if __name__ == '__main__':
    asyncio.run(get_baidu())
Concurrent Requests

Example code

python
import asyncio
import aiohttp


def download_completed_callback(task_obj):
    print("下载的内容为:", task_obj.result())


async def baidu_spider():
    print("---百度蜘蛛---")
    url = "https://www.baidu.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def sogou_spider():
    print("---搜狗蜘蛛---")
    url = "https://www.sogou.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def jingdong_spider():
    print("---京东蜘蛛---")
    url = "https://www.jd.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()


async def main():
    # create several Tasks and attach a done callback to each
    task_baidu = asyncio.create_task(baidu_spider())
    task_baidu.add_done_callback(download_completed_callback)

    task_sogou = asyncio.create_task(sogou_spider())
    task_sogou.add_done_callback(download_completed_callback)

    task_jingdong = asyncio.create_task(jingdong_spider())
    task_jingdong.add_done_callback(download_completed_callback)

    tasks = [task_baidu, task_sogou, task_jingdong]
    # wait for all downloads to finish
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
Exercise: scraping Douban movie titles with aiohttp

Example code

python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


async def get_movie_info(page):
    async with aiohttp.ClientSession() as session:
        async with session.get(url.format(page * 25), headers=headers) as response:
            soup = BeautifulSoup(await response.text(), 'lxml')
            div_list = soup.find_all('div', class_='hd')
            for title in div_list:
                print(title.get_text())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(get_movie_info(page)) for page in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))

4. Using aiomysql

Installation
shell
pip install aiomysql

With the async/await keywords added in Python 3, we can swap each synchronous library for an asynchronous counterpart: aiohttp replaces requests for network requests, motor replaces the synchronous pymongo library for MongoDB, and, just as we use PyMySQL in synchronous programs, we use aiomysql to operate a MySQL database asynchronously.
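
motor is only mentioned in passing here; as an aside, a minimal sketch of the same async pattern applied to MongoDB (assuming a local MongoDB instance and `pip install motor`):

python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient


async def main():
    # connect to a local MongoDB instance (assumed to be running on the default port)
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    collection = client['py_spider']['motor_demo']
    await collection.insert_one({'name': 'demo'})
    print(await collection.find_one({'name': 'demo'}))


if __name__ == '__main__':
    asyncio.run(main())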

Usage
python
import asyncio
import aiomysql

loop = asyncio.get_event_loop()


async def test_example():
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='root', db='py_spider',
                                  loop=loop)

    cursor = await conn.cursor()
    await cursor.execute("SELECT * from ali_work")

    # print the column metadata of the result set
    print(cursor.description)
    result = await cursor.fetchall()
    print(result)
    await cursor.close()
    conn.close()


loop.run_until_complete(test_example())
Storing data with an asynchronous crawler

Use asyncio to scrape car specification data from che168 (Autohome's used-car site) and save it to a MySQL database.

URL: https://www.che168.com/china/a0_0msdgscncgpi1ltocsp7exf4x0/?pvareaid=102179#currengpostion

Approach:

  1. The listing page is static HTML; when paging, the sp1 part of the URL changes to sp2, so the listing data can be extracted with XPath.
  2. The detail page reached from the listing page shows the car's configuration; that data is loaded dynamically, so the API can be found by capturing the network requests.
  3. The captured API URL contains a query-string parameter: specid.
  4. Back on the listing page, each li element in the car list carries the car's id value; collect these ids and use them to build the API URLs.
  5. Request the constructed API URLs to fetch the data.

Notes:

  • The API does not return plain JSON (the data is wrapped in a callback), so the response has to be cleaned up first. There are two options (see the sketch after these notes):

    • slice the returned string and keep only the JSON part;
    • or drop the callback=configTitle query-string parameter from the API URL.
  • The page encoding of che168 changes randomly, so use the third-party chardet package to detect the encoding at runtime; when the page comes back as UTF-8-SIG, the specid data is missing.

    shell
    pip install chardet
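
A minimal sketch of the two cleanup options above, using a hypothetical wrapped response body (the variable names are placeholders, not part of the crawler below):

python
import json

# a hypothetical JSONP-style body, as returned when callback=configTitle is kept in the URL
raw = 'configTitle({"result": {"paramtypeitems": []}, "returncode": 0})'

# option 1: slice off the callback wrapper and keep only the JSON part
json_text = raw[raw.index('(') + 1:raw.rindex(')')]
print(json.loads(json_text))

# option 2: request the API without the callback parameter, e.g.
# https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid=xxx,
# which returns plain JSON (this is what the implementation below does)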

Implementation:

python
"""
Approach:
    1. Collect the car ids from the listing pages.
    2. Plug each car id into the API data interface.
    3. Save the data.
"""
import redis
import chardet
import hashlib
import asyncio
import aiohttp
import aiomysql
from lxml import etree


class CarSpider:
    redis_client = redis.Redis()

    def __init__(self):
        self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
        self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
        self.headers = {
            'User-Agent':
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }

    # collect the car ids from one listing page
    async def get_car_id(self, page, session, pool):
        async with session.get(self.url.format(page), headers=self.headers) as response:
            content = await response.read()
            encoding = chardet.detect(content)['encoding']  # the site detects frequent requests; when throttled it serves a UTF-8 page from which the car ids cannot be extracted
            # print(encoding)
            if encoding == 'GB2312' or encoding == 'ISO-8859-1':
                result = content.decode('gbk')
            else:
                result = content.decode(encoding)
                print('Blocked by anti-crawling measures...')

            tree = etree.HTML(result)
            id_list = tree.xpath('//ul[@class="viewlist_ul"]/li/@specid')
            if id_list:
                print(id_list)
                tasks = [asyncio.create_task(self.get_car_info(spec_id, session, pool)) for spec_id in id_list]
                await asyncio.wait(tasks)

    # once every car id on the page is collected, build the API URL for each and fetch its data
    async def get_car_info(self, spec_id, session, pool):
        async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
            result = await response.json()
            if result['result'].get('paramtypeitems'):
                item = dict()
                item['name'] = result['result']['paramtypeitems'][0]['paramitems'][0]['value']
                item['price'] = result['result']['paramtypeitems'][0]['paramitems'][1]['value']
                item['brand'] = result['result']['paramtypeitems'][0]['paramitems'][2]['value']
                item['altitude'] = result['result']['paramtypeitems'][1]['paramitems'][2]['value']
                item['breadth'] = result['result']['paramtypeitems'][1]['paramitems'][1]['value']
                item['length'] = result['result']['paramtypeitems'][1]['paramitems'][0]['value']
                await self.save_car_info(item, pool)
            else:
                print('No data for this car...')

    @staticmethod
    def get_md5(dict_item):
        md5 = hashlib.md5()
        md5.update(str(dict_item).encode('utf-8'))
        return md5.hexdigest()

    async def save_car_info(self, item, pool):
        print(item)
        # use async context managers to acquire a connection and a cursor from the pool
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                val_md5 = self.get_md5(item)
                # sadd returns 1 if the value is new (saved), 0 if it already exists
                redis_result = self.redis_client.sadd('car:filter', val_md5)
                if redis_result:
                    sql = """
                        insert into car_info(
                            id, name, price, brand, altitude, breadth, length) values (
                                %s, %s, %s, %s, %s, %s, %s
                            );
                    """
                    try:
                        await cursor.execute(sql, (0,
                                                   item['name'],
                                                   item['price'],
                                                   item['brand'],
                                                   item['altitude'],
                                                   item['breadth'],
                                                   item['length']
                                                   ))
                        await conn.commit()
                        print('Insert succeeded...')
                    except Exception as e:
                        print('Insert failed:', e)
                        await conn.rollback()
                else:
                    print('Duplicate data...')

    # entry point
    async def main(self):
        # create the database connection pool and get a cursor
        async with aiomysql.create_pool(user='root', password='root', db='py_spider') as pool:
            async with pool.acquire() as conn:
                async with conn.cursor() as cursor:
                    # create the table
                    create_table_sql = """
                        create table car_info(
                            id int primary key auto_increment,
                            name varchar(100),
                            price varchar(100),
                            brand varchar(100),
                            altitude varchar(100),
                            breadth varchar(100),
                            length varchar(100)
                        );
                    """

                    # check explicitly whether the table exists first; a plain 'if not exists' did not work in this async code
                    check_table_query = "show tables like 'car_info'"
                    result = await cursor.execute(check_table_query)  # returns 1 if the table exists, 0 otherwise
                    if not result:
                        await cursor.execute(create_table_sql)

            # create the HTTP session and schedule a crawling task per listing page
            async with aiohttp.ClientSession() as session:
                tasks = [asyncio.create_task(self.get_car_id(page, session, pool)) for page in range(1, 16)]
                await asyncio.wait(tasks)


if __name__ == '__main__':
    car = CarSpider()
    asyncio.run(car.main())

5. Concurrent crawling with multiple threads

In the previous section we built a concurrent crawler with asyncio, but in practice a multithreaded approach is still the most commonly used, so it is worth reviewing the multithreading material covered earlier.

Crawler requirement

Rewrite the Douban coroutine crawler into a multithreaded version.

python
import requests
import threading
from lxml import etree

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    print(result)


if __name__ == '__main__':
    thread_obj_list = [threading.Thread(target=get_movie_info, args=(page,)) for page in range(10)]
    for thread_obj in thread_obj_list:
        thread_obj.start()

6. Concurrent crawling with a thread pool

Still using the Douban crawler as the example, rewrite the code above to use a thread pool.

Example code

python
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    print(result)


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5) as pool:
        for page_num in range(10):
            pool.submit(get_movie_info, page_num)

7. Concurrent crawling with multiple processes

Because of Python's GIL, threads cannot take full advantage of multiple cores, so to improve throughput we can also use multiple processes.

Process basics recap
python
from multiprocessing import Process


def func(name):
    print('child process running:', name)


if __name__ == '__main__':
    # create a process object (args must be a tuple, e.g. (name,))
    p = Process(target=func, args=('demo',))

    # set it as a daemon process so it exits together with the main process
    p.daemon = True

    # start the process
    p.start()
    p.join()
Queues in processes

The ordinary queue module cannot pass data between processes; use the JoinableQueue class provided by multiprocessing instead, which is used in the same way as queue.Queue is in threads (put / get / task_done / join).
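
A minimal sketch of that put / task_done / join cycle (the producer data and consumer function are placeholders, not part of the Tencent example below):

python
from multiprocessing import Process, JoinableQueue


def consumer(queue):
    while True:
        item = queue.get()
        print('consumed:', item)
        queue.task_done()  # decrement the queue's unfinished-task counter


if __name__ == '__main__':
    queue = JoinableQueue()
    p = Process(target=consumer, args=(queue,))
    p.daemon = True  # the consumer loops forever, so let it die with the main process
    p.start()

    for i in range(5):
        queue.put(i)  # each put increments the unfinished-task counter

    queue.join()  # block until every item has been matched by a task_done call
    print('all items processed')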

Next, the Tencent recruitment crawler shows how to use JoinableQueue across processes.

Page URL: https://careers.tencent.com/search.html?keyword=python

Example code

python
import time
import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue

url = 'https://careers.tencent.com/tencentcareer/api/post/Query'

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_work_info_json(page_num, queue):
    params = {
        'timestamp': 1696774900608,
        'countryId': '',
        'cityId': '',
        'bgIds': '',
        'productId': '',
        'categoryId': '',
        'parentCategoryId': '',
        'attrId': '',
        'keyword': 'python',
        'pageIndex': page_num,
        'pageSize': 10,
        'language': 'zh-cn',
        'area': 'cn'
    }

    response = requests.get(url, headers=headers, params=params).json()
    for info in response['Data']['Posts']:
        work_info_dict = dict()
        work_info_dict['recruit_post_name'] = jsonpath.jsonpath(info, '$..RecruitPostName')[0]
        work_info_dict['country_name'] = jsonpath.jsonpath(info, '$..CountryName')[0]
        work_info_dict['location_name'] = jsonpath.jsonpath(info, '$..LocationName')[0]
        work_info_dict['category_name'] = jsonpath.jsonpath(info, '$..CategoryName')[0]
        work_info_dict['responsibility'] = jsonpath.jsonpath(info, '$..Responsibility')[0]
        work_info_dict['last_update_time'] = jsonpath.jsonpath(info, '$..LastUpdateTime')[0]

        # print(work_info_dict)
        queue.put(work_info_dict)


def save_work_info(queue):
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['tx_work']
    while True:
        dict_data = queue.get()
        print(dict_data)
        collection.insert_one(dict_data)
        # decrement the counter; queue.join() unblocks once it reaches 0
        queue.task_done()


if __name__ == '__main__':
    dict_data_queue = Queue()
    # list of process objects
    process_list = list()

    for page in range(1, 50):
        p_get_info = Process(target=get_work_info_json, args=(page, dict_data_queue))
        process_list.append(p_get_info)

    p_save_work = Process(target=save_work_info, args=(dict_data_queue,))

    process_list.append(p_save_work)

    for process_obj in process_list:
        process_obj.daemon = True
        process_obj.start()
    
    time.sleep(2)  # give the OS enough time to start all the processes

    dict_data_queue.join()
    print('Crawling finished...')

8. Crawler Practice

Scraping iQIYI video information with multithreading

Target URL: https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC

python
import pymongo
import requests
import threading
from queue import Queue


class AiQiYi:
    def __init__(self):
        self.mongo_client = pymongo.MongoClient(host='localhost', port=27017)
        self.collection = self.mongo_client['py_spider']['Thread_AiQiYi']
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
            'Referer': 'https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC'
        }
        self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=85dd981b69cead4b60f6d980438a5664&three_category_id=15;must'

        # create the queues
        self.url_queue = Queue()  # queue of request URLs
        self.json_queue = Queue()  # queue of JSON responses
        self.content_dict_queue = Queue()  # queue of parsed item dicts

    def get_url(self):
        for page in range(1, 6):
            self.url_queue.put(self.api_url.format(page))  # push each request URL onto the url queue

    def get_api_json(self):
        while True:
            url = self.url_queue.get()
            response = requests.get(url, headers=self.headers)
            self.json_queue.put(response.json())  # push the JSON response onto the json queue
            self.url_queue.task_done()  # decrement the url queue's counter for the consumed URL

    def parse_movie_info(self):
        while True:
            json_data = self.json_queue.get()
            category_movies = json_data['data']['list']
            for movie in category_movies:
                item = dict()
                item['title'] = movie['title']
                item['playUrl'] = movie['playUrl']
                item['description'] = movie['description']
                self.content_dict_queue.put(item)  # push each parsed item onto the content queue

            self.json_queue.task_done()  # decrement the json queue's counter once this response is fully parsed

    def save_movie_info(self):
        while True:
            item = self.content_dict_queue.get()
            self.collection.insert_one(item)
            print('Inserted:', item)
            self.content_dict_queue.task_done()  # decrement the content queue's counter for the saved item

    def main(self):
        # list of thread objects
        thread_obj_list = list()

        # thread that generates the request URLs
        t_url = threading.Thread(target=self.get_url)
        thread_obj_list.append(t_url)

        # threads that send the requests
        for _ in range(3):
            t_get_json = threading.Thread(target=self.get_api_json)
            thread_obj_list.append(t_get_json)

        # thread that parses the JSON data
        t_parse_info = threading.Thread(target=self.parse_movie_info)
        thread_obj_list.append(t_parse_info)

        # thread that saves the data
        t_save_info = threading.Thread(target=self.save_movie_info)
        thread_obj_list.append(t_save_info)

        # set every thread as a daemon and start it
        for t_obj in thread_obj_list:
            t_obj.daemon = True
            t_obj.start()

        # block the main thread until every queue's counter drops to zero; then the program can exit
        for q in [self.url_queue, self.json_queue, self.content_dict_queue]:
            q.join()

        print('Main thread finished...')


if __name__ == '__main__':
    aqy = AiQiYi()
    aqy.main()

Notes:

  • put increases the queue's internal counter by 1; get alone does not decrease it. task_done must be called as well for the counter to go back down.
  • task_done must not be called before the put into the next queue, otherwise the program may exit while data is still being processed (see the sketch below).
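
A minimal sketch of that ordering rule with two hypothetical queues (in_queue and out_queue are placeholders):

python
import threading
from queue import Queue

in_queue = Queue()
out_queue = Queue()


def worker():
    while True:
        data = in_queue.get()
        # pass the result downstream FIRST ...
        out_queue.put(data * 2)
        # ... and only then call task_done; if task_done came before the put,
        # in_queue.join() in the main thread could return while some results
        # had not yet been pushed onto out_queue
        in_queue.task_done()


def saver():
    while True:
        print('result:', out_queue.get())
        out_queue.task_done()


if __name__ == '__main__':
    for t in (threading.Thread(target=worker, daemon=True),
              threading.Thread(target=saver, daemon=True)):
        t.start()

    for i in range(3):
        in_queue.put(i)

    in_queue.join()
    out_queue.join()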
Scraping Baidu job postings with a thread pool

Target URL: https://talent.baidu.com/jobs/social-list?search=python

Note: this site's API uses the POST method.

python
import time
import pymysql
import requests
from concurrent.futures import ThreadPoolExecutor


class BaiDuWork:
    def __init__(self):
        self.db = pymysql.connect(host="localhost", user="root", password="root", db="py_spider")
        self.cursor = self.db.cursor()
        self.api_url = 'https://talent.baidu.com/httservice/getPostListNew'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
            'Cookie': 'BIDUPSID=79ED59B3DF405E7BE0B2F089BF5636C0; PSTM=1697716565; BAIDUID=79ED59B3DF405E7B87EFE83B3F670F21:FG=1; BAIDUID_BFESS=79ED59B3DF405E7B87EFE83B3F670F21:FG=1; ZFY=E8UL64u1CWxtvnkjGKUmcr39lCirPsWnNcY4Ojzc6Ts:C; H_WISE_SIDS=234020_110085_264353_268593_269904_271171_270102_275171_276572_276589_277162_277356_277636_277639_275732_259642_278054_278390_278574_274779_278791_278388_279020_279039_279610_279711_279998_280266_280304_280373_278414_276438_280619_279201_277759_280809_280902_280557_280873_280636_280926_281043_281153_277970_281148; H_WISE_SIDS_BFESS=234020_110085_264353_268593_269904_271171_270102_275171_276572_276589_277162_277356_277636_277639_275732_259642_278054_278390_278574_274779_278791_278388_279020_279039_279610_279711_279998_280266_280304_280373_278414_276438_280619_279201_277759_280809_280902_280557_280873_280636_280926_281043_281153_277970_281148; Hm_lvt_50e85ccdd6c1e538eb1290bc92327926=1699171013; Hm_lpvt_50e85ccdd6c1e538eb1290bc92327926=1699173864; RT="z=1&dm=baidu.com&si=439a22e1-0524-47fc-94cc-717583dbaefa&ss=lol6js62&sl=0&tt=0&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf"',
            'Referer': 'https://talent.baidu.com/jobs/social-list?search=python'
        }

    def get_work_info(self, page):
        post_data = {
            'recruitType': 'SOCIAL',
            'pageSize': 10,
            'keyWord': '',
            'curPage': page,
            'projectType': '',
        }
        response = requests.post(url=self.api_url, headers=self.headers, data=post_data)
        return response.json()

    def parse_work_info(self, response):
        works = response['data']['list']
        for work_info in works:
            education = work_info['education'] if work_info['education'] else 'N/A'
            name = work_info['name']
            service_condition = work_info['serviceCondition']
            self.save_work_info(education, name, service_condition)

    def create_table(self):
        sql = """
            create table if not exists baiduWork_threadPool(
                id int primary key auto_increment,
                education varchar(200),
                name varchar(100),
                service_condition text
            );
        """

        try:
            self.cursor.execute(sql)
            print('Table created successfully...')
        except Exception as e:
            print('Failed to create table: ', e)

    def save_work_info(self, education, name, service_condition):
        sql = """
            insert into baiduWork_threadPool(id, education, name, service_condition) values (
                %s, %s, %s, %s
            )
        """

        try:
            self.cursor.execute(sql, (0, education, name, service_condition))
            self.db.commit()
            print('Data saved successfully...')
        except Exception as e:
            print('Failed to save data: ', e)
            self.db.rollback()

    def main(self):
        self.create_table()
        with ThreadPoolExecutor(max_workers=5) as pool:
            for page in range(1, 6):
                response = pool.submit(self.get_work_info, page)
                self.parse_work_info(response.result())

    def __del__(self):
        self.cursor.close()
        self.db.close()


if __name__ == '__main__':
    baidu_work = BaiDuWork()
    baidu_work.main()
Scraping Mango TV video information with multiprocessing
python
import time
import redis
import pymongo
import hashlib
import requests
from multiprocessing import Process, JoinableQueue as Queue


class MovieInfo:
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['mg_movie']
    redis_client = redis.Redis()

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36'
        }

        self.url = 'https://pianku.api.mgtv.com/rider/list/pcweb/v3'

        self.params_queue = Queue()
        self.json_queue = Queue()
        self.content_queue = Queue()

    def get_params(self):
        for page in range(1, 3):
            params = {
                "allowedRC": "1",
                "platform": "pcweb",
                "channelId": "2",
                "pn": page,
                "pc": "80",
                "hudong": "1",
                "_support": "10000000",
                "kind": "19",
                "area": "10",
                "year": "all",
                "chargeInfo": "a1",
                "sort": "c2"
            }
            self.params_queue.put(params)

    # fetch the data
    def get_movie_info(self):
        while True:
            params = self.params_queue.get()
            response = requests.get(self.url, headers=self.headers, params=params).json()
            self.json_queue.put(response)
            self.params_queue.task_done()

    # clean the data and reshape it
    def parse_data(self):
        while True:
            response = self.json_queue.get()
            movie_list = response['data']['hitDocs']
            for movie in movie_list:
                item = dict()
                item['title'] = movie['title']
                item['subtitle'] = movie['subtitle']
                item['story'] = movie['story']
                self.content_queue.put(item)
            self.json_queue.task_done()

    @staticmethod
    def get_md5(value):
        md5_hash = hashlib.md5(str(value).encode('utf-8')).hexdigest()
        return md5_hash

    def save_data(self):
        while True:
            item = self.content_queue.get()
            value = self.get_md5(item)
            result = self.redis_client.sadd('movie:filter', value)
            if result:
                self.collection.insert_one(item)
                print(item)
                print('Saved successfully...')
            else:
                print('Duplicate data...')
            self.content_queue.task_done()

    def main(self):
        # list of process objects
        process_obj_list = list()

        # one process that builds the request parameters
        p_params = Process(target=self.get_params)
        process_obj_list.append(p_params)

        # five processes that send the requests
        for _ in range(5):
            p_get_movie = Process(target=self.get_movie_info)
            process_obj_list.append(p_get_movie)

        # one process that cleans the data
        p_parse = Process(target=self.parse_data)
        process_obj_list.append(p_parse)

        # one process that saves the data
        p_save = Process(target=self.save_data)
        process_obj_list.append(p_save)

        for p in process_obj_list:
            p.daemon = True
            p.start()
            time.sleep(0.2)

        for q in [self.params_queue, self.json_queue, self.content_queue]:
            q.join()

        print('Main process finished...')


if __name__ == '__main__':
    movie_info = MovieInfo()
    movie_info.main()

Notes:

  • The database connection objects cannot be created in __init__: the instance would fail to serialize (pickle) when passed to the child processes, so the connections are placed in class attributes instead.
Downloading Honor of Kings hero images with coroutines

Scrape the skin images of every hero listed on the Honor of Kings official site.

URL: https://pvp.qq.com/web201605/herolist.shtml

python
import os
import aiofile
import aiohttp
import asyncio


class HeroSkin:
    def __init__(self):
        self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
        self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
        self.headers = {
            'User-Agent':
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }

    async def get_image_content(self, session, e_name, c_name):
        # the exact number of skins per hero is unknown, so loop up to an upper bound larger than any hero's skin count
        # use the response status code to tell whether the skin exists; on the first failed request, move on to the next hero
        for skin_id in range(1, 30):
            async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
                if response.status == 200:
                    content = await response.read()
                    async with aiofile.async_open('./images/' + c_name + '-' + str(skin_id) + '.jpg', 'wb') as f:
                        await f.write(content)
                        print('Saved successfully...')
                else:
                    break

    async def main(self):
        tasks = list()
        async with aiohttp.ClientSession() as session:
            async with session.get(self.json_url, headers=self.headers) as response:
                result = await response.json(content_type=None)
                for item in result:
                    e_name = item['ename']
                    c_name = item['cname']
                    # print(e_name, c_name)
                    coro_obj = self.get_image_content(session, e_name, c_name)
                    tasks.append(asyncio.create_task(coro_obj))
                await asyncio.wait(tasks)


if __name__ == '__main__':
    if not os.path.exists('./images'):
        os.mkdir('./images')

    hero_skin = HeroSkin()
    asyncio.run(hero_skin.main())

Sube's Study Notes.