异步 IO

协程

类似于小学时候的一个计算任务,协程就类似于同时做多件家务,烧开水的同时 启动了洗衣机 之后拖地

而不是烧开水一直等,然后拖地,然后洗衣机等着

asyncio 模块

该模块通过一个线程去执行并发任务

  • async

需要创建一个协程的时候,可以使用 async 关键字将一个函数声明为协程

协程的协有协助之意,协程就是通过一个线程去执行并发,一个任务就是一个协程,也可以是多个任务通过一个协程调用多次来实现

协程的核心是通过事件循环实现的

1
2
async def func():
pass
  • asyncio.run() 启动一个协程
1
2
3
4
5
6
7
async def multi(num1: int, num2: int):
res = num1 * num2
print(f'{num1} * {num2} = {res}')


if __name__ == '__main__':
asyncio.run(multi(3,7))
  • await 协程中调用另外一个协程
1
2
3
4
5
6
7
8
9
10
11
async def multi(num1: int, num2: int):
res = num1 * num2
print(f'{num1} * {num2} = {res}')


async def multi_api():
await multi(1, 2)


if __name__ == '__main__':
asyncio.run(multi_api())

创建任务

  • asyncio.create_task
    • 先来一个没有实现并发的调用,由时间上的统计来看,是一个串行的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
async def process(name: str, num1: int) -> None:
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')


async def process_api():
start = time.perf_counter()
await process('task1', 1)
await process('task2', 2)
await process('task3', 3)
print(f'finished in {time.perf_counter() - start:.2f} seconds')


if __name__ == '__main__':
asyncio.run(process_api())

# task1 started
# task1 processing
# task1 finished
# task2 started
# task2 processing
# task2 finished
# task3 started
# task3 processing
# task3 finished
# finished in 6.01 seconds
- 通过`asyncio.create_task`实现并发
    * create_task返回task对象
    * 先不用await
    * 再加上await 可以看出时间上运行速度的差距
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def process(name: str, num1: int) -> None:
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')


async def process_api():
start = time.perf_counter()
task_1 = asyncio.create_task(process('task1', 1))
task_2 = asyncio.create_task(process('task2', 2))
task_3 = asyncio.create_task(process('task3', 3))

# await task_1
# await task_2
# await task_3
print(f'finished in {time.perf_counter() - start:.10f} seconds')


if __name__ == '__main__':
asyncio.run(process_api())

取消任务

  • Task.done()用于判断任务是否完成
  • Task.cancel()用于取消一个未完成的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def process(name: str, num1: int) -> None:
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')


async def process_api():
start = time.perf_counter()
task_1 = asyncio.create_task(process('task1', 1))
task_2 = asyncio.create_task(process('task2', 2))
task_3 = asyncio.create_task(process('task3', 3))

await task_1
await task_2
if not task_3.done():
task_3.cancel()
else:
await task_3
print(f'finished in {time.perf_counter() - start:.10f} seconds')


if __name__ == '__main__':
asyncio.run(process_api())

超时取消任务

  • asyncio.wait_for(Task, timeout=time) 用于设定多久未完成任务,取消任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def process(name: str, num1: int) -> None:
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')


async def process_api():
start = time.perf_counter()
task_1 = asyncio.create_task(process('task1', 3))
try:
await asyncio.wait_for(task_1, 1)
except asyncio.TimeoutError:
print('task1 timeout')
else:
await task_1
print(f'finished in {time.perf_counter() - start:.10f} seconds')


if __name__ == '__main__':
asyncio.run(process_api())
  • asyncio.shield(Task)给任务加盾,即使超时了也不会停止任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def process(name: str, num1: int) -> None:
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')


async def process_api():
start = time.perf_counter()
task_1 = asyncio.create_task(process('task1', 3))
try:
await asyncio.wait_for(asyncio.shield(task_1), 1)
except asyncio.TimeoutError:
print('task1 timeout')
await task_1
print(f'finished in {time.perf_counter() - start:.10f} seconds')


if __name__ == '__main__':
asyncio.run(process_api())

等待多个任务

  • asyncio.gather(Task1, Task2,...)可以直接并行多个任务等待结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def process(name: str, num1: int):
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')
return f"{name} finished"


async def process_api():
start = time.perf_counter()
task_1 = process('task_1', 1)
task_2 = process('task_2', 2)
task_3 = process('task_3', 3)
result = await asyncio.gather(task_1, task_2, task_3)
print(f'finished in {time.perf_counter() - start:.10f} seconds')
print(result)


if __name__ == '__main__':
asyncio.run(process_api())

协程异常

  • return_exceptions=True 在等待多个任务时,若有异常任务会导致整体主线程的中断,可以使用该参数保持其他任务的正常运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def process(name: str, num1: int):
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} processing')
await asyncio.sleep(num1 - 1)
print(f'{name} finished')
return f"{name} finished"


async def exception_process():
raise Exception('error')


async def process_api():
start = time.perf_counter()
task_1 = process('task_1', 1)
task_2 = process('task_2', 2)
task_error = exception_process()

result = await asyncio.gather(task_1, task_2, task_error,
return_exceptions=True)
print(f'finished in {time.perf_counter() - start:.10f} seconds')
print(result)


if __name__ == '__main__':
asyncio.run(process_api())

更新: 2024-05-16 13:20:45
原文: https://www.yuque.com/zacharyblock/cx2om6/oau1f4oo9g509hgz