from __future__ import annotations
import copy
import operator
import pprint
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Iterable, Sequence
from negmas.preferences.preferences import Preferences
from ..common import NegotiatorMechanismInterface, Value
from ..helpers.prob import ScipyDistribution
from ..outcomes import Outcome
from .common import _loc, _upper
# Public names of this module: the ones exported by a star-import.
__all__ = [
"Constraint",
"MarginalNeutralConstraint",
"RankConstraint",
"ComparisonConstraint",
"RangeConstraint",
"Answer",
"Query",
"QResponse",
"next_query",
"possible_queries",
"CostEvaluator",
]
[docs]
class Constraint(ABC):
    """Some constraint on allowable utility values for given outcomes.

    Args:
        full_range: Either a single ``(low, high)`` pair, or a sequence with
            one ``(low, high)`` pair per outcome.
        outcomes: The outcomes the constraint refers to (optional).

    Attributes:
        outcomes: The outcomes exactly as passed in.
        index: Maps every outcome to its position in ``outcomes``; ``None``
            when no outcomes were given.
        full_range: When outcomes were given, a sequence with one
            ``(low, high)`` pair per outcome (a single pair is repeated for
            every outcome). Otherwise the value that was passed in.
    """

    def __init__(
        self,
        full_range: Sequence[tuple[float, float]] | tuple[float, float] = (0.0, 1.0),
        outcomes: list[Outcome] | None = None,
    ):
        super().__init__()
        self.outcomes = outcomes
        self.index = None
        if outcomes is not None:
            self.index = {outcome: i for i, outcome in enumerate(outcomes)}
            # Subclasses index ``full_range`` by outcome position, so a single
            # pair must be expanded; a per-outcome sequence is kept untouched.
            if self._is_single_range(full_range):
                full_range = [full_range] * len(outcomes)
        self.full_range = full_range

    @staticmethod
    def _is_single_range(rng) -> bool:
        """Tell a bare ``(low, high)`` pair from a sequence of such pairs."""
        return len(rng) == 2 and not isinstance(rng[0], (tuple, list))

    @abstractmethod
    def is_satisfied(
        self, preferences: Preferences, outcomes: Iterable[Outcome] | None = None
    ) -> bool:
        """
        Whether or not the constraint is satisfied.
        """

    @abstractmethod
    def marginals(
        self, outcomes: Iterable[Outcome] | None = None
    ) -> list[ScipyDistribution]:
        """Return one marginal utility distribution per outcome."""
        ...

    @abstractmethod
    def marginal(self, outcome: Outcome) -> ScipyDistribution:
        """Return the marginal utility distribution of a single outcome."""
        ...

    def __repr__(self):
        return self.__dict__.__repr__()

    def __str__(self):
        return pprint.pformat(self.__dict__)
[docs]
class MarginalNeutralConstraint(Constraint):
    """Constraints that do not affect the marginals of any outcomes.

    These constraints may only affect the joint distribution, so every
    marginal returned here is a uniform distribution over the full range.
    """

    def _bounds_at(self, position: int):
        """Return the ``(low, high)`` pair used for the outcome at ``position``."""
        full_range = self.full_range
        # ``full_range`` may be one bare (low, high) pair shared by all
        # outcomes instead of a per-outcome sequence; do not index into it then.
        if len(full_range) == 2 and not isinstance(full_range[0], (tuple, list)):
            return full_range
        return full_range[position]

    @staticmethod
    def _uniform_over(bounds) -> ScipyDistribution:
        """Build a uniform distribution spanning ``bounds[0]`` to ``bounds[1]``."""
        low, high = bounds[0], bounds[1]
        return ScipyDistribution(type="uniform", loc=low, scale=high - low)

    def marginals(
        self, outcomes: Iterable[Outcome] | None = None
    ) -> list[ScipyDistribution]:
        """Return one uniform marginal per outcome.

        Args:
            outcomes: Outcomes to report on; defaults to the outcomes given at
                construction. Ranges are matched to them by position.

        Raises:
            ValueError: If no outcomes were given here or at construction.
        """
        if outcomes is None:
            outcomes = self.outcomes
        if outcomes is None:
            raise ValueError("No outcomes are given in construction or to the call")
        # this works only for real-valued outcomes.
        return [
            self._uniform_over(self._bounds_at(position))
            for position, _ in enumerate(outcomes)
        ]

    def marginal(self, outcome: Outcome) -> ScipyDistribution:
        """Return the uniform marginal of ``outcome``.

        When outcomes were given at construction, ``outcome`` must be one of
        them (it is looked up in ``self.index``).
        """
        # this works only for real-valued outcomes.
        if self.outcomes is None:
            return self._uniform_over(self.full_range)
        return self._uniform_over(self._bounds_at(self.index[outcome]))
