深入 asyncio

awaitable 对象

用于 await 表达式中的对象。可以是的 coroutine 也可以是实现了 __await__() 方法的对象,参见 PEP 492。类比于 Iterable 对象是 Generator 或实现了 __iter__() 方法的对象。

object.__await__(self)

必须返回生成器,asyncio.Future 类也实现了该方法,用于兼容 await 表达式。

而 Task 继承自 Future,因此 awaitable 对象有三种:coroutinesTasksFutures

await 的目的:

  • 获取协程的结果
  • 挂起当前协程,将控制交由事件循环,切换到其他协程,然后等待结果,最后恢复协程继续执行

Coroutine 对象

Coroutine 对象是 awaitable 对象,包括 async def 定义的协程和 types.coroutine()asyncio.coroutine() 装饰的生成器,但后两者内部没有实现 __await__() 方法

import asyncio

async def f():
await asyncio.sleep(3)
  • coroutine function:用 async def 定义的函数,比如上面的 f,类比于生成器函数。

  • coroutine object:调用协程函数返回的对象,比如上面的 f(),类比于生成器对象。

  • generator-based coroutines:由 asyncio.coroutine() 装饰的生成器,该语法即将在 Python 3.10 中移除,不要再使用了。

    import asyncio
    import types

    @asyncio.coroutine
    def f():
    yield from asyncio.sleep(3)

    @types.coroutine
    def g():
    yield from asyncio.sleep(3)

有几个内置的函数可以判断协程的类型

import asyncio
import types


@asyncio.coroutine
def f():
yield from asyncio.sleep(3)


@types.coroutine
def g():
yield from asyncio.sleep(3)


async def h():
await asyncio.sleep(3)
True
True
True
True
False
True

运行协程

推荐使用 async/await 语法声明协程。

>>> import asyncio

>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')

>>> asyncio.run(main())
hello
world

简单地调用一个协程并不会将其加入执行计划中:

>>> main()
<coroutine object main at 0x1053bb7c8>

具体怎么运行,看下面

APIs

下面这些方法都是由事件循环控制,使用者无需手动控制。

  • coroutine.send(value)
  • coroutine.throw(type[, value[, traceback]])
  • coroutine.close()

Future 对象

Future 对象用于桥接低层的回调代码和高层的 async/await 代码。

  • Future 代表了异步操作的最终结果,非线程安全。
  • Future 是 awaitable 对象,协程可以 await Future 对象,直到有结果或发生异常,或被取消。
  • 在 asyncio 中需要 Future 对象,这样允许回调代码可用于 async/await 中。
  • 通常情况下没有必要在应用层级的代码中创建 Future 对象。如果要创建,也不要直接实例化 Future 对象,建议用 loop.create_future() 来创建 Future 对象,该方法会将 loop 绑定到创建的 future 上。
  • Python 3.7 中已经支持 contextvars 模块。
import asyncio


async def set_after(future, delay, value):
await asyncio.sleep(delay)
future.set_result(value)


async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.create_task(set_after(future, 1, '... world'))

print('hello ...')
print(await future)


asyncio.run(main())

APIs

相关的函数:

  • asyncio.isfuture(obj)
  • asyncio.ensure_future(obj, *, loop=None)
  • asyncio.wrap_future(future, *, loop=None)

实例方法:

  • result()

    • 如果 Future 已经完成且有了 set_result() 方法设置的结果,则返回该结果。
    • 如果 Future 已经完成且有了 set_exception() 方法设置的异常,则引发该异常。
    • 如果 Future 已经取消,则引发 CancelledError 异常。
    • 如果 Future 还未就绪,则引发 InvalidStateError 异常。
  • set_result(result) 标记 Future 为完成状态,然后设置结果。如果 Future 已经完成,会引发 InvalidStateError

  • set_exception(exception) 标记 Future 为完成状态,然后设置一个 exception。如果 Future 已经完成,会引发 InvalidStateError

  • done() 如果 Future 已经完成,返回 True

    Future 完成,存在三种可能:

    • 被取消了
    • 被设置了 set_result(result)
    • 被设置了 set_exception(exception)
  • cancelled() 如果 Future 被取消,则返回 True。该方法通常用于在设置 result 或 exception 前,检查 Future 是否被取消。

    if not fut.cancelled():
    fut.set_result(42)
  • add_done_callback(callback, *, context=None)

    给 Future 添加回调,当 Future 完成时,会执行该回调。

    该回调调用时会用该 Future 对象作为唯一的参数,即每次都是调用 callback(future)

    如果该方法调用时,Future 已经完成了,那么回调会被 loop.call_soon() 执行。

    可选的关键字参数 context 允许添加自定义的 contextvars.Context 用于回调的运行。当没有提供 context 时,默认的 context 会被使用。

    functools.partial() 可以用在回调中用来传递参数。

    import asyncio


    async def f():
    print('OK')
    return 'aaa'


    def get_result(future):
    print(future.result())
    loop.stop()


    loop = asyncio.get_event_loop()
    task = loop.create_task(f())
    task.add_done_callback(get_result)
    loop.run_forever()
  • remove_done_callback(callback) 将回调从回调列表中移除,并返回移除的回调数量,一般返回 1,除非之前回调添加过多次。

  • cancel() 取消 Future 和并计划执行回调。如果 Future 已经完成或取消,则返回 False,否则将 Future 的状态为已取消,并计划执行回调,返回 True

  • exception()

    返回 Future 设置的异常。

    • 只有当 Future 完成时,才返回异常(如果异常未设置,则返回 None)。

    • 如果 Future 已经取消,则引发 CancelledError 异常。

    • 如果 Future 还未就绪,则引发 InvalidStateError 异常。
  • get_loop() Python 3.7 新功能,返回该 Future 对象绑定的事件循环。

