Source code for negmas.gb.mechanisms.base

from __future__ import annotations
from random import shuffle
from time import perf_counter
from typing import TYPE_CHECKING, Any, Callable

from attrs import define

from negmas.common import TraceElement
from negmas.helpers import humanize_time
from negmas.helpers.types import get_full_type_name, instantiate
from negmas.mechanisms import Mechanism, MechanismStepResult
from negmas.outcomes import Outcome
from negmas.plots.util import default_colorizer
from negmas.preferences import BaseUtilityFunction, Preferences

from ..common import GBAction, GBResponse, GBState, ResponseType, ThreadState, GBNMI
from ..constraints.base import LocalOfferingConstraint, OfferingConstraint
from ..evaluators.base import EvaluationStrategy, LocalEvaluationStrategy, all_accept
from negmas.gb.negotiators.base import GBNegotiator

if TYPE_CHECKING:
    from negmas.plots.util import Colorizer

__all__ = ["GBMechanism", "ParallelGBMechanism", "SerialGBMechanism"]

DEFAULT_COLORMAP = "jet"


@define
class GBThread:
    mechanism: BaseGBMechanism
    negotiator: GBNegotiator
    responders: list[GBNegotiator]
    indx: int
    state: ThreadState
    evaluator: LocalEvaluationStrategy | None = None
    constraint: LocalOfferingConstraint | None = None

    @property
    def accepted_offers(self) -> list[Outcome]:
        return self.state.accepted_offers

    def run(
        self, action: dict[str, GBAction] | None = None
    ) -> tuple[ThreadState, GBResponse | None]:
        mechanism_state: GBState = self.mechanism.state  #
        history: list[GBState] = self.mechanism.history  #
        source = self.negotiator.id
        if action is None:
            strt = perf_counter()
            dest = (
                self.mechanism.dest_separator.join([_.id for _ in self.responders])
                if self.mechanism.dest_separator
                else None
            )
            offer = self.negotiator.propose(mechanism_state, dest=dest)
            self.mechanism._negotiator_times[self.negotiator.id] += (
                perf_counter() - strt
            )
        else:
            offer = action.get(source, None)
            if not offer:
                offer = None
        # assert offer is None or isinstance(offer, Outcome)
        self.state.new_offer = offer  # type: ignore
        if self.constraint and not self.constraint(
            mechanism_state.threads[source], [_.threads[source] for _ in history]
        ):
            self.state.new_offer = offer = None

        if offer is None:
            if self.mechanism._extra_callbacks:
                for n in self.responders:
                    strt = perf_counter()
                    n.on_partner_ended(source)
                    self.mechanism._negotiator_times[n.id] += perf_counter() - strt
            self.state.new_responses = dict()
            return (
                self.state,
                self.evaluator.eval(
                    source,
                    self.state,
                    mechanism_state.thread_history(history, source),
                    mechanism_state.base_state,
                )
                if self.evaluator
                else None,
            )
        if self.mechanism._extra_callbacks:
            for n in self.responders:
                strt = perf_counter()
                n.on_partner_proposal(mechanism_state, source, offer)  # type: ignore
                self.mechanism._negotiator_times[n.id] += perf_counter() - strt
        responses = []
        for responder in self.responders:
            strt = perf_counter()
            responses.append(responder.respond(mechanism_state, source=source))
            self.mechanism._negotiator_times[responder.id] += perf_counter() - strt
        if self.mechanism._extra_callbacks:
            for n, r in zip(self.responders, responses):
                strt = perf_counter()
                n.on_partner_response(mechanism_state, n.id, offer, r)  # type: ignore
                self.mechanism._negotiator_times[n.id] += perf_counter() - strt
        if all(_ == ResponseType.ACCEPT_OFFER for _ in responses):
            self.state.accepted_offers.append(offer)  # type: ignore
        self.state.new_responses = dict(
            zip(tuple(_.id for _ in self.responders), responses)
        )

        return (
            self.state,
            self.evaluator.eval(
                source,
                self.state,
                mechanism_state.thread_history(history, source),
                mechanism_state.base_state,
            )
            if self.evaluator
            else None,
        )


