"""Provides interfaces for defining negotiation mechanisms."""
from __future__ import annotations
import copy
import math
import pprint
import random
import time
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from os import PathLike
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
Sequence,
runtime_checkable,
Protocol,
TypeVar,
Generic,
)
from warnings import warn
from attrs import define
from rich.progress import Progress
from negmas import warnings
from negmas.checkpoints import CheckpointMixin
from negmas.common import (
DEFAULT_JAVA_PORT,
MechanismAction,
MechanismState,
NegotiatorInfo,
NegotiatorMechanismInterface,
TraceElement,
)
from negmas.events import Event, EventSource
from negmas.helpers import snake_case
from negmas.helpers.misc import get_free_tcp_port
from negmas.helpers.strings import humanize_time
from negmas.negotiators import Negotiator
from negmas.outcomes import Outcome
from negmas.outcomes.common import check_one_and_only, ensure_os
from negmas.outcomes.protocols import OutcomeSpace
from negmas.preferences import (
kalai_points,
nash_points,
pareto_frontier,
pareto_frontier_bf,
)
from negmas.preferences.crisp_ufun import UtilityFunction
from negmas.preferences.ops import max_relative_welfare_points, max_welfare_points
from negmas.types import NamedObject
if TYPE_CHECKING:
from negmas.outcomes.base_issue import Issue
from negmas.outcomes.protocols import DiscreteOutcomeSpace
from negmas.preferences import Preferences
from negmas.preferences.base_ufun import BaseUtilityFunction
__all__ = ["Mechanism", "MechanismStepResult", "Traceable"]
TState = TypeVar("TState", bound=MechanismState)
TAction = TypeVar("TAction", bound=MechanismAction)
TNMI = TypeVar("TNMI", bound=NegotiatorMechanismInterface)
TNegotiator = TypeVar("TNegotiator", bound=Negotiator)
[docs]
@define(frozen=True)
class MechanismStepResult(Generic[TState]):
"""
Represents the results of a negotiation step.
This is what `round()` should return.
"""
state: TState
"""The returned state."""
completed: bool = True
"""Whether the current round is completed or not."""
broken: bool = False
"""True only if END_NEGOTIATION was selected by one negotiator."""
timedout: bool = False
"""True if a timeout occurred."""
agreement: Outcome | None = None
"""The agreement if any. Allows for a single outcome or a collection of outcomes."""
error: bool = False
"""True if an error occurred in the mechanism."""
error_details: str = ""
"""Error message."""
waiting: bool = False
"""Whether to consider that the round is still running and call the round
method again without increasing the step number."""
exceptions: dict[str, list[str]] | None = None
"""A mapping from negotiator ID to a list of exceptions raised by that
negotiator in this round."""
times: dict[str, float] | None = None
"""A mapping from negotiator ID to the time it consumed during this round."""
[docs]
class Mechanism(
NamedObject,
EventSource,
CheckpointMixin,
Generic[TNMI, TState, TAction, TNegotiator],
ABC,
):
"""Base class for all negotiation Mechanisms.
Override the `round` function of this class to implement a round of your mechanism
Args:
outcome_space: The negotiation agenda
outcomes: list of outcomes (optional as you can pass `issues`). If an int then it is the number of outcomes
n_steps: Number of rounds allowed (None means infinity)
time_limit: Number of real seconds allowed (None means infinity)
pend: Probability of ending the negotiation at any step
pend_per_second: Probability of ending the negotiation every second
hidden_time_limit: Number of real seconds allowed but not visilbe to the negotiators
max_n_negotiators: Maximum allowed number of negotiators.
dynamic_entry: Allow negotiators to enter/leave negotiations between rounds
cache_outcomes: If true, a list of all possible outcomes will be cached
max_cardinality: The maximum allowed number of outcomes in the cached set
annotation: Arbitrary annotation
checkpoint_every: The number of steps to checkpoint after. Set to <= 0 to disable
checkpoint_folder: The folder to save checkpoints into. Set to None to disable
checkpoint_filename: The base filename to use for checkpoints (multiple checkpoints will be prefixed with
step number).
single_checkpoint: If true, only the most recent checkpoint will be saved.
extra_checkpoint_info: Any extra information to save with the checkpoint in the corresponding json file as
a dictionary with string keys
exist_ok: IF true, checkpoints override existing checkpoints with the same filename.
name: Name of the mechanism session. Should be unique. If not given, it will be generated.
genius_port: the port used to connect to Genius for all negotiators in this mechanism (0 means any).
id: An optional system-wide unique identifier. You should not change
the default value except in special circumstances like during
serialization and should always guarantee system-wide uniquness
if you set this value explicitly
"""
def __init__(
self,
initial_state: TState | None = None,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | int | None = None,
n_steps: int | float | None = None,
time_limit: float | None = None,
pend: float = 0,
pend_per_second: float = 0,
step_time_limit: float | None = None,
negotiator_time_limit: float | None = None,
hidden_time_limit: float = float("inf"),
max_n_negotiators: int | None = None,
dynamic_entry=False,
annotation: dict[str, Any] | None = None,
nmi_factory: type[TNMI] = NegotiatorMechanismInterface,
extra_callbacks=False,
checkpoint_every: int = 1,
checkpoint_folder: PathLike | None = None,
checkpoint_filename: str | None = None,
extra_checkpoint_info: dict[str, Any] | None = None,
single_checkpoint: bool = True,
exist_ok: bool = True,
name=None,
genius_port: int = DEFAULT_JAVA_PORT,
id: str | None = None,
type_name: str | None = None,
verbosity: int = 0,
ignore_negotiator_exceptions=False,
):
check_one_and_only(outcome_space, issues, outcomes)
outcome_space = ensure_os(outcome_space, issues, outcomes)
self.__verbosity = verbosity
self._negotiator_logs: dict[str, list[dict[str, Any]]] = defaultdict(list)
super().__init__(name=name, id=id, type_name=type_name)
self.ignore_negotiator_exceptions = ignore_negotiator_exceptions
self._negotiator_times = defaultdict(float)
CheckpointMixin.checkpoint_init(
self,
step_attrib="_step",
every=checkpoint_every,
folder=checkpoint_folder,
filename=checkpoint_filename,
info=extra_checkpoint_info,
exist_ok=exist_ok,
single=single_checkpoint,
)
self.__last_second_tried = 0
self._hidden_time_limit = (
hidden_time_limit if hidden_time_limit is not None else float("inf")
)
time_limit = time_limit if time_limit is not None else float("inf")
step_time_limit = (
step_time_limit if step_time_limit is not None else float("inf")
)
negotiator_time_limit = (
negotiator_time_limit if negotiator_time_limit is not None else float("inf")
)
# parameters fixed for all runs
self.set_id(str(uuid.uuid4()))
if n_steps == float("inf"):
n_steps = None
if isinstance(n_steps, float):
n_steps = int(n_steps)
if pend is None:
pend = 0
if pend_per_second is None:
pend_per_second = 0
self.nmi = nmi_factory(
id=self.id,
n_outcomes=outcome_space.cardinality,
outcome_space=outcome_space,
time_limit=time_limit,
pend=pend,
pend_per_second=pend_per_second,
n_steps=n_steps,
step_time_limit=step_time_limit,
negotiator_time_limit=negotiator_time_limit,
dynamic_entry=dynamic_entry,
max_n_negotiators=max_n_negotiators,
annotation=annotation if annotation is not None else dict(),
_mechanism=self,
)
self._current_state = initial_state if initial_state else MechanismState() # type: ignore This is a shortcut to allow users to create mechanisms without passing any initial_state
self._current_state: TState
self._history: list[TState] = []
self._stats: dict[str, Any] = dict()
self._stats["round_times"] = list()
self._stats["times"] = defaultdict(float)
self._stats["exceptions"] = defaultdict(list)
# if self.nmi.issues is not None:
# self.nmi.issues = tuple(self.nmi.issues)
# if self.nmi.outcomes is not None:
# self.nmi.outcomes = tuple(self.nmi.outcomes)
self._requirements = {}
self._negotiators: list[TNegotiator] = []
self._negotiator_map: dict[str, TNegotiator] = dict()
self._negotiator_index: dict[str, int] = dict()
self._roles = []
self._start_time = None
self.__discrete_os = None
self.__discrete_outcomes = None
self._extra_callbacks = extra_callbacks
self.negotiators_of_role = defaultdict(list)
self.role_of_negotiator = {}
# mechanisms do not differentiate between RANDOM_JAVA_PORT and ANY_JAVA_PORT.
# if either is given as the genius_port, it will fix a port and all negotiators
# that are not explicitly assigned to a port (by passing port>0 to them) will just
# use that port.
self.genius_port = genius_port if genius_port > 0 else get_free_tcp_port()
self.params: dict[str, Any] = dict(
dynamic_entry=dynamic_entry, genius_port=genius_port, annotation=annotation
)
[docs]
def log(self, nid: str, data: dict[str, Any], level: str) -> None:
"""Saves a log for a negotiator"""
d = data | dict(
step=self.current_step,
relative_time=self.relative_time,
time=self.time,
level=level,
)
self._negotiator_logs[nid].append(d)
[docs]
def log_info(self, nid: str, data: dict[str, Any]) -> None:
"""Logs at info level"""
self.log(nid, level="info", data=data)
[docs]
def log_debug(self, nid: str, data: dict[str, Any]) -> None:
"""Logs at debug level"""
self.log(nid, level="debug", data=data)
[docs]
def log_warning(self, nid: str, data: dict[str, Any]) -> None:
"""Logs at warning level"""
self.log(nid, level="warning", data=data)
[docs]
def log_error(self, nid: str, data: dict[str, Any]) -> None:
"""Logs at error level"""
self.log(nid, level="error", data=data)
[docs]
def log_critical(self, nid: str, data: dict[str, Any]) -> None:
"""Logs at critical level"""
self.log(nid, level="critical", data=data)
@property
def negotiator_times(self) -> dict[str, float]:
"""The total time consumed by every negotiator.
Each mechanism class is responsible of updating this for any activities of the negotiator it controls.
"""
return self._negotiator_times
@property
def negotiators(self) -> list[TNegotiator]:
return self._negotiators
@property
def participants(self) -> list[NegotiatorInfo]:
"""Returns a list of all participant names."""
return [
NegotiatorInfo(name=_.name, id=_.id, type=snake_case(_.__class__.__name__))
for _ in self.negotiators
]
[docs]
def is_valid(self, outcome: Outcome):
"""Checks whether the outcome is valid given the issues."""
return outcome in self.nmi.outcome_space
@property
def outcome_space(self) -> OutcomeSpace:
return self.nmi.outcome_space
[docs]
def discrete_outcome_space(
self, levels: int = 5, max_cardinality: int = 10_000_000_000
) -> DiscreteOutcomeSpace:
"""Returns a stable discrete version of the given outcome-space."""
if self.__discrete_os:
return self.__discrete_os
self.__discrete_os = self.outcome_space.to_discrete(
levels=levels,
max_cardinality=max_cardinality
if max_cardinality is not None
else float("inf"),
)
return self.__discrete_os
@property
def outcomes(self):
return self.nmi.outcomes
[docs]
def discrete_outcomes(
self, levels: int = 5, max_cardinality: int | float = float("inf")
) -> list[Outcome]:
"""A discrete set of outcomes that spans the outcome space.
Args:
max_cardinality: The maximum number of outcomes to return. If None, all outcomes will be returned for discrete issues
and *10_000* if any of the issues was continuous
Returns:
list[Outcome]: list of `n` or less outcomes
"""
if self.outcomes is not None:
return list(self.outcomes)
if self.__discrete_outcomes:
return self.__discrete_outcomes
self.__discrete_os = self.outcome_space.to_discrete(
levels=levels,
max_cardinality=max_cardinality
if max_cardinality is not None
else float("inf"),
)
self.__discrete_outcomes = list(self.__discrete_os.enumerate_or_sample())
return self.__discrete_outcomes
[docs]
def random_outcomes(
self, n: int = 1, with_replacement: bool = False
) -> list[Outcome]:
"""Returns random outcomes.
Args:
n: Number of outcomes to generate
with_replacement: If true, outcomes may be repeated
Returns:
A list of outcomes of at most n outcomes.
Remarks:
- If the number of outcomes `n` cannot be satisfied, a smaller number will be returned
- Sampling is done without replacement (i.e. returned outcomes are unique).
"""
return list(
self.outcome_space.sample(
n, with_replacement=with_replacement, fail_if_not_enough=False
)
)
[docs]
def random_outcome(self) -> Outcome:
"""Returns a single random offer"""
return self.outcome_space.random_outcome()
@property
def time(self) -> float:
"""Elapsed time since mechanism started in seconds.
0.0 if the mechanism did not start running
"""
if self._start_time is None:
return 0.0
return time.perf_counter() - self._start_time
@property
def remaining_time(self) -> float | None:
"""
Returns remaining time in seconds.
None if no time limit is given.
"""
if self.nmi.time_limit == float("+inf"):
return None
if not self._start_time:
return self.nmi.time_limit
limit = self.nmi.time_limit - (time.perf_counter() - self._start_time)
if limit < 0.0:
return 0.0
return limit
@property
def expected_remaining_time(self) -> float | None:
"""
Returns remaining time in seconds (expectation).
None if no time limit or pend_per_second is given.
"""
rem = self.remaining_time
pend = self.nmi.pend_per_second
if pend <= 0:
return rem
return min(rem, (1 / pend)) if rem is not None else (1 / pend)
@property
def relative_time(self) -> float:
"""
Returns a number between ``0`` and ``1`` indicating elapsed relative time or steps.
Remarks:
- If pend or pend_per_second are defined in the `NegotiatorMechanismInterface`,
and time_limit/n_steps are not given, this becomes an expectation that is limited above by one.
"""
n_steps = self.nmi.n_steps
time_limit = self.nmi.time_limit
if time_limit == float("+inf") and n_steps is None:
if self.nmi.pend <= 0 and self.nmi.pend_per_second <= 0:
return 0.0
if self.nmi.pend > 0:
n_steps = int(math.ceil(1 / self.nmi.pend))
if self.nmi.pend_per_second > 0:
time_limit = 1 / self.nmi.pend_per_second
relative_step = (
(self._current_state.step + 1) / (n_steps + 1)
if n_steps is not None
else -1.0
)
relative_time = self.time / time_limit if time_limit is not None else -1.0
return min(1.0, max([relative_step, relative_time]))
@property
def expected_relative_time(self) -> float:
"""
Returns a positive number indicating elapsed relative time or steps.
Remarks:
- This is relative to the expected time/step at which the negotiation ends given all timing
conditions (time_limit, n_step, pend, pend_per_second).
"""
n_steps = self.nmi.n_steps
time_limit = self.nmi.time_limit
if time_limit == float("+inf") and n_steps is None:
if self.nmi.pend <= 0 and self.nmi.pend_per_second <= 0:
return 0.0
if n_steps is None:
# set the expected number of steps to the reciprocal of the probability of ending at every step
n_steps = (
int(math.ceil(1 / self.nmi.pend)) if self.nmi.pend > 0 else n_steps
)
else:
n_steps = min(int(math.ceil(1 / self.nmi.pend)), n_steps)
if time_limit == float("inf"):
# set the expected number of seconds to the reciprocal of the probability of ending every second
time_limit = (
int(math.ceil(1 / self.nmi.pend_per_second))
if self.nmi.pend_per_second > 0
else time_limit
)
else:
time_limit = (
min(time_limit, int(math.ceil(1 / self.nmi.pend_per_second)))
if self.nmi.pend_per_second > 0
else time_limit
)
relative_step = (
(self._current_state.step + 1) / (n_steps + 1)
if n_steps is not None
else -1.0
)
relative_time = self.time / time_limit if time_limit is not None else -1.0
return max([relative_step, relative_time])
@property
def remaining_steps(self) -> int | None:
"""
Returns the remaining number of steps until the end of the mechanism run.
None if unlimited
"""
if self.nmi.n_steps is None:
return None
return self.nmi.n_steps - self._current_state.step
@property
def expected_remaining_steps(self) -> int | None:
"""
Returns the expected remaining number of steps until the end of the mechanism run.
None if unlimited
"""
rem = self.remaining_steps
pend = self.nmi.pend
if pend <= 0:
return rem
return (
min(rem, int(math.ceil(1 / pend)))
if rem is not None
else int(math.ceil(1 / pend))
)
@property
def requirements(self):
"""A dictionary specifying the requirements that must be in the
capabilities of any negotiator to join the mechanism."""
return self._requirements
@requirements.setter
def requirements(
self,
requirements: dict[
str,
(
tuple[int | float | str, int | float | str]
| list
| set
| int
| float
| str
),
],
):
self._requirements = {
k: set(v) if isinstance(v, list) else v for k, v in requirements.items()
}
[docs]
def is_satisfying(self, capabilities: dict) -> bool:
"""Checks if the given capabilities are satisfying mechanism
requirements.
Args:
capabilities: capabilities to check
Returns:
bool are the requirements satisfied by the capabilities.
Remarks:
- Requirements are also a dict with the following meanings:
- tuple: Min and max acceptable values
- list/set: Any value in the iterable is acceptable
- Single value: The capability must match this value
- Capabilities can also have the same three possibilities.
"""
requirements = self.requirements
for r, v in requirements.items():
if v is None:
if r not in capabilities.keys():
return False
else:
continue
if r not in capabilities.keys():
return False
if capabilities[r] is None:
continue
c = capabilities[r]
if isinstance(c, tuple):
# c is range
if isinstance(v, tuple):
# both ranges
match = v[0] <= c[0] <= v[1] or v[0] <= c[1] <= v[1]
else:
# c is range and cutoff_utility is not a range
match = (
any(c[0] <= _ <= c[1] for _ in v)
if isinstance(v, set)
else c[0] <= v <= c[1]
)
elif isinstance(c, list) or isinstance(c, set):
# c is list
if isinstance(v, tuple):
# c is a list and cutoff_utility is a range
match = any(v[0] <= _ <= v[1] for _ in c)
else:
# c is a list and cutoff_utility is not a range
match = any(_ in v for _ in c) if isinstance(v, set) else v in c
else:
# c is a single value
if isinstance(v, tuple):
# c is a singlton and cutoff_utility is a range
match = v[0] <= c <= v[1]
else:
# c is a singlton and cutoff_utility is not a range
match = c in v if isinstance(v, set) else c == v
if not match:
return False
return True
[docs]
def can_participate(self, negotiator: TNegotiator) -> bool:
"""Checks if the negotiator can participate in this type of negotiation in
general.
Returns:
bool: True if the negotiator can participate
Remarks:
The only reason this may return `False` is if the mechanism requires some requirements
that are not within the capabilities of the negotiator.
When evaluating compatibility, the negotiator is considered incapable of participation if any
of the following conditions hold:
* A mechanism requirement is not in the capabilities of the negotiator
* A mechanism requirement is in the capabilities of the negotiator by the values required for it
is not in the values announced by the negotiator.
An negotiator that lists a `None` value for a capability is announcing that it can work with all its
values. On the other hand, a mechanism that lists a requirement as None announces that it accepts
any value for this requirement as long as it exist in the negotiator
"""
return self.is_satisfying(negotiator.capabilities)
[docs]
def can_accept_more_negotiators(self) -> bool:
"""Whether the mechanism can **currently** accept more negotiators."""
return (
True
if self.nmi.max_n_negotiators is None or self._negotiators is None
else len(self._negotiators) < self.nmi.max_n_negotiators
)
[docs]
def can_enter(self, negotiator: TNegotiator) -> bool:
"""Whether the negotiator can enter the negotiation now."""
return self.can_accept_more_negotiators() and self.can_participate(negotiator)
# def extra_state(self) -> dict[str, Any] | None:
# """Returns any extra state information to be kept in the `state` and `history` properties"""
# return dict()
@property
def state(self) -> TState:
"""Returns the current state.
Override `extra_state` if you want to keep extra state
"""
return self._current_state
def _get_nmi(self, negotiator: TNegotiator) -> TNMI:
_ = negotiator
return self.nmi
def _get_ami(self, negotiator: TNegotiator) -> TNMI:
_ = negotiator
warnings.deprecated("_get_ami is depricated. Use `get_nmi` instead of it")
return self.nmi
[docs]
def add(
self,
negotiator: TNegotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
ufun: BaseUtilityFunction | None = None,
) -> bool | None:
"""Add an negotiator to the negotiation.
Args:
negotiator: The negotiator to be added.
preferences: The utility function to use. If None, then the negotiator must already have a stored
utility function otherwise it will fail to enter the negotiation.
ufun: [depricated] same as preferences but must be a `UFun` object.
role: The role the negotiator plays in the negotiation mechanism. It is expected that mechanisms inheriting from
this class will check this parameter to ensure that the role is a valid role and is still possible for
negotiators to join on that role. Roles may include things like moderator, representative etc based
on the mechanism
Returns:
* True if the negotiator was added.
* False if the negotiator was already in the negotiation.
* None if the negotiator cannot be added. This can happen in the following cases:
1. The capabilities of the negotiator do not match the requirements of the negotiation
2. The outcome-space of the negotiator's preferences do not contain the outcome-space of the negotiation
3. The negotiator refuses to join (by returning False from its `join` method) see `Negotiator.join` for possible reasons of that
"""
from negmas.preferences import (
BaseUtilityFunction,
MappingUtilityFunction,
Preferences,
)
if ufun is not None:
if not isinstance(ufun, BaseUtilityFunction):
ufun = MappingUtilityFunction(ufun, outcome_space=self.outcome_space)
preferences = ufun
if (
preferences is not None
and not isinstance(preferences, Preferences)
and isinstance(preferences, Callable)
):
preferences = MappingUtilityFunction(
preferences, outcome_space=self.outcome_space
)
if (
preferences
and preferences.outcome_space
and self.outcome_space not in preferences.outcome_space
):
return None
if not self.can_enter(negotiator):
return None
if negotiator in self._negotiators:
return False
if role is None:
role = "negotiator"
if negotiator.join(
nmi=self._get_nmi(negotiator),
state=self.state,
preferences=preferences,
role=role,
):
self._negotiators.append(negotiator)
self._current_state.n_negotiators += 1
self._negotiator_map[negotiator.id] = negotiator
self._negotiator_index[negotiator.id] = len(self._negotiators) - 1
self._roles.append(role)
self.role_of_negotiator[negotiator.uuid] = role
self.negotiators_of_role[role].append(negotiator)
return True
return None
[docs]
def get_negotiator(self, source: str) -> Negotiator | None:
"""Returns the negotiator with the given ID if present in the
negotiation."""
return self._negotiator_map.get(source, None)
[docs]
def get_negotiator_raise(self, source: str) -> Negotiator:
"""Returns the negotiator with the given ID if present in the
negotiation otherwise it raises an exception."""
return self._negotiator_map[source]
[docs]
def can_leave(self, negotiator: Negotiator) -> bool:
"""Can the negotiator leave now?"""
return (
True
if self.nmi.dynamic_entry
else not self.nmi.state.running and negotiator in self._negotiators
)
def _call(self, negotiator: TNegotiator, callback: Callable, *args, **kwargs):
result = None
try:
result = callback(*args, **kwargs)
except Exception as e:
if self.ignore_negotiator_exceptions:
pass
else:
self.state.has_error = True
self.state.error_details = str(e)
self.state.erred_negotiator = negotiator.id if negotiator else ""
a = negotiator.owner if negotiator else None
self.state.erred_agent = a.id if a else ""
finally:
return result
[docs]
def remove(self, negotiator: TNegotiator) -> bool | None:
"""Remove the negotiator from the negotiation.
Returns:
* True if the negotiator was removed.
* False if the negotiator was not in the negotiation already.
* None if the negotiator cannot be removed.
"""
if not self.can_leave(negotiator):
return False
n = self._negotiator_map.get(negotiator.id, None)
if n is None:
return False
indx = self._negotiator_index.pop(negotiator.id, None)
if indx is not None:
self._negotiators = self._negotiators[:indx] + self._negotiators[indx + 1 :]
self._negotiator_map.pop(negotiator.id)
if self._extra_callbacks:
strt = time.perf_counter()
self._call(negotiator, negotiator.on_leave, self.nmi.state)
self._negotiator_times[negotiator.id] += time.perf_counter() - strt
return True
[docs]
def add_requirements(self, requirements: dict) -> None:
"""Adds requirements."""
requirements = {
k: set(v) if isinstance(v, list) else v for k, v in requirements.items()
}
if hasattr(self, "_requirements"):
self._requirements.update(requirements)
else:
self._requirements = requirements
[docs]
def remove_requirements(self, requirements: Iterable) -> None:
"""Adds requirements."""
for r in requirements:
if r in self._requirements.keys():
self._requirements.pop(r, None)
[docs]
def negotiator_index(self, source: str) -> int | None:
"""Gets the negotiator index.
Args:
source (str): source
Returns:
int | None:
"""
return self._negotiator_index.get(source, None)
@property
def negotiator_ids(self) -> list[str]:
return [_.id for _ in self._negotiators]
@property
def genius_negotiator_ids(self) -> list[str]:
return [
getattr(_, "java_uuid") if hasattr(_, "java_uuid") else _.id
for _ in self._negotiators
]
[docs]
def genius_id(self, id: str | None) -> str | None:
"""Gets the Genius ID corresponding to the given negotiator if known otherwise its normal ID"""
if id is None:
return None
negotiator = self._negotiator_map.get(id, None)
if not negotiator:
return id
return (
getattr(negotiator, "java_uuid")
if hasattr(negotiator, "java_uuid")
else negotiator.id
)
@property
def agent_ids(self) -> list[str | None]:
return [_.owner.id if _.owner else None for _ in self._negotiators]
@property
def agent_names(self) -> list[str | None]:
return [_.owner.name if _.owner else None for _ in self._negotiators]
@property
def negotiator_names(self) -> list[str]:
return [_.name for _ in self._negotiators]
@property
def agreement(self):
return self._current_state.agreement
@property
def n_outcomes(self):
return self.nmi.n_outcomes
@property
def issues(self) -> list[Issue]:
"""Returns the issues of the outcome space (if defined).
Will raise an exception if the outcome space has no defined
issues
"""
return getattr(self.nmi.outcome_space, "issues")
@property
def completed(self):
"""Ended without timing out (either with agreement or broken by a negotiator)"""
return self.agreement is not None or self._current_state.broken
@property
def ended(self):
"""Ended in any way"""
return self._current_state.ended
@property
def n_steps(self):
return self.nmi.n_steps
@property
def time_limit(self):
return self.nmi.time_limit
@property
def running(self):
return self._current_state.running
@property
def dynamic_entry(self):
return self.nmi.dynamic_entry
@property
def max_n_negotiators(self):
return self.nmi.max_n_negotiators
@property
def state4history(self) -> Any:
"""Returns the state as it should be stored in the history."""
return copy.deepcopy(self._current_state)
def _add_to_history(self, state4history):
if len(self._history) == 0:
self._history.append(state4history)
return
last = self._history[-1]
if last["step"] == state4history:
self._history[-1] = state4history
return
self._history.append(state4history)
[docs]
def on_mechanism_error(self) -> None:
"""Called when there is a mechanism error.
Remarks:
- When overriding this function you **MUST** call the base class version
"""
state = self.state
if self._extra_callbacks:
for a in self.negotiators:
strt = time.perf_counter()
self._call(a, a.on_mechanism_error, state=state)
self._negotiator_times[a.id] += time.perf_counter() - strt
[docs]
def on_negotiation_end(self) -> None:
"""Called at the end of each negotiation.
Remarks:
- When overriding this function you **MUST** call the base class version
"""
state = self.state
for a in self.negotiators:
strt = time.perf_counter()
self._call(a, a._on_negotiation_end, state=state)
self._negotiator_times[a.id] += time.perf_counter() - strt
self.announce(
Event(
type="negotiation_end",
data={
"agreement": self.agreement,
"state": state,
"annotation": self.nmi.annotation,
},
)
)
self.checkpoint_final_step()
@property
def verbosity(self) -> int:
"""
Verbosity level.
- Children of this class should only print if verbosity > 1
"""
return self.__verbosity
[docs]
def on_negotiation_start(self) -> bool:
"""Called before starting the negotiation.
If it returns False then negotiation will end immediately
"""
return True
@property
def atomic_steps(self) -> bool:
"""Is every step corresponding to a single action by a single negotiator"""
return True
[docs]
@abstractmethod
def __call__(
self, state: TState, action: dict[str, TAction] | None = None
) -> MechanismStepResult[TState]:
"""
Implements a single step of the mechanism. Override this!
Args:
state: The mechanism state. When overriding, set the type of this
to the specific `MechanismState` descendent for your mechanism.
action: An optional action (value) of the next negotiator (key). If given, the call
should just execute the action without calling the next negotiator.
Returns:
`MechanismStepResult` showing the result of the negotiation step
"""
...
[docs]
def step(self, action: dict[str, TAction] | None = None) -> TState:
"""Runs a single step of the mechanism.
Returns:
MechanismState: The state of the negotiation *after* the round is conducted
action: An optional action (value) for the next negotiator (key). If given, the call
should just execute the action without calling the next negotiator.
Remarks:
- Every call yields the results of one round (see `round()`)
- If the mechanism was yet to start, it will start it and runs one round
- There is another function (`run()`) that runs the whole mechanism in blocking mode
"""
if self._start_time is None or self._start_time < 0:
self._start_time = time.perf_counter()
if self.__verbosity >= 1:
if self.current_step == 0:
print(
f"{self.name}: Step {self.current_step} starting after {datetime.now()}",
flush=True,
)
else:
_elapsed = time.perf_counter() - self._start_time
remaining = self.expected_remaining_steps
etatime = self.expected_remaining_time
etatime = etatime if etatime is not None else float("inf")
if remaining is not None:
tt = humanize_time(
min((_elapsed * remaining) / self.current_step, etatime)
)
if tt is not None:
_eta = tt + f" {remaining} steps"
else:
_eta = "--"
else:
_eta = "--"
print(
f"{self.name}: Step {self.current_step} starting after {humanize_time(_elapsed, show_ms=True)} [ETA {_eta}]",
flush=True,
end="\r" if self.verbosity == 1 else "\n",
)
self.checkpoint_on_step_started()
state = self.state
state4history = self.state4history
rs, rt = random.random(), 2
# end with a timeout if condition is met
current_time = self.time
if self.__last_second_tried < int(current_time):
rt, self.__last_second_tried = random.random(), int(current_time)
if (
(current_time > self.time_limit)
or (self.nmi.n_steps and self._current_state.step >= self.nmi.n_steps)
or current_time > self._hidden_time_limit
or rs < self.nmi.pend - 1e-8
or rt < self.nmi.pend_per_second - 1e-8
):
(
self._current_state.running,
self._current_state.broken,
self._current_state.timedout,
) = (False, False, True)
self.on_negotiation_end()
return self.state
# if there is a single negotiator and no other negotiators can be added,
# end without starting
if len(self._negotiators) < 2:
if self.nmi.dynamic_entry:
return self.state
else:
(
self._current_state.running,
self._current_state.broken,
self._current_state.timedout,
) = (False, False, False)
self.on_negotiation_end()
return self.state
# if the mechanism states that it is broken, timedout or ended with
# agreement, report that
if (
self._current_state.broken
or self._current_state.timedout
or self._current_state.agreement is not None
):
self._current_state.running = False
self.on_negotiation_end()
return self.state
if not self._current_state.running:
# if we did not start, just start
self._current_state.running = True
self._current_state.step = 0
self._current_state.relative_time = self.relative_time
self._start_time = time.perf_counter()
self._current_state.started = True
state = self.state
# if the mechanism indicates that it cannot start, keep trying
if self.on_negotiation_start() is False:
(
self._current_state.agreement,
self._current_state.broken,
self._current_state.timedout,
) = (None, False, False)
return self.state
for a in self.negotiators:
strt = time.perf_counter()
self._call(a, a._on_negotiation_start, state=state)
self._negotiator_times[a.id] += time.perf_counter() - strt
self.announce(Event(type="negotiation_start", data=None))
else:
# if no steps are remaining, end with a timeout
remaining_steps, remaining_time = self.remaining_steps, self.remaining_time
if (remaining_steps is not None and remaining_steps <= 0) or (
remaining_time is not None and remaining_time <= 0.0
):
self._current_state.running = False
(
self._current_state.agreement,
self._current_state.broken,
self._current_state.timedout,
) = (None, False, True)
self.on_negotiation_end()
return self.state
# send round start only if the mechanism is not waiting for anyone
# TODO check this.
if not self._current_state.waiting and self._extra_callbacks:
for negotiator in self._negotiators:
strt = time.perf_counter()
self._call(negotiator, negotiator.on_round_start, state=state)
self._negotiator_times[negotiator.id] += time.perf_counter() - strt
# run a round of the mechanism and get the new state
step_start = (
time.perf_counter() if not self._current_state.waiting else self._last_start
)
self._last_start = step_start
self._current_state.waiting = False
try:
result = self(self._current_state, action=action)
except TypeError:
result = self(self._current_state)
self._current_state = result.state
step_time = time.perf_counter() - step_start
self._stats["round_times"].append(step_time)
# if negotaitor times are reported, save them
if result.times:
for k, v in result.times.items():
if v is not None:
self._stats["times"][k] += v
# if negotaitor exceptions are reported, save them
if result.exceptions:
for k, v in result.exceptions.items():
if v:
self._stats["exceptions"][k] += v
# update current state variables from the result of the round just run
(
self._current_state.has_error,
self._current_state.error_details,
self._current_state.waiting,
) = (result.state.has_error, result.state.error_details, result.state.waiting)
if self._current_state.has_error:
self.on_mechanism_error()
if (
self.nmi.step_time_limit is not None
and step_time > self.nmi.step_time_limit
):
(
self._current_state.broken,
self._current_state.timedout,
self._current_state.agreement,
) = (False, True, None)
else:
(
self._current_state.broken,
self._current_state.timedout,
self._current_state.agreement,
) = (result.state.broken, result.state.timedout, result.state.agreement)
if (
(self._current_state.agreement is not None)
or self._current_state.broken
or self._current_state.timedout
):
self._current_state.running = False
# now switch to the new state
state = self.state
if not self._current_state.waiting and result.completed:
state4history = self.state4history
if self._extra_callbacks:
for negotiator in self._negotiators:
strt = time.perf_counter()
self._call(negotiator, negotiator.on_round_end, state=state)
self._negotiator_times[negotiator.id] += time.perf_counter() - strt
self._add_to_history(state4history)
# we only indicate a new step if no one is waiting
self._current_state.step += 1
self._current_state.time = self.time
self._current_state.relative_time = self.relative_time
if not self._current_state.running:
self.on_negotiation_end()
return self.state
def __next__(self) -> TState:
result = self.step()
if not self._current_state.running:
raise StopIteration
return result
[docs]
def abort(self) -> TState:
"""Aborts the negotiation."""
(
self._current_state.has_error,
self._current_state.error_details,
self._current_state.waiting,
) = (True, "Uncaught Exception", False)
self.on_mechanism_error()
(
self._current_state.broken,
self._current_state.timedout,
self._current_state.agreement,
) = (True, False, None)
state = self.state
state4history = self.state4history
self._current_state.running = False
if self._extra_callbacks:
for negotiator in self._negotiators:
strt = time.perf_counter()
self._call(negotiator, negotiator.on_round_end, state=state)
self._negotiator_times[negotiator.id] += time.perf_counter() - strt
self._add_to_history(state4history)
self._current_state.step += 1
self.on_negotiation_end()
return state
[docs]
@classmethod
def runall(
cls,
mechanisms: list[Mechanism] | tuple[Mechanism, ...],
keep_order=True,
method: str = "ordered",
ordering: tuple[int, ...] | None = None,
ordering_fun: Callable[[int, list[TState | None]], int] | None = None,
completion_callback: Callable[[int, Mechanism], None] | None = None,
) -> list[TState | None]:
"""Runs all mechanisms.
Args:
mechanisms: list of mechanisms
keep_order: if True, the mechanisms will be run in order every step otherwise the order will be randomized
at every step. This is only allowed if the method is ordered
method: the method to use for running all the sessions.
Acceptable options are: sequential, ordered, threads, processes
ordering: Controls the order of advancing the negotiations with the "ordered" method.
ordering_fun: A function to implement dynamic ordering for the "ordered" method.
This function receives a list of states and returns the index of the next mechanism to step.
Note that a state may be None if the corresponding mechanism was None and it should never be stepped
completion_callback: A callback to be called at the moment each mechanism is ended
Returns:
- list of states of all mechanisms after completion
- None for any such states indicates disagreements
Remarks:
- sequential means running each mechanism until completion before going to the next
- ordered means stepping mechanisms in some order which can be controlled by `ordering`. If no ordering is given, the ordering is just round-robin
"""
if method == "serial":
warn(
"`serial` method is deprecated. Please use 'ordered' instead.",
DeprecationWarning,
)
method = "ordered"
if method == "sequential":
if not keep_order:
mechanisms = [_ for _ in mechanisms]
random.shuffle(mechanisms)
for i, mechanism in enumerate(mechanisms):
mechanism.run()
if completion_callback:
completion_callback(i, mechanism)
states = [_.state for _ in mechanisms]
elif method == "ordered":
completed = [_ is None for _ in mechanisms]
states: list[TState | None] = [None] * len(mechanisms)
allindices = list(range(len(list(mechanisms))))
indices = allindices if not ordering else list(ordering)
notmentioned = set(allindices).difference(indices)
assert (
len(notmentioned) == 0
), f"Mechanisms {notmentioned} are never mentioned in the ordering."
if ordering_fun:
j = 0
while not all(completed):
states = [_.state for _ in mechanisms]
i = ordering_fun(j, states)
j += 1
mechanism = mechanisms[i]
if completed[i]:
continue
result = mechanism.step()
if result.running:
continue
completed[i] = True
if completion_callback:
completion_callback(i, mechanism)
else:
while not all(completed):
if not keep_order:
random.shuffle(indices)
for i in indices:
mechanism = mechanisms[i]
if completed[i]:
continue
result = mechanism.step()
if result.running:
continue
completed[i] = True
if completion_callback:
completion_callback(i, mechanism)
states[i] = mechanism.state
if all(completed):
break
elif method == "threads":
raise NotImplementedError()
elif method == "processes":
raise NotImplementedError()
else:
raise ValueError(
f"method {method} is unknown. Acceptable options are ordered, sequential, threads, processes"
)
return states
[docs]
@classmethod
def stepall(
cls, mechanisms: list[Mechanism] | tuple[Mechanism, ...], keep_order=True
) -> list[TState]:
"""Step all mechanisms.
Args:
mechanisms: list of mechanisms
keep_order: if True, the mechanisms will be run in order every step otherwise the order will be randomized
at every step
Returns:
- list of states of all mechanisms after completion
"""
indices = list(range(len(list(mechanisms))))
if not keep_order:
random.shuffle(indices)
completed = [_ is None for _ in mechanisms]
for i in indices:
done, mechanism = completed[i], mechanisms[i]
if done:
continue
result = mechanism.step()
if result.running:
continue
completed[i] = True
return [_.state for _ in mechanisms]
[docs]
def run_with_progress(self, timeout=None) -> TState:
if timeout is None:
with Progress() as progress:
task = progress.add_task("Negotiating ...", total=100)
for _ in self:
progress.update(task, completed=int(self.relative_time * 100))
else:
start_time = time.perf_counter()
with Progress() as progress:
task = progress.add_task("Negotiating ...", total=100)
for _ in self:
progress.update(task, completed=int(self.relative_time * 100))
if time.perf_counter() - start_time > timeout:
(
self._current_state.running,
self._current_state.timedout,
self._current_state.broken,
) = (False, True, False)
self.on_negotiation_end()
break
return self.state
[docs]
def run(self, timeout=None) -> TState:
if timeout is None:
for _ in self:
pass
else:
start_time = time.perf_counter()
for _ in self:
if time.perf_counter() - start_time > timeout:
(
self._current_state.running,
self._current_state.timedout,
self._current_state.broken,
) = (False, True, False)
self.on_negotiation_end()
break
return self.state
@property
def history(self) -> list[TState]:
return self._history
@property
def stats(self):
return self._stats
@property
def current_step(self):
return self._current_state.step
def _get_preferences(self) -> list[UtilityFunction]:
preferences = []
for a in self.negotiators:
preferences.append(a.preferences)
return preferences
def _pareto_frontier(
self,
method: Callable,
max_cardinality: int | float = float("inf"),
sort_by_welfare=True,
) -> tuple[list[tuple[float, ...]], list[Outcome]]:
ufuns = tuple(self._get_preferences())
if any(_ is None for _ in ufuns):
raise ValueError(
"Some negotiators have no ufuns. Cannot calcualate the pareto frontier"
)
outcomes = self.discrete_outcomes(max_cardinality=max_cardinality)
points = [tuple(ufun(outcome) for ufun in ufuns) for outcome in outcomes]
reservs = tuple(
_.reserved_value if _ is not None else float("-inf") for _ in ufuns
)
rational_indices = [
i for i, _ in enumerate(points) if all(a >= b for a, b in zip(_, reservs))
]
points = [points[_] for _ in rational_indices]
indices = list(method(points, sort_by_welfare=sort_by_welfare))
frontier = [points[_] for _ in indices]
if frontier is None:
raise ValueError("Could not find the pareto-frontier")
return frontier, [outcomes[rational_indices[_]] for _ in indices]
[docs]
def pareto_frontier_bf(
self, max_cardinality: float = float("inf"), sort_by_welfare=True
) -> tuple[list[tuple[float, ...]], list[Outcome]]:
return self._pareto_frontier(
pareto_frontier_bf, max_cardinality, sort_by_welfare
)
[docs]
def pareto_frontier(
self, max_cardinality: float = float("inf"), sort_by_welfare=True
) -> tuple[tuple[tuple[float, ...], ...], list[Outcome]]:
ufuns = tuple(self._get_preferences())
if any(_ is None for _ in ufuns):
raise ValueError(
"Some negotiators have no ufuns. Cannot calculate the pareto frontier"
)
outcomes = self.discrete_outcomes(max_cardinality=max_cardinality)
results = pareto_frontier(
ufuns,
outcomes=outcomes,
n_discretization=None,
max_cardinality=float("inf"),
sort_by_welfare=sort_by_welfare,
)
return results[0], [outcomes[_] for _ in results[1]]
[docs]
def max_welfare_points(
self,
max_cardinality: float = float("inf"),
frontier: tuple[tuple[float, ...], ...] | None = None,
frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float, ...], Outcome], ...]:
ufuns = self._get_preferences()
if not frontier:
frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
assert frontier_outcomes is not None
# outcomes = tuple(self.discrete_outcomes(max_cardinality=max_cardinality))
kalai_pts = max_welfare_points(
ufuns, frontier, outcome_space=self.outcome_space
)
return tuple(
(kalai_utils, frontier_outcomes[indx]) for kalai_utils, indx in kalai_pts
)
[docs]
def max_relative_welfare_points(
self,
max_cardinality: float = float("inf"),
frontier: tuple[tuple[float, ...], ...] | None = None,
frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float, ...], Outcome], ...]:
ufuns = self._get_preferences()
if not frontier:
frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
assert frontier_outcomes is not None
# outcomes = tuple(self.discrete_outcomes(max_cardinality=max_cardinality))
kalai_pts = max_relative_welfare_points(
ufuns, frontier, outcome_space=self.outcome_space
)
return tuple(
(kalai_utils, frontier_outcomes[indx]) for kalai_utils, indx in kalai_pts
)
[docs]
def modified_kalai_points(
self,
max_cardinality: float = float("inf"),
frontier: tuple[tuple[float, ...], ...] | None = None,
frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float, ...], Outcome], ...]:
ufuns = self._get_preferences()
if not frontier:
frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
assert frontier_outcomes is not None
# outcomes = tuple(self.discrete_outcomes(max_cardinality=max_cardinality))
kalai_pts = kalai_points(
ufuns,
frontier,
outcome_space=self.outcome_space,
subtract_reserved_value=False,
)
return tuple(
(kalai_utils, frontier_outcomes[indx]) for kalai_utils, indx in kalai_pts
)
[docs]
def kalai_points(
self,
max_cardinality: float = float("inf"),
frontier: tuple[tuple[float, ...], ...] | None = None,
frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float, ...], Outcome], ...]:
ufuns = self._get_preferences()
if not frontier:
frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
assert frontier_outcomes is not None
# outcomes = tuple(self.discrete_outcomes(max_cardinality=max_cardinality))
kalai_pts = kalai_points(
ufuns,
frontier,
outcome_space=self.outcome_space,
subtract_reserved_value=True,
)
return tuple(
(kalai_utils, frontier_outcomes[indx]) for kalai_utils, indx in kalai_pts
)
[docs]
def nash_points(
self,
max_cardinality: float = float("inf"),
frontier: tuple[tuple[float, ...], ...] | None = None,
frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float, ...], Outcome], ...]:
ufuns = self._get_preferences()
if not frontier:
frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
assert frontier_outcomes is not None
# outcomes = tuple(self.discrete_outcomes(max_cardinality=max_cardinality))
nash_pts = nash_points(ufuns, frontier, outcome_space=self.outcome_space)
return tuple(
(nash_utils, frontier_outcomes[indx]) for nash_utils, indx in nash_pts
)
[docs]
def plot(self, **kwargs) -> Any:
"""A method for plotting a negotiation session."""
_ = kwargs
def __iter__(self):
return self
def __str__(self):
d = self.__dict__.copy()
return pprint.pformat(d)
__repr__ = __str__
[docs]
@runtime_checkable
class Traceable(Protocol):
"""A mechanism that can generate a trace"""
@property
def full_trace(self) -> list[TraceElement]:
"""Returns the negotiation history as a list of relative_time/step/negotiator/offer tuples"""
...
[docs]
def negotiator_full_trace(self, negotiator_id: str) -> list[TraceElement]:
"""Returns the (time/relative-time/step/outcome/response) given by a negotiator (in order)"""
...