线程
线程概念
我们在日常开发中经常会听到使用多线程/多进程的方式完成并发任务。那么什么是进程?什么是线程?进程与线程之间有什么关系?接下来我们通过日常场景简单的了解一下进程与线程。
一个工厂,至少有一个车间,一个车间中最少有一个工人,最终是工人在工作。
一个程序,至少有一个进程,一个进程中最少有一个线程,最终是线程在工作。
在一个车间中有最基本的工作工具,人通过操作工具的方式来完成工作。
一个进程中有最基本的运行代码的资源(内存、cpu...),线程通过进程中提供的资源来运行代码。
通过上述描述,我们来总结一下线程与进程的关系:
- 线程是计算机可以被
cpu
调度的最小单元 - 进程是计算机分配资源的的最小单元,进程可以为线程提供运行资源。
一个进程中可以有多个线程,同一个进程中的线程可以共享当前进程中的资源。
使用线程完成并发任务
我们之前编写的代码都被称为同步代码,排队依次运行。如果前一个任务没有完成,那么之后的任务也无法继续。
import time
def work_1():
print('任务1...')
time.sleep(2)
def work_2():
print('任务2...')
time.sleep(2)
work_1()
work_2()
使用线程的方式优化代码
import time
import threading
def work_1():
print('任务1...')
time.sleep(2)
def work_2():
print('任务2...')
time.sleep(2)
# 通过Thread方法创建线程对象,并使用target指定线程对象要运行的任务
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
# 运行线程
t1.start()
t2.start()
实现一个简单的单线程同步爬虫
import requests
def get_image(url):
response = requests.get(url).content
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response)
print('下载完成...')
url_list = [
'http://pic.bizhi360.com/bbpic/98/10798.jpg',
'http://pic.bizhi360.com/bbpic/92/10792.jpg',
'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for url in url_list:
get_image(url)
对上述单线程同步爬虫进行优化
import requests
import threading
def get_image(url):
response = requests.get(url).content
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response)
print('下载完成...')
url_list = [
'http://pic.bizhi360.com/bbpic/98/10798.jpg',
'http://pic.bizhi360.com/bbpic/92/10792.jpg',
'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for url in url_list:
t = threading.Thread(target=get_image, args=(url,))
t.start()
除了上述使用线程的方式提升运行速度之外,我们还可以使用多进程的方式完成并发任务。
import requests
import multiprocessing
def get_image(url):
response = requests.get(url).content
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response)
print('下载完成...')
url_list = [
'http://pic.bizhi360.com/bbpic/98/10798.jpg',
'http://pic.bizhi360.com/bbpic/92/10792.jpg',
'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
if __name__ == "__main__":
for url in url_list:
p = multiprocessing.Process(target=get_image, args=(url,))
p.start()
GIL锁
在刚刚的一些案例中我们发现无论使用多线程还是多进程的方式都可以实现并发任务,那么在什么业务场景下去使用多线程/多进程?在学习本小结内容之前,先给出结论:
- 如果任务是属于IO密集型任务那么优先使用线程方式
- 如果任务是属于计算密集型任务那么优先使用进程方式
在Python中存在全局解释器锁,简称为GIL。GIL是cpython
解释器独有的。主要的功能是:让一个进程在同一时刻只能有一个线程被执行。
例如在一个进程中创建了多个线程,在运行当前程序时在同一时刻只有一个线程被执行,其他线程等待cpu
调度。这种情况无法利用多核cpu
的优势,如果想要绕开GIL,那么可以使用多进程的方式,创建多个进程,每个进程中只有一个线程。但是请注意,创建多进程消耗的资源比多线程消耗的资源大。
线程方法 - 重点
在正式学习线程方法之前,我们首先回顾一下刚刚学习到的内容。
# threading_test.py
import time
import threading
def work():
time.sleep(2)
print('子线程任务...')
# 当前只是创建了线程对象,并不会执行线程
t = threading.Thread(target=work)
# 通过start运行线程
t.start()
print('主线程打印信息...')
'''
在上述代码中,我们要明确:
1.一个py文件被解释器执行时会在操作系统中创建进程。
2.在进程中创建线程来执行当前文件中的代码,我们把最初创建的线程称之为主线程。
3.当主线程执行到Thread代码时会创建一个新的线程,我们一般称之为子线程。
4.当前代码中的主线程与子线程交替执行。
5.子线程被执行时主线程不会等待,继续往下执行到没有代码时等待子线程执行完毕再退出。
'''
thread_obj.start()
当前线程准备就绪,等待
cpu
调度,具体调度时间由cpu
决定。
import threading
num = 0
def add():
global num
for i in range(100000):
num += i
t = threading.Thread(target=add)
t.start()
print(num)
thread_obj.join()
等待子线程任务执行完毕后再继续向下执行
import threading
num = 0
def add():
global num
for i in range(100000):
num += i
t = threading.Thread(target=add)
t.start()
t.join() # 主线程等待子线程任务结束时解堵塞
print(num)
import threading
num = 0
def add_():
global num
for i in range(100000):
num += i
def sub_():
global num
for i in range(100000):
num -= i
t1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)
t1.start()
t1.join() # 主线程等待子线程任务结束时解堵塞
t2.start()
t2.join()
print(num)
thread_obj.setDaemon(bool: attr)
设置守护线程,需要在线程启动之前设置。
如果一个线程为守护线程则主线程执行完毕后不管子线程任务是否结束都自动退出。
当前参数默认为:
False
import time
import threading
def work():
for i in range(5):
print(i)
time.sleep(1)
t = threading.Thread(target=work)
t.setDaemon(True)
t.start()
print('主线程即将退出...')
thread_obj.current_thread()
获取当前运行的线程对象的引用
import threading
def work():
name = threading.current_thread().getName()
print(name)
for i in range(5):
t = threading.Thread(target=work)
t.setName(f'线程: {i}')
t.start()
自定义线程类完成并发爬虫
import requests
import threading
class ThreadSpider(threading.Thread):
def __init__(self, url):
super().__init__() # 子类继承线程类如果需要重写构造函数必须先调用父类的构造函数
self.url = url
def run(self):
response = requests.get(self.url).content
file_name = self.url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response)
print('下载完成...')
url_list = [
'http://pic.bizhi360.com/bbpic/98/10798.jpg',
'http://pic.bizhi360.com/bbpic/92/10792.jpg',
'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
for url in url_list:
ts = ThreadSpider(url)
ts.start()
线程安全
在之前的案例中我们使用了多个线程对一个全局变量进行修改的操作,如果多个线程都对一个全局变量进行操作的话会出现资源竞争的问题,会导致计算错误。
import threading
num = 0
def add_():
global num
for i in range(100000):
num += i
def sub_():
global num
for i in range(100000):
num -= i
t1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
使用线程锁解决上述问题
from threading import Thread, RLock
num = 0
lock_obj = RLock()
def add_():
global num
for i in range(100000):
lock_obj.acquire() # 申请锁,申请成功会让其他线程等待直到当前线程释放
num += i
lock_obj.release() # 释放锁,当锁被释放后其他线程才能被cpu挂起执行
def sub_():
global num
for i in range(100000):
lock_obj.acquire()
num -= i
lock_obj.release()
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
在上述案例中,我们在代码中手动申请锁与释放锁,RLock
方法支持上下文管理协议,可以使用with
语句帮助我们申请和释放锁。
from threading import Thread, RLock
num = 0
lock_obj = RLock()
def add_():
global num
for i in range(100000):
with lock_obj: # 使用上下文管理锁的申请与释放
num += i
def sub_():
global num
for i in range(100000):
with lock_obj:
num -= i
t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
在多线程编程中,不是任何地方都需要加锁处理。有些内置类型已经具有了加锁的机制,例如:列表。
import threading
data_list = list()
def add_list():
for _ in range(100000):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target=add_list)
t.start()
在线程中一般使用两种锁机制:Lock
、RLock
Lock
同步锁:同步锁一般很少使用,不支持锁嵌套
from threading import Thread, Lock
num = 0
lock_obj = Lock()
def add_num():
global num
for i in range(10000000):
lock_obj.acquire()
num += 1
lock_obj.release()
print(num)
for _ in range(2):
t = Thread(target=add_num)
t.start()
RLock
递归锁:支持锁嵌套
from threading import Thread, RLock
num = 0
lock_obj = RLock()
def add_num():
global num
for i in range(100000):
lock_obj.acquire()
lock_obj.acquire()
num += 1
lock_obj.release()
lock_obj.release()
print(num)
for _ in range(2):
t = Thread(target=add_num)
t.start()
死锁
在使用锁的过程中我们发现如果在同步锁中使用了嵌套则程序会卡死,这种情况我们称之为死锁。
死锁:由于资源竞争造成的一种堵塞现象。
# coding=utf-8
import threading
import time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name + '----do1---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name + '----do1---down----')
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name + '----do2---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name + '----do2---down----')
mutexA.release()
# 对mutexB解锁
mutexB.release()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
线程池
创建线程对象的期间会损耗时间,尤其是在需要开辟大量线程对象的时候会发生性能下降的情况。那么我们能否让程序创建一定数量的线程对象,并且在执行完某一个任务后不会被解释器销毁,下一个任务重复使用之前所创建的线程对象。
像这种需要创建大量线程对象的场景推荐使用线程池。
线程池的创建
# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(time_attr):
time.sleep(time_attr)
print(f'get page {time_attr} success')
return time_attr
# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)
# 通过submit提交需要执行的函数到线程池中,并且submit是立即返回对象不会堵塞
task_1 = executor.submit(get_html, 3)
task_2 = executor.submit(get_html, 2)
# done方法用于判定某个任务是否完成
print('task_1完成情况:', task_1.done())
# 可以使用cancel取消任务 但是运行中的任务无法取消,可以将线程数量修改成1
# print('task_2任务取消:', task_2.cancel())
# result方法可以获取任务的返回值 当前获取为阻塞
print('task_1返回结果:', task_1.result())
as_completed
获取已经执行成功的
task
的返回值
# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_html(time_attr):
time.sleep(time_attr)
print(f'get page {time_attr} success')
return time_attr
# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)
# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]
all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
for future in as_completed(all_task):
data = future.result()
print(f'get page {data}') # 线程任务只要执行完就能获取到返回值
map
获取已经执行成功的
task
的返回值 - 代码更精简
# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_html(time_attr):
time.sleep(time_attr)
print(f'get page {time_attr} success')
return time_attr
# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)
# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]
# 通过executor对象中的map获取已经完成的任务的返回值
for data in executor.map(get_html, time_attr_list):
print(f'get page {data}') # 当前打印的返回值顺序与列表顺序一致
# all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
# for future in as_completed(all_task):
# data = future.result()
# print(f'get page {data}') # 线程任务只要执行完就能获取到返回值
wait
等待指定任务完成后主线程解堵塞
import time
from concurrent.futures import ThreadPoolExecutor, wait
def get_html(time_attr):
time.sleep(time_attr)
print(f'get page {time_attr} success')
return time_attr
# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)
# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]
all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
wait(all_task)
print('主线程执行...')