Python Asyncio API

前言

前面的文章中, Python协程 简单的介绍了一下python的协程的基础。在这段期间,我使用了Sanic(python异步框架)完成了一个小项目的后端。期间,大体看了Sanic源码,有很多和flask的思路和设计是一样的,最大的区别就是它是异步驱动的,它里面是使用了uvloop的事件循环(宣称代替python自带的,比nodo.js, gevent至少快两倍),但里面用的很多API都是python自带的,所以这篇文章只是整理一下python官方库的asyncio的api。对uvloop感兴趣的,参考 https://magic.io/blog/uvloop-blazing-fast-python-networking/

1
2
3
4
5
6
# uvloop 使用
# $ pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.get_event_loop() # 得到一个uvloop的实例

High-level Apis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

# awaitable object: coroutines, tasks, futures 可以使用在await表达式内的

asyncio.run(main()) # 最基本协程调用
task = asyncio.create_task(main()) # 异步任务 python3.7
await task
task = asyncio.ensure_future(main()) # 异步任务 all version
await asyncio.gather(main(), main(), main()) # 自动转为task, 同时运行,并发

try:
await asyncio.wait_for(main(), timeout=1.0)
execpt asyncio.TimeoutError:
print('timeout')

done, pending = await asyncio.wait({task}) # 同时运行task

# Streams
reader, writer = await asyncio.open_connection('127.0.0.1', 8888) # 建立一个网络连接
asyncio.start_server() # 开始一个socket服务

# TCP Client
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

print(f'Send: {message}')
writer.write(message.encode())

data = await reader.read(n=-1) # read 所有
print(f'Received: {data.decode()}')

print('Close the connection')
writer.close()

asyncio.run(tcp_echo_client('hello, world'))

# TCP Server
async def handle_echo(reader, writer):
data = await reader.read(n=-1)
message = data.decode()
addr = writer.get_extra_info('itswcg')

print(f'Received: {message} from {addr}')

print(f'Send: {message}')
writer.write(data)
await writer.drain()

print('Close the connection')
writer.close()

async def main():
server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)

addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')

async with server:
await server.serve_forever()

asyncio.run(main())

lock = asyncio.Lock()
async with lock:
pass

event = asyncio.Event()

cond = asyncio.Condition()
async with cond:
await cond.wait()

proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

queue = asyncio.Queue() # 不是线程安全的, python线程安全不是针对对象的,而是针对操作的。
# list.append() 是线程安全的, list[0] += 1 不是线程安全的。list是python内部加了锁。queue是线程安全的。

Low-level Apis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

# 事件循环
# Event Loop是一个程序结构,用于等待和发送消息和事件
# 简单来说, 就是在程序中设置两个线程:一个负责程序本身的运行,称为"主线程";另一个负责主线程与其他进程(主要是各种I/O操作)的通信, 所以事件循环是单线程是指主线程是单线程。
# 每当遇到I/O的时候,主线程就让Event Loop线程去通知相应的I/O程序,然后接着往后运行,所以不存在红色的等待时间。等到I/O程序完成操作,Event Loop线程再把结果返回主线程。主线程就调用事先设定的回调函数,完成整个任务。

loop = asyncio.get_event_loop() # 获取当前的,如果没有就会自动创建
asyncio.get_running_loop() # 获取当前线程正在运行的事件循环
asyncio.set_event_loop()
asyncio.new_event_loop()

loop.run_until_complete(asyncio.wait([])) # 同时执行,直到完成
loop.run_forever() # 运行event直到stop
...

参考

https://docs.python.org/3/library/asyncio.html

----------本文完,感谢您的阅读----------