Python中tornado异步编程:为什么拿到gen.return结果后没有在yield处恢复执行?

请教大伙一个问题,我调试了半天,发现 gen.return 返回异步结果后,程序不是接着 yield 的地方执行,而是又跳到 ioloop,tornado 初学,文档较少,希望大家能帮忙指出错误在哪,感激 部分程序如下


@gen.coroutine
def _fetch_and_extract(self, task):
    self.processing_task_number += 1
    self.processing_task_set.add(task)
    if self.processing_task_number != self.processing_task_set.size():ioloop.IOLoop.instance().stop()
    self.logger.info("%s: start to fetch and extract" % self.worker_name)
    if check_task(task):
        try:
            spider = load_object(task["spider"])
        except Exception, e:
            handle_fail_task(task,"load %s object failed" % (task["spider"]),self.process_fail_task_queue, self.wait_for_process_task_queue)
            self.logger.error("%s: load object failed.path:%s, exception:%s" % (self.worker_name, task["spider"], e))
        else:
            spider_object = spider(self.processed_url_set, self.wait_for_process_task_queue)
            fetch_start_time = datetime.datetime.now()
            resp = yield spider_object.fetch(task["request"])
            self.logger.debug("got resp already ”)#已打印
	    #这里已经拿到异步返回的 resp 了,但是代码没有恢复接着往下走,而是直接回到 loop 去取 task 了,但是这时候 task 里空了,所以之后就一直提示任务空
	    #相当于只抓了美团 api 的 city 列表,后面 extract、再添加 task 都没有执行,我 debug 时候比较奇怪这点,按道理异步返回 resp 了应该接着之前代码的位置继续执行 不是么
            if resp == None or resp.error != None:
                handle_fail_task(task, "fetch request: %s failed,code:%d error:%s" % (task["request"], resp.code, resp.error), self.process_fail_task_queue, self.wait_for_process_task_queue)
                self.logger.error("%s: fetch request: %s failed, error: %s" % (self.worker_name, task["request"], resp))
            else:
                content_length = resp.headers['Content-Length'] if resp.headers.has_key('Content-Length') else None
                last_modified = resp.headers['Last-Modified'] if resp.headers.has_key("Last-Modified") else None
                fetch_time = datetime.datetime.now() - fetch_start_time
                extract_start_time = datetime.datetime.now()
                status = yield spider_object.extract(resp, **dict(task["kwargs"])

Python中tornado异步编程:为什么拿到gen.return结果后没有在yield处恢复执行?

4 回复

mark 下,如果有答案了再来更新


这个问题核心在于对Tornado协程执行流程的误解。gen.return返回的值并不会直接让协程从yield处恢复,而是作为yield表达式的结果值返回给调用者。

看个具体例子:

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def async_task():
    print("Start async_task")
    # yield暂停协程,将控制权交还给事件循环
    result = yield gen.sleep(1)
    print(f"After yield: {result}")
    # 这里永远不会执行,因为gen.return在yield之前就返回了
    return "final"

@gen.coroutine  
def main():
    print("Calling async_task")
    # async_task()在遇到第一个yield时就返回了Future
    future = async_task()
    
    # 手动设置Future的结果
    future.set_result("manual_result")
    
    # 获取协程最终返回值
    final_result = yield future
    print(f"Final result: {final_result}")

if __name__ == "__main__":
    IOLoop.current().run_sync(main)

执行流程:

  1. async_task()执行到yield gen.sleep(1)时暂停
  2. 返回一个Future对象给调用者
  3. Future.set_result()被调用时,yield表达式获得结果值"manual_result"
  4. 协程从yield处恢复,继续执行后面的代码
  5. 最终返回"final"给调用者

关键点:yield暂停点,不是终点。协程恢复执行需要等待Future有结果,然后事件循环才会调度它继续执行。

一句话总结:yield返回的是Future,恢复执行需要Future有结果。

肉眼表示没看出什么问题,你在那句打印出来 log 的地方以下每隔一行打 log 出来看看呢,或者能否精简个可独立运行的简陋版本出来


‘’‘
class Worker(object):
def init(self,str):
self._name = str
self._installed = False
def install(self):
if self._installed:
print “%s: no need to start again.worker has been installed!” % self._name
else:
self._installed = True
ioloop.IOLoop.instance().add_callback(self.test)
print “%s: worker is installed” % self._name
.coroutine
def fetch(self,url):
req = HTTPRequest(url,connect_timeout=3,request_timeout=5)
client = httpclient.AsyncHTTPClient()
resp = yield gen.Task(client.fetch, req)
raise gen.Return(resp)

.coroutine
def exact(self,resp):
yield gen.sleep(5)
raise gen.Return(10)

.coroutine
def test(self):
if self._installed:
ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
str = self._name
task_url = u’http://api.caipiao.163.com/missNumber_trend.html?gameEn=kuai3’
resp = yield self.fetch(task_url)
print “%s get resp already at %s” %(str,datetime.datetime.now())
staus = yield self.exact(resp)
print “callback!!!%s status returned %d at %s” %(str,staus,datetime.datetime.now())


if name == ‘main’:
arrs = [‘aaa’,‘bbb’]
for arr in arrs:
worker = Worker(arr)
worker.install()
ioloop.IOLoop.instance().start()

’’'

谢谢,还在找原因
请问 ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
这个能让 test 方法定期执行么,测试结果这个间隔没有用

回到顶部