Source code for negmas.preferences.inv_ufun

from __future__ import annotations
import math
import random
import warnings

# from bisect import bisect_left, bisect_right
from typing import Any, Iterable  # , Sequence

import numpy as np
from numpy import floating, integer
from numpy.typing import NDArray

from negmas.outcomes import Outcome
from negmas.warnings import NegmasUnexpectedValueWarning, warn_if_slow

from .base_ufun import BaseUtilityFunction
from .protocols import InverseUFun

__all__ = ["PresortingInverseUtilityFunction", "SamplingInverseUtilityFunction"]

EPS = 1e-12


def _nearest_around(
    x: float, a: NDArray, i: int, mn: float, mx: float, n: int = 1, eps: float = EPS
) -> int | None:
    """Finds the nearest value in a to x around i subject to being between mn and mx and no more than eps from x within n items from i."""
    n = len(a) - 1
    best, best_diff = i, abs(a[i] - x)
    for j in range(i - n, i + n + 1):
        if j < 0 or j > n:
            continue
        if not (mn <= a[j] <= mx):
            continue
        d = abs(a[j] - x)
        if d < best_diff:
            best, best_diff = j, d
    if best is None:
        return i
    if abs(a[best] - x) > eps and best != i:
        return None
    return best


def index_above_or_equal(a: NDArray, x: Any, lo: int = 0, hi: int | None = None) -> int:
    "Locate the smallest value greater than or equal to x"
    if hi is None:
        hi = len(a) - 1
    i = np.searchsorted(a[lo : hi + 1], x, side="left") + lo
    return max(0, min(len(a) - 1, i))


def index_below_or_equal(a: NDArray, x: Any, lo: int = 0, hi: int | None = None) -> int:
    "Locate the greatest value less than or equal to x"
    if hi is None:
        hi = len(a) - 1
    i = np.searchsorted(a[lo : hi + 1], x, side="right") + lo
    i = max(0, min(len(a) - 1, i))
    j = max(0, i - 1)
    if abs(a[i] - x) <= abs(a[j] - x):
        return i
    return j


[docs] class SamplingInverseUtilityFunction(InverseUFun): """ A utility function inverter that uses sampling. Nothing is done during initialization so the fixed cost of this inverter is minimal. Nevertheless, each time the system is asked to find an outcome within some range, it uses random sampling which is very inefficient and suffers from the curse of dimensionality. """ def __init__(self, ufun: BaseUtilityFunction, max_samples_per_call: int = 10_000): self._ufun = ufun self.max_samples_per_call = max_samples_per_call self._initialized = True @property def initialized(self): return self._initialized @property def ufun(self): return self._ufun
[docs] def init(self): pass
[docs] def all(self, rng: float | tuple[float, float]) -> list[Outcome]: """ Finds all outcomes with in the given utility value range Args: rng: The range. If a value, outcome utilities must match it exactly Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ raise ValueError( f"Cannot find all outcomes in a range ({rng}) using a SamplingInverseUtilityFunction. Try a PresortedInverseUtilityFunction" )
[docs] def some( self, rng: float | tuple[float, float], normalized: bool, n: int | None = None ) -> list[Outcome]: """ Finds some outcomes with the given utility value (if discrete, all) Args: rng: The range. If a value, outcome utilities must match it exactly n: The maximum number of outcomes to return Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ if not isinstance(rng, Iterable): rng = (rng - EPS, rng + EPS) if not n: n = self.max_samples_per_call else: n *= 3 if not self._ufun.outcome_space: return [] outcomes = list(self._ufun.outcome_space.sample(n, False, False)) mn, mx = rng u = self.ufun.eval_normalized if normalized else self.ufun.eval return [_ for _ in outcomes if mn <= u(_) <= mx]
[docs] def worst_in( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: some = self.some(rng, normalized) if not isinstance(rng, Iterable): rng = (rng, rng) worst_util, worst = float("inf"), None for o in some: util = self._ufun(o) if util < worst_util: worst_util, worst = util, o return worst
[docs] def best_in( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: some = self.some(rng, normalized) if not isinstance(rng, Iterable): rng = (rng, rng) best_util, best = float("-inf"), None for o in some: util = self._ufun(o) if util < best_util: best_util, best = util, o return best
[docs] def one_in( self, rng: float | tuple[float, float], normalized: float, fallback_to_higher: bool = True, fallback_to_best: bool = True, ) -> Outcome | None: if not self._ufun.outcome_space: return None if not isinstance(rng, Iterable): rng = (rng - EPS, rng + EPS) u = self.ufun.eval_normalized if normalized else self.ufun.eval for _ in range(self.max_samples_per_call): o = list(self._ufun.outcome_space.sample(1))[0] if rng[0] <= u(o) <= rng[1]: return o if fallback_to_higher and rng[1] < 1 - EPS: return self.one_in( (rng[0], 1), normalized, fallback_to_higher=False, fallback_to_best=fallback_to_best, ) if fallback_to_best: return self._ufun.best() return None
[docs] def minmax(self) -> tuple[float, float]: """ Finds the minimum and maximum utility values that can be returned. Remarks: These may be different from the results of `ufun.minmax()` as they can be approximate. """ return self.min(), self.max()
[docs] def extreme_outcomes(self) -> tuple[Outcome | None, Outcome | None]: """ Finds the worst and best outcomes that can be returned. Remarks: These may be different from the results of `ufun.extreme_outcomes()` as they can be approximate. """ return self.worst(), self.best()
[docs] def __call__( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: """ Calling an inverse ufun directly is equivalent to calling `one_in()` """ return self.one_in(rng, normalized)
[docs] class PresortingInverseUtilityFunction(InverseUFun): """ A utility function inverter that uses pre-sorting. The outcome-space is sampled if it is continuous and enumerated if it is discrete during the call to `init()` and an ordered list of outcomes with their utility values is then cached. Args: ufun: The utility function to be inverted levels: discretization levels per issue max_cache_size: maximum allowed number of outcomes in the resulting inverse rational_only: If true, rational outcomes will be sorted but irrational outcomes will not be sorted (should be faster if the reserved value is high) n_waypoints: Used to speedup sampling outcomes at given utilities. The larger, the slower init() will be but the faster worst_in() and best_in() eps: Absolute difference between utility values to consider them equal (zero or negative to disable). rel_eps: Relative difference between utility values to consider them equal (zero or negative to disable). Remarks: - The actual limit used to judge ufun equality is max(eps, rel_eps * range) where range is the difference between max and min utilities for rational outcomes. Set both eps, rel_eps to zero or a negative number to disable cycling through outcomes with equal utilities (or set cycle=False when calling the appropriate function). """ def __init__( self, ufun: BaseUtilityFunction, levels: int = 10, max_cache_size: int = 10_000_000_000, rational_only: bool = False, n_waypints: int = 10, eps: float = 1e-12, rel_eps: float = 1e-6, ): self._ufun = ufun self.max_cache_size = max_cache_size self.levels = levels self._initialized = False self.outcomes: list[Outcome] = [] self._last_rational: int = -1 self.utils: NDArray[floating[Any]] = [] # type: ignore self.rational_only = rational_only self._waypoints: NDArray[integer[Any]] = [] # type: ignore self._waypoint_values: NDArray[floating[Any]] = [] # type: ignore self.__nwaypoints = n_waypints self._smallest_indx, self._smallest_val = 0, float("inf") self._largest_indx, self._largest_val = -1, float("-inf") self._last_returned_from_next: int = -1 self._eps = eps self._rel_eps = rel_eps self._near_range: dict[int, tuple[int, int]] = dict() @property def initialized(self): return self._initialized @property def ufun(self): return self._ufun
[docs] def reset(self) -> None: self._initialized = False self.outcomes, self.utils = [], [] # type: ignore self._waypoints, self._waypoint_values = [], [] # type: ignore
[docs] def init(self): outcome_space = self._ufun.outcome_space if outcome_space is None: raise ValueError("Cannot find the outcome space.") self._worst, self._best = self._ufun.extreme_outcomes() self._min, self._max = ( float(self._ufun(self._worst)), float(self._ufun(self._best)), ) self._range = self._max - self._min self._offset = self._min / self._range if self._range > EPS else self._min for L in range(self.levels, 0, -1): n = outcome_space.cardinality_if_discretized(L) if n <= self.max_cache_size: break else: raise ValueError( f"Cannot discretize keeping cache size at {self.max_cache_size}. Outcome space cardinality is {outcome_space.cardinality}\nOutcome space: {outcome_space}" ) os = outcome_space.to_discrete(levels=L, max_cardinality=self.max_cache_size) outcomes = list(os.enumerate_or_sample(max_cardinality=self.max_cache_size)) utils = [float(self._ufun.eval(_)) for _ in outcomes] # x = len(utils) warn_if_slow( len(utils), "Inverting a large utility function", lambda x: (x * math.log2(x)) if x else x, ) r = self._ufun.reserved_value r = float(r) if r is not None else float("-inf") if self.rational_only: rational, irrational = [], [] ur, uir = [], [] for u, o in zip(utils, outcomes): if u >= r: rational.append(o) ur.append(u) else: irrational.append(o) uir.append(u) else: rational, irrational = outcomes, [] ur, uir = utils, [] ur, uir = ( np.asarray(ur, dtype=float).flatten(), np.asarray(uir, dtype=float).flatten(), ) indices = np.argsort(ur) ur_sorted = ur[indices] if len(ur_sorted) > 0: eps = max(self._eps, self._rel_eps * (ur_sorted[-1] - ur_sorted[0])) else: eps = -1 if eps > 0: try: n = len(ur_sorted) if n >= 2: scaled = np.asarray(ur_sorted / eps, dtype=int) diffs = np.diff(scaled) != 0 indexes = np.nonzero(diffs)[0] + 1 groups = np.split(scaled, indexes) lengths = np.asarray([_.size for _ in groups], dtype=int) starts = np.hstack((np.asarray([0], dtype=int), indexes)) ends = starts + lengths - 1 extended = np.nonzero(lengths > 1)[0] starts, ends = starts[extended], ends[extended] for mn, mx in zip(starts, ends): # assert scaled[mn] == scaled[mx], f"{scaled[mn]=}, {scaled[mx]=}" for indx in range(mn, mx + 1): # assert scaled[indx] == scaled[mx], f"{scaled[indx]=}, {scaled[mx]=}" self._near_range[indx] = (mn, mx) except Exception: pass # for indx, current in enumerate(scaled): # mn_indx = indx # for i in range(indx - 1, -1, -1): # u = scaled[i] # if current != scaled[i]: # mn_indx = i + 1 # break # mx_indx = indx # for i in range(indx + 1, n): # u = scaled[i] # if current != scaled[i]: # mx_indx = i - 1 # break # if mn_indx < mx_indx: # self._near_range[indx] = (mn_indx, mx_indx) # ordered_outcomes = sorted(zip(ur, rational, strict=True)) # if irrational: # ordered_outcomes += list(zip(uir, irrational, strict=True)) self._initialized = True self._last_rational = len(rational) - 1 # save waypoints within the sorted outcomes list with known utilities. # Used to limit the lo, hi limits when doing bisection later self.__nwaypoints = min(self.__nwaypoints, self._last_rational + 1) waypoints: NDArray[integer[Any]] = ( np.asarray( np.linspace(0, self._last_rational, self.__nwaypoints, endpoint=True), dtype=int, ) if self.__nwaypoints > 0 else np.empty(0, dtype=int) ) waypoint_values: NDArray[floating[Any]] = ( ur_sorted[waypoints] if self.__nwaypoints > 0 else np.empty(0, dtype=float) ) assert not any( _ is None for _ in waypoints ), f"{waypoints=}\n{waypoint_values}\n{self._last_rational}" assert not any( a < b for a, b in zip(waypoints[1:], waypoints[:-1]) ), f"{waypoints=}\n{waypoint_values}\n{self._last_rational}" assert not any( a < b for a, b in zip(waypoint_values[1:], waypoint_values[:-1]) ), f"{waypoints=}\n{waypoint_values}\n{self._last_rational}" self._waypoints, self._waypoint_values = waypoints, waypoint_values self.outcomes = [rational[_] for _ in indices] + irrational self.utils = np.hstack((ur_sorted, uir)) self._smallest_indx, self._smallest_val = 0, self.utils[0] self._largest_indx, self._largest_val = len(self.utils) - 1, self.utils[-1]
def _un_normalize_range( self, rng: float | tuple[float, float], normalized: bool, for_best: bool ) -> tuple[float, float]: if not isinstance(rng, Iterable): rng = (rng - EPS, rng + EPS) if not normalized: return rng mn, mx = self._min, self._max d = mx - mn if d < EPS: return tuple(0.0 if not for_best else 1.0 for _ in rng) return tuple(_ * d + mn for _ in rng) def _get_limiting_waypoints(self, mn: float, mx: float) -> tuple[int, int]: """Returns indices of largest utility <= mn and the smallest utility >= mx in self._utils""" nw = len(self._waypoints) n = len(self.utils) if nw <= 0: return (-1, -1) lo, hi = self._waypoints[0], self._waypoints[-1] # lo_val, hi_val = self._waypoint_values[0], self._waypoint_values[-1] for j in range(nw): i, u = self._waypoints[j], self._waypoint_values[j] if u == mn: lo = i break if u > mn: lo = self._waypoints[max(0, j - 1)] break else: assert mn > self._waypoint_values[-1] lo = n - 1 for j in range(nw - 1, -1, -1): i, u = self._waypoints[j], self._waypoint_values[j] if u == mx: hi = i break if u < mx: hi = self._waypoints[min(nw - 1, j + 1)] break else: assert mx < self._waypoint_values[0] hi = 0 # adjust limits to known largest and smallest. This will be specially useful for # strategies that call worst_in or best_in repeatedly with descending/ascending values if self._smallest_val <= mn and self._smallest_indx > lo: lo = self._smallest_indx if self._largest_val >= mx and self._largest_indx < hi: hi = self._largest_indx return lo, hi
[docs] def next_worse(self) -> Outcome | None: """Returns the rational outcome with utility just below the last one returned from this function""" if self._last_returned_from_next < 0: self._last_returned_from_next = self._last_rational return self.best() if self._last_returned_from_next > 0: self._last_returned_from_next -= 1 return self.outcomes[self._last_returned_from_next] return None
[docs] def next_better(self) -> Outcome | None: """Returns the rational outcome with utility just above the last one returned from this function""" if self._last_returned_from_next < 0: self._last_returned_from_next = 0 return self.worst() if self._last_returned_from_next < self._last_rational: self._last_returned_from_next += 1 return self.outcomes[self._last_returned_from_next] return None
[docs] def some( self, rng: float | tuple[float, float], normalized: bool, n: int | None = None ) -> list[Outcome]: """ Finds some outcomes with the given utility value (if discrete, all) Args: rng: The range. If a value, outcome utilities must match it exactly normalized: if given, consider the range as a normalized range betwwen 0 and 1 representing lowest and highest utilities. n: The maximum number of outcomes to return Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ if not self._ufun.is_stationary(): self.init() if len(self._waypoints) <= 0: return [] rng = self._un_normalize_range(rng, normalized, False) mn, mx = rng lo, hi = self._get_limiting_waypoints(mn, mx) if lo > hi: return [] results = [] for util, w in zip(self.utils[lo : hi + 1], self.outcomes[lo : hi + 1]): if util > mx: break if util < mn: continue results.append(w) if n and len(results) >= n: return random.sample(results, n) return results
[docs] def all(self, rng: float | tuple[float, float], normalized: bool) -> list[Outcome]: """ Finds all outcomes with in the given utility value range Args: rng: The range. If a value, outcome utilities must match it exactly Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ os_ = self._ufun.outcome_space if not os_: raise ValueError("Unknown outcome space. Cannot invert the ufun") if os_.is_discrete(): return self.some(rng, normalized) raise ValueError( "Cannot find all outcomes in a range for a continuous outcome space (there is in general an infinite number of them)" )
def _indx_of_worst_in(self, rng: tuple[float, float] | float, normalized: bool): if not self._ufun.is_stationary(): self.init() rng = self._un_normalize_range(rng, normalized, False) mn, mx = rng lo, hi = self._get_limiting_waypoints(mn, mx) return index_above_or_equal(self.utils, mn, lo, hi), mn, mx def _indx_of_best_in(self, rng: tuple[float, float] | float, normalized: bool): if not self._ufun.is_stationary(): self.init() rng = self._un_normalize_range(rng, normalized, True) mn, mx = rng lo, hi = self._get_limiting_waypoints(mn, mx) return index_below_or_equal(self.utils, mx, lo, hi), mn, mx
[docs] def worst_in( self, rng: float | tuple[float, float], normalized: bool, cycle: bool = True, eps: float = EPS, ) -> Outcome | None: """ Finds an outcome with the lowest possible utility within the given range (if any) Args: rng: Range of utilities to find the outcome within normalized: If true, the range will be interpreted as on a scaled ufun with range (0-1) cycle: If given, the system will cycle between outcomes with the same utility value (up to `eps`) Remarks: - Returns None if no such outcome exists, or the worst outcome in the given range is irrational - This is an O(log(n)) operation (where n is the number of outcomes) """ indx_, mn, mx = self._indx_of_worst_in(rng, normalized) if indx_ > self._last_rational: # fail if this worst is irrational return None indx = _nearest_around( mn, self.utils, indx_, mn - eps, mx + 2 * eps, eps=2 * eps ) # if indx is None or (mx > mn + EPS and self.utils[indx] > mx + 2 * eps): # # fail if the found outcome is actually worse than the maximum allowed. Should never happen # raise ValueError( # f"worst_in failed to find an appropriate outcome: {rng=} with initial find at {indx_} but we found an outcome with utility {self.utils[indx] if indx is not None else self.utils}" # ) if mn < self._smallest_val: self._smallest_indx, self._smallest_val = indx, mn if cycle and indx: self._cycle_around(indx) return self.outcomes[indx]
[docs] def best_in( self, rng: float | tuple[float, float], normalized: bool, cycle: bool = True, eps: float = EPS, ) -> Outcome | None: """ Finds an outcome with the highest possible utility within the given range (if any) Args: rng: Range of utilities to find the outcome within normalized: If true, the range will be interpreted as on a scaled ufun with range (0-1) cycle: If given, the system will cycle between outcomes with the same utility value (up to `eps`) Remarks: - Returns None if no such outcome exists - This is an O(log(n)) operation (where n is the number of outcomes) """ indx_, mn, mx = self._indx_of_best_in(rng, normalized) if indx_ < 0: indx_ = 0 indx = _nearest_around( mx, self.utils, indx_, mn - eps, mx + 2 * eps, eps=2 * eps ) # if indx is None or (mn < mx - eps and self.utils[indx] < mn - 2 * eps): # raise ValueError( # f"best_in failed to find an appropriate outcome: {rng=} with initial find at {indx_} but we found an outcome with utility {self.utils[indx] if indx is not None else self.utils}" # ) if mx > self._largest_val: self._largest_indx, self._largest_val = indx, mx if cycle and indx: self._cycle_around(indx) return self.outcomes[indx]
def _in(self, x, rng): return rng[0] - EPS <= x <= rng[1] + EPS
[docs] def one_in( self, rng: tuple[float, float], normalized: bool, fallback_to_higher: bool = True, fallback_to_best: bool = True, ) -> Outcome | None: """ Finds an outcome within the given range of utilities. Remarks: - This is an O(log(n)) operation (where n is the number of outcomes) - We only search rational outcomes """ mx, rmn, rmx = self._indx_of_worst_in(rng, normalized) if mx > self._last_rational or not self._in(self.utils[mx], (rmn, rmx)): return None mn, rmn, rmx = self._indx_of_worst_in(rng, normalized) if mn < 0 or not self._in(self.utils[mn], (rmn, rmx)): return None if mn < 0: mn = 0 if mx > self._last_rational: mx = self._last_rational def recover_from_failure(): if fallback_to_higher and rng[1] < 1 - EPS: return self.one_in( (rng[0], 1 if normalized else float(self._ufun.max())), normalized, fallback_to_higher=False, fallback_to_best=fallback_to_best, ) if fallback_to_best: return self._best return None if mx < mn: # TODO: Something is wrong here. When using stochastic aspiration mn > mx a lot and this leads to strange behavior. warnings.warn( f"Utility Inverter: {mx=}, {mn=} but we expect mn < mx. Could not find any outcomes in the range given", NegmasUnexpectedValueWarning, ) return recover_from_failure() # return self.outcomes[min(mx, mn)] # mx, mn = mn, mx if mn == mx: mn = int(mn) return self.outcomes[mn] indx = random.randint(mn, mx) return self.outcomes[indx]
def _cycle_around(self, indx: int) -> None: mn_indx, mx_indx = self._near_range.get(indx, (indx, indx)) if mn_indx < mx_indx: np.roll(self.utils[mn_indx : mx_indx + 1], 1)
[docs] def within_fractions(self, rng: tuple[float, float]) -> list[Outcome]: """ Finds outcomes within the given fractions of utility values (the fractions must be between zero and one). Remarks: - This is an O(n) operation (where n is the number of outcomes) """ if not self._ufun.is_stationary(): self.init() n = self._last_rational + 1 rng = (rng[0] * n, rng[1] * n) rng = ( max(self._last_rational - math.floor(rng[1]), 0), min(n, self._last_rational - math.floor(rng[0])), ) return list(reversed(self.outcomes[int(rng[0]) : int(rng[1]) + 1]))
[docs] def within_indices(self, rng: tuple[int, int]) -> list[Outcome]: """ Finds outcomes within the given indices with the best at index 0 and the worst at largest index. Remarks: - Works only for discrete outcome spaces """ if not self._ufun.is_stationary(): self.init() n = self._last_rational + 1 rng = (self._last_rational - rng[1], self._last_rational - rng[0]) rng = (max(rng[0], 0), min(rng[1], n)) return list(reversed(self.outcomes[rng[0] : rng[1] + 1]))
[docs] def min(self) -> float: """ Finds the minimum utility value (of the rational outcomes if constructed with `sort_rational_only`) """ if not self._ufun.is_stationary(): self.init() return self.utils[0] if self._last_rational >= 0 else float("inf")
[docs] def max(self) -> float: """ Finds the maximum rational utility value (of the rational outcomes if constructed with `sort_rational_only`) """ if not self._ufun.is_stationary(): self.init() return ( self.utils[self._last_rational] if self._last_rational >= 0 else float("-inf") )
[docs] def worst(self) -> Outcome | None: """ Finds the worst outcome (of the rational outcomes if constructed with `sort_rational_only`) """ if not self._ufun.is_stationary(): self.init() return self.outcomes[0] if self._last_rational >= 0 else None
[docs] def best(self) -> Outcome | None: """ Finds the best outcome (of the rational outcomes if constructed with `sort_rational_only`) """ if not self._ufun.is_stationary(): self.init() return self.outcomes[self._last_rational] if self._last_rational >= 0 else None
[docs] def minmax(self) -> tuple[float, float]: """ Finds the minimum and maximum utility values that can be returned. Remarks: - These may be different from the results of `ufun.minmax()` as they can be approximate. - Will only consider rational outcomes if constructed with `sort_rational_only` """ return self.min(), self.max()
[docs] def extreme_outcomes(self) -> tuple[Outcome | None, Outcome | None]: """ Finds the worst and best outcomes that can be returned. Remarks: These may be different from the results of `ufun.extreme_outcomes()` as they can be approximate. """ return self.worst(), self.best()
[docs] def __call__( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: """ Calling an inverse ufun directly is equivalent to calling `one_in()` """ return self.one_in(rng, normalized)
[docs] def outcome_at(self, indx: int) -> Outcome | None: n = len(self.outcomes) if indx >= n: return None if indx <= self._last_rational: return self.outcomes[self._last_rational - indx] return self.outcomes[indx]
[docs] def utility_at(self, indx: int) -> float: n = len(self.outcomes) if indx >= n: return float("-inf") if indx <= self._last_rational: return self.utils[self._last_rational - indx] return self.utils[indx]
class PresortingInverseUtilityFunctionBruteForce(InverseUFun): """ A utility function inverter that uses pre-sorting. The outcome-space is sampled if it is continuous and enumerated if it is discrete during the call to `init()` and an ordered list of outcomes with their utility values is then cached. Args: ufun: The utility function to be inverted levels: discretization levels per issue max_cache_size: maximum allowed number of outcomes in the resulting inverse sort_rational_only: If true, rational outcomes will be sorted but irrational outcomes will not be sorted (should be faster if the reserved value is high) """ def __init__( self, ufun: BaseUtilityFunction, levels: int = 10, max_cache_size: int = 10_000_000_000, rational_only: bool = False, ): self._ufun = ufun self.max_cache_size = max_cache_size self.levels = levels self._initialized = False self._ordered_outcomes: list[tuple[float, Outcome]] = [] self.rational_only = rational_only @property def initialized(self): return self._initialized @property def ufun(self): return self._ufun def reset(self) -> None: self._initialized = False self._orddered_outcomes = [] def init(self): outcome_space = self._ufun.outcome_space if outcome_space is None: raise ValueError("Cannot find the outcome space.") self._worst, self._best = self._ufun.extreme_outcomes() self._min, self._max = ( float(self._ufun(self._worst)), float(self._ufun(self._best)), ) self._range = self._max - self._min self._offset = self._min / self._range if self._range > EPS else self._min for L in range(self.levels, 0, -1): n = outcome_space.cardinality_if_discretized(L) if n <= self.max_cache_size: break else: raise ValueError( f"Cannot discretize keeping cache size at {self.max_cache_size}. " f"Outcome space cardinality is {outcome_space.cardinality}\nOutcome space: {outcome_space}" ) os = outcome_space.to_discrete(levels=L, max_cardinality=self.max_cache_size) outcomes = os.enumerate_or_sample(max_cardinality=self.max_cache_size) utils = [float(self._ufun.eval(_)) for _ in outcomes] warn_if_slow( len(utils), "Inverting a large utility function", lambda x: (x * math.log2(x)) if x else x, ) r = self._ufun.reserved_value r = float(r) if r is not None else float("-inf") if self.rational_only: rational, irrational = [], [] ur, uir = [], [] for u, o in zip(utils, outcomes): if u >= r: rational.append(o) ur.append(u) else: irrational.append(o) uir.append(u) else: rational, irrational = list(outcomes), [] ur, uir = utils, [] self._ordered_outcomes = sorted( zip(ur, rational, strict=True), key=lambda x: -x[0] ) self._last_rational = len(rational) - 1 if irrational: self._ordered_outcomes += list(zip(uir, irrational, strict=True)) self._initialized = True self._last_returned_from_next: int = -1 def _un_normalize_range( self, rng: float | tuple[float, float], normalized: bool, for_best: bool ) -> tuple[float, float]: if not isinstance(rng, Iterable): rng = (rng - EPS, rng + EPS) if not normalized: return rng mn, mx = self._min, self._max d = mx - mn if d < EPS: return tuple(0.0 if not for_best else 1.0 for _ in rng) return tuple(_ * d + mn for _ in rng) def _normalize_range( self, rng: float | tuple[float, float], normalized: bool, for_best: bool ) -> tuple[float, float]: if not isinstance(rng, Iterable): rng = (rng - EPS, rng + EPS) if not normalized: return rng mn, mx = self._min, self._max d = mx - mn if d < EPS: return tuple(1.0 if for_best else 0.0 for _ in rng) return tuple(_ * d + mn for _ in rng) def next_worse(self) -> Outcome | None: """Returns the rational outcome with utility just below the last one returned from this function""" if self._last_returned_from_next < 0: self._last_returned_from_next = self._last_rational return self.best() if self._last_returned_from_next > 0: self._last_returned_from_next -= 1 return self._ordered_outcomes[self._last_returned_from_next][1] return None def next_better(self) -> Outcome | None: """Returns the rational outcome with utility just above the last one returned from this function""" if self._last_returned_from_next < 0: self._last_returned_from_next = 0 return self.worst() if self._last_returned_from_next < self._last_rational: self._last_returned_from_next += 1 return self._ordered_outcomes[self._last_returned_from_next][1] return None def some( self, rng: float | tuple[float, float], normalized: bool, n: int | None = None, fallback_to_higher: bool = True, fallback_to_best: bool = True, ) -> list[Outcome]: """ Finds some outcomes with the given utility value (if discrete, all) Args: rng: The range. If a value, outcome utilities must match it exactly normalized: if given, consider the range as a normalized range betwwen 0 and 1 representing lowest and highest utilities. n: The maximum number of outcomes to return Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ rmx = rng[1] if isinstance(rng, Iterable) else rng rmn = rng[0] if isinstance(rng, Iterable) else rng def recover_from_failure(): if fallback_to_higher and rmx < 1 - EPS: return self.some( (rmn, 1 if normalized else float(self._ufun.max())), normalized, fallback_to_higher=False, fallback_to_best=fallback_to_best, ) if fallback_to_best: return [self._ufun.best()] return [] if not self._ufun.is_stationary(): self.init() rng = self._normalize_range(rng, normalized, True) mn, mx = rng # todo use bisection results = [] for util, w in self._ordered_outcomes: if util > mx: continue if util < mn: break results.append(w) if n and len(results) >= n: return random.sample(results, n) if not results: return recover_from_failure() return results def all(self, rng: float | tuple[float, float], normalized: bool) -> list[Outcome]: """ Finds all outcomes with in the given utility value range Args: rng: The range. If a value, outcome utilities must match it exactly Remarks: - If issues or outcomes are not None, then init_inverse will be called first - If the outcome-space is discrete, this method will return all outcomes in the given range """ os_ = self._ufun.outcome_space if not os_: raise ValueError("Unknown outcome space. Cannot invert the ufun") if os_.is_discrete(): return self.some( rng, normalized, fallback_to_higher=False, fallback_to_best=False ) raise ValueError( "Cannot find all outcomes in a range for a continuous outcome space (there is in general an infinite number of them)" ) def worst_in( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: if not self._ufun.is_stationary(): self.init() rng = self._un_normalize_range(rng, normalized, False) mn, mx = rng if not self._ordered_outcomes: return None wbefore = None for util, w in self._ordered_outcomes[: self._last_rational + 1]: if util > mx: continue if util < mn: return wbefore wbefore = w return wbefore def best_in( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: if not self._ufun.is_stationary(): self.init() rng = self._un_normalize_range(rng, normalized, True) mn, mx = rng for util, w in self._ordered_outcomes[: self._last_rational + 1]: if util > mx + EPS: continue if util < mn - EPS: return None return w return None def one_in( self, rng: float | tuple[float, float], normalized: bool, fallback_to_higher: bool = True, fallback_to_best: bool = True, ) -> Outcome | None: lst = self.some( rng, normalized, fallback_to_higher=fallback_to_higher, fallback_to_best=fallback_to_best, ) if not lst: return None if len(lst) == 1: return lst[0] return lst[random.randint(0, len(lst) - 1)] def within_fractions(self, rng: tuple[float, float]) -> list[Outcome]: """ Finds outcomes within the given fractions of utility values (the fractions must be between zero and one) """ if not self._ufun.is_stationary(): self.init() n = self._last_rational rng = (max(rng[0] * n, 0), min(rng[1] * n, n)) return [_[1] for _ in self._ordered_outcomes[int(rng[0]) : int(rng[1]) + 1]] def within_indices(self, rng: tuple[int, int]) -> list[Outcome]: """ Finds outocmes within the given indices with the best at index 0 and the worst at largest index. Remarks: - Works only for discrete outcome spaces """ if not self._ufun.is_stationary(): self.init() n = self._last_rational + 1 rng = (max(rng[0], 0), min(rng[1], n)) return [_[1] for _ in self._ordered_outcomes[rng[0] : rng[1] + 1]] def min(self) -> float: """ Finds the minimum utility value """ if not self._ufun.is_stationary(): self.init() if not self._ordered_outcomes: raise ValueError("No outcomes to find the best") return ( self._ordered_outcomes[self._last_rational][0] if self._last_rational >= 0 else float("inf") ) def max(self) -> float: """ Finds the maximum utility value """ if not self._ufun.is_stationary(): self.init() if not self._ordered_outcomes: raise ValueError("No outcomes to find the best") return ( self._ordered_outcomes[0][0] if self._last_rational >= 0 else float("-inf") ) def worst(self) -> Outcome | None: """ Finds the worst outcome """ if not self._ufun.is_stationary(): self.init() if not self._ordered_outcomes: raise ValueError("No outcomes to find the best") return ( self._ordered_outcomes[self._last_rational][1] if self._last_rational >= 0 else None ) def best(self) -> Outcome | None: """ Finds the best outcome """ if not self._ufun.is_stationary(): self.init() if not self._ordered_outcomes: raise ValueError("No outcomes to find the best") return self._ordered_outcomes[0][1] if self._last_rational >= 0 else None def minmax(self) -> tuple[float, float]: """ Finds the minimum and maximum utility values that can be returned. Remarks: These may be different from the results of `ufun.minmax()` as they can be approximate. """ return self.min(), self.max() def extreme_outcomes(self) -> tuple[Outcome | None, Outcome | None]: """ Finds the worst and best outcomes that can be returned. Remarks: These may be different from the results of `ufun.extreme_outcomes()` as they can be approximate. """ return self.worst(), self.best() def __call__( self, rng: float | tuple[float, float], normalized: bool ) -> Outcome | None: """ Calling an inverse ufun directly is equivalent to calling `one_in()` """ return self.one_in(rng, normalized) def outcome_at(self, indx: int) -> Outcome | None: if indx >= len(self._ordered_outcomes): return None return self._ordered_outcomes[indx][1] def utility_at(self, indx: int) -> float: if indx >= len(self._ordered_outcomes): return float("-inf") return self._ordered_outcomes[indx][0]