本文和大家分享的主要是
python开发中Tornado
协程相关问题,一起来看看吧,希望对大家学习和使用这部分内容有所帮助。
什么是协程
我们经常使用的函数又称子例程(subroutine)
,往往只有一个入口(函数的第一行),一个出口(
return
、抛出异常)。子例程在入口时获得控制权,在出口时把控制权交还给调用者。一旦交还控制权,就意味着例程的结束,函数中做的所有工作以及保存在局部变量中的数据都将被释放。而协程可以有多个入口点,允许从一个入口点执行到下一个入口点之前暂停,保存执行状态;等到合适的时机恢复执行状态,从下一个入口点重新开始执行。
"Subroutines are special cases of ... coroutines." –Donald Knuth.
可以把协程看做是子例程的泛化形式。
在Python
中,协程基于生成器。它可以有多个出口和入口。出口有两种类型:一种是
return
,用于永久交还控制权,效果同子例程;另一种是
yield
,用于暂时交还控制权,函数将来还会收回控制权。当然入口也有两种:一种是函数的第一行;另一种是上次
yield
的那一行。
Tornado
中的协程
Tornado
典型的协程例子:
class GenAsyncHandler(RequestHandler): @gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")
我们为这个Handler
的
get()
添加了装饰器
gen.coroutine
,从而使其成为一个协程。不难理解,面对耗时的操作,利用协程可以达到异步的效果,需要等待的时候
yield
出去,等待结束后重新回来继续执行。如例子中
get
需要等待http_client.fetch("http://example.com")
的结果,因此通过yield
返回;当获取到结果后,
get
函数从上次离开处继续运行,且
response
被赋值为 http_client.fetch("http://example.com")
的结果。
这主要涉及到以下几样东西:
1.Future
Future
的设计目标是作为协程
(coroutine)
和
IOLoop
的媒介,从而将协程和
IOLoop
关联起来。
Future
在
concurrent.py
中定义,是异步操作结果的占位符,用于等待结果返回。通常作为函数
IOLoop.add_future()
的参数或
gen.coroutine
协程中
yield
的返回值。
等到结果返回时,外部可以通过调用set_result()
设置真正的结果,然后调用所有回调函数,恢复协程的执行。
2.IOLoop
IOLoop
是一个
I/O
事件循环,用于调度
socket
相关的连接、响应、异步读写等网络事件,并支持在事件循环中添加回调
(callback)
和定时回调
(timeout)
。在支持的平台上,默认使用
epoll
进行
I/O
多路复用处理。
IOLoop
是
Tornado
的核心,绝大部分模块都依赖于
IOLoop
的调度。在协程运行环境中,
IOLoop
担任着协程调度器的角色,能够让暂停的协程重新获得控制权,从而能继续执行。
IOLoop
通过
add_future()
对
Future
的支持:
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(lambda future: self.add_callback(callback, future))
通过调用future
的
add_done_callback()
,使当
future
在操作完成时,能够通过
add_callback
将
callback
添加到
IOLoop
中,让
callback
在
IOLoop
下一次迭代中执行
(
不在本轮是为了避免饿死
)
。
3.Runner
Runner
和
coroutine
都在
gen.py
中定义。
Runner
由
coroutine
装饰器创建,用于维护挂起的回调函数及结果的相关信息,包括中间结果
(future)
和最终结果
(result_future)
。
4.coroutine
gen.coroutine
是一个装饰器,负责将普通函数包装成协程。功能包括:
·
调用函数,如果该函数有
yield
,则返回生成器。否则立即得到结果,不属于协程。
·
通过
next()
执行生成器,如果未能结束
(
遇到
yield)
,则生成中间类
Runner
对象,用于保存生成器、
yield
返回值和最终返回值。
·
创建
Runner
。
结合以上几样东西,协程的具体流程整理如下:
每一次协程调用yield
释放控制权后:
-> [Runner]handle_yield
处理yield
返回的结果
-> [Runner]ioloop.add_future(self.future, lambda f: self.run())
将结果构造成future
后添加到ioloop
-> [future]add_done_callback(lambda future: self.add_callback(callback, future))
将Runner.run()
加入到完成时的回调函数列表中
???
触发:
-> [future]set_result
已经得到future
的结果,设置之
-> [future]_set_done
调用future
所有回调函数
(_callbacks)
-> [ioloop]add_callback(callback, future)
callback
为
[Runner]add_future
添加的那个,即
[Runner]self.run()
,将在下一轮循环被执行
-> [Runner]self.run()
取出Runner
的self.future(
上次yield
的返回值)
:
1.
如果
future
未完成,return
,流程结束,等待下一次set_result
2.
如果
future
完成
-> [Runner]yielded = self.gen.send(value)
通过send
把
future
的
result
发送给协程,并让其恢复执行:
1.
如果协程结束
(
没yield
了)
-> [Runner]self.result_future.set_result
设置最终的结果result_future
2.
未结束
(
再次遇到yield)
-> [Runner]handle_yield
则再次调用handle_yield
从以上的流程可以看出,每一次协程调用yield
释放控制权后的恢复,都依赖于
set_result()
的调用。当我们把目光看向总调度
IOLoop
时,可以发现
IOLoop
只是忠实地在每一轮迭代中调用那些就绪的回调函数,并没有主动调用
set_result()
的能力。
那么,在Tornado
中,这个
set_result()
到底是谁调用?在何时调用?
搜遍了Tornado
的代码,主要找到以下几个调用点:
1.
在
coroutine
装饰器中,如果装饰的函数调用后直接结束
(
没
yield)
,直接
set_result()
。
2.
在
[Runner]
的
run()
中,如果调用
send
后协程结束,对
result_future
进行
set_result()
。
3.
取决于
yield
后阻塞操作的具体实现,下面以例子中
AsyncHTTPClient
的
fetch()
来进行分析。
AsyncHTTPClient
的
fetch()
是一个异步操作,其构造了一个
HTTP
请求,然后调用
fetch_impl()
,返回一个
future
。
fetch_impl()
取决于
AsyncHTTPClient
的具体实现,默认情况下,
AsyncHTTPClient
生成的是子类
SimpleAsyncHTTPClient
的实例,所以主要看
SimpleAsyncHTTPClient
的
fetch_impl()
:
def fetch_impl(self, request, callback):
key = object()
self.queue.append((key, request, callback))
if not len(self.active) < self.max_clients:
timeout_handle = self.io_loop.add_timeout(
self.io_loop.time() + min(request.connect_timeout,
request.request_timeout),
functools.partial(self._on_timeout, key))
else:
timeout_handle = None
self.waiting[key] = (request, callback, timeout_handle)
self._process_queue()
if self.queue:
gen_log.debug("max_clients limit reached, request queued. "
"%d active, %d queued requests." % (len(self.active), len(self.queue)))
fetch_impl()
接受两个参数,
request
为
fetch()
中构造的
HTTP
请求,
callback
为
fetch
中的回调函数
handle_response
:
def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
在handle_response()
中,调用了我们期待的
set_result()
。所以我们把目光转移到
fetch_impl()
的
callback
。在
fetch_impl()
中,函数先将
callback
加到队列中,然后通过
_process_queue()
处理掉,处理时调用
_handle_request()
:
def _handle_request(self, request, release_callback, final_callback):
self._connection_class()(
self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.tcp_client,
self.max_header_size, self.max_body_size)
这里构造了一个_connection_class
对象,即
HTTPConnection
。
HTTPConnection
通过
self.tcp_client.connect()
来建立
TCP
连接,然后通过该连接发送
HTTP
请求, 在超时
(timeout)
或完成
(finish)
时调用
callback
。
tcp_client
在建立异步
TCP
连接时,先进行
DNS
解析
(
又是协程
)
,然后建立
socket
来构造
IOStream
对象,最后调用
IOStream.connect()
。在
IOStream.connect()
的过程中,我们看到了关键操作:
self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
还记得我们前面说过的IOLoop
吗?
IOLoop
可以添加
socket
、
callback
和
timeout
,并当它们就绪时调用相应的回调函数。这里
add_handler
处理的就是
socket
的多路复用,默认的实现是
epoll
。当
epoll
中该
socket
就绪时,相关函数得以回调。于是
tcp_client
读取
socket
内容获得
HTTP response
,
handle_response()
被调用,最终
set_result()
被调用。
到这里我们恍然大悟,AsyncHTTPClient
的
set_result()
调用依赖于
IO
多路复用方案,这里是
epoll
,在
epoll
中相应
socket
的就绪的是
set_result()
得到调用的根本原因。而这个就绪事件的传递,离不开
Tornado
内建的
IOStream
,异步
TCPClient
、异步
HTTPConnection
,这些类的存在为我们隐藏了简单调用后的复杂性。因此当我们在用
yield
返回耗时操作时,如果不是
Tornado
的内建组件,则必须自己负责设计
set_result
的方案,比如以下代码:
@gen.coroutinedef add(self, a, b):
future = Future()
def callback(a, b):
print("calculating the sum of %d + %d:" % (a,b))
future.set_result(a+b)
tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
result = yield future
print("%d + %d = %d" % (a, b, result))
通过手动将包含set_result()
的回调函数加到
IOLoop
中,于是回调下一次迭代中执行,
set_result()
被调用,协程恢复控制权。
总结
实现高性能服务端,同步多进程、多线程风靡一时,然而由于其需要在内核态进行上下文切换,同步时还要加锁,导致并发性能低下。于是异步冒出来了,如Session
、状态机,当然还有本文讨论的协程。
学习协程,主要的原因是在看SS
代码中深受状态机代码的折磨
(
为啥要看
SS
的代码?你懂的
)
。因为在状态机中,逻辑代码被分割成多块分散在
N
个回调里
(
各种
on_xxxxx)
,割裂了人的顺序性思维,在阅读代码、理解逻辑时跳来跳去令人痛不欲生。反观协程,真正做到了同步编码异步执行。
然而深挖协程的实现发现,协程的高性能和易编写易阅读是以后端框架复杂的封装为代价的。即使是在Python
中我们拥有能够维护上下文的生成器,为了实现协程的调度,
Tornado
依然耗费了不少功夫:从定义
Future
对象用于等待结果返回,到使用
coroutine
装饰器将生成器的
yield
返回值封装成
Runner
,最后到
set_result
让
Runner
重新跑起来,而这些都依赖于
IOLoop
的调度。在经过层层跳转后达成了协程的调度目的,不得不感慨
Tornado
设计的巧妙。
不管怎么说,对于我等码农来说,协程代码写起来真的爽,读起来也也很爽,可以少死很多脑细胞,这就够了。
来源:binsite