class BaseGBMechanism(Mechanism[GBNMI, GBState, GBAction, GBNegotiator]):
    """
    Base for all Generalized Bargaining Mechanisms
    """

    def __init__(
        self,
        *args,
        dynamic_entry=False,
        extra_callbacks=True,
        check_offers=False,
        enforce_issue_types=False,
        cast_offers=False,
        end_on_no_response=True,
        ignore_negotiator_exceptions=False,
        parallel: bool = True,
        sync_calls: bool = False,
        initial_state: GBState | None = None,
        dest_separator: str | None = ";",
        **kwargs,
    ):
        self.dest_separator = dest_separator
        super().__init__(
            *args,
            dynamic_entry=dynamic_entry,
            extra_callbacks=extra_callbacks,
            initial_state=GBState() if not initial_state else initial_state,
            **kwargs,
        )
        self._current_state: GBState
        self._parallel = parallel
        self._threads: list[GBThread] = []
        self._ignore_negotiator_exceptions = ignore_negotiator_exceptions
        self._dynamic_entry = dynamic_entry
        self._extra_callbacks = extra_callbacks
        self._check_offers = check_offers
        self._enforce_issue_types = enforce_issue_types
        self._cast_offers = cast_offers
        self._end_on_no_response = enforce_issue_types
        self._ignore_negotiator_exceptions = enforce_issue_types
        self._parallel = parallel
        self._sync_calls = sync_calls
        self.params["check_offers"] = check_offers
        self.params["enforce_issue_types"] = enforce_issue_types
        self.params["ignore_negotiator_exceptions"] = ignore_negotiator_exceptions
        self.params["cast_offers"] = cast_offers
        self.params["end_on_no_response"] = end_on_no_response
        self.params["enable_callbacks"] = extra_callbacks
        self.params["parallel"] = parallel
        self.params["sync_calls"] = sync_calls

    @property
    def state(self) -> GBState:
        """Returns the current state.

        Override `extra_state` if you want to keep extra state
        """
        return self._current_state

    def set_sync_call(self, v: bool):
        self._sync_call = v

    def run_threads(
        self, action: dict[str, GBAction] | None = None
    ) -> dict[str, tuple[ThreadState, GBResponse | None]]:
        def _do_run(idd, thread: GBThread):
            if self.verbosity > 2:
                print(
                    f"{self.name}: Thread {thread.negotiator.name} starts after {humanize_time(perf_counter() - self._start_time, show_ms=True) if self._start_time else 0}",
                    flush=True,
                )
            r = thread.run(action)
            state: GBState = self.state
            state.last_thread = idd
            return r

        if not self._parallel:
            threads = self._threads
            return {
                thread.negotiator.id: _do_run(thread.negotiator.id, thread)
                for thread in threads
            }
        # todo: make this really parallel
        indices = [_ for _ in range(len(self._threads))]
        shuffle(indices)
        threads = [self._threads[_] for _ in indices]
        results = dict()
        for t in threads:
            results[t.negotiator.id] = _do_run(t.negotiator.id, t)
        return results

    @property
    def full_trace(self) -> list[TraceElement]:
        def response(state: GBState):
            if state.timedout:
                return "timedout"
            if state.ended:
                return "ended"
            if state.has_error:
                return "error"
            if state.agreement:
                return "agreement"
            return "continuing"

        offers = []
        self._history: list[GBState]  #
        for state in self._history:
            state: GBState
            offers += [
                TraceElement(
                    state.time,
                    state.relative_time,
                    state.step,
                    source,
                    t.new_offer,
                    {neg: int(resp) for neg, resp in t.new_responses.items()},
                    response(state),
                )
                for source, t in state.threads.items()
            ]

        return offers

    @property
    def extended_trace(self) -> list[tuple[int, str, Outcome]]:
        """Returns the negotiation history as a list of step/negotiator/offer
        tuples."""
        offers = []
        self._history: list[GBState]  #
        for state in self._history:
            offers += [
                (state.step, source, t.new_offer) for source, t in state.threads.items()
            ]
        return offers

    @property
    def trace(self) -> list[tuple[str, Outcome]]:
        """Returns the negotiation history as a list of negotiator/offer
        tuples."""
        offers = []
        for state in self._history:
            offers += [(source, t.new_offer) for source, t in state.threads.items()]

        return offers

    def negotiator_offers(self, negotiator_id: str) -> list[Outcome]:
        """Returns the offers given by a negotiator (in order)"""
        return [o for n, o in self.trace if n == negotiator_id]

    def negotiator_full_trace(
        self, negotiator_id: str
    ) -> list[tuple[float, float, int, Outcome, str]]:
        """Returns the (time/relative-time/step/outcome/response) given by a
        negotiator (in order)"""
        return [
            (t, rt, s, o, a)
            for t, rt, s, n, o, _, a in self.full_trace
            if n == negotiator_id
        ]

    @property
    def offers(self) -> list[Outcome]:
        """Returns the negotiation history as a list of offers."""
        return [o for _, o in self.trace]

    @property
    def _step(self):
        """A private property used by the checkpoint system."""
        return self._current_state.step

    def plot(
        self,
        plotting_negotiators: tuple[int, int] | tuple[str, str] = (0, 1),
        save_fig: bool = False,
        path: str | None = None,
        fig_name: str | None = None,
        ignore_none_offers: bool = True,
        with_lines: bool = True,
        show_agreement: bool = False,
        show_pareto_distance: bool = True,
        show_nash_distance: bool = True,
        show_kalai_distance: bool = True,
        show_max_welfare_distance: bool = True,
        show_max_relative_welfare_distance: bool = False,
        show_end_reason: bool = True,
        show_annotations: bool = False,
        show_reserved: bool = True,
        show_total_time=True,
        show_relative_time=True,
        show_n_steps=True,
        colors: list | None = None,
        markers: list[str] | None = None,
        colormap: str = DEFAULT_COLORMAP,
        ylimits: tuple[float, float] | None = None,
        common_legend: bool = True,
        xdim: str = "step",
        colorizer: Colorizer | None = default_colorizer,
        only2d: bool = False,
        fast=False,
        simple_offers_view=False,
        **kwargs,
    ):
        from negmas.plots.util import plot_mechanism_run

        return plot_mechanism_run(
            mechanism=self,
            negotiators=plotting_negotiators,
            save_fig=save_fig,
            path=path,
            fig_name=fig_name,
            ignore_none_offers=ignore_none_offers,
            with_lines=with_lines,
            show_agreement=show_agreement,
            show_pareto_distance=show_pareto_distance,
            show_nash_distance=show_nash_distance,
            show_kalai_distance=show_kalai_distance,
            show_max_welfare_distance=show_max_welfare_distance,
            show_max_relative_welfare_distance=show_max_relative_welfare_distance,
            show_end_reason=show_end_reason,
            show_annotations=show_annotations,
            show_reserved=show_reserved,
            colors=colors,
            markers=markers,
            colormap=colormap,
            ylimits=ylimits,
            common_legend=common_legend,
            extra_annotation="",
            xdim=xdim,
            colorizer=colorizer,
            show_total_time=show_total_time,
            show_relative_time=show_relative_time,
            show_n_steps=show_n_steps,
            only2d=only2d,
            fast=fast,
            simple_offers_view=simple_offers_view,
            **kwargs,
        )

    def add(  #
        self,
        negotiator: GBNegotiator,
        *,
        preferences: Preferences | None = None,
        role: str | None = None,
        ufun: BaseUtilityFunction | None = None,
    ) -> bool | None:
        added = super().add(negotiator, preferences=preferences, role=role, ufun=ufun)
        if added:
            for thread in self._threads:
                thread.responders.append(negotiator)

            thread = GBThread(
                self,  #
                negotiator,
                responders=(
                    responders := [
                        _ for _ in self.negotiators if id(_) != id(negotiator)
                    ]
                ),
                indx=len(self._threads),
                evaluator=None,
                constraint=None,
                state=ThreadState(
                    new_offer=None,
                    new_responses={_.id: ResponseType.REJECT_OFFER for _ in responders},
                    accepted_offers=[],
                ),
            )
            self._threads.append(thread)
            self._current_state.threads[negotiator.id] = thread.state

        return added


