"""Implements a concurrent set of negotiations creating a chain of bilateral negotiations."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from dataclasses import dataclass
from ..common import NegotiatorMechanismInterface, MechanismAction, MechanismState
from ..mechanisms import Mechanism, MechanismStepResult
from ..negotiators import Negotiator
from ..outcomes import Outcome
from ..preferences import Preferences
from ..sao import ResponseType
__all__ = [
"ChainNegotiationsMechanism",
"ChainNegotiator",
"MultiChainNegotiationsMechanism",
"MultiChainNegotiator",
]
@dataclass
class Offer:
"""Defines an offer"""
outcome: Outcome
left: bool
temp: bool
partner: str | None = None
Agreement = namedtuple("Agreement", ["outcome", "negotiators", "level"])
"""Defines an agreement for a multi-channel mechanism"""
class ChainNMI(NegotiatorMechanismInterface):
def __init__(
self,
*args,
parent: ChainNegotiationsMechanism,
negotiator: ChainNegotiator,
level: int,
**kwargs,
):
super().__init__(*args, **kwargs)
self.__parent = parent
self.__negotiator = negotiator
self.__level = level
def confirm(self, parent: bool) -> bool:
return self.__parent.on_confirm(self.__level, parent)
[docs]
class ChainNegotiator(Negotiator, ABC):
"""Base class for all nested negotiations negotiator"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._nmi: ChainNMI
self.__level = -1
[docs]
def join(
self,
nmi: NegotiatorMechanismInterface,
state: MechanismState,
*,
preferences: Preferences | None = None,
role: str = "negotiator",
) -> bool:
to_join = super().join(nmi, state, preferences=preferences, role=role)
if to_join:
self.__level = int(role)
return to_join
[docs]
def confirm(self, left: bool) -> bool:
"""
Called to confirm a offer to either the left or the right
Args:
left: If true confirm the offer sent to the left (otherwise the right)
Returns:
"""
return self._nmi.confirm(left)
[docs]
@abstractmethod
def on_acceptance(self, state: MechanismState, offer: Offer) -> Offer:
"""
Called when one of the negotiator's proposals gets accepted
Args:
state: mechanism state
offer: The offer accepted
Returns:
A new offer (possibly to another negotiator)
"""
[docs]
@abstractmethod
def propose(self, state: MechanismState) -> Offer:
"""
Called to allow the agent to propose to either its left or its right in the chain
Args:
state: Mechanism state
Returns:
The offer
"""
[docs]
@abstractmethod
def respond(
self, state: MechanismState, outcome: Outcome, from_left: bool, temp: bool
) -> ResponseType:
"""
Called to respond to an offer
Args:
state: Mechanism state
outcome: The offer to respond to
from_left: Whether the offer is coming from the left
temp: Whether the offer is a temporary offer
Returns:
A response type which can only be ACCEPT_OFFER, REJECT_OFFER, or END_NEGOTIATION
"""
[docs]
class MultiChainNegotiator(Negotiator, ABC):
"""Base class for all nested negotiations negotiator"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__level = -1
[docs]
def join(
self,
nmi: NegotiatorMechanismInterface,
state: MechanismState,
*,
preferences: Preferences | None = None,
role: str = "negotiator",
) -> bool:
to_join = super().join(nmi, state, preferences=preferences, role=role)
if to_join:
self.__level = int(role)
return to_join
[docs]
def confirm(self, left: bool) -> bool:
"""
Called to confirm a offer to either the left or the right
Args:
left: If true confirm the offer sent to the left (otherwise the right)
Returns:
"""
return self.nmi.confirm(left) #
[docs]
@abstractmethod
def on_acceptance(self, state: MechanismState, offer: Offer) -> Offer:
"""
Called when one of the negotiator's proposals gets accepted
Args:
state: mechanism state
offer: The offer accepted
Returns:
A new offer (possibly to another negotiator)
"""
[docs]
@abstractmethod
def propose(self, state: MechanismState) -> Offer:
"""
Called to allow the agent to propose to either its left or its right in the chain
Args:
state: Mechanism state
Returns:
The offer
"""
[docs]
@abstractmethod
def respond(
self,
state: MechanismState,
outcome: Outcome,
from_left: bool,
temp: bool,
source: str,
) -> ResponseType:
"""
Called to respond to an offer
Args:
state: Mechanism state
outcome: The offer to respond to
from_left: Whether the offer is coming from the left
temp: Whether the offer is a temporary offer
source: The ID of the source agent
Returns:
A response type which can only be ACCEPT_OFFER, REJECT_OFFER, or END_NEGOTIATION
"""
[docs]
class ChainNegotiationsMechanism(
Mechanism[MechanismState, MechanismAction, ChainNMI, ChainNegotiator]
):
def __init__(self, initial_state: MechanismState | None = None, *args, **kwargs):
super().__init__(
initial_state if initial_state else MechanismState() * args, **kwargs
)
self.__chain: list[ChainNegotiator | None] = []
self.__next_agent = 0
self.__last_proposal: Offer | None = None
self.__last_proposer_index: int = -1
self.__agreements: dict[int, Outcome | None] = defaultdict(lambda: None)
self.__temp_agreements: dict[int, Outcome | None] = defaultdict(lambda: None)
def _get_ami(
self, negotiator: Negotiator, role: str
) -> NegotiatorMechanismInterface:
"""
Returns a chain AMI instead of the standard AMI.
Args:
negotiator: The negotiator to create the AMI for
role: Its role in the negotiation (an integer >= -1 as a string)
Returns:
"""
return ChainNMI(
id=self.id,
n_outcomes=self.nmi.n_outcomes,
issues=self.nmi.outcome_space,
outcomes=self.nmi.outcomes,
time_limit=self.nmi.time_limit,
step_time_limit=self.nmi.step_time_limit,
n_steps=self.nmi.n_steps,
dynamic_entry=self.nmi.dynamic_entry,
max_n_agents=self.nmi.max_n_agents,
annotation=self.nmi.annotation,
parent=self,
negotiator=negotiator, #
level=int(role) + 1,
)
[docs]
def add(
self,
negotiator: Negotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
**kwargs,
) -> bool | None:
if role is None:
raise ValueError(
"You cannot join this protocol without specifying the role. "
"Possible roles are integers >= -1 "
)
added = super().add(negotiator, preferences=preferences, role=role, **kwargs)
if not added:
return added
level = int(role) + 1
if len(self.__chain) > level:
if self.__chain[level] is not None:
raise ValueError(f"A negotiator already exist in the role {role}")
self.__chain[level] = negotiator
return
self.__chain += [None] * (level - len(self.__chain) + 1)
self.__chain[level] = negotiator
def _update_next(self) -> None:
if self.__last_proposal.left:
self.__next_agent = (self.__last_proposer_index - 1) % len(self.__chain)
else:
self.__next_agent = (self.__last_proposer_index + 1) % len(self.__chain)
[docs]
def __call__(self, state, action=None) -> MechanismStepResult:
# check that the chain is complete
if not all(self.__chain):
if self.dynamic_entry:
state.has_error = True
state.error_details = "The chain is not complete"
return MechanismStepResult(state=state)
raise ValueError(
"The chain is not complete and dynamic entry is not allowed"
)
# find the next negotiator to ask
negotiator = self.__chain[self.__next_agent]
# if this is the first proposal, get it from the left most agent and ask the next one to respond
if self.__last_proposal is None:
self.__last_proposal = negotiator.propose(self.state)
self._update_next()
assert self.__next_agent == 1
return MechanismStepResult(state)
# if all agreements are finalized end the mechanism session with success
agreements = [
self.__agreements[L]
for L in range(len(self.__chain))
if self.__agreements[L] is not None
]
if len(agreements) == len(self.__chain) - 1:
state.agreement = agreements
return MechanismStepResult(state)
response = negotiator.respond(
self.state,
self.__last_proposal.outcome,
not self.__last_proposal.left,
self.__last_proposal.temp,
)
# handle unacceptable responses
if response not in (
ResponseType.ACCEPT_OFFER,
ResponseType.REJECT_OFFER,
ResponseType.END_NEGOTIATION,
):
state.has_error = True
state.error_details = "An unacceptable response was returned"
return MechanismStepResult(state)
# If the response is to end the negotiation, end it but only if there are not partial negotiations
if response == ResponseType.END_NEGOTIATION:
if len(self.__agreements) > 0:
state.has_error = True
state.error_details = (
"Cannot end a negotiation chain with some agreements"
)
return MechanismStepResult(state)
state.broken = True
return MechanismStepResult(state)
# if the response is an acceptance then either register an agreement or a temporary agreement depending on
# proposal
if response == ResponseType.ACCEPT_OFFER:
agreement_index = (
self.__next_agent
if self.__last_proposal.left
else self.__next_agent - 1
)
if not self.__last_proposal.temp:
assert agreement_index >= 0
self.__agreements[agreement_index] = self.__last_proposal.outcome
else:
assert self.__temp_agreements[agreement_index] is None
self.__temp_agreements[agreement_index] = self.__last_proposal.outcome
self.__last_proposal = self.__chain[
self.__last_proposer_index
].on_acceptance(self.state, self.__last_proposal)
self.__last_proposer_index = self.__next_agent
self._update_next()
return MechanismStepResult(state)
# now it must be a rejection, ask the one who rejected to propose (in either direction)
self.__last_proposal = self.__chain[self.__next_agent].propose(self.state)
self.__last_proposer_index = self.__next_agent
self._update_next()
return MechanismStepResult()
[docs]
def on_confirm(self, level: int, left: bool) -> bool:
"""
Called by negotiators to confirm their temporary accepted agreements
Args:
level: The caller level
left: Whether to confirm its left or right temporary accepted agreement
"""
if left:
level = (level - 1) % len(self.__chain)
else:
level = (level + 1) % len(self.__chain)
if self.__temp_agreements[level] is None:
raise ValueError(f"No temporary agreement exists at level {level}")
if self.__agreements[level] is not None:
raise ValueError(f"An agreement already exists at level {level}")
self.__agreements[level] = self.__temp_agreements[level]
return True
[docs]
class MultiChainNegotiationsMechanism(
Mechanism[ChainNMI, MechanismState, MechanismAction, MultiChainNegotiator]
):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__chain: list[list[MultiChainNegotiator]] = []
self.__next_agent_level = 0
self.__next_agent_number = 0
self.__last_proposal: Offer = None
self.__last_proposer_level: int = -1
self.__last_proposer_number: int = -1
self.__agreements: dict[int, Agreement] = defaultdict(lambda: None)
self.__temp_agreements: dict[int, Agreement] = defaultdict(lambda: None)
self.__level: dict[str, int] = {}
self.__number: dict[str, int] = {}
def _get_ami(
self, negotiator: Negotiator, role: str
) -> NegotiatorMechanismInterface:
"""
Returns a chain AMI instead of the standard AMI.
Args:
negotiator: The negotiator to create the AMI for
role: Its role in the negotiation (an integer >= -1 as a string)
Returns:
"""
return ChainNMI(
id=self.id,
n_outcomes=self.nmi.n_outcomes,
issues=self.nmi.outcome_space,
outcomes=self.nmi.outcomes,
time_limit=self.nmi.time_limit,
step_time_limit=self.nmi.step_time_limit,
n_steps=self.nmi.n_steps,
dynamic_entry=self.nmi.dynamic_entry,
max_n_agents=self.nmi.max_n_negotiators,
annotation=self.nmi.annotation,
parent=self,
negotiator=negotiator,
level=int(role) + 1,
)
[docs]
def add(
self,
negotiator: Negotiator,
*,
preferences: Preferences | None = None,
role: str | None = None,
**kwargs,
) -> bool | None:
if role is None:
raise ValueError(
"You cannot join this protocol without specifying the role. "
"Possible roles are integers >= -1 "
)
added = super().add(negotiator, preferences=preferences, role=role, **kwargs)
if not added:
return added
level = int(role) + 1
if len(self.__chain) > level:
if self.__chain[level] is not None:
raise ValueError(f"A negotiator already exist in the role {role}")
self.__chain[level] = negotiator
return
self.__chain += [list() for _ in range(level - len(self.__chain) + 1)]
self.__chain[level].append(negotiator)
self.__level[negotiator.id] = level
self.__number[negotiator.id] = len(self.__chain[level]) - 1
def _update_next(self) -> None:
if self.__last_proposal.left:
self.__next_agent_level = (self.__last_proposer_level - 1) % len(
self.__chain
)
else:
self.__next_agent_level = (self.__last_proposer_level + 1) % len(
self.__chain
)
self.__next_agent_number = self.__number[self.__last_proposal.partner]
[docs]
def __call__(self, state, action=None) -> MechanismStepResult:
# check that the chain is complete
if not all(len(_) > 0 for _ in self.__chain):
if self.dynamic_entry:
state.has_error = True
state.error_details = "The chain is not complete"
return MechanismStepResult(state)
raise ValueError(
"The chain is not complete and dynamic entry is not allowed"
)
# find the next negotiator to ask
negotiator = self.__chain[self.__next_agent_level][self.__next_agent_number]
# if this is the first proposal, get it from the left most agent and ask the next one to respond
if self.__last_proposal is None:
self.__last_proposal = negotiator.propose(self.state)
self._update_next()
assert self.__next_agent_level == 1
return MechanismStepResult(state)
# if all agreements are finalized end the mechanism session with success
agreements = [
self.__agreements[L]
for L in range(len(self.__chain))
if self.__agreements[L] is not None
]
if len(agreements) == len(self.__chain) - 1:
state.agreement = agreements
return MechanismStepResult(state)
response = negotiator.respond(
self.state,
self.__last_proposal.outcome,
not self.__last_proposal.left,
self.__last_proposal.temp,
source=self.__chain[self.__next_agent_level][self.__next_agent_number].id,
)
# handle unacceptable responses
if response not in (
ResponseType.ACCEPT_OFFER,
ResponseType.REJECT_OFFER,
ResponseType.END_NEGOTIATION,
):
state.has_error = True
state.erred_negotiator = negotiator.id
state.erred_agent = (
negotiator.owner.id if negotiator.owner is not None else ""
)
return MechanismStepResult(state)
# If the response is to end the negotiation, end it but only if there are not partial negotiations
if response == ResponseType.END_NEGOTIATION:
if len(self.__agreements) > 0:
state.has_error = True
state.erred_negotiator = negotiator.id
state.erred_agent = (
negotiator.owner.id if negotiator.owner is not None else ""
)
stae.error_details = (
"Cannot end a negotiation chain with some agreements",
)
return MechanismStepResult(stae)
state.broken = True
return MechanismStepResult(state)
# if the response is an acceptance then either register an agreement or a temporary agreement depending on
# proposal
if response == ResponseType.ACCEPT_OFFER:
agreement_index = (
self.__next_agent_level
if self.__last_proposal.left
else self.__next_agent_level - 1
)
if not self.__last_proposal.temp:
assert agreement_index >= 0
self.__agreements[agreement_index] = self.__last_proposal.outcome
else:
assert self.__temp_agreements[agreement_index] is None
self.__temp_agreements[agreement_index] = self.__last_proposal.outcome
self.__last_proposal = self.__chain[self.__last_proposer_level][
self.__last_proposer_number
].on_acceptance(self.state, self.__last_proposal)
self.__last_proposer_level = self.__next_agent_level
self.__last_proposer_number = self.__next_agent_number
self._update_next()
return MechanismStepResult(state)
# now it must be a rejection, ask the one who rejected to propose (in either direction)
self.__last_proposal = self.__chain[self.__next_agent_level][
self.__next_agent_number
].propose(self.state)
self.__last_proposer_level = self.__next_agent_level
self.__last_proposer_number = self.__next_agent_number
self._update_next()
return MechanismStepResult(state)
[docs]
def on_confirm(self, level: int, left: bool) -> None:
"""
Called by negotiators to confirm their temporary accepted agreements
Args:
level: The caller level
left: Whether to confirm its left or right temporary accepted agreement
"""
if left:
level = (level - 1) % len(self.__chain)
else:
level = (level + 1) % len(self.__chain)
if self.__temp_agreements[level] is None:
raise ValueError(f"No temporary agreement exists at level {level}")
if self.__agreements[level] is not None:
raise ValueError(f"An agreement already exists at level {level}")
self.__agreements[level] = self.__temp_agreements[level]