Source code for negmas.gb.components.selectors

from __future__ import annotations
from abc import abstractmethod
from random import choice
from typing import TYPE_CHECKING, Callable, Protocol, Sequence
from warnings import warn

import negmas.warnings as warnings
from negmas.gb.components import GBComponent
from negmas.outcomes.outcome_ops import generalized_minkowski_distance, min_dist
from negmas.preferences import (
    BaseUtilityFunction,
    InverseUFun,
    PresortingInverseUtilityFunction,
    RankOnlyUtilityFunction,
)

if TYPE_CHECKING:
    from negmas.gb import GBState
    from negmas.outcomes import Outcome
    from negmas.outcomes.outcome_space import DistanceFun


__all__ = [
    "OfferSelectorProtocol",
    "OfferSelector",
    "RandomOfferSelector",
    "BestOfferSelector",
    "MedianOfferSelector",
    "WorstOfferSelector",
    "OfferOrientedSelector",
    "FirstOfferOrientedSelector",
    "LastOfferOrientedSelector",
    "BestOfferOrientedSelector",
    "OutcomeSetOrientedSelector",
    "PartnerOffersOrientedSelector",
    "MultiplicativePartnerOffersOrientedSelector",
    "AdditivePartnerOffersOrientedSelector",
]


def additive_score(
    outcomes: Sequence[Outcome],
    partner_offers: Sequence[Outcome],
    ufun: BaseUtilityFunction,
    distance_fun: DistanceFun = generalized_minkowski_distance,
    u_weight: float = 0.5,
    **kwargs,
) -> Sequence[tuple[float, Outcome]]:
    """
    Selects the outcome that maximizes the weightd sum of utility value and distance to the partner offers (with `u_weight` weighing the utility value)

    See Also:

        `min_dist` , `diff`
    """
    utils = [float(ufun(_)) for _ in outcomes]
    dists = [
        min_dist(
            w, partner_offers, ufun.outcome_space, distance_fun=distance_fun, **kwargs
        )
        for w in outcomes
    ]
    max_dist = max(dists)
    if abs(max_dist) < 1e-8:
        scores = sorted(zip(utils, outcomes), reverse=True)
        return scores[0][1]
    dists = [(max_dist - _) / max_dist for _ in dists]
    scores = [
        (u * u_weight + (1 - u_weight) * d, w)
        for (u, d, w) in zip(utils, dists, outcomes)
    ]
    return scores


def multiplicative_score(
    outcomes: Sequence[Outcome],
    partner_offers: Sequence[Outcome],
    ufun: BaseUtilityFunction,
    distance_fun: DistanceFun = generalized_minkowski_distance,
    **kwargs,
) -> Sequence[tuple[float, Outcome]]:
    """
    Selects the outcome that maximizes the product of utility value and distance to the partner offers.

    See Also:

        `min_dist` , `diff`
    """
    utils = [float(ufun(_)) for _ in outcomes]
    dists = [
        min_dist(
            w, partner_offers, ufun.outcome_space, distance_fun=distance_fun, **kwargs
        )
        for w in outcomes
    ]
    max_dist = max(dists)
    if abs(max_dist) < 1e-8:
        scores = sorted(zip(utils, outcomes), reverse=True)
        return scores[0][1]
    dists = [(max_dist - _) / max_dist for _ in dists]
    scores = [(u * d, w) for (u, d, w) in zip(utils, dists, outcomes)]
    return scores


