"""Implements a concurrent set of negotiations creating a chain of bilateral negotiations."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from dataclasses import dataclass
from negmas.preferences.base_ufun import BaseUtilityFunction
from ..common import NegotiatorMechanismInterface, MechanismAction, MechanismState
from ..mechanisms import Mechanism, MechanismStepResult
from ..negotiators import Negotiator
from ..outcomes import Outcome
from ..preferences import Preferences
from ..sao import ResponseType
__all__ = [
"ChainNegotiationsMechanism",
"ChainNegotiator",
"MultiChainNegotiationsMechanism",
"MultiChainNegotiator",
]
@dataclass
class Offer:
"""Defines an offer"""
outcome: Outcome
left: bool
temp: bool
partner: str | None = None
Agreement = namedtuple("Agreement", ["outcome", "negotiators", "level"])
"""Defines an agreement for a multi-channel mechanism"""
class ChainNMI(NegotiatorMechanismInterface):
def __init__(
self,
*args,
parent: ChainNegotiationsMechanism,
negotiator: ChainNegotiator,
level: int,
**kwargs,
):
super().__init__(*args, **kwargs)
self.__parent = parent
self.__negotiator = negotiator
self.__level = level
def confirm(self, parent: bool) -> bool:
return self.__parent.on_confirm(self.__level, parent)
[docs]
class ChainNegotiator(Negotiator, ABC):
"""Base class for all nested negotiations negotiator"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._nmi: ChainNMI
self.__level = -1
[docs]
def join(
self,
nmi: NegotiatorMechanismInterface,
state: MechanismState,
*,
preferences: Preferences | None = None,
ufun: BaseUtilityFunction | None = None,
role: str = "negotiator",
) -> bool:
to_join = super().join(nmi, state, preferences=preferences, role=role)
if to_join:
self.__level = int(role)
return to_join
[docs]
def confirm(self, left: bool) -> bool:
"""
Called to confirm a offer to either the left or the right
Args:
left: If true confirm the offer sent to the left (otherwise the right)
Returns:
"""
return self._nmi.confirm(left)
[docs]
@abstractmethod
def on_acceptance(self, state: MechanismState, offer: Offer) -> Offer:
"""
Called when one of the negotiator's proposals gets accepted
Args:
state: mechanism state
offer: The offer accepted
Returns:
A new offer (possibly to another negotiator)
"""
[docs]
@abstractmethod
def propose(self, state: MechanismState, dest: str | None = None) -> Offer:
"""
Called to allow the agent to propose to either its left or its right in the chain
Args:
state: Mechanism state
Returns:
The offer
"""
[docs]
@abstractmethod
def respond(
self, state: MechanismState, outcome: Outcome, from_left: bool, temp: bool
) -> ResponseType:
"""
Called to respond to an offer
Args:
state: Mechanism state
outcome: The offer to respond to
from_left: Whether the offer is coming from the left
temp: Whether the offer is a temporary offer
Returns:
A response type which can only be ACCEPT_OFFER, REJECT_OFFER, or END_NEGOTIATION
"""
[docs]
class MultiChainNegotiator(Negotiator, ABC):
"""Base class for all nested negotiations negotiator"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__level = -1
[docs]
def join(
self,
nmi: NegotiatorMechanismInterface,
state: MechanismState,
*,
preferences: Preferences | None = None,
ufun: BaseUtilityFunction | None = None,
role: str = "negotiator",
) -> bool:
to_join = super().join(nmi, state, preferences=preferences, role=role)
if to_join:
self.__level = int(role)
return to_join
[docs]
def confirm(self, left: bool) -> bool:
"""
Called to confirm a offer to either the left or the right
Args:
left: If true confirm the offer sent to the left (otherwise the right)
Returns:
"""
return self.nmi.confirm(left) #
[docs]
@abstractmethod
def on_acceptance(self, state: MechanismState, offer: Offer) -> Offer:
"""
Called when one of the negotiator's proposals gets accepted
Args:
state: mechanism state
offer: The offer accepted
Returns:
A new offer (possibly to another negotiator)
"""
[docs]
@abstractmethod
def propose(self, state: MechanismState, dest: str | None = None) -> Offer:
"""
Called to allow the agent to propose to either its left or its right in the chain
Args:
state: Mechanism state
Returns:
The offer
"""
[docs]
@abstractmethod
def respond(
self,
state: MechanismState,
outcome: Outcome,
from_left: bool,
temp: bool,
source: str,
) -> ResponseType:
"""
Called to respond to an offer
Args:
state: Mechanism state
outcome: The offer to respond to
from_left: Whether the offer is coming from the left
temp: Whether the offer is a temporary offer
source: The ID of the source agent
Returns:
A response type which can only be ACCEPT_OFFER, REJECT_OFFER, or END_NEGOTIATION
"""
[docs]
class ChainNegotiationsMechanism(
Mechanism[ChainNMI, MechanismState, MechanismAction, ChainNegotiator]
):
def __init__(self, initial_state: MechanismState | None = None, *args, **kwargs):
super().__init__(
initial_state if initial_state else MechanismState(), *args, **kwargs
)
self.__chain: list[ChainNegotiator | None] = []
self.__next_agent = 0
self.__last_proposal: Offer | None = None
self.__last_proposer_index: int = -1
self.__agreements: dict[int, Outcome | None] = defaultdict(lambda: None)
self.__temp_agreements: dict[int, Outcome | None] = defaultdict(lambda: None)
def _get_ami(
self, negotiator: ChainNegotiator, role: str
) -> NegotiatorMechanismInterface:
"""
Returns a chain AMI instead of the standard AMI.
Args:
negotiator: The negotiator to create the AMI for
role: Its role in the negotiation (an integer >= -1 as a string)
Returns:
"""
return ChainNMI(
id=self.id,
n_outcomes=self.nmi.n_outcomes,
issues=self.nmi.outcome_space,
outcomes=self.nmi.outcomes,
time_limit=self.nmi.time_limit,
step_time_limit=self.nmi.step_time_limit,
n_steps=self.nmi.n_steps,
dynamic_entry=self.nmi.dynamic_entry,
max_n_agents=self.nmi.max_n_agents,
annotation=self.nmi.annotation,
parent=self,
negotiator=negotiator, #
level=int(role) + 1,
)
[docs]
def add(
self,
negotiator: Negotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
**kwargs,
) -> bool | None:
if role is None:
raise ValueError(
"You cannot join this protocol without specifying the role. "
"Possible roles are integers >= -1 "
)
added = super().add(negotiator, preferences=preferences, role=role, **kwargs)
if not added:
return added
level = int(role) + 1
if len(self.__chain) > level:
if self.__chain[level] is not None:
raise ValueError(f"A negotiator already exist in the role {role}")
self.__chain[level] = negotiator
return
self.__chain += [None] * (level - len(self.__chain) + 1)
self.__chain[level] = negotiator
def _update_next(self) -> None:
if self.__last_proposal.left:
self.__next_agent = (self.__last_proposer_index - 1) % len(self.__chain)
else:
self.__next_agent = (self.__last_proposer_index + 1) % len(self.__chain)
[docs]
def __call__(self, state, action=None) -> MechanismStepResult:
# check that the chain is complete
if not all(self.__chain):
if self.dynamic_entry:
state.has_error = True
state.error_details = "The chain is not complete"
return MechanismStepResult(state=state)
raise ValueError(
"The chain is not complete and dynamic entry is not allowed"
)
# find the next negotiator to ask
negotiator = self.__chain[self.__next_agent]
# if this is the first proposal, get it from the left most agent and ask the next one to respond
if self.__last_proposal is None:
self.__last_proposal = negotiator.propose(self.state)
self._update_next()
assert self.__next_agent == 1
return MechanismStepResult(state)
# if all agreements are finalized end the mechanism session with success
agreements = [
self.__agreements[L]
for L in range(len(self.__chain))
if self.__agreements[L] is not None
]
if len(agreements) == len(self.__chain) - 1:
state.agreement = agreements
return MechanismStepResult(state)
response = negotiator.respond(
self.state,
self.__last_proposal.outcome,
not self.__last_proposal.left,
self.__last_proposal.temp,
)
# handle unacceptable responses
if response not in (
ResponseType.ACCEPT_OFFER,
ResponseType.REJECT_OFFER,
ResponseType.END_NEGOTIATION,
):
state.has_error = True
state.error_details = "An unacceptable response was returned"
return MechanismStepResult(state)
# If the response is to end the negotiation, end it but only if there are not partial negotiations
if response == ResponseType.END_NEGOTIATION:
if len(self.__agreements) > 0:
state.has_error = True
state.error_details = (
"Cannot end a negotiation chain with some agreements"
)
return MechanismStepResult(state)
state.broken = True
return MechanismStepResult(state)
# if the response is an acceptance then either register an agreement or a temporary agreement depending on
# proposal
if response == ResponseType.ACCEPT_OFFER:
agreement_index = (
self.__next_agent
if self.__last_proposal.left
else self.__next_agent - 1
)
if not self.__last_proposal.temp:
assert agreement_index >= 0
self.__agreements[agreement_index] = self.__last_proposal.outcome
else:
assert self.__temp_agreements[agreement_index] is None
self.__temp_agreements[agreement_index] = self.__last_proposal.outcome
self.__last_proposal = self.__chain[
self.__last_proposer_index
].on_acceptance(self.state, self.__last_proposal)
self.__last_proposer_index = self.__next_agent
self._update_next()
return MechanismStepResult(state)
# now it must be a rejection, ask the one who rejected to propose (in either direction)
self.__last_proposal = self.__chain[self.__next_agent].propose(self.state)
self.__last_proposer_index = self.__next_agent
self._update_next()
return MechanismStepResult()
[docs]
def on_confirm(self, level: int, left: bool) -> bool:
"""
Called by negotiators to confirm their temporary accepted agreements
Args:
level: The caller level
left: Whether to confirm its left or right temporary accepted agreement
"""
if left:
level = (level - 1) % len(self.__chain)
else:
level = (level + 1) % len(self.__chain)
if self.__temp_agreements[level] is None:
raise ValueError(f"No temporary agreement exists at level {level}")
if self.__agreements[level] is not None:
raise ValueError(f"An agreement already exists at level {level}")
self.__agreements[level] = self.__temp_agreements[level]
return True
[docs]
class MultiChainNegotiationsMechanism(
Mechanism[ChainNMI, MechanismState, MechanismAction, MultiChainNegotiator]
):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__chain: list[list[MultiChainNegotiator]] = []
self.__next_agent_level = 0
self.__next_agent_number = 0
self.__last_proposal: Offer = None
self.__last_proposer_level: int = -1
self.__last_proposer_number: int = -1
self.__agreements: dict[int, Agreement] = defaultdict(lambda: None)
self.__temp_agreements: dict[int, Agreement] = defaultdict(lambda: None)
self.__level: dict[str, int] = {}
self.__number: dict[str, int] = {}
def _get_ami(
self, negotiator: Negotiator, role: str
) -> NegotiatorMechanismInterface:
"""
Returns a chain AMI instead of the standard AMI.
Args:
negotiator: The negotiator to create the AMI for
role: Its role in the negotiation (an integer >= -1 as a string)
Returns:
"""
return ChainNMI(
id=self.id,
n_outcomes=self.nmi.n_outcomes,
issues=self.nmi.outcome_space,
outcomes=self.nmi.outcomes,
time_limit=self.nmi.time_limit,
step_time_limit=self.nmi.step_time_limit,
n_steps=self.nmi.n_steps,
dynamic_entry=self.nmi.dynamic_entry,
max_n_agents=self.nmi.max_n_negotiators,
annotation=self.nmi.annotation,
parent=self,
negotiator=negotiator,
level=int(role) + 1,
)
[docs]
def add(
self,
negotiator: Negotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
**kwargs,
) -> bool | None:
if role is None:
raise ValueError(
"You cannot join this protocol without specifying the role. "
"Possible roles are integers >= -1 "
)
added = super().add(negotiator, preferences=preferences, role=role, **kwargs)
if not added:
return added
level = int(role) + 1
if len(self.__chain) > level:
if self.__chain[level] is not None:
raise ValueError(f"A negotiator already exist in the role {role}")
self.__chain[level] = negotiator
return
self.__chain += [list() for _ in range(level - len(self.__chain) + 1)]
self.__chain[level].append(negotiator)
self.__level[negotiator.id] = level
self.__number[negotiator.id] = len(self.__chain[level]) - 1
def _update_next(self) -> None:
if self.__last_proposal.left:
self.__next_agent_level = (self.__last_proposer_level - 1) % len(
self.__chain
)
else:
self.__next_agent_level = (self.__last_proposer_level + 1) % len(
self.__chain
)
self.__next_agent_number = self.__number[self.__last_proposal.partner]
[docs]
def __call__(self, state, action=None) -> MechanismStepResult:
# check that the chain is complete
if not all(len(_) > 0 for _ in self.__chain):
if self.dynamic_entry:
state.has_error = True
state.error_details = "The chain is not complete"
return MechanismStepResult(state)
raise ValueError(
"The chain is not complete and dynamic entry is not allowed"
)
# find the next negotiator to ask
negotiator = self.__chain[self.__next_agent_level][self.__next_agent_number]
# if this is the first proposal, get it from the left most agent and ask the next one to respond
if self.__last_proposal is None:
self.__last_proposal = negotiator.propose(self.state)
self._update_next()
assert self.__next_agent_level == 1
return MechanismStepResult(state)
# if all agreements are finalized end the mechanism session with success
agreements = [
self.__agreements[L]
for L in range(len(self.__chain))
if self.__agreements[L] is not None
]
if len(agreements) == len(self.__chain) - 1:
state.agreement = agreements
return MechanismStepResult(state)
response = negotiator.respond(
self.state,
self.__last_proposal.outcome,
not self.__last_proposal.left,
self.__last_proposal.temp,
source=self.__chain[self.__next_agent_level][self.__next_agent_number].id,
)
# handle unacceptable responses
if response not in (
ResponseType.ACCEPT_OFFER,
ResponseType.REJECT_OFFER,
ResponseType.END_NEGOTIATION,
):
state.has_error = True
state.erred_negotiator = negotiator.id
state.erred_agent = (
negotiator.owner.id if negotiator.owner is not None else ""
)
return MechanismStepResult(state)
# If the response is to end the negotiation, end it but only if there are not partial negotiations
if response == ResponseType.END_NEGOTIATION:
if len(self.__agreements) > 0:
state.has_error = True
state.erred_negotiator = negotiator.id
state.erred_agent = (
negotiator.owner.id if negotiator.owner is not None else ""
)
stae.error_details = (
"Cannot end a negotiation chain with some agreements",
)
return MechanismStepResult(stae)
state.broken = True
return MechanismStepResult(state)
# if the response is an acceptance then either register an agreement or a temporary agreement depending on
# proposal
if response == ResponseType.ACCEPT_OFFER:
agreement_index = (
self.__next_agent_level
if self.__last_proposal.left
else self.__next_agent_level - 1
)
if not self.__last_proposal.temp:
assert agreement_index >= 0
self.__agreements[agreement_index] = self.__last_proposal.outcome
else:
assert self.__temp_agreements[agreement_index] is None
self.__temp_agreements[agreement_index] = self.__last_proposal.outcome
self.__last_proposal = self.__chain[self.__last_proposer_level][
self.__last_proposer_number
].on_acceptance(self.state, self.__last_proposal)
self.__last_proposer_level = self.__next_agent_level
self.__last_proposer_number = self.__next_agent_number
self._update_next()
return MechanismStepResult(state)
# now it must be a rejection, ask the one who rejected to propose (in either direction)
self.__last_proposal = self.__chain[self.__next_agent_level][
self.__next_agent_number
].propose(self.state)
self.__last_proposer_level = self.__next_agent_level
self.__last_proposer_number = self.__next_agent_number
self._update_next()
return MechanismStepResult(state)
[docs]
def on_confirm(self, level: int, left: bool) -> None:
"""
Called by negotiators to confirm their temporary accepted agreements
Args:
level: The caller level
left: Whether to confirm its left or right temporary accepted agreement
"""
if left:
level = (level - 1) % len(self.__chain)
else:
level = (level + 1) % len(self.__chain)
if self.__temp_agreements[level] is None:
raise ValueError(f"No temporary agreement exists at level {level}")
if self.__agreements[level] is not None:
raise ValueError(f"An agreement already exists at level {level}")
self.__agreements[level] = self.__temp_agreements[level]