在Python中,asyncio
是一个用于编写单线程并发代码的库,使用 async
和 await
语法。它非常适合用于I/O密集型任务,如网络请求、文件读写等。asyncio
通过事件循环(event loop)来实现高效的异步I/O操作。
async def
定义的函数,返回一个协程对象。协程可以在执行过程中暂停和恢复。Future
的子类,用于封装协程,使其可以在事件循环中调度。import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
await say_hello()
# 运行事件循环
asyncio.run(main())
asyncio.gather
可以并发执行多个协程,并等待它们全部完成。
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
asyncio.create_task
创建任务asyncio.create_task
可以将协程封装为任务,并立即调度到事件循环中。
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
task1_coro = task1()
task2_coro = task2()
task1_task = asyncio.create_task(task1_coro)
task2_task = asyncio.create_task(task2_coro)
await task1_task
await task2_task
asyncio.run(main())
asyncio
提供了许多异步I/O操作的API,如 asyncio.open_connection
、asyncio.start_server
等。
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
print(f"Send: {message}")
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f"Received: {data.decode()}")
print("Close the connection")
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello, World!'))
asyncio.Queue
进行任务队列管理asyncio.Queue
可以用于在多个协程之间传递数据,实现生产者-消费者模式。
import asyncio
import random
async def producer(queue):
for i in range(5):
print(f"Producing {i}")
await queue.put(i)
await asyncio.sleep(random.random())
async def consumer(queue):
while True:
item = await queue.get()
print(f"Consuming {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
await producer_task
await queue.join() # 等待队列中的所有任务完成
consumer_task.cancel() # 取消消费者任务
asyncio.run(main())
在异步编程中,异常处理与同步代码类似,可以使用 try-except
块来捕获异常。
import asyncio
async def faulty_task():
raise ValueError("Something went wrong")
async def main():
try:
await faulty_task()
except ValueError as e:
print(f"Caught an exception: {e}")
asyncio.run(main())
asyncio.wait_for
可以设置超时时间,防止某个协程执行时间过长。
import asyncio
async def long_running_task():
await asyncio.sleep(10)
return "Task finished"
async def main():
try:
result = await asyncio.wait_for(long_running_task(), timeout=5)
print(result)
except asyncio.TimeoutError:
print("Task timed out")
asyncio.run(main())
asyncio.Semaphore
控制并发数asyncio.Semaphore
可以限制同时运行的协程数量。
import asyncio
async def worker(semaphore, id):
async with semaphore:
print(f"Worker {id} started")
await asyncio.sleep(2)
print(f"Worker {id} finished")
async def main():
semaphore = asyncio.Semaphore(2) # 限制并发数为2
tasks = [worker(semaphore, i) for i in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
asyncio
提供了一套强大的工具来实现高效的异步I/O操作。通过合理使用协程、事件循环、任务队列等机制,可以编写出高性能的异步程序。在实际开发中,应根据具体需求选择合适的异步模式,并注意异常处理和资源管理。