Source code for negmas.mechanisms

"""Provides interfaces for defining negotiation mechanisms."""
from __future__ import annotations

import copy
import math
import pprint
import random
import time
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from os import PathLike
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

from attrs import define
from rich.progress import Progress

from negmas import warnings
from negmas.checkpoints import CheckpointMixin
from negmas.common import (
    DEFAULT_JAVA_PORT,
    Action,
    MechanismState,
    NegotiatorInfo,
    NegotiatorMechanismInterface,
    ReactiveStrategy,
)
from negmas.events import Event, EventSource
from negmas.helpers import snake_case
from negmas.helpers.misc import get_free_tcp_port
from negmas.helpers.strings import humanize_time
from negmas.negotiators import Negotiator
from negmas.outcomes import Outcome
from negmas.outcomes.common import check_one_and_only, ensure_os
from negmas.outcomes.protocols import OutcomeSpace
from negmas.preferences import (
    kalai_points,
    nash_points,
    pareto_frontier,
    pareto_frontier_bf,
)
from negmas.preferences.ops import max_relative_welfare_points, max_welfare_points
from negmas.types import NamedObject

if TYPE_CHECKING:
    from negmas.outcomes.base_issue import Issue
    from negmas.outcomes.protocols import DiscreteOutcomeSpace
    from negmas.preferences import Preferences
    from negmas.preferences.base_ufun import BaseUtilityFunction

__all__ = ["Mechanism", "MechanismStepResult"]


@define(frozen=True)
class MechanismStepResult:
    """
    Represents the results of a negotiation step.

    This is what a single round of a mechanism (`Mechanism.__call__`) returns.
    """

    state: MechanismState
    """The returned state."""
    completed: bool = True
    """Whether the current round is completed or not."""
    broken: bool = False
    """True only if END_NEGOTIATION was selected by one agent."""
    timedout: bool = False
    """True if a timeout occurred."""
    agreement: Outcome | None = None
    """The agreement if any. Allows for a single outcome or a collection of outcomes."""
    error: bool = False
    """True if an error occurred in the mechanism."""
    error_details: str = ""
    """Error message."""
    waiting: bool = False
    """Whether to consider that the round is still running and call the round method again without increasing the step number."""
    exceptions: dict[str, list[str]] | None = None
    """A mapping from negotiator ID to a list of exceptions raised by that negotiator in this round."""
    times: dict[str, float] | None = None
    """A mapping from negotiator ID to the time it consumed during this round."""
# noinspection PyAttributeOutsideInit
class Mechanism(NamedObject, EventSource, CheckpointMixin, ABC):
    """Base class for all negotiation Mechanisms.

    Override `__call__` to implement a single round of your mechanism.

    Args:
        initial_state: The state to start from. If not given (or falsy), a
            default-constructed `MechanismState` is used.
        outcome_space: The negotiation agenda.
        issues: The issues defining the agenda (alternative to `outcome_space`).
        outcomes: list of outcomes (optional as you can pass `issues`). If an
            int then it is the number of outcomes.
        n_steps: Number of rounds allowed (None or infinity means no limit).
            Other float values are converted to int.
        time_limit: Number of real seconds allowed (None means infinity).
        pend: Probability of ending the negotiation at any step.
        pend_per_second: Probability of ending the negotiation every second.
        hidden_time_limit: Number of real seconds allowed but not visible to
            the negotiators.
        step_time_limit: Number of real seconds allowed for a single step
            (None means infinity).
        negotiator_time_limit: Forwarded to the negotiator-mechanism interface
            (None means infinity). Presumably the time allowed per negotiator
            -- confirm against `NegotiatorMechanismInterface`.
        max_n_agents: Maximum allowed number of agents.
        dynamic_entry: Allow agents to enter/leave negotiations between rounds.
        annotation: Arbitrary annotation (an empty dict is used if None).
        nmi_factory: Callable used to construct the negotiator-mechanism
            interface stored in `nmi`.
        extra_callbacks: If true, the optional negotiator callbacks
            (`on_round_start`, `on_round_end`, `on_leave`,
            `on_mechanism_error`) are invoked.
        checkpoint_every: The number of steps to checkpoint after. Set to <= 0
            to disable.
        checkpoint_folder: The folder to save checkpoints into. Set to None to
            disable.
        checkpoint_filename: The base filename to use for checkpoints (multiple
            checkpoints will be prefixed with step number).
        extra_checkpoint_info: Any extra information to save with the
            checkpoint in the corresponding json file as a dictionary with
            string keys.
        single_checkpoint: If true, only the most recent checkpoint will be
            saved.
        exist_ok: If true, checkpoints override existing checkpoints with the
            same filename.
        name: Name of the mechanism session. Should be unique. If not given,
            it will be generated.
        genius_port: The port used to connect to Genius for all negotiators in
            this mechanism. A value <= 0 makes the mechanism pick a free TCP
            port.
        id: An optional system-wide unique identifier. You should not change
            the default value except in special circumstances like during
            serialization and should always guarantee system-wide uniqueness
            if you set this value explicitly.
        type_name: Forwarded to the base-class initializer.
        verbosity: Verbosity level; `step` prints progress when it is >= 1.
    """

    def __init__(
        self,
        initial_state: MechanismState | None = None,
        outcome_space: OutcomeSpace | None = None,
        issues: Sequence[Issue] | None = None,
        outcomes: Sequence[Outcome] | int | None = None,
        n_steps: int | float | None = None,
        time_limit: float | None = None,
        pend: float = 0,
        pend_per_second: float = 0,
        hidden_time_limit: float = float("inf"),
        step_time_limit: float | None = None,
        negotiator_time_limit: float | None = None,
        max_n_agents: int | None = None,
        dynamic_entry=False,
        annotation: dict[str, Any] | None = None,
        nmi_factory=NegotiatorMechanismInterface,
        extra_callbacks=False,
        checkpoint_every: int = 1,
        checkpoint_folder: PathLike | None = None,
        checkpoint_filename: str | None = None,
        extra_checkpoint_info: dict[str, Any] | None = None,
        single_checkpoint: bool = True,
        exist_ok: bool = True,
        name=None,
        genius_port: int = DEFAULT_JAVA_PORT,
        id: str | None = None,
        type_name: str | None = None,
        verbosity: int = 0,
    ):
        # the agenda may be given in any of three forms; normalize it to a
        # single outcome-space object
        check_one_and_only(outcome_space, issues, outcomes)
        outcome_space = ensure_os(outcome_space, issues, outcomes)
        self.__verbosity = verbosity
        super().__init__(name, id=id, type_name=type_name)
        CheckpointMixin.checkpoint_init(
            self,
            step_attrib="_step",
            every=checkpoint_every,
            folder=checkpoint_folder,
            filename=checkpoint_filename,
            info=extra_checkpoint_info,
            exist_ok=exist_ok,
            single=single_checkpoint,
        )
        # last whole second at which the per-second ending probability was sampled
        self.__last_second_tried = 0
        self._hidden_time_limit = hidden_time_limit
        # a missing limit is represented internally as infinity
        time_limit = time_limit if time_limit is not None else float("inf")
        step_time_limit = (
            step_time_limit if step_time_limit is not None else float("inf")
        )
        negotiator_time_limit = (
            negotiator_time_limit if negotiator_time_limit is not None else float("inf")
        )
        # parameters fixed for all runs
        # NOTE(review): this replaces whatever the base initializer stored for
        # the `id` argument with a fresh UUID -- confirm this is intended.
        self.id = str(uuid.uuid4())
        # an infinite number of steps is represented as None
        if n_steps == float("inf"):
            n_steps = None
        if isinstance(n_steps, float):
            n_steps = int(n_steps)
        self.nmi = nmi_factory(
            id=self.id,
            n_outcomes=outcome_space.cardinality,
            outcome_space=outcome_space,
            time_limit=time_limit,
            pend=pend,
            pend_per_second=pend_per_second,
            n_steps=n_steps,
            step_time_limit=step_time_limit,
            negotiator_time_limit=negotiator_time_limit,
            dynamic_entry=dynamic_entry,
            max_n_agents=max_n_agents,
            annotation=annotation if annotation is not None else dict(),
            mechanism=self,
        )
        self._current_state = initial_state if initial_state else MechanismState()
        self._history: list[MechanismState] = []
        # run statistics: per-round wall time, and per-negotiator accumulated
        # time and exceptions (both keyed by negotiator ID)
        self._stats: dict[str, Any] = dict()
        self._stats["round_times"] = list()
        self._stats["times"] = defaultdict(float)
        self._stats["exceptions"] = defaultdict(list)
        # if self.nmi.issues is not None:
        #     self.nmi.issues = tuple(self.nmi.issues)
        # if self.nmi.outcomes is not None:
        #     self.nmi.outcomes = tuple(self.nmi.outcomes)
        self._requirements = {}
        self._negotiators = []
        self._negotiator_map: dict[str, Negotiator] = dict()
        self._negotiator_index: dict[str, int] = dict()
        self._roles = []
        self._start_time = None
        # lazily filled caches used by the discretization helpers
        self.__discrete_os = None
        self.__discrete_outcomes = None
        self._extra_callbacks = extra_callbacks
        self.agents_of_role = defaultdict(list)
        self.role_of_agent = {}
        # mechanisms do not differentiate between RANDOM_JAVA_PORT and ANY_JAVA_PORT.
        # if either is given as the genius_port, it will fix a port and all negotiators
        # that are not explicitly assigned to a port (by passing port>0 to them) will just
        # use that port.
        self.genius_port = genius_port if genius_port > 0 else get_free_tcp_port()
        # note that `params` keeps the port as passed, not the resolved one
        self.params: dict[str, Any] = dict(
            dynamic_entry=dynamic_entry,
            genius_port=genius_port,
            annotation=annotation,
        )

    @property
    def negotiators(self):
        """The list of negotiators currently in the negotiation."""
        return self._negotiators

    @property
    def participants(self) -> list[NegotiatorInfo]:
        """Returns a `NegotiatorInfo` (name, id, snake-case type) for every negotiator."""
        return [
            NegotiatorInfo(name=_.name, id=_.id, type=snake_case(_.__class__.__name__))
            for _ in self.negotiators
        ]
