Python中有什么好办法约束一个函数的执行时间吗?

自己搜到的几个方法都有缺陷:

  1. signal: 如果方法运行在一个子线程中, 是不能接受信号的.
  2. 多线程: 开一个子线程执行, python 没有办法直接杀掉子线程.
  3. 开一个子进程, 设置为 daemon, 然后主进程退出他也会退出, 如果函数本身执行在 daemon 线程中, 是不能新建进程的.

我在想 fork 一个独立的进程, 然后发送 kill 信号, 这样会不会有问题, 另外如何得到 fork 出来的线程的输出呢?好像没有 Queue 这种东西可以用了.


Python中有什么好办法约束一个函数的执行时间吗?

16 回复

创建主题时描述错了,为看到的 v 友说声抱歉, Process 和 Thread 说的乱七八糟的, 已改正.


import signal
import time
from contextlib import contextmanager

class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException("函数执行超时")

@contextmanager
def time_limit(seconds):
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)

# 使用示例
def long_running_task():
    time.sleep(5)
    return "任务完成"

try:
    with time_limit(3):
        result = long_running_task()
        print(result)
except TimeoutException as e:
    print(f"捕获超时: {e}")

# 多线程方案示例
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def run_with_timeout(func, args=(), kwargs={}, timeout=3):
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args, **kwargs)
        try:
            return future.result(timeout=timeout)
        except TimeoutError:
            future.cancel()
            raise TimeoutException(f"函数执行超过{timeout}秒")

# 使用线程池
try:
    result = run_with_timeout(long_running_task, timeout=3)
    print(result)
except TimeoutException as e:
    print(e)

核心就两种方法:用signal给函数设闹钟,或者开个线程池来监控。signal简单但Windows可能不兼容,线程池更通用但开销大点。看你的具体场景选吧。

总结:根据平台和需求选择合适方案。

celery 🙈

老哥,找到什么好方法了公布下

创建一个进程之后获取到 pid,时间到了直接 kill

你是要开发实时系统吗?楼主你说的都是软实时的,简单来说,内核对你的进程进行调度,那么你的函数执行时间上限就有可能会超过了。真要硬实时的话我还是建议你用 C,然后老实地独占一个核心。

不知道是不是想要这种:把函数内的执行代码分成小块,然后每一小块执行完后检查执行时间,超时既返回。如果执行自己写的函数,可以用这种方法。

比如有一句 sleep(10), 这句话没法再被拆分了.

<br>def run_with_limited_time(func, args, kwargs, time):<br> """Runs a function with time limit<br> :param func: The function to run<br> :param args: The functions args, given as tuple<br> :param kwargs: The functions keywords, given as dict<br> :param time: The time limit in seconds<br> :return: True if the function ended successfully. False if it was terminated.<br> """<br> def wrapper(queue, func, *args, **kwargs):<br> return queue.put(func(*args, **kwargs))<br> import multiprocessing<br> q = multiprocessing.Queue(maxsize=1)<br> func = functools.partial(wrapper, q, func)<br> p = Process(target=func, args=args, kwargs=kwargs)<br> p.start()<br> p.join(time)<br> if p.is_alive():<br> p.terminate()<br> raise TimeoutError('time out!')<br><br> return q.get_nowait()<br>

看起来是最好的实现了

循环里设置标记位置,循环外部能控制这个标记位置让循环退出

sleep 可以同信号抛出异常的

哦 重新看了下你说的

用 eventlet 写协程可以做到
因为里面没有真正的 sleep
sleep 都被 hack 成排序调度的位置了

协程本身可以终止

超时装饰器
import signal,functools # 下面会用到的两个库
class TimeoutError(Exception): pass # 定义一个 Exception,后面超时抛出

def timeout(seconds, error_message = ‘Function call timed out’):
def decorated(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return functools.wraps(func)(wrapper)
return decorated

回到顶部