与多进程代码一样,多线程代码在共享数据时也容易受到竞争条件的影响,因为我们无法控制执行顺序。任何时候,如果有两个线程或进程可能修改一个非线程安全的共享数据,你就需要使用锁来正确同步访问。概念上,这与我们在多进程时采用的方法没有区别;然而,线程的内存模型改变了方法的实现细节。
回想一下,使用多进程时,默认情况下我们创建的进程不会共享内存。这意味着我们需要创建特殊的共享内存对象并正确初始化它们,以便每个进程都能读写该对象。由于线程 确实 有权限访问父进程的相同内存,我们不再需要这样做,线程可以直接访问共享变量。
这简化了一些事情,但由于我们不会使用内置锁的 Value 对象,所以我们需要自己创建它们。为此,我们需要使用 threading 模块的 Lock 实现,这与我们在多进程时使用的不一样。这很简单,只需从 threading 模块导入 Lock,然后在关键代码段周围调用其 acquire 和 release 方法,或者将其用作上下文管理器。
为了了解如何在线程中使用锁,让我们重新审视第 6 章的任务:跟踪并显示长时间任务的进度。我们将之前的发送数千次网络请求的例子改为使用一个共享计数器来跟踪已完成的请求数量。
import functoolsimport requestsimport asynciofrom concurrent.futures import ThreadPoolExecutorfrom threading import Lockfrom util import async_timedcounter_lock = Lock()counter: int = 0def get_status_code(url: str) -> int: global counter response = requests.get(url) with counter_lock: counter = counter + 1 return response.status_codeasync def reporter(request_count: int): while counter < request_count: print(f'Finished {counter}/{request_count} requests') await asyncio.sleep(.5)@async_timed()async def main(): loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: request_count = 200 urls = ['https:// www .example .com' for _ in range(request_count)] reporter_task = asyncio.create_task(reporter(request_count)) tasks = [loop.run_in_executor(pool, functools.partial(get_status_code, url)) for url in urls] results = await asyncio.gather(*tasks) await reporter_task print(results)asyncio.run(main())
这应该看起来很熟悉,因为它类似于我们在第 6 章中为 map 操作输出进度时编写的代码。我们创建了一个全局的 counter 变量和一个 counter_lock,用于在关键部分同步对其的访问。在 get_status_code 函数中,我们在递增计数器时获取锁。然后,在我们的主协程中,我们启动了一个报告器后台任务,每隔 500 毫秒输出已完成的请求数量。运行这段代码,你应该会看到类似以下的输出:
Finished 0/200 requestsFinished 48/200 requestsFinished 97/200 requestsFinished 163/200 requests
现在我们知道多线程和多进程的基本锁知识,但关于锁还有很多要学。接下来,我们将探讨可重入的概念。
简单的锁在协调多个线程对共享变量的访问时效果很好,但如果一个线程尝试获取它已经持有的锁,会发生什么?这安全吗?由于同一个线程在获取锁,这应该是安全的,因为从定义上讲这是单线程的,因此是线程安全的。
虽然这种访问应该是安全的,但它确实会给到目前为止我们一直在使用的锁带来问题。为了说明这一点,让我们设想一个递归求和函数,它接收一个整数列表并计算列表的总和。我们想要求和的列表可以从多个线程中被修改,所以我们需要使用锁来确保在求和操作期间列表不会被修改。让我们尝试用一个普通锁来实现,看看会发生什么。我们还会添加一些控制台输出,以便查看我们的函数是如何执行的。
from threading import Lock, Threadfrom typing import Listlist_lock = Lock()def sum_list(int_list: List[int]) -> int: print('Waiting to acquire lock...') with list_lock: print('Acquired lock.') if len(int_list) == 0: print('Finished summing.') return 0 else: head, *tail = int_list print('Summing rest of list.') return head + sum_list(tail)thread = Thread(target=sum_list, args=([1, 2, 3, 4],))thread.start()thread.join()
如果你运行这段代码,你会看到以下几条消息,然后应用会永远挂起:
Waiting to acquire lock...Acquired lock.Summing rest of list.Waiting to acquire lock...
为什么会这样?如果我们一步步分析,第一次获取 list_lock 是完全没问题的。然后我们解包列表并递归调用 sum_list 来处理列表的其余部分。这又导致我们试图第二次获取 list_lock。这时我们的代码就挂起了,因为既然我们已经获得了锁,就会永远阻塞在第二次获取锁上。这也意味着我们从未退出第一个 with 块,也无法释放锁;我们正等待一个永远不会被释放的锁!
由于这个递归来自最初启动它的同一个线程,多次获取锁不应该是个问题,因为这不会导致竞争条件。为了支持这些用例,threading 库提供了可重入锁。可重入锁是一种特殊类型的锁,可以被同一个线程多次获取,允许该线程“重新进入”临界区。threading 模块提供了 RLock 类来实现可重入锁。我们可以将上面的代码修改为只修改两行代码——import 语句和 list_lock 的创建:
from threading import RLocklist_lock = RLock()
如果我们修改这两行代码,我们的代码就能正常工作,单个线程可以多次获取锁。内部,可重入锁通过保留一个递归计数来工作。每次从首次获取锁的线程中获取锁时,计数都会增加,每次释放锁时计数都会减少。当计数为 0 时,锁才最终被释放,供其他线程获取。
让我们看一个更实际的应用来真正理解带锁的递归。想象一下,我们正在尝试构建一个线程安全的整数列表类,该类有一个方法可以查找并替换所有等于某个值的元素为另一个值。这个类将包含一个普通的 Python 列表和一个用于防止竞争条件的锁。我们假装现有的类已经有名为 indices_of(to_find: int) 的方法,该方法接收一个整数并返回列表中所有匹配 to_find 的索引。由于我们想遵循 DRY(不要重复自己)原则,我们将在定义查找和替换方法时重用此方法(请注意,这不是最高效的方法,但我们这样做是为了说明概念)。这意味着我们的类和方法将如下所示。
from threading import Lockfrom typing import Listclass IntListThreadsafe: def __init__(self, wrapped_list: List[int]): self._lock = Lock() self._inner_list = wrapped_list def indices_of(self, to_find: int) -> List[int]: with self._lock: enumerator = enumerate(self._inner_list) return [index for index, value in enumerator if value == to_find] def find_and_replace(self, to_replace: int, replace_with: int) -> None: with self._lock: indices = self.indices_of(to_replace) for index in indices: self._inner_list[index] = replace_withthreadsafe_list = IntListThreadsafe([1, 2, 1, 2, 1])threadsafe_list.find_and_replace(1, 2)
如果另一个线程在我们的 indices_of 调用期间修改了列表,我们可能会得到错误的返回值,所以我们需要在搜索匹配索引之前获取锁。我们的 find_and_replace 方法也需要获取锁,原因相同。然而,使用普通锁时,当我们调用 find_and_replace 时,我们会永远挂起。在这种情况下,切换到 RLock 可以解决这个问题,因为一次 find_and_replace 调用将始终获取来自同一线程的任何锁。这说明了一个通用的公式:如果你正在开发一个线程安全的类,其中方法 A 获取锁,而方法 B 也需要获取锁并且 还要 调用方法 A,那么你很可能需要使用可重入锁。
你可能对死锁的概念比较熟悉,这在新闻中政治谈判的报道里经常出现,一方对另一方提出要求,而另一方则提出反要求。双方在下一步行动上意见不合,谈判陷入僵局。计算机科学中的概念与此相似,即我们达到一种状态,对共享资源的争夺没有解决办法,应用会永远挂起。
我们在前一节中看到的问题,即非可重入锁可能导致程序永远挂起,就是一个死锁的例子。在这种情况下,我们陷入与自己的僵局谈判,要求获取一个永远不会被释放的锁。这种情况也可能出现在有两个线程使用多个锁时。图 7.1 描述了这种情形:如果线程 A 请求线程 B 已获取的锁,而线程 B 正在请求 A 已获取的锁,我们就陷入了僵局,形成了死锁。在这种情况下,使用可重入锁也无法解决问题,因为多个线程被困在等待对方持有的资源。
图 7.1 线程 1 和 2 大致在同一时间获取了锁 A 和锁 B。然后,线程 1 等待锁 B,而线程 2 持有该锁;与此同时,线程 2 正在等待锁 A,而线程 1 持有该锁。这种循环依赖导致了死锁,会使应用挂起。
让我们看看如何在代码中创建这种类型的死锁。我们将创建两个锁,锁 A 和锁 B,以及两个需要获取这两个锁的方法。一个方法会先获取 A 再获取 B,另一个方法会先获取 B 再获取 A。
from threading import Lock, Threadimport timelock_a = Lock()lock_b = Lock()def a(): with lock_a: <spanclass="fm-combinumeral">❶</span> print('Acquired lock a from method a!') time.sleep(1) <span class="fm-combinumeral">❷</span> with lock_b: <spanclass="fm-combinumeral">❸</span> print('Acquired both locks from method a!')def b(): with lock_b: <spanclass="fm-combinumeral">❸</span> print('Acquired lock b from method b!') with lock_a: <spanclass="fm-combinumeral">❶</span> print('Acquired both locks from method b!')thread_1 = Thread(target=a)thread_2 = Thread(target=b)thread_1.start()thread_2.start()thread_1.join()thread_2.join()
- ❷ 睡眠 1 秒;这确保了产生死锁所需的正确条件。
当我们运行这段代码时,我们会看到以下输出,然后应用会永远挂起:
Acquired lock a from method a!Acquired lock b from method b!
我们首先调用方法 A 并获取锁 A,然后引入一个人工延迟,给方法 B 机会获取锁 B。这使我们处于一种状态,即方法 A 持有锁 A,而方法 B 持有锁 B。接下来,方法 A 尝试获取锁 B,但方法 B 正在持有该锁。与此同时,方法 B 试图获取锁 A,但方法 A 正在持有它,因此卡在等待 B 释放其锁。两个方法都卡在等待对方释放资源,从而陷入僵局。
那么我们该如何处理这种情况?一种解决方案是所谓的“鸵鸟算法”,这个名字来源于一种情况(尽管鸵鸟实际上并不这么行为),即鸵鸟在感觉到危险时就把头埋进沙子里。采用这种策略,我们忽略问题,制定一个策略,当遇到问题时重启应用。这种做法背后的思路是,如果问题发生得足够罕见,投资于修复就不值得。如果你移除了上面代码中的 sleep,你会发现死锁很少发生,因为它依赖于一个非常特定的操作序列。这并不是真正的解决方案,也不是理想的做法,但却是处理极少发生的死锁时常用的一种策略。
然而,在我们的情况中,有一个简单的解决方案,那就是改变两个方法中锁的获取顺序。例如,方法 A 和方法 B 都可以先获取锁 A,然后再获取锁 B。这解决了问题,因为我们永远不会以可能导致死锁的顺序获取锁。另一种选择是重构锁,使其只使用一个锁而不是两个。使用一个锁是不可能发生死锁的(排除前面提到的可重入死锁)。总的来说,当处理需要获取多个锁的情况时,请问自己:“我是否以一致的顺序获取这些锁?有没有办法重构,使其只使用一个锁?”
我们现在已经看到了如何有效地使用线程与 asyncio 结合,并研究了更复杂的锁场景。接下来,让我们看看如何使用线程将 asyncio 集成到现有的同步应用程序中,这些应用可能无法与 asyncio 顺畅配合。