def is_valid(self, outcome: Outcome):
    """Tell whether `outcome` belongs to the mechanism's outcome space.

    Args:
        outcome: The outcome to test.

    Returns:
        The result of the membership test against `self.nmi.outcome_space`.
    """
    space = self.nmi.outcome_space
    return outcome in space
@property
def outcome_space(self) -> OutcomeSpace:
    """The outcome space stored in the negotiator-mechanism interface."""
    interface = self.nmi
    return interface.outcome_space
def discrete_outcome_space(
    self, levels: int = 5, max_cardinality: int = 10_000_000_000
) -> DiscreteOutcomeSpace:
    """Return a stable discrete version of the mechanism's outcome space.

    The discretization is computed on the first call and cached; later calls
    return the cached object regardless of the arguments passed.

    Args:
        levels: Forwarded to `to_discrete` of the outcome space.
        max_cardinality: Forwarded to `to_discrete`; None is replaced by
            infinity.

    Returns:
        The cached discrete outcome space.
    """
    cached = self.__discrete_os
    if cached:
        return cached
    limit = float("inf") if max_cardinality is None else max_cardinality
    self.__discrete_os = self.outcome_space.to_discrete(
        levels=levels, max_cardinality=limit
    )
    return self.__discrete_os
@property
def outcomes(self):
    """The `outcomes` attribute of the negotiator-mechanism interface."""
    interface = self.nmi
    return interface.outcomes
def discrete_outcomes(
    self, levels: int = 5, max_cardinality: int | float = float("inf")
) -> list[Outcome]:
    """A discrete set of outcomes that spans the outcome space.

    Args:
        levels: Forwarded to `to_discrete` when the outcome space has to be
            discretized.
        max_cardinality: The maximum number of outcomes, forwarded to
            `to_discrete`. None is replaced by infinity.

    Returns:
        list[Outcome]: A copy of `self.outcomes` when it is not None;
        otherwise the cached (or newly enumerated/sampled) outcomes of the
        discretized outcome space.
    """
    known = self.outcomes
    if known is not None:
        return list(known)
    if self.__discrete_outcomes:
        return self.__discrete_outcomes
    limit = float("inf") if max_cardinality is None else max_cardinality
    self.__discrete_os = self.outcome_space.to_discrete(
        levels=levels, max_cardinality=limit
    )
    self.__discrete_outcomes = list(self.__discrete_os.enumerate_or_sample())
    return self.__discrete_outcomes
def random_outcomes(
    self, n: int = 1, with_replacement: bool = False
) -> list[Outcome]:
    """Returns random offers.

    Args:
        n: Number of outcomes to generate
        with_replacement: If true, outcomes may be repeated

    Returns:
        A list of at most `n` outcomes sampled from the outcome space.

    Remarks:
        - Sampling is requested with `fail_if_not_enough=False`, so fewer than
          `n` outcomes may be returned if `n` cannot be satisfied.
    """
    sampled = self.outcome_space.sample(
        n, with_replacement=with_replacement, fail_if_not_enough=False
    )
    return list(sampled)
def random_outcome(self) -> Outcome:
    """Return one random outcome drawn by the outcome space."""
    space = self.outcome_space
    return space.random_outcome()
@property
def time(self) -> float:
    """Elapsed time since mechanism started in seconds.

    0.0 if the mechanism did not start running
    """
    if self._start_time is None:
        return 0.0
    return time.perf_counter() - self._start_time


@property
def remaining_time(self) -> float | None:
    """
    Returns remaining time in seconds.

    None if no time limit is given. Never negative.
    """
    if self.nmi.time_limit == float("+inf"):
        return None
    if not self._start_time:
        return self.nmi.time_limit
    limit = self.nmi.time_limit - (time.perf_counter() - self._start_time)
    if limit < 0.0:
        return 0.0
    return limit


