❝Python入门第三十五课,主要是学习了线程的核心概念,以及线程的创建和使用方法。
阅读提示
线程的语法与进程有很多相似之处,所以本文就简单串讲一下线程的相关知识,更多细节可以阅读本站进程相关文章或官方文档。
一、什么是线程?
线程是操作系统能够调度的最小执行单元,它属于一个进程,同一个进程内的多个线程共享进程的内存空间(包括全局变量、文件描述符等)。
我们需要明确一个概念:任何一个正在运行的 Python 程序中,至少都有一个线程!以下面的代码为例:
if __name__ == '__main__': print('主进程中的代码')
对于上面的代码来说:print('主进程中的代码')确实属于主进程,但更准确的说,是运行在主进程里的主线程中。
线程是进程中的执行单位:
- 多个线程之间会:共享进程的内存空间、但执行顺序由操作系统调度。
二、基本用法:使用 Thread 创建线程
与进程非常相似,语法结构基本一致,且传递参数也相同。
在实例化Thread时,可以传递以下参数:
✧ group默认值为None(应当始终为None)。
✧ target子进程要执行的可调用对象,默认值为None。
✧ name进程名称,默认为None,如果设置为None,Python 会自动分配名字。
✧ args给target传的位置参数(元组)。
✧ kwargs给target传的关键字参数(字典)。
✧ daemon标记进程是否为守护进程,取值为布尔值(默认为None,表示从创建方继承)。
可以使用current_thread().name获取当前线程的名字。
线程控制:
✧ t1.join()方法的作用是阻塞t1线程,直到t1线程执行完成。(更多用法可以参考进程的join)
✧ 使用threading.RLock锁保护线程的安全共享,具体应用with lock为最佳实践,它可以通过上下文管理器自动处理锁的获取和释放,避免忘记解锁。
示例代码:
import os, timefrom threading import Thread, get_native_id, RLock, current_threaddefspeak(lock, thread_nums, desc):for index in range(thread_nums):with lock: print(f'{desc}{current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}') time.sleep(1)defstudy(lock, thread_nums, desc):for index in range(thread_nums):with lock: print(f'{desc}{current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}') time.sleep(1)if __name__ == '__main__': print(f'-------- Main Process Start --------进程PID是:{os.getpid()},线程编号是:{get_native_id()}') lock = RLock()# 创建线程对象 t1 = Thread(target=speak, name='SpeakThread', args=(lock, 5), kwargs={'desc': '讲话线程'}) t2 = Thread(target=study, name='StudyThread', args=(lock, 3), kwargs={'desc': '学习线程'})# 调用线程对象的 start 方法,会立刻将该线程交由操作系统进行调度 t1.start() t2.start()# 让主线程等 t1 和 t2 线程执行完毕后,主线程再继续执行 t1.join() t2.join() print('-------- Main Process End --------')
三、线程之间共享变量
同一个进程下的多个线程,共享它们所同属的进程的内存空间,可以访问和修改全局变量,但由于GIL(全局解释器锁)和线程调度的不确定性,直接共享变量可能导致静态条件。
下面是一个完整的 Python 线程共享变量示例,演示内容:
from threading import Thread, RLockimport time# 共享变量counter = 0# 创建一个锁对象lock = RLock()# 目标函数:对共享变量进行 N 次自增(不安全的自增操作,缺少锁保护)defincrement_without_lock(n):global counterfor _ in range(n): current = counter time.sleep(0.0001) # 模拟一些计算延迟,增加竞态条件发生的概率 counter = current + 1# 目标函数:对共享变量进行 N 次自增(使用了锁保护)defincrement_with_lock(n):global counterfor _ in range(n):with lock: # 自动获取和释放锁 current = counter# 模拟计算延迟(即使有延迟,锁也能保证安全) time.sleep(0.0001) counter = current + 1defrun_test(target_func, n, num_threads, description):global counter counter = 0# 重置共享变量 threads = []# 创建多个线程for _ in range(num_threads): t = Thread(target=target_func, args=(n,)) threads.append(t) t.start()for t in threads: t.join()# 理论上结果应为:n * num_threads expected = n * num_threads print(f'{description:20} | 期望值:{expected:6} | 实际值:{counter:6} | {'正确'if counter == expected else'错误'}')if __name__ == '__main__':# 每个线程执行的自增次数 INC_PER_THREAD = 10_000# 线程数量 THREAD_COUNT = 3 print('线程共享变量静态条件演示\n')# 测试无锁版本(错误) run_test(increment_without_lock, INC_PER_THREAD, THREAD_COUNT, '无锁版本(错误)')# 测试有锁版本(正确) run_test(increment_with_lock, INC_PER_THREAD, THREAD_COUNT, '有锁版本(正确)')
运行结果如下:
线程共享变量静态条件演示无锁版本(错误) | 期望值: 30000 | 实际值: 10000 | 错误有锁版本(正确) | 期望值: 30000 | 实际值: 30000 | 正确
四、守护线程(Daemon Thread)
如果一个线程被设为守护线程,但所有非守护线程结束时,他会自动被强制终止。
import timefrom threading import Thread, RLocklock = RLock()def daemon_worker(): while True: with lock: print('守护线程工作中……') time.sleep(1)def normal_worker(): for _ in range(5): with lock: print('正常线程工作中……') time.sleep(1)if __name__ == '__main__': t1 = Thread(target=daemon_worker, daemon=True) t2 = Thread(target=normal_worker) t1.start() t2.start()
上面的代码运行结果是:5秒后会自动终止daemon_worker中的无限循环。
五、继承 Thread 创建线程
与继承Process创建进程一样,我们也可以继承Thread创建线程。
import os, timefrom threading import Thread, get_native_id, RLock, current_threadclassSpeakClass(Thread):def__init__(self, lock, thread_nums, desc, **kwargs): super().__init__(**kwargs) self.lock = lock self.thread_nums = thread_nums self.desc = descdefrun(self):for index in range(self.thread_nums):with self.lock: print(f'{self.desc}{current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}') time.sleep(1)classStudyClass(Thread):def__init__(self, lock, thread_nums, desc, **kwargs): super().__init__(**kwargs) self.lock = lock self.thread_nums = thread_nums self.desc = descdefrun(self):for index in range(self.thread_nums):with self.lock: print(f'{self.desc}{current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}') time.sleep(1)if __name__ == '__main__': print(f'-------- Main Process Start --------进程PID是:{os.getpid()},线程编号是:{get_native_id()}') lock = RLock()# 创建线程对象 t1 = SpeakClass(lock, 5, '讲话线程', name='SpeakThread') t2 = StudyClass(lock, 3, '学习线程', name='StudyThread')# 调用线程对象的 start 方法,会立刻将该线程交由操作系统进行调度 t1.start() t2.start()# 让主线程等 t1 和 t2 线程执行完毕后,主线程再继续执行 t1.join() t2.join() print('-------- Main Process End --------')