注意:Future 对象设计时参考了 concurrent.futures.Future。几处关键不同如下:

  • 不像 asyncio Futuresconcurrent.futures.Future 实例不能被 awaited。
  • asyncio.Future.result()asyncio.Future.exception() 不接受 timeout 参数。
  • 当 Future 还未完成时,asyncio.Future.result()asyncio.Future.exception() 引发 InvalidStateError 异常。
  • asyncio.Future.add_done_callback() 注册的回调不会立即被调用,取而代之的是会被 loop.call_soon() 计划调用。
  • asyncio Future 和 concurrent.futures.wait()concurrent.futures.as_completed() 函数不兼容。

Task 对象

类似于 Future 的对象,运行在协程中,非线程安全。

Tasks 用来在事件循环中运行协程,如果协程 await 一个 Future,Task 会暂停协程的执行,等待 Future 完成。当 Future 完成后,被包裹的协程开始恢复运行。

事件循环使用协作式调度:一个循环事件一次运行一个 Task,当 Task await Future 的完成时,事件循环就会去运行其他的 Tasks、回调或者执行 IO 操作。

创建 Tasks

  • asyncio.create_task()

    将 coro 协程包裹到 Task 对象,加入到执行计划中。返回 Task 对象。该函数在 Python 3.7 中被添加。

    该 task 由 get_running_loop() 返回的 loop 来执行,如果当前线程没有运行的 loop,将会引发 RuntimeError

还可以用低层的 loop.create_task()ensure_future() 函数来创建。直接实例化 Task 是不推荐的。

取消 Tasks

要取消运行中的 Task,用 cancel() 方法。但调用它会造成 Task 抛出 CancelledError 异常到包裹的协程中,如果协程在取消过程中还在 awaiting 一个 Future,这个 Future 也会被取消。

cancelled() 方法可以用来检查 Task 是否被取消了,返回 True 代表包裹的协程没有抑制 CancelledError 异常,已经被取消了。

async def coro():
...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

# 第三种
loop = events.get_event_loop()
task = loop.create_task(coro())

APIs

asyncio.Task 继承自 Future,它的所有 APIs 除了 Future.set_result()Future.set_exception() 都被继承。

Tasks 支持 contextvars 模块。当 Task 被创建,它会复制当前上下文,之后用复制的上下文运行协程。

实例方法

  • cancel()
  • cancelled()
  • done()
  • result()
  • exception()
  • add_done_callback(callback, *, context=None)
  • remove_done_callback(callback)
  • get_stack(*, limit=None)
  • print_stack(*, limit=None, file=None)

类方法

  • classmethod all_tasks(loop=None)
  • classmethod current_task(loop=None)

睡眠

coroutine asyncio.sleep(delay, result=None, *, loop=None)

阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

loop 参数已弃用,计划在 Python 3.10 中移除。

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)

asyncio.run(display_date())

事件循环 Event Loop

事件循环是 asyncio 应用的核心,事件循环运行异步 tasks 和 callbacks,执行网络 IO 操作,运行子进程 subprocesses。

应用开发者通常应当使用高层级的 asyncio 函数,例如 asyncio.run(),应当很少有必要引用 loop 对象或调用其方法。 本节所针对的主要是低层级代码、库和框架的编写者,他们需要更细致地控制事件循环行为。

获取事件循环

  • asyncio.get_running_loop()

    返回当前 OS 线程中正在运行的事件循环。

    如果没有正在运行的事件循环则会引发 RuntimeError。 此函数只能由协程或回调来调用。

  • asyncio.get_event_loop()

    获取当前事件循环。 如果当前 OS 线程没有设置当前事件循环并且 set_event_loop() 还没有被调用,asyncio 将创建一个新的事件循环并将其设置为当前循环。

    由于此函数具有相当复杂的行为(特别是在使用了自定义事件循环策略的时候),更推荐在协程和回调中使用 get_running_loop() 函数而非 get_event_loop()

    应该考虑使用 asyncio.run() 函数而非使用低层级函数来手动创建和关闭事件循环。

  • asyncio.set_event_loop(loop)

    将 loop 设置为当前 OS 线程的当前事件循环。

  • asyncio.new_event_loop()

    创建一个新的事件循环。

运行和停止循环

  • loop.run_until_complete(future)

  • loop.run_forever()

  • loop.stop() 停止事件循环

  • loop.is_running() 如果事件循环还在运行,返回 True

  • loop.close()

    关闭事件循环。

    该函数调用时,循环一定不会运行,任何 pending 的回调都会被放弃。

    该方法清除所有的队列,关闭 executor,但不会等待 executor 完成。

    该方法是不可逆转的,不要在事件循环关闭之后调用 loop 的其他方法。

    try:
    loop.run_forever()
    finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
  • coroutine loop.shutdown_asyncgens()

    计划用 aclose() 调用将当前所有开着的异步生成器对象关闭。调用此方法后,如果新的异步生成器还在迭代,事件循环会引起警告。该方法会可靠地终结所有计划关闭的异步生成器。

    当使用了 asyncio.run() 时,无需调用该调用该函数。

安排回调

  • loop.call_soon(callback, *args, context=None)

    用参数 args 安排一个回调,在事件循环下一次迭代时调用。

    回调按注册的顺序调用,每个回调只调用一次。

    可选的关键字参数 context 允许添加自定义的 contextvars.Context 用于回调的运行。当没有提供 context 时,默认的 context 会被使用。

    该方法不是线程安全的。

  • loop.call_soon_threadsafe(callback, *args, context=None)

    线程安全版本的 call_soon(),必须用在其他线程的安排的回调中。

和 Future 的回调不一样,这里允许传递参数,不过只能是非关键字参数,因为是通过 *args 接受。除非通过 functools.partial 预先传递关键字参数。

# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
import asyncio
from functools import partial


async def f():
print('OK')
return 'done'


def callback1(args):
print(*args)
loop.stop()


def callback2(a, b=None):
print(a, b)


loop = asyncio.get_event_loop()
task = loop.create_task(f())
loop.call_soon(callback1, (1, 2, 3))
loop.call_soon(partial(callback2, 10, b=11))
loop.run_forever()

Python 3.7 增加了上下文变量 Context Variables,至于为什么不用全局变量,因为可能会被其他协程修改,不安全,在这里也可以使用。

import asyncio
from contextvars import ContextVar

future_var = ContextVar('future')


async def f():
print('OK')
return 'aaa'


def get_result():
future = future_var.get('future')
print(future.result())
loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(f())
future_var.set(task)
loop.call_soon(get_result)
loop.run_forever()

安排延迟的回调

事件循环还提供了机制,将回调在 future 的某个时间点调用,事件循环用 monotonic clocks 追踪时间。

  • loop.call_later(delay, callback, *args, context=None)

    安排回调在 delay(整数或浮点数)秒后调用。

    返回 asyncio.TimerHandle 实例,可以用于取消回调。

    callback 只执行一次,如果两个回调都被安排在同一时间,那调用的顺序是不确定的。

    可选的位置参数 args 会被传递给 callback,当回调被调用时,如果你想传递关键字参数,你要用 functools.partial()

    可选的关键字参数 context 允许指定自定义的 contextvars.Context 用于回调的运行。当没有提供 context 时,默认的 context 会被使用。

    import asyncio


    def callback(n):
    print(f'执行 callback {n}')


    async def main():
    loop = asyncio.get_running_loop()
    print('注册 callbacks 中')
    loop.call_later(.1, callback, 1) # 第一个参数为延迟时间
    loop.call_later(.5, callback, 2) # 延迟时间超过了协程运行时间,协程已经提前退出,所以没有执行
    loop.call_soon(callback, 3)

    await asyncio.sleep(0.4)


    asyncio.run(main())
  • loop.call_at(when, callback, *args, context=None)

    安排回调在 when(整数或浮点数)指定的完全时间戳调用,和 loop.time() 使用同样的时间引用。

    这个方法的行为与 call_later() 相同。

    返回 asyncio.TimerHandle 实例,可以用于取消回调。

  • loop.time()

    返回当前时间,浮点数值,根据事件循环的内部 monotonic clock。

    Monotonic Clocks,意思是单调时间,所谓单调,就是只会不停的往前增长,不受校时操作的影响,这个时间是自进程启动以来的秒数。

    import asyncio
    import time


    def callback(n, loop):
    print(f'执行 callback {n}{loop.time()}')


    async def main():
    loop = asyncio.get_running_loop()
    now = loop.time()
    print(f'clock time: {time.time()}')
    print(f'loop time: {now}')

    print('注册 callbacks 中')
    loop.call_at(now + 0.2, callback, 1, loop)
    loop.call_at(now + 0.1, callback, 2, loop)
    loop.call_soon(callback, 3, loop)

    await asyncio.sleep(1)


    asyncio.run(main())

创建 Futures 和 Tasks

  • loop.create_future()

    创建依附于 loop 的 asyncio.Future 对象。

    这是在 asyncio 中创建 Future 更好的方式,给第三方事件循环提供了可选的 Future 对象实现。

  • loop.create_task(coro)

    安排 Coroutines 的执行,返回 Task 对象。

  • loop.set_task_factory(factory)

    loop.create_task(coro) 设置 task 工厂。

    如果 factory 为 None,那么将会用默认的 factory 设置。否则 factory 必须拥有匹配 (loop, coro) 签名的 callable,loop 为激活的事件循环的引用,coro 是协程对象,callable 必须返回 asyncio.Future 兼容对象。

  • loop.get_task_factory()
    返回 task factory,如果使用的默认 factory,则返回 None

在线程或进程池中执行

  • awaitable loop.run_in_executor(executor, func, *args)

    安排函数在指定的 executor 中调用。

    executor 参数是 concurrent.futures.Executor 实例。如果 executor 为 None,则使用默认的 executor。

    返回 asyncio.Future 对象。

    functools.partial() 传递关键字参数。

    import asyncio
    import concurrent.futures

    def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
    return f.read(100)

    def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

    async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
    None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
    result = await loop.run_in_executor(
    pool, blocking_io)
    print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
    result = await loop.run_in_executor(
    pool, cpu_bound)
    print('custom process pool', result)

    asyncio.run(main())
  • set_default_executor(executor)

    run_in_executor() 设置默认的 executor,executor 参数是 concurrent.futures.ThreadPoolExecutor 实例。

启用调试模式

  • loop.get_debug()

    查看事件循环的调试模式是否启用

    如果环境变量 PYTHONASYNCIODEBUG 为非空,则返回 True,否则返回 False

  • loop.set_debug(enabled: bool)

    启用事件循环的 debug 模式

运行子进程

推荐用高级的 asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() 代替 loop 的低级 API。

注意:默认的 asyncio 事件循环在 Windows 上不支持子进程,在 Windows 上应用 ProactorEventLoop 代替。

import asyncio

asyncio.set_event_loop_policy(
asyncio.WindowsProactorEventLoopPolicy())