@property
def expected_remaining_time(self) -> float | None:
    """
    Returns remaining time in seconds (expectation).

    None if no time limit or pend_per_second is given.
    """
    rem = self.remaining_time
    pend = self.nmi.pend_per_second
    if pend <= 0:
        return rem
    return min(rem, (1 / pend)) if rem is not None else (1 / pend)


@property
def relative_time(self) -> float:
    """
    Returns a number between ``0`` and ``1`` indicating elapsed relative time or steps.

    Remarks:
        - If pend or pend_per_second are defined in the `NegotiatorMechanismInterface`,
          and time_limit/n_steps are not given, this becomes an expectation that is limited above by one.
    """
    n_steps = self.nmi.n_steps
    time_limit = self.nmi.time_limit
    if time_limit == float("+inf") and n_steps is None:
        if self.nmi.pend <= 0 and self.nmi.pend_per_second <= 0:
            return 0.0
        if self.nmi.pend > 0:
            n_steps = int(math.ceil(1 / self.nmi.pend))
        if self.nmi.pend_per_second > 0:
            time_limit = 1 / self.nmi.pend_per_second
    relative_step = (
        (self._current_state.step + 1) / (n_steps + 1)
        if n_steps is not None
        else -1.0
    )
    relative_time = self.time / time_limit if time_limit is not None else -1.0
    return min(1.0, max([relative_step, relative_time]))


@property
def expected_relative_time(self) -> float:
    """
    Returns a positive number indicating elapsed relative time or steps.

    Remarks:
        - This is relative to the expected time/step at which the negotiation ends given all timing
          conditions (time_limit, n_step, pend, pend_per_second).
        - Returns 0.0 when none of the timing conditions is set.
    """
    n_steps = self.nmi.n_steps
    time_limit = self.nmi.time_limit
    pend = self.nmi.pend
    pend_per_second = self.nmi.pend_per_second
    if time_limit == float("+inf") and n_steps is None:
        if pend <= 0 and pend_per_second <= 0:
            return 0.0
    # The reciprocal of the per-step ending probability is the expected number
    # of steps. It is only defined for pend > 0; with pend == 0 the explicit
    # n_steps (if any) must be kept as-is instead of dividing by zero.
    if pend > 0:
        expected_steps = int(math.ceil(1 / pend))
        n_steps = expected_steps if n_steps is None else min(expected_steps, n_steps)
    # Same reasoning for the per-second ending probability.
    if pend_per_second > 0:
        expected_seconds = int(math.ceil(1 / pend_per_second))
        if time_limit == float("inf"):
            time_limit = expected_seconds
        else:
            time_limit = min(time_limit, expected_seconds)
    relative_step = (
        (self._current_state.step + 1) / (n_steps + 1)
        if n_steps is not None
        else -1.0
    )
    relative_time = self.time / time_limit if time_limit is not None else -1.0
    return max([relative_step, relative_time])


@property
def remaining_steps(self) -> int | None:
    """
    Returns the remaining number of steps until the end of the mechanism run.

    None if unlimited
    """
    if self.nmi.n_steps is None:
        return None
    return self.nmi.n_steps - self._current_state.step


@property
def expected_remaining_steps(self) -> int | None:
    """
    Returns the expected remaining number of steps until the end of the mechanism run.

    None if unlimited
    """
    rem = self.remaining_steps
    pend = self.nmi.pend
    if pend <= 0:
        return rem
    return (
        min(rem, int(math.ceil(1 / pend)))
        if rem is not None
        else int(math.ceil(1 / pend))
    )


@property
def requirements(self):
    """A dictionary specifying the requirements that must be in the capabilities of any agent
    to join the mechanism."""
    return self._requirements


@requirements.setter
def requirements(
    self,
    requirements: dict[
        str,
        (
            tuple[int | float | str, int | float | str]
            | list
            | set
            | int
            | float
            | str
        ),
    ],
):
    # lists are normalized to sets; every other value is stored unchanged
    self._requirements = {
        k: set(v) if isinstance(v, list) else v for k, v in requirements.items()
    }
def is_satisfying(self, capabilities: dict) -> bool:
    """Checks if the given capabilities are satisfying mechanism requirements.

    Args:
        capabilities: capabilities to check

    Returns:
        bool are the requirements satisfied by the capabilities.

    Remarks:
        - Requirements are also a dict with the following meanings:

          - tuple: Min and max acceptable values
          - list/set: Any value in the iterable is acceptable
          - Single value: The capability must match this value
          - None: The capability must exist but may have any value

        - Capabilities can also have the same three possibilities. A `None`
          capability value satisfies any requirement on that key.
        - Two ranges match when they overlap anywhere, including when one
          fully contains the other.
    """
    for key, required in self.requirements.items():
        # every requirement must at least be present in the capabilities
        if key not in capabilities:
            return False
        if required is None:
            continue
        offered = capabilities[key]
        if offered is None:
            continue
        if isinstance(offered, tuple):
            # the capability is a range
            if isinstance(required, tuple):
                # Two ranges: they match when they intersect. Checking only
                # whether an endpoint of `offered` lies inside `required`
                # would miss the case where `offered` strictly contains it.
                match = offered[0] <= required[1] and required[0] <= offered[1]
            elif isinstance(required, set):
                match = any(offered[0] <= _ <= offered[1] for _ in required)
            else:
                match = offered[0] <= required <= offered[1]
        elif isinstance(offered, (list, set)):
            # the capability is a collection of acceptable values
            if isinstance(required, tuple):
                match = any(required[0] <= _ <= required[1] for _ in offered)
            elif isinstance(required, set):
                match = any(_ in required for _ in offered)
            else:
                match = required in offered
        else:
            # the capability is a single value
            if isinstance(required, tuple):
                match = required[0] <= offered <= required[1]
            elif isinstance(required, set):
                match = offered in required
            else:
                match = offered == required
        if not match:
            return False
    return True
def can_participate(self, agent: Negotiator) -> bool:
    """Checks if the agent can participate in this type of negotiation in general.

    Args:
        agent: The negotiator whose `capabilities` are checked.

    Returns:
        bool: True if it can

    Remarks:
        The result is exactly that of `is_satisfying` applied to the
        capabilities announced by the agent, so the only reason this may
        return `False` is a mechanism requirement that those capabilities do
        not satisfy:

        * A mechanism requirement is missing from the agent's capabilities.
        * A mechanism requirement is present but its required values do not
          match the values announced by the agent.

        An agent that lists `None` for a capability announces that it can work
        with all its values. A mechanism that lists a requirement as `None`
        accepts any value for it as long as the agent declares it.
    """
    announced = agent.capabilities
    return self.is_satisfying(announced)
def can_accept_more_agents(self) -> bool:
    """Whether the mechanism can **currently** accept more negotiators.

    Always true when no maximum is set (or no negotiator list exists);
    otherwise true only while the number of negotiators is below the maximum.
    """
    limit = self.nmi.max_n_agents
    if limit is None or self._negotiators is None:
        return True
    return len(self._negotiators) < limit
