
python进阶完结篇
上节讲了Queue实现进程通信,这次我们用Pipe
Pipe就像一根水管,一头发送一头接收
1.创建管道
import time
from multiprocessing import Process, Pipe
def test1(con1):
con1.send(100)
print('test1发送了100')
def test2(con2):
data = con2.recv()
print(f'test2接收了{data}')
if __name__ == '__main__':
con1, con2 = Pipe(duplex=True)
p1 = Process(target=test1, args=(con1,))
p2 = Process(target=test2, args=(con2,))
p1.start()
p2.start()
p1.join()
p2.join()
duplex是True的时候是双向,False是单向
继承Process类创建进程
子进程逻辑复杂,或者想把进程+行为封装成整体时候,可以使用Process类去创建
核心:
● 必须继承 Process类
● 把子进程要干的事,写进 run() 方法里
● 依然使用start方法启动进程,不要手动调用 run()!
● 若子进程不需要参数,可以不写__init__,若需要参数,则需编写__init__
● 传给的子进程的参数,作为实例属性保存
from multiprocessing import Process
import os,time
class SpeakProcess(Process):
def __init__(self,a,b,**kwargs):
super().__init__(**kwargs)
self.a=a
self.b=b
def run(self):
print(f'我在说话{self.a}b{self.b}')
class runProcess(Process):
def run(self):
print("我在跑步")
if __name__=="__main__":
p1=SpeakProcess(100,200)
p2=runProcess()
p1.start()
p2.start()
p1.join()
p2.join()
进程池
如果每来一个进程就创建一个任务的话会很浪费系统资源,所以就把它们都放进进程池
import os
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
if __name__=='__main__':
print("start")
#创建一个进程池
executor=ProcessPoolExecutor(3)
executor.submit(work,1)
executor.submit(work, 2)
executor.submit(work, 3)
executor.submit(work, 4)
#shutdown:不再接收新任务,wait=True阻塞主进程,等所有任务执行完毕
executor.shutdown(wait=True)
print("end")
获取子进程返回结果
import os
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
return f"返回结果{n}"
if __name__=='__main__':
print("start")
#创建一个进程池
executor=ProcessPoolExecutor(3)
f1 = executor.submit(work,1)
executor.submit(work, 2)
executor.submit(work, 3)
executor.submit(work, 4)
#shutdown:不再接收新任务,wait=True阻塞主进程,等所有任务执行完毕
executor.shutdown(wait=True)
print(f1.result())
print("end")
as_complete按照完成顺徐获取返回结果
import os
from concurrent.futures import as_completed
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
return f"返回结果{n}"
if __name__=='__main__':
print("start")
#创建一个进程池
executor=ProcessPoolExecutor(3)
future=[executor.submit(work,index) for index in range(1,9)]
resultlist=[]
#shutdown:不再接收新任务,wait=True阻塞主进程,等所有任务执行完毕
for f in as_completed(future):
resultlist.append(f.result())
executor.shutdown(wait=True)
print(resultlist)
print("end")
使用 add_done_callback 方法,为任务添加完成时的回调函数
import os
from concurrent.futures import as_completed
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
return f"返回结果{n}"
if __name__=='__main__':
print("start")
#创建一个进程池
executor=ProcessPoolExecutor(3)
resultlist=[]
def done_func(future):
resultlist.append(future.result())
for index in range(1, 9):
f=executor.submit(work,index)
f.add_done_callback(done_func)
executor.shutdown(wait=True)
print(resultlist)
print("end")
map方法批量读取结果,返回结果和任务执行顺序一致
import os
from concurrent.futures import as_completed
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
return f"返回结果{n}"
if __name__=='__main__':
print("start")
#创建一个进程池
executor=ProcessPoolExecutor(3)
resultlist=executor.map(work,[1,2,3,4])
executor.shutdown(wait=True)
print(list(resultlist))
print("end")
with自动回收线程池,自动执行executor.shutdown(wait=True)
import os
from concurrent.futures import as_completed
from concurrent.futures.process import ProcessPoolExecutor
def work(n):
print(f'word执行中{n},{os.getpid()}')
return f"返回结果{n}"
if __name__=='__main__':
print("start")
#创建一个进程池
# executor=ProcessPoolExecutor(3)
# resultlist=executor.map(work,[1,2,3,4])
#
#
# executor.shutdown(wait=True)
# print(list(resultlist))
with ProcessPoolExecutor(3) as excutor:
resultlist = excutor.map(work, [1, 2, 3, 4])
print(list(resultlist))
print("end")
使用Thread建立线程
线程是进程中的执行单位:
● 一个进程里,至少有一个线程(主线程)
● 一个进程里,也可以有多个线程
● 多个线程之间会: 共享进程的内存空间、 但执行顺序由操作 系统调度。
使用Thread创建线程
from threading import Thread,RLock
def speak(lock):
with lock:
print("我在说话")
def run(lock):
with lock:
print("run")
if __name__=="__main__":
lock=RLock()
t1=Thread(target=speak,args=(lock,))
t2=Thread(target=run,args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
继承Thread线程
和继承Process创建进程一样,我们也可以继承Thread创建线程。
from threading import RLock, Thread
class speak(Thread):
def __init__(self, lock, **kwargs):
super().__init__(**kwargs)
self.lock = lock
def run(self):
print("我在讲话")
class run(Thread):
def __init__(self, lock, **kwargs):
super().__init__(**kwargs)
self.lock = lock
def run(self):
print("我在跑步")
if __name__=="__main__":
lock=RLock()
# 继承 Thread 类创建线程对象
p1=speak(lock)
p2=run(lock)
#调用线程的方法
p1.start()
p2.start()
#子线程调用完以后调用主线程
p1.join()
p2.join()
多进程 vs 多线程,该如何选择?
CPU密集型任务,更适合多进程
IO类密集型任务,更适合多线程
协程
什么是协程?
线程内部的任务调度机制,通过事件循环,在用户态中实现任务的挂起与恢复执行,从而在遇到 IO 操作时,不让 CPU 等待,而是继续执行其它需要 CPU 的任务
关键点1️⃣:协程不是线程,也不是进程
● 协程不是操作系统提供的,并且 CPU 看不见协程。
● 操作系统不知道协程的存在。
● 协程是程序员在用户态,用代码“设计出来”的任务切换机制。
关键点2️⃣:协程发生在一个线程内部
● 协程不是线程之间的切换。
● 而是线程内部多个任务之间的切换。
● 本质是一个线程里,写了很多任务,由事件循环统一调度。
关键点3️⃣:协程的核心能力:挂起与恢复
● 当任务 遇到 IO 操作 时:任务会被挂起。
● 当 IO 操作完成后:任务会被恢复执行。 关键点4️⃣:协程依赖一个关键角色:事件循环
● 事件循环负责:调度任务、判断是否该挂起、决定何时恢复执行,事件循环是协程系统的“大脑” 关键点5️⃣:协程的目标是尽量减少线程切换
● 在单线程场景下,最大化 CPU 利用率,特别适合 IO 密集型任务
协程函数:使用async修饰的函数
协程对象:调用协程函数的对象,就是协程对象
import asyncio
async def work():
print("工作")
return"工作结果"
object=work()
result=asyncio.run(object)
print(result)
await 关键有三个作用:
1.挂起:await 会暂停当前协程的执行。
2.等待:遇到 await 关键字,事件循环会立即安排 await 后面的对象去执行,并等待该对象执行完成,并且可以拿到执行结果。
3.恢复:当 await 后的对象执行完毕,事件循环会恢复之前被挂起的协程,该协程会从当时挂起的位置继续执行,并拿到返回值。
async def work():
print("工作")
return"工作结果"
async def main():
res= await work()
print("mains")
print(res)
print("main")
object=main()
result=asyncio.run(object)
print(result)
关键点:在执行 await 后面的对象时,会出现两种情况:
● 情况一:如果在执行该对象中的代码时,遇到了【await I/O操作】(需要等待外部资源返回结果的操作)例如:网络请求、文件读写等,那 CPU 的控制权就会交给事件循环。事件循环会去调度循环中的其他任务(如果有的话)。
● 情况二:如果该对象中的代码,不包含任何【await I/O操作】。例如:print打印、数学计算、逻辑计算等。此时事件循环拿不到 CPU 控制权,无法调度循环中的其他任务,不会发生任务切换。
多个任务同步执行
通过await进行同步执行
import asyncio
async def work():
print("work")
return"word"
async def main():
res1=await work()
res2=await work()
res3=await work()
print(res1)
print(res2)
print(res3)
return"main"
res=asyncio.run(main())
print(res)
多个任务异步执行
使用asyncio.create_task()方法向事件循环中添加任务,从而实现多个任务异步执行。
import asyncio
async def work():
print("work")
return"word"
async def main():
task1=asyncio.create_task(work())
task2=asyncio.create_task(work())
task3=asyncio.create_task(work())
res1=await task1
print(res1)
res2 = await task2
print(res2)
res3 = await task3
print(res3)
return"main"
result=asyncio.run(main())
print(result)
asyncio.gather方法可以把多个协程对象丢给事件循环,并在全部执行完后,一次性拿到所有结果。
import asyncio
async def work():
print("work")
return"word"
async def main():
res1=await asyncio.gather(work(),work())
print(res1)
return"main"
result=asyncio.run(main())
print(result)



