插件窝 干货文章 Python事件驱动编程:如何用asyncio实现高效的异步操作?

Python事件驱动编程:如何用asyncio实现高效的异步操作?

asyncio Task await print 637    来源:    2025-03-12

在Python中,asyncio 是一个用于编写单线程并发代码的库,使用 async/await 语法。它非常适合处理I/O密集型任务,如网络请求、文件读写等。通过事件驱动的方式,asyncio 可以高效地管理多个异步任务,避免阻塞主线程。

1. 基本概念

  • 协程(Coroutine): 使用 async def 定义的函数,返回一个协程对象。协程可以在执行过程中暂停和恢复。
  • 事件循环(Event Loop): 事件循环是 asyncio 的核心,负责调度和执行协程。
  • 任务(Task): 任务是对协程的封装,表示一个正在运行的协程。
  • Future: 一个低级别的对象,表示异步操作的最终结果。

2. 基本用法

2.1 创建一个简单的协程

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行协程
asyncio.run(say_hello())

2.2 使用 asyncio.gather 并发运行多个协程

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

2.3 使用 asyncio.create_task 创建任务

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())

    await task_1
    await task_2

asyncio.run(main())

3. 高级用法

3.1 使用 asyncio.Queue 进行任务队列管理

import asyncio

async def worker(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Processing {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # 启动工作协程
    tasks = [asyncio.create_task(worker(queue)) for _ in range(3)]

    # 添加任务到队列
    for item in range(10):
        await queue.put(item)

    # 等待队列中的所有任务完成
    await queue.join()

    # 停止工作协程
    for _ in range(3):
        await queue.put(None)

    await asyncio.gather(*tasks)

asyncio.run(main())

3.2 使用 asyncio.wait 控制任务完成

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())

    # 等待所有任务完成
    done, pending = await asyncio.wait({task_1, task_2})

    print("All tasks completed")

asyncio.run(main())

4. 错误处理

在异步编程中,错误处理非常重要。可以使用 try/except 来捕获异常。

import asyncio

async def faulty_task():
    print("Faulty task started")
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")

async def main():
    try:
        await faulty_task()
    except ValueError as e:
        print(f"Caught an exception: {e}")

asyncio.run(main())

5. 性能优化

  • 避免阻塞操作: 在协程中避免使用阻塞操作,如 time.sleep(),应该使用 await asyncio.sleep()
  • 合理使用并发: 使用 asyncio.gatherasyncio.create_task 来并发执行多个任务。
  • 限制并发数: 使用信号量(asyncio.Semaphore)来限制并发任务的数量,避免资源耗尽。
import asyncio

async def limited_task(semaphore, task_id):
    async with semaphore:
        print(f"Task {task_id} started")
        await asyncio.sleep(1)
        print(f"Task {task_id} finished")

async def main():
    semaphore = asyncio.Semaphore(2)  # 限制并发数为2
    tasks = [limited_task(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

6. 总结

asyncio 提供了一种高效的方式来处理异步操作,特别适合I/O密集型任务。通过合理使用协程、任务、事件循环等机制,可以编写出高效且易于维护的异步代码。在实际应用中,需要注意错误处理和性能优化,以确保程序的稳定性和高效性。