from __future__ import annotations
import math
import random
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import bisect
from attrs import define, field
from negmas import warnings
from negmas.common import PreferencesChangeType, Value
from negmas.negotiators.helpers import PolyAspiration
from negmas.outcomes.common import ExtendedOutcome
from negmas.outcomes.protocols import DiscreteOutcomeSpace
from negmas.preferences.inv_ufun import PresortingInverseUtilityFunction
from .base import FilterResult, OfferingPolicy
from .concession import ConcessionRecommender
from .models.ufun import UFunModel
if TYPE_CHECKING:
from negmas.common import PreferencesChange
from negmas.gb import GBState
from negmas.gb.negotiators.base import GBNegotiator
from negmas.outcomes import Outcome
__all__ = [
"CABOfferingPolicy",
"WAROfferingPolicy",
"LimitedOutcomesOfferingPolicy",
"NegotiatorOfferingPolicy",
"ConcensusOfferingPolicy",
"RandomConcensusOfferingPolicy",
"UnanimousConcensusOfferingPolicy",
"UtilBasedConcensusOfferingPolicy",
"MyBestConcensusOfferingPolicy",
"MyWorstConcensusOfferingPolicy",
"NoneOfferingPolicy",
"RandomOfferingPolicy",
"OfferTop",
"OfferBest",
"TFTOfferingPolicy",
"MiCROOfferingPolicy",
"TimeBasedOfferingPolicy",
"HybridOfferingPolicy",
]
def index_of_nearest_value(
values: list[float], x: float, above_only: bool = False
) -> int:
"""
Finds the index of the nearest value to x
Args:
values: A list of sorted values (sorted ascendingly)
x: The target float value.
above_only: If True, only consider items with values above x.
Returns:
The index of the item with the nearest value. -1 means an empty list
"""
if not values:
return -1
n = len(values)
if above_only:
# Find the first item with a value greater than or equal to x
first_above_index = bisect.bisect_left(values, x)
if first_above_index >= n - 1:
return n - 1
return first_above_index
else:
# Find the nearest value using bisect
index = bisect.bisect_left(values, x)
if index == 0:
return 0
elif index == n:
return n - 1
if abs(values[index - 1] - x) <= abs(values[index] - x):
return index - 1
return index
[docs]
@define
class HybridOfferingPolicy(OfferingPolicy):
initial_utility: float = float("nan")
concession_ratio: float = float("nan")
final_utility: float = float("nan")
empathy_score: float = float("nan")
frac_time_based: dict[int, tuple[float, ...]] = field(
factory=lambda: {
1: (1.0,),
2: (0.25, 0.75),
3: (0.11, 0.22, 0.66),
4: (0.05, 0.15, 0.3, 0.5),
}
)
above_only: bool = False
_sent_offers: list[Outcome] = field(init=False, factory=list)
_sent_utils: list[float] = field(init=False, factory=list)
_received_offers: list[Outcome] = field(init=False, factory=list)
_received_utils: list[float] = field(init=False, factory=list)
_outcomes: list[Outcome] = field(init=False, factory=list)
_values: list[float] = field(init=False, factory=list)
def _adjust_params(self):
self.initial_utility = 1.0
self.concession_ratio = 0.75
self.final_utility = 0.55
self.empathy_score = 0.5
ufun = self.negotiator.ufun
assert ufun, "Unknown ufun. Cannot continue"
domain_size = int(ufun.outcome_space.cardinality) # type: ignore
if domain_size < 450:
self.final_utility = 0.80
elif domain_size < 1500:
self.final_utility = 0.775
elif domain_size < 4500:
self.final_utility = 0.75
elif domain_size < 18000:
self.final_utility = 0.725
elif domain_size < 33000:
self.final_utility = 0.70
else:
self.final_utility = 0.675
self._sent_utils = [ufun(_) for _ in self._sent_offers]
self._received_utils = [ufun(_) for _ in self._received_utils]
self.final_utility = max(self.final_utility, float(ufun.reserved_value))
os = ufun.outcome_space
assert os
outcomes = (
os.enumerate_or_sample(levels=10, max_cardinality=1_000_000)
if not isinstance(os, DiscreteOutcomeSpace)
else os.enumerate()
)
r = float(ufun.reserved_value)
outcome_util = sorted([(u, _) for _ in outcomes if (u := ufun(_)) >= r])
self._outcomes = [_[1] for _ in outcome_util]
self._values = [_[0] for _ in outcome_util]
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
self._adjust_params()
return super().on_preferences_changed(changes)
[docs]
def time_based(self, t: float) -> float:
"""
Target utility calculation of Time-Based strategy
:param t: Negotiation time
:return: Target utility
"""
return (
(1 - t) * (1 - t) * self.initial_utility
+ 2 * (1 - t) * t * self.concession_ratio
+ t * t * self.final_utility
)
[docs]
def behaviour_based(self, t: float) -> float:
"""
Target utility calculation of Behavior-Based strategy
:param t: Negotiation time
:return: Target utility
"""
# Utility differences of consecutive offers of opponent
diff = [
self._received_utils[i + 1] - self._received_utils[i]
for i in range(len(self._received_utils) - 1)
]
# Fixed size window
if len(diff) > len(self.frac_time_based):
diff = diff[-len(self.frac_time_based) :]
# delta = diff * window
delta = sum([u * w for u, w in zip(diff, self.frac_time_based[len(diff)])])
# Calculate target utility by updating the last offered bid
target_utility = (
self._sent_utils[-1] - (self.empathy_score + self.empathy_score * t) * delta
)
return target_utility
[docs]
def __call__(self, state: GBState, dest: str | None = None):
if not self._values:
self._adjust_params()
t = state.relative_time
# Target utility of Time-Based strategy
target_utility = self.time_based(t)
# If first 2 round, apply only Time-Based strategy
if len(self._received_offers) > 2:
# Target utility of Behavior-Based strategy
behavior_utility = self.behaviour_based(t)
# Combining Time-Based and Behavior-Based strategy
target_utility = (1.0 - t * t) * behavior_utility + t * t * target_utility
ufun = self.negotiator.ufun
assert ufun, "Unknown ufun. Cannot continue"
r = float(ufun.reserved_value)
# Target utility cannot be lower than the reservation value.
if target_utility < r:
target_utility = r
# # AC_Next strategy to decide accepting or not
# if self.can_accept() and target_utility <= self.last_received_bids[-1].utility:
# return self.accept_action
# Find the closest bid to target utility
indx = index_of_nearest_value(self._values, target_utility)
outcome = self._outcomes[indx]
self._sent_offers.append(outcome)
self._sent_utils.append(float(ufun(outcome)))
return outcome
[docs]
def on_partner_proposal(
self, state: GBState, partner_id: str, offer: Outcome
) -> None:
ufun = self.negotiator.ufun
assert ufun, "Unknown ufun. Cannot continue"
self._received_offers.append(offer)
self._received_utils.append(float(ufun(offer)))
return super().on_partner_proposal(state, partner_id, offer)
[docs]
@define
class TimeBasedOfferingPolicy(OfferingPolicy):
curve: PolyAspiration = field(factory=lambda: PolyAspiration(1.0, "boulware"))
stochastic: bool = False
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
if self.sorter is not None:
warnings.warn(
"Sorter is already initialized. May be on_preferences_changed is called twice!!"
)
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
[docs]
def __call__(self, state: GBState, dest: str | None = None):
assert self.negotiator.ufun is not None
asp = self.curve.utility_at(state.relative_time)
mn, mx = self.sorter.minmax()
assert mn >= self.negotiator.ufun.reserved_value
asp = asp * (mx - mn) + mn
if self.stochastic:
outcome = self.sorter.one_in((asp, mx), normalized=True)
else:
outcome = self.sorter.worst_in((asp - 1e-5, mx), normalized=True)
if outcome:
return outcome
return self.sorter.best()
[docs]
@define
class MiCROOfferingPolicy(OfferingPolicy):
next_indx: int = 0
sorter: PresortingInverseUtilityFunction | None = field(repr=False, default=None)
_received: set[Outcome] = field(factory=set)
_sent: set[Outcome] = field(factory=set)
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
if any(
_.type
not in (
PreferencesChangeType.Scale,
PreferencesChangeType.ReservedOutcome,
PreferencesChangeType.ReservedValue,
)
for _ in changes
):
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
self.next_indx = 0
self._received = set()
self._sent = set()
[docs]
def sample_sent(self) -> Outcome | None:
if not len(self._sent):
return None
return random.choice(list(self._sent))
[docs]
def ensure_sorter(self):
if not self.sorter:
assert self.negotiator.ufun
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
return self.sorter
[docs]
def next_offer(self) -> Outcome | None:
return self.ensure_sorter().outcome_at(self.next_indx)
[docs]
def best_offer_so_far(self) -> Outcome | None:
if self.next_indx > 0:
return self.ensure_sorter().outcome_at(self.next_indx - 1)
return None
[docs]
def ready_to_concede(self) -> bool:
return len(self._sent) <= len(self._received)
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
outcome = self.next_offer()
assert self.sorter
assert self.negotiator.ufun
if (
outcome is None
or self.sorter.utility_at(self.next_indx)
< self.negotiator.ufun.reserved_value
or not self.ready_to_concede()
):
return self.sample_sent()
self.next_indx += 1
self._sent.add(outcome)
return outcome
[docs]
def on_partner_proposal(
self, state: GBState, partner_id: str, offer: Outcome
) -> None:
self._received.add(offer)
return super().on_partner_proposal(state, partner_id, offer)
[docs]
@define
class CABOfferingPolicy(OfferingPolicy):
next_indx: int = 0
sorter: PresortingInverseUtilityFunction | None = field(repr=False, default=None)
_last_offer: Outcome | None = field(init=False, default=None)
_repeating: bool = field(init=False, default=False)
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
if any(
_.type
not in (
PreferencesChangeType.Scale,
PreferencesChangeType.ReservedOutcome,
PreferencesChangeType.ReservedValue,
)
for _ in changes
):
if self.sorter is not None:
warnings.warn(
"Sorter is already initialized. May be on_preferences_changed is called twice!!"
)
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
self.next_indx = 0
self._repeating = False
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
if (
self._repeating
or not self.negotiator
or not self.negotiator.ufun
or not self.negotiator.nmi
):
return self._last_offer
if self.next_indx >= self.negotiator.nmi.n_outcomes:
return self._last_offer
if not self.sorter:
warnings.warn(
"Sorter is not initialized. May be on_preferences_changed is never called before propose!!"
)
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
outcome = self.sorter.outcome_at(self.next_indx)
if (
outcome is None
or self.sorter.utility_at(self.next_indx)
< self.negotiator.ufun.reserved_value
):
# self.negotiator.nmi.mechanism.plot()
# breakpoint()
self._repeating = True
return self._last_offer
self.next_indx += 1
self._last_offer = outcome
return outcome
[docs]
@define
class WAROfferingPolicy(OfferingPolicy):
next_indx: int = 0
sorter: PresortingInverseUtilityFunction | None = field(repr=False, default=None)
_last_offer: Outcome | None = field(init=False, default=None)
_repeating: bool = field(init=False, default=False)
_irrational: bool = field(init=False, default=True)
_irrational_index: int = field(init=False, default=-1)
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
self._irrational = True
self._irrational_index = int(self.negotiator.nmi.n_outcomes) - 1
if any(
_.type
not in (
PreferencesChangeType.Scale,
PreferencesChangeType.ReservedOutcome,
PreferencesChangeType.ReservedValue,
)
for _ in changes
):
if self.sorter is not None:
warnings.warn(
"Sorter is already initialized. May be on_preferences_changed is called twice!!"
)
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
self.next_indx = 0
self._repeating = False
[docs]
def on_negotiation_start(self, state) -> None:
self._repeating = False
self._irrational = True
self._irrational_index = self.negotiator.nmi.n_outcomes - 1 # type: ignore
return super().on_negotiation_start(state)
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
if not self.negotiator or not self.negotiator.ufun or not self.negotiator.nmi:
return self._last_offer
if self._repeating:
return self._last_offer
if not self._irrational and self.next_indx >= self.negotiator.nmi.n_outcomes:
return self._last_offer
if not self.sorter:
warnings.warn(
"Sorter is not initialized. May be on_preferences_changed is never called before propose!!"
)
self.sorter = PresortingInverseUtilityFunction(
self.negotiator.ufun, rational_only=True, eps=-1, rel_eps=-1
)
self.sorter.init()
nxt = self._irrational_index if self._irrational else self.next_indx
outcome = self.sorter.outcome_at(nxt)
if self._irrational:
if (
outcome is None
or self.sorter.utility_at(self._irrational_index)
>= self.negotiator.ufun.reserved_value
):
self._irrational = False
assert self._last_offer is None
outcome = self.sorter.outcome_at(self.next_indx)
else:
self._irrational_index -= 1
return outcome
if (
outcome is None
or self.sorter.utility_at(self.next_indx)
< self.negotiator.ufun.reserved_value
):
self._repeating = True
return self._last_offer
self.next_indx += 1
self._last_offer = outcome
return outcome
[docs]
@define
class TFTOfferingPolicy(OfferingPolicy):
"""
An acceptance strategy that concedes as much as the partner (or more)
"""
partner_ufun: UFunModel
recommender: ConcessionRecommender
stochastic: bool = False
_partner_offer: Outcome | None = field(init=False, default=None)
[docs]
def before_responding(
self, state: GBState, offer: Outcome | None, source: str | None = None
):
self._partner_offer = offer
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
super().on_preferences_changed(changes)
self.partner_ufun.on_preferences_changed(changes)
[docs]
def __call__(self, state: GBState, dest: str | None = None):
if not self.negotiator or not self.negotiator.ufun:
return None
partner_u = (
float(self.partner_ufun.eval_normalized(self._partner_offer, True))
if self._partner_offer
else 1.0
)
partner_concession = 1.0 - partner_u
my_concession = self.recommender(partner_concession, state)
if not math.isfinite(my_concession):
warnings.warn(
f"Got {my_concession} for concession which is unacceptable. Will use no concession"
)
my_concession = 0.0
if not (-1e-6 <= my_concession <= 1.0000001):
warnings.warn(f"{my_concession} is negative or above 1")
my_concession = 0.0
target_utility = 1.0 - float(my_concession)
if self.stochastic:
return self.negotiator.ufun.invert().one_in(
(target_utility, 1.0), normalized=True
)
return self.negotiator.ufun.invert().worst_in(
(target_utility, 1.0), normalized=True
)
[docs]
@define
class OfferBest(OfferingPolicy):
"""
Offers Only the best outcome.
Remarks:
- You can pass the best outcome if you know it as `best` otherwise it will find it.
"""
_best: Outcome | None = None
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
_, self._best = self.negotiator.ufun.extreme_outcomes()
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
return self._best
[docs]
@define
class OfferTop(OfferingPolicy):
"""
Offers outcomes that are in the given top fraction or top `k`. If neither is given it reverts to only offering the best outcome
Remarks:
- The outcome-space is always discretized and the constraints `fraction` and `k` are applied to the discretized space
"""
fraction: float = 0.0
k: int = 1
_top: list[Outcome] | None = field(init=False, default=None)
[docs]
def on_preferences_changed(self, changes: list[PreferencesChange]):
if not self.negotiator or not self.negotiator.ufun:
return
if any(
_.type
not in (
PreferencesChangeType.Scale,
PreferencesChangeType.ReservedOutcome,
PreferencesChangeType.ReservedValue,
)
for _ in changes
):
inverter = self.negotiator.ufun.invert()
inverter.init()
top_k = inverter.within_indices((0, self.k))
top_f = inverter.within_fractions((0.0, self.fraction))
self._top = list(set(top_k + top_f))
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
if not self.negotiator or not self.negotiator.ufun:
return None
if self._top is None:
self.on_preferences_changed([])
if not self._top:
return None
return random.choice(self._top)
[docs]
@define
class NoneOfferingPolicy(OfferingPolicy):
"""
Always offers `None` which means it never gets an agreement.
"""
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
return None
[docs]
@define
class RandomOfferingPolicy(OfferingPolicy):
"""
Always offers `None` which means it never gets an agreement.
"""
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
if not self.negotiator or not self.negotiator.nmi:
return None
return self.negotiator.nmi.random_outcome()
[docs]
@define
class LimitedOutcomesOfferingPolicy(OfferingPolicy):
"""
Offers from a given list of outcomes
"""
outcomes: list[Outcome] | None
prob: list[float] | None = None
p_ending: float = 0.0
def _run(
self, state: GBState, dest: str | None = None, second_trial: bool = False
) -> Outcome | None:
if not self.negotiator or not self.negotiator.nmi:
return None
if random.random() < self.p_ending - 1e-7:
return None
if not self.prob or not self.outcomes:
return random.choice(
self.outcomes
if self.outcomes
else list(self.negotiator.nmi.discrete_outcomes())
)
r, s = random.random(), 0.0
for w, p in zip(self.outcomes, self.prob):
s += p
if r <= s:
return w
if second_trial:
return None
if s > 0.999:
return self.outcomes[-1]
self.prob = [_ / s for _ in self.prob]
return self._run(state, dest, True)
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
return self._run(state, dest)
[docs]
@define
class NegotiatorOfferingPolicy(OfferingPolicy):
"""
Uses a negotiator as an offering strategy
"""
proposer: GBNegotiator = field(kw_only=True)
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
r = self.proposer.propose(state)
if isinstance(r, ExtendedOutcome):
return r.outcome
return r
[docs]
@define
class ConcensusOfferingPolicy(OfferingPolicy, ABC):
"""
Offers based on concensus of multiple strategies
"""
strategies: list[OfferingPolicy]
[docs]
def filter(self, indx: int, offer: Outcome | None) -> FilterResult:
"""
Called with the decision of each strategy in order.
Remarks:
- Two decisions need to be made:
1. Should we continue trying other strategies
2. Should we save this result.
"""
return FilterResult(True, True)
[docs]
@abstractmethod
def decide(
self, indices: list[int], responses: list[Outcome | None]
) -> Outcome | None:
"""
Called to make a final decsision given the decisions of the stratgeis with indices `indices` (see `filter` for filtering rules)
"""
[docs]
def __call__(self, state: GBState, dest: str | None = None) -> Outcome | None:
selected, selected_indices = [], []
for i, s in enumerate(self.strategies):
response = s.propose(state)
r = self.filter(i, response)
if not r.next:
break
if r.save:
selected.append(response)
selected_indices.append(i)
return self.decide(selected_indices, selected)
[docs]
@define
class UnanimousConcensusOfferingPolicy(ConcensusOfferingPolicy):
"""
Offers only if all offering strategies gave exactly the same outcome
"""
[docs]
def decide(
self, indices: list[int], responses: list[Outcome | None]
) -> Outcome | None:
outcomes = set(responses)
if len(outcomes) != 1:
return None
return list(outcomes)[0]
[docs]
@define
class RandomConcensusOfferingPolicy(ConcensusOfferingPolicy):
"""
Offers a random response from the list of strategies (different strategy every time).
"""
prob: list[float] | None = None
def __attrs_post_init__(self):
if not self.prob:
return
s = sum(self.prob)
self.prob = [_ / s for _ in self.prob]
[docs]
def decide(
self, indices: list[int], responses: list[Outcome | None]
) -> Outcome | None:
if not self.prob:
return random.choice(responses)
r, s = random.random(), 0.0
for i, p in enumerate(self.prob):
s += p
if r <= s:
return responses[i]
if s > 0.999:
return responses[-1]
raise ValueError(f"sum of probabilities is less than 1: {s}")
[docs]
@define
class UtilBasedConcensusOfferingPolicy(ConcensusOfferingPolicy, ABC):
"""
Offers from the list of stratgies (different strategy every time) based on outcome utilities
"""
[docs]
@abstractmethod
def decide_util(self, utils: list[Value]) -> int:
"""
Returns the index to chose based on utils
"""
[docs]
def decide(
self, indices: list[int], responses: list[Outcome | None]
) -> Outcome | None:
if not self.negotiator.ufun:
raise ValueError("Cannot decide because I have no ufun")
return responses[
self.decide_util([self.negotiator.ufun(_) for _ in set(responses)])
]
[docs]
@define
class MyBestConcensusOfferingPolicy(UtilBasedConcensusOfferingPolicy):
"""
Offers my best outcome from the list of stratgies (different strategy every time).
"""
[docs]
def decide_util(self, utils: list[Value]) -> int:
return max(range(len(utils)), key=lambda x: utils[x])
[docs]
@define
class MyWorstConcensusOfferingPolicy(UtilBasedConcensusOfferingPolicy):
"""
Offers my worst outcome from the list of stratgies (different strategy every time) based on outcome utilities
"""
[docs]
def decide_util(self, utils: list[Value]) -> int:
return min(range(len(utils)), key=lambda x: utils[x])