Logo
Published on

3.14.异步编程

Authors
  • avatar
    Name
    xiaobai
    Twitter

1.概述

异步编程是一种非阻塞的编程模式,允许程序在等待I/O操作时执行其他任务,而不是阻塞等待。Python 通过 asyncio 模块提供了强大的异步编程支持。

2.核心概念

2.1.同步 vs 异步

特性同步编程异步编程
执行方式按顺序执行,遇到耗时操作会阻塞非阻塞,遇到耗时操作会切换到其他任务
适用场景CPU密集型任务,简单I/O操作I/O密集型任务,大量并发操作
复杂度简单易懂相对复杂,需要理解协程和事件循环
性能在I/O密集型任务中效率较低在I/O密集型任务中效率显著提升

2.2.生活中的例子

  • 同步:你去餐馆点菜,一直站在柜台等厨师做好饭菜拿给你,做好你才能走
  • 异步:你点完菜后在座位上玩手机或聊天,等菜做好了服务员叫你,这时你再去取餐或者服务员直接送过来

2.3.性能对比示例

import time
import asyncio

# 同步版本
def sync_demo():
    def task(name, delay):
        print(f"{name} 开始")
        time.sleep(delay)  # 阻塞调用
        print(f"{name} 完成")
    
    start = time.time()
    task("任务1", 2)
    task("任务2", 1)
    print(f"总耗时: {time.time() - start:.2f}秒")

# 异步版本
async def async_demo():
    async def task(name, delay):
        print(f"{name} 开始")
        await asyncio.sleep(delay)  # 非阻塞等待
        print(f"{name} 完成")
    
    start = time.time()
    await asyncio.gather(
        task("任务1", 2),
        task("任务2", 1)
    )
    print(f"总耗时: {time.time() - start:.2f}秒")

# 运行对比
print("=== 同步版本 ===")
sync_demo()  # 耗时约3秒

print("\n=== 异步版本 ===")
asyncio.run(async_demo())  # 耗时约2秒

3.asyncio 核心概念

3.1.协程 (Coroutine)

协程是 Python 实现异步编程的核心基础。它是一种可以在执行过程中被挂起和恢复的特殊函数。

3.1.1.协程的定义

async def my_coroutine():
    print("协程开始执行")
    await asyncio.sleep(1)
    print("协程结束")
    return "协程结果"

3.1.2.协程的运行方式

协程本身只是一个对象,必须由事件循环调度才能运行:

import asyncio

async def simple_coroutine():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")
    return "协程结果"

async def coroutine_usage():
    # 1. 直接等待
    print("=== 直接等待 ===")
    result = await simple_coroutine()
    print(f"结果: {result}")
    
    # 2. 创建任务
    print("\n=== 创建任务 ===")
    task = asyncio.create_task(simple_coroutine())
    result = await task
    print(f"任务结果: {result}")
    
    # 3. 并发执行
    print("\n=== 并发执行 ===")
    tasks = [
        simple_coroutine(),
        simple_coroutine(),
        simple_coroutine()
    ]
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

asyncio.run(coroutine_usage())

3.1.3.协程与普通函数的区别

特性普通函数协程函数
定义def function_name():async def function_name():
调用立即执行返回协程对象,需要事件循环调度
返回值直接返回结果通过 await 获取结果
阻塞会阻塞调用者不会阻塞,可以挂起

3.2.事件循环 (Event Loop)

事件循环是异步编程的核心,负责调度和执行协程任务。

3.2.1.工作机制

  1. 事件循环驱动:事件循环是总调度中心,负责监控所有任务和事件
  2. 任务执行与挂起:协程任务在执行过程中,遇到I/O请求时会主动挂起,将控制权交还给事件循环
  3. 高效切换:事件循环立即切换到另一个已就绪的协程任务去执行
  4. 事件通知与唤醒:当I/O操作完成时,事件循环会收到通知,将对应的协程任务标记为"就绪"状态
  5. 从断点恢复:事件循环在适当的时机重新唤醒等待I/O完成的任务,让它从挂起的地方继续执行

3.2.2.事件循环操作

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()
print(f"事件循环: {loop}")
print(f"循环是否运行: {loop.is_running()}")
print(f"循环是否关闭: {loop.is_closed()}")

# 手动管理事件循环
async def simple_task():
    await asyncio.sleep(1)
    return "任务完成"

# 方式1:手动管理
loop = asyncio.get_event_loop()
try:
    if not loop.is_running():
        result = loop.run_until_complete(simple_task())
        print(f"手动执行结果: {result}")
finally:
    loop.close()

# 方式2:使用 asyncio.run() (推荐)
result = asyncio.run(simple_task())
print(result)

4.异步编程核心语法

4.1.async/await 关键字

import asyncio