def can_enter(self, agent: Negotiator) -> bool:
    """Whether the agent can enter the negotiation now.

    Requires both free capacity (`can_accept_more_agents`) and compatible
    capabilities (`can_participate`); the latter is only checked when there
    is capacity.
    """
    if not self.can_accept_more_agents():
        return False
    return self.can_participate(agent)
@property
def state(self) -> MechanismState:
    """The current state object of the mechanism (not a copy)."""
    current = self._current_state
    return current


def _get_nmi(self, negotiator: Negotiator) -> NegotiatorMechanismInterface:  # type: ignore
    """Return the interface to hand to `negotiator`.

    This implementation ignores the negotiator and returns the shared `nmi`.
    """
    del negotiator  # intentionally unused
    return self.nmi
def add(
    self,
    negotiator: Negotiator,
    *,
    preferences: Preferences | None = None,
    role: str | None = None,
    ufun: BaseUtilityFunction | None = None,
) -> bool | None:
    """Add an agent to the negotiation.

    Args:
        negotiator: The agent to be added.
        preferences: The preferences to use. A plain callable is wrapped in a
            `MappingUtilityFunction` over the mechanism's outcome space. If
            None, the value is passed as None to the negotiator's `join`.
        role: The role the agent plays in the negotiation mechanism
            ("negotiator" when not given). Mechanisms inheriting from this
            class are expected to validate it.
        ufun: [deprecated] same as `preferences`; when given it takes
            precedence over `preferences`.

    Returns:
        * True if the agent was added.
        * False if the agent was already in the negotiation.
        * None if the agent cannot be added. This can happen in the
          following cases:

          1. The outcome-space of the negotiator's preferences does not
             contain the outcome-space of the negotiation
          2. The mechanism cannot accept the negotiator now (see `can_enter`)
          3. The negotiator refuses to join (by returning a false value from
             its `join` method)
    """
    from negmas.preferences import (
        BaseUtilityFunction,
        MappingUtilityFunction,
        Preferences,
    )

    # the deprecated `ufun` argument overrides `preferences`
    if ufun is not None:
        if not isinstance(ufun, BaseUtilityFunction):
            ufun = MappingUtilityFunction(ufun, outcome_space=self.outcome_space)
        preferences = ufun

    # wrap bare callables so that negotiators always receive preference objects
    needs_wrapping = (
        preferences is not None
        and not isinstance(preferences, Preferences)
        and isinstance(preferences, Callable)
    )
    if needs_wrapping:
        preferences = MappingUtilityFunction(
            preferences, outcome_space=self.outcome_space
        )

    # preferences must be defined over (at least) the negotiated outcome space
    if (
        preferences
        and preferences.outcome_space
        and self.outcome_space not in preferences.outcome_space
    ):
        return None
    if not self.can_enter(negotiator):
        return None
    if negotiator in self._negotiators:
        return False

    if role is None:
        role = "negotiator"
    joined = negotiator.join(
        nmi=self._get_nmi(negotiator),
        state=self.state,
        preferences=preferences,
        role=role,
    )
    if not joined:
        return None

    # register the negotiator in all book-keeping structures
    self._negotiators.append(negotiator)
    self._current_state.n_negotiators += 1
    self._negotiator_map[negotiator.id] = negotiator
    self._negotiator_index[negotiator.id] = len(self._negotiators) - 1
    self._roles.append(role)
    self.role_of_agent[negotiator.uuid] = role
    self.agents_of_role[role].append(negotiator)
    return True
def get_negotiator(self, source: str) -> Negotiator | None:
    """Look up a negotiator by ID; None if no negotiator has that ID."""
    registry = self._negotiator_map
    return registry.get(source)
def can_leave(self, agent: Negotiator) -> bool:
    """Can the agent leave now?

    Always true with dynamic entry. Otherwise the negotiation must not be
    running and the agent must be one of the current negotiators.
    """
    if self.nmi.dynamic_entry:
        return True
    return not self.nmi.state.running and agent in self._negotiators
def remove(self, negotiator: Negotiator) -> bool | None:
    """Remove the agent from the negotiation.

    Args:
        negotiator: The negotiator to remove.

    Returns:
        * True if the agent was removed.
        * False if the agent is not allowed to leave now (see `can_leave`) or
          was not in the negotiation.

    Remarks:
        - All book-keeping done by `add` is undone: the negotiator list, the
          ID maps, the positional index of the negotiators that followed the
          removed one, the role records and the negotiator count of the
          current state.
        - `on_leave` is called on the negotiator only when extra callbacks
          are enabled.
    """
    if not self.can_leave(negotiator):
        return False
    if self._negotiator_map.get(negotiator.id, None) is None:
        return False

    position = self._negotiators.index(negotiator)
    del self._negotiators[position]
    self._negotiator_map.pop(negotiator.id)
    self._negotiator_index.pop(negotiator.id, None)
    # every negotiator after the removed one moved one place to the left
    for i, other in enumerate(self._negotiators[position:], start=position):
        self._negotiator_index[other.id] = i

    # `_roles` is kept parallel to `_negotiators` by `add`
    if position < len(self._roles):
        del self._roles[position]
    # `add` keys `role_of_agent` by the negotiator's `uuid`
    role = self.role_of_agent.pop(getattr(negotiator, "uuid", negotiator.id), None)
    peers = self.agents_of_role.get(role) if role is not None else None
    if peers and negotiator in peers:
        peers.remove(negotiator)

    self._current_state.n_negotiators -= 1

    if self._extra_callbacks:
        negotiator.on_leave(self.nmi.state)
    return True
def add_requirements(self, requirements: dict) -> None:
    """Merge `requirements` into the mechanism's requirements.

    List values are converted to sets; all other values are stored unchanged.
    Existing keys are overwritten.
    """
    normalized = {}
    for key, value in requirements.items():
        normalized[key] = set(value) if isinstance(value, list) else value
    if not hasattr(self, "_requirements"):
        self._requirements = normalized
        return
    self._requirements.update(normalized)
def remove_requirements(self, requirements: Iterable) -> None:
    """Removes the given requirement keys; keys that are not present are ignored."""
    for key in requirements:
        self._requirements.pop(key, None)
def negotiator_index(self, source: str) -> int | None:
    """Gets the negotiator index.

    Args:
        source (str): ID of the negotiator.

    Returns:
        int | None: The index recorded for that ID, or None if the ID is
        unknown.
    """
    positions = self._negotiator_index
    return positions.get(source)
@property
def negotiator_ids(self) -> list[str]:
    """IDs of all negotiators, in the order they are stored."""
    ids = []
    for negotiator in self._negotiators:
        ids.append(negotiator.id)
    return ids


@property
def genius_negotiator_ids(self) -> list[str]:
    """For each negotiator, its `java_uuid` when it has one, otherwise its ID."""
    ids = []
    for negotiator in self._negotiators:
        if hasattr(negotiator, "java_uuid"):
            ids.append(negotiator.java_uuid)
        else:
            ids.append(negotiator.id)
    return ids