def make_inverter(
    ufun: BaseUtilityFunction,
    ufun_inverter: Callable[[BaseUtilityFunction], InverseUFun] | None = None,
    rank_only: bool = False,
    max_cardinality: int = 10_000,
) -> InverseUFun:
    """
    Creates an `InverseUFun` object from the given ufun with appropriate type if the type is not given

    Args:
        ufun (BaseUtilityFunction): The ufun to invert
        rank_only (bool): If True, only the relative ranks of outcomes will be used in inversion not the values themselves.
        ufun_inverter (Callable[[BaseUtilityFunction], InverseUFun] | None): An optional factory to generate a `InverseUFun` from a `BaseUtilityFunction` .
        max_cardinality (int): The maximum cardinality at which we switch to using a `SamplingInverseUtilityFunction`
    """
    if rank_only:
        ufun = RankOnlyUtilityFunction(ufun, randomize_equal=False, name=ufun.name)
    if ufun_inverter:
        return ufun_inverter(ufun)
    return PresortingInverseUtilityFunction(ufun, max_cache_size=max_cardinality)


class OfferFilterProtocol(Protocol):
    """
    Can select *the best* offers in some  sense from a list of offers based on an inverter
    """

    def __call__(
        self, outcomes: Sequence[Outcome], state: GBState
    ) -> Sequence[Outcome]:
        ...


def NoFiltering(outcomes: Sequence[Outcome], state: GBState) -> Sequence[Outcome]:
    return outcomes


def KeepFirst(outcomes: Sequence[Outcome], state: GBState) -> Sequence[Outcome]:
    return [] if not outcomes else [outcomes[0]]


def KeepLast(outcomes: Sequence[Outcome], state: GBState) -> Sequence[Outcome]:
    return [] if not outcomes else [outcomes[-1]]