[docs]
class RankConstraint(MarginalNeutralConstraint):
    """Constraints the utilities of given outcomes to be in ascending order

    Args:
        rankings: Positions of the outcomes, listed in ascending order of
            utility (NOTE(review): interpretation inferred from how
            ``is_satisfied`` sorts the outcomes; confirm against callers).
        full_range: Forwarded to the base class.
        outcomes: Forwarded to the base class.
    """

    def __init__(
        self,
        rankings: list[int],
        full_range: Sequence[tuple[float, float]] | tuple[float, float] = (0.0, 1.0),
        outcomes: list[Outcome] | None = None,
    ):
        super().__init__(full_range=full_range, outcomes=outcomes)
        self.rankings = rankings

    def is_satisfied(
        self, preferences: Preferences, outcomes: Iterable[Outcome] | None = None
    ) -> bool:
        """Check that sorting the outcomes by utility yields ``self.rankings``.

        Args:
            preferences: Callable mapping an outcome to its utility.
            outcomes: Outcomes to check; defaults to the construction outcomes.

        Raises:
            ValueError: If no outcomes were given here or at construction.
        """
        if outcomes is None:
            outcomes = self.outcomes
        if outcomes is None:
            raise ValueError("No outcomes are given in construction or to the call")
        utilities = [preferences(outcome) for outcome in outcomes]
        # Compare outcome positions only. The utilities themselves must not be
        # part of the comparison, or the result could never equal ``rankings``.
        # ``sorted`` is stable, so ties keep their original relative order.
        order = sorted(range(len(utilities)), key=utilities.__getitem__)
        return order == list(self.rankings)
[docs]
class ComparisonConstraint(MarginalNeutralConstraint):
    """Constraints the utility of given two outcomes (must be exactly two) to satisfy the given operation (e.g. >, <)

    Args:
        op: Either a callable taking the two utilities and returning a bool,
            or one of the textual aliases listed in ``_OPERATORS``.
        full_range: Forwarded to the base class.
        outcomes: The two outcomes being compared (optional).

    Raises:
        ValueError: If ``outcomes`` is given with a length other than two, or
            if ``op`` is an unknown textual alias.
    """

    # Textual aliases accepted for ``op``.
    _OPERATORS = {
        "less": operator.lt,
        "l": operator.lt,
        "<": operator.lt,
        "greater": operator.gt,
        "g": operator.gt,
        ">": operator.gt,
        "equal": operator.eq,
        "=": operator.eq,
        "==": operator.eq,
        "le": operator.le,
        "<=": operator.le,
        "ge": operator.ge,
        ">=": operator.ge,
    }

    def __init__(
        self,
        op: str | Callable[[Value, Value], bool],
        full_range: Sequence[tuple[float, float]] | tuple[float, float] = (0.0, 1.0),
        outcomes: list[Outcome] | None = None,
    ):
        super().__init__(full_range=full_range, outcomes=outcomes)
        if outcomes is not None and len(outcomes) != 2:
            raise ValueError(
                f"{len(outcomes)} outcomes were given to {self.__class__.__name__}"
            )
        # Keep what the caller passed for display purposes.
        self.op_name = op
        if isinstance(op, str):
            try:
                op = self._OPERATORS[op]
            except KeyError:
                raise ValueError(f"Unknown operation {op}") from None
        self.op = op

    def is_satisfied(
        self, preferences: Preferences, outcomes: Iterable[Outcome] | None = None
    ) -> bool:
        """Apply the operation to the utilities of the two outcomes.

        Args:
            preferences: Callable mapping an outcome to its utility.
            outcomes: The two outcomes to compare; defaults to the
                construction outcomes.

        Raises:
            ValueError: If no outcomes are available or there are not
                exactly two of them.
        """
        if outcomes is None:
            outcomes = self.outcomes
        if outcomes is None:
            raise ValueError("No outcomes are given in construction or to the call")
        # Materialize so that any iterable (not only sized ones) is accepted.
        outcomes = list(outcomes)
        if len(outcomes) != 2:
            raise ValueError(
                f"{len(outcomes)} outcomes were given to {self.__class__.__name__}"
            )
        # Compare the utilities themselves; pairing them with their positions
        # would make equality always fail and let positions break ties.
        first, second = (preferences(outcome) for outcome in outcomes)
        return self.op(first, second)

    def __str__(self):
        if self.outcomes is None:
            # Nothing to show on either side of the operation.
            return f"{self.__class__.__name__}({self.op_name})"
        return f"{self.outcomes[0]} {self.op_name} {self.outcomes[1]}"

    __repr__ = __str__
