negmas.helpers¶
Helper modueles
- class negmas.helpers.PathLike[source]¶
Bases:
ABCAbstract base class for implementing the file system path protocol.
- class negmas.helpers.TimeoutCaller[source]¶
Bases:
objectExecutes callables with timeout support using a shared thread pool.
- classmethod run(to_run, timeout: float)[source]¶
Executes a callable with a timeout limit.
- Parameters:
to_run – A callable to execute in the thread pool.
timeout – Maximum seconds to wait before raising TimeoutError.
- Remarks:
On timeout the worker thread is asked to stop by injecting an exception into it (best-effort, pure-Python loops only). This prevents the thread from leaking and burning CPU after the call is abandoned. CPU-bound C-extension code cannot be interrupted this way; use process isolation for a hard guarantee.
- negmas.helpers.camel_case(s: str, capitalize_first: bool = False, lower_first: bool = False) str[source]¶
Converts a string from snake_case to CamelCase
Example
>>> print(camel_case("this_is_a_test")) thisIsATest >>> print(camel_case("this_is_a_test", capitalize_first=True)) ThisIsATest >>> print(camel_case("This_is_a_test", lower_first=True)) thisIsATest >>> print(camel_case("This_is_a_test")) ThisIsATest
- Parameters:
s – input string
capitalize_first – if true, the first character will be capitalized
lower_first – If true, the first character will be lowered
- Returns:
converted string
- Return type:
- negmas.helpers.create_loggers(file_name: str | None = None, module_name: str | None = None, screen_level: int | None = 30, file_level: int | None = 10, format_str: str = '%(asctime)s - %(levelname)s - %(message)s', colored: bool = True, app_wide_log_file: bool = True, module_wide_log_file: bool = False) Logger[source]¶
Create a set of loggers to report feedback.
The logger created can log to both a file and the screen at the same time with adjustable level for each of them. The default is to log everything to the file and to log WARNING at least to the screen
- Parameters:
module_wide_log_file
app_wide_log_file
file_name – The file to export_to the logs to. If None only the screen is used for logging. If empty, a time-stamp is used
module_name – The module name to use. If not given the file name without .py is used
screen_level – level of the screen logger
file_level – level of the file logger
format_str – the format of logged items
colored – whether or not to try using colored logs
- Returns:
The logger
- Return type:
- negmas.helpers.distribute_integer_randomly(n: int, m: int, min_per_bin: int | None = 1) list[int][source]¶
Distributes an integer n over a list of m values randomly, with each value at least one.
- Parameters:
n – The integer to distribute.
m – The number of values to distribute over.
min_per_bin – Minimum number of elements per bin. This is only guaranteed if n >= m * min_per_bin. If None, then the distribution will be as normal as possible
- Returns:
A list of m integers, where each value is at least one.
- Remarks:
if n < m * min_per_bin, n will be distributed randomly with some zeros inserted.
- negmas.helpers.exception2str(limit=None, chain=True) str[source]¶
Exception2str.
- Parameters:
limit – Limit.
chain – Chain.
- Returns:
The result.
- Return type:
- negmas.helpers.floatin(x: float | tuple[float, float] | Sequence[float], log_uniform: bool) float[source]¶
Samples a value. A random choice is made if x is another sequence, a value between the two limits is used if the input is a tuple otherwise x is returned
- negmas.helpers.force_single_thread(on: bool = True)[source]¶
Forces negmas to use a single thread for all internal calls.
- Remarks:
This will have the effect of not enforcing time-limits on calls.
Only use this with caution and for debugging.
- negmas.helpers.generate_random_weights(n)[source]¶
Generates a list of n random weights between 0 and 1 that sum to 1.
- Parameters:
n – The number of weights to generate.
- Returns:
A list of n random weights between 0 and 1 that sum to 1.
- negmas.helpers.get_class(class_name: str | type, module_name: str | None = None, scope: dict | None = None, allow_nonstandard_names=False, extra_modules: tuple[str, ...] = (), extra_paths: tuple[str | Path, ...] = ()) type[source]¶
Imports and creates a class object for the given class name
- negmas.helpers.get_full_type_name(t: None) None[source]¶
- negmas.helpers.get_full_type_name(t: str) str
- negmas.helpers.get_full_type_name(t: type[Any] | Callable) str
Gets the full type name of a type. You should not pass an instance to this function but it may just work.
An exception is that if the input is of type
stror if it is None, it will be returned as it is
- negmas.helpers.humanize_time(secs: int | float | None, align=False, always_show_all_units=False, show_us=False, show_ms=False, always_show_from='') str | None[source]¶
Prints time that is given as seconds in human readable form. Useful only for times >=1sec.
- Parameters:
secs – float: number of seconds
align – bool, optional: whether to align outputs so that they all take the same size (not implemented)
always_show_all_units – bool, optional: Whether to always show days, hours, and minutes even when they are zeros. default False
always_show_from – One of d,h,m,s,ms,u (day, hour, minute, second, milli-sec, micro-sec) to always show as well as everything shorter than it (i.e passing ‘m’ shows minutes, seconds, … etc)
show_us – bool, if given microseconds and milliseconds will be shown
show_ms – bool, if given milliseconds will be shown
- Returns:
str: formated string with the humanized form
- negmas.helpers.import_by_name(full_name: str) Any[source]¶
Imports something form a module using its full name
- negmas.helpers.instantiate(class_name: str | type | None, module_name: str | None = None, scope: dict | None = None, **kwargs) Any[source]¶
Imports and instantiates an object of a class
- negmas.helpers.intin(x: int | tuple[int, int] | Sequence[int], log_uniform: bool = False) int[source]¶
Samples a value. A random choice is made if x is another sequence, a value between the two limits is used if the input is a tuple otherwise x is returned
- negmas.helpers.is_jsonable(x)[source]¶
Check if an object can be serialized to JSON.
- Parameters:
x – Object to test for JSON compatibility
- Returns:
True if the object can be serialized to JSON, False otherwise
- Return type:
- negmas.helpers.is_lambda_or_partial_function(obj)[source]¶
Checks if the given object is a lambda function or a partial function
- negmas.helpers.is_not_lambda_nor_partial_function(obj)[source]¶
Checks if the given object is not a lambda function
- negmas.helpers.kill_future_process(future: Future, pool: ProcessPoolExecutor, wait_time: float = 10.0) bool[source]¶
Best-effort termination of the worker process running
future.Returns
Trueif the kill sequence completed without raising. On POSIX aSIGTERMis sent first, thenSIGKILLif the process is still alive afterwait_timeseconds. On Windows the worker is terminated directly via the multiprocessing handle.
- negmas.helpers.make_callable(x: dict | Sequence | Callable | None) Callable[source]¶
Converts its input to a callable (i.e. can be called using () operator).
Examples
>>> make_callable(lambda x: 2 * x)(4) 8
>>> make_callable(dict(a=1, b=3))("a") 1
>>> make_callable((3, 4, 5))(1) 4
- negmas.helpers.make_process_executor(*, max_workers: int | None = None, max_tasks_per_child: int | None = None) ProcessPoolExecutor[source]¶
Create a
ProcessPoolExecutorwith safe defaults for negmas.On Python <= 3.10
max_tasks_per_childis silently dropped (the parameter does not exist). On newer Pythons it is forwarded as-is; the defaultNonedisables worker recycling, which is required to avoid the 3.11ProcessPoolExecutordeadlock during worker replacement.
- negmas.helpers.monotonic_minmax(input: HasMinMax, f: Callable[[Any], float]) tuple[float, float][source]¶
Finds the limits of a function
ffor the input assuming that it is monotonic and input hasmin_valueandmax_valuemembers
- negmas.helpers.monotonic_multi_minmax(input: Iterable[HasMinMax], f: Callable[[Any], float]) tuple[float, float][source]¶
Finds the limits of a function
ffor the input assuming that it is monotonic and each input hasmin_valueandmax_valuemembers
- negmas.helpers.nonmonotonic_minmax(input: Iterable, f: Callable[[Any], float]) tuple[float, float][source]¶
Finds the limits of a function
ffor the input assuming that it is non-monotonic and input is iterable
- negmas.helpers.nonmonotonic_multi_minmax(input: Iterable[Iterable], f: Callable[[Any], float]) tuple[float, float][source]¶
Finds the limits of a function
ffor each input assuming that it is non-monotonic and input is iterable
- negmas.helpers.parse_parallelism(method: str) tuple[str, int | None][source]¶
Parse a parallelism specifier of the form
"kind[:fraction]".Returns
(kind, max_workers). When no fraction is given the returnedmax_workersisNoneand the executor will pick a default.
- negmas.helpers.pretty_string(src, tab_size=2, compact=False) str[source]¶
Recursively print nested elements.
- Parameters:
- Returns:
The pretty version of the input
- Return type:
- Remarks:
This function assumes that the patterns `` “`` and
":do not appear anywhere in the input. If they appear, the space, : will be removed.
- negmas.helpers.read_lines(path: Path) list[str][source]¶
Read a list of strings from a file, one per line.
- negmas.helpers.resolve_cpus(njobs: int | None) int[source]¶
Convert an
njobsvalue to a concrete worker count.Semantics match the existing call sites in
cartesian.py:njobs is Noneornjobs == 0-> use all available coresnjobs > 0->min(cpu_count(), njobs)njobs < 0-> caller is expected to branch to serial; we still return1so the value is safe to feed to an executor.
- negmas.helpers.run_isolated_tasks(tasks: Iterable[tuple[Any, Callable[[...], Any], tuple, dict]], *, max_workers: int, timeout: float | None = None, total_timeout: float | None = None, max_tasks: int | None = 50, window: int | None = None, on_result: Callable[[Any, Any, int, int], None] | None = None, on_timeout: Callable[[Any, int, int], None] | None = None, on_error: Callable[[BaseException, Any, int, int], None] | None = None, track: Callable[[...], Iterable] | None = None, description: str = 'Running') int[source]¶
Run
tasksin isolated worker processes with a real per-task timeout.Each task is
(info, fn, args, kwargs)whereinfois opaque metadata round-tripped to the callbacks. Unlikerun_parallel_tasks()(which drives a sharedProcessPoolExecutorviaas_completedand can neither detect a hung task nor survive killing one), this driver uses apebble.ProcessPool:Per-task timeout that actually fires.
timeoutis enforced by pebble from the moment a task starts executing (queued tasks are not penalized). On timeout pebble kills the worker process running the task and replaces it transparently, so a CPU-bound negotiator stuck in an infinite loop (even inside a C extension) is terminated and the remaining tasks keep running.Bounded memory. Each worker is recycled after
max_taskstasks (defaultDEFAULT_MAX_TASKS_PER_WORKER), so per-negotiation state cannot accumulate without bound. Passmax_tasks=0to reuse workers forever (fastest, but unbounded memory).Optional whole-run budget.
total_timeoutcaps the total wall-clock time across all tasks; once exceeded the pool is stopped (running workers terminated) and the remaining tasks are abandoned.Bounded scheduling window. At most
windowpayloads are serialized and in flight at once (default2 * max_workers), so a tournament with very many negotiations does not serialize them all up front.Local ufuns/negotiators. Payloads are serialized with cloudpickle, so closures, lambdas and locally defined classes work.
Tasks whose payload cannot be serialized at all are run in-process as a best-effort fallback (a warning is emitted per such task): their pure-Python infinite loops can still be interrupted via the thread-injection path, but a C-extension hang in an unpicklable task cannot be isolated and will block.
- Callbacks:
on_result(info, result, i, n)on successon_timeout(info, i, n)when the task exceededtimeouton_error(exc, info, i, n)on any other failure
Returns the number of tasks submitted.
- negmas.helpers.run_parallel_tasks(tasks: Iterable[tuple[Any, Callable[[...], Any], tuple, dict]], *, executor: ProcessPoolExecutor, timeout: float | None = None, total_timeout: float | None = None, as_completed_fn: Callable[[...], Iterable[Future]] | None = None, on_result: Callable[[Any, Any, int, int], None] | None = None, on_timeout: Callable[[Any, Future, int, int], None] | None = None, on_broken_pool: Callable[[BaseException, int, int], bool] | None = None, on_error: Callable[[BaseException, Any, int, int], None] | None = None, track: Callable[[...], Iterable] | None = None, description: str = 'Running') int[source]¶
Submit
taskstoexecutorand drive theas_completedloop.Each task is
(info, fn, args, kwargs)–infois opaque metadata round-tripped to the callbacks. The driver:submits every task to
executoriterates results via
as_completed_fn(defaults toconcurrent.futures.as_completed())applies
timeoutas a per-future result timeoutbreaks out of the loop if
total_timeoutelapsesdispatches outcomes to the callbacks; for
BrokenProcessPoolthe driver breaks the loop iffon_broken_poolreturns truthy
The driver does not shut the executor down; the caller is responsible for that (typically via a
withblock).
- negmas.helpers.run_serial_tasks(tasks: Iterable[tuple[Any, Callable[[...], Any], tuple, dict]], *, on_result: Callable[[Any, Any, int, int], None] | None = None, on_error: Callable[[BaseException, Any, int, int], None] | None = None, total_timeout: float | None = None, track: Callable[[...], Iterable] | None = None, description: str = 'Running') int[source]¶
Run
tasksserially, mirroringrun_parallel_tasks().Each task is
(info, fn, args, kwargs).infois opaque and passed back to the callbacks unchanged. Returns the number of tasks that were submitted (regardless of how many completed).
- negmas.helpers.shorten(name: str, length: int = 4, common_parts=('Mechanism', 'Negotiator', 'Agent', 'Controller', 'Acceptance', 'Component', 'Model', 'Strategy', 'Offering', 'Entity')) str[source]¶
Returns a short version of the name.
- Remarks:
Removes common parts of names in negmas like Negotiator, Agent, etc
Keeps Capital letters up to the given length
Adds some of the lowercase letters to fit the length
If the input is shorter than the length, it is returned as it is
- negmas.helpers.shorten_keys(d: dict | None) dict[source]¶
Shorten dict keys to shortest unique names.
Always uses maximum compression since dict keys are already unique, making minimal compression (which preserves more of the original key) unnecessary - the goal is to produce the shortest possible unique keys.
- Parameters:
d – The dictionary whose keys should be shortened. If None or empty, returns an empty dict.
- Returns:
A new dictionary with shortened keys and the same values.
Examples
>>> shorten_keys({"first_name": "John", "last_name": "Doe"}) {'f': 'John', 'l': 'Doe'}
- negmas.helpers.shorten_keys_in_string(s: str, max_compression: bool | None = True, max_length: int = 31) str[source]¶
Shorten keys in a string like ‘[key1=val1,key2=val2]’ to shortest unique names.
- Parameters:
s – The input string containing key=value pairs in the format ‘[key1=val1,key2=val2]’.
max_compression –
Controls how aggressively keys are shortened. - True (default): Use maximum compression immediately for shortest possible keys. - False: Use minimal compression, keeping keys more readable. - None: Adaptive mode - tries minimal compression first, then falls back to
maximum compression if the result exceeds
max_length.max_length – Maximum allowed length of the output string when
max_compression=None. If the minimally compressed string exceeds this length, maximum compression is applied. Only used whenmax_compression=None. Defaults to 31.
- Returns:
The string with shortened keys, maintaining the ‘[key=val,…]’ format.
Examples
>>> shorten_keys_in_string( ... "[first_name=John,last_name=Doe]", max_compression=True ... ) '[f=John,l=Doe]' >>> shorten_keys_in_string( ... "[first_name=John,last_name=Doe]", max_compression=False ... ) '[first_name=John,last_name=Doe]' >>> shorten_keys_in_string("[a=1,b=2]", max_compression=None, max_length=100) '[a=1,b=2]'
- negmas.helpers.shortest_unique_names(strs: list[str], sep='.', max_compression=False, guarantee_unique=False)[source]¶
Finds the shortest unique strings starting from the end of each input string based on the separator.
The final strings will only be unique if the inputs are unique.
- Parameters:
strs – A list of strings
sep – The separator used to separate fields in each string
max_compression – If True, each string will be further compressed by taking the shortest prefix that keeps the strings unique (if they were originally unique)
guarantee_unique – If given, random characters will be postfixed on strings to guarantee uniquness
Example
given [“a.b.cat”, “d.e.f”, “a.d.cat”] it will generate [“b.c”, “f”, “d.cat”] if max_compression was false and will generate [“b”, “f”, “d”] if it was True
- negmas.helpers.single_thread()[source]¶
Context manager that temporarily forces single-threaded execution.
- negmas.helpers.snake_case(s: str) str[source]¶
Converts a string from CamelCase to snake_case
Example
>>> print(snake_case("ThisIsATest")) this_is_a_test
- Parameters:
s – input string
- Returns:
converted string
- Return type:
- negmas.helpers.stable_hash(value) int[source]¶
Return a process-stable 64-bit signed integer hash of
value.Unlike the builtin
hash(), which is salted per process (PYTHONHASHSEED), this is deterministic across processes and machines. Use it whenever a hash feeds a persisted identity — arun_id, a file name, a serialized key — so the value survives across separate runs. Builtinhashthere silently breaks resuming/continuing work because the recomputed id no longer matches what was stored on disk.
- negmas.helpers.unique_name(base, add_time=True, add_host=False, rand_digits=8, sep='/') str[source]¶
Return a unique name.
Can be used to return a unique directory name on the givn base.
- Parameters:
Examples
>>> a = unique_name("") >>> len(a) == 8 + 1 + 6 + 8 + 6 True
- Returns:
The unique name.
- Return type: