from __future__ import annotations
from random import choice
from typing import TYPE_CHECKING, Callable, Literal, Sequence
from negmas.common import PreferencesChange
from negmas.gb.components import GBComponent
from negmas.preferences import (
BaseUtilityFunction,
InverseUFun,
PresortingInverseUtilityFunction,
RankOnlyUtilityFunction,
)
__all__ = ["UtilityInverter", "UtilityBasedOutcomeSetRecommender"]
if TYPE_CHECKING:
from negmas.gb import GBNegotiator, GBState
from negmas.outcomes import Outcome
from .selectors import OfferSelectorProtocol
def make_inverter(
ufun: BaseUtilityFunction,
ufun_inverter: Callable[[BaseUtilityFunction], InverseUFun] | None = None,
rank_only: bool = False,
max_cardinality: int | float = float("inf"),
) -> InverseUFun:
"""
Creates an `InverseUFun` object from the given ufun with appropriate type if the type is not given
Args:
ufun (BaseUtilityFunction): The ufun to invert
rank_only (bool): If True, only the relative ranks of outcomes will be used in inversion not the values themselves.
ufun_inverter (Callable[[BaseUtilityFunction], InverseUFun] | None): An optional factory to generate a `InverseUFun` from a `BaseUtilityFunction` .
max_cardinality (int): The maximum cardinality at which we switch to using a `SamplingInverseUtilityFunction`
"""
if rank_only:
ufun = RankOnlyUtilityFunction(ufun, randomize_equal=False, name=ufun.name)
if ufun_inverter:
return ufun_inverter(ufun)
return PresortingInverseUtilityFunction(ufun, rational_only=True)
# return (
# SamplingInverseUtilityFunction(ufun)
# if ufun.outcome_space is None
# or ufun.outcome_space.is_discrete() and ufun.outcome_space.cardinality >= max_cardinality
# else PresortingInverseUtilityFunction(ufun)
# )
[docs]
class UtilityBasedOutcomeSetRecommender(GBComponent):
"""
Recommends a set of outcome appropriate for proposal
"""
def __init__(
self,
rank_only: bool = False,
ufun_inverter: Callable[[BaseUtilityFunction], InverseUFun] | None = None,
max_cardinality: int | float = float("inf"),
eps: float = 0.0001,
inversion_method: Literal["min"]
| Literal["max"]
| Literal["one"]
| Literal["some"]
| Literal["all"] = "some",
):
super().__init__()
self._rank_only = rank_only
self._max_cardinality = max_cardinality
self._ufun_inverter = ufun_inverter
self._inv_method = None
self._inversion_method = inversion_method
self.eps = eps
self.inv: InverseUFun | None = None
self.min = self.max = self.best = None
self._inv_method = None
[docs]
def set_negotiator(self, negotiator: GBNegotiator) -> None: # type: ignore
super().set_negotiator(negotiator)
self.inv: InverseUFun | None = None
self.min = self.max = self.best = None
self._inv_method = None
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if self._negotiator is None:
raise ValueError("Unknown negotiator in a component")
ufun = self._negotiator.ufun
if not ufun:
self.inv = None
self.min = self.max = self.best = None
return
self.inv = make_inverter(
ufun, self._ufun_inverter, self._rank_only, self._max_cardinality
)
if self._inversion_method == "one":
self._inv_method = self.inv.one_in
self._single_inv_return = True
elif self._inversion_method == "min":
self._inv_method = self.inv.worst_in
self._single_inv_return = True
elif self._inversion_method == "max":
self._inv_method = self.inv.best_in
self._single_inv_return = True
elif self._inversion_method == "some":
self._inv_method = self.inv.some
self._single_inv_return = False
elif self._inversion_method == "all":
self._inv_method = self.inv.all # type: ignore
self._single_inv_return = False
else:
raise ValueError(f"Unknown selectortype: {self._inversion_method}")
_worst, self.best = ufun.extreme_outcomes()
self.min, self.max = float(ufun(_worst)), float(ufun(self.best))
if self.min < ufun.reserved_value:
self.min = ufun.reserved_value
[docs]
def before_proposing(self, state: GBState, dest: str | None = None):
if self._negotiator is None:
raise ValueError("Unknown negotiator in a component")
if self.inv is None or self.max is None or self.min is None:
# warn(
# f"It seems that on_prefrences_changed() was not called until propose for a ufun of type {self._negotiator.ufun.__class__.__name__}"
# f" for negotiator {self._negotiator.id} of type {self.__class__.__name__}",
# NegmasUnexpectedValueWarning,
# )
self.on_preferences_changed([PreferencesChange()])
if self.inv is None or self.max is None or self.min is None:
raise ValueError(
"Failed to find an invertor, a selector, or exreme outputs"
)
if not self.inv.initialized:
self.inv.init()
[docs]
def scale_utilities(self, urange: tuple[float, ...]) -> tuple[float, ...]:
"""
Scales given utilities to the range of the ufun.
Remarks:
- Assumes that the input utilities are in the range [0-1] no matter what
is the range of the ufun.
- Subtracts the `tolerance` from the first and adds it to the last utility value
which slightly enlarges the range to account for small rounding errors
"""
if self._negotiator is None:
raise ValueError("Unknown negotiator in a component")
if self.max is None or self.min is None or self.best is None:
# warn(
# f"It seems that on_prefrences_changed() was not called until propose for a ufun of type {self._negotiator.ufun.__class__.__name__}"
# f" for negotiator {self._negotiator.id} of type {self.__class__.__name__}",
# )
self.on_preferences_changed([PreferencesChange()])
if self.max is None or self.min is None:
raise ValueError("Cannot find extreme outcomes.")
adjusted = [(self.max - self.min) * _ + self.min for _ in urange]
if adjusted:
adjusted[0] -= self.eps
adjusted[-1] += self.eps
adjusted = [max(self.min, min(self.max, _)) for _ in adjusted]
assert (
adjusted[0] <= adjusted[1]
), f"{urange=} adjusted to {adjusted=} ({self.min=}, {self.max=})"
return tuple(adjusted)
@property
def tolerance(self):
return self.eps
@property
def ufun_max(self):
return self.max
@property
def ufun_min(self):
return self.min
[docs]
def __call__(
self, urange: tuple[float, float], state: GBState
) -> Sequence[Outcome]:
"""
Receives a normalized [0-> 1] utility range and returns a utility range relative to the ufun taking the tolerance _eps into account
Remarks:
- This method calls `scale_utilities` on the input range
"""
if self._negotiator is None:
raise ValueError("Unknown negotiator in a component")
urange = self.scale_utilities(urange) # type: ignore
outcomes = self._inv_method(urange, normalized=False) # type: ignore
if outcomes is None:
return []
if self._single_inv_return:
return [outcomes] # type: ignore
return outcomes
[docs]
class UtilityInverter(GBComponent):
"""
A component that can recommend an outcome based on utility
"""
[docs]
def set_negotiator(self, negotiator: GBNegotiator) -> None: # type: ignore
super().set_negotiator(negotiator)
self.recommender.set_negotiator(negotiator)
def __init__(
self,
*args,
offer_selector: OfferSelectorProtocol
| Literal["min"]
| Literal["max"]
| None = None,
**kwargs,
):
if offer_selector is None:
type_ = "one"
elif isinstance(offer_selector, Callable):
type_ = "some"
else:
type_ = offer_selector
self.recommender = UtilityBasedOutcomeSetRecommender(
*args, inversion_method=type_, **kwargs
)
self.selector: Callable[[Sequence[Outcome], GBState], Outcome | None] | None
self.selector = (
None if not isinstance(offer_selector, Callable) else offer_selector
)
self.set_negotiator(None) # type: ignore (It is OK. We do not really need to pass this at all here.)
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
self.recommender.on_preferences_changed(changes)
[docs]
def before_proposing(self, state: GBState, dest: str | None = None):
self.recommender.before_proposing(state)
[docs]
def scale_utilities(self, urange):
return self.recommender.scale_utilities(urange)
@property
def tolerance(self):
return self.recommender.eps
@property
def ufun_max(self):
return self.recommender.max
@property
def ufun_min(self):
return self.recommender.min
[docs]
def __call__(self, urange: tuple[float, float], state: GBState) -> Outcome | None:
"""
Receives a normalized [0-> 1] utility range and returns a utility range relative to the ufun taking the tolerance _eps into account
Remarks:
- This method calls `scale_utilities` on the input range
"""
outcomes = self.recommender(urange, state)
if not outcomes:
return None
if len(outcomes) == 1:
return outcomes[0]
if self.selector is None:
return choice(outcomes)
outcome = self.selector(outcomes, state)
if not outcome:
return self.recommender.best
return outcome