asyncio.run(your_code())
  • coroutine asyncio.create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds)

    创建子进程。

    limit:如果给 stdout 和 stderr 参数设置了 subprocess.PIPE,那么会设置包裹了 Process.stdout 和 Process.stderr 的 StreamReader 的缓冲大小限制。

    返回 asyncio.subprocess.Process 实例。

    import asyncio
    import sys

    async def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess; redirect the standard output
    # into a pipe.
    proc = await asyncio.create_subprocess_exec(
    sys.executable, '-c', code,
    stdout=asyncio.subprocess.PIPE)

    # Read one line of output.
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit.
    await proc.wait()
    return line

    if sys.platform == "win32":
    asyncio.set_event_loop_policy(
    asyncio.WindowsProactorEventLoopPolicy())

    date = asyncio.run(get_date())
    print(f"Current date: {date}")
  • coroutine asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds)

    运行 cmd 终端命令。

    limit:如果给 stdout 和 stderr 参数设置了 subprocess.PIPE,那么会设置包裹了 Process.stdout 和 Process.stderr 的 StreamReader 的缓冲大小限制。

    返回 asyncio.subprocess.Process 实例。

    import asyncio

    async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
    cmd,
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
    print(f'[stdout]\n{stdout.decode()}')
    if stderr:
    print(f'[stderr]\n{stderr.decode()}')

    asyncio.run(run('ls /zzz'))
    async def main():
    await asyncio.gather(
    run('ls /zzz'),
    run('sleep 1; echo "hello"'))

    asyncio.run(main())

常量

  • asyncio.subprocess.PIPE

    可用于 stdin、stdout 或 stderr 参数。

    如果用于 stdin 参数:Process.stdin 属性会指向 StreamWriter 实例。

    如果用于 stdout 或 stderr 参数:Process.stdoutProcess.stderr 属性会指向 StreamReader 实例。

  • asyncio.subprocess.STDOUT

    特殊值,可以用在 stderr 参数,指明标准错误应该重定向到标准输出。

  • asyncio.subprocess.DEVNULL

    特殊值,可以用于 stdin、stdout 或 stderr 参数,处理创建函数,指明特殊文件 os.devnull 将被用于适合的子进程流。

class asyncio.subprocess.Process

create_subprocess_exec()create_subprocess_shell() 返回的包裹系统进程的对象。

该类和 subprocess.Popen 类拥有相似的 API,但又有几点不同:

  • 不像 Popen,Process 实例没有与 poll() 等价的方法;
  • communicate()wait() 方法没有 timeout 参数:使用 wait_for() 函数;
  • Process.wait() 方法是异步的,而 subprocess.Popen.wait() 用阻塞的 busy loop实现的;
  • 不支持 universal_newlines 参数。

该类不是线程安全的。

  • coroutine wait()

    等待子进程终结,返回 returncode 属性。

    注意:当 stdout=PIPEstderr=PIPE,子进程产生太多的输出,该方法可能造成死锁,阻塞系统 pipe 缓冲等待接收更多的数据。用 pipes 时,应该使用 communicate() 方法以避免这种情况。

  • coroutine communicate(input=None)

    和 process 交互:

    • 如果 input 不是 None,会像标准输入发送数据
    • 从标准输出和标准错误读取数据,当遇到 EOF 时停止。
    • 等待进程终结。

    可选参数 input 是将发送给子进程的数据(bytes 对象)。

    返回 (stdout_data, stderr_data) 元组。

    要向进程的标准输入发送数据,请用 stdin=PIPE。同理,返回的元组要获取数据,请用 stdout=PIPEstderr=PIPE

  • send_signal(signal)

  • terminate() 杀死子进程,POSIX 系统上发送 signal.SIGTERM 命令杀死子进程,Windows 上调用 TerminateProcess() 停止子进程。
  • kill() 杀死子进程,POSIX 系统上发送 SIGKILL 命令杀死子进程,Windows 上是 terminate() 的别名。
  • stdin
  • stdout
  • stderr
  • pid
  • returncode

事件循环实现

asyncio 有两种事件循环实现:SelectorEventLoop 和 ProactorEventLoop。

默认情况下,asyncio 在所有平台使用 SelectorEventLoop。

class asyncio.SelectorEventLoop

基于 selectors 模块的事件循环

适用于:Unix、Windows。

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

class asyncio.ProactorEventLoop
基于 Windows 的 “I/O Completion Ports” (IOCP) 实现的事件循环

适用于:Windows。

import asyncio
import sys

if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)

class asyncio.AbstractEventLoop
asyncio-compliant event loops 的抽象基类

回调处理

class asyncio.Handle

loop.call_soon()loop.call_soon_threadsafe() 返回的包裹回调的对象

  • cancel() 取消回调,如果回调已经取消或执行,那么无效。
  • cancelled() 如果回调被取消,返回 True

class asyncio.TimerHandle

loop.call_later()loop.call_at() 返回的包裹回调的对象,是 Handle 的子类。

  • when() 返回回调安排的时间,浮点数的秒,该时间是一个完全时间戳,和 loop.time() 引用同样的时间。

运行 Task

先将 coroutine 包裹成 Task 对象,Task 也可以说是包裹了 coroutine 的 Future 对象,事件循环负责运行 Future 对象,如果是 Task 对象,则执行 Task 对象包裹的 coroutine,如果 coroutine 中 await 了一个 Future,Task 会挂起当前协程,等待 Future 完成,而 Future 在执行 set_result(result) 设置 result 或 set_exception(exception) 设置异常后,会将 Future 标记为完成。当 Future 完成后,会根据情况获取其 result 或引发异常,如果没有引发异常,则恢复协程,继续执行。

因此想要运行 coroutine,首先要将其转换成 Task 对象,而具体做法有好多种。

AbstractEventLoop.run_until_complete(future)

一直运行到 future(Future 实例)完成,返回 Future 的结果,或者引发异常。

如果参数是 coroutine,内部会隐式调用将其包裹成 asyncio.Task 对象,并加入运行计划。

import asyncio


async def f():
print('OK')
return 'Done'


loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(f())
print(result)
finally:
loop.close()

也可以手动将 coroutine 包裹成 asyncio.Task 对象。

asyncio.ensure_future(f())

