异步 IO
协程
类似于小学时候的一个计算任务,协程就类似于同时做多件家务,烧开水的同时 启动了洗衣机 之后拖地
而不是烧开水一直等,然后拖地,然后洗衣机等着
asyncio 模块
该模块通过一个线程去执行并发任务
需要创建一个协程的时候,可以使用 async 关键字将一个函数声明为协程
协程的协有协助之意,协程就是通过一个线程去执行并发,一个任务就是一个协程,也可以是多个任务通过一个协程调用多次来实现
协程的核心是通过事件循环实现的
1 2 3 4 5 6 7
| async def multi(num1: int, num2: int): res = num1 * num2 print(f'{num1} * {num2} = {res}')
if __name__ == '__main__': asyncio.run(multi(3,7))
|
1 2 3 4 5 6 7 8 9 10 11
| async def multi(num1: int, num2: int): res = num1 * num2 print(f'{num1} * {num2} = {res}')
async def multi_api(): await multi(1, 2)
if __name__ == '__main__': asyncio.run(multi_api())
|
创建任务
asyncio.create_task
- 先来一个没有实现并发的调用,由时间上的统计来看,是一个串行的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| async def process(name: str, num1: int) -> None: print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished')
async def process_api(): start = time.perf_counter() await process('task1', 1) await process('task2', 2) await process('task3', 3) print(f'finished in {time.perf_counter() - start:.2f} seconds')
if __name__ == '__main__': asyncio.run(process_api())
|
- 通过`asyncio.create_task`实现并发
* create_task返回task对象
* 先不用await
* 再加上await 可以看出时间上运行速度的差距
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| async def process(name: str, num1: int) -> None: print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished')
async def process_api(): start = time.perf_counter() task_1 = asyncio.create_task(process('task1', 1)) task_2 = asyncio.create_task(process('task2', 2)) task_3 = asyncio.create_task(process('task3', 3))
print(f'finished in {time.perf_counter() - start:.10f} seconds')
if __name__ == '__main__': asyncio.run(process_api())
|
取消任务
Task.done()用于判断任务是否完成
Task.cancel()用于取消一个未完成的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| async def process(name: str, num1: int) -> None: print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished')
async def process_api(): start = time.perf_counter() task_1 = asyncio.create_task(process('task1', 1)) task_2 = asyncio.create_task(process('task2', 2)) task_3 = asyncio.create_task(process('task3', 3))
await task_1 await task_2 if not task_3.done(): task_3.cancel() else: await task_3 print(f'finished in {time.perf_counter() - start:.10f} seconds')
if __name__ == '__main__': asyncio.run(process_api())
|
超时取消任务
asyncio.wait_for(Task, timeout=time) 用于设定多久未完成任务,取消任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| async def process(name: str, num1: int) -> None: print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished')
async def process_api(): start = time.perf_counter() task_1 = asyncio.create_task(process('task1', 3)) try: await asyncio.wait_for(task_1, 1) except asyncio.TimeoutError: print('task1 timeout') else: await task_1 print(f'finished in {time.perf_counter() - start:.10f} seconds')
if __name__ == '__main__': asyncio.run(process_api())
|
asyncio.shield(Task)给任务加盾,即使超时了也不会停止任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| async def process(name: str, num1: int) -> None: print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished')
async def process_api(): start = time.perf_counter() task_1 = asyncio.create_task(process('task1', 3)) try: await asyncio.wait_for(asyncio.shield(task_1), 1) except asyncio.TimeoutError: print('task1 timeout') await task_1 print(f'finished in {time.perf_counter() - start:.10f} seconds')
if __name__ == '__main__': asyncio.run(process_api())
|
等待多个任务
asyncio.gather(Task1, Task2,...)可以直接并行多个任务等待结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| async def process(name: str, num1: int): print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished') return f"{name} finished"
async def process_api(): start = time.perf_counter() task_1 = process('task_1', 1) task_2 = process('task_2', 2) task_3 = process('task_3', 3) result = await asyncio.gather(task_1, task_2, task_3) print(f'finished in {time.perf_counter() - start:.10f} seconds') print(result)
if __name__ == '__main__': asyncio.run(process_api())
|
协程异常
return_exceptions=True 在等待多个任务时,若有异常任务会导致整体主线程的中断,可以使用该参数保持其他任务的正常运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| async def process(name: str, num1: int): print(f'{name} started') await asyncio.sleep(1) print(f'{name} processing') await asyncio.sleep(num1 - 1) print(f'{name} finished') return f"{name} finished"
async def exception_process(): raise Exception('error')
async def process_api(): start = time.perf_counter() task_1 = process('task_1', 1) task_2 = process('task_2', 2) task_error = exception_process()
result = await asyncio.gather(task_1, task_2, task_error, return_exceptions=True) print(f'finished in {time.perf_counter() - start:.10f} seconds') print(result)
if __name__ == '__main__': asyncio.run(process_api())
|
更新: 2024-05-16 13:20:45
原文: https://www.yuque.com/zacharyblock/cx2om6/oau1f4oo9g509hgz
Author:
Zachary Block
Permalink:
http://blockzachary.cn/blog/2601309225/
License:
Copyright (c) 2019 CC-BY-NC-4.0 LICENSE