Python中有什么好办法约束一个函数的执行时间吗?
自己搜到的几个方法都有缺陷:
- signal: 如果方法运行在一个子线程中, 是不能接受信号的.
- 多线程: 开一个子线程执行, python 没有办法直接杀掉子线程.
- 开一个子进程, 设置为 daemon, 然后主进程退出他也会退出, 如果函数本身执行在 daemon 线程中, 是不能新建进程的.
我在想 fork 一个独立的进程, 然后发送 kill 信号, 这样会不会有问题, 另外如何得到 fork 出来的线程的输出呢?好像没有 Queue 这种东西可以用了.
Python中有什么好办法约束一个函数的执行时间吗?
创建主题时描述错了,为看到的 v 友说声抱歉, Process 和 Thread 说的乱七八糟的, 已改正.
import signal
import time
from contextlib import contextmanager
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException("函数执行超时")
@contextmanager
def time_limit(seconds):
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
# 使用示例
def long_running_task():
time.sleep(5)
return "任务完成"
try:
with time_limit(3):
result = long_running_task()
print(result)
except TimeoutException as e:
print(f"捕获超时: {e}")
# 多线程方案示例
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def run_with_timeout(func, args=(), kwargs={}, timeout=3):
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except TimeoutError:
future.cancel()
raise TimeoutException(f"函数执行超过{timeout}秒")
# 使用线程池
try:
result = run_with_timeout(long_running_task, timeout=3)
print(result)
except TimeoutException as e:
print(e)
核心就两种方法:用signal给函数设闹钟,或者开个线程池来监控。signal简单但Windows可能不兼容,线程池更通用但开销大点。看你的具体场景选吧。
总结:根据平台和需求选择合适方案。
docker
celery 🙈
老哥,找到什么好方法了公布下
创建一个进程之后获取到 pid,时间到了直接 kill
你是要开发实时系统吗?楼主你说的都是软实时的,简单来说,内核对你的进程进行调度,那么你的函数执行时间上限就有可能会超过了。真要硬实时的话我还是建议你用 C,然后老实地独占一个核心。
不知道是不是想要这种:把函数内的执行代码分成小块,然后每一小块执行完后检查执行时间,超时既返回。如果执行自己写的函数,可以用这种方法。
比如有一句 sleep(10), 这句话没法再被拆分了.
<br>def run_with_limited_time(func, args, kwargs, time):<br> """Runs a function with time limit<br> :param func: The function to run<br> :param args: The functions args, given as tuple<br> :param kwargs: The functions keywords, given as dict<br> :param time: The time limit in seconds<br> :return: True if the function ended successfully. False if it was terminated.<br> """<br> def wrapper(queue, func, *args, **kwargs):<br> return queue.put(func(*args, **kwargs))<br> import multiprocessing<br> q = multiprocessing.Queue(maxsize=1)<br> func = functools.partial(wrapper, q, func)<br> p = Process(target=func, args=args, kwargs=kwargs)<br> p.start()<br> p.join(time)<br> if p.is_alive():<br> p.terminate()<br> raise TimeoutError('time out!')<br><br> return q.get_nowait()<br>
看起来是最好的实现了
循环里设置标记位置,循环外部能控制这个标记位置让循环退出
不行。
sleep 可以同信号抛出异常的
哦 重新看了下你说的
用 eventlet 写协程可以做到
因为里面没有真正的 sleep
sleep 都被 hack 成排序调度的位置了
协程本身可以终止

