Python中tornado异步编程:为什么拿到gen.return结果后没有在yield处恢复执行?
请教大伙一个问题,我调试了半天,发现 gen.return 返回异步结果后,程序不是接着 yield 的地方执行,而是又跳到 ioloop,tornado 初学,文档较少,希望大家能帮忙指出错误在哪,感激 部分程序如下
@gen.coroutine
def _fetch_and_extract(self, task):
self.processing_task_number += 1
self.processing_task_set.add(task)
if self.processing_task_number != self.processing_task_set.size():ioloop.IOLoop.instance().stop()
self.logger.info("%s: start to fetch and extract" % self.worker_name)
if check_task(task):
try:
spider = load_object(task["spider"])
except Exception, e:
handle_fail_task(task,"load %s object failed" % (task["spider"]),self.process_fail_task_queue, self.wait_for_process_task_queue)
self.logger.error("%s: load object failed.path:%s, exception:%s" % (self.worker_name, task["spider"], e))
else:
spider_object = spider(self.processed_url_set, self.wait_for_process_task_queue)
fetch_start_time = datetime.datetime.now()
resp = yield spider_object.fetch(task["request"])
self.logger.debug("got resp already ”)#已打印
#这里已经拿到异步返回的 resp 了,但是代码没有恢复接着往下走,而是直接回到 loop 去取 task 了,但是这时候 task 里空了,所以之后就一直提示任务空
#相当于只抓了美团 api 的 city 列表,后面 extract、再添加 task 都没有执行,我 debug 时候比较奇怪这点,按道理异步返回 resp 了应该接着之前代码的位置继续执行 不是么
if resp == None or resp.error != None:
handle_fail_task(task, "fetch request: %s failed,code:%d error:%s" % (task["request"], resp.code, resp.error), self.process_fail_task_queue, self.wait_for_process_task_queue)
self.logger.error("%s: fetch request: %s failed, error: %s" % (self.worker_name, task["request"], resp))
else:
content_length = resp.headers['Content-Length'] if resp.headers.has_key('Content-Length') else None
last_modified = resp.headers['Last-Modified'] if resp.headers.has_key("Last-Modified") else None
fetch_time = datetime.datetime.now() - fetch_start_time
extract_start_time = datetime.datetime.now()
status = yield spider_object.extract(resp, **dict(task["kwargs"])
Python中tornado异步编程:为什么拿到gen.return结果后没有在yield处恢复执行?
这个问题核心在于对Tornado协程执行流程的误解。gen.return返回的值并不会直接让协程从yield处恢复,而是作为yield表达式的结果值返回给调用者。
看个具体例子:
from tornado import gen
from tornado.ioloop import IOLoop
@gen.coroutine
def async_task():
print("Start async_task")
# yield暂停协程,将控制权交还给事件循环
result = yield gen.sleep(1)
print(f"After yield: {result}")
# 这里永远不会执行,因为gen.return在yield之前就返回了
return "final"
@gen.coroutine
def main():
print("Calling async_task")
# async_task()在遇到第一个yield时就返回了Future
future = async_task()
# 手动设置Future的结果
future.set_result("manual_result")
# 获取协程最终返回值
final_result = yield future
print(f"Final result: {final_result}")
if __name__ == "__main__":
IOLoop.current().run_sync(main)
执行流程:
async_task()执行到yield gen.sleep(1)时暂停- 返回一个
Future对象给调用者 - 当
Future.set_result()被调用时,yield表达式获得结果值"manual_result" - 协程从
yield处恢复,继续执行后面的代码 - 最终返回
"final"给调用者
关键点:yield是暂停点,不是终点。协程恢复执行需要等待Future有结果,然后事件循环才会调度它继续执行。
一句话总结:yield返回的是Future,恢复执行需要Future有结果。
肉眼表示没看出什么问题,你在那句打印出来 log 的地方以下每隔一行打 log 出来看看呢,或者能否精简个可独立运行的简陋版本出来
‘’‘
class Worker(object):
def init(self,str):
self._name = str
self._installed = False
def install(self):
if self._installed:
print “%s: no need to start again.worker has been installed!” % self._name
else:
self._installed = True
ioloop.IOLoop.instance().add_callback(self.test)
print “%s: worker is installed” % self._name
.coroutine
def fetch(self,url):
req = HTTPRequest(url,connect_timeout=3,request_timeout=5)
client = httpclient.AsyncHTTPClient()
resp = yield gen.Task(client.fetch, req)
raise gen.Return(resp)
.coroutine
def exact(self,resp):
yield gen.sleep(5)
raise gen.Return(10)
.coroutine
def test(self):
if self._installed:
ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
str = self._name
task_url = u’http://api.caipiao.163.com/missNumber_trend.html?gameEn=kuai3’
resp = yield self.fetch(task_url)
print “%s get resp already at %s” %(str,datetime.datetime.now())
staus = yield self.exact(resp)
print “callback!!!%s status returned %d at %s” %(str,staus,datetime.datetime.now())
if name == ‘main’:
arrs = [‘aaa’,‘bbb’]
for arr in arrs:
worker = Worker(arr)
worker.install()
ioloop.IOLoop.instance().start()
’’'
谢谢,还在找原因
请问 ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
这个能让 test 方法定期执行么,测试结果这个间隔没有用


