我们主要关注的是从头开始构建完全由协程和 asyncio 实现的应用。当有任何工作不适合单线程并发模型时,我们会在线程或进程中运行它们。并非所有应用都符合这种范式。如果我们正在处理一个现有的同步应用,并想融入 asyncio 呢?
一个可能出现这种情况的例子是构建桌面用户界面。构建 GUI 的框架通常会运行自己的事件循环,而该事件循环会阻塞主线程。这意味着任何长时间运行的操作都可能导致用户界面冻结。此外,这个 UI 事件循环会阻止我们创建 asyncio 事件循环。在本节中,我们将学习如何通过使用多线程在同一个时间运行多个事件循环,来构建一个响应式的 HTTP 压力测试用户界面。
Tkinter 是一个平台无关的桌面图形用户界面(GUI)工具包,随默认的 Python 安装提供。它是 “Tk 接口” 的缩写,是用 tcl 语言编写的低级 Tk GUI 工具包的接口。随着 Tkinter Python 库的创建,Tk 已成长为 Python 开发者构建桌面用户界面的流行方式。
Tkinter 有一组“小部件”,如标签、文本框和按钮,我们可以将它们放置在桌面窗口中。当我们与小部件交互时,比如输入文本或按下按钮,我们可以触发一个函数来执行代码。响应用户操作的代码可以简单到更新另一个小部件,也可以触发另一个操作。
Tkinter 以及许多其他 GUI 库,通过它们自己的事件循环来绘制小部件并处理小部件交互。事件循环不断重绘应用程序,处理事件,并检查是否有代码应响应小部件事件而运行。为了熟悉 Tkinter 及其事件循环,让我们创建一个基本的 “你好世界” 应用。我们将创建一个带有“说你好”按钮的应用,点击该按钮时,会在控制台输出 “你好!”。
列表 7.12 使用 Tkinter 的 “你好世界”
import tkinterfrom tkinter import ttkwindow = tkinter.Tk()window.title('Hello world app')window.geometry('200x100')def say_hello(): print('Hello there!')hello_button = ttk.Button(window, text='Say hello', command=say_hello)hello_button.pack()window.mainloop()
这段代码首先创建一个 Tkinter 窗口(见图 7.2),并设置应用程序标题和窗口大小。然后,我们在窗口上放置一个按钮,并将其命令设置为 say_hello 函数。当用户按下此按钮时,say_hello 函数执行,打印出我们的消息。最后,我们调用 window.mainloop(),启动 Tk 事件循环,运行我们的应用。
图 7.2 列表 7.12 的 “你好世界” 应用
需要注意的一点是,我们的应用会在 window.mainloop() 上阻塞。内部,此方法运行 Tk 事件循环。这是一个无限循环,它不断检查窗口事件并持续重绘窗口,直到我们关闭它。Tk 事件循环与 asyncio 事件循环有有趣的相似之处。例如,如果我们尝试在按钮的命令中运行阻塞工作,会发生什么?如果我们向 say_hello 函数添加一个 10 秒的延迟,使用 time.sleep(10),我们很快就会发现问题:我们的应用会冻结 10 秒!
与 asyncio 一样,Tkinter 运行 所有 东西在其事件循环中。这意味着,如果有一个长时间运行的操作,比如发送网络请求或加载大文件,我们会阻塞 Tk 事件循环,直到该操作完成。对用户的影响是,UI 会挂起并变得无响应。用户无法点击任何按钮,我们无法用状态或进度更新任何小部件,操作系统可能会显示一个旋转图标(如图 7.3 所示)来表示应用已挂起。这显然是一个不可接受的、无响应的用户界面。
图 7.3 在 Mac 上,因阻塞事件循环而出现的“毁灭之沙漏”。
这是一个异步编程理论上可以帮助我们的地方。如果我们能进行不阻塞 Tk 事件循环的异步请求,我们就可以避免这个问题。这比看起来要复杂,因为 Tkinter 不是 asyncio 友好的,你不能将协程传递给按钮点击事件。我们可能尝试在同一线程中同时运行两个事件循环,但这行不通。Tkinter 和 asyncio 都是单线程的——这就像试图在同一线程中同时运行两个无限循环,这是不可能的。如果我们先启动 asyncio 事件循环,它会阻塞 Tkinter 循环的运行,反之亦然。有没有办法让我们在单线程应用旁边运行 asyncio 应用?
事实上,我们可以通过在独立线程中运行 asyncio 事件循环来组合这两个事件循环,从而创建一个功能性的应用。让我们看看如何通过一个应用来实现这一点,该应用将响应性地更新用户关于长时间运行任务的状态,包括进度条。
7.4.2 使用 asyncio 和线程构建响应式 UI
首先,让我们介绍一下我们的应用并草拟一个基本的用户界面。我们将构建一个 URL 压力测试应用。该应用将接受一个 URL 和多个请求作为输入。当我们按下提交按钮时,我们将使用 aiohttp 以最快的速度发送网络请求,向我们选择的网页服务器施加预定义的负载。由于这可能需要很长时间,我们将添加一个进度条来可视化测试的进度。我们将每完成 1% 的总请求数就更新一次进度条,以显示进度。此外,我们允许用户取消请求。我们的用户界面将包含几个小部件,包括用于测试的 URL 文本输入、用于我们希望发出的请求数量的文本输入、一个开始按钮和一个进度条。我们将设计一个如图 7.4 所示的用户界面。
现在我们已经勾勒出了用户界面,我们需要思考如何让两个事件循环同时运行。基本的想法是,我们将在主线程中运行 Tkinter 事件循环,并在独立线程中运行 asyncio 事件循环。然后,当用户点击“提交”时,我们将一个协程提交给 asyncio 事件循环来运行压力测试。在压力测试运行期间,我们将从 asyncio 事件循环向 Tkinter 事件循环发送命令,以更新我们的进度。这为我们提供了一个如图 7.5 所示的架构。
图 7.5 Tk 事件循环将任务提交给在独立线程中运行的 asyncio 事件循环。
这个新架构包括线程间的通信。我们需要小心避免在此情况下出现竞争条件,特别是因为 asyncio 事件循环 不是 线程安全的!Tkinter 是为线程安全设计的,因此从独立线程调用它时担忧较少(至少在 Python 3+ 中是这样;我们稍后会更详细地讨论这一点)。
我们可能会倾向于使用 asyncio.run 从 Tkinter 提交协程,但这会阻塞直到我们传入的协程完成,会导致 Tkinter 应用挂起。我们需要一个函数,可以在不阻塞的情况下将协程提交给事件循环。有几个新的 asyncio 函数可供学习,它们既非阻塞,又具有线程安全特性,可以正确地提交此类工作。第一个是 asyncio 事件循环上的一个方法,名为 call_soon_threadsafe。此函数接受一个 Python 函数(不是协程),并以线程安全的方式调度它在 asyncio 事件循环的下一次迭代中执行。第二个函数是 asyncio.run_coroutine_threadsafe。此函数接受一个协程,并以线程安全的方式提交它运行,立即返回一个未来,我们可以用它来访问协程的结果。重要的是,而且令人困惑的是,这个未来 不是asyncio 未来,而是来自 concurrent.futures 模块的未来。这样做的逻辑是,asyncio 未来不是线程安全的,但 concurrent.futures 未来是。这个 future 类虽然具有与 asyncio 模块中的 future 相同的功能。
让我们开始定义和实现一些类,以根据我们上述描述构建压力测试应用。我们将首先构建一个压力测试类。这个类负责启动和停止一次压力测试,并跟踪已完成的请求数量。它的构造函数将接收一个 URL、一个 asyncio 事件循环、计划发出的请求数量以及一个进度更新回调。当我们要触发进度条更新时,我们会调用这个回调。在实现用户界面时,这个回调将触发进度条的更新。在内部,我们将计算一个刷新率,即我们执行回调的频率。我们将默认此速率设置为每完成总请求数的 1%。
import asynciofrom concurrent.futures import Futurefrom asyncio import AbstractEventLoopfrom typing import Callable, Optionalfrom aiohttp import ClientSessionclass StressTest: def __init__(self, loop: AbstractEventLoop, url: str, total_requests: int, callback: Callable[[int, int], None]): self._completed_requests: int = 0 self._load_test_future: Optional[Future] = None self._loop = loop self._url = url self._total_requests = total_requests self._callback = callback self._refresh_rate = total_requests // 100 def start(self): <span class="fm-combinumeral">❶</span> future = asyncio.run_coroutine_threadsafe(self._make_requests(), self._loop) self._load_test_future = future def cancel(self): if self._load_test_future: self._loop.call_soon_threadsafe(self._load_test_future.cancel) <span class="fm-combinumeral">❷</span> async def _get_url(self, session: ClientSession, url: str): try: await session.get(url) except Exception as e: print(e) self._completed_requests = self._completed_requests + 1 if self._completed_requests % self._refresh_rate == 0 \ <span class="fm-combinumeral">❸</span> or self._completed_requests == self._total_requests: self._callback(self._completed_requests, self._total_requests) async def _make_requests(self): async with ClientSession() as session: reqs = [self._get_url(session, self._url) for _ in range(self._total_requests)] await asyncio.gather(*reqs)
- ❷ 如果我们想取消,就调用加载测试未来的
cancel 函数。 - ❸ 一旦我们完成了 1% 的请求,就用已完成的请求数和总请求数调用回调。
在我们的 start 方法中,我们调用 run_coroutine_threadsafe,传入 _make_requests,这将在 asyncio 事件循环上启动请求。我们还跟踪了返回的未来,存储在 _load_test_future 中。跟踪这个未来让我们能够在 cancel 方法中取消加载测试。在我们的 _make_requests 方法中,我们创建了一个协程列表来发送所有网络请求,将它们传递给 asyncio.gather 来运行。我们的 _get_url 协程会发起请求,递增 _completed_requests 计数器,并在必要时调用回调,传递已完成的请求数。我们可以简单地实例化这个类并调用 start 方法来使用它,可选地通过调用 cancel 方法来取消。
一个有趣的事情值得注意的是,尽管多个协程在更新它,但我们没有在 _completed_requests 计数器周围使用任何锁定。记住,asyncio 是单线程的,asyncio 事件循环一次只运行一段 Python 代码。这使得在使用 asyncio 时递增计数器是原子的,尽管在多个线程之间发生时不是原子的。asyncio 为我们避免了许多多线程中会出现的竞争条件,但并非全部。我们将在后面的章节中对此进行更多探讨。
接下来,让我们实现我们的 Tkinter GUI 来使用这个负载测试器类。为了代码整洁,我们将直接继承 TK 类并在构造函数中初始化我们的小部件。当用户点击开始按钮时,我们将创建一个新的 StressTest 实例并启动它。现在的问题是,我们传递给 StressTest 实例的回调是什么?线程安全性在这里成为一个问题,因为我们的回调将在工作线程中被调用。如果我们的回调修改了共享数据,而主线程也能修改这些数据,这可能会导致竞争条件。在我们的情况下,由于 Tkinter 具有内置的线程安全性,我们只是在更新进度条,所以应该没问题。但如果我们需要对共享数据进行操作怎么办?锁定是一种方法,但如果我们能在线程中运行我们的回调,我们就能避免任何竞争条件。我们将使用一个通用模式来演示如何做到这一点,尽管直接更新进度条应该是安全的。
一个常见的模式是使用 queue 模块中的共享线程安全队列。我们的 asyncio 线程可以将进度更新放入此队列。然后,我们的 Tkinter 线程可以在其线程中检查此队列,更新进度条。我们需要告诉 Tkinter 在主线程中轮询队列以做到这一点。
Tkinter 有一个方法,可以在主线程中指定时间间隔后排队运行一个函数,称为 after。我们将使用此方法来运行一个询问队列是否有新进度更新的方法(列表 7.14)。如果队列中有更新,我们就可以从主线程安全地更新进度条。我们将每 25 毫秒轮询一次队列,以确保我们获得合理的延迟更新。
关于 Tkinter 是否真的线程安全?如果你搜索 "Tkinter" 和 "线程安全",你会找到很多相互矛盾的信息。Tkinter 的线程状况相当复杂。部分原因是,多年来,Tk 和 Tkinter 缺乏适当的线程支持。即使添加了线程模式,也存在一些已被修复的错误。Tk 支持非线程模式和线程模式。在非线程模式下,没有线程安全;从主线程以外的任何地方使用 Tkinter 都会招致崩溃。在旧版本的 Python 中,未开启 Tk 线程安全;然而,在 Python 3 及更高版本中,默认启用了线程安全,我们有线程安全保证。在线程模式下,如果从工作线程发出更新,Tkinter 会获取一个互斥锁,
并将更新事件写入队列,供主线程稍后处理。相关代码位于 CPython 的 函数中,位于 文件中。
from queue import Queuefrom tkinter import Tkfrom tkinter import Labelfrom tkinter import Entryfrom tkinter import ttkfrom typing import Optionalfrom chapter_07.listing_7_13 import StressTestclass LoadTester(Tk): def __init__(self, loop, *args, **kwargs): <span class="fm-combinumeral">❶</span> Tk.__init__(self, *args, **kwargs) self._queue = Queue() self._refresh_ms = 25 self._loop = loop self._load_test: Optional[StressTest] = None self.title('URL Requester') self._url_label = Label(self, text="URL:") self._url_label.grid(column=0, row=0) self._url_field = Entry(self, width=10) self._url_field.grid(column=1, row=0) self._request_label = Label(self, text="Number of requests:") self._request_label.grid(column=0, row=1) self._request_field = Entry(self, width=10) self._request_field.grid(column=1, row=1) self._submit = ttk.Button(self, text="Submit", command=self._start)<span class="fm-combinumeral">❷</span> self._submit.grid(column=2, row=1) self._pb_label = Label(self, text="Progress:") self._pb_label.grid(column=0, row=3) self._pb = ttk.Progressbar(self, orient="horizontal", length=200, mode="determinate") self._pb.grid(column=1, row=3, columnspan=2) def _update_bar(self, pct: int): <span class="fm-combinumeral">❸</span> if pct == 100: self._load_test = None self._submit['text'] = 'Submit' else: self._pb['value'] = pct self.after(self._refresh_ms, self._poll_queue) def _queue_update(self, completed_requests: int, total_requests: int): <span class="fm-combinumeral">❹</span> self._queue.put(int(completed_requests / total_requests * 100)) def _poll_queue(self): <span class="fm-combinumeral">❺</span> if not self._queue.empty(): percent_complete = self._queue.get() self._update_bar(percent_complete) else: if self._load_test: self.after(self._refresh_ms, self._poll_queue) def _start(self): <span class="fm-combinumeral">❻</span> if self._load_test is None: self._submit['text'] = 'Cancel' test = StressTest(self._loop, self._url_field.get(), int(self._request_field.get()), self._queue_update) self.after(self._refresh_ms, self._poll_queue) test.start() self._load_test = test else: self._load_test.cancel() self._load_test = None self._submit['text'] = 'Submit'
- ❶ 在我们的构造函数中,我们设置文本输入、标签、提交按钮和进度条。
- ❷ 当点击时,我们的提交按钮将调用
_start 方法。 - ❸
update_bar 方法将进度条设置为 0 到 100 之间的百分比完成值。此方法应仅在主线程中调用。 - ❹ 这是传递给压力测试的回调;它将进度更新添加到队列中。
- ❺ 尝试从队列中获取进度更新;如果有,就更新进度条。
在我们应用的构造函数中,我们创建了用户界面所需的所有小部件。最重要的是,我们创建了用于测试的 URL 和要运行的请求数的 Entry 小部件,一个提交按钮和一个水平进度条。我们还使用 grid 方法将这些小部件适当地排列在窗口中。
当我们创建提交按钮小部件时,我们将命令指定为 _start 方法。此方法将创建一个 StressTest 对象并启动它,除非我们已经有一个负载测试在运行,否则将取消它。当我们创建一个 StressTest 对象时,我们将 _queue_update 方法作为回调传入。StressTest 对象将在有进度更新时调用此方法。当此方法运行时,我们计算适当的百分比并将该值放入队列。然后,我们使用 Tkinter 的 after 方法安排 poll_queue 方法每 25 毫秒运行一次。
使用队列作为共享通信机制,而不是直接调用 _update_bar,将确保我们的 _update_bar 方法在 Tkinter 事件循环线程中运行。如果不这样做,进度条更新将在 asyncio 事件循环中发生,因为回调在该线程中运行。
现在我们已经实现了用户界面应用,我们可以将这些部分组合起来创建一个完整的应用。我们将创建一个新线程在后台运行事件循环,然后启动我们新创建的 LoadTester 应用。
import asynciofrom asyncio import AbstractEventLoopfrom threading import Threadfrom chapter_07.listing_7_14 import LoadTesterclass ThreadedEventLoop(Thread): <span class="fm-combinumeral">❶</span> def __init__(self, loop: AbstractEventLoop): super().__init__() self._loop = loop self.daemon = True def run(self): self._loop.run_forever()loop = asyncio.new_event_loop()asyncio_thread = ThreadedEventLoop(loop)asyncio_thread.start() <span class="fm-combinumeral">❷</span>app = LoadTester(loop) <span class="fm-combinumeral">❸</span>app.mainloop()
- ❶ 我们创建一个新线程类来永久运行
asyncio 事件循环。 - ❷ 启动新线程在后台运行
asyncio 事件循环。 - ❸ 创建负载测试器的 Tkinter 应用,并启动其主事件循环。
我们首先定义一个 ThreadedEventLoopClass,它继承自 Thread,以运行我们的事件循环。在该类的构造函数中,我们接收一个事件循环,并将线程设置为守护线程。我们将线程设置为守护线程,因为 asyncio 事件循环将在此线程中无限期地运行并阻塞。如果我们在非守护模式下运行,这种无限循环将阻止我们的 GUI 应用关闭。在该线程的 run 方法中,我们调用事件循环的 run_forever 方法。这个方法命名得很恰当,因为它确实只是开始运行事件循环,阻塞直到我们停止事件循环。
一旦我们创建了这个类,我们就用 new_event_loop 方法创建一个新的 asyncio 事件循环。然后,我们创建一个 ThreadedEventLoop 实例,传入我们刚刚创建的循环并启动它。这会创建一个新线程,其中包含我们的事件循环。最后,我们创建一个 LoadTester 应用的实例并调用 mainloop 方法,启动 Tkinter 事件循环。
当我们在该应用中运行压力测试时,我们应该看到进度条平滑地更新,而不会冻结用户界面。我们的应用保持响应,我们可以随时点击取消来停止负载测试。在独立线程中运行 asyncio 事件循环的技术对于构建响应式 GUI 非常有用,但对于任何同步的遗留应用也同样有用,其中协程和 asyncio 无法顺利集成。
我们现在已经看到了如何利用线程处理各种 I/O 密集型工作负载,但关于 CPU 密集型工作负载呢?回想一下,GIL 阻止我们在线程中并发运行 Python 字节码,但有一些值得注意的例外,允许我们在某些线程中进行一些 CPU 密集型工作。