def genius_id(self, id: str | None) -> str | None:
    """Gets the Genius ID corresponding to the given negotiator if known otherwise its normal ID.

    Args:
        id: A negotiator ID (or None).

    Returns:
        None for a None input; the input itself when no negotiator is
        registered under it; otherwise the negotiator's `java_uuid` when it
        has one and its `id` when it does not.
    """
    if id is None:
        return None
    found = self._negotiator_map.get(id, None)
    if not found:
        return id
    if hasattr(found, "java_uuid"):
        return found.java_uuid  # type: ignore
    return found.id
@property
def agent_ids(self) -> list[str]:
    """IDs of the owners of all negotiators that have an owner."""
    return [_.owner.id for _ in self._negotiators if _.owner]


@property
def agent_names(self) -> list[str]:
    """Names of the owners of all negotiators that have an owner."""
    return [_.owner.name for _ in self._negotiators if _.owner]


@property
def negotiator_names(self) -> list[str]:
    """Names of all negotiators, in the order they are stored."""
    return [_.name for _ in self._negotiators]


@property
def agreement(self):
    """The agreement recorded in the current state (None if there is none)."""
    return self._current_state.agreement


@property
def n_outcomes(self):
    """The number of outcomes as reported by the negotiator-mechanism interface."""
    return self.nmi.n_outcomes


@property
def issues(self) -> list[Issue]:
    """Returns the issues of the outcomespace.

    Will raise an exception if the outcome space has no defined issues
    """
    return self.nmi.outcome_space.issues  # type: ignore


@property
def completed(self):
    """Ended without timing out (either with agreement or broken by a negotiator)"""
    return self.agreement is not None or self._current_state.broken


@property
def ended(self):
    """Ended in any way"""
    return self._current_state.ended


@property
def n_steps(self):
    """The step limit stored in the negotiator-mechanism interface (None if unlimited)."""
    return self.nmi.n_steps


@property
def time_limit(self):
    """The time limit in seconds stored in the negotiator-mechanism interface."""
    return self.nmi.time_limit


@property
def running(self):
    """The `running` flag of the current state."""
    return self._current_state.running


@property
def dynamic_entry(self):
    """Whether negotiators may enter/leave, as stored in the negotiator-mechanism interface."""
    return self.nmi.dynamic_entry


@property
def max_n_agents(self):
    """The maximum number of negotiators stored in the negotiator-mechanism interface."""
    return self.nmi.max_n_agents


@property
def state4history(self) -> Any:
    """Returns the state as it should be stored in the history (a deep copy)."""
    return copy.deepcopy(self._current_state)


def _add_to_history(self, state4history):
    """Record `state4history` in the history.

    If the most recent history entry belongs to the same step, it is replaced
    instead of appending a second entry for that step.
    """
    if not self._history:
        self._history.append(state4history)
        return
    last = self._history[-1]
    # Compare step numbers on both sides. Comparing the stored step to the
    # whole state object can never be true, which would disable replacement.
    if last["step"] == state4history["step"]:
        self._history[-1] = state4history
        return
    self._history.append(state4history)
def on_mechanism_error(self) -> None:
    """Called when there is a mechanism error.

    Notifies every negotiator through its `on_mechanism_error` callback, but
    only when extra callbacks are enabled.

    Remarks:
        - When overriding this function you **MUST** call the base class version
    """
    current = self.state
    if not self._extra_callbacks:
        return
    for negotiator in self.negotiators:
        negotiator.on_mechanism_error(current)
def on_negotiation_end(self) -> None:
    """Called at the end of each negotiation.

    Informs every negotiator (through `_on_negotiation_end`), announces a
    `negotiation_end` event carrying the agreement, the state and the
    annotation, then runs the final checkpoint step.

    Remarks:
        - When overriding this function you **MUST** call the base class version
    """
    final_state = self.state
    for negotiator in self.negotiators:
        negotiator._on_negotiation_end(final_state)
    payload = {
        "agreement": self.agreement,
        "state": final_state,
        "annotation": self.nmi.annotation,
    }
    self.announce(Event(type="negotiation_end", data=payload))
    self.checkpoint_final_step()
@property
def verbosity(self) -> int:
    """
    Verbosity level.

    - Children of this class should only print if verbosity > 1
    """
    level = self.__verbosity
    return level
def on_negotiation_start(self) -> bool:
    """Hook called before the negotiation starts.

    Returns:
        Whether the negotiation may start. The base implementation always
        allows it; returning False prevents the negotiation from starting.
    """
    may_start = True
    return may_start
@abstractmethod
def __call__(
    self, state: MechanismState, action: dict[str, Action | None] | None = None
) -> MechanismStepResult:
    """
    Implements a single step of the mechanism. Override this!

    `step` calls this once per round and adopts the `state` of the returned
    result as the mechanism's current state.

    Args:
        state: The mechanism state. When overriding, set the type of this to
            the specific `MechanismState` descendent for your mechanism.
        action: An optional action (value) of the next negotiator (key). If
            given, the call should just execute the action without calling
            the next negotiator.

    Returns:
        `MechanismStepResult` showing the result of the negotiation step
    """
    ...
