"""
Defines a world for running negotiations directly
"""
from __future__ import annotations
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Collection, Iterable
from negmas import MechanismState, NegotiatorMechanismInterface
from negmas.helpers import get_class, get_full_type_name, instantiate
from negmas.negotiators import Negotiator
from negmas.outcomes import Issue
from negmas.preferences import Preferences
from negmas.sao import SAONegotiator
from negmas.serialization import (
deserialize,
serialize,
to_flat_dict,
PYTHON_CLASS_IDENTIFIER,
)
from negmas.situated import (
Action,
Agent,
AgentWorldInterface,
Breach,
Contract,
NoContractExecutionMixin,
RenegotiationRequest,
World,
)
__all__ = ["NegWorld", "NegAgent", "Condition"]
class NegAgent(Agent):
    """
    An agent whose only job is to host a negotiator so that it can be evaluated.

    Args:
        negotiator_type: The negotiator class to wrap, or a string that
            `get_class` can resolve to it.
        negotiator_params: Keyword arguments passed to the negotiator every
            time one is created. A falsy value is replaced by an empty dict.
        args, kwargs: Forwarded unchanged to the `Agent` constructor.
    """

    def __init__(
        self,
        *args,
        negotiator_type: str | type[Negotiator],
        negotiator_params: dict[str, Any] | None = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self._negotiator_params = negotiator_params or dict()
        self._negotiator_type = get_class(negotiator_type)

    @property
    def short_type_name(self):
        """The wrapped negotiator's class name without "Negotiator" / "Agent"."""
        shortened = self._negotiator_type.__name__
        for noise in ("Negotiator", "Agent"):
            shortened = shortened.replace(noise, "")
        return shortened

    @property
    def type_name(self):
        """The full type name of the wrapped negotiator class (not of the agent)."""
        wrapped = self._negotiator_type
        return get_full_type_name(wrapped)

    @classmethod
    def _type_name(cls):
        """The dotted ``module.ClassName`` path of this agent class."""
        return f"{cls.__module__}.{cls.__name__}"

    def make_negotiator(self, preferences: Preferences | None = None):
        """
        Creates a new negotiator of the wrapped type.

        Args:
            preferences: Optional preferences handed to the negotiator, passed
                alongside the stored negotiator parameters.
        """
        negotiator_cls = self._negotiator_type
        extra = self._negotiator_params
        return instantiate(negotiator_cls, preferences=preferences, **extra)

    def _respond_to_negotiation_request(
        self,
        initiator: str,
        partners: list[str],
        issues: list[Issue],
        annotation: dict[str, Any],
        mechanism: NegotiatorMechanismInterface,
        role: str | None,
        req_id: str | None,
    ) -> Negotiator | None:
        """
        Accepts every negotiation request by creating a negotiator.

        The preferences are looked up through the AWI using this agent's
        position in `partners`.
        """
        my_position = partners.index(self.id)
        my_preferences = self.awi.get_preferences(my_position)
        return self.make_negotiator(my_preferences)

    def step(self):
        """Called by the simulator at every simulation step (does nothing)."""

    def init(self):
        """
        Called to initialize the agent **after** the world is initialized
        (does nothing). The AWI is accessible at this point.
        """

    def on_neg_request_rejected(self, req_id: str, by: list[str] | None):
        """Called when a requested negotiation is rejected (does nothing)."""

    def on_neg_request_accepted(
        self, req_id: str, mechanism: NegotiatorMechanismInterface
    ):
        """Called when a requested negotiation is accepted (does nothing)."""

    def on_negotiation_failure(
        self,
        partners: list[str],
        annotation: dict[str, Any],
        mechanism: NegotiatorMechanismInterface,
        state: MechanismState,
    ) -> None:
        """Called whenever a negotiation ends without agreement (does nothing)."""

    def on_negotiation_success(
        self, contract: Contract, mechanism: NegotiatorMechanismInterface
    ) -> None:
        """Called whenever a negotiation ends with agreement (does nothing)."""

    def on_contract_signed(self, contract: Contract) -> None:
        """Called whenever a contract is signed by all partners (does nothing)."""

    def on_contract_cancelled(self, contract: Contract, rejectors: list[str]) -> None:
        """Called whenever at least one partner did not sign the contract (does nothing)."""

    def respond_to_renegotiation_request(
        self, contract: Contract, breaches: list[Breach], agenda: RenegotiationRequest
    ) -> Negotiator | None:
        """Called to respond to a renegotiation request; always returns `None`."""
        return None

    def on_contract_executed(self, contract: Contract) -> None:
        """
        Called after successful execution of a contract for which the agent
        is one of the partners (does nothing).
        """

    def on_contract_breached(
        self, contract: Contract, breaches: list[Breach], resolution: Contract | None
    ) -> None:
        """
        Called after complete processing of a contract that involved a breach
        (does nothing).
        """

    def set_renegotiation_agenda(
        self, contract: Contract, breaches: list[Breach]
    ) -> RenegotiationRequest | None:
        """
        Received by partners in ascending order of their total breach levels in
        order to set the renegotiation agenda when contract execution fails.

        Always returns `None`.
        """
        return None
def _unwrap_negotiators(types, params):
    """
    Removes the agent wrapping the negotiator for each type

    Args:
        types: Negotiator classes and/or `NegAgent` subclasses. Each entry may
            be a class or a string resolvable by `get_class`.
        params: One parameter dict (or `None`) per entry of `types`, or `None`
            for no parameters at all. For an entry that is not a `Negotiator`
            subclass, its dict must contain ``negotiator_type`` and may contain
            ``negotiator_params``.

    Returns:
        A tuple ``(types, params, old_types)`` where `types` holds the
        negotiator type for each entry, `params` the matching negotiator
        parameters and `old_types` the agent class to wrap each negotiator in
        (the original `NegAgent` subclass, or `NegAgent` itself).

    Raises:
        KeyError: If an entry is not a `Negotiator` subclass and its params
            lack ``negotiator_type``.
    """
    # Resolve strings to classes first: `issubclass` below needs real classes.
    types = [get_class(_) for _ in types]
    old_types = [_ if issubclass(_, NegAgent) else NegAgent for _ in types]
    if params is None:
        params = [dict() for _ in types]
    params = [dict() if _ is None else _ for _ in params]
    for i, (t, p) in enumerate(zip(types, params)):
        if issubclass(t, Negotiator):
            continue
        # `or dict()` also covers an explicit `negotiator_params=None`, so the
        # returned params are always dicts.
        params[i] = p.get("negotiator_params") or dict()
        types[i] = p["negotiator_type"]
    return types, params, old_types
def _wrap_in_agents(types, params, agent_types):
    """
    Wraps each negotiator in `types` with an agent from `agent_types`

    Args:
        types: Negotiator or `NegAgent` types (classes or strings resolvable
            by `get_class`).
        params: One parameter dict (or `None`) per type, or `None` for no
            parameters at all.
        agent_types: Either one agent type (class or string) used for every
            negotiator, or an iterable with one agent type per entry.

    Returns:
        A tuple ``(types, params)``. Entries that were already `NegAgent`
        subclasses are kept (resolved to classes) with their params. Every
        other entry is replaced by its wrapper, with params holding
        ``negotiator_type``, ``negotiator_params`` and ``name``.
    """
    resolved_types = list(types)
    count = len(resolved_types)
    if params is None:
        params = [dict() for _ in range(count)]
    if isinstance(agent_types, str):
        agent_types = get_class(agent_types)
    if not isinstance(agent_types, Iterable):
        # a single wrapper class is shared by all negotiators
        agent_types = [agent_types] * count
    agent_params = [dict() if entry is None else entry for entry in params]
    triples = zip(resolved_types, agent_params, agent_types)
    for position, (candidate, given, wrapper) in enumerate(triples):
        candidate = get_class(candidate)
        resolved_types[position] = candidate
        if issubclass(candidate, NegAgent):
            continue
        agent_params[position] = {
            "negotiator_type": candidate,
            "negotiator_params": dict(given.items()),
            "name": given.get("name", None),
        }
        resolved_types[position] = wrapper
    return resolved_types, agent_params
@dataclass
class Condition:
    """
    A representation of a negotiation scenario in which a negotiator can be evaluated
    """

    name: str
    """Scenario name"""
    issues: tuple[Issue, ...]
    """The issue space as a list of issues"""
    ufuns: tuple[Preferences, ...]
    """The utility functions used by all negotiators in the scenario"""
    partner_types: tuple[str | type[Negotiator] | type[SAONegotiator], ...]
    """The types of all partners (other than the agent being evaluated). Its length must be one less than `ufuns`"""
    index: int = 0
    """The index of the negotiator being evaluated in the list of negotiators passed to the mechanism"""
    partner_params: tuple[dict[str, Any] | None, ...] | None = None
    """Any parameters used to construct partners (must be the same length as `partner_types`)"""
    roles: tuple[str, ...] | None = None
    """Roles of all negotiators (including the negotiator being evaluated) in order"""
    annotation: dict[str, Any] | None = None
    """Any extra annotation to add to the mechanism."""
    scored_indices: tuple[int, ...] | None = None
    """Indices of negotiators to be scored in this negotiation. `None` is equivalent to `[self.index]`"""

    def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
        """
        Converts the condition to a dict.

        Issues are converted with their own `to_dict`; ufuns, partner
        parameters and the annotation go through `serialize`; partner types
        are stored as full type names.

        Args:
            python_class_identifier: Forwarded to `Issue.to_dict` and `serialize`.
        """
        return dict(
            name=self.name,
            issues=[
                i.to_dict(python_class_identifier=python_class_identifier)
                for i in self.issues
            ],
            ufuns=[
                serialize(u, python_class_identifier=python_class_identifier)
                for u in self.ufuns
            ],
            partner_types=[get_full_type_name(_) for _ in self.partner_types],  # type: ignore
            index=self.index,
            partner_params=serialize(
                self.partner_params, python_class_identifier=python_class_identifier
            ),
            roles=self.roles,
            annotation=serialize(
                self.annotation, python_class_identifier=python_class_identifier
            ),
            scored_indices=self.scored_indices,
        )

    @classmethod
    def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
        """
        Builds a condition from a dict with the layout produced by `to_dict`.

        Sequence fields are converted back to tuples (their declared type), so
        a dict whose tuples became lists in storage yields the same object.
        Keys of fields that have defaults (``index``, ``partner_params``,
        ``roles``, ``annotation``, ``scored_indices``) may be missing.

        Args:
            d: The dict to read.
            python_class_identifier: Forwarded to `deserialize`.

        Raises:
            KeyError: If ``name``, ``issues``, ``ufuns`` or ``partner_types``
                is missing.
        """

        def as_tuple(value):
            # keep `None` as is: it means "not given" for the optional fields
            return None if value is None else tuple(value)

        # NOTE(review): `to_dict` passes `python_class_identifier` to
        # `Issue.to_dict` but it is not passed to `Issue.from_dict` here;
        # confirm whether `Issue.from_dict` accepts it.
        return cls(
            name=d["name"],
            issues=tuple(Issue.from_dict(_) for _ in d["issues"]),
            ufuns=tuple(
                deserialize(u, python_class_identifier=python_class_identifier)
                for u in d["ufuns"]
            ),  # type: ignore The type should be correct in the dict
            partner_types=tuple(get_class(_) for _ in d["partner_types"]),
            index=d.get("index", 0),
            partner_params=as_tuple(
                deserialize(
                    d.get("partner_params"),
                    python_class_identifier=python_class_identifier,
                )
            ),  # type: ignore The type should be correct in the dict
            roles=as_tuple(d.get("roles")),
            annotation=deserialize(
                d.get("annotation"), python_class_identifier=python_class_identifier
            ),  # type: ignore The type should be correct in the dict
            scored_indices=as_tuple(d.get("scored_indices")),
        )
class _NegPartner(NegAgent):
    """
    A `NegAgent` representing a partner that is not being evaluated

    It adds no behavior to `NegAgent`; `NegWorld` passes it as the wrapper
    class when it creates the agents for the scenario's partner types.
    """
class _NegAWI(AgentWorldInterface):
    """The agent-world interface handed to agents of a `NegWorld`"""

    def get_preferences(self, uid: int):
        """
        Returns the ufun stored at position `uid` of the world's scenario.

        Args:
            uid: Index into the scenario's `ufuns`.
        """
        scenario = self._world._scenario
        return scenario.ufuns[uid]
class NegWorld(NoContractExecutionMixin, World):
    """
    A world that runs a list of negotiators in a given scenario to evaluate them

    Args:
        scenario: The `Condition` specifying all information about the situation
                  in which negotiators are to be evaluated including the partners.
        types: The negotiator types to be evaluated
        params: Any parameters needed to create negotiators
        agent_names_reveal_type: if given the name and ID of each agent will
                                 simply be the short type name of its negotiator
        compact: If given the system will strive to save memory and minimize logging
        no_logs: disables logging to file
        normalize_scores: if given, utilities are rescaled using the range
                          returned by each ufun's `minmax()` before being recorded
        python_class_identifier: forwarded to `serialize` when the scenario is
                                 stored in `info`
        kwargs: Any extra arguments to be passed to the `World` constructor
    """

    def __init__(
        self,
        *args,
        scenario: Condition,
        types: list[Negotiator | NegAgent],
        params: list[dict[str, Any] | None] | None = None,
        agent_names_reveal_type: bool = True,
        compact: bool = False,
        no_logs: bool = False,
        normalize_scores: bool = False,
        python_class_identifier: str = PYTHON_CLASS_IDENTIFIER,
        **kwargs,
    ):
        kwargs["log_to_file"] = not no_logs
        if compact:
            kwargs["event_file_name"] = None
            kwargs["event_types"] = []
            kwargs["log_screen_level"] = logging.CRITICAL
            kwargs["log_file_level"] = logging.ERROR
            kwargs["log_negotiations"] = False
            kwargs["log_ufuns"] = False
            kwargs["save_cancelled_contracts"] = False
            kwargs["save_resolved_breaches"] = False
        # negotiations are saved in both compact and normal modes
        kwargs["save_negotiations"] = True
        self.compact = compact
        super().__init__(*args, awi_type="negmas.situated.neg._NegAWI", **kwargs)
        if self.info is None:
            self.info = {}
        self.info["scenario"] = serialize(
            scenario, python_class_identifier=python_class_identifier
        )
        self.info["n_steps"] = self.n_steps
        self.info["n_negotiators"] = len(scenario.ufuns)
        if scenario.annotation is None:
            scenario.annotation = dict()
        self._scenario = scenario
        # Per-agent records; `simulation_step` appends one entry per scored
        # negotiation.
        self._received_utility: dict[str, list[float]] = defaultdict(list)
        self._partner_utility: dict[str, list[float]] = defaultdict(list)
        self._success: dict[str, bool] = defaultdict(bool)
        self._received_advantage: dict[str, list[float]] = defaultdict(list)
        self._n_agreements_per_cometitor: dict[str, list[int]] = defaultdict(list)
        self._partner_advantage: dict[str, list[float]] = defaultdict(list)
        self._competitors: dict[str, NegAgent] = dict()
        self._partners: dict[str, NegAgent] = dict()
        self._n_negs_per_copmetitor = defaultdict(int)
        self._normalize_scores = normalize_scores
        self._preferences_ranges = None
        if self._normalize_scores:
            self._preferences_ranges = [
                u.minmax()
                for u in scenario.ufuns  # type: ignore
            ]
        partner_types = scenario.partner_types
        # Build a fresh list instead of assigning into `scenario.partner_params`:
        # it is declared as a tuple (item assignment would fail) and the
        # scenario should not be modified here.
        if scenario.partner_params is None:
            partner_params = [dict() for _ in partner_types]
        else:
            partner_params = [
                dict() if p is None else p for p in scenario.partner_params
            ]
        types, params, wrappers = _unwrap_negotiators(list(types), params)
        types = [get_full_type_name(t) for t in types]
        # NOTE(review): `hash(str(p))` relies on Python's string hashing, which
        # is randomized per process by default, so these names may differ
        # between runs.
        self.agent_unique_types = [
            f"{t}:{hash(str(p))}" if len(p) > 0 else t for t, p in zip(types, params)
        ]

        def add_agents(types, params, wrappers, target):
            """Wraps the given negotiator types in agents and adds them to the world"""
            types, params = _wrap_in_agents(types, params, wrappers)
            for t, p in zip(types, params):
                agent = instantiate(t, **p)
                if agent_names_reveal_type:
                    agent.name = agent.short_type_name
                    agent.id = agent.short_type_name
                target[agent.id] = agent
                self.join(agent)

        add_agents(types, params, wrappers, self._competitors)
        add_agents(partner_types, partner_params, _NegPartner, self._partners)

    def _mean_per_negotiation(self, records: dict[str, list], aid: str) -> float:
        """
        Sums the records of `aid` and divides by its number of scored negotiations.

        Returns 0.0 when no negotiation was recorded for `aid` instead of
        dividing by zero.
        """
        n_negotiations = self._n_negs_per_copmetitor.get(aid, 0)
        if not n_negotiations:
            return 0.0
        return sum(records.get(aid, [0])) / n_negotiations

    def simulation_step(self, stage):
        """
        Runs one negotiation per competitor and records the resulting scores.

        For every competitor, the competitor is inserted among the partners at
        `scenario.index`, a negotiation is run, and for every scored index the
        received utility/advantage, the partners' summed utility/advantage and
        whether an agreement was reached are appended to the per-agent records.
        """
        evaluated_index = self._scenario.index

        def normalize(u, indx):
            """Rescales `u` using the range of ufun `indx` if normalization is on"""
            if self._normalize_scores:
                mn, mx = self._preferences_ranges[indx]
                if mx > mn:
                    u = (u - mn) / (mx - mn)
                elif mx:
                    u = u / mx
                # mx == 0 with an empty range: nothing to scale by, keep `u`
            return u

        scored_indices = self._scenario.scored_indices
        if scored_indices is None:
            scored_indices = [evaluated_index]

        for aid, agent in self._competitors.items():
            partner_ids = list(self._partners.keys())
            partners = list(self._partners.values())
            partner_ids.insert(evaluated_index, aid)
            partners.insert(evaluated_index, agent)
            _, mechanism = self.run_negotiation(
                caller=agent,
                issues=self._scenario.issues,
                partners=partner_ids,
                roles=self._scenario.roles,
                annotation=self._scenario.annotation,
                negotiator=None,
            )
            agreement = mechanism.state.agreement if mechanism else None
            # Success means an agreement was reached; a mechanism that ran but
            # ended without agreement is not a success.
            has_agreement = agreement is not None
            for index in scored_indices:
                current_aid = partner_ids[index]
                if index == evaluated_index:
                    assert (
                        current_aid == aid
                    ), f"{index=}, {self._scenario.index=} but the IDS are not equal: {aid=}, {current_aid=}"
                ufun = self._scenario.ufuns[index]
                reserved = normalize(float(ufun.reserved_value), index)
                # without an agreement the agent receives its reserved value
                if has_agreement:
                    received = float(normalize(ufun(agreement), index))
                else:
                    received = float(reserved)
                self._received_utility[current_aid].append(received)
                self._n_agreements_per_cometitor[current_aid].append(
                    int(has_agreement)
                )
                self._n_negs_per_copmetitor[current_aid] += 1
                self._success[current_aid] = has_agreement
                self._received_advantage[current_aid].append(received - reserved)
                # (position, ufun) of everyone other than the scored agent
                pufuns = [
                    (i, p.awi.get_preferences(i))
                    for i, (pid, p) in enumerate(zip(partner_ids, partners))
                    if pid != current_aid
                ]
                pu = sum(normalize(float(_(agreement)), i) for i, _ in pufuns)
                pa = sum(
                    normalize(float(_(agreement)), i)
                    - normalize(float(_.reserved_value), i)
                    for i, _ in pufuns
                )
                self._partner_utility[current_aid].append(pu)
                self._partner_advantage[current_aid].append(pa)
            partner_names = [self.agents[_].name for _ in partner_ids]
            self.loginfo(f"{partner_names} -> {agreement}")

    def received_utility(self, aid: str):
        """Average utility received by `aid` per scored negotiation (0.0 if none)"""
        return self._mean_per_negotiation(self._received_utility, aid)

    def agreement_rate(self, aid: str):
        """
        Fraction of the scored negotiations of `aid` that ended with an
        agreement (0.0 if none).

        The denominator is the counter that `simulation_step` updates together
        with the agreement records, so both refer to the same negotiations.
        """
        return self._mean_per_negotiation(self._n_agreements_per_cometitor, aid)

    def partner_utility(self, aid: str):
        """Average summed utility of the partners of `aid` per scored negotiation (0.0 if none)"""
        return self._mean_per_negotiation(self._partner_utility, aid)

    def received_advantage(self, aid: str):
        """Average utility above the reserved value received by `aid` (0.0 if none)"""
        return self._mean_per_negotiation(self._received_advantage, aid)

    def partner_advantage(self, aid: str):
        """Average summed advantage of the partners of `aid` per scored negotiation (0.0 if none)"""
        return self._mean_per_negotiation(self._partner_advantage, aid)

    @property
    def competitors(self):
        """The agents being evaluated, keyed by agent ID"""
        return self._competitors

    @property
    def partners(self):
        """The partner agents taken from the scenario, keyed by agent ID"""
        return self._partners

    def post_step_stats(self):
        """Appends the latest record of every competitor to the world statistics"""
        for aid in self._competitors.keys():
            self._stats[f"has_agreement_{aid}"].append(self._success[aid])
            self._stats[f"received_utility_{aid}"].append(
                self._received_utility[aid][-1]
            )
            self._stats[f"partner_utility_{aid}"].append(self._partner_utility[aid][-1])
            self._stats[f"received_advantage_{aid}"].append(
                self._received_advantage[aid][-1]
            )
            # the key keeps its original spelling so existing readers still find it
            self._stats[f"agrement_rate_{aid}"].append(
                self._n_agreements_per_cometitor[aid][-1]
            )
            self._stats[f"partner_advantage_{aid}"].append(
                self._partner_advantage[aid][-1]
            )

    def pre_step_stats(self):
        """Nothing is recorded before the step"""
        pass

    def order_contracts_for_execution(
        self, contracts: Collection[Contract]
    ) -> Collection[Contract]:
        """Returns the contracts in the order given"""
        return contracts

    def contract_record(self, contract: Contract) -> dict[str, Any]:
        """Converts the contract to a flat dict"""
        return to_flat_dict(contract, deep=True)

    def breach_record(self, breach: Breach) -> dict[str, Any]:
        """Converts the breach to a flat dict"""
        return to_flat_dict(breach, deep=True)

    def contract_size(self, contract: Contract) -> float:
        """Every contract has size one over the number of competitors (1.0 if there are none)"""
        n = len(self._competitors)
        return 1.0 / (n if n else 1)

    def delete_executed_contracts(self) -> None:
        """Nothing to delete"""
        pass

    def execute_action(
        self, action: Action, agent: Agent, callback: Callable | None = None
    ) -> bool:
        """Executes the given action by the given agent"""
        ...

    def get_private_state(self, agent: Agent) -> dict:
        """Reads the private state of the given agent"""
        ...

    def executable_contracts(self) -> Collection[Contract]:
        """No contracts are ever executable"""
        return []

    def start_contract_execution(self, contract: Contract) -> set[Breach]:
        """Reports no breaches"""
        return set()

    def complete_contract_execution(
        self, contract: Contract, breaches: list[Breach], resolution: Contract
    ) -> None:
        """Nothing to complete"""
        pass
# Demo: evaluates a few negotiators against an aspiration partner.
if __name__ == "__main__":
    from negmas.genius import genius_bridge_is_running
    from negmas.genius.gnegotiators import Atlas3, NiceTitForTat
    from negmas.outcomes import Issue
    from negmas.preferences import LinearAdditiveUtilityFunction as U
    from negmas.sao import AspirationNegotiator, NaiveTitForTatNegotiator
    from negmas.situated import save_stats

    issues = [Issue(10, "quantity"), Issue(5, "price")]
    competitors = [AspirationNegotiator, NaiveTitForTatNegotiator]
    # Genius negotiators are only added when the bridge is running
    if genius_bridge_is_running():
        competitors += [Atlas3, NiceTitForTat]
    scenario = Condition(
        name="d0",
        issues=issues,
        ufuns=[
            U.random(issues=issues, reserved_value=(0.0, 0.2), normalized=True),
            U.random(issues=issues, reserved_value=(0.0, 0.2), normalized=True),
        ],
        partner_types=[AspirationNegotiator],
        index=0,
    )
    world = NegWorld(
        scenario=scenario,
        types=competitors,
        n_steps=2,
        neg_n_steps=10,
        neg_time_limit=None,
    )
    world.run()
    print("World ran", flush=True)
    # Report competitors only: with `scored_indices` left as None the world
    # records scores just for the agent at `scenario.index`, so partners have
    # no negotiations to average over.
    for aid in world.competitors:
        print(world.received_utility(aid))
    save_stats(world, world.log_folder)