import asyncio


async def f():
print('OK')
return 'Done'


task = asyncio.ensure_future(f())
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(task)
print(result)
print(task.result())
finally:
loop.close()
import asyncio


async def done():
return 'Done'


async def f():
print('OK')
return await asyncio.ensure_future(done())


loop = asyncio.get_event_loop()

try:
result = loop.run_until_complete(f())
print(result)
finally:
loop.close()

asyncio.create_task(coro)

该函数源码如下,可以发现此函数要求当前线程必须先有运行的事件循环才行,如果没有运行的事件循环,会引发 RuntimeError

def create_task(coro):
"""Schedule the execution of a coroutine object in a spawn task.

Return a Task object.
"""
loop = events.get_running_loop()
return loop.create_task(coro)

下面的写法是错误的

import asyncio


async def f():
print('OK')
return 'Done'


loop = asyncio.get_event_loop()
task = asyncio.create_task(f())
try:
result = loop.run_until_complete(task)
print(result)
print(task.result())
finally:
loop.close()

应该改成:

import asyncio


async def done():
return 'Done'


async def f():
print('OK')
return await asyncio.create_task(done())


loop = asyncio.get_event_loop()

try:
result = loop.run_until_complete(f())
print(result)
finally:
loop.close()

loop.create_task(coro())

import asyncio


async def f():
print('OK')
return 'Done'


loop = asyncio.get_event_loop()
task = loop.create_task(f())

try:
result = loop.run_until_complete(task)
print(result)
finally:
loop.close()
import asyncio


async def done():
return 'Done'


async def f():
print('OK')
task = loop.create_task(done())
return await task


loop = asyncio.get_event_loop()

try:
result = loop.run_until_complete(f())
print(result)
finally:
loop.close()

loop.create_future()

import asyncio


def mark_done(future):
future.set_result('Done')


loop = asyncio.get_event_loop()

try:
future = loop.create_future()
loop.call_soon(mark_done, future)
result = loop.run_until_complete(future)
print(result)
finally:
loop.close()
import asyncio


def mark_done(future):
future.set_result('Done')


async def f():
print('OK')
future = loop.create_future()
loop.call_soon(mark_done, future)
return await future


loop = asyncio.get_event_loop()

try:
result = loop.run_until_complete(f())
print(result)
finally:
loop.close()

在协程中添加 Task 注意事项

添加 task 后一定要 await 该 task,因为事件循环只能从一个入口进入,如果该入口的协程没有去 await 添加的 task,该协程一旦提前执行完毕,不会等待 task 而提前结束,进而将添加的 task 取消。所以必须要 await 该 task。

import asyncio


async def f():
await asyncio.sleep(1)
print('f')


async def main():
print('main')
t = asyncio.create_task(f())
await asyncio.sleep(0.1)


asyncio.run(main())

# main
import asyncio


async def f():
await asyncio.sleep(1)
print('f')


async def main():
print('main')
await asyncio.sleep(0.1)
await asyncio.create_task(f())


asyncio.run(main())

# main
# f

loop.run_forever()

会一直运行事件循环,直到 loop.stop() 被调用。

如果 loop.stop()loop.run_forever() 调用前被调用,该循环会以 timeout 为 0 轮询 I/O selector 一次,运行所有安排的回调以响应 I/O 事件(那些已经被安排过的),然后退出。

如果 loop.stop() 调用时, loop.run_forever() 在运行,该循环会运行当前的一批回调,然后退出。注意安排的新回调在此情况下不会被运行,它们会在下一次 loop.run_forever()run_until_complete() 调用时运行。

import asyncio

async def f():
print('OK')

loop = asyncio.get_event_loop()
task = loop.create_task(f())
try:
loop.run_forever()
finally:
loop.close()

asyncio.run(coro, *, debug=False)

该函数运行传入的协程,还负责管理 asyncio 事件循环,结束异步生成器。

当其他 asyncio 事件循环在同一线程运行时,该函数不能被调用。

如果 debug 为 True,事件循环运行在调试模式下。

该函数总是会创建一个新的事件循环,最后关闭循环,适用于只有一个 main() 入口且只调用一次的 asyncio 程序。

该函数在 Python 3.7 中被加入,属于临时 API。

并发运行 Tasks

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并发运行 aws 序列中的 awaitable 对象。

如果 aws 中的 awaitable 是协程,会被自动以 Task 安排。

如果 aws 中所有的 awaitable 都已经成功完成,结果会聚合到一起作为结果列表返回。返回值顺序按照 aws 的顺序放置。

如果 return_exceptionsFalse(默认值),第一个引发的异常会被立即传播到在 gather() 中 await 的 task。aws 序列中其他的不会被取消,还会继续运行。

如果 return_exceptionsTrue,异常会被视为成功的 results,聚合到一起作为结果列表返回。

如果 gather() 被取消,所有提交的还未完成的 awaitables 都会被取消。

如果 aws 序列中的任何 Task 或 Future 被取消,会被视为引发了 CancelledErrorgather() 调用在此情况下不会被取消,这是为了防止提交的 Task/Future 被取消后,造成其他的 Tasks/Futures 也被取消。

在 3.7 版更改:如果 gather 本身被取消,则无论 return_exceptions 取值为何,取消都会被传播。

import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")

async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)

asyncio.run(main())

# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

见下面 Waiting Primitives

asyncio.as_completed(aws, *, loop=None, timeout=None)

见下面 Waiting Primitives

避免被取消

awaitable asyncio.shield(aw, *, loop=None)

保护 awaitable 对象不被取消。

如果 aw 是协程,那么它会被自动安排成 Task。

res = await shield(something())

等价于

res = await something()

