"""
Implements Stacked Alternating Offers (SAO) mechanism.
"""
from __future__ import annotations
import functools
import sys
import time
from collections import defaultdict
from typing import TYPE_CHECKING
from attr import asdict
from rich import print
from negmas import warnings
from negmas.gb.negotiators import GBNegotiator
from negmas.helpers.strings import humanize_time
from ..common import TraceElement
from ..events import Event
from ..helpers import TimeoutCaller, TimeoutError, exception2str
from ..mechanisms import Mechanism, MechanismStepResult
from ..outcomes.common import Outcome
from ..outcomes.outcome_ops import cast_value_types, outcome_types_are_ok
from .common import SAONMI, ResponseType, SAOResponse, SAOState
from .negotiators import SAONegotiator
if TYPE_CHECKING:
from negmas.preferences import Preferences
__all__ = ["SAOMechanism", "SAOProtocol", "TraceElement"]
DEFAULT_COLORMAP = "jet"
[docs]
class SAOMechanism(
Mechanism[SAONMI, SAOState, SAOResponse, SAONegotiator | GBNegotiator]
):
"""
Implements Several variants of the Stacked Alternating Offers Protocol
Args:
outcome_space: The negotiation agenda
issues: A list of issues defining the outcome-space of the negotiation
outcomes: A list of outcomes defining the outcome-space of the negotiation
n_steps: The maximum number of negotiaion rounds (see `time_limit` )
time_limit: The maximum wall-time allowed for the negotiation (see `n_steps`)
step_time_limit: The maximum wall-time allowed for a single negotiation round.
max_n_agents: The maximum number of negotiators allowed to join the negotiation.
dynamic_entry: Whether it is allowed for negotiators to join the negotiation after it starts
cache_outcomes: If true, the mechnism will catch `outcomes` and a discrete version (`discrete_outcomes`) that
can be accessed by any negotiator through their AMI.
max_cardinality: Maximum number or outcomes to use when disctetizing the outcome-space
annotation: A key-value mapping to keep around. Accessible through the AMI but not used by the mechanism.
end_on_no_response: End the negotiation if any negotiator returns NO_RESPONSE from `respond`/`counter` or returns
REJECT_OFFER then refuses to give an offer (by returning `None` from `proposee/`counter`).
enable_callbacks: Enable callbacks like on_round_start, etc. Note that on_negotiation_end is always received
by the negotiators no matter what is the setting for this parameter.
check_offers: If true, offers are checked to see if they are valid for the negotiation
outcome-space and if not the offer is considered None which is the same as
refusing to offer (NO_RESPONSE).
enforce_issue_types: If True, the type of each issue is enforced depending on the value of `cast_offers`
cast_offers: If true, each issue value is cast using the issue's type otherwise an incorrect type will be considered an invalid offer. See `check_offers`. Only
used if `enforce_issue_types`
ignore_negotiator_exceptions: just silently ignore negotiator exceptions and consider them no-responses.
offering_is_accepting: Offering an outcome implies accepting it. If not, the agent who proposed an offer will
be asked to respond to it after all other agents.
sync_calls: If given, calls to negotiators will be synchronized. This will not enforce timeouts on
single calls which means that a negotiator in an infinite loop will hog the CPU. By default
calls are done using a different thread that is killed when the timeout passes. This may, but
is not guaranteed to, resolve this issue at the expense of slower negotiations and harder debugging
name: Name of the mecnanism
**kwargs: Extra paramters passed directly to the `Mechanism` constructor
Remarks:
- One and only one of `outcome_space`, `issues`, `outcomes` can be given
- If both `n_steps` and `time_limit` are passed, the negotiation ends when either of the limits is reached.
- Negotiations may take longer than `time_limit` because negotiators are not interrupted while they are
executing their `respond` or `propose` methods.
Events:
- negotiator_exception: Data=(negotiator, exception) raised whenever a negotiator raises an exception if
ignore_negotiator_exceptions is set to True.
"""
def __init__(
self,
dynamic_entry=False,
extra_callbacks=True,
end_on_no_response=True,
avoid_ultimatum=False,
check_offers=False,
enforce_issue_types=False,
cast_offers=False,
ignore_negotiator_exceptions=False,
offering_is_accepting=True,
allow_offering_just_rejected_outcome=True,
name: str | None = None,
max_wait: int = sys.maxsize,
sync_calls: bool = False,
initial_state: SAOState | None = None,
one_offer_per_step: bool = False,
**kwargs,
):
debug = kwargs.get("debug", False)
if debug:
sync_calls = True
if avoid_ultimatum:
warnings.warn(
"Support for Avoid-Ultimatum will be removed soon. We will force avoid_ultimatum to False",
warnings.NegmasWarning,
)
avoid_ultimatum = False
super().__init__(
dynamic_entry=dynamic_entry,
extra_callbacks=extra_callbacks,
initial_state=SAOState() if not initial_state else initial_state,
name=name,
**kwargs,
)
self.nmi = SAONMI(
**{
**asdict(self.nmi, recurse=False),
**dict(
end_on_no_response=end_on_no_response,
one_offer_per_step=one_offer_per_step,
),
}
)
assert self.nmi.end_on_no_response == end_on_no_response
assert self.nmi.one_offer_per_step == one_offer_per_step
self._one_offer_per_step = one_offer_per_step
self._current_state: SAOState
# self._history: list[SAOState] = []
n_steps, time_limit = self.n_steps, self.time_limit
if (n_steps is None or n_steps == float("inf")) and (
time_limit is None or time_limit == float("inf")
):
warnings.warn(
"You are passing no time_limit and no n_steps to an SAOMechanism. The mechanism may never finish!!",
warnings.NegmasInfiniteNegotiationWarning,
)
self._sync_calls = sync_calls
self.params["one_offer_per_step"] = one_offer_per_step
self.params["end_on_no_response"] = end_on_no_response
self.params["enable_callbacks"] = extra_callbacks
self.params["sync_calls"] = sync_calls
self.params["check_offers"] = check_offers
self.params["offering_is_accepting"] = offering_is_accepting
self.params["enforce_issue_types"] = enforce_issue_types
self.params["cast_offers"] = cast_offers
self.params[
"allow_offering_just_rejected_outcome"
] = allow_offering_just_rejected_outcome
self._n_max_waits = max_wait if max_wait is not None else float("inf")
self.params["max_wait"] = self._n_max_waits
self.ignore_negotiator_exceptions = ignore_negotiator_exceptions
self.allow_offering_just_rejected_outcome = allow_offering_just_rejected_outcome
self.end_negotiation_on_refusal_to_propose = end_on_no_response
self.check_offers = check_offers
self._enforce_issue_types = enforce_issue_types
self._cast_offers = cast_offers
self._last_checked_negotiator = -1
self._current_proposer = None
self._frozen_neg_list = None
self._no_responses = 0
self._offering_is_accepting = offering_is_accepting
self._n_waits = 0
self._waiting_time: dict[str, float] = defaultdict(float)
self._waiting_start: dict[str, float] = defaultdict(lambda: float("inf"))
self._selected_first = 0
@property
def state(self) -> SAOState:
"""Returns the current state.
Override `extra_state` if you want to keep extra state
"""
return self._current_state
[docs]
def add( #
self,
negotiator: SAONegotiator | GBNegotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
**kwargs,
) -> bool | None:
from ..genius.negotiator import GeniusNegotiator
added = super().add(negotiator, preferences=preferences, role=role, **kwargs)
if (
added
and isinstance(negotiator, GeniusNegotiator)
and self.nmi.time_limit is not None
and self.nmi.time_limit != float("inf")
and self.nmi.n_steps is not None
and self.nmi.n_steps != float("inf")
):
warnings.warn(
f"{negotiator.id} of type {negotiator.__class__.__name__} is joining "
f"SAOMechanism which has a time_limit of {self.nmi.time_limit} seconds "
f"and a n_steps of {self.nmi.n_steps}. This agnet will only know about the "
f"time_limit and will not know about the n_steps!!!",
warnings.NegmasStepAndTimeLimitWarning,
)
return added
[docs]
def set_sync_call(self, v: bool):
self._sync_call = v
def _agent_info(self):
state = self._current_state
current_proposer_agent = (
self._current_proposer.owner if self._current_proposer else None
)
if current_proposer_agent:
current_proposer_agent = current_proposer_agent.id
new_offerer_agents = []
for neg_id, _ in state.new_offers:
neg = self._negotiator_map.get(neg_id, None)
agent = neg.owner if neg else None
if agent is not None:
new_offerer_agents.append(agent.id)
else:
new_offerer_agents.append(None)
return current_proposer_agent, new_offerer_agents
[docs]
def next_negotitor_ids(self) -> list[str]:
"""Returns a list of negotiator IDs in the order they are to be run in the next call to step()"""
n_negotiators = len(self.negotiators)
if self._frozen_neg_list is not None:
ordered_indices = self._frozen_neg_list
else:
ordered_indices = [
(_ + self._last_checked_negotiator + 1) % n_negotiators
for _ in range(n_negotiators)
]
if ordered_indices and self._one_offer_per_step:
ordered_indices = ordered_indices[:1]
ids = self.negotiator_ids
return [ids[_] for _ in ordered_indices]
def _stop_waiting(self, negotiator_id):
self._waiting_time[negotiator_id] = 0.0
self._waiting_start[negotiator_id] = float("inf")
self._n_waits = 0
self._frozen_neg_list = None
[docs]
def __call__(self, state, action=None) -> MechanismStepResult:
"""
implements a round or a single step of the Stacked Alternating Offers Protocol.
Args:
state: Current state of the mechanism
action: The action to use as a mapping from negotiator ID (key) to its response (value).
If not given, the negotiator(s) is called to generate its response.
"""
state = self._current_state
if self._frozen_neg_list is None:
state.new_offers = []
negotiators: list[SAONegotiator | GBNegotiator] = self.negotiators
n_negotiators = len(negotiators)
# times = dict(zip([_.id for _ in negotiators], itertools.repeat(0.0)))
times = defaultdict(float, self._waiting_time)
exceptions = dict(
zip([_.id for _ in negotiators], [list() for _ in negotiators])
)
def _safe_counter(
negotiator, *args, **kwargs
) -> tuple[SAOResponse | None, bool]:
assert (
not state.waiting or negotiator.id == state.current_proposer
), f"We are waiting with {state.current_proposer} as the last offerer but we are asking {negotiator.id} to offer\n{state}"
if self.verbosity > 2:
print(
f"{self.name}: {negotiator.name} called after {humanize_time(time.perf_counter() - self._start_time, show_ms=True) if self._start_time else 0}",
flush=True,
)
rem = self.remaining_time
if rem is None:
rem = float("inf")
timeout = min(
self.nmi.negotiator_time_limit - times[negotiator.id],
self.nmi.step_time_limit,
rem,
self._hidden_time_limit - self.time,
)
given_response = action.pop(negotiator.id, None) if action else None
if timeout is None or timeout == float("inf") or self._sync_calls:
__strt = time.perf_counter()
try:
if (
negotiator == self._current_proposer
) and self._offering_is_accepting:
self._current_state.n_acceptances = 0
response = (
given_response
if given_response
else negotiator(*args, **kwargs)
)
else:
response = (
given_response
if given_response
else negotiator(*args, **kwargs)
)
except TimeoutError:
response = None
try:
negotiator.cancel()
except Exception:
pass
except Exception as ex:
exceptions[negotiator.id].append(exception2str())
if self.ignore_negotiator_exceptions:
self.announce(
Event(
"negotiator_exception",
{"negotiator": negotiator, "exception": ex},
)
)
times[negotiator.id] += time.perf_counter() - __strt
return SAOResponse(ResponseType.END_NEGOTIATION, None), True
else:
raise ex
times[negotiator.id] += time.perf_counter() - __strt
else:
fun = functools.partial(negotiator, *args, **kwargs)
__strt = time.perf_counter()
try:
if (
negotiator == self._current_proposer
) and self._offering_is_accepting:
state.n_acceptances = 0
response = (
given_response
if given_response
else TimeoutCaller.run(fun, timeout=timeout)
)
else:
response = (
given_response
if given_response
else TimeoutCaller.run(fun, timeout=timeout)
)
except TimeoutError:
response = None
except Exception as ex:
exceptions[negotiator.id].append(exception2str())
if self.ignore_negotiator_exceptions:
self.announce(
Event(
"negotiator_exception",
{"negotiator": negotiator, "exception": ex},
)
)
times[negotiator.id] += time.perf_counter() - __strt
return SAOResponse(ResponseType.END_NEGOTIATION, None), True
else:
raise ex
times[negotiator.id] += time.perf_counter() - __strt
if (
self.check_offers
and response is not None
and response.outcome is not None
):
if not self.outcome_space.is_valid(response.outcome):
return SAOResponse(response.response, None), False
# todo: do not use .issues here as they are not guaranteed to exist (if it is not a cartesial outcome space)
if self._enforce_issue_types and hasattr(self.outcome_space, "issues"):
if outcome_types_are_ok(
response.outcome, getattr(self.outcome_space, "issues")
):
return response, False
elif self._cast_offers:
return (
SAOResponse(
response.response,
cast_value_types(
response.outcome,
getattr(self.outcome_space, "issues"),
),
),
False,
)
return SAOResponse(response.response, None), False
return response, False
proposers, proposer_indices = [], []
for i, neg in enumerate(negotiators):
if not neg.capabilities.get("propose", False):
continue
proposers.append(neg)
proposer_indices.append(i)
n_proposers = len(proposers)
if n_proposers < 1:
if not self.dynamic_entry:
state.broken = True
state.has_error = True
state.error_details = "No proposers and no dynamic entry"
return MechanismStepResult(state, times=times, exceptions=exceptions)
else:
return MechanismStepResult(state, times=times, exceptions=exceptions)
if self._frozen_neg_list is not None:
ordered_indices = self._frozen_neg_list
else:
ordered_indices = [
(_ + self._last_checked_negotiator + 1) % n_negotiators
for _ in range(n_negotiators)
]
if ordered_indices and self._one_offer_per_step:
ordered_indices = ordered_indices[:1]
for _, neg_indx in enumerate(ordered_indices):
self._last_checked_negotiator = neg_indx
neg = self.negotiators[neg_indx]
strt = time.perf_counter()
resp, has_exceptions = _safe_counter(neg, state=self.state)
self._negotiator_times[neg.id] += time.perf_counter() - strt
if has_exceptions:
state.broken = True
state.has_error = True
state.error_details = str(exceptions[neg.id])
return MechanismStepResult(state, times=times, exceptions=exceptions)
if resp is None:
state.timedout = True
return MechanismStepResult(state, times=times, exceptions=exceptions)
if resp.response == ResponseType.WAIT:
self._waiting_start[neg.id] = min(self._waiting_start[neg.id], strt)
self._waiting_time[neg.id] += time.perf_counter() - strt
self._last_checked_negotiator = (neg_indx - 1) % n_negotiators
offered = {self._negotiator_index[_[0]] for _ in state.new_offers}
did_not_offer = sorted(
list(set(range(n_negotiators)).difference(offered))
)
assert neg_indx in did_not_offer
indx = did_not_offer.index(neg_indx)
assert (
self._frozen_neg_list is None
or self._frozen_neg_list[0] == neg_indx
)
self._frozen_neg_list = did_not_offer[indx:] + did_not_offer[:indx]
self._n_waits += 1
else:
self._stop_waiting(neg.id)
if resp is None or time.perf_counter() - strt > self.nmi.step_time_limit:
state.timedout = True
return MechanismStepResult(state, times=times, exceptions=exceptions)
if self._extra_callbacks:
if state.current_offer is not None:
for other in self.negotiators:
if other is not neg:
other.on_partner_response(
state=self.state,
partner_id=neg.id,
outcome=state.current_offer,
response=resp.response,
)
if resp.response == ResponseType.NO_RESPONSE:
continue
if resp.response == ResponseType.WAIT:
if self._n_waits > self._n_max_waits:
self._stop_waiting(neg.id)
state.timedout = True
state.waiting = False
return MechanismStepResult(
state, times=times, exceptions=exceptions
)
state.waiting = True
return MechanismStepResult(state, times=times, exceptions=exceptions)
if resp.response == ResponseType.END_NEGOTIATION:
state.broken = True
return MechanismStepResult(state, times=times, exceptions=exceptions)
if resp.response == ResponseType.ACCEPT_OFFER:
state.n_acceptances += 1
if state.n_acceptances == n_negotiators:
state.agreement = self._current_state.current_offer
return MechanismStepResult(
state,
timedout=False,
agreement=state.current_offer,
times=times,
exceptions=exceptions,
broken=False,
)
if resp.response == ResponseType.REJECT_OFFER:
proposal = resp.outcome
if (
not self.allow_offering_just_rejected_outcome
and proposal == state.current_offer
):
proposal = None
if proposal is None:
if (
neg.capabilities.get("propose", True)
and self.end_negotiation_on_refusal_to_propose
):
state.broken = True
return MechanismStepResult(
state, times=times, exceptions=exceptions
)
state.n_acceptances = 0
else:
state.n_acceptances = 1 if self._offering_is_accepting else 0
if self._extra_callbacks:
for other in self.negotiators:
if other is neg:
continue
other.on_partner_proposal(
partner_id=neg.id, offer=proposal, state=self.state
)
state.current_offer = proposal
self._current_proposer = neg
state.current_proposer = neg.id
state.new_offers.append((neg.id, proposal))
if self._last_checked_negotiator >= 0:
state.last_negotiator = self.negotiators[
self._last_checked_negotiator
].name
else:
state.last_negotiator = ""
(
self._current_proposer_agent,
state.new_offerer_agents,
) = self._agent_info()
# if action is not None:
# assert (
# not action
# ), f"Not all negotiator actions were used in this step: {action}"
return MechanismStepResult(state, times=times, exceptions=exceptions)
@property
def full_trace(self) -> list[TraceElement]:
"""Returns the negotiation history as a list of relative_time/step/negotiator/offer tuples"""
def response(state: SAOState):
if state.agreement:
return "agreement"
if state.timedout:
return "timedout"
if state.ended:
return "ended"
if state.has_error:
return "error"
return "continuing"
offers = []
def get_acceptances(state: SAOState):
neg = state.current_proposer
n_acceptances = state.n_acceptances
if self._offering_is_accepting:
n_acceptances -= 1
if neg is None:
indices = []
else:
indx = self.negotiator_index(neg)
n = self.nmi.n_negotiators
if indx is None:
indices = []
else:
indices = [
_ if _ < n else _ % n for _ in range(indx, n_acceptances + indx)
]
return [self.negotiator_ids[_] for _ in indices]
for state in self._history:
state: SAOState
acceptances = get_acceptances(state)
offers += [
TraceElement(
state.time,
state.relative_time,
state.step,
n,
o,
{n: ResponseType.ACCEPT_OFFER for n in acceptances},
response(state),
)
for n, o in state.new_offers
]
def not_equal(a, b):
return any(x != y for x, y in zip(a, b))
self._history: list[SAOState]
# if the agreement does not appear as the last offer in the trace, add it.
# this should not happen though!!
if (
self.agreement is not None
and offers
and not_equal(offers[-1].offer, self.agreement)
):
acceptances = get_acceptances(self._history[-1])
offers.append(
TraceElement(
self._history[-1].time,
self._history[-1].relative_time,
self._history[-1].step,
self._history[-1].current_proposer,
self.agreement,
{n: ResponseType.ACCEPT_OFFER for n in acceptances},
response(self._history[-1]),
)
)
return offers
@property
def extended_trace(self) -> list[tuple[int, str, Outcome]]:
"""Returns the negotiation history as a list of step/negotiator/offer tuples"""
offers = []
for state in self._history:
state: SAOState
offers += [(state.step, n, o) for n, o in state.new_offers]
def not_equal(a, b):
return any(x != y for x, y in zip(a, b))
self._history: list[SAOState] #
# if the agreement does not appear as the last offer in the trace, add it.
# this should not happen though!!
if (
self.agreement is not None
and offers
and not_equal(offers[-1][-1], self.agreement)
):
offers.append(
(
self._history[-1].step,
self._history[-1].current_proposer,
self.agreement,
)
)
return offers
@property
def trace(self) -> list[tuple[str, Outcome]]:
"""Returns the negotiation history as a list of negotiator/offer tuples"""
offers = []
for state in self._history:
offers += [(n, o) for n, o in state.new_offers]
def not_equal(a, b):
if isinstance(a, dict):
a = a.values()
if isinstance(b, dict):
b = b.values()
return any(x != y for x, y in zip(a, b))
if (
self.agreement is not None
and offers
and not_equal(offers[-1][-1], self.agreement)
):
offers.append((self._history[-1].current_proposer, self.agreement))
return offers
[docs]
def negotiator_offers(self, negotiator_id: str) -> list[Outcome]:
"""Returns the offers given by a negotiator (in order)"""
return [o for n, o in self.trace if n == negotiator_id]
[docs]
def negotiator_full_trace(
self, negotiator_id: str
) -> list[tuple[float, float, int, Outcome, str]]:
"""Returns the (time/relative-time/step/outcome/response) given by a negotiator (in order)"""
return [
(t, rt, s, o, a)
for t, rt, s, n, o, _, a in self.full_trace
if n == negotiator_id
]
@property
def offers(self) -> list[Outcome]:
"""Returns the negotiation history as a list of offers"""
return [o for _, o in self.trace]
@property
def _step(self):
"""
A private property used by the checkpoint system
"""
return self._current_state.step
[docs]
def plot(
self,
plotting_negotiators: tuple[int, int] | tuple[str, str] = (0, 1),
save_fig: bool = False,
path: str | None = None,
fig_name: str | None = None,
ignore_none_offers: bool = True,
with_lines: bool = True,
show_agreement: bool = False,
show_pareto_distance: bool = True,
show_nash_distance: bool = True,
show_kalai_distance: bool = True,
show_max_welfare_distance: bool = True,
show_max_relative_welfare_distance: bool = False,
show_end_reason: bool = True,
show_last_negotiator: bool = True,
show_annotations: bool = False,
show_reserved: bool = True,
show_total_time=True,
show_relative_time=True,
show_n_steps=True,
colors: list | None = None,
markers: list[str] | None = None,
colormap: str = DEFAULT_COLORMAP,
ylimits: tuple[float, float] | None = None,
common_legend: bool = True,
xdim: str = "relative_time",
only2d: bool = False,
no2d: bool = False,
fast: bool = False,
simple_offers_view: bool = False,
mark_offers_view: bool = True,
mark_pareto_points: bool = True,
mark_all_outcomes: bool = True,
mark_nash_points: bool = True,
mark_kalai_points: bool = True,
mark_max_welfare_points: bool = True,
**kwargs,
):
from negmas.plots.util import plot_mechanism_run
extra_annotation = (
f"Last: {self._current_state.last_negotiator}"
if show_last_negotiator
else ""
)
return plot_mechanism_run(
mechanism=self,
negotiators=plotting_negotiators,
save_fig=save_fig,
path=path,
fig_name=fig_name,
ignore_none_offers=ignore_none_offers,
with_lines=with_lines,
only2d=only2d,
show_agreement=show_agreement,
show_pareto_distance=show_pareto_distance,
show_nash_distance=show_nash_distance,
show_kalai_distance=show_kalai_distance,
show_max_welfare_distance=show_max_welfare_distance,
show_max_relative_welfare_distance=show_max_relative_welfare_distance,
show_end_reason=show_end_reason,
show_annotations=show_annotations,
show_reserved=show_reserved,
colors=colors,
markers=markers,
colormap=colormap,
ylimits=ylimits,
common_legend=common_legend,
extra_annotation=extra_annotation,
xdim=xdim,
colorizer=lambda _: 1.0,
show_total_time=show_total_time,
show_relative_time=show_relative_time,
show_n_steps=show_n_steps,
fast=fast,
no2d=no2d,
simple_offers_view=simple_offers_view,
mark_offers_view=mark_offers_view,
mark_pareto_points=mark_pareto_points,
mark_all_outcomes=mark_all_outcomes,
mark_nash_points=mark_nash_points,
mark_kalai_points=mark_kalai_points,
mark_max_welfare_points=mark_max_welfare_points,
**kwargs,
)
SAOProtocol = SAOMechanism
"""An alias for `SAOMechanism` object"""