class AsyncExamples:
    """异步编程示例"""
    
    async def io_operation(self, name, delay):
        """模拟I/O操作"""
        print(f"{name}: 开始I/O操作,需要{delay}秒")
        await asyncio.sleep(delay)
        print(f"{name}: I/O操作完成")
        return f"{name}_result"
    
    async def cpu_operation(self, name, iterations):
        """模拟CPU密集型操作"""
        print(f"{name}: 开始CPU计算")
        result = 0
        for i in range(iterations):
            result += i
        print(f"{name}: CPU计算完成")
        return result
    
    async def concurrent_operations(self):
        """并发操作示例"""
        tasks = [
            self.io_operation(f"任务{i}", i)
            for i in range(1, 4)
        ]
        
        print("=== 并发I/O操作 ===")
        results = await asyncio.gather(*tasks)
        print(f"并发结果: {results}")

# 使用示例
examples = AsyncExamples()
asyncio.run(examples.concurrent_operations())

4.2.异步上下文管理器

异步上下文管理器用于异步资源的获取与释放。

4.2.1.基本语法

import asyncio

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("建立数据库连接...")
        await asyncio.sleep(0.5)
        print("数据库连接已建立")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭数据库连接...")
        await asyncio.sleep(0.2)
        print("数据库连接已关闭")
        return False

    async def execute_query(self, query):
        print(f"执行查询: {query}")
        await asyncio.sleep(0.3)
        return f"查询结果: {query}"

async def async_context():
    async with AsyncDatabaseConnection() as db:
        result = await db.execute_query("SELECT * FROM users")
        print(result)

asyncio.run(async_context())

4.2.2.应用场景

  • 数据库连接:在查询前建立连接,结束之后自动关闭
  • 文件操作:异步打开与关闭文件,确保资源不会泄漏
  • 网络会话:请求前建立异步会话,请求后自动关闭

4.3.异步迭代器

异步迭代器使得我们可以遍历异步生成的数据流。

4.3.1.基本实现

import asyncio

class AsyncDataStream:
    """异步数据流"""
    
    def __init__(self, data_list, delay=0.5):
        self.data = data_list
        self.delay = delay
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        
        item = self.data[self.index]
        self.index += 1
        
        await asyncio.sleep(self.delay)
        return item

async def async_iterator():
    """异步迭代器演示"""
    
    stream = AsyncDataStream(["数据1", "数据2", "数据3", "数据4"], 0.3)
    
    print("使用异步for循环:")
    async for item in stream:
        print(f"接收到: {item}")
    
    print("\n使用异步推导式:")
    stream2 = AsyncDataStream(["A", "B", "C", "D"], 0.2)
    results = [item async for item in stream2]
    print(f"所有数据: {results}")

asyncio.run(async_iterator())

4.3.2.适用场景

  • 异步读取大文件的每一行
  • 网络抓取数据流
  • 处理异步产生的数据队列
  • 异步推导式:[item async for item in async_iterator]

5.任务和Future

5.1.任务管理

在异步编程中,任务(Task) 是对协程的进一步封装,它将协程排入事件循环等待执行,并提供了更丰富的管理与监控接口。

5.1.1.任务的基本操作

操作方法描述
创建任务asyncio.create_task(coroutine)将协程对象注入到事件循环,立即启动执行
等待任务await task等待任务完成并获取结果
取消任务task.cancel()取消进行中的任务
检查状态task.done()检查任务是否完成
获取结果task.result()获取任务结果(仅在任务完成后)

5.1.2.任务管理示例

import asyncio
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

class TaskManager:
    """任务管理器"""
    
    def __init__(self):
        self.tasks = set()
    
    async def worker(self, name, duration):
        """工作协程"""
        logging.info(f"任务 {name} 开始")
        try:
            await asyncio.sleep(duration)
            logging.info(f"任务 {name} 完成")
            return f"{name}_result"
        except asyncio.CancelledError:
            logging.info(f"任务 {name} 被取消")
            raise
    
    async def create_tasks(self, count):
        """创建多个任务"""
        for i in range(count):
            task = asyncio.create_task(
                self.worker(f"worker_{i}", i + 1),
                name=f"task_{i}"
            )
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)
        
        logging.info(f"创建了 {count} 个任务")
    
    async def monitor_tasks(self):
        """监控任务状态"""
        while True:
            running_tasks = [t for t in self.tasks if not t.done()]
            logging.info(f"运行中的任务: {len(running_tasks)}")
            
            if not running_tasks:
                break
            
            await asyncio.sleep(0.5)
    
    async def cancel_all_tasks(self):
        """取消所有任务"""
        logging.info("取消所有任务")
        for task in self.tasks:
            task.cancel()
        
        await asyncio.gather(*self.tasks, return_exceptions=True)

async def task_management():
    """任务管理演示"""
    manager = TaskManager()
    
    await manager.create_tasks(5)
    monitor_task = asyncio.create_task(manager.monitor_tasks())
    
    await asyncio.sleep(2)
    await manager.cancel_all_tasks()
    
    await monitor_task

asyncio.run(task_management())

5.2.Future 对象

Future 是一个底层对象,表示一个将来才会有结果的对象。Task 是 Future 的子类,具备 Future 的所有接口。

