Python——并发编程
标签: Python——并发编程 Python博客 51CTO博客
2023-07-20 18:24:17 127浏览
1.系统知识
操作系统是计算机系统的核心软件之一,负责管理和控制计算机的硬件资源,以及提供各种功能和服务。操作系统包括硬件管理和资源分配等方面的功能,同时由五个主要的子系统组成:文件系统、进程调度、内存管理、网络接口和进程通信。
1.1 五个子系统
文件系统:负责管理计算机中的文件和文件夹,提供对存储设备的访问和数据存储功能。
进程调度:负责调度和管理计算机中的进程,决定进程的执行顺序和时间片分配等,并确保多个进程能够合理共享CPU资源。
内存管理:负责管理计算机的内存资源,包括逻辑地址和物理地址的转换、段页式内存管理、处理缺页中断、实模式、和保护模式的转换、slab分配器、内存管理单元(MMU)和翻译后备缓冲器(TLB)等。
网络接口:提供计算机与网络之间的连接和通信功能,包括网络协议栈的实现、数据传输和数据交换等。
进程通信:用于不同进程之间的信息传递和数据交换,常用的进程通信方式包括管道(pipe)、消息队列、信号、信号量、共享内存和套接字(socket)等。
1.2 进程与线程
进程:正在运行的程序,是系统进行资源分配的最小单位;线程:运行在进程之上,是系统进行调度的最小单位。 线程的组成:线程ID、当前指令指针(PC)、寄存器集合、堆栈。
进程的组成:进程控制块PCB、数据段、正文段。
1.真正运行在cpu之上的是线程。
2.线程共享内存空间;进程的内存是独立的,进程之间是相互隔离的。
3.一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。
4.资源分配给进程,同一进程的所有线程共享该进程的所有资源,进程的资源是独立的。
5.同一个进程的线程之间可以直接交流;两个进程想通信,必须通过一个内核代理来实现的。
6.用户创建出来的所有进程都是由操作系统负责,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。
Linux中pid为0的进程是所有进程的主进程。
进程三态模型:就绪、运行、阻塞; 进程五态模型:新建、就绪、运行、阻塞、终止
1.3多线程与多进程的多维度比较
总结:1.由于多进程之间的内存是不共享的,一个进程崩溃不会影响其他进程的正常运行,而同一进程的多线程之间是共享内存数据的,一个线程挂掉将导致整个进程挂掉;所以从可靠性的角度而言,多进程的可靠性更高。
1.4并发并行
并发:某一个时间段内,可以处理多件事情,交替执行; 并行:同一时刻,处理多件事情,同时执行。并发与并行并不相同,并发主要由切换时间片来实现“同时”运行,并行则是直接利用多核实现。所以,对于单核系统,同一时刻只能执行一个任务。
由此可以引入时间片的概念:时间片是多任务操作系统中使用的一种调度策略。在多任务系统中,操作系统按照一定的时间间隔划分为一段段小的时间片,每个进程或线程被分配一个时间片来执行。当时间片用完后,操作系统将暂停当前进程或线程的执行,并切换到下一个可执行的进程或线程上。关于时间片,还有一个时间片轮转调度算法,这里就不扩展了。
1.5内存空间
在一个32位操作系统中,程序最大的内存空间是4GB。
其中,内核空间占据了1GB的地址空间,用于操作系统内核的执行和存储。用户空间则占据了剩下的3GB的地址空间,用于用户程序的执行和存储。
在用户空间中,通常将内存分为不同的区域:
- 栈区(Stack):栈区是由系统自动分配和释放的,用于存放函数调用时的局部变量和函数调用的上下文信息。栈区的内存地址从高地址向低地址递减。
- 堆区(Heap):堆区是由程序员手动申请和释放的,用于动态分配内存(例如使用malloc函数)。堆区的内存地址从低地址向高地址递增。
- 数据段(Data):数据段包括两个部分,未初始化的静态数据(BSS段)和已初始化的静态数据(Data段)。这些数据通常是全局变量或静态变量。
- 代码段(Code):代码段存放着程序的可执行代码。
2.线程
在单个程序中同时运行多个线程完成不同的工作称为多线程。线程是应用程序中工作的最小单元。在python中,提供了threading模块进行多线程的使用。
threading模块提供的常用类:1.Thread: 创建线程 2.Lock/RLock: 互斥锁
使用Thread:t = threading.Thread(target=None, args=()) target传入需要执行的方法,注意不要传入方法运行的结果;args()要传入方法的参数,若没有参数则不需要写,这里也需要注意,参数是以元组的方式进行传入。这两个是常用的,还有 name:线程名、group:线程组,目前尚未实现。
Thread实例方法:
1 t.name: 获取或设置线程的名称。
2 t.getName()/setName(name):获取/设置线程名。
3 t.is_alive()、t.isAlive():判断线程是否为激活状态。
4 t.ident:获取线程的标识符。
5 t.run():线程被cpu调度后自动执行线程对象的run方法。
6 t.start():线程准备就绪,等待CPU调度,start会自动调用t.run()。
7 t.join():阻塞当前上下文环境的线程。
8 t.isDaemon():判断是否为后台线程。
9 t.setDaemon(bool):设置是后台线程 (默认前台进程(False))。 设置为True 后台线程 -- 主线程执行结束,子线程就退出。 默认情况为False 前台进程 -- 主线程执行结束后,等待子线程执行结束后才退出。
2.1使用案例
import threading
import requests
import time
def runtime(func):
def inner(*args, **kwargs): # *args **kwargs 让装饰器更加通用
start = time.time()
result = func(*args, **kwargs) #原函数执行
end = time.time()
print(f"函数执行花了{end-start}s")
# print(f"执行了{func.__name__}函数")
return result #inner函数返回原函数的返回值
return inner
def get_content(url):
result = requests.get(url).text
time.sleep(0.5)
print("get url")
@runtime
def main():
print("start......")
# for i in range(5):
# get_content("http://www.baidu.com")
t_list = []
# 多线程
for i in range(5):
# target --> 任务 --> 传入一个callable对象,做什么
# args --> 指定要传递的参数,元组
t = threading.Thread(target=get_content, args=("http://www.baidu.com",))
# 前台线程,后台线程,在start之前设置
# 默认是False 前台线程 -- 主线程等待子线程结束才退出
# 设置为True 后台线程 -- 主线程执行结束,子线程就退出
t.setDaemon(True)
t.start() # 启动的时候自动调用run方法,run方法里面又去调用传递进来的target
t_list.append(t)
# t.join()
# 等线程全部创建启动完成之后再去join
# for i in t_list:
# t.join() # 阻塞当前环境上下文,直到t的线程执行完成
print("main function finished......")
main()
print("ending......")
与传统方式相比,多线程的效率更高。
2.2 自定义线程类
自定义线程类,其实也比较简单,其核心就两步1.继承threading.Thread 2.重写run方法,下面是一个简单案例:
import threading
# 自定义线程类(核心:继承threading.Thread,重写run方法)
class MyThread(threading.Thread):
def __init__(self, num):
super().__init__()
self.num = num
def run(self):
print(f"running...{self.num}")
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
run()方法是线程的主体,包含了线程要执行的代码逻辑。当我们通过调用线程的start()方法启动线程时,线程会自动调用run()方法并在新的线程上执行其中的代码。
3.进程
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配的基本单位。多个进程同时执行时,每个进程的执行都需要由操作系统按一定的算法(RR调度、优先数调度算法等)分配内存空间。
僵尸进程:子进程退出,父进程没有响应,父进程没有调用wait或者waitpid去获取子进程的状态,那么这个子进程的进程描述符就会依然存在系统中。
孤儿进程:父进程退出,子进程还在运行,那么这个子进程就会称为孤儿进程,孤儿进程会被pid为1的进程所收养。
linux下的进程状态:
1 R(Running),运行和就绪等待状态
2 S(Sleeping),可中断的睡眠状态
3 D(Disk Sleep),不可中断的睡眠状态
4 T(Stopped),暂停状态
5 Z(Zombie),僵尸进程状态
6 X(Dead),即将销毁状态
3.1创建子进程
在python中,每个运行的程序都有一个主进程,可以利用模块中封装的方法来创建子进程。os.fork中就有用来创建子进程的方法。注意:这个os.fork()方法在linux系统中才会有,在windows下没有。
1 使用fork创建子进程之后,操作系统会将当前的进程复制一份
2 原来的进程称为父进程,新建的进程称为子进程
3 两个进程会各自互不干扰的执行下面的程序
4 父进程与子进程的执行顺序与系统调度有关
5 在子进程内这个方法会返回0 ; 在父进程内,这个方法会返回子进程的编号PID
os.fork的返回值:
1 大于0时,此进程为父进程,且返回的数字为子进程的PID
2 当返回值为0时,此进程为子进程
3 如果返回值为负数则表明创建子进程失败
父进程结束时,子进程并不会随父进程立刻结束。同样,父进程不会等待子进程执行完毕。
os.getpid() : 获取进程的进程号
os.getppid() : 获取父进程的进程号
在linux系统创建子进程之后,可以通过ps -ef | grep ***命令可以查看到
3.2 使用multiprocessing库
由于windows中没有fork函数可以调用,python提供了multiprocessing库支持跨系统版本。
multiprocessing模块提供的常用类:Process类
使用:p = multiprocessing.Process(target=执行函数, args=(,)) target中传入需要执行的方法,args传入执行函数需要的参数,同样是以元组的形式传入
Process类的实例方法有:
1 p.start(): 启动进程,并调用该子进程中的p.run()
2 p.run(): start()调用run方法,如果实例进程时未指定传入target,这个start执行默认run()方法
3 p.terminate(): 不管任务是否完成,立即停止工作进程
4 p.is_alive(): 如果p仍然运行,返回True
5 p.join(): 阻塞上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout
3.2.1使用案例
from multiprocessing import Process, current_process
import time
lst = []
def task(i):
print(current_process().name, f"start....{i}")
time.sleep(2)
lst.append(i)
print(f"lst is {lst}")
print(current_process().name, f"end....{i}")
# 各个进程都拥有一份数据,相互隔离
if __name__ == "__main__":
for i in range(4):
p = Process(target=task, args=(i,))
p.start()
注意: 创建多进程时,需要使用if __name__ == "__main__":这个if语句,它的作用是:
1.避免多个进程之间共享资源冲突:在多进程编程时,每个子进程都拥有自己独立的内存空间。如果你在文件中定义了全局变量或共享的数据结构,而没有使用此if语句来限制,可能导致数据冲突。通过将主要的逻辑放在if语句内部,可以确保这部分代码只在主进程中执行,避免了多个进程之间的共享资源冲突。
2.避免子进程递归调用:当一个文件被导入为模块时,其中的代码就会被执行。如果你在脚本文件中创建了子进程,并且没有使用此if语句限制,那么每个子进程也会执行整个文件,包括创建子进程的那一部分,这会导致无限递归地创建新的子进程。因此使用if语句可以确保子进程只在文件中作为主程序运行时才会被创建。
3.2.2自定义进程类
其核心跟自定义线程类基本相同:1.继承Process类 2.重写run方法
#自定义进程类
import multiprocessing
class A(multiprocessing.Process):
def __init__(self, num):
super().__init__()
self.num = num
def run(self):
print(f"running...{self.num}")
if __name__ == "__main__":
a = A(1)
b = A(2)
a.start()
b.start()
3.3多线程和多进程的选择
io密集型计算使用多线程
cpu密集型计算使用多进程
3.4进程池
进程池的原理:进程池都是采用预创建的技术,在应用启动之初就预先创建一定数目的进程。进程需要一个管理者,按照一定的要求去动态的维护其中进程的项目。一个master进行管理,多个worker进行任务处理。通常是只有一个master。
创建一个进程池,建议进程数和cpu核数一致。
注意:
1 使用Pool创建进程池对象,同时进程池中进程已经启动
2 向进程池对象中添加事件,事件排队执行
3 如果主进程退出,则进程池中所有进程都退出
3.4.1Pool类的使用
Pool类的构造方法:
Pool(): p = Pool(processes=4, maxtasksperchild=2) processes设置使用的工作进程的数量。 maxtasksperchild指定每个子进程最多处理多少个任务。达到相应的任务数之后,当前进程就会退出,开启新的进程,来让闲置的资源被释放,减少一定的内存消耗。
实例方法:
p.apply_async(): p.apply_async(func= , args=( ,)) 表示进程池接收的任务。
p.close(): 表示关闭进程池,不再接收新的任务。
p.join(): 主进程阻塞,等待子进程的退出,这个方法要在close之后使用。
使用案例:
from multiprocessing import Pool, current_process
import time
def task(i):
# current_process()用于获取当前正在执行的进程对象。它返回一个表示当前进程的 Process 对象。
# .name表示获取的进程名
print(current_process().name, f"start....{i}")
time.sleep(2)
print(current_process().name, f"end....{i}")
if __name__ == "__main__":
# 创建一个进程池,建议进程数和cpu核数一致
# maxtaskperchild 指定每个字进程最多处理多少个任务
# 达到相应的任务数之后,当前进程就会退出,开启新的进程
# 定期释放资源可以达到减少内存消耗的效果
p = Pool(processes=4, maxtasksperchild=2)
for i in range(20):
# 进程池接收任务
p.apply_async(func=task, args=(i,))
# 关闭进程池,不接受任务
p.close()
# 阻塞当前环境
p.join()
print("end......process")
4.协程
概念:协程是用户态的轻量级线程,协程的调度完全由用户控制。协程有自己的寄存器上下文和栈。协程又称微线程,纤程 Coroutine。它运行在线程之上。
作用:在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)
简单比较:
1.线程有自己的上下文,切换受系统控制, 而协程有自己的上下文,但其切换由用户控制,由当前协程切换到其他协程由当前协程来控制。
2.协程避免了无意义的调度,由此可提高性能,同时,协程也失去了标准线程使用多核CPU的能力。
3.与多线程相比协程有极高的执行效率,不需要多线程的锁机制。
4.强调非阻塞异步并发的一般都是使用协程
在专门学并发编程之前,我们之前其实就已经接触到了协程。用户自己控制自己的上下文的特性,其实生成器也有这个特性,生成器通过使用关键字可以在函数执行中多次产生值并暂停。生成器的执行也是可以暂停和恢复的,生成器的状态会被保留下来,下次执行时可以从上次暂停的地方继续执行。生成器在迭代过程中经常被使用,使得我们能够按需生成元素而不是一次性生成所有元素,从而节约内存和提高性能。
协程的缺点:无法利用多核资源
4.1asyncio库
async/await关键字: 用于定义协程,async定义一个协程,await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。
asyncio.sleep(): 用于协程中产生一个阻塞的等待时间。
asyncio.create_task(): 用于创建一个任务。任务是协程的一种封装形式,它可以在事件循环(event loop)中被调度和执行。
asyncio.run(): 用于运行一个简单的协程或任务。它会创建一个新的事件循环,并在运行完协程或任务时,自动关闭事件循环。
4.1.1使用案例
import asyncio
import time
async def say_after(delay, what):
print(f"test start......{what}")
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(4, 'hello'))
task2 = asyncio.create_task(
say_after(5, 'world'))
print(f"started at {time.time()}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.time()}")
asyncio.run(main())
好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
展开评论
您可能感兴趣的博客