#!/usr/bin/env python
Utilities for handling timeout and async calls

from __future__ import annotations

import atexit
from concurrent.futures import TimeoutError
from concurrent.futures import thread as thread
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import contextmanager

from negmas import warnings

__all__ = ["TimeoutError", "TimeoutCaller", "force_single_thread", "single_thread"]

[docs] def force_single_thread(on: bool = True): """ Forces negmas to use a single thread for all internal calls. Remarks: - This will have the effect of not enforcing time-limits on calls. - Only use this with caution and for debugging. """ global SINGLE_THREAD_FORCED SINGLE_THREAD_FORCED = on
[docs] @contextmanager def single_thread(): force_single_thread(True) yield None force_single_thread(False)
def is_single_thread() -> bool: return SINGLE_THREAD_FORCED
[docs] class TimeoutCaller: pool = None
[docs] @classmethod def get_pool(cls): if cls.pool is None: cls.pool = ThreadPoolExecutor() return cls.pool
[docs] @classmethod def run(cls, to_run, timeout: float): if is_single_thread(): return to_run() pool = cls.get_pool() future = pool.submit(to_run) # type: ignore (Probably ok) try: result = future.result(timeout) return result except TimeoutError as s: future.cancel() raise s
[docs] @classmethod def cleanup(cls): if cls.pool is not None: try: cls.pool.shutdown(wait=False) for t in cls.pool._threads: # we are using an undocumented private value here. DANGEROUS del thread._threads_queues[t] # type: ignore except Exception: warnings.warn( "NegMAS have finished processing but there may be some " "threads still hanging there!! If your program does " "not die by itself. Please press Ctrl-c to kill it", warnings.NegmasShutdownWarning, )
def cleanup(): """ Used to cleanup at normal program exit """ TimeoutCaller.cleanup() TimeoutCaller.get_pool() atexit.register(cleanup)