Source code for negmas.situated.world

from __future__ import annotations
import copy
import itertools
import logging
import math
import os
import random
import sys
import time
import traceback
from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Generic, TypeVar

import numpy as np
import pandas as pd
import scipy
import yaml

from negmas.checkpoints import CheckpointMixin
from negmas.common import MechanismAction, NegotiatorMechanismInterface
from negmas.sao.common import SAOState
from negmas.sao.mechanism import SAOMechanism
from negmas.config import negmas_config
from negmas.events import Event, EventLogger, EventSink, EventSource
from negmas.genius import ANY_JAVA_PORT, DEFAULT_JAVA_PORT, get_free_tcp_port
from negmas.helpers import (
    create_loggers,
    exception2str,
    get_class,
    humanize_time,
    unique_name,
)
from negmas.helpers.inout import ConfigReader, add_records
from negmas.mechanisms import Mechanism
from negmas.negotiators import Negotiator
from negmas.outcomes import Issue, Outcome, outcome2dict
from negmas.outcomes.outcome_space import CartesianOutcomeSpace
from negmas.preferences import Preferences
from negmas.serialization import to_flat_dict
from negmas.types import NamedObject
from negmas.warnings import NegmasImportWarning, warn

from .agent import Agent
from .breaches import Breach, BreachProcessing
from .bulletinboard import BulletinBoard
from .common import (
    DEFAULT_EDGE_TYPES,
    EDGE_COLORS,
    EDGE_TYPES,
    NegotiationInfo,
    Operations,
)
from .contract import Contract
from .entity import Entity
from .helpers import deflistdict
from .mechanismfactory import MechanismFactory
from .monitors import StatsMonitor, WorldMonitor
from .save import save_stats
from .awi import AgentWorldInterface

if TYPE_CHECKING:
    from matplotlib.axes import Axes

try:
    import networkx as nx
except ImportError:
    nx = None

__all__ = ["World"]

LOG_BASE = negmas_config("log_base", Path.home() / "negmas" / "logs")


def _path(path) -> Path:
    """Creates an absolute path from given path which can be a string"""
    if isinstance(path, str):
        if path.startswith("~"):
            path = Path.home() / ("/".join(path.split("/")[1:]))
    return Path(path).absolute()


TAWI = TypeVar("TAWI", bound=AgentWorldInterface)
TAgent = TypeVar("TAgent", bound=Agent)