[docs] class OfferSelectorProtocol(Protocol): """ Can select *the best* offer in some sense from a list of offers based on an inverter """
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: ...
[docs] class OfferSelector(OfferSelectorProtocol, GBComponent): """ Can select *the best* offer in some sense from a list of offers based on an inverter """
[docs] @abstractmethod def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: ...
[docs] class RandomOfferSelector(OfferSelector):
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not outcomes: return None return choice(outcomes)
[docs] class BestOfferSelector(OfferSelector):
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not self._negotiator: warn( "Asked to select an outcome with unkonwn negotiator", warnings.NegmasUnexpectedValueWarning, ) return None if not self._negotiator.ufun: warn( "Asked to select an outcome with unkonwn utility function", warnings.NegmasUnexpectedValueWarning, ) return None if not outcomes: return None return sorted(((self._negotiator.ufun(_), _) for _ in outcomes), reverse=True)[ 0 ][1]
[docs] class MedianOfferSelector(OfferSelector):
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not self._negotiator: warn( "Asked to select an outcome with unkonwn negotiator", warnings.NegmasUnexpectedValueWarning, ) return None if not self._negotiator.ufun: warn( "Asked to select an outcome with unkonwn utility function", warnings.NegmasUnexpectedValueWarning, ) return None if not outcomes: return None ordered = sorted( ((self._negotiator.ufun(_), _) for _ in outcomes), reverse=False ) return ordered[len(ordered) // 2][1]
[docs] class WorstOfferSelector(OfferSelector):
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not self._negotiator: warn( "Asked to select an outcome with unkonwn negotiator", warnings.NegmasUnexpectedValueWarning, ) return None if not self._negotiator.ufun: warn( "Asked to select an outcome with unkonwn utility function", warnings.NegmasUnexpectedValueWarning, ) return None if not outcomes: return None return sorted(((self._negotiator.ufun(_), _) for _ in outcomes), reverse=False)[ 0 ][1]
[docs] class OfferOrientedSelector(OfferSelector): """ Selects the nearest outcome to the pivot outcome which is updated before responding """ def __init__( self, distance_fun: DistanceFun = generalized_minkowski_distance, **kwargs ): self._pivot: Outcome | None = None self._distance_fun = distance_fun self._distance_fun_params = kwargs
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not self._negotiator or not self._negotiator.ufun: raise ValueError("Unknown ufun or negotiator") if not self._pivot: return choice(outcomes) nearest, ndist = None, float("inf") for o in outcomes: d = self._distance_fun( o, self._pivot, self._negotiator.ufun.outcome_space, **self._distance_fun_params, ) if d < ndist: nearest, ndist = o, d return nearest
[docs] class FirstOfferOrientedSelector(OfferOrientedSelector): """ Selects the offer nearest the partner's first offer """
[docs] def before_responding( self, state: GBState, offer: Outcome | None, source: str | None = None ): if self._pivot or offer is None: return self._pivot = offer
[docs] class LastOfferOrientedSelector(OfferOrientedSelector): """ Selects the offer nearest the partner's last offer """
[docs] def before_responding( self, state: GBState, offer: Outcome | None, source: str | None = None ): if offer is None: return self._pivot = offer
[docs] class BestOfferOrientedSelector(OfferOrientedSelector): """ Selects the offer nearest the partner's best offer for me so far """ _pivot_util: float = float("-inf")
[docs] def before_responding( self, state: GBState, offer: Outcome | None, source: str | None = None ): if offer is None: return if not self._negotiator or not self._negotiator.ufun: raise ValueError("Unknown ufun or negotiator") u = self._negotiator.ufun(offer) if u is None: return u = float(u) if u > self._pivot_util: self._pivot_util = u self._pivot = offer self._pivot_util = u
[docs] class OutcomeSetOrientedSelector(OfferSelector): """ Selects the nearest outcome to a set of pivot outcomes which is updated before responding """ def __init__( self, distance_fun: DistanceFun = generalized_minkowski_distance, offer_filter: OfferFilterProtocol = NoFiltering, **kwargs, ): self._pivots: list[Outcome] = [] self._distance_fun = distance_fun self._distnace_fun_params = kwargs self._offer_filter = offer_filter
[docs] @abstractmethod def calculate_scores( self, outcomes: Sequence[Outcome], pivots: list[Outcome], state: GBState ) -> Sequence[tuple[float, Outcome]]: ...
[docs] def __call__(self, outcomes: Sequence[Outcome], state: GBState) -> Outcome | None: if not self._negotiator or not self._negotiator.ufun: raise ValueError("Unknown ufun or negotiator") outcomes = self._offer_filter(outcomes, state) if not self._pivots: return choice(outcomes) scores = self.calculate_scores(outcomes, self._pivots, state) scores = sorted(scores, reverse=True) return scores[0][1]
[docs] class PartnerOffersOrientedSelector(OutcomeSetOrientedSelector): """ Orients offes toward the set of past opponent offers """
[docs] def before_responding( self, state: GBState, offer: Outcome | None, source: str | None = None ): if offer is None: return self._pivots.append(offer)
[docs] class MultiplicativePartnerOffersOrientedSelector(PartnerOffersOrientedSelector): """ Orients offes toward the set of past opponent offers. The score of an offer is the product of its utility to self and its distance to opponent's past offers after normalization """
[docs] def calculate_scores( self, outcomes: Sequence[Outcome], pivots: list[Outcome], state: GBState ) -> Sequence[tuple[float, Outcome]]: if not self._negotiator or not self._negotiator.ufun: raise ValueError("Unknown ufun or negotiator") return multiplicative_score( outcomes, pivots, self._negotiator.ufun, self._distance_fun, **self._distnace_fun_params, )
[docs] class AdditivePartnerOffersOrientedSelector(PartnerOffersOrientedSelector): """ Orients offes toward the set of past opponent offers. The score of an offer is the product of its utility to self and its distance to opponent's past offers after normalization """ def __init__(self, *args, u_weight: float = 0.6, **kwargs): super().__init__(*args, **kwargs) self.u_weight = u_weight
[docs] def calculate_scores( self, outcomes: Sequence[Outcome], pivots: list[Outcome], state: GBState ) -> Sequence[tuple[float, Outcome]]: if not self._negotiator or not self._negotiator.ufun: raise ValueError("Unknown ufun or negotiator") return additive_score( outcomes, pivots, self._negotiator.ufun, u_weight=self.u_weight, distance_fun=self._distance_fun, **self._distnace_fun_params, )