[docs]
class RangeConstraint(Constraint):
    """Constraints the utility of each of the given outcomes to lie within the given range

    Args:
        rng: Either a single ``(low, high)`` pair applied to every outcome, or
            a sequence with one pair per outcome. ``None`` for a bound means
            that side is unbounded (the full range is used for marginals).
        full_range: Forwarded to the base class.
        outcomes: The outcomes the constraint refers to (optional).
        eps: Tolerance used when comparing utilities against the bounds.

    Attributes:
        range: ``rng`` exactly as passed in.
        effective_range: ``rng`` with ``None`` bounds replaced by the
            corresponding bound of the full range. A single pair when no
            outcomes were given, otherwise a list with one pair per outcome.
    """

    def __init__(
        self,
        rng: tuple = (None, None),
        full_range: Sequence[tuple[float, float]] | tuple[float, float] = (0.0, 1.0),
        outcomes: list[Outcome] | None = None,
        eps=1e-5,
    ):
        super().__init__(full_range=full_range, outcomes=outcomes)
        self.range = rng
        self.eps = eps
        if outcomes is None:
            self.effective_range = (
                rng[0] if rng[0] is not None else self.full_range[0],
                rng[1] if rng[1] is not None else self.full_range[1],
            )
            return
        # Pair every outcome with its own range. A single pair must be
        # repeated first; zipping it directly would iterate over its bounds.
        count = len(outcomes)
        ranges = self._per_outcome(rng, count)
        full_ranges = self._per_outcome(self.full_range, count)
        self.effective_range = [
            (r[0] if r[0] is not None else f[0], r[1] if r[1] is not None else f[1])
            for r, f in zip(ranges, full_ranges)
        ]

    @staticmethod
    def _is_single(rng) -> bool:
        """Tell a bare ``(low, high)`` pair from a sequence of such pairs."""
        return len(rng) == 2 and not isinstance(rng[0], (tuple, list))

    @classmethod
    def _per_outcome(cls, rng, count: int):
        """Return ``rng`` as a per-outcome sequence of ``count`` pairs."""
        return [rng] * count if cls._is_single(rng) else rng

    def _bounds_for(self, outcome, position: int):
        """Return the ``(low, high)`` pair that applies to ``outcome``."""
        if self._is_single(self.range):
            return self.range
        # Per-outcome ranges: prefer the construction position of the outcome
        # and fall back to its position in the sequence being checked.
        if self.index is not None and outcome in self.index:
            return self.range[self.index[outcome]]
        return self.range[position]

    @staticmethod
    def _uniform_over(bounds) -> ScipyDistribution:
        """Build a uniform distribution spanning ``bounds[0]`` to ``bounds[1]``."""
        low, high = bounds[0], bounds[1]
        return ScipyDistribution(type="uniform", loc=low, scale=high - low)

    def is_satisfied(
        self, preferences: Preferences, outcomes: Iterable[Outcome] | None = None
    ) -> bool:
        """Check that every outcome's utility lies within its range (± ``eps``).

        Args:
            preferences: Callable mapping an outcome to its utility.
            outcomes: Outcomes to check; defaults to the construction outcomes.

        Raises:
            ValueError: If no outcomes were given here or at construction.
        """
        if outcomes is None:
            outcomes = self.outcomes
        if outcomes is None:
            raise ValueError("No outcomes are given in construction or to the call")
        outcomes = list(outcomes)
        utilities = [preferences(outcome) for outcome in outcomes]
        for position, (outcome, utility) in enumerate(zip(outcomes, utilities)):
            low, high = self._bounds_for(outcome, position)
            if low is not None and utility < low - self.eps:
                return False
            if high is not None and utility > high + self.eps:
                return False
        return True

    def marginals(
        self, outcomes: Iterable[Outcome] | None = None
    ) -> list[ScipyDistribution]:
        """Return one uniform marginal (over the effective range) per outcome.

        Args:
            outcomes: Outcomes to report on; defaults to the construction
                outcomes. Ranges are matched to them by position.

        Raises:
            ValueError: If no outcomes were given here or at construction.
        """
        if outcomes is None:
            outcomes = self.outcomes
        if outcomes is None:
            raise ValueError("No outcomes are given in construction or to the call")
        # this works only for real-valued outcomes.
        if self.outcomes is None:
            # Built without outcomes: ``effective_range`` is one shared pair.
            return [self._uniform_over(self.effective_range) for _ in outcomes]
        return [
            self._uniform_over(self.effective_range[position])
            for position, _ in enumerate(outcomes)
        ]

    def marginal(self, outcome: Outcome) -> ScipyDistribution:
        """Return the uniform marginal (over the effective range) of ``outcome``."""
        # this works only for real-valued outcomes.
        if self.outcomes is None:
            return self._uniform_over(self.effective_range)
        return self._uniform_over(self.effective_range[self.index[outcome]])

    def __str__(self):
        result = f"{self.range}"
        if self.outcomes is not None and len(self.outcomes) > 0:
            result += f"{self.outcomes}"
        return result

    __repr__ = __str__
