Source code for negmas.negotiators.controller

from __future__ import annotations
from collections import namedtuple
from typing import TYPE_CHECKING, Any, TypeVar, Generic

from negmas.common import MechanismState, NegotiatorMechanismInterface
from negmas.events import Notification
from negmas.helpers import get_class
from negmas.preferences import Preferences
from negmas.types import Rational

from .negotiator import Negotiator

if TYPE_CHECKING:
    from negmas.preferences import BaseUtilityFunction
    from negmas.situated import Agent

    from .controlled import ControlledNegotiator


__all__ = ["Controller"]

NegotiatorInfo = namedtuple("NegotiatorInfo", ["negotiator", "context"])
"""
The return type of `negotiators` member of `Controller`.
"""

TControlledNegotiator = TypeVar("TControlledNegotiator", bound=Negotiator)
TNMI = TypeVar("TNMI", bound=NegotiatorMechanismInterface)
TState = TypeVar("TState", bound=MechanismState)


[docs] class Controller(Rational, Generic[TNMI, TState, TControlledNegotiator]): """ Controls the behavior of multiple negotiators in multiple negotiations. The controller class MUST implement any methods of the negotiator class it is controlling with one added argument negotiator_id (str) which represents ID of the negotiator on which the method is being invoked (passed first). Controllers for specific classes should inherit from this class and implement whatever methods they want to override on their `ControlledNegotiator` objects. For example, the SAO module defines `SAOController` that needs only to implement `propose` and `respond` . Args: default_negotiator_type: The negotiator type to use for adding negotiator if no type is explicitly given. default_negotiator_params: The parameters to use to construct the default negotiator type. parent: The parent which can be an `Agent` or another `Controller` auto_kill: If True, negotiators will be killed once their negotiation finishes. name: The controller name Remarks: - Controllers should always call negotiator methods using the `call` method defined in this class. Direct calls may lead to infinite loops """ def __init__( self, default_negotiator_type: str | TControlledNegotiator | None = None, default_negotiator_params: dict[str, Any] | None = None, parent: Controller | Agent | None = None, auto_kill: bool = True, **kwargs, ): super().__init__(**kwargs) self._negotiators: dict[str, NegotiatorInfo] = {} if default_negotiator_params is None: default_negotiator_params = {} if isinstance(default_negotiator_type, str): default_negotiator_type = get_class(default_negotiator_type) # type: ignore self.__default_negotiator_type = default_negotiator_type self.__default_negotiator_params = default_negotiator_params self.__parent = parent self._auto_kill = auto_kill @property def default_negotiator_type(self): return self.__default_negotiator_type
[docs] def is_clean(self) -> bool: """Checks that the agent has no negotiators and that all its intermediate data-structures are reset""" return len(self._negotiators) == 0
[docs] def reset(self): """ Resets the controller and kills any negotiators it may have """ negs = list(self.negotiators.keys()) for neg in negs: self.kill_negotiator(neg, True)
@property def negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context. """ return self._negotiators @property def unfinished_negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns the negotiators whose negotiations started and did not complete yet Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context """ return { k: v for k, v in self._negotiators.items() if v[0].nmi is not None and v[0].nmi.state.started and v[0].nmi.state.running } @property def finished_negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns the negotiators whose negotiations started and completed Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context """ return { k: v for k, v in self._negotiators.items() if v[0].nmi is not None and v[0].nmi.state.started and not v[0].nmi.state.running } @property def to_start_negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns the negotiators whose negotiations did not start yet Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context """ return { k: v for k, v in self._negotiators.items() if v[0].nmi is not None and not v[0].nmi.state.started } @property def started_negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns the negotiators whose negotiations started Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context """ return { k: v for k, v in self._negotiators.items() if v[0].nmi is not None and (v[0].nmi.state.started) } @property def active_negotiators(self) -> dict[str, NegotiatorInfo]: """ Returns the negotiators whose negotiations running or did not start yet. Returns a dictionary mapping negotiator ID to the a tuple containing the negotiator and its context """ return { k: v for k, v in self._negotiators.items() if v[0].nmi is not None and (v[0].nmi.state.running or not v[0].nmi.state.started) } @property def states(self) -> dict[str, TState]: """ Gets the current states of all negotiations as a mapping from negotiator ID to mechanism. """ return dict( zip( self._negotiators.keys(), (self._negotiators[k][0]._nmi.state for k in self._negotiators.keys()), ) )
[docs] def partner_negotiator_ids(self, negotiator_id: str) -> list[str] | None: """ Finds the negotiator ID negotiating with one of our negotiators. Args: negotiator_id: Our negotiator ID """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if not negotiator or not negotiator.nmi: return None return [_ for _ in negotiator.nmi.negotiator_ids if _ != negotiator_id]
[docs] def partner_negotiator_names(self, negotiator_id: str) -> list[str] | None: """ Finds the negotiator names negotiating with one of our negotiators. Args: negotiator_id: Our negotiator ID """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if not negotiator or not negotiator.nmi: return None return [_ for _ in negotiator.nmi.negotiator_names if _ != negotiator.name]
[docs] def partner_agent_ids(self, negotiator_id: str) -> list[str] | None: """ Finds the agent ID negotiating with one of our negotiators. Args: negotiator_id: Our negotiator ID """ negotiator, _ = self._negotiators.get(negotiator_id, (None, None)) if not negotiator or not negotiator.nmi: return None me = negotiator.owner.id if negotiator.owner else "" return [_ for _ in negotiator.nmi.agent_ids if _ and _ != me]
[docs] def partner_agent_names(self, negotiator_id: str) -> list[str] | None: """ Finds the negotiator names negotiating with one of our negotiators. Args: negotiator_id: Our negotiator ID """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if not negotiator or not negotiator.nmi: return None me = negotiator.owner.name if negotiator.owner else "" return [_ for _ in negotiator.nmi.agent_names if _ and _ != me]
[docs] def make_negotiator( self, negotiator_type: str | TControlledNegotiator | None = None, name: str | None = None, **kwargs, ) -> TControlledNegotiator: """ Creates a negotiator but does not add it to the controller. Call `add_negotiator` to add it. Args: negotiator_type: Type of the negotiator to be created. If None, A `ControlledNegotiator` negotiator will be controlled (which is **fully** controlled by the controller). name: negotiator name **kwargs: any key-value pairs to be passed to the negotiator constructor Returns: The negotiator to be controlled. None for failure """ if negotiator_type is None: negotiator_type = self.__default_negotiator_type elif isinstance(negotiator_type, str): negotiator_type = get_class(negotiator_type) # type: ignore if negotiator_type is None: raise ValueError( "No negotiator type is passed and no default negotiator type is defined for this " "controller" ) args = self.__default_negotiator_params if kwargs: args.update(kwargs) return negotiator_type(name=name, parent=self, **args) # type: ignore I already make sure it is a class in advance
[docs] def add_negotiator(self, negotiator: Negotiator, cntxt: Any = None) -> None: """ Adds a negotiator to the controller. Args: negotaitor: The negotaitor to add name: negotiator name cntxt: The context to be associated with this negotiator. **kwargs: any key-value pairs to be passed to the negotiator constructor """ if negotiator is not None: self._negotiators[negotiator.id] = NegotiatorInfo(negotiator, cntxt)
[docs] def create_negotiator( self, negotiator_type: str | TControlledNegotiator | None = None, name: str | None = None, cntxt: Any = None, **kwargs, ) -> TControlledNegotiator: """ Creates a negotiator passing it the context Args: negotiator_type: Type of the negotiator to be created name: negotiator name cntxt: The context to be associated with this negotiator. **kwargs: any key-value pairs to be passed to the negotiator constructor Returns: The negotiator to be controlled. None for failure """ new_negotiator = self.make_negotiator(negotiator_type, name, **kwargs) self.add_negotiator(new_negotiator, cntxt=cntxt) return new_negotiator
[docs] def call(self, negotiator: ControlledNegotiator, method: str, *args, **kwargs): """ Calls the given method on the given negotiator safely without causing recursion. The controller MUST use this function to access any callable on the negotiator. Args: negotiator: method: *args: **kwargs: Returns: """ negotiator._Negotiator__parent = None # type: ignore (little bad python magic) result = getattr(negotiator, method)(*args, **kwargs) negotiator._Negotiator__parent = self # type: ignore (little bad python magic) return result
[docs] def kill_negotiator(self, negotiator_id: str, force: bool = False) -> None: """ Kills the negotiator sending it an `before_death` message. Args: negotiator_id: The ID of the negotiator to kill. force: Whether to kill the negotiator in case it refused to die. Remarks: - Killing a negotiator amounts to nothing more than removing it form the list of negotiators maintained by the controller. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return response = negotiator.before_death(cntxt=cntxt) if response or force: negotiator._Negotiator__parent = None self._negotiators.pop(negotiator_id, None)
[docs] def before_join( self, negotiator_id: str, nmi: TNMI, state: TState, *, preferences: Preferences | None = None, role: str = "negotiator", ) -> bool: """ Called by children negotiators to get permission to join negotiations Args: negotiator_id: The negotiator ID nmi (AgentMechanismInterface): The negotiation. state (TState): The current state of the negotiation preferences (UtilityFunction): The prefrences to use before any discounting. role (str): role of the agent. Returns: True if the negotiator is allowed to join the negotiation otherwise False """ return True
[docs] def after_join( self, negotiator_id: str, nmi: TNMI, state: TState, *, preferences: Preferences | None = None, role: str = "negotiator", ) -> None: """ Called by children negotiators after joining a negotiation to inform the controller Args: negotiator_id: The negotiator ID nmi (AgentMechanismInterface): The negotiation. state (TState): The current state of the negotiation preferences (UtilityFunction): The ufun function to use before any discounting. role (str): role of the agent. """
[docs] def join( self, negotiator_id: str, nmi: TNMI, state: TState, *, preferences: Preferences | None = None, ufun: BaseUtilityFunction | None = None, role: str = "negotiator", ) -> bool: """ Called by the mechanism when the agent is about to enter a negotiation. It can prevent the agent from entering Args: negotiator_id: The negotiator ID nmi (AgentMechanismInterface): The negotiation. state (TState): The current state of the negotiation preferences (Preferences): The preferences. ufun (BaseUtilityFunction): The ufun function to use before any discounting (overrides preferences) role (str): role of the agent. Returns: bool indicating whether or not the agent accepts to enter.If False is returned it will not enter the negotiation. """ if ufun is not None: preferences = ufun negotiator, _ = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: raise ValueError(f"Unknown negotiator {negotiator_id}") permission = self.before_join( negotiator, nmi, state, preferences=preferences, role=role ) if not permission: return False if hasattr(negotiator, "join") and self.call( negotiator, "join", nmi=nmi, state=state, preferences=preferences, role=role ): self.after_join( negotiator_id, nmi, state, preferences=preferences, role=role ) return True return False
# def _on_negotiation_start(self, negotiator_id: str, state: TState) -> None: # """ # A call back called at each negotiation start dirctly by the mechanism # # Args: # negotiator_id: The negotiator ID # state: `TState` giving current state of the negotiation. # # """ # negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) # if negotiator is None: # raise ValueError(f"Unknown negotiator {negotiator_id}") # return self.call(negotiator, "_on_negotiation_start", state=state)
[docs] def on_negotiation_start(self, negotiator_id: str, state: TState) -> None: """ A call back called at each negotiation start Args: negotiator_id: The negotiator ID state: `TState` giving current state of the negotiation. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call(negotiator, "on_negotiation_start", state=state)
[docs] def on_round_start(self, negotiator_id: str, state: TState) -> None: """ A call back called at each negotiation round start Args: negotiator_id: The negotiator ID state: `TState` giving current state of the negotiation. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call(negotiator, "on_round_start", state=state)
[docs] def on_round_end(self, negotiator_id: str, state: TState) -> None: """ A call back called at each negotiation round end Args: negotiator_id: The negotiator ID state: `TState` giving current state of the negotiation. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call(negotiator, "on_round_end", state=state)
[docs] def on_leave(self, negotiator_id: str, state: TState) -> None: """ A call back called after leaving a negotiation. Args: negotiator_id: The negotiator ID state: `TState` giving current state of the negotiation. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call(negotiator, "on_leave", state=state)
[docs] def on_negotiation_end(self, negotiator_id: str, state: TState) -> None: """ A call back called at each negotiation end Args: negotiator_id: The negotiator ID state: `TState` or one of its descendants giving the state at which the negotiation ended. """ negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return None result = self.call(negotiator, "on_negotiation_end", state=state) if self._auto_kill: self.kill_negotiator(negotiator_id=negotiator_id, force=True) return result
[docs] def on_mechanism_error(self, negotiator_id: str, state: TState) -> None: negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call(negotiator, "on_mechanism_error", state=state)
[docs] def on_notification( self, negotiator_id: str, notification: Notification, notifier: str ): negotiator, cntxt = self._negotiators.get(negotiator_id, (None, None)) if negotiator is None: return return self.call( negotiator, "on_notification", notification=notification, notifier=notifier )
def __str__(self): return f"{self.name}"