def step(self, action: dict[str, Action | None] | None = None) -> MechanismState:
    """Runs a single step of the mechanism.

    Args:
        action: An optional action (value) for the next negotiator (key). If
            given, the call should just execute the action without calling
            the next negotiator.

    Returns:
        MechanismState: The state of the negotiation *after* the round is conducted

    Remarks:
        - Every call yields the results of one round (one call of the mechanism
          object itself, see `__call__`)
        - If the mechanism was yet to start, it will start it and runs one round
        - There is another function (`run()`) that runs the whole mechanism in blocking mode
    """
    if self._start_time is None or self._start_time < 0:
        self._start_time = time.perf_counter()
    # progress reporting
    if self.__verbosity >= 1:
        if self.current_step == 0:
            print(
                f"{self.name}: Step {self.current_step} starting after {datetime.now()}",
                flush=True,
            )
        else:
            _elapsed = time.perf_counter() - self._start_time
            remaining = self.expected_remaining_steps
            etatime = self.expected_remaining_time
            etatime = etatime if etatime is not None else float("inf")
            if remaining is not None:
                # ETA: average time per step so far times the expected
                # remaining steps, capped by the expected remaining time
                _eta = (
                    humanize_time(
                        min(
                            (_elapsed * remaining) / self.current_step,
                            etatime,
                        )
                    )
                    + f" {remaining} steps"
                )
            else:
                _eta = "--"
            print(
                f"{self.name}: Step {self.current_step} starting after {humanize_time(_elapsed, show_ms=True)} [ETA {_eta}]",
                flush=True,
                end="\r" if self.verbosity == 1 else "\n",
            )
    self.checkpoint_on_step_started()
    state = self.state
    state4history = self.state4history
    # rs: per-step random draw. rt: per-second random draw; it stays at 2
    # (which can never be below a probability) unless a new whole second
    # started since the last draw.
    rs, rt = random.random(), 2
    # end with a timeout if condition is met
    current_time = self.time
    if self.__last_second_tried < int(current_time):
        rt, self.__last_second_tried = random.random(), int(current_time)
    if (
        (current_time > self.time_limit)
        or (self.nmi.n_steps and self._current_state.step >= self.nmi.n_steps)
        or current_time > self._hidden_time_limit
        or rs < self.nmi.pend - 1e-8
        or rt < self.nmi.pend_per_second - 1e-8
    ):
        (
            self._current_state.running,
            self._current_state.broken,
            self._current_state.timedout,
        ) = (False, False, True)
        self.on_negotiation_end()
        return self.state
    # if there is a single negotiator and no other negotiators can be added,
    # end without starting
    if len(self._negotiators) < 2:
        if self.nmi.dynamic_entry:
            return self.state
        else:
            (
                self._current_state.running,
                self._current_state.broken,
                self._current_state.timedout,
            ) = (False, False, False)
            self.on_negotiation_end()
            return self.state
    # if the mechanism states that it is broken, timedout or ended with
    # agreement, report that
    if (
        self._current_state.broken
        or self._current_state.timedout
        or self._current_state.agreement is not None
    ):
        self._current_state.running = False
        self.on_negotiation_end()
        return self.state
    if not self._current_state.running:
        # if we did not start, just start
        self._current_state.running = True
        self._current_state.step = 0
        self._start_time = time.perf_counter()
        self._current_state.started = True
        state = self.state
        # if the mechanism indicates that it cannot start, keep trying
        if self.on_negotiation_start() is False:
            (
                self._current_state.agreement,
                self._current_state.broken,
                self._current_state.timedout,
            ) = (None, False, False)
            return self.state
        for a in self.negotiators:
            a._on_negotiation_start(state=state)
        self.announce(Event(type="negotiation_start", data=None))
    else:
        # if no steps are remaining, end with a timeout
        remaining_steps, remaining_time = self.remaining_steps, self.remaining_time
        if (remaining_steps is not None and remaining_steps <= 0) or (
            remaining_time is not None and remaining_time <= 0.0
        ):
            self._current_state.running = False
            (
                self._current_state.agreement,
                self._current_state.broken,
                self._current_state.timedout,
            ) = (None, False, True)
            self.on_negotiation_end()
            return self.state
    # send round start only if the mechanism is not waiting for anyone
    # TODO check this.
    if not self._current_state.waiting and self._extra_callbacks:
        for agent in self._negotiators:
            agent.on_round_start(state)
    # run a round of the mechanism and get the new state
    # (while waiting, the round is timed from the start of its first attempt)
    step_start = (
        time.perf_counter() if not self._current_state.waiting else self._last_start
    )
    self._last_start = step_start
    self._current_state.waiting = False
    # Fall back to calling without `action` if the call raises TypeError.
    # NOTE(review): a TypeError raised from inside the round itself also
    # triggers this fallback and re-runs the round -- confirm this is acceptable.
    try:
        result = self(self._current_state, action=action)
    except TypeError:
        result = self(self._current_state)
    self._current_state = result.state
    step_time = time.perf_counter() - step_start
    self._stats["round_times"].append(step_time)
    # if negotiator times are reported, save them
    if result.times:
        for k, v in result.times.items():
            if v is not None:
                self._stats["times"][k] += v
    # if negotiator exceptions are reported, save them
    if result.exceptions:
        for k, v in result.exceptions.items():
            if v:
                self._stats["exceptions"][k] += v
    # update current state variables from the result of the round just run
    (
        self._current_state.has_error,
        self._current_state.error_details,
        self._current_state.waiting,
    ) = (
        result.state.has_error,
        result.state.error_details,
        result.state.waiting,
    )
    if self._current_state.has_error:
        self.on_mechanism_error()
    # a round that took longer than the step time limit counts as a timeout
    # and discards any agreement it reported
    if (
        self.nmi.step_time_limit is not None
        and step_time > self.nmi.step_time_limit
    ):
        (
            self._current_state.broken,
            self._current_state.timedout,
            self._current_state.agreement,
        ) = (False, True, None)
    else:
        (
            self._current_state.broken,
            self._current_state.timedout,
            self._current_state.agreement,
        ) = (
            result.state.broken,
            result.state.timedout,
            result.state.agreement,
        )
    if (
        (self._current_state.agreement is not None)
        or self._current_state.broken
        or self._current_state.timedout
    ):
        self._current_state.running = False
    # now switch to the new state
    state = self.state
    if not self._current_state.waiting and result.completed:
        state4history = self.state4history
        if self._extra_callbacks:
            for agent in self._negotiators:
                agent.on_round_end(state)
        self._add_to_history(state4history)
        # we only indicate a new step if no one is waiting
        self._current_state.step += 1
        self._current_state.time = self.time
        self._current_state.relative_time = self.relative_time
    if not self._current_state.running:
        self.on_negotiation_end()
    return self.state
def __next__(self) -> MechanismState:
    """Advance the negotiation by one step (iterator protocol).

    Returns:
        The value returned by `step()` as long as the negotiation is still
        running after that step.

    Raises:
        StopIteration: if the negotiation is no longer running once the step
            has been executed. The state produced by that last step is
            therefore not yielded to the iterating caller.
    """
    latest = self.step()
    if self._current_state.running:
        return latest
    raise StopIteration
def abort(self) -> MechanismState:
    """Aborts the negotiation.

    The current state is flagged as an error ("Uncaught Exception") and as
    broken with no agreement, the error hook is fired, the state is appended
    to the history, the step counter is advanced and the negotiation-end hook
    is fired.

    Returns:
        The value of `self.state` as read before `running` is cleared.
    """
    # Flag the error before firing the hook so that `on_mechanism_error`
    # observes the error information.
    failing = self._current_state
    failing.has_error = True
    failing.error_details = "Uncaught Exception"
    failing.waiting = False
    self.on_mechanism_error()

    ending = self._current_state
    ending.broken = True
    ending.timedout = False
    ending.agreement = None

    # Both views of the state are read *before* `running` is switched off.
    state = self.state
    snapshot = self.state4history
    self._current_state.running = False

    listeners = self._negotiators if self._extra_callbacks else ()
    for negotiator in listeners:
        negotiator.on_round_end(state)

    self._add_to_history(snapshot)
    self._current_state.step += 1
    self.on_negotiation_end()
    return state
