插件窝 干货文章 Python事件驱动编程:如何利用asyncio实现高效的异步IO?

Python事件驱动编程:如何利用asyncio实现高效的异步IO?

asyncio Task await async 84    来源:    2025-03-14

在Python中,asyncio 是一个用于编写单线程并发代码的库,使用 asyncawait 语法。它非常适合用于I/O密集型任务,如网络请求、文件读写等。asyncio 通过事件循环(event loop)来实现高效的异步I/O操作。

1. 基本概念

  • 协程(Coroutine): 使用 async def 定义的函数,返回一个协程对象。协程可以在执行过程中暂停和恢复。
  • 事件循环(Event Loop): 负责调度和执行协程,处理I/O事件、定时器等。
  • Future: 表示一个异步操作的结果,通常由事件循环管理。
  • Task: 是 Future 的子类,用于封装协程,使其可以在事件循环中调度。

2. 基本用法

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    await say_hello()

# 运行事件循环
asyncio.run(main())

3. 并发执行多个协程

asyncio.gather 可以并发执行多个协程,并等待它们全部完成。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

4. 使用 asyncio.create_task 创建任务

asyncio.create_task 可以将协程封装为任务,并立即调度到事件循环中。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    task1_coro = task1()
    task2_coro = task2()

    task1_task = asyncio.create_task(task1_coro)
    task2_task = asyncio.create_task(task2_coro)

    await task1_task
    await task2_task

asyncio.run(main())

5. 处理I/O操作

asyncio 提供了许多异步I/O操作的API,如 asyncio.open_connectionasyncio.start_server 等。

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    print(f"Send: {message}")
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f"Received: {data.decode()}")

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello, World!'))

6. 使用 asyncio.Queue 进行任务队列管理

asyncio.Queue 可以用于在多个协程之间传递数据,实现生产者-消费者模式。

import asyncio
import random

async def producer(queue):
    for i in range(5):
        print(f"Producing {i}")
        await queue.put(i)
        await asyncio.sleep(random.random())

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consuming {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))

    await producer_task
    await queue.join()  # 等待队列中的所有任务完成
    consumer_task.cancel()  # 取消消费者任务

asyncio.run(main())

7. 异常处理

在异步编程中,异常处理与同步代码类似,可以使用 try-except 块来捕获异常。

import asyncio

async def faulty_task():
    raise ValueError("Something went wrong")

async def main():
    try:
        await faulty_task()
    except ValueError as e:
        print(f"Caught an exception: {e}")

asyncio.run(main())

8. 超时处理

asyncio.wait_for 可以设置超时时间,防止某个协程执行时间过长。

import asyncio

async def long_running_task():
    await asyncio.sleep(10)
    return "Task finished"

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())

9. 使用 asyncio.Semaphore 控制并发数

asyncio.Semaphore 可以限制同时运行的协程数量。

import asyncio

async def worker(semaphore, id):
    async with semaphore:
        print(f"Worker {id} started")
        await asyncio.sleep(2)
        print(f"Worker {id} finished")

async def main():
    semaphore = asyncio.Semaphore(2)  # 限制并发数为2
    tasks = [worker(semaphore, i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

10. 总结

asyncio 提供了一套强大的工具来实现高效的异步I/O操作。通过合理使用协程、事件循环、任务队列等机制,可以编写出高性能的异步程序。在实际开发中,应根据具体需求选择合适的异步模式,并注意异常处理和资源管理。