[docs]
@dataclass
class Answer:
    """One possible answer to a query: a constraint over some outcomes."""

    outcomes: list[Outcome]
    constraint: Constraint
    cost: float = 0.0
    name: str = ""

    def __str__(self):
        # A named answer is rendered as its name directly followed by the constraint.
        if len(self.name) > 0:
            return "".join((self.name, f"{self.constraint}"))
        pieces = [f"{self.constraint}"]
        # Negligible costs and empty outcome lists are left out of the text.
        if self.cost > 1e-7:
            pieces.append(f"(cost:{self.cost})")
        if len(self.outcomes) > 0:
            pieces.append(f"(outcomes:{self.outcomes})")
        return "".join(pieces)

    __repr__ = __str__
[docs]
@dataclass
class Query:
    """A question with its candidate answers and one probability per answer."""

    answers: list[Answer]
    probs: list[float]
    cost: float = 0.0
    name: str = ""

    def __str__(self):
        # A named query is rendered as its name only.
        if len(self.name) > 0:
            return self.name
        text = f"answers: {self.answers}"
        # Negligible costs are not shown.
        if self.cost < 1e-7:
            return text
        return f"{text} (cost:{self.cost})"

    __repr__ = __str__
[docs]
@dataclass
class QResponse:
    """Record of a response to a query."""

    # The answer that was given, or None.
    answer: Answer | None
    # NOTE(review): presumably the position of the answer among the query's
    # answers; confirm against callers.
    indx: int
    # Cost associated with this response.
    cost: float
[docs]
def possible_queries(
    nmi: NegotiatorMechanismInterface,
    strategy: EStrategy,
    user: User,
    outcome: Outcome = None,
) -> list[tuple[Outcome, list[ScipyDistribution], float]]:
    """Gets all queries that could be asked for that outcome until an exact value of ufun is found.

    The strategy is applied repeatedly, on deep copies of ``strategy`` and
    ``user``, until it reports a ``float`` for the outcome. When ``outcome``
    is ``None`` this is done for every outcome in ``nmi.outcomes``.

    For each ask, the following tuple is returned:

    (outcome, query, cost)
    """
    # Simulate on private copies so the caller's objects are not modified.
    user = copy.deepcopy(user)
    strategy = copy.deepcopy(strategy)

    def _simulate(target):
        asked_so_far = user.elicited_queries()
        estimate = strategy.utility_estimate(target)
        # Evaluated exactly as before; the resulting bounds are not used below.
        _unused_low, _unused_high = _loc(estimate), _upper(estimate)
        n_asked = len(asked_so_far)

        # Keep applying the strategy until it yields an exact (float) utility.
        value = None
        while not isinstance(value, float):
            value, _ = strategy.apply(user=user, outcome=target)

        # Only the queries recorded during this simulation are of interest.
        fresh = user.elicited_queries()[n_asked:]
        running_cost = 0.0
        asks = []
        for record in fresh:
            asks.append((target, record.query, record.cost + running_cost - user.cost))
            running_cost += record.cost
        return asks

    if outcome is not None:
        return _simulate(outcome)
    collected = []
    for candidate in nmi.outcomes:
        collected += _simulate(candidate)
    return collected
[docs]
def next_query(
    strategy: EStrategy, user: User, outcome: Outcome = None
) -> list[tuple[Outcome, Query, float]]:
    """Gets the possible outcomes for the next ask with its cost.

    When ``outcome`` is ``None`` one entry is produced for every outcome in
    ``strategy.outcomes``; otherwise a single entry for the given outcome.

    The following tuple is returned:

    (outcome, query, cost)
    """
    # Outcomes to report on: the one requested, or all those of the strategy.
    targets = strategy.outcomes if outcome is None else [outcome]
    return [
        (target, strategy.next_query(target), user.cost_of_asking())
        for target in targets
    ]
[docs]
class CostEvaluator:
    """Callable giving the total cost of a query/answer pair plus a fixed cost."""

    def __init__(self, cost: float):
        self.cost = cost

    def __call__(self, query: Query, answer: Answer):
        # A falsy answer cost (e.g. None or 0) contributes nothing.
        answer_cost = answer.cost or 0.0
        return self.cost + query.cost + answer_cost