@classmethod
def runall(
    cls,
    mechanisms: list[Mechanism] | tuple[Mechanism, ...],
    keep_order=True,
    method="serial",
) -> list[MechanismState | None]:
    """Runs all mechanisms until none of them is running.

    Args:
        mechanisms: list of mechanisms. `None` entries are treated as already
            completed and are never stepped.
        keep_order: if True, the mechanisms are stepped in the given order in
            every sweep; otherwise the order is reshuffled before every sweep.
        method: the method to use for running all the sessions. Only
            "serial" is implemented; "threads" and "processes" are
            recognized but not implemented.

    Returns:
        A list with one entry per mechanism holding `mechanism.state` as read
        right after the step at which it stopped running. Entries that
        correspond to `None` mechanisms stay `None`.

    Raises:
        NotImplementedError: if `method` is "threads" or "processes".
        ValueError: if `method` is none of the known options.
    """
    finished = [entry is None for entry in mechanisms]
    final_states: list[MechanismState | None] = [None] * len(mechanisms)
    order = list(range(len(mechanisms)))

    if method in ("threads", "processes"):
        raise NotImplementedError()
    if method != "serial":
        raise ValueError(
            f"method {method} is unknown. Acceptable options are serial, threads, processes"
        )

    while not all(finished):
        if not keep_order:
            random.shuffle(order)
        for idx in order:
            if finished[idx]:
                continue
            session = mechanisms[idx]
            if session.step().running:
                continue
            # this session has just ended: record its final state
            finished[idx] = True
            final_states[idx] = session.state
            if all(finished):
                break
    return final_states
@classmethod
def stepall(
    cls, mechanisms: list[Mechanism] | tuple[Mechanism, ...], keep_order=True
) -> list[MechanismState | None]:
    """Steps every mechanism exactly once.

    Args:
        mechanisms: list of mechanisms. `None` entries are skipped.
        keep_order: if True, the mechanisms are stepped in the given order,
            otherwise they are stepped in a random order.

    Returns:
        A list with one entry per input (in input order, regardless of
        `keep_order`) holding `mechanism.state` after the step, or `None`
        for `None` entries.
    """
    order = list(range(len(mechanisms)))
    if not keep_order:
        random.shuffle(order)
    for idx in order:
        mechanism = mechanisms[idx]
        if mechanism is not None:
            mechanism.step()
    # `None` placeholders are tolerated while stepping, so they must also be
    # tolerated here instead of failing on `None.state`.
    return [None if m is None else m.state for m in mechanisms]
def run_with_progress(self, timeout=None) -> MechanismState:
    """Runs the negotiation to completion while displaying a progress bar.

    The bar is updated after every step to `int(self.relative_time * 100)`
    out of 100.

    Args:
        timeout: optional limit in seconds (measured with
            `time.perf_counter`). It is checked after every step; once
            exceeded, the state is marked as timed out (not running, not
            broken), `on_negotiation_end` is called and the run stops.
            `None` disables the limit.

    Returns:
        `self.state` after the run.
    """
    # the clock is only needed (and only started) when a limit was given
    started = None if timeout is None else time.perf_counter()
    with Progress() as bar:
        job = bar.add_task("Negotiating ...", total=100)
        for _ in self:
            bar.update(job, completed=int(self.relative_time * 100))
            if started is not None and time.perf_counter() - started > timeout:
                current = self._current_state
                current.running = False
                current.timedout = True
                current.broken = False
                self.on_negotiation_end()
                break
    return self.state
def run(self, timeout=None) -> MechanismState:
    """Runs the negotiation by iterating over the mechanism until it stops.

    Args:
        timeout: optional limit in seconds (measured with
            `time.perf_counter`). It is checked after every step; once
            exceeded, the state is marked as timed out (not running, not
            broken), `on_negotiation_end` is called and the run stops.
            `None` disables the limit.

    Returns:
        `self.state` after the run.
    """
    if timeout is None:
        for _ in self:
            pass
        return self.state

    started = time.perf_counter()
    for _ in self:
        if time.perf_counter() - started > timeout:
            current = self._current_state
            current.running = False
            current.timedout = True
            current.broken = False
            self.on_negotiation_end()
            break
    return self.state
@property
def history(self):
    """The recorded history of the negotiation (`self._history`)."""
    return self._history


@property
def stats(self):
    """The statistics collected for this mechanism (`self._stats`)."""
    return self._stats


@property
def current_step(self):
    """The step number stored in the current state."""
    return self._current_state.step


def _get_preferences(self):
    """Returns the `preferences` of every negotiator, in negotiator order."""
    return [negotiator.preferences for negotiator in self.negotiators]


def _pareto_frontier(
    self,
    method: Callable,
    max_cardinality: int | float = float("inf"),
    sort_by_welfare=True,
) -> tuple[list[tuple[float, ...]], list[Outcome]]:
    """Finds the pareto frontier among the rational outcomes using `method`.

    Only outcomes for which every negotiator gets at least its reserved value
    are passed to `method`.

    Args:
        method: callable receiving the list of utility tuples (one per
            rational outcome) and the `sort_by_welfare` keyword, and
            returning an iterable of indices into that list.
        max_cardinality: passed to `self.discrete_outcomes` to limit the
            number of outcomes considered.
        sort_by_welfare: forwarded to `method`.

    Returns:
        A tuple of the frontier utility tuples and the corresponding outcomes
        (both in the order of the indices returned by `method`).

    Raises:
        ValueError: if any negotiator has no preferences.
    """
    ufuns = tuple(self._get_preferences())
    if any(ufun is None for ufun in ufuns):
        raise ValueError(
            "Some negotiators have no ufuns. Cannot calculate the pareto frontier"
        )
    # Materialize once: the outcomes are iterated here and indexed below, which
    # would fail if `discrete_outcomes` returned a one-shot iterable.
    outcomes = list(self.discrete_outcomes(max_cardinality=max_cardinality))
    reserved = tuple(ufun.reserved_value for ufun in ufuns)

    rational_outcomes: list[Outcome] = []
    points: list[tuple[float, ...]] = []
    for outcome in outcomes:
        utils = tuple(ufun(outcome) for ufun in ufuns)
        if all(u >= r for u, r in zip(utils, reserved)):
            rational_outcomes.append(outcome)
            points.append(utils)

    indices = list(method(points, sort_by_welfare=sort_by_welfare))
    return (
        [points[i] for i in indices],
        [rational_outcomes[i] for i in indices],
    )
def pareto_frontier_bf(
    self, max_cardinality: float = float("inf"), sort_by_welfare=True
) -> tuple[list[tuple[float, ...]], list[Outcome]]:
    """Finds the pareto frontier using the `pareto_frontier_bf` routine.

    Args:
        max_cardinality: forwarded to `_pareto_frontier`.
        sort_by_welfare: forwarded to `_pareto_frontier`.

    Returns:
        Whatever `_pareto_frontier` returns: the frontier utility tuples and
        the corresponding outcomes.
    """
    return self._pareto_frontier(
        method=pareto_frontier_bf,
        max_cardinality=max_cardinality,
        sort_by_welfare=sort_by_welfare,
    )
