
进程与线程
1.一些核心概念:
并发VS并行
并发:在一段时间内,当cpu面对多个任务时候,会将每个任务交替执行一段时间
特点:1.对于某个瞬间,cpu只是在执行一个任务
2.cpu通过高频切换不同的任务,让每个任务都能得到推进,仿佛多个任务在同时执行
并行:并行依赖多个cpu,同一时刻,每个cpu执行不同的任务
特点:通过多个cpu同时工作的方式,让多个任务同时执行
同步vs异步
同步:发起一个任务之后,需要等该任务完成后,才能执行后续任务
表现为当前执行流会被阻塞
异步:发起一个任务之后,不必等该任务完成,可以继续执行其他任务
表现为当前执行流不会被阻塞
对比:
并发/并行:描述的是任务如何被执行,多个任务执行时,cpu怎么处理
同步/异步:描述的是任务如何被组织和等待,即:是否等当前任务执行完,再进行下一个任务
进程vs线程
进程:一个正在运行的程序,背后有一或多个线程
进程是操作系统进行资源分配的基本单位
每个进程有自己独立的一块内存空间
线程:线程是进程内部的执行单元,一个进程有多个线程
主进程VS子进程
python查看进程pid
os.getpid():获取当前进程ID
os.getppid():获取父进程ID
2.process创建线程
multiprocessing创建进程对象
import os
import time
from multiprocessing import Process
print(100,__name__,os.getpid())
def speak():
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study():
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
创建了p1和p2两个process的实例对象,此时p1和p2对应两个子进程,创建它们的时候,指定好他们要执行的任务,此时他们只是两个进程对象,还没有创建进程
p1=Process(target=speak)
p2=Process(target=study)
调用start方法,会立刻申请一个进程,交给操作系统调度
p1.start()
p2.start()
关于process参数
在实例化 Process 时,可以传递以下参数:
🔸group: 默认值为None(应当始终为None)。
🔸target:子进程要执行的可调用对象,默认值为 None。
🔸name: 进程名称,默认为 None ,如果设置为 None,Python 会自动分配名字。
🔸args: 给 target 传的位置参数(元组)
🔸kwargs:给 target 传的关键字参数(字典)。
🔸daemon:标记进程是否为守护进程,取值为布尔值(默认为 None,表示从创建方继承)。
import os
import time
from multiprocessing import Process
print(100,__name__,os.getpid())
def speak():
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study():
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
p1=Process(target=speak)
p2=Process(target=study)
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p1.start()
p2.start()
p3.start()
进程控制
1.Lock进程锁
防止多个进程同时打印或操作同一资源导致数据错乱,可以使用Lock
lock.acquire()上锁
lock.release()释放锁
上下文管理器用法:
with lock:
传统死锁的方式
import os
import time
from multiprocessing import Process,current_process,Lock
print(100,__name__,os.getpid())
def speak(lock):
lock.acquire()
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
lock.release()
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study(lock):
with lock:
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
lock=Lock()
p1=Process(target=speak,args=(lock,))
p2=Process(target=study,args=(lock,))
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p1.start()
p2.start()
p3.start()
传统上锁会导致死锁状态,因此我们用Rlock()
import os
import time
from multiprocessing import Process,current_process,Lock,RLock
print(100,__name__,os.getpid())
def speak(lock):
lock.acquire()
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
lock.release()
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study(lock):
with lock:
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
lock=RLock()
p1=Process(target=speak,args=(lock,))
p2=Process(target=study,args=(lock,))
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p1.start()
p2.start()
p3.start()
join方法
阻塞当前进程,直接调用join的进程执行完毕
参数:join(timeout) timeout为超时时间,时间到了进程还没结束,主进程就不等了,会继续执行
join()必须在start()后面调用
import os
import time
from multiprocessing import Process,current_process,Lock,RLock
print(100,__name__,os.getpid())
def speak(lock):
lock.acquire()
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
lock.release()
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study(lock):
with lock:
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
lock=RLock()
p1=Process(target=speak,args=(lock,))
p2=Process(target=study,args=(lock,))
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p1.start()
p1.join(5)
p2.start()
p2.join(5)
p3.start()
terminate方法
向操作系统申请强制终止进程,可以用is_alive()判断进程是否还活着
import os
import time
from multiprocessing import Process,current_process,Lock,RLock
print(100,__name__,os.getpid())
def speak(lock):
lock.acquire()
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
lock.release()
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study(lock):
with lock:
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
lock=RLock()
p1=Process(target=speak,args=(lock,))
p2=Process(target=study,args=(lock,))
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p1.start()
p1.terminate()
p1.join(5)
print(p1.is_alive())
p2.start()
p2.join(5)
p3.start()
守护进程
什么是守护进程?
一种依附于主进程存在的子进程,一旦主进程结束,自动终止
守护进程的使用场景:
后台监控类任务
日志 / 统计 / 采样 类任务
辅助型“陪跑任务”
注意点:
守护进程必须是 子进程。
主进程结束,守护进程也会随之结束。
守护进程中,不允许再创建新的子进程。
必须在 start 之前,start()之后,不能再设置daemon。
import os
import time
from multiprocessing import Process,current_process,Lock,RLock
print(100,__name__,os.getpid())
def speak(lock):
lock.acquire()
print(f"我是子进程speek,我的pid{os.getpid()},父进程:{os.getppid()}")
lock.release()
time.sleep(1)
def speak1(a,b,msg):
print(f"我是子进程speek1,{a},{b},{msg},{current_process().name}我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def study(lock):
with lock:
print(f"我是子进程study,我的pid{os.getpid()},父进程:{os.getppid()}")
time.sleep(1)
def monotor():
while True:
try:
with open('log.txt', 'r', encoding='utf-8') as file:
lines = sum(1 for _ in file)
except FileNotFoundError:
lines = 0
print(f'我是【守护进程({os.getpid()})】,log.txt 共有{lines}行')
time.sleep(1)
#如果是主进程
if __name__=='__main__':
print("我是主进程")
lock=RLock()
p1=Process(target=speak,args=(lock,))
p2=Process(target=study,args=(lock,))
p3=Process(target=speak1,name="speak",args=(666,999),kwargs={'msg':'msg'})
p4=Process(target=monotor,daemon=True)
p1.start()
p1.terminate()
p1.join(5)
print(p1.is_alive())
p2.start()
p2.join(5)
p3.start()
p4.start()
with open('log.txt', 'a', encoding='utf-8') as file:
for index in range(10):
file.write(f'1233{index}\n')
file.flush()
time.sleep(1)
进程之间不共享变量
进程之间不共享内存,也不共享变量
from multiprocessing import Process
num=100
names=[]
def test1():
global num,names
num+=10
names.append("张三")
print(f"{num}")
def test2():
global num,names
num-=10
names.append("张三1")
print(f"{num}")
print(f"{names}")
if __name__=="__main__":
p1=Process(target=test1())
p2=Process(target=test2())
p1.start()
p2.start()
p1.join()
p2.join()
队列Queue
1.一种先进先出的数据结构
import time
from multiprocessing import Queue,Process
#创建一个队列,不限制大小
q1=Queue()
#创建一个队列,最多保存3个元素
q2=Queue(4)
#put入队
q1.put(10)
q1.put(20)
q1.put(30)
print(q1)
#get 取数据
value1=q1.get()
value2=q1.get()
value3=q1.get()
print(value1)
print(value2)
print(value3)
#empty判断队列是否为空
result=q1.empty()
q1.put(100)
result1=q1.full()
print(result1)
result2=q1.qsize()
print(result2)
#队列满的时候,取出后才会添加元素
q2.put(1000)
q2.put(2000)
q2.put(3000)
q2.get()
q2.put(4000,timeout=3)
#向队列之间添加元素,不会进入等待模式,若队列已满,会抛出异常
q2.put_nowait(400)
print("放入完毕")
#队列空的时候,继续get会进入等待模式
q2.get()
#get_nowait直接读取队列种的元素
q2.get_nowait()
8.使用Queue实现进程通信
核心思想:一个进程负责生产数据,一个负责消费数据,中间Queue进行传话
import time
from multiprocessing import Process,Queue
def test1(q):
for index in range(5):
print(f'{index}')
q.put(index)
time.sleep(0.5)
def test2(q):
for index in range(5):
data=q.get()
print(f'{data}')
time.sleep(1)
if __name__=="__main__":
q=Queue()
p1=Process(target=test1,args=(q,))
p2=Process(target=test2,args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
本节终