区别:上面的语句中,如果包含的协程被取消,运行在 something() 中的 Task 不会取消。虽然调用者已经被取消,但从 something() 的视角看,取消并没发生。所以 “await” 表达式仍然会引发 CancelledError

如果 something() 被通过其他方式取消(例如,从自身内部),可能会取消 shield()

如果你希望完全忽略取消(不建议),你应该将 shield() 函数与 try/except 子句结合:

try:
res = await shield(something())
except CancelledError:
res = None

超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

等待 aw awaitable 完成,指定 timeout 秒数后超时。

如果 aw 是一个协程,它将自动作为 Task 被安排。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeoutNone,则阻塞直到完成。

如果发生超时,任务将被取消并引发 asyncio.TimeoutError.

要避免任务 取消,可以用 shield() 包裹。

函数将等待直到 future 确实被取消,所以总等待时间可能会超过 timeout 指定的秒数。

如果该等待操作被取消,则 futur aw 也会被取消。

loop 参数已弃用,计划在 Python 3.10 中移除。

在 3.7 版变化:当 aw 因超时被取消,wait_for 会等待 aw 被取消。之前版本则会立即引发 asyncio.TimeoutError

async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

# Expected output:
#
# timeout!

Waiting Primitives

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 中的一组 awaitble 对象,会阻塞直到指定 return_when 被满足。

如果任何 aws 中的 awaitable 是协程,那么会被作为 Task 被安排。直接传递协程对象给 wait() 已经废弃了,因为这会引起混淆,看下面的例子。

返回 (done, pending) 元组,其中 done 和 pending 分别是 Futures/Tasks 集合。

用法:

done, pending = await asyncio.wait(aws)

loop 参数已弃用,计划在 Python 3.10 中移除。

timeout 为 float 或 int 型数值表示返回前的最大等待秒数。如果 timeout 未指定,则阻塞直到完成。

注意该函数不会引发 asyncio.TimeoutError。还未完成的 Futures 在超时发生时,会作为 pending 集合返回。

return_when 指明函数何时返回,必须为下列参数之一:

Constant Description
FIRST_COMPLETED 当任意的 future 完成或取消,函数将返回。
FIRST_EXCEPTION 当任意完成的的 future 引发异常,函数将返回。如果没引发异常发生,那么等同于 ALL_COMPLETED
ALL_COMPLETED 当所以的 future 完成或取消,函数将返回。

不像 wait_for(),当超时发生时 wait() 不会取消 futures。

注意:wait() 自动安排协程作为 Tasks,之后在 (done, pending) 的集合中返回那些隐式创建的 Task 对象,因此下面的代码执行和期望的不一样:

async def foo():
return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
# 该代码永远不会运行,因为 done 里面是 Tasks,不是 Coroutines

应该改为

async def foo():
return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
# Everything will work as expected now.

虽然和 asyncio.gather() 一样,asyncio.wait() 也是并发运行 tasks 的,但是有几个区别

  • wait 返回的是 (done sets, pending sets),是无序的,gather 返回的 results list,是有序的。
  • wait 的 aws 参数是 set 类型,当然也可以是 iterable,传参方式为 wait(iterable),gather 的参数是可变参数,因此只能 gather(coro1(), coro2())gather(*iterable)
  • wait 有 timeout 参数,gather 没有。

asyncio.as_completed(aws, *, loop=None, timeout=None)

并发运行 aws 集合中的 awaitable 对象,返回 Future 对象的迭代器,每一个返回的 Future 对象代表了剩余 awaitables 集合的结果。完成的 Future 顺序是不确定。

如果在 Futures 完成前超时,会引发 asyncio.TimeoutError

import asyncio


async def phase(i):
print(f"phase {i}")
return i


async def main(num):
phases = [phase(i) for i in range(num)]
print('waiting for phases to complete')

results = []
for next_to_complete in asyncio.as_completed(phases):
result = await next_to_complete
print(f'received answer {result}')
results.append(result)

print(f'results: {results}')


asyncio.run(main(3))

从其他线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

提交协程给事件循环,是线程安全的。

返回 concurrent.futures.Future 对象,等待来自另一个系统线程的结果。

该函数从系统的其他线程调用,而不是从当前正在运行事件循环的线程调用。

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果协程引发了异常,返回的 Future 将被通知,也可用于取消事件循环中的 task。

try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print(f'The coroutine raised an exception: {exc!r}')
else:
print(f'The coroutine returned: {result!r}')

不同于其他 asyncio 函数,此函数要求显式地传入 loop 参数。

自省

asyncio.current_task(loop=None)

返回当前运行的 Task 实例,如果没有 task 运行,则返回 None

如果 loop 为 None,会先调用 get_running_loop() 获取当前的 loop。

asyncio.all_tasks(loop=None)

返回一组 loop 还未完成的 Task 对象。

如果 loop 为 None,会先调用 get_running_loop() 获取当前的 loop。

队列

设计和 queue 模块相似,不过 asyncio queues 不是线程安全的,因为它们设计的目的是用于 async/await 代码中。

注意 asyncio queues 没有 timeout 参数,用 asyncio.wait_for() 函数可以实现有 timeout 的队列操作。

class asyncio.Queue(maxsize=0, *, loop=None)

  • maxsize
  • empty()
  • full()
  • coroutine get()
  • get_nowait()
  • coroutine join()
  • coroutine put(item)
  • put_nowait(item)
  • qsize()
  • task_done()

class asyncio.PriorityQueue

class asyncio.LifoQueue

Exceptions

  • exception asyncio.QueueEmpty
  • exception asyncio.QueueFull
import asyncio
import random
import time


async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()

# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)

# Notify the queue that the "work item" has been processed.
queue.task_done()

print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()

# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)

# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)

# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at

# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)

print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

Synchronization Primitives