5.2.1.Future 的特性

  • 可以通过 set_result()set_exception() 为 Future 设置结果或异常
  • 可以通过 await future 挂起,直到 Future 完成
  • 这是构建异步回调和事件驱动编程的基础

5.2.2.Future 示例

import asyncio

async def future_example():
    """Future 对象示例"""
    
    # 创建一个Future对象
    future = asyncio.Future()
    print(f"Future初始状态: {future.done()}")
    
    async def set_future_result():
        await asyncio.sleep(1)
        if not future.done():
            future.set_result("Future结果")
            print("Future结果已设置")
    
    async def use_future():
        print("等待Future结果...")
        result = await future
        print(f"收到Future结果: {result}")
        return result
    
    # 并发执行设置和获取Future结果
    results = await asyncio.gather(
        set_future_result(),
        use_future()
    )
    
    print(f"所有操作完成: {results}")

asyncio.run(future_example())

5.2.3.总结

  • Future 是低层异步原语,是 Task 和协程运行的基础
  • 如果只做高层业务,通常直接用 await 协程和 Task,无需手动操作 Future
  • 了解其原理有助于阅读、设计高级异步库和协议

6.高级异步模式

6.1.生产者-消费者模式

生产者-消费者模式是一种经典的并发设计模式,常用于解耦"数据的生产"和"数据的消费"过程。

6.1.1.核心思想

  • 生产者:异步地生产数据,并通过 queue.put() 投递到队列
  • 消费者:异步地从队列用 queue.get() 取数据进行处理
  • 队列:连接生产者与消费者,起到缓冲区作用,可以限制队列大小,避免内存溢出

6.1.2.异步生产者-消费者优势

  • 降低耦合:生产与消费速度可以不同步,互不阻塞
  • 自动流控:队列满时生产者会等待,队列空时消费者会等待
  • 易扩展:可以轻松扩展为多生产者/多消费者模式

6.1.3.实现示例

import asyncio
import random

class AsyncProducerConsumer:
    """异步生产者-消费者"""
    
    def __init__(self, queue_size=5):
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.producers = []
        self.consumers = []
    
    async def producer(self, name, count):
        """生产者"""
        for i in range(count):
            item = f"{name}_item_{i}"
            production_time = random.uniform(0.1, 0.5)
            await asyncio.sleep(production_time)
            
            await self.queue.put(item)
            print(f"生产者 {name} 生产了: {item} (队列大小: {self.queue.qsize()})")
        
        print(f"生产者 {name} 完成")
    
    async def consumer(self, name):
        """消费者"""
        while True:
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=2.0)
                
                consumption_time = random.uniform(0.2, 0.8)
                await asyncio.sleep(consumption_time)
                
                print(f"消费者 {name} 消费了: {item} (队列大小: {self.queue.qsize()})")
                self.queue.task_done()
                
            except asyncio.TimeoutError:
                print(f"消费者 {name} 超时,退出")
                break
    
    async def run(self, producer_count=2, items_per_producer=5, consumer_count=3):
        """运行生产者-消费者系统"""
        print("启动生产者-消费者系统...")
        
        # 创建生产者任务
        for i in range(producer_count):
            producer_task = asyncio.create_task(
                self.producer(f"P{i}", items_per_producer)
            )
            self.producers.append(producer_task)
        
        # 创建消费者任务
        for i in range(consumer_count):
            consumer_task = asyncio.create_task(self.consumer(f"C{i}"))
            self.consumers.append(consumer_task)
        
        # 等待所有生产者完成
        await asyncio.gather(*self.producers)
        print("所有生产者已完成")
        
        # 等待队列中所有项目被消费
        await self.queue.join()
        print("队列已清空")
        
        # 取消所有消费者任务
        for consumer in self.consumers:
            consumer.cancel()
        
        await asyncio.gather(*self.consumers, return_exceptions=True)
        print("所有消费者已结束")

# 使用示例
system = AsyncProducerConsumer()
asyncio.run(system.run())

6.2.异步锁和信号量

在异步编程中,并发访问共享资源可能导致数据不一致或资源竞争问题。常用的同步原语包括异步锁和信号量。

6.2.1.异步锁(asyncio.Lock)

异步锁用于保护对共享资源的互斥访问,确保同一时刻只有一个协程可以进入临界区。

import asyncio

# 基本用法
lock = asyncio.Lock()

async def task_with_lock():
    async with lock:
        # 此处代码同一时刻只能有一个协程执行
        print("task start")
        await asyncio.sleep(1)
        print("task end")

6.2.2.信号量(asyncio.Semaphore)

信号量允许"最多N个"协程同时访问某一资源,适用于连接池、限流器等场景。

import asyncio

# 创建最大允许2个协程同时运行的信号量
sem = asyncio.Semaphore(2)

async def limited_task():
    async with sem:
        print("task start")
        await asyncio.sleep(1)
        print("task end")