def pareto_frontier(
    self, max_cardinality: float = float("inf"), sort_by_welfare=True
) -> tuple[tuple[tuple[float, ...]], list[Outcome]]:
    """Finds the pareto frontier of the negotiation.

    Args:
        max_cardinality: passed to `self.discrete_outcomes` to limit the
            number of outcomes considered.
        sort_by_welfare: forwarded to the `pareto_frontier` routine.

    Returns:
        A tuple of the first element returned by the `pareto_frontier`
        routine and the outcomes selected by the indices in its second
        element.

    Raises:
        ValueError: if any negotiator has no preferences.
    """
    preferences = tuple(self._get_preferences())
    for preference in preferences:
        if preference is None:
            raise ValueError(
                "Some negotiators have no ufuns. Cannot calculate the pareto frontier"
            )
    outcomes = self.discrete_outcomes(max_cardinality=max_cardinality)
    # The outcomes were already limited above, so no further limit is applied.
    results = pareto_frontier(
        preferences,
        outcomes=outcomes,
        n_discretization=None,
        max_cardinality=float("inf"),
        sort_by_welfare=sort_by_welfare,
    )
    utilities, indices = results[0], results[1]
    return utilities, [outcomes[i] for i in indices]
def max_welfare_points(
    self,
    max_cardinality: float = float("inf"),
    frontier: tuple[tuple[float]] | None = None,
    frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float], Outcome]]:
    """Pairs the results of the `max_welfare_points` routine with outcomes.

    Args:
        max_cardinality: passed to `self.pareto_frontier` when the frontier
            has to be computed here.
        frontier: utilities of the pareto frontier. If missing or empty, both
            the frontier and its outcomes are computed via
            `self.pareto_frontier`.
        frontier_outcomes: outcomes corresponding to `frontier`.

    Returns:
        A tuple of `(utilities, outcome)` pairs, one for every
        `(utilities, index)` pair returned by the routine.
    """
    preferences = self._get_preferences()
    if not frontier:
        frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
    assert frontier_outcomes is not None
    found = max_welfare_points(
        preferences, frontier, outcome_space=self.outcome_space
    )
    pairs = []
    for utils, idx in found:
        pairs.append((utils, frontier_outcomes[idx]))
    return tuple(pairs)
def max_relative_welfare_points(
    self,
    max_cardinality: float = float("inf"),
    frontier: tuple[tuple[float]] | None = None,
    frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float], Outcome]]:
    """Pairs the results of `max_relative_welfare_points` with outcomes.

    Args:
        max_cardinality: passed to `self.pareto_frontier` when the frontier
            has to be computed here.
        frontier: utilities of the pareto frontier. If missing or empty, both
            the frontier and its outcomes are computed via
            `self.pareto_frontier`.
        frontier_outcomes: outcomes corresponding to `frontier`.

    Returns:
        A tuple of `(utilities, outcome)` pairs, one for every
        `(utilities, index)` pair returned by the routine.
    """
    preferences = self._get_preferences()
    if not frontier:
        frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
    assert frontier_outcomes is not None
    found = max_relative_welfare_points(
        preferences, frontier, outcome_space=self.outcome_space
    )
    pairs = []
    for utils, idx in found:
        pairs.append((utils, frontier_outcomes[idx]))
    return tuple(pairs)
def modified_kalai_points(
    self,
    max_cardinality: float = float("inf"),
    frontier: tuple[tuple[float]] | None = None,
    frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float], Outcome]]:
    """Pairs the results of `kalai_points` (reserved value kept) with outcomes.

    The routine is called with `subtract_reserved_value=False`.

    Args:
        max_cardinality: passed to `self.pareto_frontier` when the frontier
            has to be computed here.
        frontier: utilities of the pareto frontier. If missing or empty, both
            the frontier and its outcomes are computed via
            `self.pareto_frontier`.
        frontier_outcomes: outcomes corresponding to `frontier`.

    Returns:
        A tuple of `(utilities, outcome)` pairs, one for every
        `(utilities, index)` pair returned by the routine.
    """
    preferences = self._get_preferences()
    if not frontier:
        frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
    assert frontier_outcomes is not None
    found = kalai_points(
        preferences,
        frontier,
        outcome_space=self.outcome_space,
        subtract_reserved_value=False,
    )
    pairs = []
    for utils, idx in found:
        pairs.append((utils, frontier_outcomes[idx]))
    return tuple(pairs)
def kalai_points(
    self,
    max_cardinality: float = float("inf"),
    frontier: tuple[tuple[float]] | None = None,
    frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float], Outcome]]:
    """Pairs the results of the `kalai_points` routine with outcomes.

    The routine is called with `subtract_reserved_value=True`.

    Args:
        max_cardinality: passed to `self.pareto_frontier` when the frontier
            has to be computed here.
        frontier: utilities of the pareto frontier. If missing or empty, both
            the frontier and its outcomes are computed via
            `self.pareto_frontier`.
        frontier_outcomes: outcomes corresponding to `frontier`.

    Returns:
        A tuple of `(utilities, outcome)` pairs, one for every
        `(utilities, index)` pair returned by the routine.
    """
    preferences = self._get_preferences()
    if not frontier:
        frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
    assert frontier_outcomes is not None
    found = kalai_points(
        preferences,
        frontier,
        outcome_space=self.outcome_space,
        subtract_reserved_value=True,
    )
    pairs = []
    for utils, idx in found:
        pairs.append((utils, frontier_outcomes[idx]))
    return tuple(pairs)
def nash_points(
    self,
    max_cardinality: float = float("inf"),
    frontier: tuple[tuple[float]] | None = None,
    frontier_outcomes: list[Outcome] | None = None,
) -> tuple[tuple[tuple[float], Outcome]]:
    """Pairs the results of the `nash_points` routine with outcomes.

    Args:
        max_cardinality: passed to `self.pareto_frontier` when the frontier
            has to be computed here.
        frontier: utilities of the pareto frontier. If missing or empty, both
            the frontier and its outcomes are computed via
            `self.pareto_frontier`.
        frontier_outcomes: outcomes corresponding to `frontier`.

    Returns:
        A tuple of `(utilities, outcome)` pairs, one for every
        `(utilities, index)` pair returned by the routine.
    """
    preferences = self._get_preferences()
    if not frontier:
        frontier, frontier_outcomes = self.pareto_frontier(max_cardinality)
    assert frontier_outcomes is not None
    found = nash_points(preferences, frontier, outcome_space=self.outcome_space)
    pairs = []
    for utils, idx in found:
        pairs.append((utils, frontier_outcomes[idx]))
    return tuple(pairs)
def plot(self, **kwargs):
    """A method for plotting a negotiation session.

    This implementation accepts arbitrary keyword arguments, ignores them and
    returns `None`.
    """
    del kwargs  # intentionally unused
def _get_ami(self, negotiator: ReactiveStrategy) -> NegotiatorMechanismInterface:  # type: ignore
    """Deprecated accessor: emits a deprecation warning and returns `self.nmi`.

    Args:
        negotiator: ignored.
    """
    del negotiator  # intentionally unused
    warnings.deprecated(f"_get_ami is depricated. Use `get_nmi` instead of it")
    return self.nmi


def __iter__(self):
    """The mechanism is its own iterator."""
    return self


def __str__(self):
    """Pretty-printed copy of the instance dictionary."""
    return pprint.pformat(dict(self.__dict__))


# the representation is the same as the string form
__repr__ = __str__