[docs] class World( EventSink, EventSource, ConfigReader, NamedObject, CheckpointMixin, Generic[TAWI, TAgent], ABC, ): """Base world class encapsulating a world that runs a simulation with several agents interacting within some dynamically changing environment. A world maintains its own session. Args: * General * name: World Name bulletin_board: A bulletin board object to use. If not given one will be created awi_type: The type used for agent world interfaces (must descend from or behave like `AgentWorldInterface` ) info: A dictionary of key-value pairs that is kept within the world but never used. It is useful for storing contextual information. For example, when running tournaments. * Simulation parameters * n_steps: Total simulation time in steps time_limit: Real-time limit on the simulation operations: A list of `Operations` to run in order during every simulation step * Negotiation Parameters * negotiation_speed: The number of negotiation steps per simulation step. None means infinite neg_n_steps: Maximum number of steps allowed for a negotiation. neg_step_time_limit: Time limit for single step of the negotiation protocol. neg_time_limit: Real-time limit on each single negotiation. shuffle_negotiations: Whether negotiations are shuffled everytime when stepped. negotiation_quota_per_step: Number of negotiations an agent is allowed to start per step negotiation_quota_per_simulation: Number of negotiations an agent is allowed to start in the simulation start_negotiations_immediately: If true negotiations start immediately when registered rather than waiting for the next step mechanisms: The mechanism types allowed in this world associated with each keyward arguments to be passed to it. * Signing parameters * default_signing_delay: The default number of steps between contract conclusion and signing it. Only takes effect if `force_signing` is `False` force_signing: If true, agents are not asked to sign contracts. They are forced to do so. In this case, `default_singing_delay` is not effective and signature is immediate batch_signing: If true, contracts are signed in batches not individually * Breach Processing * breach_processing: How to handle breaches. Can be any of `BreachProcessing` values * Logging * log_folder: Folder to save all logs log_to_file: If true, will log to a file log_file_name: Name of the log file log_file_level: The log-level to save to file (WARNING, ERROR, INFO, DEBUG, CRITICAL, ...) log_ufuns: Log utility functions log_negotiations: Log all negotiation events log_to_screen: Whether to log to screen log_screen_level: The log-level to show on screen (WARNING, ERROR, INFO, DEBUG, CRITICAL, ...) no_logs: If True, All logging will be disabled no matter what other options are given. log_stats_every: If nonzero and positive, the period of saving stats construct_graphs: If true, information needed to draw graphs using `draw` method are kept. event_file_name: If not None, the file-name to store events into. event_types: Types of events to log * What to save * save_signed_contracts: Save all signed contracts save_cancelled_contracts: Save all cancelled contracts save_negotiations: Save all negotiation records save_resolved_breaches: Save all resolved breaches save_unresolved_breaches: Save all unresolved breaches saved_details_level: The level of details to save agent info (>=1), simulation info (>=1), negotiation info (>=2), negotiator action info (>=3) * Exception Handling * ignore_agent_exceptions: Ignore agent exceptions and keep running ignore_negotiation_exceptions: If true, all mechanism exceptions are ignored and the mechanism is aborted ignore_simulation_exceptions: Ignore simulation exceptions and keep running ignore_contract_execution_exceptions: Ignore contract execution exceptions and keep running safe_stats_monitoring: Never throw an exception for a failure to save stats or because of a Stats Monitor object * Checkpoints * checkpoint_every: The number of steps to checkpoint after. Set to <= 0 to disable checkpoint_folder: The folder to save checkpoints into. Set to None to disable checkpoint_filename: The base filename to use for checkpoints (multiple checkpoints will be prefixed with step number). single_checkpoint: If true, only the most recent checkpoint will be saved. extra_checkpoint_info: Any extra information to save with the checkpoint in the corresponding json file as a dictionary with string keys exist_ok: IF true, checkpoints override existing checkpoints with the same filename. genius_port: the port used to connect to Genius for all negotiators in this mechanism (0 means any). """ def __init__( self, bulletin_board: BulletinBoard | None = None, n_steps: int = 10000, time_limit: int | float | None = 60 * 60, negotiation_speed: int | None = None, neg_n_steps: int | None = 100, neg_time_limit: int | float | None = None, neg_step_time_limit: int | float | None = float("inf"), shuffle_negotiations=True, negotiation_quota_per_step: int = sys.maxsize, negotiation_quota_per_simulation: int = sys.maxsize, default_signing_delay=1, force_signing=False, batch_signing=True, breach_processing=BreachProcessing.NONE, mechanisms: dict[str, dict[str, Any]] | None = None, awi_type: str | type[TAWI] = "negmas.situated.AgentWorldInterface", start_negotiations_immediately: bool = False, log_folder=None, log_to_file=True, log_ufuns=False, log_negotiations: bool = False, log_to_screen: bool = False, log_stats_every: int = 0, log_file_level=logging.DEBUG, log_screen_level=logging.ERROR, no_logs=False, event_file_name="events.json", event_types=None, log_file_name="log.txt", save_signed_contracts: bool = True, save_cancelled_contracts: bool = True, save_negotiations: bool = True, save_resolved_breaches: bool = True, save_unresolved_breaches: bool = True, saved_details_level: int = 4, ignore_agent_exceptions: bool = False, ignore_negotiation_exceptions: bool = False, ignore_contract_execution_exceptions: bool = False, ignore_simulation_exceptions: bool = False, safe_stats_monitoring: bool = False, construct_graphs: bool = False, checkpoint_every: int = 1, checkpoint_folder: str | Path | None = None, checkpoint_filename: str | None = None, extra_checkpoint_info: dict[str, Any] | None = None, single_checkpoint: bool = True, exist_ok: bool = True, operations: list[Operations] | tuple[Operations, ...] = ( Operations.StatsUpdate, Operations.Negotiations, Operations.ContractSigning, Operations.AgentSteps, Operations.ContractExecution, Operations.SimulationStep, Operations.ContractSigning, Operations.StatsUpdate, ), info: dict[str, Any] | None = None, genius_port: int = DEFAULT_JAVA_PORT, disable_agent_printing: bool = False, debug: bool = False, name: str | None = None, id: str | None = None, ): self._debug = debug if debug: ignore_agent_exceptions = False ignore_negotiation_exceptions = False ignore_contract_execution_exceptions = False ignore_simulation_exceptions = False self.info = None self.disable_agent_printing = disable_agent_printing self.ignore_simulation_exceptions = ignore_simulation_exceptions self.ignore_negotiation_exceptions = ignore_negotiation_exceptions if force_signing: batch_signing = False super().__init__() self.__next_operation_index = 0 NamedObject.__init__(self, name, id=id) CheckpointMixin.checkpoint_init( self, step_attrib="current_step", every=checkpoint_every, folder=checkpoint_folder, filename=checkpoint_filename, info=extra_checkpoint_info, exist_ok=exist_ok, single=single_checkpoint, ) self.name = ( name.replace("/", ".") if name is not None else unique_name(base=self.__class__.__name__, add_time=True, rand_digits=5) ) self.id = unique_name(self.name, add_time=True, rand_digits=8) self._no_logs = no_logs if log_folder is not None: self._log_folder = Path(log_folder).absolute() else: self._log_folder = Path(LOG_BASE) if name is not None: for n in name.split("/"): self._log_folder /= n else: self._log_folder /= self.name if event_file_name: self._event_logger = EventLogger( self._log_folder / event_file_name, types=event_types ) self.register_listener(None, self._event_logger) else: self._event_logger = None if log_file_name is None: log_file_name = "log.txt" if len(log_file_name) == 0: log_to_file = False if ( log_folder or log_negotiations or log_stats_every or log_to_file or log_ufuns ): self._log_folder.mkdir(parents=True, exist_ok=True) self._agent_log_folder = self._log_folder / "_agent_logs" self._agent_log_folder.mkdir(parents=True, exist_ok=True) self._agent_loggers: dict[str, logging.Logger] = {} self.log_file_name = ( str(self._log_folder / log_file_name) if log_to_file else None ) self.log_file_level = log_file_level self.log_screen_level = log_screen_level self.log_to_screen = log_to_screen self.log_negotiations = log_negotiations self.logger = ( create_loggers( file_name=self.log_file_name, module_name=None, screen_level=log_screen_level if log_to_screen else None, file_level=log_file_level, app_wide_log_file=True, ) if not no_logs else None ) self.ignore_contract_execution_exceptions = ignore_contract_execution_exceptions self.ignore_agent_exception = ignore_agent_exceptions self.times: dict[str, float] = defaultdict(float) self.simulation_exceptions: dict[int, list[str]] = defaultdict(list) self.mechanism_exceptions: dict[int, list[str]] = defaultdict(list) self.contract_exceptions: dict[int, list[str]] = defaultdict(list) self.agent_exceptions: dict[str, list[tuple[int, str]]] = defaultdict(list) self.negotiator_exceptions: dict[str, list[tuple[int, str]]] = defaultdict(list) self._negotiations: dict[str, NegotiationInfo] = {} self.unsigned_contracts: dict[int, set[Contract]] = defaultdict(set) self.breach_processing = breach_processing self.n_steps = n_steps self.save_signed_contracts = save_signed_contracts self.save_cancelled_contracts = save_cancelled_contracts self.save_negotiations = save_negotiations self.save_resolved_breaches = save_resolved_breaches self.save_unresolved_breaches = save_unresolved_breaches self.construct_graphs = construct_graphs self.operations = operations self._current_step = 0 self.negotiation_speed = negotiation_speed self.default_signing_delay = default_signing_delay self.time_limit = time_limit if time_limit is not None else float("inf") self.neg_n_steps = neg_n_steps self.neg_time_limit = neg_time_limit self.neg_step_time_limit = neg_step_time_limit self.frozen_time = 0.0 self._entities: dict[int, set[Entity]] = defaultdict(set) self._negotiations: dict[str, NegotiationInfo] = {} self.force_signing = force_signing self.neg_quota_step = negotiation_quota_per_step self.neg_quota_simulation = negotiation_quota_per_simulation self._start_time = None self._log_ufuns = log_ufuns self._log_negs = log_negotiations self.safe_stats_monitoring = safe_stats_monitoring self.shuffle_negotiations = shuffle_negotiations self.info = info if info is not None else dict() if isinstance(mechanisms, Collection) and not isinstance(mechanisms, dict): mechanisms = dict(zip(mechanisms, [dict()] * len(mechanisms))) self.mechanisms: dict[str, dict[str, Any]] | None = mechanisms self.awi_type = get_class(awi_type, scope=globals()) self._log_folder = str(self._log_folder) self._stats: dict[str, list[Any]] = defaultdict(list) self.__stepped_mechanisms: set[str] = set() self.__n_negotiations = 0 self.__n_contracts_signed = 0 self.__n_contracts_concluded = 0 self.__n_contracts_cancelled = 0 self.__n_contracts_dropped = 0 self.__stats_stage = 0 self.__stage = 0 self.__n_new_contract_executions = 0 self.__n_new_breaches = 0 self.__n_new_contract_errors = 0 self.__n_new_contract_nullifications = 0 self.__activity_level = 0 self.__blevel = 0.0 self.__n_steps_broken = 0 self.__n_steps_success = 0 self.__n_broken = 0 self.__n_success = 0 self._saved_contracts: dict[str, dict[str, Any]] = {} self._saved_negotiations: dict[str, dict[str, Any]] = {} self._saved_breaches: dict[str, dict[str, Any]] = {} self._started = False self.batch_signing = batch_signing self.agents: dict[str, TAgent] = {} self.immediate_negotiations = start_negotiations_immediately self.stats_monitors: set[StatsMonitor] = set() self.world_monitors: set[WorldMonitor] = set() self._edges_negotiation_requests_accepted: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_negotiation_requests_rejected: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_negotiations_started: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_negotiations_rejected: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_negotiations_succeeded: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_negotiations_failed: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_concluded: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_signed: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_cancelled: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_nullified: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_erred: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_executed: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self._edges_contracts_breached: dict[ int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]] ] = defaultdict(deflistdict) self.neg_requests_sent: dict[str, int] = defaultdict(int) self.neg_requests_received: dict[str, int] = defaultdict(int) self.negs_registered: dict[str, int] = defaultdict(int) self.negs_succeeded: dict[str, int] = defaultdict(int) self.negs_failed: dict[str, int] = defaultdict(int) self.negs_timedout: dict[str, int] = defaultdict(int) self.negs_initiated: dict[str, int] = defaultdict(int) self.contracts_concluded: dict[str, int] = defaultdict(int) self.contracts_signed: dict[str, int] = defaultdict(int) self.neg_requests_rejected: dict[str, int] = defaultdict(int) self.contracts_dropped: dict[str, int] = defaultdict(int) self.breaches_received: dict[str, int] = defaultdict(int) self.breaches_committed: dict[str, int] = defaultdict(int) self.contracts_erred: dict[str, int] = defaultdict(int) self.contracts_nullified: dict[str, int] = defaultdict(int) self.contracts_executed: dict[str, int] = defaultdict(int) self.contracts_breached: dict[str, int] = defaultdict(int) self.attribs: dict[str, dict[str, Any]] = {} self._sim_start: float = 0 self._step_start: float = 0 self.bulletin_board: BulletinBoard self.set_bulletin_board(bulletin_board=bulletin_board) stats_calls = [_ for _ in self.operations if _ == Operations.StatsUpdate] self._single_stats_call = len(stats_calls) == 1 self._two_stats_calls = len(stats_calls) == 2 self._n_negs_per_agent_per_step: dict[str, int] = defaultdict(int) self._n_negs_per_agent: dict[str, int] = defaultdict(int) self.genius_port = ( genius_port if genius_port > 0 else ANY_JAVA_PORT if genius_port == ANY_JAVA_PORT else get_free_tcp_port() ) self.params = dict( negotiation_speed=negotiation_speed, negotiation_can_cross_step_boundaries=not ( self.negotiation_speed is None or ( self.neg_n_steps is not None and self.negotiation_speed is not None and self.neg_n_steps < self.negotiation_speed ) or ( self.neg_n_steps is None and ( self.neg_time_limit is not None and not math.isinf(self.neg_time_limit) ) ) ), default_signing_delay=default_signing_delay, batch_signing=batch_signing, breach_processing=breach_processing, mechanisms=mechanisms, start_negotiations_immediately=start_negotiations_immediately, ignore_agent_exceptions=ignore_agent_exceptions, ignore_negotiation_exceptions=ignore_negotiation_exceptions, ignore_contract_execution_exceptions=ignore_contract_execution_exceptions, ignore_simulation_exceptions=ignore_simulation_exceptions, operations=operations, genius_port=self.genius_port, ) self.loginfo(f"{self.name}: World Created") if log_stats_every is None or log_stats_every < 1: self._stats_file_name = None self._stats_dir_name = None else: stats_file_name = _path(str(Path(self._log_folder) / "stats.csv")) self._stats_file_name = stats_file_name.name self._stats_dir_name = stats_file_name.parent # extra log information self._saved_details_level = saved_details_level self._extra_folder = Path(self._log_folder) if self._log_folder else None if self._extra_folder is None: self._saved_details_level = 0 else: self._extra_folder.mkdir(exist_ok=True, parents=True) self._sim_info: list[dict[str, Any]] = [] self._neg_info: dict[str, dict[str, Any]] = defaultdict(dict) self._offer_info: list[tuple] = [] self._state_cats = ["agreement", "timedout", "ended", "error"] self._offer_info_cols: list[tuple[str, type | str]] = [ ("id", str), ("neg_id", str), ("step", int), ("relative_time", float), ("time", float), ("sender", str), ("receiver", str), ("sender_agent_id", str), ("receiver_agent_id", str), ("state", "category"), ] + list(self.action_info_cols()) self._agreement_info_cols = self.agreement_info_cols() self._agent_info: dict[str | None, dict[str, Any]] = defaultdict(dict) self._agent_info[None] = dict( id=0, name="NoAgent", type="NoAgent" ) | self.extra_agent_info(None) for i, (aid, agent) in enumerate(self.agents.items()): self._agent_info[aid] = dict( id=i, name=aid, type=self.type_name_for_logs(agent) ) | self.extra_agent_info(agent) if self._extra_folder is not None: df = pd.DataFrame.from_records(list(self._agent_info.values())) df.to_csv(self._extra_folder / "agents.csv", index=False)
[docs] def action_info_cols(self) -> list[tuple[str, type]]: return [("offer", str)]
[docs] def extract_action_info(self, action: Any) -> list[Any]: return [str(action)]
[docs] def agreement_info_cols(self) -> list[tuple[str, type]]: return [("agreement", str)]
[docs] def extract_agreement_info(self, agreement: Outcome | None) -> list[Any]: return [str(agreement)] if agreement else [""]
[docs] def extra_agent_info(self, agent: Agent | None) -> dict[str, Any]: return dict()
[docs] def extra_sim_step_info_pre(self) -> dict[str, Any]: return dict()
[docs] def extra_sim_step_info_post(self) -> dict[str, Any]: return dict()
[docs] def extra_neg_info(self, info: NegotiationInfo) -> dict[str, Any]: return dict()
@property def stat_names(self, peragent: bool = False): """Returns names of all stats available""" names = sorted(list(self.stats.keys())) if peragent: return names final = [] for name in names: parts = name.split("_") if any(parts[-1] == _ for _ in self.agents.keys()): final.append("_".join(parts[:-1])) continue final.append(name) return sorted(list(set(final))) @property def stats(self) -> dict[str, Any]: if len(self._stats) == 0: return dict() L = max(len(_) for _ in self._stats.values()) def extend(x, L): n = len(x) if n >= L: return x return x + [float("nan")] * (L - n) return {k: extend(v, L) for k, v in self._stats.items()} @property def breach_fraction(self) -> float: """Fraction of signed contracts that led to breaches""" n_breaches = sum(self.stats["n_breaches"]) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return n_breaches / n_signed_contracts if n_signed_contracts != 0 else 0.0 breach_rate = breach_fraction
[docs] def n_saved_contracts(self, ignore_no_issue: bool = True) -> int: """ Number of saved contracts Args: ignore_no_issue: If true, only contracts resulting from negotiation (has some issues) will be counted """ if ignore_no_issue: return len([_ for _ in self._saved_contracts.values() if _["issues"]]) return len(self._saved_contracts)
@property def agreement_fraction(self) -> float: """Fraction of negotiations ending in agreement and leading to signed contracts""" n_negs = sum(self.stats["n_negotiations"]) n_contracts = self.n_saved_contracts(True) return n_contracts / n_negs if n_negs != 0 else np.nan agreement_rate = agreement_fraction @property def cancellation_fraction(self) -> float: """Fraction of contracts concluded (through negotiation or otherwise) that were cancelled.""" n_contracts = self.n_saved_contracts(False) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return (1.0 - n_signed_contracts / n_contracts) if n_contracts != 0 else np.nan cancellation_rate = cancellation_fraction
[docs] def loginfo(self, s: str, event: Event | None = None) -> None: """logs info-level information Args: s (str): The string to log event (Event): The event to announce after logging """ if event: self.announce(event) if self._no_logs or not self.logger: return self.logger.info(f"{self._log_header()}: " + s.strip())
[docs] def set_bulletin_board(self, bulletin_board): self.bulletin_board = ( bulletin_board if bulletin_board is not None else BulletinBoard() ) self.bulletin_board.add_section("breaches") self.bulletin_board.add_section("stats") self.bulletin_board.add_section("settings") self.bulletin_board.record("settings", self.n_steps, "n_steps") self.bulletin_board.record("settings", self.time_limit, "time_limit") self.bulletin_board.record( "settings", self.negotiation_speed, "negotiation_speed" ) self.bulletin_board.record("settings", self.neg_n_steps, "neg_n_steps") self.bulletin_board.record("settings", self.neg_time_limit, "neg_time_limit") self.bulletin_board.record( "settings", self.neg_step_time_limit, "neg_step_time_limit" ) self.bulletin_board.record( "settings", self.default_signing_delay, "default_signing_delay" ) self.bulletin_board.record("settings", self.force_signing, "force_signing") self.bulletin_board.record("settings", self.batch_signing, "batch_signing") self.bulletin_board.record( "settings", self.breach_processing, "breach_processing" ) self.bulletin_board.record( "settings", list(self.mechanisms.keys()) if self.mechanisms is not None else [], "mechanism_names", ) self.bulletin_board.record( "settings", self.mechanisms if self.mechanisms is not None else dict(), "mechanisms", ) self.bulletin_board.record( "settings", self.immediate_negotiations, "start_negotiations_immediately" )
[docs] @classmethod def is_basic_stat(self, s: str) -> bool: """Checks whether a given statistic is agent specific.""" return ( s in ["activity_level", "breach_level", "n_bankrupt", "n_breaches"] or s.startswith("n_contracts") or s.startswith("n_negotiation") or s.startswith("n_registered_negotiations") )
@property def current_step(self): return self._current_step def _agent_logger(self, aid: str) -> logging.Logger: """Returns the logger associated with a given agent""" if aid not in self._agent_loggers.keys(): self._agent_loggers[aid] = ( create_loggers( file_name=self._agent_log_folder / f"{aid}.txt", module_name=None, file_level=self.log_file_level, app_wide_log_file=False, module_wide_log_file=False, ) if not self._no_logs else None ) return self._agent_loggers[aid]
[docs] def logdebug_agent(self, aid: str, s: str, event: Event | None = None) -> None: """logs debug to the agent individual log Args: s (str): The string to log event (Event): The event to announce after logging """ if event: self.announce(event) if self._no_logs: return logger = self._agent_logger(aid) logger.debug(f"{self._log_header()}: " + s.strip())
[docs] def on_event(self, event: Event, sender: EventSource): """Received when an event is raised""" if event.type == "negotiator_exception": negotiator = event.data.get("negotiator") if not negotiator: return agent = negotiator.owner if not agent: return self.logdebug_agent( agent.id, f"Negotiator {negotiator.name} raised: " + str(event.data.get("exception", "Unknown exception")), )
@property def log_folder(self): return self._log_folder
[docs] def loginfo_agent( self, aid: str, s: str | None, event: Event | None = None ) -> None: """logs information to the agent individual log Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return logger = self._agent_logger(aid) logger.info(f"{self._log_header()}: " + s.strip())
[docs] def logwarning_agent( self, aid: str, s: str | None, event: Event | None = None ) -> None: """logs warning to the agent individual log Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return logger = self._agent_logger(aid) logger.warning(f"{self._log_header()}: " + s.strip())
[docs] def logerror_agent( self, aid: str, s: str | None, event: Event | None = None ) -> None: """logs information to the agent individual log Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return logger = self._agent_logger(aid) logger.error(f"{self._log_header()}: " + s.strip())
[docs] def logdebug(self, s: str | None, event: Event | None = None) -> None: """logs debug-level information Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return self.logger.debug(f"{self._log_header()}: " + s.strip())
[docs] def logwarning(self, s: str | None, event: Event | None = None) -> None: """logs warning-level information Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return self.logger.warning(f"{self._log_header()}: " + s.strip())
[docs] def logerror(self, s: str | None, event: Event | None = None) -> None: """logs error-level information Args: s (str): The string to log event (Event): The event to announce after logging """ if not s: return if event: self.announce(event) if self._no_logs or not self.logger: return self.logger.error(f"{self._log_header()}: " + s.strip())
@property def time(self) -> float: """Elapsed time since world started in seconds. 0.0 if the world did not start running""" if self._start_time is None: return 0.0 if ( self.n_steps is not None and self.current_step >= self.n_steps and self.frozen_time > 0.0 ): return self.frozen_time return time.perf_counter() - self._start_time @property def remaining_time(self) -> float | None: """Returns remaining time in seconds. None if no time limit is given.""" if not self._start_time: return self.time_limit limit = self.time_limit - (time.perf_counter() - self._start_time) if limit < 0.0: return 0.0 return limit @property def relative_time(self) -> float: """Returns a number between ``0`` and ``1`` indicating elapsed relative time or steps.""" if self.time_limit == float("inf") and self.n_steps is None: return 0.0 relative_step = ( self.current_step / self.n_steps if self.n_steps is not None else np.nan ) relative_time = self.time / self.time_limit return max([relative_step, relative_time]) @property def remaining_steps(self) -> int | None: """Returns the remaining number of steps until the end of the mechanism run. None if unlimited""" if self.n_steps is None: return None return self.n_steps - self.current_step
[docs] @abstractmethod def breach_record(self, breach: Breach) -> dict[str, Any]: """Converts a breach to a record suitable for storage during the simulation"""
def _register_breach(self, breach: Breach) -> None: # we do not report breachs with no victims if breach.victims is None or len(breach.victims) < 1: return for v in breach.victims: self.breaches_received[v] += 1 self.breaches_committed[breach.perpetrator] += 1 self.bulletin_board.record( section="breaches", key=breach.id, value=self.breach_record(breach) ) @property def saved_negotiations(self) -> list[dict[str, Any]]: return list(self._saved_negotiations.values())
[docs] def on_exception(self, entity: Entity, e: Exception) -> None: """ Called when an exception happens. Args: entity: The entity that caused the exception e: The exception """
[docs] def call(self, agent: TAgent, method: Callable, *args, **kwargs) -> Any: """ Calls a method on an agent updating exeption count Args: agent: The agent on which the method is to be called method: The bound method (bound to the agent) *args: position arguments **kwargs: keyword arguments Returns: whatever method returns """ old_stdout = sys.stdout # backup current stdout if self.disable_agent_printing: sys.stdout = open(os.devnull, "w") _strt = time.perf_counter() try: result = method(*args, **kwargs) _end = time.perf_counter() self.times[agent.id] = _end - _strt return result except Exception as e: _end = time.perf_counter() self.times[agent.id] = _end - _strt self.agent_exceptions[agent.id].append( (self._current_step, exception2str()) ) self.on_exception(agent, e) if not self.ignore_agent_exception: raise e exc_type, exc_value, exc_traceback = sys.exc_info() self.logerror( f"Entity exception @{agent.id}: " f"{traceback.format_tb(exc_traceback)}", Event("entity-exception", dict(exception=e)), ) finally: if self.disable_agent_printing: sys.stdout.close() sys.stdout = old_stdout # reset old stdout
def _add_edges( self, src: TAgent | str, dst: list[TAgent | str], target: dict[int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]]], bi=False, issues: list[Issue] | None = None, agreement: dict[str, Any] | None = None, ): """Registers an edge""" if not self.construct_graphs: return attr = None if issues is not None: attr = {i.name: i.values for i in issues} if agreement is not None: attr = agreement for p in dst: if p == src: continue src_id = src.id if isinstance(src, Agent) else src p_id = p.id if isinstance(p, Agent) else p target[self.current_step][(src_id, p_id)].append(attr) if bi: target[self.current_step][(p_id, src_id)].append(attr)
[docs] def is_valid_contract(self, contract: Contract) -> bool: """ Confirms that the agreement is valid given the world rules. Args: contract: The contract being tested Return: Returns True for valid contracts and False for invalid contracts Remarks: - This test will be conducted after agents are asked to sign the contract and only for signed contracts. - If False is returned, the contract will considered unsigned and will be recorded as a concluded but not signed contract with no rejectors """ return True
def _sign_contract(self, contract: Contract) -> list[str] | None: """Called to sign a contract and returns whether or not it was signed""" # if self._contract_finalization_time(contract) >= self.n_steps or \ # self._contract_execution_time(contract) < self.current_step: # return None partners = [self.agents[_] for _ in contract.partners] def _do_sign(c, p): s_ = c.signatures.get(p, None) if s_ is not None: return s_ try: result = self.call(p, p.sign_all_contracts, [c])[0] if self.time >= self.time_limit: result = None return result except Exception as e: self.agent_exceptions[p.id].append((self._current_step, str(e))) exc_type, exc_value, exc_traceback = sys.exc_info() self.logerror( f"Signature exception @ {p.name}: {traceback.format_tb(exc_traceback)}", Event("agent-exception", dict(method="sign_contract", exception=e)), ) return None if self.force_signing: signatures = [(partner, partner.id) for partner in partners] rejectors = [] else: signatures = list( zip(partners, (_do_sign(contract, partner) for partner in partners)) ) rejectors = [ partner for partner, signature in signatures if signature is None ] if len(rejectors) == 0: contract.signatures = {a.id: s for a, s in signatures} contract.signed_at = self.current_step for partner in partners: self.contracts_signed[partner.id] += 1 self.call(partner, partner.on_contract_signed_, contract=contract) if self.time >= self.time_limit: break else: for partner in partners: self.call( partner, partner.on_contract_cancelled_, contract=contract, rejectors=[_.id for _ in rejectors], ) if self.time >= self.time_limit: break return [_.id for _ in rejectors]
[docs] def on_contract_processed(self, contract): """ Called whenever a contract finished processing to be removed from unsigned contracts Args: contract: Contract Remarks: - called by on_contract_cancelled and on_contract_signed """ unsigned = self.unsigned_contracts.get(self.current_step, None) if unsigned is None: return try: unsigned.remove(contract) except KeyError: pass
[docs] @abstractmethod def contract_record(self, contract: Contract) -> dict[str, Any]: """Converts a contract to a record suitable for permanent storage"""
def _contract_record(self, contract: Contract) -> dict[str, Any]: """Converts a contract to a record suitable for permanent storage""" record = self.contract_record(contract) record.update({"negotiation_id": contract.mechanism_id}) return record
[docs] def on_contract_signed(self, contract: Contract) -> bool: """Called to add a contract to the existing set of contract after it is signed Args: contract: The contract to add Returns: True if everything went OK and False otherwise Remarks: - By default this function just adds the contract to the set of contracts maintaned by the world. - You should ALWAYS call this function when overriding it. """ if not self.is_valid_contract(contract): # TODO check adding an edge of type dropped record = self._contract_record(contract) record["signed_at"] = self.current_step record["executed_at"] = -1 record["breaches"] = "" record["nullified_at"] = -1 record["dropped_at"] = self.current_step record["erred_at"] = -1 self._saved_contracts[contract.id] = record self.__n_contracts_dropped += 1 for p in contract.partners: self.contracts_dropped[p] += 1 self.on_contract_processed(contract) return False self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_signed, bi=True, ) self.__n_contracts_signed += 1 for p in contract.partners: self.contracts_signed[p] += 1 try: self.unsigned_contracts[self.current_step].remove(contract) except KeyError: pass record = self._contract_record(contract) if self.save_signed_contracts: record["signed_at"] = self.current_step record["executed_at"] = -1 record["breaches"] = "" record["nullified_at"] = -1 record["erred_at"] = -1 record["dropped_at"] = -1 self._saved_contracts[contract.id] = record else: self._saved_contracts.pop(contract.id, None) return True
[docs] def on_contract_cancelled(self, contract): """Called whenever a concluded contract is not signed (cancelled) Args: contract: The contract to add Remarks: - By default this function just adds the contract to the set of contracts maintaned by the world. - You should ALWAYS call this function when overriding it. """ self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_cancelled, bi=True, ) record = self._contract_record(contract) record["signed_at"] = -1 record["executed_at"] = -1 record["breaches"] = "" record["nullified_at"] = -1 record["dropped_at"] = -1 record["erred_at"] = -1 self._saved_contracts[contract.id] = record self.__n_contracts_cancelled += 1 self.on_contract_processed(contract)
def _process_unsigned(self): """Processes all concluded but unsigned contracts""" unsigned = self.unsigned_contracts.get(self.current_step, None) signed = [] cancelled = [] if unsigned: if self.batch_signing: agent_contracts = defaultdict(list) agent_signed = defaultdict(list) agent_cancelled = defaultdict(list) contract_signatures = defaultdict(int) contract_rejectors = defaultdict(list) for contract in unsigned: for p in contract.partners: if contract.signatures.get(p, None) is None: agent_contracts[p].append(contract) else: contract_signatures[contract.id] += 1 for agent_id, contracts in agent_contracts.items(): slist = self.call( self.agents[agent_id], self.agents[agent_id].sign_all_contracts, contracts, ) if self.time >= self.time_limit: break if slist is None: slist = [False] * len(contracts) elif isinstance(slist, str): slist = [slist] * len(contracts) elif isinstance(slist, dict): slist = [slist.get(c.id, None) for c in contracts] elif isinstance(slist, Iterable): slist = list(slist) missing = len(contracts) - len(slist) if missing > 0: slist += [None] * missing elif missing < 0: slist = slist[: len(contracts)] for contract, signature in zip(contracts, slist): if signature is not None: contract_signatures[contract.id] += 1 else: contract_rejectors[contract.id].append(agent_id) for contract in unsigned: if contract_signatures[contract.id] == len(contract.partners): contract.signatures = dict( zip(contract.partners, contract.partners) ) contract.signed_at = self.current_step for partner in contract.partners: agent_signed[partner].append(contract) signed.append(contract) else: rejectors = contract_rejectors.get(contract.id, []) for partner in contract.partners: agent_cancelled[partner].append((contract, rejectors)) cancelled.append(contract) everyone = set(agent_signed.keys()).union(set(agent_cancelled.keys())) for agent_id in everyone: cinfo = agent_cancelled[agent_id] rejectors = [_[1] for _ in cinfo] clist = [_[0] for _ in cinfo] self.call( self.agents[agent_id], self.agents[agent_id].on_contracts_finalized, agent_signed[agent_id], clist, rejectors, ) if self.time >= self.time_limit: break else: for contract in unsigned: rejectors = self._sign_contract(contract) if rejectors is not None and len(rejectors) == 0: signed.append(contract) else: cancelled.append(contract) for contract in signed: self.on_contract_signed(contract) for contract in cancelled: self.on_contract_cancelled(contract) def _make_negotiation_record(self, negotiation: NegotiationInfo) -> dict[str, Any]: """Creates a record of the negotiation to be saved""" if negotiation is None: return {} mechanism = negotiation.mechanism if mechanism is None: return {} running, agreement = mechanism.state.running, mechanism.state.agreement record = { "id": mechanism.id, "partner_ids": [_.id for _ in negotiation.partners], "partners": [_.name for _ in negotiation.partners], "partner_types": [_.type_name for _ in negotiation.partners], "requested_at": negotiation.requested_at, "ended_at": self.current_step, "mechanism_type": mechanism.__class__.__name__, "issues": [str(issue) for issue in negotiation.issues], "final_status": ( "running" if running else "succeeded" if agreement is not None else "failed" ), "failed": agreement is None, "agreement": str(agreement), "group": negotiation.group, "caller": negotiation.caller, } if negotiation.annotation: record.update(to_flat_dict(negotiation.annotation)) dd = mechanism.state.asdict() dd = {(k if k not in record.keys() else f"{k}_neg"): v for k, v in dd.items()} dd["history"] = [_.asdict() for _ in mechanism.history] if hasattr(mechanism, "negotiator_offers"): dd["offers"] = { n.owner.id if n.owner else n.name: [ _ for _ in mechanism.negotiator_offers(n.id) ] for n in mechanism.negotiators } record.update(dd) return record def _log_negotiation(self, negotiation: NegotiationInfo) -> None: if not self._log_negs: return mechanism = negotiation.mechanism if not mechanism: return negs_folder = str(Path(self._log_folder) / "negotiations") os.makedirs(negs_folder, exist_ok=True) record = self._make_negotiation_record(negotiation) if len(record) < 1: return add_records(str(Path(self._log_folder) / "negotiations.csv"), [record]) data = pd.DataFrame([to_flat_dict(_) for _ in mechanism.history]) data.to_csv(os.path.join(negs_folder, f"{mechanism.id}.csv"), index=False) @property def n_simulation_exceptions(self) -> dict[int, int]: """ Returns a mapping from agent ID to the total number of exceptions it and its negotiators have raised """ result = defaultdict(int) for k, v in self.simulation_exceptions.items(): result[k] += len(v) return result @property def n_contract_exceptions(self) -> dict[int, int]: """ Returns a mapping from agent ID to the total number of exceptions it and its negotiators have raised """ result = defaultdict(int) for k, v in self.contract_exceptions.items(): result[k] += len(v) return result @property def n_mechanism_exceptions(self) -> dict[int, int]: """ Returns a mapping from agent ID to the total number of exceptions it and its negotiators have raised """ result = defaultdict(int) for k, v in self.mechanism_exceptions.items(): result[k] += len(v) return result @property def n_total_simulation_exceptions(self) -> dict[int, int]: """ Returns the total number of exceptions per step that are not directly raised by agents or their negotiators. Remarks: - This property sums the totals of `n_simulation_exceptions`, `n_contract_exceptions`, and `n_mechanism_exceptions` """ result = defaultdict(int) for d in ( self.n_mechanism_exceptions, self.n_contract_exceptions, self.n_simulation_exceptions, ): for k, v in d.items(): result[k] += v return result @property def n_agent_exceptions(self) -> dict[str, int]: """ Returns a mapping from agent ID to the total number of exceptions it and its negotiators have raised """ result = dict() for k, v in self.agent_exceptions.items(): result[k] = len(v) return result @property def n_total_agent_exceptions(self) -> dict[str, int]: """ Returns a mapping from agent ID to the total number of exceptions it and its negotiators have raised """ result: dict[str, int] = defaultdict(int) for k, v in self.agent_exceptions.items(): result[k] += len(v) for k, v in self.negotiator_exceptions.items(): result[k] += len(v) return result @property def n_negotiator_exceptions(self) -> dict[str, int]: """ Returns a mapping from agent ID to the total number of exceptions its negotiators have raised """ result = dict() for k, v in self.negotiator_exceptions.items(): result[k] = len(v) return result
[docs] def is_valid_agreement( self, negotiation: NegotiationInfo, agreement: Outcome, mechanism: Mechanism ) -> bool: """ Confirms that the agreement is valid given the world rules. Args: negotiation: The `NegotiationInfo` that led to the agreement agreement: The agreement mechanism: The mechanism that led to the agreement Return: Returns True for valid agreements and False for invalid agreements Remarks: - This test is conducted before the agents are asked to sign the corresponding contract - Invalid agreements will be treated as never happened and agents will not be asked to sign it """ return True
[docs] def on_contract_concluded(self, contract: Contract, to_be_signed_at: int) -> None: """Called to add a contract to the existing set of unsigned contract after it is concluded Args: contract: The contract to add to_be_signed_at: The timestep at which the contract is to be signed Remarks: - By default this function just adds the contract to the set of contracts maintaned by the world. - You should ALWAYS call this function when overriding it. """ self.__n_contracts_concluded += 1 for p in contract.partners: self.contracts_concluded[p] += 1 self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_concluded, agreement=contract.agreement, bi=True, ) self.unsigned_contracts[to_be_signed_at].add(contract)
def _register_contract( self, mechanism, negotiation, to_be_signed_at ) -> Contract | None: partners = negotiation.partners if self.save_negotiations: _stats = self._make_negotiation_record(negotiation) self._saved_negotiations[mechanism.id] = _stats if mechanism.state.agreement is None or negotiation is None: return None for partner in partners: self.negs_succeeded[partner.id] += 1 if not self.is_valid_agreement( negotiation, mechanism.state.agreement, mechanism ): return None agreement = mechanism.state.agreement agreement = outcome2dict(agreement, issues=[_.name for _ in mechanism.issues]) signed_at = -1 contract = Contract( partners=list(_.id for _ in partners), annotation=negotiation.annotation, issues=negotiation.issues, agreement=agreement, concluded_at=self.current_step, to_be_signed_at=to_be_signed_at, signed_at=signed_at, mechanism_state=mechanism.state, mechanism_id=mechanism.id, ) self.on_contract_concluded(contract, to_be_signed_at) for partner in partners: self.call( partner, partner.on_negotiation_success_, contract=contract, mechanism=mechanism, ) if self.time >= self.time_limit: break if self.batch_signing: if to_be_signed_at != self.current_step: sign_status = f"to be signed at {contract.to_be_signed_at}" else: sign_status = "" else: if to_be_signed_at == self.current_step: rejectors = self._sign_contract(contract) signed = rejectors is not None and len(rejectors) == 0 if signed: signed = self.on_contract_signed(contract) sign_status = ( "signed" if signed else f"cancelled by {rejectors if rejectors else 'being invalid!!'}" ) else: sign_status = f"to be signed at {contract.to_be_signed_at}" # self.on_contract_processed(contract=contract) if negotiation.annotation is not None: annot_ = dict( zip( negotiation.annotation.keys(), (str(_) for _ in negotiation.annotation.values()), ) ) else: annot_ = "" self.logdebug( f"Contract [{sign_status}]: " f"{[_.name for _ in partners]}" f" > {str(mechanism.state.agreement)} on annotation {annot_}", Event( "negotiation-success", dict(mechanism=mechanism, contract=contract, partners=partners), ), ) return contract def _register_failed_negotiation(self, mechanism, negotiation) -> None: partners = negotiation.partners mechanism_state = mechanism.state annotation = negotiation.annotation self._add_edges( partners[0], partners, self._edges_negotiations_failed, issues=mechanism.issues, bi=True, ) for partner in partners: self.negs_failed[partner.id] += 1 if mechanism_state.timedout: self.negs_timedout[partner.id] += 1 if self.save_negotiations: _stats = self._make_negotiation_record(negotiation) self._saved_negotiations[mechanism.id] = _stats for partner in partners: self.call( partner, partner.on_negotiation_failure_, partners=[_.id for _ in partners], annotation=annotation, mechanism=mechanism, state=mechanism_state, ) if self.time >= self.time_limit: break self.logdebug( f"Negotiation failure between {[_.name for _ in partners]}" f" on annotation {negotiation.annotation} ", Event("negotiation-failure", dict(mechanism=mechanism, partners=partners)), ) def _tobe_signed_at(self, agreement: Outcome, force_immediate_signing=False) -> int: return ( self.current_step if force_immediate_signing else self.current_step + self.default_signing_delay ) def _step_a_mechanism( self, mechanism, force_immediate_signing, action: dict[str, MechanismAction | None] | None = None, ) -> tuple[Contract | None, bool]: """Steps a mechanism one step. Returns: The agreement or None and whether the negotiation is still running """ contract = None try: result = mechanism.step(action) except Exception as e: result = mechanism.abort() if not self.ignore_negotiation_exceptions: raise e exc_type, exc_value, exc_traceback = sys.exc_info() self.logerror( f"Mechanism exception: " f"{traceback.format_tb(exc_traceback)}", Event("entity-exception", dict(exception=e)), ) finally: namap = dict() for neg in mechanism.negotiators: namap[neg.id] = neg.owner if mechanism.stats["times"]: for source, t in mechanism.stats["times"].items(): self.times[namap[source].id if namap[source] else "Unknown"] += t if mechanism.stats["exceptions"]: for source, exceptions in mechanism.stats["exceptions"].items(): self.negotiator_exceptions[ namap[source].id if namap[source] else "Unknown" ].append( list(zip(itertools.repeat(self._current_step), exceptions)) ) agreement, is_running = result.agreement, not result.ended negotiation = self._negotiations.get(mechanism.id, None) if negotiation is not None: self.on_negotiation_stepped(negotiation) if agreement is not None or not is_running: if negotiation is not None: self.on_negotiation_end(negotiation) if self._debug: assert ( negotiation is not None ), f"{mechanism.id} just finished but it is not in the set of running negotiations!!" if agreement is None: self._register_failed_negotiation(mechanism.nmi, negotiation) contract = None else: contract = self._register_contract( mechanism.nmi, negotiation, self._tobe_signed_at(agreement, force_immediate_signing), ) if negotiation is not None: self.on_negotiation_processed(negotiation, contract) self._log_negotiation(negotiation) self._negotiations.pop(mechanism.id, None) return contract, is_running def _step_negotiations( self, mechanisms: list[Mechanism], n_steps: int | float | None, force_immediate_signing: bool, partners: list[list[TAgent]], action: dict[str, dict[str, MechanismAction | None]] | None = None, ) -> tuple[list[Contract | None], list[bool], int, int, int, int]: """ Runs all bending negotiations. Args: mechanisms: The mechanisms to step forward n_steps: The maximum number of steps to step each mechanism. force_immediate_signing: If true, all agreements are signed as contracts immediately upon agreement. partners: List of partners for each mechanism. action: Mapping of negotiator IDs to corresponding negotiation action (e.g. offer in SAO) for every mechanism. Negotiators will be called upon to act only if no action is passed here. Remarks: - The actual number of steps executed is min(n_steps, self.negotiation_speed, mechanism.n_remaining_steps) with any None substituted with float('inf') """ running = [_ is not None for _ in mechanisms] contracts: list[Contract | None] = [None] * len(mechanisms) indices = list(range(len(mechanisms))) n_steps_broken_, n_steps_success_ = 0, 0 n_broken_, n_success_ = 0, 0 current_step = 0 if n_steps is None: n_steps = float("inf") if self.negotiation_speed is not None: n_steps = min(n_steps, self.negotiation_speed) while any(running): if self.shuffle_negotiations: random.shuffle(indices) for i in indices: if not running[i]: continue if self.time >= self.time_limit: break mechanism = mechanisms[i] contract, r = self._step_a_mechanism( mechanism, force_immediate_signing, action=action.get(mechanism.id, None) if action else None, ) contracts[i] = contract running[i] = r if not running[i]: if contract is None: n_broken_ += 1 n_steps_broken_ += mechanism.state.step + 1 else: n_success_ += 1 n_steps_success_ += mechanism.state.step + 1 for _p in partners: self._add_edges( _p[0], _p, ( self._edges_negotiations_succeeded if contract is not None else self._edges_negotiations_failed ), issues=mechanism.issues, bi=True, ) current_step += 1 if current_step >= n_steps: break if self.time >= self.time_limit: break return ( contracts, running, n_steps_broken_, n_steps_success_, n_broken_, n_success_, )
[docs] def append_stats(self): if self._stats_file_name is not None: save_stats( self, log_dir=self._stats_dir_name, stats_file_name=self._stats_file_name, )
[docs] def step( self, n_neg_steps: int | None = None, n_mechanisms: int | None = None, actions: dict[str, Any] | None = None, neg_actions: dict[str, dict[str, MechanismAction | None]] | None = None, ) -> bool: """ A single simulation step. Args: n_mechanisms: Number of mechanisms to step (None for all) n_neg_steps: Number of steps for every mechanism (None to complete one simulation step) actions: Mapping of agent IDs to their actions. The agent will be asked to act only if this is not given neg_actions: Mapping of mechanism IDs to a negotiator action. Negotiators will be called upon to act only if no action is passed here. This is a dict with keys corresponding to mechanism IDs and values corresponding to a dict mapping a negotiator (key) to its action (value) Remarks: - We have two modes of operation depending on `n_neg_steps` 1. `n_neg_steps is None` will run a single complete simulation step every call including all negotiations and everything before and after them. 2. `n_neg_steps is an integer` will step the simulation so that the given number of simulation steps are executed every call. The simulator will run operations before and after negotiations appropriately. - We have two modes of operation depending on `n_mechanisms` 1. `n_mechanisms is None` will step all negotiations according to `n_neg_steps` 2. `n_mechanisms` is an integer and `n_neg_steps` will step this number of mechanisms in parallel every call to step. - We have a total of four modes: 1. `n_neg_steps` and `n_mechanisms` are both None: Each call to `step` corresponds to one simulation step from start to end. 2. `n_neg_steps` and `n_mechanisms` are both integers: Each call to `step` steps `n_mechanisms` mechanisms by `n_neg_steps` steps. 3. `n_neg_steps` is None and `n_mechanisms` is an integer: Each call to `step` runs `n_mechanisms` according to `negotiation_speed` 4. `n_neg_steps` is an integer and `n_mechanisms` is None: Each call to `step` steps all mechanisms `n_neg_steps` steps. - Never mix calls with `n_neg_steps` equaling `None` and an integer. - Never call this method again on a world if it ever returned `False` on that world. """ if self.time >= self.time_limit: return False if self.current_step >= self.n_steps: return False if self._saved_details_level > 0: self._sim_info.append( dict( id=len(self._sim_info) + 1, started=self.time, step=self.current_step, relative_time_start=self.relative_time, world=self.id, ) | self.extra_sim_step_info_pre() ) cross_step_boundary = n_neg_steps is not None if self._debug: existing = { _.mechanism.id for _ in self._negotiations.values() if _ is not None and _.mechanism is not None } passed = set(neg_actions.keys()) if neg_actions else set() missing = passed.difference(existing) assert not missing, f"Mechanisms not found:\n{existing=} ({len(existing)})\n{passed=} ({len(passed)})\n{missing=} ({len(missing)})" # _n_registered_negotiations_before = len(self._negotiations) n_steps_broken = self.__n_steps_broken n_steps_success = self.__n_steps_success n_broken = self.__n_broken n_success = self.__n_success def _negotiate(n_steps_to_run: int | None = n_neg_steps) -> bool: """Runs all bending negotiations. Returns True if all negotiations are done""" if n_steps_to_run is not None and n_steps_to_run == 0: mechanisms = list( (_.mechanism, _.partners) for _ in self._negotiations.values() if _ is not None ) running = [ _[0] for _ in mechanisms if _ is not None and not _[0].state.ended ] return not running mechanisms = list( (_.mechanism, _.partners) for _ in self._negotiations.values() if _ is not None and _.mechanism.id not in self.__stepped_mechanisms ) if n_mechanisms is not None and len(mechanisms) > n_mechanisms: mechanisms = mechanisms[:n_mechanisms] if not mechanisms: self.__stepped_mechanisms = set() mechanisms = list( (_.mechanism, _.partners) for _ in self._negotiations.values() if _ is not None ) if n_mechanisms is not None and len(mechanisms) > n_mechanisms: mechanisms = mechanisms[:n_mechanisms] ( _, _, self.__n_steps_broken, self.__n_steps_success, self.__n_broken, self.__n_success, ) = self._step_negotiations( [_[0] for _ in mechanisms], n_steps_to_run, False, [_[1] for _ in mechanisms], action=neg_actions, ) self.__stepped_mechanisms = self.__stepped_mechanisms.union( {_[0].id for _ in mechanisms} ) running = [ _.mechanism.id for _ in self._negotiations.values() if _ is not None and _.mechanism is not None and not _.mechanism.state.ended ] return not running def _finalize_sim_info(): if self._saved_details_level < 1: return self._sim_info[-1]["ended"] = self.time self._sim_info[-1]["relative_time_end"] = self.relative_time self._sim_info[-1]["duration"] = ( self._sim_info[-1]["ended"] - self._sim_info[-1]["started"] ) # if self._stats: # self._sim_info[-1] = ( # self._sim_info[-1] # | {k: v[-1] for k, v in self._stats.items()} # | self.extra_sim_step_info_pre() # ) df = pd.DataFrame.from_records(self._sim_info) assert self._extra_folder is not None df.to_csv(self._extra_folder / "simsteps.csv", index=False) self._save_extra() if cross_step_boundary: if self.operations[self.__next_operation_index] != Operations.Negotiations: self.__stepped_mechanisms = set() if not self._step_to_negotiations(cross_step_boundary): return False if _negotiate(n_neg_steps): self.__next_operation_index += 1 # TODO correct this. Curently we just store whatever happens in the last negotiation step not for all negotiations. # n_steps_broken_ = 0 n_steps_success_ = 0 n_broken_ = 0 n_success_ = 0 n_steps_broken = self.__n_steps_broken n_steps_success = self.__n_steps_success n_broken = self.__n_broken n_success = self.__n_success if self.time < self.time_limit: n_total_broken = n_broken + n_broken_ if n_total_broken > 0: n_steps_broken = ( n_steps_broken * n_broken + n_steps_broken_ * n_broken_ ) / n_total_broken n_broken = n_total_broken n_total_success = n_success + n_success_ if n_total_success > 0: n_steps_success = ( n_steps_success * n_success + n_steps_success_ * n_success_ ) / n_total_success n_success = n_total_success self.__n_steps_broken = n_steps_broken self.__n_steps_success = n_steps_success self.__n_broken = n_broken self.__n_success = n_success if self.__next_operation_index >= len(self.operations): self.__next_operation_index = 0 if not self._step_to_negotiations(cross_step_boundary): return False _finalize_sim_info() return True if self._debug: assert self.__next_operation_index == 0 if not self._step_to_negotiations(cross_step_boundary): return False self.__stepped_mechanisms = set() if self._debug: assert self.__next_operation_index != 0 while self.__next_operation_index != 0: if not _negotiate(n_neg_steps): pass # print( # "Some negotiations are still running but all should be completed by now" # ) self.__next_operation_index += 1 if self.__next_operation_index >= len(self.operations): self.__next_operation_index = 0 if not self._step_to_negotiations(cross_step_boundary): return False _finalize_sim_info() return True
def _pre_step(self) -> bool: self.__stepped_mechanisms = set() if self._start_time is None or self._start_time < 0: self._start_time = time.perf_counter() if self.time >= self.time_limit: return False if self.current_step >= self.n_steps: return False self.__stats_stage = 0 self.__stage = 0 self.__n_new_contract_executions = 0 self.__n_new_breaches = 0 self.__n_new_contract_errors = 0 self.__n_new_contract_nullifications = 0 self.__activity_level = 0 self.__blevel = 0.0 self.__n_steps_broken = 0 self.__n_steps_success = 0 self.__n_broken = 0 self.__n_success = 0 self.__n_registered_negotiations_before = 0 self._n_negs_per_agent_per_step = defaultdict(int) self._started = True if self.current_step == 0: self._sim_start = time.perf_counter() self._step_start = self._sim_start for priority in sorted(self._entities.keys()): for agent in self._entities[priority]: self.call(agent, agent.init_) if self.time >= self.time_limit: return False # update monitors for monitor in self.stats_monitors: if self.safe_stats_monitoring: __stats = copy.deepcopy(self.stats) else: __stats = self.stats monitor.init(__stats, world_name=self.name) for monitor in self.world_monitors: monitor.init(self) else: self._step_start = time.perf_counter() # do checkpoint processing self.checkpoint_on_step_started() for agent in self.agents.values(): self.call(agent, agent.on_simulation_step_started) if self.time >= self.time_limit: return False self.loginfo( f"{len(self._negotiations)} Negotiations/{len(self.agents)} Agents" ) return True def _step_to_negotiations(self, cross_step_boundary: bool = True) -> bool: """Runs the operations before/after negotiations but not the negotiations themselves""" if self.__next_operation_index == 0: if not self._pre_step(): return False # initialize stats # ---------------- def _step_agents(): # Step all entities in the world once: # ------------------------------------ # note that entities are simulated in the partial-order specified by their priority value tasks: list[Entity] = [] for priority in sorted(self._entities.keys()): tasks += [_ for _ in self._entities[priority]] for task in tasks: self.call(task, task.step_) if self.time >= self.time_limit: break def _sign_contracts(): self._process_unsigned() def _simulation_step(): # save simulation step information try: self.simulation_step(self.__stage) if self.time >= self.time_limit: return except Exception as e: self.simulation_exceptions[self._current_step].append(exception2str()) if not self.ignore_simulation_exceptions: raise (e) self.__stage += 1 def _execute_contracts(): # execute contracts that are executable at this step # -------------------------------------------------- # n_new_breaches = self.__n_new_breaches # n_new_contract_executions = self.__n_new_contract_executions # n_new_contract_errors = self.__n_new_contract_errors # n_new_contract_nullifications = self.__n_new_contract_nullifications # activity_level = self.__activity_level blevel = self.__blevel current_contracts = [ _ for _ in self.executable_contracts() if _.nullified_at < 0 ] if len(current_contracts) > 0: # remove expired contracts executed = set() current_contracts = self.order_contracts_for_execution( current_contracts ) for contract in current_contracts: if self.time >= self.time_limit: break if contract.signed_at < 0: continue try: contract_breaches = self.start_contract_execution(contract) except Exception as e: for p in contract.partners: self.contracts_erred[p] += 1 self.contract_exceptions[self._current_step].append( exception2str() ) contract.executed_at = self.current_step self._saved_contracts[contract.id]["breaches"] = "" self._saved_contracts[contract.id]["executed_at"] = -1 self._saved_contracts[contract.id]["dropped_at"] = -1 self._saved_contracts[contract.id]["nullified_at"] = -1 self._saved_contracts[contract.id][ "erred_at" ] = self._current_step self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_erred, bi=True, ) self.__n_new_contract_errors += 1 if not self.ignore_contract_execution_exceptions: raise e exc_type, exc_value, exc_traceback = sys.exc_info() self.logerror( f"Contract exception @{str(contract)}: " f"{traceback.format_tb(exc_traceback)}", Event( "contract-exception", dict(contract=contract, exception=e), ), ) continue if contract_breaches is None: for p in contract.partners: self.contracts_nullified[p] += 1 self._saved_contracts[contract.id]["breaches"] = "" self._saved_contracts[contract.id]["executed_at"] = -1 self._saved_contracts[contract.id]["dropped_at"] = -1 self._saved_contracts[contract.id][ "nullified_at" ] = self._current_step self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_nullified, bi=True, ) self._saved_contracts[contract.id]["erred_at"] = -1 self.__n_new_contract_nullifications += 1 self.loginfo( f"Contract nullified: {str(contract)}", Event("contract-nullified", dict(contract=contract)), ) elif len(contract_breaches) < 1: for p in contract.partners: self.contracts_executed[p] += 1 self._saved_contracts[contract.id]["breaches"] = "" self._saved_contracts[contract.id]["dropped_at"] = -1 self._saved_contracts[contract.id][ "executed_at" ] = self._current_step self._add_edges( contract.partners[0], contract.partners, self._edges_contracts_executed, bi=True, ) self._saved_contracts[contract.id]["nullified_at"] = -1 self._saved_contracts[contract.id]["erred_at"] = -1 executed.add(contract) self.__n_new_contract_executions += 1 _size = self.contract_size(contract) if _size is not None: self.__activity_level += _size for partner in contract.partners: self.call( self.agents[partner], self.agents[partner].on_contract_executed, contract, ) if self.time >= self.time_limit: break else: for p in contract.partners: self.contracts_breached[p] += 1 self._saved_contracts[contract.id]["executed_at"] = -1 self._saved_contracts[contract.id]["nullified_at"] = -1 self._saved_contracts[contract.id]["dropped_at"] = -1 self._saved_contracts[contract.id]["erred_at"] = -1 self._saved_contracts[contract.id]["breaches"] = "; ".join( f"{_.perpetrator}:{_.type}({_.level})" for _ in contract_breaches ) breachers = { (_.perpetrator, tuple(_.victims)) for _ in contract_breaches } for breacher, victims in breachers: if isinstance(victims, str) or isinstance(victims, Agent): victims = [victims] self._add_edges( breacher, victims, self._edges_contracts_breached, bi=False, ) for b in contract_breaches: self._saved_breaches[b.id] = b.as_dict() self.loginfo( f"Breach of {str(contract)}: {str(b)} ", Event( "contract-breached", dict(contract=contract, breach=b), ), ) resolution = self._process_breach( contract, list(contract_breaches) ) if resolution is None: self.__n_new_breaches += 1 blevel += sum(_.level for _ in contract_breaches) else: self.__n_new_contract_executions += 1 self.loginfo( f"Breach resolution cor {str(contract)}: {str(resolution)} ", Event( "breach-resolved", dict( contract=contract, breaches=list(contract_breaches), resolution=resolution, ), ), ) self.complete_contract_execution( contract, list(contract_breaches), resolution ) self.loginfo( f"Executed {str(contract)}", Event("contract-executed", dict(contract=contract)), ) for partner in contract.partners: self.call( self.agents[partner], self.agents[partner].on_contract_breached, contract, list(contract_breaches), resolution, ) if self.time >= self.time_limit: break contract.executed_at = self.current_step dropped = self.get_dropped_contracts() self.delete_executed_contracts() # note that all contracts even breached ones are to be deleted for c in dropped: self.loginfo( f"Dropped {str(c)}", Event("dropped-contract", dict(contract=c)) ) self._saved_contracts[c.id]["dropped_at"] = self._current_step for p in c.partners: self.contracts_dropped[p] += 1 self.__n_contracts_dropped += len(dropped) def _stats_update(): self.update_stats(self.__stats_stage) self.__stats_stage += 1 operation_map = { Operations.AgentSteps: _step_agents, Operations.ContractExecution: _execute_contracts, Operations.ContractSigning: _sign_contracts, Operations.Negotiations: None, Operations.SimulationStep: _simulation_step, Operations.StatsUpdate: _stats_update, } for i, operation in enumerate(self.operations): if i < self.__next_operation_index: continue if operation == Operations.Negotiations: self.__next_operation_index = i break operation_map[operation]() if self.time >= self.time_limit: self.__next_operation_index = i return False else: self.__next_operation_index = 0 # remove all negotiations that are completed # ------------------------------------------ completed = list( k for k, _ in self._negotiations.items() if _ is not None and _.mechanism.ended ) for key in completed: self._negotiations.pop(key, None) # update stats # ------------ self._stats["n_contracts_executed"].append(self.__n_new_contract_executions) self._stats["n_contracts_erred"].append(self.__n_new_contract_errors) self._stats["n_contracts_nullified"].append( self.__n_new_contract_nullifications ) self._stats["n_contracts_cancelled"].append(self.__n_contracts_cancelled) self._stats["n_contracts_dropped"].append(self.__n_contracts_dropped) self._stats["n_breaches"].append(self.__n_new_breaches) self._stats["breach_level"].append(self.__blevel) self._stats["n_contracts_signed"].append(self.__n_contracts_signed) self._stats["n_contracts_concluded"].append(self.__n_contracts_concluded) self._stats["n_negotiations"].append(self.__n_negotiations) self._stats["activity_level"].append(self.__activity_level) self._stats["n_registered_negotiations_before"].append( self.__n_registered_negotiations_before ) self._stats["n_negotiation_rounds_successful"].append( self.__n_steps_success ) self._stats["n_negotiation_rounds_failed"].append(self.__n_steps_broken) self._stats["n_negotiation_successful"].append(self.__n_success) self._stats["n_negotiation_failed"].append(self.__n_broken) self._stats["n_registered_negotiations_after"].append( len(self._negotiations) ) current_time = time.perf_counter() - self._step_start self._stats["step_time"].append(current_time) total = self._stats.get("total_time", [0.0])[-1] self._stats["total_time"].append(total + current_time) self.__n_negotiations = 0 self.__n_contracts_signed = 0 self.__n_contracts_concluded = 0 self.__n_contracts_cancelled = 0 self.__n_contracts_dropped = 0 self.__n_new_contract_executions = 0 self.__n_new_breaches = 0 self.__n_new_contract_errors = 0 self.__n_new_contract_nullifications = 0 self.__activity_level = 0 self.__blevel = 0.0 self.__n_steps_broken = 0 self.__n_steps_success = 0 self.__n_broken = 0 self.__n_success = 0 self.append_stats() for agent in self.agents.values(): self.call(agent, agent.on_simulation_step_ended) if self.time >= self.time_limit: return False for monitor in self.stats_monitors: if self.safe_stats_monitoring: __stats = copy.deepcopy(self.stats) else: __stats = self.stats monitor.step(__stats, world_name=self.name) for monitor in self.world_monitors: monitor.step(self) self._current_step += 1 self.frozen_time = self.time if cross_step_boundary: return self._step_to_negotiations() # always indicate that the simulation is to continue return True @property def total_time(self): """Returns total simulation time (till now) in mx""" return self._stats.get("total_time", [0.0])[-1] @property def saved_breaches(self) -> list[dict[str, Any]]: return list(self._saved_breaches.values()) @property def resolved_breaches(self) -> list[dict[str, Any]]: return list(_ for _ in self._saved_breaches.values() if _["resolved"]) @property def unresolved_breaches(self) -> list[dict[str, Any]]: return list(_ for _ in self._saved_breaches.values() if not _["resolved"])
[docs] def run(self): """Runs the simulation until it ends""" self._start_time = time.perf_counter() for _ in range(self.n_steps): if self.time >= self.time_limit: break if not self.step(): break
[docs] def run_with_progress(self, callback: Callable[[int], None] | None = None) -> None: """Runs the simulation showing progress, with optional callback""" from rich.progress import track self._start_time = time.perf_counter() for _ in track(range(self.n_steps)): if self.time >= self.time_limit: break if not self.step(): break
[docs] def register(self, x: Entity, simulation_priority: int = 0): """ Registers an entity in the world so it can be looked up by name. Should not be called directly Args: x: The entity to be registered simulation_priority: The simulation periority. Entities with lower periorities will be stepped first during Returns: """ # super().register(x) # If we inherit from session, we can do that but it is not needed as we do not do string # based resolution now if hasattr(x, "_world"): x._world = self if hasattr(x, "step_"): self._entities[simulation_priority].add(x)
[docs] def register_stats_monitor(self, m: StatsMonitor): self.stats_monitors.add(m)
[docs] def unregister_stats_monitor(self, m: StatsMonitor): self.stats_monitors.remove(m)
[docs] def register_world_monitor(self, m: WorldMonitor): self.world_monitors.add(m)
[docs] def unregister_world_monitor(self, m: WorldMonitor): self.world_monitors.remove(m)
[docs] def join(self, x: TAgent, simulation_priority: int = 0, **kwargs): """Add an agent to the world. Args: x: The agent to be registered simulation_priority: The simulation priority. Entities with lower pprioritieswill be stepped first during kwargs: Any key-value pairs specifying attributes of the agent. NegMAS internally uses the attribute 'color' when drawing the agent in `draw` Returns: """ self.register(x, simulation_priority=simulation_priority) self.agents[x.id] = x self.attribs[x.id] = kwargs x.awi = self.awi_type(self, x) if self._started and not x.initialized: self.call(x, x.init_) self.loginfo(f"{x.name} joined", Event("agent-joined", dict(agent=x))) aid = x.id self._agent_info[aid] = dict( id=len(self._agent_info) + 1, name=aid, type=self.type_name_for_logs(x) ) | self.extra_agent_info(x) if self._extra_folder is not None: df = pd.DataFrame.from_records(list(self._agent_info.values())) df.to_csv(self._extra_folder / "agents.csv", index=False)
def _combine_edges( self, beg: int, end: int, target: dict[int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]]], ): """Combines edges for the given steps [beg, end)""" result = deflistdict() def add_dicts(d1, d2): d3 = deflistdict() for k, v in d1.items(): d3[k] = v + d2[k] for k, v in d2.items(): if k not in d3.keys(): d3[k] = v return d3 for i in range(beg, end): result = add_dicts(result, target[i]) return result def _get_edges( self, target: dict[int, dict[tuple[TAgent, TAgent], list[dict[str, Any]]]], step: int, ) -> list[tuple[TAgent, TAgent, int]]: """Get the edges for the given step""" return [(*k, {"weight": len(v)}) for k, v in target[step].items() if len(v) > 0] def _register_negotiation( self, mechanism_name, mechanism_params, roles, caller, partners, annotation, issues, req_id, run_to_completion=False, may_run_immediately=True, group: str | None = None, ) -> tuple[NegotiationInfo | None, Contract | None, Mechanism | None]: """Registers a negotiation and returns the negotiation info""" if self._n_negs_per_agent_per_step[caller.id] >= self.neg_quota_step: return None, None, None if self._n_negs_per_agent[caller.id] >= self.neg_quota_simulation: return None, None, None self.neg_requests_sent[caller.id] += 1 for partner in partners: self.neg_requests_received[partner.id] += 1 n_outcomes_ = CartesianOutcomeSpace(issues).cardinality if n_outcomes_ < 1: self.logwarning( f"A negotiation with no outcomes is requested by {caller.name}", event=Event( "zero-outcomes-negotiation", dict(caller=caller, partners=partners, annotation=annotation), ), ) return None, None, None factory = MechanismFactory( world=self, mechanism_name=mechanism_name, mechanism_params=mechanism_params, issues=issues, req_id=req_id, caller=caller, partners=partners, roles=roles, annotation=annotation, group=group, neg_n_steps=self.neg_n_steps, neg_time_limit=self.neg_time_limit, neg_step_time_limit=self.neg_step_time_limit, log_ufuns_file=( str(Path(self._log_folder) / "ufuns.csv") if self._log_ufuns else None ), ) neg = factory.init() if neg is None: self._add_edges( caller, partners, self._edges_negotiations_rejected, issues=issues ) return None, None, None if neg.mechanism is None: self._add_edges( caller, partners, self._edges_negotiations_rejected, issues=issues ) return neg, None, None self.__n_negotiations += 1 self._n_negs_per_agent_per_step[caller.id] += 1 self._n_negs_per_agent[caller.id] += 1 self._add_edges( caller, partners, self._edges_negotiations_started, issues=issues ) # if not run_to_completion: self._negotiations[neg.mechanism.id] = neg self.negs_initiated[caller.id] += 1 for partner in partners: self.negs_registered[partner.id] += 1 self.on_negotiation_start(neg) if run_to_completion: running, contract = True, None while running: contract, running = self._step_a_mechanism(neg.mechanism, True) self._add_edges( caller, partners, ( self._edges_negotiations_succeeded if contract is not None else self._edges_negotiations_failed ), issues=issues, ) return None, contract, neg.mechanism if may_run_immediately and self.immediate_negotiations: running = True assert self.negotiation_speed is not None for _ in range(self.negotiation_speed): contract, running = self._step_a_mechanism(neg.mechanism, False) if not running: self._add_edges( caller, partners, ( self._edges_negotiations_succeeded if contract is not None else self._edges_negotiations_failed ), issues=issues, ) return None, contract, neg.mechanism # self.loginfo( # f'{caller.id} request was accepted') return neg, None, None def _unregister_negotiation(self, neg: MechanismFactory) -> None: if neg is None or neg.mechanism is None: return del self._negotiations[neg.mechanism.id]
[docs] def request_negotiation_about( self, req_id: str, caller: TAgent, issues: list[Issue], partners: list[TAgent | str], roles: list[str] | None = None, annotation: dict[str, Any] | None = None, mechanism_name: str | None = None, mechanism_params: dict[str, Any] | None = None, group: str | None = None, ) -> NegotiationInfo: """ Requests to start a negotiation with some other agents Args: req_id: An ID For the request that is unique to the caller caller: The agent requesting the negotiation partners: A list of partners to participate in the negotiation. Note that the caller itself may not be in this list which makes it possible for an agent to request a negotaition that it does not participate in. If that is not to be allowed in some world, override this method and explicitly check for these kinds of negotiations and return False. If partners is passed as a single string/`Agent` or as a list containing a single string/`Agent`, then he caller will be added at the beginning of the list. This will only be done if `roles` was passed as None. issues: Negotiation issues annotation: Extra information to be passed to the `partners` when asking them to join the negotiation partners: A list of partners to participate in the negotiation roles: The roles of different partners. If None then each role for each partner will be None mechanism_name: Name of the mechanism to use. It must be one of the mechanism_names that are supported by the `World` or None which means that the `World` should select the mechanism. If None, then `roles` and `my_role` must also be None mechanism_params: A dict of parameters used to initialize the mechanism object group: An identifier for the group to which the negotiation belongs. This is not not used by the system. Returns: None. The caller will be informed by a callback function `on_neg_request_accepted` or `on_neg_request_rejected` about the status of the negotiation. """ if roles is None: if isinstance(partners, str) or isinstance(partners, Agent): partners = [partners] if ( len(partners) == 1 and isinstance(partners[0], str) and partners[0] != caller.id ): partners = [caller.id, partners[0]] if ( len(partners) == 1 and isinstance(partners[0], Agent) and partners[0] != caller ): partners = [caller, partners[0]] self.loginfo( f"{caller.name} requested negotiation " + ( f"using {mechanism_name}[{mechanism_params}] " if mechanism_name is not None or mechanism_params is not None else "" ) + f"with {[_.name for _ in partners]} (ID {req_id})", Event( "negotiation-request", dict( caller=caller, partners=partners, issues=issues, mechanism_name=mechanism_name, annotation=annotation, req_id=req_id, ), ), ) neg, *_ = self._register_negotiation( mechanism_name=mechanism_name, mechanism_params=mechanism_params, roles=roles, caller=caller, partners=partners, annotation=annotation, group=group, issues=issues, req_id=req_id, run_to_completion=False, ) success = neg is not None and neg.mechanism is not None self._add_edges( caller, partners, ( self._edges_negotiation_requests_accepted if success else self._edges_negotiation_requests_rejected ), issues=issues, ) return neg
[docs] def run_negotiation( self, caller: TAgent, issues: list[Issue], partners: list[str | TAgent], negotiator: Negotiator, preferences: Preferences | None = None, caller_role: str | None = None, roles: list[str] | None = None, annotation: dict[str, Any] | None = None, mechanism_name: str | None = None, mechanism_params: dict[str, Any] | None = None, ) -> tuple[Contract | None, NegotiatorMechanismInterface | None]: """ Runs a negotiation until completion Args: caller: The agent requesting the negotiation partners: A list of partners to participate in the negotiation. Note that the caller itself may not be in this list which makes it possible for an agent to request a negotaition that it does not participate in. If that is not to be allowed in some world, override this method and explicitly check for these kinds of negotiations and return False. If partners is passed as a single string/`Agent` or as a list containing a single string/`Agent`, then he caller will be added at the beginning of the list. This will only be done if `roles` was passed as None. negotiator: The negotiator to be used in the negotiation preferences: The utility function. Only needed if the negotiator does not already know it caller_role: The role of the caller in the negotiation issues: Negotiation issues annotation: Extra information to be passed to the `partners` when asking them to join the negotiation partners: A list of partners to participate in the negotiation roles: The roles of different partners. If None then each role for each partner will be None mechanism_name: Name of the mechanism to use. It must be one of the mechanism_names that are supported by the `World` or None which means that the `World` should select the mechanism. If None, then `roles` and `my_role` must also be None mechanism_params: A dict of parameters used to initialize the mechanism object Returns: A Tuple of a contract and the nmi of the mechanism used to get it in case of success. None otherwise """ if roles is None: if isinstance(partners, str) or isinstance(partners, Agent): partners = [partners] if ( len(partners) == 1 and isinstance(partners[0], str) and partners[0] != caller.id ): partners = [caller.id, partners[0]] if ( len(partners) == 1 and isinstance(partners[0], Agent) and partners[0] != caller ): partners = [caller, partners[0]] partners = [self.agents[_] if isinstance(_, str) else _ for _ in partners] self.loginfo( f"{caller.name} requested immediate negotiation " f"{mechanism_name}[{mechanism_params}] with {[_.name for _ in partners]}", Event( "negotiation-request-immediate", dict( caller=caller, partners=partners, issues=issues, mechanism_name=mechanism_name, annotation=annotation, ), ), ) req_id = caller.create_negotiation_request( issues=issues, partners=partners, annotation=annotation, negotiator=negotiator, extra={}, ) neg, contract, mechanism = self._register_negotiation( mechanism_name=mechanism_name, mechanism_params=mechanism_params, roles=roles, caller=caller, partners=partners, annotation=annotation, issues=issues, req_id=req_id, run_to_completion=True, ) if contract is not None: return contract, mechanism.nmi if neg and neg.mechanism: mechanism = neg.mechanism if negotiator is not None: mechanism.add(negotiator, preferences=preferences, role=caller_role) mechanism.run() if mechanism.agreement is None: contract = None self._register_failed_negotiation( mechanism=mechanism.nmi, negotiation=neg ) else: contract = self._register_contract( mechanism.nmi, neg, self._tobe_signed_at(mechanism.agreement, True) ) return contract, mechanism.nmi return None, None
[docs] def run_negotiations( self, caller: TAgent, issues: list[Issue] | list[list[Issue]], partners: list[list[str | TAgent]], negotiators: list[Negotiator], preferences: list[Preferences] | None = None, caller_roles: list[str] | None = None, roles: list[list[str] | None] | None = None, annotations: list[dict[str, Any] | None] | None = None, mechanism_names: str | list[str] | None = None, mechanism_params: dict[str, Any] | list[dict[str, Any]] | None = None, all_or_none: bool = False, ) -> list[tuple[Contract, NegotiatorMechanismInterface]]: """ Requests to run a set of negotiations simultaneously. Returns after all negotiations are run to completion Args: caller: The agent requesting the negotiation partners: A list of list of partners to participate in the negotiation. Note that the caller itself may not be in this list which makes it possible for an agent to request a negotaition that it does not participate in. If that is not to be allowed in some world, override this method and explicitly check for these kinds of negotiations and return False. If partners[i] is passed as a single string/`Agent` or as a list containing a single string/`Agent`, then he caller will be added at the beginning of the list. This will only be done if `roles` was passed as None. issues: Negotiation issues negotiators: The negotiator to be used in the negotiation ufuns: The utility function. Only needed if the negotiator does not already know it caller_roles: The role of the caller in the negotiation annotations: Extra information to be passed to the `partners` when asking them to join the negotiation partners: A list of partners to participate in the negotiation roles: The roles of different partners. If None then each role for each partner will be None mechanism_names: Name of the mechanism to use. It must be one of the mechanism_names that are supported by the `World` or None which means that the `World` should select the mechanism. If None, then `roles` and `my_role` must also be None mechanism_params: A dict of parameters used to initialize the mechanism object all_of_none: If True, ALL partners must agree to negotiate to go through. Returns: A list of tuples each with two values: contract (None for failure) and nmi (The mechanism info [None if the partner refused the negotiation]) """ group = unique_name(base="NG") partners = [ [self.agents[_] if isinstance(_, str) else _ for _ in p] for p in partners ] n_negs = len(partners) if isinstance(issues[0], Issue): issues = [issues] * n_negs if roles is None or not ( isinstance(roles, list) and isinstance(roles[0], list) ): roles = [roles] * n_negs if annotations is None or isinstance(annotations, dict): annotations = [annotations] * n_negs if mechanism_names is None or isinstance(mechanism_names, str): mechanism_names = [mechanism_names] * n_negs if mechanism_params is None or isinstance(mechanism_params, dict): mechanism_params = [mechanism_params] * n_negs if caller_roles is None or isinstance(caller_roles, str): caller_roles = [caller_roles] * n_negs if negotiators is None or isinstance(negotiators, Negotiator): raise ValueError("Must pass all negotiators for run_negotiations") if preferences is None or isinstance(preferences, Preferences): preferences = [preferences] * n_negs self.loginfo( f"{caller.name} requested {n_negs} immediate negotiation " f"{mechanism_names}[{mechanism_params}] between {[[_.name for _ in p] for p in partners]}", Event( "negotiation-request", dict( caller=caller, partners=partners, issues=issues, mechanism_name=mechanism_names, all_or_none=all_or_none, annotations=annotations, ), ), ) negs = [] for issue, partner, role, annotation, mech_name, mech_param, negotiator_ in zip( issues, partners, roles, annotations, mechanism_names, mechanism_params, negotiators, ): if role is None: if isinstance(partner, str) or isinstance(partner, Agent): partner = [partner] if ( len(partner) == 1 and isinstance(partner[0], str) and partner[0] != caller.id ): partner = [caller.id, partners[0]] if ( len(partner) == 1 and isinstance(partner[0], Agent) and partner[0] != caller ): partner = [caller, partner[0]] req_id = caller.create_negotiation_request( issues=issue, partners=partner, annotation=annotation, negotiator=negotiator_, extra={}, ) neg, *_ = self._register_negotiation( mechanism_name=mech_name, mechanism_params=mech_param, roles=role, caller=caller, partners=partner, group=group, annotation=annotation, issues=issue, req_id=req_id, run_to_completion=False, may_run_immediately=False, ) # neg.partners.append(caller) if neg is None and all_or_none: for _n in negs: self._unregister_negotiation(_n) return [] negs.append(neg) if all(_ is None for _ in negs): return [] completed = [False] * n_negs contracts = [None] * n_negs amis = [None] * n_negs for i, (neg, crole, ufun, negotiator) in enumerate( zip(negs, caller_roles, preferences, negotiators) ): completed[i] = neg is None or (neg.mechanism is None) or negotiator is None if completed[i]: continue mechanism = neg.mechanism mechanism.add(negotiator, ufun=ufun, role=crole) locs = [i for i in range(n_negs) if not completed[i]] cs, rs, _, _, _, _ = self._step_negotiations( [negs[i].mechanism for i in locs], float("inf"), True, [negs[i].partners for i in locs], ) for i, loc in enumerate(locs): contracts[loc] = cs[i] completed[loc] = not rs[i] amis[i] = negs[i].mechanism.nmi return list(zip(contracts, amis))
def _log_header(self): if self.time is None: return f"{self.name} (not started)" return f"{self.current_step}/{self.n_steps} [{self.relative_time:0.2%}]"
[docs] def ignore_contract(self, contract, as_dropped=False): """ Ignores the contract as if it was never agreed upon or as if was dropped Args: contract: The contract to ignore as_dropped: If true, the contract is treated as a dropped invalid contract, otherwise it is treated as if it never happened. """ if as_dropped: if contract.agreement is not None: self.__n_contracts_dropped += 1 for p in contract.partners: self.contracts_dropped[p] += 1 if contract.id in self._saved_contracts.keys(): self._saved_contracts[contract.id]["dropped_at"] = self.current_step else: if contract.agreement is not None: self.__n_contracts_concluded -= 1 if contract.id in self._saved_contracts.keys(): if self._saved_contracts[contract.id]["signed_at"] >= 0: self.__n_contracts_signed -= 1 for p in contract.partners: self.contracts_signed[p] -= 1 del self._saved_contracts[contract.id] for p in contract.partners: self.contracts_dropped[p] += 1 self.on_contract_processed(contract)
@property def saved_contracts(self) -> list[dict[str, Any]]: return list(self._saved_contracts.values()) @property def executed_contracts(self) -> list[dict[str, Any]]: return list( _ for _ in self._saved_contracts.values() if _.get("executed_at", -1) >= 0 ) @property def signed_contracts(self) -> list[dict[str, Any]]: return list( _ for _ in self._saved_contracts.values() if _.get("signed_at", -1) >= 0 ) @property def nullified_contracts(self) -> list[dict[str, Any]]: return list( _ for _ in self._saved_contracts.values() if _.get("nullified_at", -1) >= 0 ) @property def erred_contracts(self) -> list[dict[str, Any]]: return list( _ for _ in self._saved_contracts.values() if _.get("erred_at", -1) >= 0 ) @property def cancelled_contracts(self) -> list[dict[str, Any]]: return list( _ for _ in self._saved_contracts.values() if not _.get("signed_at", -1) < 0 )
[docs] def save_config(self, file_name: str): """ Saves the config of the world as a yaml file Args: file_name: Name of file to save the config to Returns: """ with open(file_name, "w") as file: yaml.safe_dump(self.__dict__, file)
def _process_breach( self, contract: Contract, breaches: list[Breach], force_immediate_signing=True ) -> Contract | None: new_contract = None # calculate total breach level total_breach_levels = defaultdict(int) for breach in breaches: total_breach_levels[breach.perpetrator] += breach.level if self.breach_processing == BreachProcessing.VICTIM_THEN_PERPETRATOR: # give agents the chance to set renegotiation agenda in ascending order of their total breach levels for agent_name, _ in sorted( zip(total_breach_levels.keys(), total_breach_levels.values()), key=lambda x: x[1], ): agent = self.agents[agent_name] agenda = agent.set_renegotiation_agenda( contract=contract, breaches=breaches ) if agenda is None: continue negotiators = [] for partner in contract.partners: negotiator = self.call( self.agents[partner], self.agents[partner].respond_to_renegotiation_request, contract=contract, breaches=breaches, agenda=agenda, ) if self.time >= self.time_limit: negotiator = None if negotiator is None: break negotiators.append(negotiator) else: # everyone accepted this renegotiation results = self.run_negotiation( caller=agent, issues=agenda.issues, partners=[self.agents[_] for _ in contract.partners], ) if results is not None: new_contract, mechanism = results self._register_contract( mechanism=mechanism, negotiation=None, to_be_signed_at=self._tobe_signed_at( mechanism.agreement, force_immediate_signing ), ) break elif self.breach_processing == BreachProcessing.META_NEGOTIATION: raise NotImplementedError( "Meta negotiation is not yet implemented. Agents should negotiate about the " "agend then a negotiation should be conducted as usual" ) if new_contract is not None: for breach in breaches: if self.save_resolved_breaches: self._saved_breaches[breach.id]["resolved"] = True else: self._saved_breaches.pop(breach.id, None) return new_contract for breach in breaches: if self.save_unresolved_breaches: self._saved_breaches[breach.id]["resolved"] = False else: self._saved_breaches.pop(breach.id, None) self._register_breach(breach) return None # todo add _get_signing_delay(contract) and implement it in SCML2019 if nx:
[docs] def graph( self, steps: tuple[int, int] | int | None = None, what: list[str] | tuple[str, ...] = EDGE_TYPES, who: Callable[[TAgent], bool] | None = None, together: bool = True, ) -> nx.Graph | list[nx.Graph]: """ Generates a graph showing some aspect of the simulation Args: steps: The step/steps to generate the graphs for. If a tuple is given all edges within the given range (inclusive beginning, exclusive end) will be accumulated what: The edges to have on the graph. Options are: negotiations, concluded, signed, executed who: Either a callable that receives an agent and returns True if it is to be shown or None for all together: IF specified all edge types are put in the same graph. Returns: A networkx graph representing the world if together==True else a list of graphs one for each item in what """ if steps is None: steps = self.current_step if isinstance(steps, int): steps = [steps, steps + 1] steps = tuple(min(self.n_steps, max(0, _)) for _ in steps) def yes(x): return True if who is None: who = yes agents = [_.id for _ in self.agents.values() if who(_)] if together: g = nx.MultiDiGraph() g.add_nodes_from(agents) graphs = [g] * len(what) else: graphs = [nx.DiGraph() for _ in what] for g in graphs: g.add_nodes_from(agents) max_step = max(steps) - 1 for g, edge_type in zip(graphs, what): edge_info = getattr(self, f"_edges_{edge_type.replace('-', '_')}") edge_info = {max_step: self._combine_edges(*steps, edge_info)} color = EDGE_COLORS[edge_type] edgelist = self._get_edges(edge_info, max_step) for e in edgelist: e[2]["color"] = color g.add_edges_from(edgelist) return graphs[0] if together else graphs
[docs] def draw( self, steps: tuple[int, int] | int | None = None, what: Collection[str] = DEFAULT_EDGE_TYPES, who: Callable[[TAgent], bool] | None = None, where: Callable[[TAgent], int | tuple[float, float]] | None = None, together: bool = True, axs: Collection[Axes] | None = None, ncols: int = 4, figsize: tuple[int, int] = (15, 15), show_node_labels=True, show_edge_labels=True, **kwargs, ) -> tuple[Axes, nx.Graph] | tuple[Axes, list[nx.Graph]]: """ Generates a graph showing some aspect of the simulation Args: steps: The step/steps to generate the graphs for. If a tuple is given all edges within the given range (inclusive beginning, exclusive end) will be accomulated what: The edges to have on the graph. Options are: negotiations, concluded, signed, executed who: Either a callable that receives an agent and returns True if it is to be shown or None for all where: A callable that returns for each agent the position it showed by drawn at either as an integer specifying the column in which to draw the column or a tuple of two floats specifying the position within the drawing area of the agent. If None, the default Networkx layout will be used. together: IF specified all edge types are put in the same graph. axs: The axes used for drawing. If together is true, it should be a single `Axes` object otherwise it should be a list of `Axes` objects with the same length as what. show_node_labels: show node labels! show_edge_labels: show edge labels! kwargs: passed to networx.draw_networkx Returns: A networkx graph representing the world if together==True else a list of graphs one for each item in what """ import matplotlib.pyplot as plt if not self.construct_graphs: self.logwarning( "Asked to draw a world simulation without enabling `construct_graphs`. Will be ignored" ) return [None, None] if steps is None: steps = self.current_step if isinstance(steps, int): steps = [steps, steps + 1] steps = tuple(min(self.n_steps, max(0, _)) for _ in steps) def yes(x): return True if who is None: who = yes if together: titles = [""] else: titles = what if axs is None: if together: fig, axs = plt.subplots() else: nrows = int(math.ceil(len(what) / ncols)) fig, axs = plt.subplots(nrows, ncols, figsize=figsize) axs = axs.flatten().tolist() if together: axs = [axs] graphs = self.graph(steps, what, who, together) graph = graphs[0] if not together else graphs graphs = [graphs] if together else graphs if where is None: pos = nx.spring_layout(graph, iterations=200) else: pos = [where(a) for a in graph.nodes] if not isinstance(pos[0], tuple): deltax = 5 deltay = 5 cols = defaultdict(list) for agent, p in zip(graph.nodes, pos): cols[p].append(agent) pos = dict() for c, ros in cols.items(): for r, agent in enumerate(ros): pos[agent] = ((1 + c) * deltay, r * deltax) else: pos = dict(zip(graph.nodes, pos)) if together: g = graph nx.draw_networkx_nodes(g, pos, ax=axs[0]) edges = [_ for _ in g.edges] if len(edges) > 0: info = [_ for _ in g.edges.data("color")] colors = [_[2] for _ in info] edges = [(_[0], _[1]) for _ in info] clist = list(set(colors)) edgelists = [list() for _ in range(len(clist))] for c, lst in zip(clist, edgelists): for i, clr in enumerate(colors): if clr == c: lst.append(edges[i]) for lst, clr in zip(edgelists, clist): nx.draw_networkx_edges( g, pos, edgelist=g.edges, edge_color=clr, ax=axs[0] ) if show_edge_labels: info = [_ for _ in g.edges.data("weight")] weights = [str(_[2]) for _ in info if _[2] > 1] edges = [(_[0], _[1]) for _ in info if _[2] > 1] nx.draw_networkx_edge_labels( g, pos, dict(zip(edges, weights)), ax=axs[0] ) if show_node_labels: nx.draw_networkx_labels( g, pos, dict(zip(g.nodes, g.nodes)), ax=axs[0] ) else: for g, ax, title in zip(graphs, axs, titles): nx.draw_networkx_nodes(g, pos, ax=ax) nx.draw_networkx_edges( g, pos, edgelist=g.edges, edge_color=EDGE_COLORS[title], ax=ax ) if show_edge_labels: info = [_ for _ in g.edges.data("weight")] weights = [str(_[2]) for _ in info if _[2] > 1] edges = [(_[0], _[1]) for _ in info if _[2] > 1] nx.draw_networkx_edge_labels( g, pos, dict(zip(edges, weights)), ax=ax ) if show_node_labels: nx.draw_networkx_labels( g, pos, dict(zip(g.nodes, g.nodes)), ax=ax ) ax.set_ylabel(title) total_time = time.perf_counter() - self._sim_start step = max(steps) remaining = (self.n_steps - step - 1) * total_time / (step + 1) title = ( f"Step: {step + 1}/{self.n_steps} [{humanize_time(total_time)} rem " f"{humanize_time(remaining)}] {total_time / (remaining + total_time):04.2%}" ) if together: axs[0].set_title(title) else: f = plt.gcf() f.suptitle(title) return (axs[0], graph) if together else (axs, graphs)
[docs] def save_gif( self, path: str | Path | None = None, what: Collection[str] = EDGE_TYPES, who: Callable[[TAgent], bool] | None = None, together: bool = True, draw_every: int = 1, fps: int = 5, ) -> None: try: import gif if path is None and self.log_folder is not None: path = Path(self.log_folder) / (self.name + ".gif") # define the animation function. Simply draw the world @gif.frame def plot_frame(s): self.draw( steps=(s - draw_every, s), what=what, who=who, together=together, ncols=3, figsize=(20, 20), ) # create frames frames = [] for s in range(draw_every, self.n_steps): if s % draw_every != 0: continue frames.append(plot_frame(s)) if path is not None: path.unlink(missing_ok=True) gif.save(frames, str(path), duration=1000 // fps) return frames except Exception as e: self.logwarning(f"GIF generation failed with exception {str(e)}") warn( "GIF generation failed. Make suer you have gif installed\n\nyou can install it using >> pip install gif", NegmasImportWarning, ) return []
@property def business_size(self) -> float: """The total business size defined as the total money transferred within the system""" if "activity_level" not in self.stats: return np.nan return sum(self.stats["activity_level"]) @property def n_negotiation_rounds_successful(self) -> float: """Average number of rounds in a successful negotiation""" if "n_contracts_concluded" not in self.stats: return np.nan n_negs = sum(self.stats["n_contracts_concluded"]) if n_negs == 0: return np.nan if "n_negotiation_rounds_successful" not in self.stats: return np.nan return sum(self.stats["n_negotiation_rounds_successful"]) / n_negs @property def n_negotiation_rounds_failed(self) -> float: """Average number of rounds in a successful negotiation""" if "n_negotiations" not in self.stats: return np.nan n_negs = sum(self.stats["n_negotiations"]) - self.n_saved_contracts(True) if n_negs == 0: return np.nan if "n_negotiation_rounds_failed" not in self.stats: return np.nan return sum(self.stats["n_negotiation_rounds_failed"]) / n_negs @property def contract_execution_fraction(self) -> float: """Fraction of signed contracts successfully executed with no breaches, or errors""" if "n_contracts_executed" not in self.stats: return np.nan n_executed = sum(self.stats["n_contracts_executed"]) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return n_executed / n_signed_contracts if n_signed_contracts > 0 else np.nan @property def contract_dropping_fraction(self) -> float: """Fraction of signed contracts that were never executed because they were signed to late to be executable""" if "n_contracts_dropped" not in self.stats: return np.nan n_dropped = sum(self.stats["n_contracts_dropped"]) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return n_dropped / n_signed_contracts if n_signed_contracts > 0 else np.nan @property def contract_err_fraction(self) -> float: """Fraction of signed contracts that caused exception during their execution""" if "n_contracts_erred" not in self.stats: return np.nan n_erred = sum(self.stats["n_contracts_erred"]) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return n_erred / n_signed_contracts if n_signed_contracts > 0 else np.nan @property def contract_nullification_fraction(self) -> float: """Fraction of signed contracts were nullified by the system (e.g. due to bankruptcy)""" if "n_contracts_nullified" not in self.stats: return np.nan n_nullified = sum(self.stats["n_contracts_nullified"]) n_signed_contracts = len( [_ for _ in self._saved_contracts.values() if _["signed_at"] >= 0] ) return n_nullified / n_signed_contracts if n_signed_contracts > 0 else np.nan @property def breach_level(self) -> float: """The average breach level per contract""" if "breach_level" not in self.stats: return np.nan blevel = np.nansum(self.stats["breach_level"]) n_contracts = sum(self.stats["n_contracts_executed"]) + sum( self.stats["n_breaches"] ) return blevel / n_contracts if n_contracts > 0 else np.nan
[docs] @abstractmethod def delete_executed_contracts(self) -> None: """Called after processing executable contracts at every simulation step to delete processed contracts"""
[docs] @abstractmethod def executable_contracts(self) -> Collection[Contract]: """Called at every time-step to get the contracts that are `executable` at this point of the simulation"""
[docs] def get_dropped_contracts(self) -> Collection[Contract]: """Called at the end of every time-step to get a list of the contracts that are signed but will never be executed""" return []
[docs] def post_step_stats(self): """Called at the end of the simulation step to update all stats Kept for backward compatibility and will be dropped. Override `update_stats` ins """
[docs] def pre_step_stats(self): """Called at the beginning of the simulation step to prepare stats or update them Kept for backward compatibility and will be dropped. Override `update_stats` instead """
[docs] def update_stats(self, stage: int): """ Called to update any custom stats that the world designer wants to keep Args: stage: How many times was this method called during this stage Remarks: - Default behavior is: - If `Operations` . `StatsUpdate` appears once in operations, it calls post_step_stats once - Otherwise: it calls pre_step_stats for stage 0, and post_step_stats for any other stage. """ if self._single_stats_call: self.post_step_stats() return if stage == 0: self.pre_step_stats() return self.post_step_stats() return
[docs] @abstractmethod def order_contracts_for_execution( self, contracts: Collection[Contract] ) -> Collection[Contract]: """Orders the contracts in a specific time-step that are about to be executed""" return contracts
[docs] @abstractmethod def start_contract_execution(self, contract: Contract) -> set[Breach] | None: """ Tries to execute the contract Args: contract: Returns: Set[Breach]: The set of breaches committed if any. If there are no breaches return an empty set Remarks: - You must call super() implementation of this method before doing anything - It is possible to return None which indicates that the contract was nullified (i.e. not executed due to a reason other than an execution exeception). """ self.loginfo( f"Executing {str(contract)}", Event("executing-contract", dict(contract=contract)), ) return set()
[docs] @abstractmethod def complete_contract_execution( self, contract: Contract, breaches: list[Breach], resolution: Contract ) -> None: """ Called after breach resolution is completed for contracts for which some potential breaches occurred. Args: contract: The contract considered. breaches: The list of potential breaches that was generated by `_execute_contract`. resolution: The agreed upon resolution Returns: """
[docs] @abstractmethod def execute_action( self, action: Action, agent: TAgent, callback: Callable | None = None ) -> bool: """Executes the given action by the given agent"""
[docs] @abstractmethod def get_private_state(self, agent: TAgent) -> dict: """Reads the private state of the given agent"""
[docs] @abstractmethod def simulation_step(self, stage: int = 0): """A single step of the simulation. Args: stage: How many times so far was this method called within the current simulation step Remarks: - Using the stage parameter, it is possible to have `Operations` . `SimulationStep` several times with the list of operations while differentiating between these calls. """
[docs] @abstractmethod def contract_size(self, contract: Contract) -> float: """ Returns an estimation of the **activity level** associated with this contract. Higher is better Args: contract: Returns: """
def __getstate__(self): state = self.__dict__.copy() if "logger" in state.keys(): state.pop("logger", None) return state def __setstate__(self, state): self.__dict__ = state self.logger = create_loggers( file_name=self.log_file_name, module_name=None, screen_level=self.log_screen_level if self.log_to_screen else None, file_level=self.log_file_level, app_wide_log_file=True, )
[docs] @staticmethod def combine_stats( worlds: tuple[World, ...] | World, stat: str, pertype=False, method=np.mean, n_steps: int | None = None, ): """Combines statistics by the given combination method. Args: worlds: The worlds to combine stats from stat: The statistic to combine pertype: combine agent-statistics per type method: plot sum for type statistics instead of mean n_steps: If not given, then absolute step number will be used for combining stats, otherwise all stats will be resampled to be n_steps before being combined. If a negative number, then the maximum number of steps in all worlds will be used """ if isinstance(worlds, World): worlds = (worlds,) if not worlds: return dict() if n_steps is not None and n_steps < 0: n_steps = max(_.n_steps for _ in worlds) combined_stats = defaultdict(list) max_length = defaultdict(int) for world in worlds: world_stats = [_ for _ in world.stats.keys() if _.startswith(stat)] if len(world_stats) == 0: continue defaultdict(list) for s in world_stats: if not pertype or not any(s.endswith(_) for _ in world.agents.keys()): # this is not an agent statistic or an agent statistic but we are not combining types z = world.stats[s] combined_stats[s].append(np.asarray(z)) max_length[s] = max(max_length[s], len(z)) continue parts = s.split("_") base, aid = "_".join(parts[:-1]), parts[-1] if aid not in world.agents.keys(): z = world.stats[s] combined_stats[s].append(np.asarray(z)) max_length[s] = max(max_length[s], len(z)) continue type_ = world.agents[aid].short_type_name key = type_ if base == stat else f"{type_} ({base})" z = world.stats[s] combined_stats[key].append(np.asarray(z)) max_length[key] = max(max_length[key], len(z)) if n_steps is None: combined_stats = { k: [ np.pad(x.flatten(), (0, max_length[k] - len(x))) if len(x) < max_length[k] else x.flatten() for x in v ] for k, v in combined_stats.items() if v } else: combined_stats = { k: [ np.interp(np.arange(n_steps), np.arange(len(x)), x) if len(x) != n_steps else x for x in v ] for k, v in combined_stats.items() if v } combined_stats = { k: method(np.vstack(tuple(_.reshape((1, len(_))) for _ in v)), axis=0) for k, v in combined_stats.items() if v } return combined_stats
[docs] @classmethod def plot_combined_stats( cls, worlds: tuple[World, ...] | World, stats: str | tuple[str, ...], n_steps: int | None = -1, pertype=False, use_sum=False, makefig=False, title=True, ylabel=False, xlabel=False, legend=True, figsize=None, ylegend=2.0, legend_ncols=8, ): """Plots combined statistics of multiple worlds in a single plot Args: stats: The statistics to plot. If `None`, some selected stats will be displayed pertype: combine agent-statistics per type use_sum: plot sum for type statistics instead of mean title: If given a title will be added to each subplot ylabel: If given, the ylabel will be added to each subplot xlabel: If given The xlabel will be added (Simulation Step) legend: If given, a legend will be displayed makefig: If given a new figure will be started figsize: Size of the figure to host the plot ylegend: y-axis of legend for cases with large number of labels legend_n_cols: number of columns in the legend """ import matplotlib.pyplot as plt if makefig: plt.figure(figsize=figsize) n_plots = 0 if isinstance(stats, str): stats = (stats,) styles = [ ("solid", "solid"), # same as (0, ()) or '-' ("dotted", "dotted"), # same as (0, (1, 1)) or ':' ("dashed", "dashed"), # same as '--' ("dashdot", "dashdot"), ("loosely dotted", (0, (1, 10))), ("dotted", (0, (1, 1))), ("densely dotted", (0, (1, 1))), ("long dash with offset", (5, (10, 3))), ("loosely dashed", (0, (5, 10))), ("dashed", (0, (5, 5))), ("densely dashed", (0, (5, 1))), ("loosely dashdotted", (0, (3, 10, 1, 10))), ("dashdotted", (0, (3, 5, 1, 5))), ("densely dashdotted", (0, (3, 1, 1, 1))), ("dashdotdotted", (0, (3, 5, 1, 5, 1, 5))), ("loosely dashdotdotted", (0, (3, 10, 1, 10, 1, 10))), ("densely dashdotdotted", (0, (3, 1, 1, 1, 1, 1))), ] n_per_style = 5 for stat in stats: means = cls.combine_stats( worlds, stat=stat, pertype=pertype, method=np.mean, n_steps=n_steps ) sterrs = cls.combine_stats( worlds, stat=stat, pertype=pertype, method=scipy.stats.sem, n_steps=n_steps, ) for k, mean in means.items(): sterr = np.maximum(sterrs.get(k, np.zeros_like(mean)), 0) no_err = np.all(sterr < 1e-10) linestyle = styles[n_plots // n_per_style][1] n_plots += 1 if no_err: plt.plot(mean, label=k, linestyle=linestyle) else: if np.any(sterr < 0): pass plt.errorbar( range(mean.size) if n_steps is None or n_steps > 0 else np.linspace(0, 1, len(mean)), mean, sterr, linestyle=linestyle, label=k, ) if title: plt.title(stat) if ylabel: plt.ylabel(stat) if xlabel: plt.xlabel( "Simulation Step" if n_steps is not None else "Relative Time" ) if legend: if n_plots < 4: plt.legend() else: plt.legend( loc="upper left", bbox_to_anchor=(-0.02, ylegend), ncol=legend_ncols, fancybox=True, shadow=True, )
[docs] def plot_stats( self, stats: str | tuple[str, ...], pertype=False, use_sum=False, makefig=False, title=True, ylabel=False, xlabel=False, legend=True, figsize=None, ylegend=2.0, legend_ncols=8, ): """Plots statistics of the world in a single plot Args: stats: The statistics to plot. If `None`, some selected stats will be displayed pertype: combine agent-statistics per type use_sum: plot sum for type statistics instead of mean title: If given a title will be added to each subplot ylabel: If given, the ylabel will be added to each subplot xlabel: If given The xlabel will be added (Simulation Step) legend: If given, a legend will be displayed makefig: If given a new figure will be started figsize: Size of the figure to host the plot ylegend: y-axis of legend for cases with large number of labels legend_n_cols: number of columns in the legend """ import matplotlib.pyplot as plt if makefig: plt.figure(figsize=figsize) n_plots = 0 suffixes = tuple() if isinstance(stats, str): stats = (stats,) styles = [ ("solid", "solid"), # same as (0, ()) or '-' ("dotted", "dotted"), # same as (0, (1, 1)) or ':' ("dashed", "dashed"), # same as '--' ("dashdot", "dashdot"), ("loosely dotted", (0, (1, 10))), ("dotted", (0, (1, 1))), ("densely dotted", (0, (1, 1))), ("long dash with offset", (5, (10, 3))), ("loosely dashed", (0, (5, 10))), ("dashed", (0, (5, 5))), ("densely dashed", (0, (5, 1))), ("loosely dashdotted", (0, (3, 10, 1, 10))), ("dashdotted", (0, (3, 5, 1, 5))), ("densely dashdotted", (0, (3, 1, 1, 1))), ("dashdotdotted", (0, (3, 5, 1, 5, 1, 5))), ("loosely dashdotdotted", (0, (3, 10, 1, 10, 1, 10))), ("densely dashdotdotted", (0, (3, 1, 1, 1, 1, 1))), ] n_per_style = 5 for stat in stats: suffix = ( stat.split("_")[-1] if any(stat.endswith(_) for _ in suffixes) else "" ) prefix = ( "_".join(stat.split("_")[:-1]) if any(stat.endswith(_) for _ in suffixes) else stat ) world_stats = [ _ for _ in self.stats.keys() if _.startswith(prefix) and (not suffix or _.endswith(suffix)) ] if len(world_stats) == 0: continue if len(world_stats) == 1: linestyle = styles[n_plots // n_per_style][1] n_plots += 1 plt.plot( self.stats[world_stats[0]], label=world_stats[0], linestyle=linestyle, ) if title: plt.title(stat) if ylabel: plt.ylabel(stat) if xlabel: plt.xlabel("Simulation Step") continue type_world_stats = defaultdict(list) base = "" for s in world_stats: parts = s.split("_") base, aid = "_".join(parts[:-1]), parts[-1] if suffix: assert suffix == aid, f"{suffix=}, {aid=}, {s=}, {base=}, {stat=}" bparts = base.split("_") base = f"{'_'.join(bparts[:-1])}_{suffix}" aid = bparts[-1] if pertype: type_ = self.agents[aid].type_name.split(":")[-1].split(".")[-1] type_world_stats[type_].append(self.stats[s]) continue n_plots += 1 linestyle = styles[n_plots // n_per_style][1] plt.plot(self.stats[s], label=aid, linestyle=linestyle) if not pertype: if title: plt.title(base) if ylabel: plt.ylabel(base) if xlabel: plt.xlabel("Simulation Step") continue for t, s in type_world_stats.items(): if not s: continue n = len(s) y = sum(np.asarray(_) for _ in s) yerr = None if n > 1 and not use_sum: y = y / int(n) s = np.asarray([list(_) for _ in s]) yerr = np.std(s, axis=0) / np.sqrt(n) assert len(yerr) == len(y), f"{yerr=}\n{y=}\n{s=}" n_plots += 1 linestyle = styles[n_plots // n_per_style][1] plt.errorbar( range(len(y)), y, yerr, linestyle=linestyle, label=t if len(stats) == 1 else f"{t} ({base})", ) if len(stats) == 1 and title: plt.title(base) if len(stats) == 1 and ylabel: plt.ylabel(base) if len(stats) == 1 and xlabel: plt.xlabel("Simulation Step") if n_plots > 1 and legend: if n_plots < 4: plt.legend() else: plt.legend( loc="upper left", bbox_to_anchor=(-0.02, ylegend), ncol=legend_ncols, fancybox=True, shadow=True, )
[docs] def on_negotiation_start(self, info: NegotiationInfo): if self._saved_details_level < 2: return mech = info.mechanism if mech is None: return myid = mech.id self._neg_info[myid]["id"] = len(self._neg_info) + 1 self._neg_info[myid]["sim_step"] = self.current_step self._neg_info[myid]["simstep_id"] = self._sim_info[-1]["id"] self._neg_info[myid]["name"] = myid self._neg_info[myid]["n_steps"] = mech.n_steps self._neg_info[myid]["time_limit"] = mech.time_limit self._neg_info[myid]["hidden_time_limit"] = mech._hidden_time_limit self._neg_info[myid]["world"] = self.id
[docs] def type_name_for_logs(self, agent: Agent | None) -> str | None: if not agent: return None return agent.type_name
[docs] def on_negotiation_stepped(self, info: NegotiationInfo): try: if self._saved_details_level < 3: return mech = info.mechanism if mech is None: return if not isinstance(mech, SAOMechanism): return agent_map: dict[str | None, Agent | None] = dict( zip( mech.negotiator_ids, [mech._negotiator_map[_].owner for _ in mech.negotiator_ids], ) ) state = mech.state assert isinstance(state, SAOState) def response(state: SAOState): if state.agreement: return "agreement" if state.timedout: return "timedout" if state.ended: return "ended" if state.has_error: return "error" return "continuing" for neg, offer in state.new_offers: agent = agent_map[neg] partner_neg = [_ for _ in mech.negotiator_ids if _ != neg][0] partner = agent_map[partner_neg] d = [ len(self._offer_info) + 1, self._neg_info[mech.id]["id"], state.step, state.relative_time, state.time, neg, partner_neg, self._agent_info[agent.id if agent else None]["id"], self._agent_info[partner.id if partner else None]["id"], response(state), ] + list(self.extract_action_info(offer)) self._offer_info.append(tuple(d)) except Exception as e: self.simulation_exceptions[self._current_step].append(exception2str()) if not self.ignore_simulation_exceptions: raise (e)
[docs] def on_negotiation_end(self, info: NegotiationInfo): try: if self._saved_details_level < 2: return mech = info.mechanism if mech is None: return myid = mech.id self._neg_info[myid]["step"] = mech.current_step self._neg_info[myid]["time"] = mech.time self._neg_info[myid]["relative_time"] = mech.relative_time self._neg_info[myid]["has_agreement"] = mech.agreement is not None self._neg_info[myid]["timedout"] = mech.state.timedout self._neg_info[myid]["ended"] = mech.state.ended self._neg_info[myid]["erred"] = mech.state.has_error self._neg_info[myid]["error"] = ( mech.state.error_details if mech.state.error_details else "" ) self._neg_info[myid]["erred_agent_id"] = self._agent_info[ mech.state.erred_agent if mech.state.erred_agent else None ]["id"] for i, p in enumerate(mech.agent_ids): self._neg_info[myid][f"agent{i}_id"] = self._agent_info[p]["id"] for i, t in enumerate(mech.negotiator_times): self._neg_info[myid][f"agent_time{i}"] = t for (col, _), val in zip( self._agreement_info_cols, self.extract_agreement_info(mech.agreement) ): self._neg_info[myid][col] = val extra_info = self.extra_neg_info(info) for k, v in extra_info.items(): self._neg_info[myid][k] = v except Exception as e: self.simulation_exceptions[self._current_step].append(exception2str()) if not self.ignore_simulation_exceptions: raise (e)
def _save_extra(self): try: if self._saved_details_level < 2: return assert self._extra_folder is not None df = pd.DataFrame.from_records(list(self._neg_info.values())) df.to_csv(self._extra_folder / "negs.csv", index=False) if self._saved_details_level < 3: return df = pd.DataFrame( self._offer_info, columns=[_[0] for _ in self._offer_info_cols], # type: ignore ) df.astype( dict( zip( [_[0] for _ in self._offer_info_cols], [_[1] for _ in self._offer_info_cols], ) ) ) df.to_csv(self._extra_folder / "actions.csv", index=False) except Exception as e: self.simulation_exceptions[self._current_step].append(exception2str()) if not self.ignore_simulation_exceptions: raise (e)
[docs] def on_negotiation_processed( self, info: NegotiationInfo, contract: Contract | None ): """ Called whenever a negotiation is ended **after** calling on_negotiation_failure_/on_negotiation_success_ Remarks: If contract is `None`, the negotiation ended in failure otherwise it ended in success. """ _ = info, contract