async def main():
    tasks = [limited_task() for _ in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

6.2.3.综合示例

import asyncio

class AsyncResourceManager:
    """异步资源管理"""

    def __init__(self, max_concurrent=3):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.lock = asyncio.Lock()
        self.shared_resource = 0

    async def access_limited_resource(self, name):
        """访问受限制的资源"""
        async with self.semaphore:
            print(f"{name} 获得资源访问权")
            await asyncio.sleep(1)  # 模拟资源使用
            print(f"{name} 释放资源访问权")

    async def update_shared_resource(self, name, value):
        """更新共享资源(需要锁)"""
        async with self.lock:
            print(f"{name} 获得锁,当前资源值: {self.shared_resource}")
            await asyncio.sleep(0.5)  # 模拟处理时间
            self.shared_resource += value
            print(f"{name} 更新资源为: {self.shared_resource}")
            print(f"{name} 释放锁")

async def concurrency_control():
    """并发控制演示"""
    manager = AsyncResourceManager(max_concurrent=2)

    # 测试信号量
    print("=== 信号量测试 ===")
    semaphore_tasks = [
        manager.access_limited_resource(f"任务{i}")
        for i in range(5)
    ]
    await asyncio.gather(*semaphore_tasks)

    # 测试锁
    print("\n=== 锁测试 ===")
    lock_tasks = [
        manager.update_shared_resource(f"更新者{i}", i+1)
        for i in range(3)
    ]
    await asyncio.gather(*lock_tasks)

asyncio.run(concurrency_control())

7.实际应用场景

7.1.异步Web请求

# 导入asyncio模块,用于异步编程
import asyncio
# 导入aiohttp模块,用于异步HTTP请求
import aiohttp
# 导入time模块,用于计时
import time

# 定义异步Web客户端类
class AsyncWebClient:
    # 声明类的用途为异步Web客户端
    """异步Web客户端"""
    
    # 初始化方法
    def __init__(self):
        # 初始化session为None
        self.session = None
    
    # 定义异步上下文管理器进入方法
    async def __aenter__(self):
        # 创建aiohttp客户端会话
        self.session = aiohttp.ClientSession()
        # 返回自身
        return self
    
    # 定义异步上下文管理器退出方法
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 关闭aiohttp客户端会话
        await self.session.close()
    
    # 定义异步方法,用于获取指定URL的内容
    async def fetch_url(self, url, name):
        # 获取URL内容
        """获取URL内容"""
        # 打印请求开始信息
        print(f"{name}: 开始请求 {url}")
        
        try:
            # 以异步方式发起GET请求,10秒超时
            async with self.session.get(url, timeout=10) as response:
                # 读取响应的文本内容
                content = await response.text()
                # 打印请求完成信息,包括状态码和内容长度
                print(f"{name}: 完成,状态码 {response.status}, 长度 {len(content)}")
                # 返回内容长度
                return len(content)
        except Exception as e:
            # 捕获异常并打印错误信息
            print(f"{name}: 错误 {e}")
            # 出错时返回0
            return 0
    
    # 定义异步方法,用于并发请求多个URL
    async def concurrent_requests(self, urls):
        # 并发请求多个URL
        """并发请求多个URL"""
        # 创建任务列表
        tasks = []
        
        # 遍历所有URL,创建对应的fetch_url任务
        for i, url in enumerate(urls):
            # 创建单个请求任务
            task = self.fetch_url(url, f"请求{i+1}")
            # 添加任务到任务列表
            tasks.append(task)
        
        # 打印并发请求开始信息
        print("开始并发请求...")
        # 记录开始时间
        start_time = time.time()
        
        # 并发执行所有异步请求,等待结果
        results = await asyncio.gather(*tasks)
        
        # 记录结束时间
        end_time = time.time()
        # 打印所有请求完成和耗时信息
        print(f"所有请求完成,耗时: {end_time - start_time:.2f}秒")
        # 打印所有任务的结果
        print(f"结果: {results}")
        
        # 返回结果列表
        return results

# 定义异步函数,演示Web客户端的用法
async def web_client():
    # Web客户端演示
    """Web客户端演示"""
    # 准备待请求的URL列表
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2", 
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
    
    # 使用AsyncWebClient异步上下文管理器
    async with AsyncWebClient() as client:
        # 并发请求所有URL
        await client.concurrent_requests(urls)

# 运行Web客户端演示
asyncio.run(web_client())

7.2.异步数据库操作

# 导入异步IO相关库
import asyncio
# 导入asyncpg模块,用于异步操作PostgreSQL数据库。需要预先安装:pip install asyncpg
import asyncpg  # 需要安装: pip install asyncpg

# 定义异步数据库操作类
class AsyncDatabase:
    # 提供类说明
    """异步数据库操作"""
    
    # 类的初始化方法,设定连接字符串并初始化数据库连接池为None
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.pool = None
    
    # 定义异步的数据库连接方法
    async def connect(self):
        # 连接数据库,创建连接池
        """连接数据库"""
        self.pool = await asyncpg.create_pool(self.connection_string)
        # 输出连接池创建成功的信息
        print("数据库连接池已创建")
    
    # 定义异步关闭数据库连接池的方法
    async def close(self):
        # 关闭连接池(如已存在)
        """关闭连接"""
        if self.pool:
            await self.pool.close()
            # 输出连接池关闭的信息
            print("数据库连接池已关闭")
    
    # 定义异步执行SQL查询的方法
    async def execute_query(self, query, *args):
        # 执行SQL查询,并返回结果
        """执行查询"""
        # 从连接池获取连接,执行查询
        async with self.pool.acquire() as connection:
            result = await connection.fetch(query, *args)
            return result
    
    # 定义异步并发执行多条SQL查询的方法
    async def concurrent_queries(self, queries):
        # 并发执行多个查询
        """并发执行多个查询"""
        # 存放任务的列表
        tasks = []
        
        # 遍历所有查询,准备每个任务
        for i, (query, params) in enumerate(queries):
            task = self.execute_query(query, *params)
            tasks.append(task)
        
        # 输出开始并发查询的提示信息
        print("开始并发查询...")
        # 并发执行所有任务,等待结果
        results = await asyncio.gather(*tasks)
        # 输出所有查询完成的信息
        print(f"完成 {len(results)} 个查询")
        
        # 返回所有结果
        return results

# 定义一个用于模拟(不实际访问数据库)的异步数据库类
class MockAsyncDatabase:
    # 模拟异步数据库说明
    """模拟的异步数据库"""
    
    # 模拟异步执行SQL查询的方法
    async def execute_query(self, query, name, delay=1):
        # 输出开始执行的提示信息
        """模拟数据库查询"""
        print(f"{name}: 开始执行查询: {query}")
        # 异步睡眠以模拟耗时操作
        await asyncio.sleep(delay)  # 模拟数据库延迟
        # 输出查询完成信息
        print(f"{name}: 查询完成")
        # 返回模拟的查询结果
        return f"{name}_result"
    
    # 模拟并发执行多条SQL查询的方法
    async def concurrent_queries(self):
        # 并发查询演示说明
        """并发查询演示"""
        # 定义多个模拟查询
        queries = [
            ("SELECT * FROM users", "用户查询", 0.5),
            ("SELECT COUNT(*) FROM orders", "订单统计", 0.3),
            ("UPDATE products SET stock = stock - 1", "库存更新", 0.2),
            ("INSERT INTO logs (message) VALUES ('test')", "日志插入", 0.4)
        ]
        
        # 存放任务的列表
        tasks = []
        # 遍历所有查询,准备每个任务
        for query, name, delay in queries:
            task = self.execute_query(query, name, delay)
            tasks.append(task)
        
        # 并发执行所有任务,等待所有结果
        results = await asyncio.gather(*tasks)
        # 返回所有结果
        return results

# 定义演示数据库操作的主异步函数
async def database():
    # 数据库操作演示说明
    """数据库操作演示"""
    # 创建模拟数据库对象
    db = MockAsyncDatabase()
    # 调用并发查询方法并获取结果
    results = await db.concurrent_queries()
    # 打印所有查询结果
    print(f"所有查询结果: {results}")

# 执行主异步函数
asyncio.run(database())

8.错误处理和调试

8.1.异步异常处理

在异步编程中,异常处理和同步代码类似,也可以使用 try/except 语句来捕获异常。例如:

import asyncio

async def some_async_func():
    # 示例异步函数,实现你自己的逻辑
    await asyncio.sleep(1)
    return "执行成功"

async def main():
    try:
        result = await some_async_func()
        print(result)
    except Exception as e:
        print("发生异常:", e)

if __name__ == "__main__":
    asyncio.run(main())

此方法适合单个协程的错误捕获。

8.1.1.asyncio.gather 的异常机制

asyncio.gather 并发执行多个协程。当其中任何一个协程抛出异常时,默认会终止其他协程并传播第一个异常。

但如果将 return_exceptions=True,则所有异常会被包装为异常对象返回,不会中断其他任务。示例:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return '任务1完成'

async def task2():
    await asyncio.sleep(1)
    raise Exception('任务2出错')

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print("任务异常:", result)
        else:
            print("任务成功:", result)

if __name__ == "__main__":
    asyncio.run(main())

8.1.2.asyncio.wait 的异常捕获

asyncio.wait 执行的一组任务可以分为已完成 (done) 和未完成 (pending)。可以遍历 done 中的任务,通过 task.exception() 查看每个任务是否有异常:

# 导入 asyncio 库,用于异步编程
import asyncio

# 定义一个异步任务函数,接受一个参数 n
async def example_task(n):
    # 异步休眠 1 秒
    await asyncio.sleep(1)
    # 如果 n 是偶数,返回任务完成信息
    if n % 2 == 0:
        return f"Task {n} completed"
    # 否则抛出异常
    else:
        raise Exception(f"Task {n} error")

# 定义主异步函数
async def main():
    # 创建 5 个异步任务,分别传入 0 到 4
    tasks = [asyncio.create_task(example_task(i)) for i in range(5)]
    # 等待所有任务完成,done 是已完成任务集合,pending 是未完成任务集合
    done, pending = await asyncio.wait(tasks)
    # 遍历所有已完成的任务
    for task in done:
        # 如果任务有异常
        if task.exception():
            # 打印捕获到的异常信息
            print("捕获到异常:", task.exception())
        # 否则打印任务的返回结果
        else:
            print("任务结果:", task.result())

# 程序入口,判断是否为主模块运行
if __name__ == '__main__':
    # 运行主异步函数
    asyncio.run(main())

8.1.3.小结

  • 单个协程用 try/except
  • 并发推荐 gather(..., return_exceptions=True)
  • 也可用 wait 配合检查每个任务的异常。 这些方法可以提高异步程序的健壮性和可调试性。

8.2.异步调试

# 导入异步IO模块
import asyncio
# 导入日志模块
import logging

# 定义异步调试函数
async def async_debugging():
    """异步调试演示"""
    
    # 配置日志调试参数
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 定义一个异步任务,接收任务名和延迟参数
    async def debug_task(name, delay):
        # 日志记录任务开始
        logging.debug(f"任务 {name} 开始")
        # 异步休眠指定秒数
        await asyncio.sleep(delay)
        # 日志记录任务结束
        logging.debug(f"任务 {name} 结束")
        # 返回任务结果字符串
        return f"{name}_result"
    
    # 创建第一个异步任务,并设置任务名称
    task1 = asyncio.create_task(debug_task("调试任务1", 1), name="debug_task_1")
    # 创建第二个异步任务,并设置任务名称
    task2 = asyncio.create_task(debug_task("调试任务2", 0.5), name="debug_task_2")
    
    # 打印当前所有运行的任务信息
    print("当前运行的任务:")
    # 遍历所有当前的异步任务
    for task in asyncio.all_tasks():
        # 打印每个任务的名称和协程对象
        print(f"  - {task.get_name()}: {task.get_coro()}")  # 修正:get_name()
    
    # 等待所有任务完成,并收集返回结果
    results = await asyncio.gather(task1, task2)
    # 打印任务结果
    print(f"任务结果: {results}")

# 导入os模块,用于设置环境变量
import os
# 启用异步调试模式
os.environ['PYTHONASYNCIODEBUG'] = '1'
# 运行异步调试函数
asyncio.run(async_debugging())

9.性能优化和最佳实践

9.1.异步性能测试

# 导入异步相关库
import asyncio
# 导入时间库用于计时
import time
# 导入requests库用于同步HTTP请求
import requests
# 导入aiohttp库用于异步HTTP请求
import aiohttp

# 定义性能比较器类
class PerformanceComparator:
    # 类的说明文档
    """性能比较器"""
    
    # 定义静态方法用于异步HTTP请求
    @staticmethod
    async def async_http_requests(urls):
        # 方法的说明文档
        """异步HTTP请求"""
        # 创建异步HTTP会话
        async with aiohttp.ClientSession() as session:
            # 初始化任务列表
            tasks = []
            # 遍历每一个url
            for url in urls:
                # 创建GET请求任务
                task = session.get(url)
                # 添加任务到任务列表
                tasks.append(task)
            # 并发执行所有任务并获取响应
            responses = await asyncio.gather(*tasks)
            # 返回所有响应的状态码
            return [resp.status for resp in responses]
    
    # 定义静态方法用于同步HTTP请求
    @staticmethod
    def sync_http_requests(urls):
        # 方法的说明文档
        """同步HTTP请求"""
        # 初始化结果列表
        results = []
        # 遍历每一个url
        for url in urls:
            # 发送GET请求
            response = requests.get(url)
            # 添加响应状态码到结果列表
            results.append(response.status_code)
        # 返回所有结果
        return results
    
    # 定义异步方法用于比较性能
    async def compare_performance(self, urls):
        # 方法的说明文档
        """比较性能"""
        # 打印待测试的URL数量
        print(f"测试 {len(urls)} 个URL")
        
        # --- 同步部分 ---
        # 打印同步测试标题
        print("=== 同步版本 ===")
        # 记录开始时间
        start = time.time()
        # 进行同步HTTP请求
        sync_results = self.sync_http_requests(urls)
        # 计算同步请求耗时
        sync_time = time.time() - start
        # 打印同步请求耗时
        print(f"同步耗时: {sync_time:.2f}秒")
        
        # --- 异步部分 ---
        # 打印异步测试标题
        print("\n=== 异步版本 ===")
        # 记录开始时间
        start = time.time()
        # 进行异步HTTP请求
        async_results = await self.async_http_requests(urls)
        # 计算异步请求耗时
        async_time = time.time() - start
        # 打印异步请求耗时
        print(f"异步耗时: {async_time:.2f}秒")
        
        # 计算性能提升倍数
        improvement = sync_time / async_time
        # 打印性能提升结果
        print(f"\n性能提升: {improvement:.2f}x")
        
        # 返回同步和异步的结果
        return sync_results, async_results

# 定义异步性能演示函数
async def performance():
    # 方法文档说明
    """性能演示"""
    # 使用httpbin.org作为测试URL,重复5次
    urls = ["https://httpbin.org/delay/1"] * 5
    
    # 创建性能比较器对象
    comparator = PerformanceComparator()
    # 调用性能比较方法并等待结果
    await comparator.compare_performance(urls)

# 启动性能测试
asyncio.run(performance())

9.2.最佳实践

# 导入异步IO模块
import asyncio

# 定义异步最佳实践类
class AsyncBestPractices:
    """异步编程最佳实践"""
    
    # 静态方法:正确的任务管理
    @staticmethod
    async def proper_task_management():
        """正确的任务管理"""
        # 定义一个异步工人函数
        async def worker(name):
            # 异步睡眠1秒
            await asyncio.sleep(1)
            # 返回结果字符串
            return f"{name}_done"
        
        # 创建任务列表用于保存引用
        tasks = []
        # 循环创建3个任务
        for i in range(3):
            # 创建一个异步任务
            task = asyncio.create_task(worker(f"worker_{i}"))
            # 把任务添加到任务列表
            tasks.append(task)
        
        # 等待所有任务完成,并获取结果
        results = await asyncio.gather(*tasks)
        # 返回所有任务结果
        return results
    
    # 静态方法:避免阻塞调用
    @staticmethod
    async def avoid_blocking_calls():
        """避免阻塞调用"""
        # 导入time模块,用于演示阻塞
        import time
        
        # 定义一个好的异步IO操作
        async def good_io_operation():
            # 使用异步sleep
            await asyncio.sleep(0.1)
        
        # 定义一个坏的IO操作(阻塞)
        async def bad_io_operation():
            # 使用同步sleep,会阻塞事件循环
            time.sleep(0.1)  # 这会阻塞事件循环!
        
        # 调用好的异步IO操作
        await good_io_operation()
    
    # 静态方法:使用超时
    @staticmethod
    async def use_timeouts():
        """使用超时"""
        # 定义一个慢操作的异步函数
        async def slow_operation():
            # 模拟慢操作,睡眠5秒
            await asyncio.sleep(5)
            # 返回“完成”
            return "完成"
        
        try:
            # 使用asyncio.wait_for设置超时为2秒
            result = await asyncio.wait_for(slow_operation(), timeout=2.0)
            # 如果未超时,返回结果
            return result
        except asyncio.TimeoutError:
            # 捕获超时异常,返回“操作超时”
            return "操作超时"
    
    # 静态方法:资源清理
    @staticmethod
    async def resource_cleanup():
        """资源清理"""
        # 定义一个资源类
        class Resource:
            # 定义异步清理方法
            async def cleanup(self):
                # 清理前异步等待0.1秒
                await asyncio.sleep(0.1)
                # 打印资源已清理
                print("资源已清理")
        
        # 实例化资源对象
        resource = Resource()
        try:
            # 使用资源(假装占用0.5秒)
            await asyncio.sleep(0.5)
        finally:
            # 最终确保资源被清理
            await resource.cleanup()

# 定义主函数,演示最佳实践
async def best_practices():
    """最佳实践演示"""
    # 创建最佳实践对象
    practices = AsyncBestPractices()
    
    # 打印任务管理演示
    print("=== 任务管理 ===")
    # 调用并等待任务管理方法
    results = await practices.proper_task_management()
    # 打印任务结果
    print(f"任务结果: {results}")
    
    # 打印超时控制演示
    print("\n=== 超时控制 ===")
    # 调用并等待超时方法
    timeout_result = await practices.use_timeouts()
    # 打印超时结果
    print(f"超时结果: {timeout_result}")
    
    # 打印资源清理演示
    print("\n=== 资源清理 ===")
    # 调用并等待资源清理方法
    await practices.resource_cleanup()

# 运行主异步函数
asyncio.run(best_practices())

10.实际项目示例

10.1.异步Web爬虫

# 导入asyncio模块,用于异步编程
import asyncio
# 导入aiohttp模块,用于异步HTTP请求
import aiohttp
# 导入urljoin和urlparse方法,用于处理URL
from urllib.parse import urljoin, urlparse
# 导入time模块,用于计时
import time

# 定义异步Web爬虫类
class AsyncWebCrawler:
    # 类文档字符串,说明用途
    """异步Web爬虫"""
    
    # 初始化方法,设置最大并发数和延迟时间
    def __init__(self, max_concurrent=10, delay=1):
        # 最大并发任务数
        self.max_concurrent = max_concurrent
        # 每次请求的延迟(秒)
        self.delay = delay
        # 已访问的URL集合
        self.visited = set()
        # 创建异步信号量控制并发数
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # HTTP会话对象
        self.session = None
    
    # 爬虫的主入口,异步方法
    async def crawl(self, start_url, max_pages=20):
        # 文档字符串:开始爬取
        """开始爬取"""
        # 创建aiohttp的会话
        self.session = aiohttp.ClientSession()
        # 创建异步队列
        queue = asyncio.Queue()
        # 将开始URL放入队列
        await queue.put(start_url)
        
        # 创建多个worker协程任务
        workers = [
            asyncio.create_task(self.worker(f"worker-{i}", queue, max_pages))
            for i in range(self.max_concurrent)
        ]
        
        try:
            # 等待队列消费完毕
            await queue.join()
        finally:
            # 取消所有worker任务
            for worker in workers:
                worker.cancel()
            # 关闭HTTP会话
            await self.session.close()
        
        # 输出总共访问页面数量
        print(f"爬取完成! 总共访问了 {len(self.visited)} 个页面")
        # 返回访问过的页面列表
        return list(self.visited)
    
    # worker工作线程,异步方法
    async def worker(self, name, queue, max_pages):
        # 文档字符串:爬虫工作线程
        """爬虫工作线程"""
        # 当未达到最大页面数时循环工作
        while len(self.visited) < max_pages:
            try:
                # 尝试从队列获取新的URL,设置超时时间为5秒
                url = await asyncio.wait_for(queue.get(), timeout=5.0)
                
                # 如果URL已经访问过,则标记完成后继续下一个
                if url in self.visited:
                    queue.task_done()
                    continue
                
                # 使用信号量限制并发请求
                async with self.semaphore:
                    # 处理单个页面
                    await self.process_page(url, queue)
                
                # 将当前URL添加到已访问集合
                self.visited.add(url)
                # 通知队列该任务完成
                queue.task_done()
                
                # 延迟,避免过快请求
                await asyncio.sleep(self.delay)
                
            # 如果队列空闲超过超时时间,退出
            except asyncio.TimeoutError:
                break
            # 其他异常输出错误信息并继续
            except Exception as e:
                print(f"{name} 处理错误: {e}")
                queue.task_done()
    
    # 处理单个页面的方法
    async def process_page(self, url, queue):
        # 文档字符串:处理单个页面
        """处理单个页面"""
        try:
            # 发送HTTP GET请求,设置超时为10秒
            async with self.session.get(url, timeout=10) as response:
                # 如果响应状态为200
                if response.status == 200:
                    # 获取页面内容
                    content = await response.text()
                    # 输出爬取成功消息和内容长度
                    print(f"成功爬取: {url} (长度: {len(content)})")
                    
                    # 可以在这里解析页面并提取更多链接
                    # 简化版:只做演示用途
                    # 只在已访问数量小于10时添加新链接
                    if len(self.visited) < 10:  # 限制链接数量
                        # 提取页面中的链接
                        new_links = self.extract_links(content, url)
                        # 遍历新链接,将未访问过的加入队列
                        for link in new_links:
                            if link not in self.visited:
                                await queue.put(link)
                
        # 捕获并输出请求异常
        except Exception as e:
            print(f"爬取失败 {url}: {e}")
    
    # 简单的链接提取方法
    def extract_links(self, content, base_url):
        # 文档字符串:提取链接(简化版)
        """提取链接(简化版)"""
        # 实际项目中通常使用BeautifulSoup等解析HTML
        # 这里仅作示例,返回模拟链接
        return [
            f"{base_url}/page1",
            f"{base_url}/page2", 
            f"{base_url}/page3"
        ]

# 定义异步函数,用于演示Web爬虫使用
async def web_crawler():
    # 文档字符串:Web爬虫演示
    """Web爬虫演示"""
    # 创建爬虫对象,设置最大并发数和延迟
    crawler = AsyncWebCrawler(max_concurrent=5, delay=0.5)
    
    # 记录开始时间
    start_time = time.time()
    # 执行爬虫,爬取最多10个页面
    results = await crawler.crawl("https://httpbin.org/html", max_pages=10)
    # 记录结束时间
    end_time = time.time()
    
    # 输出爬虫运行耗时
    print(f"爬虫耗时: {end_time - start_time:.2f}秒")
    # 输出访问的页面列表
    print(f"访问的页面: {results}")

# 运行Web爬虫演示异步函数
asyncio.run(web_crawler())

11.总结

11.1.核心概念

  1. async/await: 定义和调用协程
  2. 事件循环: 管理和调度异步任务
  3. 协程: 异步函数,使用async def定义
  4. 任务: 对协程的封装,用于并发执行
  5. Future: 代表异步操作的最终结果

11.2.关键函数

  • asyncio.run(): 运行异步程序
  • asyncio.create_task(): 创建任务
  • asyncio.gather(): 并发执行多个协程
  • asyncio.sleep(): 异步等待
  • asyncio.wait(): 等待多个任务完成

11.3.最佳实践

  1. 避免在异步代码中使用阻塞操作
  2. 使用异步上下文管理器管理资源
  3. 合理控制并发数量
  4. 正确处理异常和超时
  5. 使用适当的调试工具
img