from __future__ import annotations
import math
import random
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Callable, Iterable
from attrs import define, field
from negmas import warnings
from negmas.common import PreferencesChange, PreferencesChangeType
from negmas.gb.common import ResponseType, current_thread_id
from .base import AcceptancePolicy, FilterResult
from .concession import ConcessionRecommender
from .models.ufun import UFunModel
if TYPE_CHECKING:
from negmas.common import PreferencesChange
from negmas.gb import GBState
from negmas.gb.negotiators.base import GBNegotiator
from negmas.outcomes import Outcome
from .offering import MiCROOfferingPolicy, OfferingPolicy
__all__ = [
"LimitedOutcomesAcceptancePolicy",
"NegotiatorAcceptancePolicy",
"ConcensusAcceptancePolicy",
"AllAcceptanceStrategies",
"AnyAcceptancePolicy",
"AcceptImmediately",
"RejectAlways",
"EndImmediately",
"AcceptAbove",
"RandomAcceptancePolicy",
"AcceptTop",
"AcceptBest",
"TFTAcceptancePolicy",
"ACNext",
"ACLast",
"ACLastKReceived",
"ACLastFractionReceived",
"ACTime",
"AcceptAfter",
"AcceptAround",
"AcceptBetween",
"ACConst",
"AcceptAnyRational",
"AcceptBetterRational",
"AcceptNotWorseRational",
]
[docs]
@define
class AcceptAnyRational(AcceptancePolicy):
"""
Accepts any rational outcome.
"""
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.preferences:
return ResponseType.REJECT_OFFER
pref = self.negotiator.preferences
if not pref.is_worse(offer, None):
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptNotWorseRational(AcceptancePolicy):
"""
Accept any outcome not worse than the best so far.
"""
_accepted: dict[str, Outcome] = field(factory=dict)
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.preferences:
return ResponseType.REJECT_OFFER
tid = current_thread_id(state, source)
current = self._accepted.get(tid, None)
if offer is None:
return ResponseType.REJECT_OFFER
pref = self.negotiator.preferences
if pref.is_not_worse(offer, current):
self._accepted[tid] = offer
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptBetterRational(AcceptancePolicy):
"""
Accept first rational outcomes and then accept only outcomes better than the all accepted so far.
"""
_accepted: dict[str, Outcome] = field(factory=dict)
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.preferences:
return ResponseType.REJECT_OFFER
tid = current_thread_id(state, source)
current = self._accepted.get(tid, None)
if offer is None:
return ResponseType.REJECT_OFFER
pref = self.negotiator.preferences
if pref.is_better(offer, current) or (
current is None and pref.is_not_worse(offer, current)
):
self._accepted[tid] = offer
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACConst(AcceptancePolicy):
"""
Accepts outcomes with utilities above the given threshold
"""
th: float = 0.9
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
u = float(self.negotiator.ufun(offer))
if u >= self.th:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACLastKReceived(AcceptancePolicy):
"""
Accepts $\\omega$ if $\alpha u(my-next-offer) + \beta > f(u(\text{utils of offers received in the last k steps))$
"""
k: int = 0
alpha: float = 1.0
beta: float = 0.0
op: Callable[[list[float]], float] = max
_best: list[float] = field(init=False, default=[])
[docs]
def after_join(self, nmi) -> None:
k = nmi.n_steps if self.k <= 0 and nmi.n_steps else self.k
self._best = [float("inf") for _ in range(k)] # type: ignore
[docs]
def before_responding(
self, state: GBState, offer: Outcome | None, source: str | None = None
):
if not self.negotiator or not self.negotiator.ufun:
return
self._best.append(float(self.negotiator.ufun(offer)))
self._best = self._best[1:]
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
best = self.op(self._best)
u = float(self.negotiator.ufun(offer))
if self.alpha * u + self.beta > best:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACLastFractionReceived(AcceptancePolicy):
"""
Accepts $\\omega$ if $\alpha u(my-next-offer) + \beta > f(u(\text{utils of offers received in the given fraction of time}))$
"""
fraction: float = 1.0
alpha: float = 1.0
beta: float = 0.0
op: Callable[[list[float]], float] = max
_best: list[tuple[float, float]] = field(init=False, default=[])
[docs]
def before_responding(
self, state: GBState, offer: Outcome | None, source: str | None = None
):
if not self.negotiator or not self.negotiator.ufun:
return
self._best.append((float(self.negotiator.ufun(offer)), state.relative_time))
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
cutoff = state.relative_time - self.fraction
lst = []
for i in range(len(self._best) - 1, -1, -1):
if self._best[i][1] < cutoff:
self._best = self._best[i:]
break
lst.append(self._best[i][0])
best = self.op(lst)
u = float(self.negotiator.ufun(offer))
if self.alpha * u + self.beta > best:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
@define
class AcceptFinalOffer(AcceptancePolicy):
"""
Accepts the final offer as long as it is above the reserved value
"""
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun or not self.negotiator.nmi:
return ResponseType.REJECT_OFFER
if (
self.negotiator.nmi.n_steps == float("inf")
or self.negotiator.nmi.n_steps is None
):
is_last = state.relative_time > 0.999999
else:
is_last = state.step == self.negotiator.nmi.n_steps - 1
if (
is_last
and self.negotiator.ufun(offer) >= self.negotiator.ufun.reserved_value
):
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACLast(AcceptancePolicy):
"""
Implements the AClast acceptance strategy based on our last offer.
Accepts $\\omega$ if $\alpha u(my-next-offer) + \beta > u(\\omega)$
"""
last_offer_util: float = field(init=False, default=float("inf"))
alpha: float = 1.0
beta: float = 0.0
[docs]
def after_proposing(self, state: GBState, offer: Outcome | None):
if not self.negotiator or not self.negotiator.ufun:
return
self.last_offer_util = float(self.negotiator.ufun(offer))
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
last = self.last_offer_util
u = float(self.negotiator.ufun(offer))
if self.alpha * u + self.beta > last:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptBetween(AcceptancePolicy):
"""
Accepts in the given range of relative times.
"""
min: float
max: float = 1.0
[docs]
def __call__(self, state, offer, source):
if self.max >= state.relative_time >= self.min:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACTime(AcceptancePolicy):
"""
Implements the ACtime acceptance strategy based on our next offer.
Accepts if the relative time is greater than or equal to tau
"""
tau: float
[docs]
def __call__(self, state, offer, source):
if state.relative_time >= self.tau:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
AcceptAfter = ACTime
"""An alias for ACTime"""
@define
class MiCROAcceptancePolicy(AcceptancePolicy):
"""
Implements the ACnext acceptance strategy based on our next offer.
Accepts $\\omega$ if $\alpha u(my-next-offer) + \beta > u(\\omega)$
"""
offering_strategy: MiCROOfferingPolicy
accept_same: bool = True
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
mine = (
self.offering_strategy.next_offer()
if self.offering_strategy.ready_to_concede()
else self.offering_strategy.best_offer_so_far()
)
if self.negotiator.ufun.is_better(None, mine):
mine = None
is_acceptable = (
self.negotiator.ufun.is_not_worse
if self.accept_same
else self.negotiator.ufun.is_better
)
if is_acceptable(offer, mine):
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class ACNext(AcceptancePolicy):
"""
Implements the ACnext acceptance strategy based on our next offer.
Accepts $\\omega$ if $\alpha u(my-next-offer) + \beta > u(\\omega)$
"""
offering_strategy: OfferingPolicy
alpha: float = 1.0
beta: float = 0.0
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
next = float(self.negotiator.ufun(self.offering_strategy(state)))
u = float(self.negotiator.ufun(offer))
if self.alpha * u + self.beta >= next:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class TFTAcceptancePolicy(AcceptancePolicy):
"""
An acceptance strategy that concedes as much as the partner (or more)
"""
partner_ufun: UFunModel
recommender: ConcessionRecommender
[docs]
def __call__(self, state, offer, source):
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
partner_u = float(self.partner_ufun.eval_normalized(offer)) if offer else 1.0
partner_concession = 1.0 - partner_u
my_concession = self.recommender(partner_concession, state)
if not math.isfinite(my_concession):
warnings.warn(
f"Got {my_concession} for concession which is unacceptable. Will use no concession"
)
my_concession = 0.0
if not (-1e-6 <= my_concession <= 1.0000001):
warnings.warn(f"{my_concession} is negative or above 1")
my_concession = 0.0
u = 1.0 - float(my_concession)
if self.negotiator.ufun.eval_normalized(offer) >= u:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptAround(AcceptancePolicy):
"""Accepts around the given relative time (i.e. eps from it)"""
relative_time: float = 1.0
eps: float = 0.001
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if offer is None:
return ResponseType.REJECT_OFFER
# guarantee acceptance in the last step if relative_time <= 1
n_steps = self.negotiator.nmi.n_steps
if self.relative_time <= 1.0 and (
n_steps is not None and state.step >= n_steps - 1
):
return ResponseType.ACCEPT_OFFER
# accept around the given time
if (
state.relative_time >= self.relative_time
or (self.relative_time - state.relative_time) < self.eps
):
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class RandomAcceptancePolicy(AcceptancePolicy):
p_acceptance: float = 0.15
p_rejection: float = 0.25
p_ending: float = 0.1
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
r = random.random()
if r <= self.p_acceptance + 1e-8:
return ResponseType.ACCEPT_OFFER
if r <= self.p_acceptance + self.p_rejection + 1e-8:
return ResponseType.REJECT_OFFER
if r <= self.p_acceptance + self.p_rejection + self.p_ending + 1e-8:
return ResponseType.END_NEGOTIATION
return ResponseType.NO_RESPONSE
[docs]
@define
class AcceptBest(AcceptancePolicy):
"""
Accepts Only the best outcome.
Remarks:
- If the best possible utility cannot be found, nothing will be accepted
"""
_best_util: float = float("inf")
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
if self.negotiator.ufun(offer) >= self.negotiator.ufun.max() - 1e-10:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptTop(AcceptancePolicy):
"""
Accepts outcomes that are in the given top fraction or top `k`. If neither is given it reverts to accepting the best outcome only.
Remarks:
- The outcome-space is always discretized and the constraints `fraction` and `k` are applied to the discretized space
"""
fraction: float = 0.0
k: int = 1
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
if any(
_.type
not in (
PreferencesChangeType.Scale,
PreferencesChangeType.ReservedOutcome,
PreferencesChangeType.ReservedValue,
)
for _ in changes
):
self.negotiator.ufun.invert().init()
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
top_k = self.negotiator.ufun.invert().within_indices((0, self.k))
if offer in top_k:
return ResponseType.ACCEPT_OFFER
top_f = self.negotiator.ufun.invert().within_fractions((0.0, self.fraction))
if offer in top_f:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class AcceptAbove(AcceptancePolicy):
"""
Accepts outcomes with utilities in the given top `limit` fraction above reserve/minimum (based on `above_resrve` ).
"""
limit: float
above_reserve: bool = True
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if not self.negotiator or not self.negotiator.ufun:
return ResponseType.REJECT_OFFER
if (
self.negotiator.ufun.eval_normalized(offer, self.above_reserve)
>= self.limit
):
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class RejectAlways(AcceptancePolicy):
"""
Rejects everything
"""
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
return ResponseType.REJECT_OFFER
[docs]
@define
class LimitedOutcomesAcceptancePolicy(AcceptancePolicy):
"""
Accepts from a list of predefined outcomes
Remarks:
- if `prob` is a number, it is taken as the probability of aceptance for any outcome.
- if `prob` is `None`, the probability of acceptance of any outcome will be set to the relative time
"""
prob: dict[Outcome, float] | float | None
p_ending: float = 0.0
[docs]
@classmethod
def from_outcome_list(
cls,
outcomes: list[Outcome],
prob: list[float] | float = 1.0,
p_ending: float = 0.0,
):
if not isinstance(prob, Iterable):
prob = [float(prob)] * len(outcomes)
return LimitedOutcomesAcceptancePolicy(
prob=dict(zip(outcomes, prob)), p_ending=p_ending
)
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
if random.random() < self.p_ending - 1e-12:
return ResponseType.END_NEGOTIATION
if self.prob is None:
prob = state.relative_time
elif isinstance(self.prob, float):
prob = self.prob
else:
prob = self.prob.get(offer, 0.0) # type: ignore
if random.random() <= prob:
return ResponseType.ACCEPT_OFFER
return ResponseType.REJECT_OFFER
[docs]
@define
class NegotiatorAcceptancePolicy(AcceptancePolicy):
"""
Uses a negotiator as an offering strategy
"""
acceptor: GBNegotiator
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
return self.acceptor.respond(state, source)
[docs]
@define
class ConcensusAcceptancePolicy(AcceptancePolicy, ABC):
"""
Accepts based on concensus of multiple strategies
"""
strategies: list[AcceptancePolicy]
def _set_child_negotiators(self) -> None:
for s in self.strategies:
self.set_negotiator(self.negotiator)
[docs]
def on_negotiation_start(self, state) -> None:
self._set_child_negotiators()
return super().on_negotiation_start(state)
[docs]
def filter(self, indx: int, response: ResponseType) -> FilterResult:
"""
Called with the decision of each strategy in order.
Remarks:
- Two decisions need to be made:
1. Should we continue trying other strategies
2. Should we save this result.
"""
return FilterResult(True, True)
[docs]
@abstractmethod
def decide(self, indices: list[int], responses: list[ResponseType]) -> ResponseType:
"""
Called to make a final decision given the decisions of the strategies with indices `indices` (see `filter` for filtering rules)
"""
[docs]
def __call__(
self, state: GBState, offer: Outcome | None, source: str | None
) -> ResponseType:
selected, selected_indices = [], []
for i, s in enumerate(self.strategies):
s._negotiator = self.negotiator # type: ignore
response = s.respond(state, offer, source)
r = self.filter(i, response)
if r.save:
selected.append(response)
selected_indices.append(i)
if not r.next:
break
return self.decide(selected_indices, selected)
[docs]
@define
class AllAcceptanceStrategies(ConcensusAcceptancePolicy):
"""Accept only if all children accept, end only if all of them end, otherwise reject"""
[docs]
def filter(self, indx: int, response: ResponseType) -> FilterResult:
if response == ResponseType.REJECT_OFFER:
return FilterResult(False, True)
if response == ResponseType.END_NEGOTIATION:
return FilterResult(False, True)
return FilterResult(True, False)
[docs]
def decide(self, indices: list[int], responses: list[ResponseType]) -> ResponseType:
if not responses:
return ResponseType.ACCEPT_OFFER
return responses[0]
[docs]
@define
class AnyAcceptancePolicy(ConcensusAcceptancePolicy):
"""Accept any children accept,
end or reject only if all of them end or reject"""
[docs]
def filter(self, indx: int, response: ResponseType) -> FilterResult:
if response == ResponseType.ACCEPT_OFFER:
return FilterResult(False, True)
if response == ResponseType.END_NEGOTIATION:
return FilterResult(False, True)
return FilterResult(True, False)
[docs]
def decide(self, indices: list[int], responses: list[ResponseType]) -> ResponseType:
if not responses:
return ResponseType.REJECT_OFFER
return ResponseType.ACCEPT_OFFER