和 threading 模块设计是相似的,不同点:

  • asyncio primitives 不是线程安全的,因此不应该用于系统的线程同步,此情况应该用 threading。
  • asyncio 的 synchronization primitives 不接受 timeout 参数,用 asyncio.wait_for() 函数执行有 timeout 的操作。

Lock

class asyncio.Lock(*, loop=None)

实现了用于 asyncio tasks 的独占锁, 非线程安全。

asyncio lock 可以保证对资源互斥访问。

lock = asyncio.Lock()

# ... later
async with lock:
# access shared state

等价于

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
# access shared state
finally:
lock.release()
import asyncio
import functools


def unlock(lock):
print('callback releasing lock')
lock.release()


async def coro1(lock):
print('coro1 waiting for the lock')
async with lock:
print('coro1 acquired lock')
print('coro1 released lock')


async def coro2(lock):
print('coro2 waiting for the lock')
async with lock:
print('coro2 acquired lock')
print('coro2 released lock')


async def main():
loop = asyncio.get_running_loop()

# Create and acquire a shared lock.
lock = asyncio.Lock()
print('acquiring the lock before starting coroutines')
await lock.acquire()
print(f'lock acquired: {lock.locked()}')

# Schedule a callback to unlock the lock.
loop.call_later(1, functools.partial(unlock, lock))

# Run the coroutines that want to use the lock.
print('waiting for coroutines')
await asyncio.wait([coro1(lock), coro2(lock)]),


asyncio.run(main())

coroutine acquire()

获取锁。该方法一直等到解锁,然后设置为锁定状态,返回 True

release()

释放锁。当锁已经锁定,重置其为未锁定状态,然后返回。如果锁已经释放,会引发 RuntimeError 异常。

locked()
如果锁已经锁定,返回 True

Event

class asyncio.Event(*, loop=None)

event 对象,非线程安全。

asyncio event 用于通知多个 asyncio tasks 某个 event 发生了。

Event 对象管理内部 flag,该 flag 可以用 set() 设置为 True,或者用 clear() 重置为 Falsewait() 方法阻塞,直到 flag 被设置为 True。初始时 flag 被设置为 False

async def waiter(event):
print('waiting for it ...')
await event.wait()
print('... got it!')

async def main():
# Create an Event object.
event = asyncio.Event()

# Spawn a Task to wait until 'event' is set.
waiter_task = asyncio.create_task(waiter(event))

# Sleep for 1 second and set the event.
await asyncio.sleep(1)
event.set()

# Wait until the waiter task is finished.
await waiter_task

asyncio.run(main())

coroutine wait()

等待,直到 event 被设置

如果 event 被设置,则立即返回 True,否则阻塞直到另一个 task 调用 call()

set()

设置 event。所有等待 event 被设置的任务会被立即唤醒。

clear()

清除 event。在 wait() 上的 awaiting 的 tasks 将阻塞,直到 set() 方法再次调用。

is_set()

如果 event 被设置,返回 True

Condition

class asyncio.Condition(lock=None, *, loop=None)

Condition 对象,非线程安全。

asyncio condition primitive 用于 task 等待某个 event 发生,然后对共享资源获取独占访问。

实质上,Condition 对象结合了 Event 和 Lock 的功能,可以使多个 Condition 共享一个锁,允许不同任务在共享资源的特定阶段,对共享资源相应的独占访问,

可选的 lock 参数必须是 Lock 对象或 None,为 None 时,会自动创建新的 Lock 对象。

cond = asyncio.Condition()

# ... later
async with cond:
await cond.wait()

coroutine acquire()

获取底层的锁。该方法等待,直到底层的锁释放,然后设置为 locked,返回 True

notify(n=1)

唤醒最多 n 个等待该 condition 的 tasks(默认为 1),如果没有 tasks 在等待,该方法不做任何操作。

该方法调用前或锁释放不久后,锁必须先获取。如果调用时,锁未锁定,会引发 RuntimeError

locked()

如果底层的锁被释放,则返回 True

notify_all()

唤醒所有等待该 condition 的 tasks。和 notify() 行为很像,但会唤醒所有等待的 tasks。

该方法调用前或锁释放不久后,锁必须先获取。如果调用时,锁未锁定,会引发 RuntimeError

release()

释放底层的锁。

当在未锁定的锁上执行,会引发 RuntimeError

coroutine wait()

等待,直到被通知。

当该方法调用时,如果正在调用的 task 还未获取锁,会引发 RuntimeError

该方法释放底层的锁,然后阻塞,直到被 notify()notify_all()调用唤醒。一旦唤醒,Condition 会重新获取锁,该方法返回 True

coroutine wait_for(predicate)

等待,直到 predicate 变成 True

predicate 必须是 callable,result 会被解释成布尔值,最终的 value 是返回值。

import asyncio


async def consumer(condition, n):
async with condition:
print('consumer {} is waiting'.format(n))
await condition.wait()
print('consumer {} triggered'.format(n))
print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
print('starting manipulate_condition')

# pause to let consumers start
await asyncio.sleep(0.1)

for i in range(1, 3):
async with condition:
print('notifying {} consumers'.format(i))
condition.notify(n=i)
await asyncio.sleep(0.1)

async with condition:
print('notifying remaining consumers')
condition.notify_all()

print('ending manipulate_condition')


async def main():
# Create a condition
condition = asyncio.Condition()

# Set up tasks watching the condition
consumers = [
consumer(condition, i)
for i in range(5)
]

# Schedule a task to manipulate the condition variable
task = asyncio.create_task(manipulate_condition(condition))

# Wait for the consumers to be done
await asyncio.wait(consumers)
await task


asyncio.run(main())

Semaphore

class asyncio.Semaphore(value=1, *, loop=None)

Semaphore 对象,非线程安全。