[docs] class GBMechanism(BaseGBMechanism): def __init__( self, *args, evaluator_type: type[EvaluationStrategy] | None = None, evaluator_params: dict[str, Any] | None = None, local_evaluator_type: type[LocalEvaluationStrategy] | None = None, local_evaluator_params: dict[str, Any] | None = None, constraint_type: type[OfferingConstraint] | None = None, constraint_params: dict[str, Any] | None = None, local_constraint_type: type[LocalOfferingConstraint] | None = None, local_constraint_params: dict[str, Any] | None = None, response_combiner: Callable[[list[GBResponse]], GBResponse] = all_accept, dynamic_entry=False, extra_callbacks=False, check_offers=False, enforce_issue_types=False, cast_offers=False, end_on_no_response=True, ignore_negotiator_exceptions=False, parallel: bool = True, sync_calls: bool = False, initial_state: GBState | None = None, **kwargs, ): if not evaluator_type and not local_evaluator_type: raise ValueError( "You must pass either `evaluator_type` or `local_evaluator_type` to GBMechanism" ) super().__init__( *args, dynamic_entry=dynamic_entry, extra_callbacks=extra_callbacks, initial_state=GBState() if not initial_state else initial_state, **kwargs, ) self._current_state: GBState if not constraint_params: constraint_params = dict() self._global_constraint: OfferingConstraint | None = ( instantiate(constraint_type, **constraint_params) if constraint_type else None ) self._local_constraint_type: type[ LocalOfferingConstraint ] | None = local_constraint_type self._local_constraint_params = ( local_constraint_params if local_constraint_params else dict() ) if not evaluator_params: evaluator_params = dict() self._global_evaluator = ( instantiate(evaluator_type, **evaluator_params) if evaluator_type else None ) self._local_evaluator_type = local_evaluator_type self._local_evaluator_params = ( local_evaluator_params if local_evaluator_params else dict() ) self._combiner = response_combiner self._parallel = parallel self._threads: list[GBThread] = [] self._ignore_negotiator_exceptions = ignore_negotiator_exceptions self._dynamic_entry = dynamic_entry self._extra_callbacks = extra_callbacks self._check_offers = check_offers self._enforce_issue_types = enforce_issue_types self._cast_offers = cast_offers self._end_on_no_response = enforce_issue_types self._ignore_negotiator_exceptions = enforce_issue_types self._parallel = parallel self._sync_calls = sync_calls self.params["check_offers"] = check_offers self.params["enforce_issue_types"] = enforce_issue_types self.params["ignore_negotiator_exceptions"] = ignore_negotiator_exceptions self.params["cast_offers"] = cast_offers self.params["end_on_no_response"] = end_on_no_response self.params["enable_callbacks"] = extra_callbacks self.params["parallel"] = parallel self.params["sync_calls"] = sync_calls self.params["evaluator_type"] = get_full_type_name(evaluator_type) # self.params["evaluator_params"] = evaluator_params self.params["constraint_type"] = get_full_type_name(constraint_type) # self.params["constraint_params"] = constraint_params self.params["local_evaluator_type"] = get_full_type_name( local_evaluator_type ) # self.params["local_evaluator_params"] = local_evaluator_params self.params["local_constraint_type"] = get_full_type_name( local_constraint_type ) # self.params["local_constraint_params"] = local_constraint_params
[docs] def add( self, negotiator: GBNegotiator, *, preferences: Preferences | None = None, role: str | None = None, ufun: BaseUtilityFunction | None = None, ) -> bool | None: added = super().add(negotiator, preferences=preferences, role=role, ufun=ufun) if added: for thread in self._threads: thread.responders.append(negotiator) evaluator, constraint = None, None if self._local_evaluator_type: evaluator = instantiate( self._local_evaluator_type, **self._local_evaluator_params ) if self._local_constraint_type: constraint = instantiate( self._local_constraint_type, **self._local_constraint_params ) thread = GBThread( self, negotiator, responders=( responders := [ _ for _ in self.negotiators if id(_) != id(negotiator) ] ), indx=len(self._threads), evaluator=evaluator, constraint=constraint, state=ThreadState( new_offer=None, new_responses={_.id: ResponseType.REJECT_OFFER for _ in responders}, accepted_offers=[], ), ) self._threads.append(thread) self._current_state.threads[negotiator.id] = thread.state return added
[docs] def set_sync_call(self, v: bool): self._sync_call = v
[docs] def __call__( # self, state: GBState, action: dict[str, GBAction] | None = None ) -> MechanismStepResult: # print(f"Round {self._current_state.step}") results = self.run_threads(action) # if state.step > self.outcome_space.cardinality: # self.plot() # breakpoint() if self.verbosity > 2: print( f"{self.name}: Central Processing after {humanize_time(perf_counter() - self._start_time, show_ms=True) if self._start_time else 0}", flush=True, ) responses = [] for source, (tstate, response) in results.items(): state.threads[source] = tstate if self._local_evaluator_type: responses.append(response) if self.verbosity > 2: print( f"{self.name}: Global Evaluator after {humanize_time(perf_counter() - self._start_time, show_ms=True) if self._start_time else 0}", flush=True, ) if self._global_evaluator: responses.append( self._global_evaluator(self.negotiator_ids, state, self._history) ) if self.verbosity > 2: print( f"{self.name}: Global Constraint after {humanize_time(perf_counter() - self._start_time, show_ms=True) if self._start_time else 0}", flush=True, ) if self._global_constraint: responses.append(self._global_constraint(state, self._history)) # type: ignore e = self._combiner(responses) if e == "continue": return MechanismStepResult(state) if e is None: state.broken = True state.agreement = e return MechanismStepResult(state)
@property def full_trace(self) -> list[TraceElement]: def response(state: GBState): if state.timedout: return "timedout" if state.ended: return "ended" if state.has_error: return "error" if state.agreement: return "agreement" return "continuing" offers = [] self._history: list[GBState] # for state in self._history: state: GBState offers += [ TraceElement( state.time, state.relative_time, state.step, source, t.new_offer, {neg: int(resp) for neg, resp in t.new_responses.items()}, response(state), ) for source, t in state.threads.items() ] return offers @property def extended_trace(self) -> list[tuple[int, str, Outcome]]: """Returns the negotiation history as a list of step/negotiator/offer tuples.""" offers = [] self._history: list[GBState] # for state in self._history: offers += [ (state.step, source, t.new_offer) for source, t in state.threads.items() ] return offers @property def trace(self) -> list[tuple[str, Outcome]]: """Returns the negotiation history as a list of negotiator/offer tuples.""" offers = [] for state in self._history: offers += [(source, t.new_offer) for source, t in state.threads.items()] return offers
[docs] def negotiator_offers(self, negotiator_id: str) -> list[Outcome]: """Returns the offers given by a negotiator (in order)""" return [o for n, o in self.trace if n == negotiator_id]
[docs] def negotiator_full_trace( self, negotiator_id: str ) -> list[tuple[float, float, int, Outcome, str]]: """Returns the (time/relative-time/step/outcome/response) given by a negotiator (in order)""" return [ (t, rt, s, o, a) for t, rt, s, n, o, _, a in self.full_trace if n == negotiator_id ]
@property def offers(self) -> list[Outcome]: """Returns the negotiation history as a list of offers.""" return [o for _, o in self.trace] @property def _step(self): """A private property used by the checkpoint system.""" return self._current_state.step
[docs] def plot( self, plotting_negotiators: tuple[int, int] | tuple[str, str] = (0, 1), save_fig: bool = False, path: str | None = None, fig_name: str | None = None, ignore_none_offers: bool = True, with_lines: bool = True, show_agreement: bool = False, show_pareto_distance: bool = True, show_nash_distance: bool = True, show_kalai_distance: bool = True, show_max_welfare_distance: bool = True, show_max_relative_welfare_distance: bool = False, show_end_reason: bool = True, show_annotations: bool = False, show_reserved: bool = True, show_total_time=True, show_relative_time=True, show_n_steps=True, colors: list | None = None, markers: list[str] | None = None, colormap: str = DEFAULT_COLORMAP, ylimits: tuple[float, float] | None = None, common_legend: bool = True, xdim: str = "step", colorizer: Colorizer | None = default_colorizer, only2d: bool = False, fast=False, simple_offers_view=False, **kwargs, ): from negmas.plots.util import plot_mechanism_run return plot_mechanism_run( mechanism=self, # negotiators=plotting_negotiators, save_fig=save_fig, path=path, fig_name=fig_name, ignore_none_offers=ignore_none_offers, with_lines=with_lines, show_agreement=show_agreement, show_pareto_distance=show_pareto_distance, show_nash_distance=show_nash_distance, show_kalai_distance=show_kalai_distance, show_max_welfare_distance=show_max_welfare_distance, show_max_relative_welfare_distance=show_max_relative_welfare_distance, show_end_reason=show_end_reason, show_annotations=show_annotations, show_reserved=show_reserved, colors=colors, markers=markers, colormap=colormap, ylimits=ylimits, common_legend=common_legend, extra_annotation="", xdim=xdim, colorizer=colorizer, show_total_time=show_total_time, show_relative_time=show_relative_time, show_n_steps=show_n_steps, only2d=only2d, fast=fast, simple_offers_view=simple_offers_view, **kwargs, )
[docs] class ParallelGBMechanism(GBMechanism): def __init__(self, *args, **kwargs): kwargs["parallel"] = True super().__init__(*args, **kwargs)
[docs] class SerialGBMechanism(GBMechanism): def __init__(self, *args, **kwargs): kwargs["parallel"] = False super().__init__(*args, **kwargs)