Semaphore 管理内部计数,每次 acquire() 调用时会减 1,release() 调用时加 1。该计数不会到 0 以下,当 acquire() 发现计数为 0,会阻塞,直到遇到某个 task 调用 release()

可选的 value 参数指定初始 value 用于内部计数器(默认是 1)。如果给定的 value 比 0 小,会引发 ValueError

sem = asyncio.Semaphore(10)

# ... later
async with sem:
# work with shared resource

coroutine acquire()

获取 semaphore。

如果内部计数器比 0 大,会减 1,然后立即返回 True;如果为 0,会一直等,直到 release() 调用,然后返回 True

locked()

如果 semaphore 不能立即获取,返回 True

release()

释放 semaphore,内部计数器加 1,可以唤醒等待获取 semaphore 的任务。

和 BoundedSemaphore 不同的是,Semaphore 允许 release() 调用次数超过 acquire()

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1, *, loop=None)

有界的 semaphore 对象,非线程安全。

Bounded Semaphore 是 Semaphore 的另一个版本,如果计数超过了初始的 value,会在 release() 时引发 ValueError

Asynchronous Iterate

Asynchronous Iterators

异步迭代器可以用在 async for 语句中。

  • object.__aiter__(self)

    必须返回异步迭代器对象。

  • object.__anext__(self)

    必须返回 awaitable,用于 iterator 生成下一个值,迭代结束时会引发 StopAsyncIteration

    class Reader:
    async def readline(self):
    ...

    def __aiter__(self):
    return self

    async def __anext__(self):
    val = await self.readline()
    if val == b'':
    raise StopAsyncIteration
    return val

Asynchronous Generator

返回 asynchronous generator iterator 的函数,用 async defyield 表达式定义,产出一系统的值,用在 async for 循环中。

import asyncio


async def gen(n):
for i in range(n):
yield i


async def main():
async for result in gen(10):
print(result)


asyncio.run(main())

Asynchronous Context Managers

异步的上下文管理器,用在 async with 语句中。

  • object.__aenter__(self)

    该方法语义上和 __enter__() 类似,不同支持是必须返回 awaitable。

  • object.__aexit__(self, exc_type, exc_value, traceback)

    该方法语义上和 __exit__() 类似,不同支持是必须返回 awaitable。

class AsyncContextManager:
async def __aenter__(self):
await log('entering context')

async def __aexit__(self, exc_type, exc, tb):
await log('exiting context')

桥接同步和异步代码

  • 从同步代码中调用同步代码。这只是一个普通的函数调用 - 比如 time.sleep(10)。没有什么风险或特别的。
  • 从异步代码中调用异步代码。你必须使用 await,所以你要 await asyncio.sleep(10)
  • 从异步代码中调用同步代码。你可以做到,但正如我上面所说,它会阻止整个过程并使一切变得缓慢,你不应该这样做。您应该为同步代码提供自己的线程。
  • 从同步代码调用异步代码。尝试在同步函数中使用 await 在 Python 中的是语法错误,因此要做到这一点,您需要为代码在内部创建一个事件循环来运行异步代码。

Sync From Sync

这是标准的 Python 代码,调用一个对象,Python 阻塞并移动到被调用的代码中,运行它,然后将结果返回给调用者并解除阻塞。

Async From Async

async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name

async def main():
result = await get_chat_id("django")

Sync From Async

直接在协程中调用同步函数,可能会造成整个线程阻塞,因为事件循环也被阻塞,根本没法拿到控制权来调度任务。

def get_chat_id(name):
time.sleep(3)
return "chat-%s" % name

async def main():
result = get_chat_id("django")

下面虽然影响不大

def get_chat_id(name):
return "chat-%s" % name

async def main():
result = get_chat_id("django")

但如果是进行数据库查询这种 IO 任务,又会阻塞

def get_chat_id(name):
return Chat.objects.get(name=name).id

async def main():
result = get_chat_id("django")

我的建议在不知道是否阻塞的情况下,永远不要在协程中调用同步函数。

在同步代码中,为了异步执行,我们一般是用多线程执行代码,这里我们也可以这么做,将同步函数放入另一个线程中执行,这样就不会阻塞当前线程了。

def get_chat_id(name):
return Chat.objects.get(name=name).id

async def main():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, get_chat_id, "django")

这种写法稍微麻烦,asgiref.sync 包中提供了 ASGI 帮助函数 sync_to_asyncasync_to_sync,可以简化代码。

def get_chat_id(name):
return Chat.objects.get(name=name).id

async def main():
result = await sync_to_async(get_chat_id)("django")

或者用装饰器,异常会向外传播。

@sync_to_async
def get_chat_id(name):
return Chat.objects.get(name=name).id

async def main():
result = await get_chat_id("django")

Async From Sync

协程,需要运行在事件循环里,以处理 await 调用,在同步函数中没法 await

async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name

def main():
result = await get_chat_id("django")

# Try even loading this file and you'll get:
result = await get_chat_id("django")
^
SyntaxError: invalid syntax

只有这么写

async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name

def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(get_chat_id("django"))

或者使用 Python 3.7 的 asyncio.run()

但如果你的程序很复杂,需要从同步代码中调用异步代码,而该代码已经在其他异步代码中,那该怎么办?

你当然可以再创建一个事件循环,asyncio 事件循环可以在每个线程开启一个,但是过多的事件循环也不太好。

asgiref.sync.async_to_sync 装饰器可以完成上面的操作,异常和返回值会向外传播,并且它还可以检测出一些问题(比如你尝试从已经运行了事件循环的线程中使用它)。

async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name

def main():
result = async_to_sync(get_chat_id)("django")

装饰器方式

@async_to_sync
async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name

def main():
result = get_chat_id("django")

调试模式

建议将环境变量 PYTHONASYNCIODEBUG 设置为 1,方便调试。

参考资料