from __future__ import annotations
import itertools
import math
from functools import reduce
from math import sqrt
from typing import TYPE_CHECKING, Any, Iterable, Literal, Sequence, TypeVar, overload
import numpy as np
from attrs import define, field
from numpy.typing import NDArray
from scipy import spatial
from scipy.stats import rankdata
from negmas import warnings
from negmas.helpers.numba_checks import jit # type: ignore
from negmas.outcomes import Issue, Outcome, discretize_and_enumerate_issues
from negmas.outcomes.common import os_or_none
from negmas.outcomes.issue_ops import enumerate_issues
from negmas.outcomes.protocols import OutcomeSpace
from negmas.preferences.crisp.mapping import MappingUtilityFunction
from negmas.warnings import NegmasUnexpectedValueWarning, warn_if_slow
if TYPE_CHECKING:
from negmas.preferences.prob_ufun import ProbUtilityFunction
from .base_ufun import BaseUtilityFunction
from .crisp_ufun import UtilityFunction
from .discounted import DiscountedUtilityFunction
UFunType = TypeVar("UFunType", UtilityFunction, ProbUtilityFunction)
# Names exported by ``from <this module> import *``. Some helpers defined in
# this file (e.g. ``pareto_frontier_numpy``) are intentionally or accidentally
# not listed here and must be imported by name.
__all__ = [
    "pareto_frontier",
    "pareto_frontier_of",
    "pareto_frontier_bf",
    "pareto_frontier_active",
    "nash_points",
    "kalai_points",
    "ks_points",
    "max_welfare_points",
    "max_relative_welfare_points",
    "make_discounted_ufun",
    "scale_max",
    "normalize",
    "sample_outcome_with_utility",
    "extreme_outcomes",
    "minmax",
    "conflict_level",
    "opposition_level",
    "winwin_level",
    "get_ranks",
    "distance_to",
    "distance_between",
    "calc_outcome_distances",
    "calc_outcome_optimality",
    "calc_scenario_stats",
    "ScenarioStats",
    "OutcomeDistances",
    "OutcomeOptimality",
    "sort_by_utility",
    "calc_reserved_value",
    "dominating_points",
]
# Typing-only overload: with ``return_sorted_outcomes=True`` (the default) the
# result is a ``(utilities, outcomes)`` pair.
@overload
def sort_by_utility(
    ufun: BaseUtilityFunction,
    outcomes: Iterable[Outcome] | None = None,
    *,
    max_cardinality: int | float = float("inf"),
    best_first: bool = True,
    rational_only: bool = False,
    return_sorted_outcomes: Literal[True] = True,
) -> tuple[NDArray[np.floating[Any]], list[Outcome]]:
    ...


# Typing-only overload: with ``return_sorted_outcomes=False`` only the array of
# sorted utilities is returned.
@overload
def sort_by_utility(
    ufun: BaseUtilityFunction,
    outcomes: Iterable[Outcome] | None = None,
    *,
    max_cardinality: int | float = float("inf"),
    best_first: bool = True,
    rational_only: bool = False,
    return_sorted_outcomes: Literal[False],
) -> NDArray[np.floating[Any]]:
    ...
[docs]
def sort_by_utility(
    ufun: BaseUtilityFunction,
    outcomes: Iterable[Outcome] | None = None,
    *,
    max_cardinality: int | float = float("inf"),
    best_first: bool = True,
    rational_only: bool = False,
    return_sorted_outcomes: bool = True,
) -> tuple[NDArray[np.floating[Any]], list[Outcome]] | NDArray[np.floating[Any]]:
    """
    Evaluates `ufun` on a set of outcomes and returns the utilities in sorted order.

    Args:
        ufun: The utility function to evaluate (called once per outcome).
        outcomes: The outcomes to rank. If `None`, they are taken from
            `ufun.outcome_space.enumerate_or_sample(max_cardinality=...)`.
        max_cardinality: Only used when `outcomes` is `None`.
        best_first: Sort from highest to lowest utility if `True`, lowest to
            highest otherwise.
        rational_only: Keep only outcomes with utility not less than
            `ufun.reserved_value`. Ignored when the reserved value is `None`.
        return_sorted_outcomes: Also return the outcomes in sorted order.

    Returns:
        The sorted utilities as a numpy array, and -- when
        `return_sorted_outcomes` is `True` -- the list of outcomes in the same
        order (as the second element of a tuple).

    Raises:
        ValueError: If `outcomes` is `None` and the ufun has no outcome space.
    """
    if outcomes is None:
        if ufun.outcome_space is None:
            raise ValueError(
                "Cannot find outcomes of the given ufun. Pass them explicitly"
            )
        outcomes = ufun.outcome_space.enumerate_or_sample(
            max_cardinality=max_cardinality
        )
    candidates = list(outcomes)
    # argsort is ascending, so flip the sign to get a descending order
    sign = -1.0 if best_first else 1.0
    reserve = ufun.reserved_value
    if rational_only and reserve is not None:
        kept, values = [], []
        for outcome in candidates:
            value = ufun(outcome)
            if value >= reserve:
                kept.append(outcome)
                values.append(value)
        candidates = kept
    else:
        values = [float(ufun(outcome)) for outcome in candidates]
    signed = sign * np.asarray(values, dtype=float)
    order = np.argsort(signed)
    # undo the sign flip before reporting the utilities
    sorted_utils = (sign * signed)[order]
    if return_sorted_outcomes:
        return sorted_utils, [candidates[i] for i in order]
    return sorted_utils
[docs]
@define
class ScenarioStats:
    """
    Statistics of a negotiation scenario: the Pareto frontier and the special
    points (Nash, Kalai, Kalai-Smorodinsky, max-welfare, ...) found on it.

    Remarks:
        - Every ``*_utils`` field holds tuples of utility values (one value per
          ufun) and the matching ``*_outcomes`` field holds the corresponding
          outcomes in the same order.
        - The ``modified_*`` variants are computed with
          ``subtract_reserved_value=False`` (see `restrict`).
    """

    opposition: float
    # one (min, max) pair per ufun
    utility_ranges: list[tuple[float, float]]
    pareto_utils: tuple[tuple[float, ...], ...]
    pareto_outcomes: list[Outcome]
    nash_utils: list[tuple[float, ...]]
    nash_outcomes: list[Outcome]
    kalai_utils: list[tuple[float, ...]]
    kalai_outcomes: list[Outcome]
    modified_kalai_utils: list[tuple[float, ...]]
    modified_kalai_outcomes: list[Outcome]
    max_welfare_utils: list[tuple[float, ...]]
    max_welfare_outcomes: list[Outcome]
    max_relative_welfare_utils: list[tuple[float, ...]]
    max_relative_welfare_outcomes: list[Outcome]
    ks_utils: list[tuple[float, ...]] = field(factory=list)
    ks_outcomes: list[Outcome] = field(factory=list)
    modified_ks_utils: list[tuple[float, ...]] = field(factory=list)
    modified_ks_outcomes: list[Outcome] = field(factory=list)

    # TODO: Add more stats here. See the negobench overleaf for examples.
    # Add advantage_range_ratio (ratio of rational ufun ranges), utility_range_ratio.
    # Add distances between major points like kalai, kalai-smorodinsky, nash and max-welfare
    # Add max and min advantage-range and utility-range
    # Add Spread = area of the convex hull of all points divided by area of the rectangle constructed by ranges
    # Add Optimality = fraction of outcomes on the Pareto-frontier

    @classmethod
    def from_ufuns(
        cls,
        ufuns: list[UtilityFunction] | tuple[UtilityFunction, ...],
        outcomes: Sequence[Outcome] | None = None,
        eps=1e-12,
    ) -> ScenarioStats:
        """
        Computes the statistics for the given ufuns by delegating to
        `calc_scenario_stats`.

        Args:
            ufuns: The utility functions of the scenario.
            outcomes: Outcomes to consider (forwarded as is).
            eps: Numerical tolerance (forwarded as is).
        """
        return calc_scenario_stats(ufuns, outcomes, eps)

    def restrict(
        self, ufuns: tuple[UtilityFunction, ...], reserved_values: tuple[float, ...]
    ) -> ScenarioStats:
        """
        Returns new statistics limited to the part of the Pareto frontier that
        is rational for the given reserved values.

        Args:
            ufuns: The utility functions, passed to the ``*_points`` functions
                that locate the special points on the restricted frontier.
            reserved_values: One reserved value per ufun. Frontier points with
                any utility below the corresponding value are dropped.

        Returns:
            A new `ScenarioStats`. `opposition` and `utility_ranges` are copied
            unchanged; everything else is recomputed on the restricted frontier.

        Remarks:
            - NOTE(review): `kalai_points` and `ks_points` read the reserved
              value from each ufun's own `reserved_value` attribute, not from
              `reserved_values`; presumably the caller keeps the two in sync --
              confirm.
        """
        ranges = self.utility_ranges
        pareto_indices = [
            i
            for i, utils in enumerate(self.pareto_utils)
            if all(utils[j] >= r for j, r in enumerate(reserved_values))
        ]
        warn_if_slow(
            len(pareto_indices), "Restricting a large Pareto Frontier", lambda x: x * 10
        )
        pareto_utils = tuple(self.pareto_utils[_] for _ in pareto_indices)
        pareto_outcomes = [self.pareto_outcomes[_] for _ in pareto_indices]

        def split(found):
            """Splits (utils, frontier index) pairs into utils and outcomes lists."""
            return (
                [pair[0] for pair in found],
                [pareto_outcomes[pair[1]] for pair in found],
            )

        nash_utils, nash_outcomes = split(
            nash_points(ufuns, ranges=ranges, frontier=pareto_utils)
        )
        kalai_utils, kalai_outcomes = split(
            kalai_points(
                ufuns,
                ranges=ranges,
                frontier=pareto_utils,
                subtract_reserved_value=True,
            )
        )
        modified_kalai_utils, modified_kalai_outcomes = split(
            kalai_points(
                ufuns,
                ranges=ranges,
                frontier=pareto_utils,
                subtract_reserved_value=False,
            )
        )
        ks_utils, ks_outcomes = split(
            ks_points(
                ufuns,
                ranges=ranges,
                frontier=pareto_utils,
                subtract_reserved_value=True,
            )
        )
        modified_ks_utils, modified_ks_outcomes = split(
            ks_points(
                ufuns,
                ranges=ranges,
                frontier=pareto_utils,
                subtract_reserved_value=False,
            )
        )
        welfare_utils, welfare_outcomes = split(
            max_welfare_points(ufuns, ranges=ranges, frontier=pareto_utils)
        )
        relative_welfare_utils, relative_welfare_outcomes = split(
            max_relative_welfare_points(ufuns, ranges=ranges, frontier=pareto_utils)
        )
        return ScenarioStats(
            opposition=self.opposition,
            utility_ranges=self.utility_ranges,
            pareto_utils=pareto_utils,
            pareto_outcomes=pareto_outcomes,
            nash_utils=nash_utils,
            nash_outcomes=nash_outcomes,
            kalai_utils=kalai_utils,
            kalai_outcomes=kalai_outcomes,
            modified_kalai_utils=modified_kalai_utils,
            modified_kalai_outcomes=modified_kalai_outcomes,
            ks_utils=ks_utils,
            ks_outcomes=ks_outcomes,
            modified_ks_utils=modified_ks_utils,
            modified_ks_outcomes=modified_ks_outcomes,
            max_welfare_utils=welfare_utils,
            max_welfare_outcomes=welfare_outcomes,
            max_relative_welfare_utils=relative_welfare_utils,
            max_relative_welfare_outcomes=relative_welfare_outcomes,
        )
[docs]
@define
class OutcomeOptimality:
    """
    Record of optimality scores of an outcome, one per reference solution
    concept (Pareto, Nash, Kalai, max-welfare, Kalai-Smorodinsky).

    Remarks:
        - The exact meaning and scale of each score is not defined here;
          presumably they are filled by `calc_outcome_optimality` -- confirm
          against that function.
        - The two Kalai-Smorodinsky (``ks``) fields default to NaN when not given.
    """

    pareto_optimality: float
    nash_optimality: float
    kalai_optimality: float
    modified_kalai_optimality: float
    max_welfare_optimality: float
    ks_optimality: float = float("nan")
    modified_ks_optimality: float = float("nan")
    # max_relative_welfare_optimality: float
[docs]
@define
class OutcomeDistances:
    """
    Record of distances from an outcome to several reference solution concepts
    (Pareto, Nash, Kalai, max-welfare, Kalai-Smorodinsky).

    Remarks:
        - The distance measure is not defined here; presumably the values are
          filled by `calc_outcome_distances` -- confirm against that function.
        - `max_welfare` is the only field without the ``_dist`` suffix.
        - The two Kalai-Smorodinsky (``ks``) fields default to NaN when not given.
    """

    pareto_dist: float
    nash_dist: float
    kalai_dist: float
    modified_kalai_dist: float
    max_welfare: float
    ks_dist: float = float("nan")
    modified_ks_dist: float = float("nan")
    # max_relative_welfare: float
[docs]
def calc_reserved_value(
    ufun: UtilityFunction,
    fraction: float = float("inf"),
    nmin: int = 0,
    nmax: int = float("inf"),  # type: ignore
    max_cardinality: int | float = float("inf"),
    finite: bool = True,
    tight: bool = True,
) -> float:
    """
    Calculates a reserved value that keeps the given fraction of outcomes
    (saturated between nmin and nmax).

    Args:
        ufun: The utility function. Its outcome space must be set.
        fraction: Fraction of outcomes (best first) that should stay rational.
            Values above one (including the default infinity) keep all
            outcomes, negative values keep none.
        nmin: Minimum number of outcomes to keep.
        nmax: Maximum number of outcomes to keep.
        max_cardinality: Used to sample outcomes for continuous outcome spaces.
        finite: If given, the returned reserved value will be guaranteed to be
            finite as long as ufun always returns finite values.
        tight: If given, the reserved value is clipped to be as near as
            possible to the range of the ufun (within 1e-9).

    Returns:
        The reserved value.

    Raises:
        ValueError: If the ufun has no outcome space or no outcomes to rank.
    """
    os = ufun.outcome_space
    if os is None:
        raise ValueError(
            "Cannot calc reserved values if the outcome space is not given and the same in all ufuns"
        )
    utils, _ = sort_by_utility(ufun, max_cardinality=max_cardinality, best_first=True)
    noutcomes = len(utils)
    if noutcomes < 1:
        raise ValueError(
            "Cannot calc a reserved value for a ufun with no outcomes to rank"
        )
    warn_if_slow(
        noutcomes, "Calculating Reserved Value for the given fraction is too slow"
    )
    # fraction * noutcomes can be infinite (the default fraction is inf) and
    # math.ceil cannot convert infinities to int: clamp to [0, noutcomes]
    # first. The clamp never changes which of the three branches below is taken.
    wanted = min(float(noutcomes), max(0.0, fraction * noutcomes))
    n = min(noutcomes, nmax, max(nmin, math.ceil(wanted)))
    if n <= 0:
        # keep nothing: go just above the best utility
        r = utils[0] + 1e-9 if finite else float("inf")
    elif n < noutcomes:
        # midway between the last kept and the first dropped outcome
        n = int(n)
        r = 0.5 * (utils[n - 1] + utils[n])
    else:
        # keep everything: go just below the worst utility
        r = utils[-1] - 1e-9 if finite else float("-inf")
    if tight:
        mn, mx = ufun.minmax(above_reserve=False)
        r = max(mn - 1e-9, min(mx + 1e-9, r))
    return r
[docs]
def make_discounted_ufun(
    ufun: UFunType,
    cost_per_round: float | None = None,
    power_per_round: float | None = None,
    discount_per_round: float | None = None,
    cost_per_relative_time: float | None = None,
    power_per_relative_time: float | None = None,
    discount_per_relative_time: float | None = None,
    cost_per_real_time: float | None = None,
    power_per_real_time: float | None = None,
    discount_per_real_time: float | None = None,
    dynamic_reservation: bool = True,
) -> DiscountedUtilityFunction | UFunType:
    """
    Wraps `ufun` in one discounting layer per strictly positive cost/discount.

    Linear (cost) layers are applied first, then exponential (discount) layers;
    within each group the order is round ("step"), relative time, real time.
    A cost or discount that is `None` or not greater than zero adds no layer,
    so the input ufun is returned unchanged when nothing applies.

    Args:
        ufun: The utility function to discount.
        cost_per_round, cost_per_relative_time, cost_per_real_time: Costs of
            the linear layers.
        power_per_round, power_per_relative_time, power_per_real_time: The
            `power` passed to the corresponding linear layer.
        discount_per_round, discount_per_relative_time, discount_per_real_time:
            Discounts of the exponential layers.
        dynamic_reservation: Forwarded to every layer.

    Returns:
        The (possibly wrapped) utility function.
    """
    from negmas.preferences.discounted import ExpDiscountedUFun, LinDiscountedUFun

    # (time factor, cost, power) for every possible linear layer, in order
    linear_layers = (
        ("step", cost_per_round, power_per_round),
        ("relative_time", cost_per_relative_time, power_per_relative_time),
        ("real_time", cost_per_real_time, power_per_real_time),
    )
    # (time factor, discount) for every possible exponential layer, in order
    exponential_layers = (
        ("step", discount_per_round),
        ("relative_time", discount_per_relative_time),
        ("real_time", discount_per_real_time),
    )
    for factor, cost, power in linear_layers:
        if cost is not None and cost > 0.0:
            # each layer inherits name and outcome space from what it wraps
            ufun = LinDiscountedUFun(  # type: ignore (a discounted ufun returns the same value type as its base ufun)
                ufun=ufun,
                cost=cost,
                factor=factor,
                power=power,
                dynamic_reservation=dynamic_reservation,
                name=ufun.name,
                outcome_space=ufun.outcome_space,
            )
    for factor, discount in exponential_layers:
        if discount is not None and discount > 0.0:
            ufun = ExpDiscountedUFun(  # type: ignore (a discounted ufun returns the same value type as its base ufun)
                ufun=ufun,
                discount=discount,
                factor=factor,
                dynamic_reservation=dynamic_reservation,
                name=ufun.name,
                outcome_space=ufun.outcome_space,
            )
    return ufun
[docs]
def pareto_frontier_bf(
    points: np.ndarray | Iterable[Iterable[float]], eps=-1e-12, sort_by_welfare=True
) -> np.ndarray:
    """
    Finds the Pareto frontier of a set of points by brute force.

    Thin wrapper that converts the input to a float32 array and delegates to
    `_pareto_frontier_bf`.

    Args:
        points: One row of utility values per outcome.
        eps: Tolerance forwarded to `_pareto_frontier_bf`.
        sort_by_welfare: Forwarded to `_pareto_frontier_bf`.

    Returns:
        Indices of the Pareto-optimal points (an empty int64 array for empty
        input, consistent with `_pareto_frontier_bf`).

    Remarks:
        - NOTE(review): float32 has far less resolution than the default
          `eps` of 1e-12 -- confirm the reduced precision is intended.
    """
    points = np.asarray(points, dtype=np.float32)
    if len(points) < 1:
        # return indices (not the empty points array) so that the result has
        # the same meaning and dtype as in the non-empty case
        return np.empty(0, dtype=np.int64)
    warn_if_slow(
        len(points), "Pareto's Quadratic Operation is too Slow", lambda x: x * x
    )
    return _pareto_frontier_bf(points, eps, sort_by_welfare)
def pareto_frontier_chatgpt(
    points: np.ndarray | list[tuple[float, ...]],
    eps=-1e-12,
    sort_by_welfare=True,
    presort=True,
):
    """
    Selects candidate Pareto-optimal points using the facets of the convex hull
    of the negated points.

    Args:
        points: list of points each is a tuple of utility values for one outcome
        eps: A (usually negative) small number to treat as zero during calculations
        sort_by_welfare: If True, the results are sorted descendingly by total welfare
        presort: Apply the heuristic of pre-sorting all points by sum of utility

    Returns:
        Indices (into the given `points`) of the selected outcomes, each index
        reported once.

    Remarks:
        - NOTE(review): for every hull facet with offset <= `eps`, the points
          selected are those with ``normal . p + offset <= eps``. With a
          negative `eps` these lie strictly on the inner side of the facet
          rather than on it. Whether this yields exactly the Pareto set has
          not been verified; prefer `pareto_frontier_bf` when correctness matters.
        - `scipy.spatial.ConvexHull` raises for inputs with too few points.
    """
    points = np.asarray(points)
    n_points = points.shape[0]
    sorted_indices = None
    if presort:
        sort_mask = points.sum(1).argsort()[::-1]
        sorted_indices = np.arange(n_points, dtype=np.int32)[sort_mask]
        points = points[sort_mask]
    warn_if_slow(n_points, "Pareto's Operation is too Slow", lambda x: x * 10)
    # the hull is computed on the negated points; `points` itself stays
    # un-negated so that the welfare below has the right sign
    negated = -points
    positions = np.arange(n_points, dtype=np.int32)
    hull = spatial.ConvexHull(negated)
    selected = []
    for eq in hull.equations:
        if eq[-1] > eps:
            continue
        on_inner_side = np.dot(negated, eq[:-1]) + eq[-1] <= eps
        selected.extend(positions[on_inner_side].tolist())
    # a point can satisfy the test for several facets: report it only once
    # (first appearance wins)
    indices = np.asarray(list(dict.fromkeys(selected)), dtype=np.int32)
    if sort_by_welfare:
        welfare = np.sum(points[indices], axis=1)
        indices = indices[welfare.argsort()[::-1]]
    if sorted_indices is not None:
        # map positions in the presorted array back to the caller's indices
        indices = sorted_indices[indices]
    return indices
def pareto_frontier_convex_hull(
    points: np.ndarray | list[tuple[float, ...]],
    eps=-1e-12,
    sort_by_welfare=True,
    presort=True,
) -> np.ndarray:
    """
    Attempts to find the pareto-frontier of a set of points by repeatedly
    peeling convex-hull vertices.

    Args:
        points: list of points each is a tuple of utility values for one outcome
        eps: A (usually negative) small number to treat as zero during calculations
        sort_by_welfare: If True, the results are sorted descendingly by total welfare
        presort: Apply the heuristic of pre-sorting all points by sum of utility

    Returns:
        Intended to return the indices of Pareto optimal outcomes.

    Remarks:
        - NOTE(review): this implementation looks unfinished. The inline
          NOTE(review) comments below mark places where point arrays and index
          arrays appear to be mixed up. Do not rely on it before these are
          confirmed and fixed.
        - The empty-input guard runs after `points.sum(1)` and
          `points.shape[1]`, so a plain empty list fails before reaching it.
    """
    points = np.asarray(points)
    n_points = points.shape[0]
    sorted_indices = []
    if presort:
        # sort by descending total utility; sorted_indices maps presorted
        # positions back to positions in the input
        sort_mask = points.sum(1).argsort()[::-1]
        sorted_indices = np.arange(n_points, dtype=np.int32)[sort_mask]
        points = points[sort_mask]
    warn_if_slow(n_points, "Pareto's Operation is too Slow", lambda x: x * 10)
    n_negotiators = points.shape[1]
    if n_points < 1 or n_negotiators < 1:
        return np.empty(0, dtype=np.int64)

    def get_pareto_undominated_by(pts1, indices, pts2=None) -> NDArray[np.integer[Any]]:
        """
        Intended to return the entries of pts1 that are not Pareto dominated by
        any point in pts2 (pts2 defaults to pts1).
        """
        if pts2 is None:
            pts2 = pts1

        def filter_(pts, point, indices=indices):
            """
            Intended to keep the entries of pts that are not Pareto dominated by
            `point`.
            """
            # NOTE(review): despite the names, both masks test for entries that
            # are *greater* than `point` (>= everywhere, > somewhere), so the
            # entries removed are those dominating `point` -- confirm intent.
            weakly_worse = (pts >= point).all(axis=-1)
            strictly_worse = (pts > point - eps).any(axis=-1)
            # NOTE(review): returns entries of `indices`, not rows of `pts`
            return indices[~(weakly_worse & strictly_worse)]

        # NOTE(review): reduce feeds the value returned by filter_ (an index
        # array) back as `pts` in the next step, while `indices` stays fixed --
        # looks inconsistent after the first step; confirm.
        return reduce(filter_, pts2, pts1)

    def get_pareto_frontier(points, indices):
        """
        Iteratively filter points based on the convex hull heuristic
        """
        pareto_groups = []
        # loop while there are points remaining
        while points.shape[0]:
            # brute force if there are few points:
            if points.shape[0] < 10:
                pareto_groups.append(get_pareto_undominated_by(points, indices))
                break
            # compute vertices of the convex hull
            hull_vertices = spatial.ConvexHull(points).vertices
            # get corresponding points
            hull_pts = points[hull_vertices]
            hull_indices = indices[hull_vertices]
            # get points in pts that are not convex hull vertices
            nonhull_mask = np.ones(points.shape[0], dtype=bool)
            nonhull_mask[hull_vertices] = False
            points = points[nonhull_mask]
            indices = indices[nonhull_mask]
            # get points in the convex hull that are on the Pareto frontier
            pareto_indices = get_pareto_undominated_by(hull_pts, hull_indices)
            # NOTE(review): `points` no longer contains the hull vertices here,
            # yet it is indexed with values derived from `hull_indices` -- confirm.
            pareto_groups.append(points[pareto_indices])
            pareto = points[pareto_indices]
            # filter remaining points to keep those not dominated by
            # Pareto points of the convex hull
            # NOTE(review): `points` is rebound to the helper's result while
            # `indices` is left unfiltered -- confirm.
            points = get_pareto_undominated_by(points, indices, pareto)
        return np.vstack(pareto_groups)

    indices = np.arange(n_points, dtype=np.int32)
    # NOTE(review): the stacked groups replace `points`, but `indices` is still
    # the full 0..n_points-1 range when used below -- confirm.
    points = get_pareto_frontier(points, indices)
    if sort_by_welfare:
        frontier = points[indices]
        welfare = np.sum(frontier, axis=1)
        assert frontier.shape[0] == welfare.size
        welfare_sort_order = welfare.argsort()[::-1]
        indices = indices[welfare_sort_order]
    if presort:
        # map positions in the presorted array back to the caller's indices
        indices = sorted_indices[indices]
    return indices
# Faster than is_pareto_efficient_simple, but less readable.
def pareto_frontier_numpy(
    points: np.ndarray | list[tuple[float, ...]],
    eps=-1e-12,
    sort_by_welfare=True,
    presort=True,
) -> np.ndarray:
    """
    Finds the pareto-frontier of a set of points.

    Args:
        points: list of points each is a tuple of utility values for one outcome
        eps: Accepted for signature compatibility with the other frontier
            functions; not used by this implementation (comparisons are exact).
        sort_by_welfare: If True, the results are sorted descendingly by total welfare
        presort: Apply the heuristic of pre-sorting all points by sum of utility

    Returns:
        indices of Pareto optimal outcomes (an empty int64 array for empty input)

    Remarks:
        - Of several points with identical utilities only the first one
          processed is kept.
    """
    points = np.asarray(points)
    # guard first: an empty list gives a 1-D array on which the axis-1
    # operations below would raise
    if points.size == 0:
        return np.empty(0, dtype=np.int64)
    n_points = points.shape[0]
    sorted_indices = None
    if presort:
        sorted_indices = points.sum(1).argsort()[::-1]
        points = points[sorted_indices]
    warn_if_slow(n_points, "Pareto's Operation is too Slow", lambda x: x * 10)
    is_candidate = np.ones(n_points, dtype=bool)
    for i, c in enumerate(points):
        if not is_candidate[i]:
            continue
        # keep only candidates strictly better than c in some dimension ...
        is_candidate[is_candidate] = np.any(points[is_candidate] > c, axis=1)
        # ... and c itself
        is_candidate[i] = True
    indices = np.nonzero(is_candidate)[0]
    if sort_by_welfare:
        welfare = np.sum(points[indices], axis=1)
        indices = indices[welfare.argsort()[::-1]]
    if sorted_indices is not None:
        # map positions in the presorted array back to the caller's indices
        indices = sorted_indices[indices]
    return indices
def pareto_frontier_numpy_faster(
    points: np.ndarray | list[tuple[float, ...]],
    eps=-1e-12,
    sort_by_welfare=True,
    presort=True,
) -> np.ndarray:
    """
    Finds the pareto-frontier of a set of points.

    Args:
        points: list of points each is a tuple of utility values for one outcome
        eps: A (usually negative) small number to treat as zero during
            calculations. A point survives a comparison with a pivot only if it
            exceeds ``pivot - eps`` in at least one dimension.
        sort_by_welfare: If True, the results are sorted descendingly by total welfare
        presort: Apply the heuristic of pre-sorting all points by sum of utility

    Returns:
        indices of Pareto optimal outcomes (an empty int64 array for empty input)
    """
    points = np.asarray(points)
    # guard first: an empty list gives a 1-D array on which the axis-1
    # operations below would raise
    if points.size == 0:
        return np.empty(0, dtype=np.int64)
    n_points = points.shape[0]
    sorted_indices = None
    if presort:
        sort_mask = points.sum(1).argsort()[::-1]
        sorted_indices = np.arange(n_points, dtype=np.int32)[sort_mask]
        points = points[sort_mask]
    warn_if_slow(n_points, "Pareto's Operation is too Slow", lambda x: x * 10)
    # `points` and `indices` shrink together: row k of `points` is always the
    # point at position indices[k] of the (presorted) input
    indices = np.arange(n_points)
    next_point_index = 0  # Next index in the indices array to search for
    # compare against the current number of survivors, not the original size
    while next_point_index < len(points):
        nondominated_point_mask = np.any(
            points > points[next_point_index] - eps, axis=1
        )
        nondominated_point_mask[next_point_index] = True
        indices = indices[nondominated_point_mask]  # Remove dominated points
        points = points[nondominated_point_mask]
        # position right after the pivot among the survivors
        next_point_index = int(np.sum(nondominated_point_mask[:next_point_index])) + 1
    if sort_by_welfare:
        # `points` already holds exactly the frontier rows (aligned with indices)
        welfare = np.sum(points, axis=1)
        indices = indices[welfare.argsort()[::-1]]
    if sorted_indices is not None:
        # map positions in the presorted array back to the caller's indices
        indices = sorted_indices[indices]
    return indices
[docs]
def pareto_frontier_of(
    points: np.ndarray | Iterable[Iterable[float]], eps=-1e-12, sort_by_welfare=True
) -> np.ndarray:
    """Finds the pareto-frontier of a set of utils (i.e. utility values) in a
    single pass that maintains a running frontier.

    Args:
        points: list of utils
        eps: Accepted for signature compatibility with the other frontier
            functions; not used by this implementation (comparisons are exact).
        sort_by_welfare: If True, the results are sorted descendingly by total welfare

    Returns:
        Indices (into `points`) of the points kept on the frontier. An empty
        int64 array is returned for empty input.

    Remarks:
        - NOTE(review): the bookkeeping of the main loop is fragile (see the
          inline notes); cross-check results against `pareto_frontier_bf`
          before relying on it for correctness.
    """
    utils = np.asarray(points)
    n = len(utils)
    if n < 1:
        # nothing to scan; also avoids summing an empty 1-D array over axis 1
        # in the welfare sort below
        return np.empty(0, dtype=np.int64)
    warn_if_slow(n, "Pareto's Linear Operation is too Slow", lambda x: x * 5)
    # frontier[k] is the utility vector whose index in `utils` is found[k]
    frontier = []
    found = []
    for p in range(0, n):
        if p in found:
            continue
        current = utils[p, :]
        to_remove = []
        for i, f in enumerate(frontier):
            current_better, current_worse = current > f, current < f
            if (current == f).all():
                # exact duplicate of a frontier point: keep both
                frontier.append(current)
                found.append(p)
            if not current_better.any() and current_worse.any():
                # current is dominated, break
                break
            if current_better.any():
                if not current_worse.any():
                    # current dominates f, append it, remove f and scan for anything else dominated by current
                    for j, g in enumerate(frontier[i + 1 :]):
                        if (current == g).all():
                            to_remove.append(i)
                            break
                        if (current > g).any() and not (current < g).any():
                            # NOTE(review): j counts from the start of the
                            # slice frontier[i + 1:], but it is used below as
                            # an index into the full list -- looks like an
                            # offset of i + 1 is missing; confirm.
                            to_remove.append(j)
                    else:
                        frontier[i] = current
                        found[i] = p
                    # NOTE(review): this rebinds both the outer loop variable
                    # `i` and the `frontier` list that the outer loop is
                    # still iterating over -- confirm this is intended.
                    for i in sorted(to_remove, reverse=True):
                        frontier = frontier[:i] + frontier[i + 1 :]
                        found = found[:i] + found[i + 1 :]
                else:
                    # neither current nor f dominate each other, append current only if it is not
                    # dominated by anything in frontier
                    for j, g in enumerate(frontier[i + 1 :]):
                        if (current == g).all() or (
                            (g > current).any() and not (current > g).any()
                        ):
                            break
                    else:
                        if p not in found:
                            frontier.append(current)
                            found.append(p)
        else:
            # reached when the scan finished without `current` being dominated
            # (including the very first point, when the frontier is empty)
            if p not in found:
                frontier.append(current)
                found.append(p)
    if sort_by_welfare:
        frontier = np.asarray(frontier)
        welfare = np.sum(frontier, axis=1)
        welfare_sort_order = welfare.argsort()[::-1]
        found = [found[_] for _ in welfare_sort_order]
    return np.asarray([_ for _ in found])
@jit(nopython=True)
def _pareto_frontier_bf(
    points: np.ndarray, eps=-1e-12, sort_by_welfare=True
) -> np.ndarray:
    """
    Finds the pareto-frontier of a set of points using brute-force (every point
    is compared with every other point).

    Args:
        points: 2-D array with one row of utility values per outcome
        eps: A (usually negative) small number to treat as zero during calculations
        sort_by_welfare: If True, the results are sorted descendingly by total welfare

    Returns:
        indices of Pareto optimal outcomes (int64 array, empty when there are
        no points or no columns)

    Remarks:
        - NOTE(review): with a negative `eps`, the test ``a < b - eps`` is also
          true when ``a == b``. Two points with identical utilities therefore
          each count as dominated by the other, and both appear to be dropped
          from the result -- confirm this is the intended tie handling.
    """
    frontier, indices = [], []
    if len(points) < 1:
        return np.empty(0, dtype=np.int64)
    m = points.shape[1]
    if m < 1:
        return np.empty(0, dtype=np.int64)
    # points = points[points[:, 0].argsort()[::-1]]
    for i, current in enumerate(points):
        for j, test in enumerate(points):
            if j == i:
                continue
            has_better = has_worse = False
            for k in range(m):
                a, b = current[k], test[k]
                if a > b:
                    has_better = True
                    continue
                # not strictly better in this dimension: counts as worse unless
                # a exceeds b by at least -eps
                if a < b - eps:
                    has_worse = True
            if not has_better and has_worse:
                # current is dominated, break
                break
        else:
            # no other point dominated current: it is on the frontier
            indices.append(i)
            frontier.append(current)
    indices = np.asarray(indices, dtype=np.int64)
    # frontier = np.vstack(tuple(frontier))
    if sort_by_welfare:
        n = len(frontier)
        # welfare = frontier.sum(axis=1)
        # welfare (sum of utilities) is accumulated in float32
        welfare = np.zeros(n, dtype=np.float32)
        for i, f in enumerate(frontier):
            welfare[i] = f.sum()
        welfare_sort_order = welfare.argsort()[::-1]
        indices = indices[welfare_sort_order]
    # welfare = [(np.sum(_[0]), i) for i, _ in enumerate(frontier)]
    # indx = sorted(welfare, reverse=True)
    # indices = [frontier[_] for _ in indx]
    return indices
[docs]
def ks_points(
    ufuns: Sequence[UtilityFunction],
    frontier: Sequence[tuple[float, ...]],
    ranges: Sequence[tuple[float, ...]] | None = None,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    eps: float = 1e-12,
    subtract_reserved_value: bool = True,
    exponent: float = float("inf"),
) -> tuple[tuple[tuple[float, ...], int], ...]:
    """
    Finds all Kalai-Smorodinsky bargaining solutions within the given frontier:
    the points with the smallest gap between the largest and the smallest
    advantage ratio of the negotiators.

    ref: Kalai, Ehud, and Meir Smorodinsky. "Other solutions to Nash's bargaining problem." Econometrica: Journal of the Econometric Society (1975): 513-518.

    Args:
        ufuns: A list of ufuns to use (their `reserved_value` is read)
        frontier: a list of tuples each giving the utility values at some outcome on the frontier (usually found by `pareto_frontier`) to search within
        ranges: (min, max) utility per ufun. Calculated using `minmax` of the ufuns if not given
        outcome_space: The outcome-space to consider (used to find `ranges`)
        issues: The issues on which the ufun is defined (used to find `ranges`)
        outcomes: The outcomes on which the ufun is defined (used to find `ranges`)
        eps: Tolerance when testing for degenerate ranges and for ties with the best point
        subtract_reserved_value: If True, the ratio of a negotiator is
            (u - r) / (max - r) with r the larger of its reserved value and its
            minimum utility; otherwise it is u / max
        exponent: Currently not used. A warning is issued if it is not infinity

    Returns:
        A tuple of pairs, one per solution found (empty if there is none):
            - The utility values at the solution
            - The index of the solution in the given frontier

    Raises:
        ValueError: If the utility range of some ufun is unknown or not finite.

    Remarks:
        - The function searches within the given frontier only.
        - Points with any utility below the corresponding reserved value are never returned.
        - If the range of any ufun (above its reserved value) is not wider than
          `eps`, every frontier point is returned.
    """
    if not math.isinf(exponent):
        warnings.warn(
            "kalai-smorodinsky solution currently only supports the extreme case of exponent=float('inf')",
            NegmasUnexpectedValueWarning,
        )
    if not frontier:
        return tuple()

    if not ranges:

        def range_of(u):
            """Utility range of u over whichever outcome description was given."""
            if outcome_space:
                return u.minmax(outcome_space, above_reserve=False)
            if issues:
                return u.minmax(issues=issues, above_reserve=False)
            return u.minmax(outcomes=outcomes, above_reserve=False)

        ranges = [range_of(u) for u in ufuns]

    # unknown reserved values never exclude anything
    declared = [u.reserved_value for u in ufuns]
    reserves = tuple(float("-inf") if r is None else float(r) for r in declared)

    # raise the lower end of every range to the reserved value when it is higher
    bounds = list(ranges)
    for i, (reserve, rng) in enumerate(zip(reserves, bounds)):
        for limit in rng:
            if limit is None or not math.isfinite(limit):
                raise ValueError(f"Cannot find the range for ufun {i}: {rng}")
        if not (reserve < rng[0]):
            bounds[i] = (reserve, rng[1])

    # a degenerate range makes the ratios meaningless: everything is optimal
    widths = [b[1] - b[0] for b in bounds]
    if any([w <= eps for w in widths]):
        return tuple(zip(frontier, range(len(frontier))))

    def spread(utils) -> float:
        """Gap between the largest and smallest advantage ratio at utils."""
        if subtract_reserved_value:
            ratios = []
            for u, (low, high) in zip(utils, bounds):
                gain, span = float(u) - low, high - low
                ratios.append(gain / span if span else gain)
        else:
            ratios = [
                float(u) / high if high else float(u)
                for u, (_, high) in zip(utils, bounds)
            ]
        return max(ratios) - min(ratios)

    scores = []
    best, has_solution = float("inf"), False
    for utils in frontier:
        if any(u < r for u, r in zip(utils, reserves)):
            # irrational for someone: can never be selected
            scores.append(float("inf"))
            continue
        score = spread(utils)
        scores.append(score)
        if score < best:
            best, has_solution = score, True
    if not has_solution:
        return tuple()
    return tuple(
        (utils, i)
        for i, (score, utils) in enumerate(zip(scores, frontier))
        if score <= best + eps
    )
[docs]
def kalai_points(
    ufuns: Sequence[UtilityFunction],
    frontier: Sequence[tuple[float, ...]],
    ranges: Sequence[tuple[float, ...]] | None = None,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    eps: float = 1e-12,
    subtract_reserved_value: bool = True,
) -> tuple[tuple[tuple[float, ...], int], ...]:
    """
    Finds all Kalai (egalitarian) bargaining solutions within the given
    frontier: the points that maximize the smallest advantage of any negotiator.

    ref: Kalai, Ehud (1977). "Proportional solutions to bargaining situations: Intertemporal utility comparisons" (PDF). Econometrica. 45 (7): 1623–1630. doi:10.2307/1913954. JSTOR 1913954.

    Args:
        ufuns: A list of ufuns to use (their `reserved_value` is read)
        frontier: a list of tuples each giving the utility values at some outcome on the frontier (usually found by `pareto_frontier`) to search within
        ranges: (min, max) utility per ufun. Calculated using `minmax` of the ufuns if not given
        outcome_space: The outcome-space to consider (used to find `ranges`)
        issues: The issues on which the ufun is defined (used to find `ranges`)
        outcomes: The outcomes on which the ufun is defined (used to find `ranges`)
        eps: Tolerance when testing for degenerate ranges and for ties with the best point
        subtract_reserved_value: If True, the advantage of a negotiator is
            u - r with r the larger of its reserved value and its minimum
            utility; otherwise it is the utility itself

    Returns:
        A tuple of pairs, one per solution found (empty if there is none):
            - The utility values at the solution
            - The index of the solution in the given frontier

    Raises:
        ValueError: If the utility range of some ufun is unknown or not finite.

    Remarks:
        - The function searches within the given frontier only.
        - Points with any utility below the corresponding reserved value are never returned.
        - If the range of any ufun (above its reserved value) is not wider than
          `eps`, every frontier point is returned.
    """
    if not frontier:
        return tuple()

    if not ranges:

        def range_of(u):
            """Utility range of u over whichever outcome description was given."""
            if outcome_space:
                return u.minmax(outcome_space, above_reserve=False)
            if issues:
                return u.minmax(issues=issues, above_reserve=False)
            return u.minmax(outcomes=outcomes, above_reserve=False)

        ranges = [range_of(u) for u in ufuns]

    # unknown reserved values never exclude anything
    declared = [u.reserved_value for u in ufuns]
    reserves = tuple(float("-inf") if r is None else float(r) for r in declared)

    # raise the lower end of every range to the reserved value when it is higher
    bounds = list(ranges)
    for i, (reserve, rng) in enumerate(zip(reserves, bounds)):
        for limit in rng:
            if limit is None or not math.isfinite(limit):
                raise ValueError(f"Cannot find the range for ufun {i}: {rng}")
        if not (reserve < rng[0]):
            bounds[i] = (reserve, rng[1])

    # a degenerate range leaves nothing to share: everything is optimal
    widths = [b[1] - b[0] for b in bounds]
    if any([w <= eps for w in widths]):
        return tuple(zip(frontier, range(len(frontier))))

    def least_advantage(utils) -> float:
        """Advantage of the negotiator that gains the least at utils."""
        if subtract_reserved_value:
            return min(float(u) - low for u, (low, _) in zip(utils, bounds))
        return min(float(u) for u in utils)

    scores = []
    best, has_solution = float("-inf"), False
    for utils in frontier:
        if any(u < r for u, r in zip(utils, reserves)):
            # irrational for someone: can never be selected
            scores.append(float("-inf"))
            continue
        score = least_advantage(utils)
        scores.append(score)
        if score > best:
            best, has_solution = score, True
    if not has_solution:
        return tuple()
    return tuple(
        (utils, i)
        for i, (score, utils) in enumerate(zip(scores, frontier))
        if score >= best - eps
    )
[docs]
def nash_points(
    ufuns: Sequence[UtilityFunction] | None,
    frontier: Sequence[tuple[float, ...]],
    ranges: Sequence[tuple[float, ...]] | None = None,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    eps=1e-12,
) -> tuple[tuple[tuple[float, ...], int], ...]:
    """Calculates all the Nash Bargaining Solutions on the Pareto frontier of a
    negotiation.

    Args:
        ufuns: A list of ufuns to use. If not given, the reserved value is assumed to be zero for all ufuns
        frontier: a list of tuples each giving the utility values at some outcome on the frontier (usually found by `pareto_frontier`) to search within
        ranges: The (minimum, maximum) utility of each ufun. If not given, `ufuns` must be given and the
                ranges are found using `outcome_space`, `issues` or `outcomes` (in that order of preference)
        outcome_space: The outcome-space to consider
        issues: The issues on which the ufun is defined (outcomes may be passed instead)
        outcomes: The outcomes on which the ufun is defined (outcomes may be passed instead)
        eps: Tolerance used both to detect degenerate ranges and to collect ties with the best product

    Returns:
        A tuple of tuples (empty if cannot be calculated) each consists of:

        - A tuple of utility values at the Nash point
        - The index of the given frontier corresponding to the Nash point

    Raises:
        ValueError: If neither `ufuns` nor `ranges` is given, or if any range value is None or not finite

    Remarks:
        - The function searches within the given frontier only.
        - Points at which any utility is below the corresponding reserved value are never returned.
        - If any ufun has a range (above its reserved value) not wider than `eps`, all frontier
          points are returned.
    """
    if not frontier:
        return tuple()
    # calculate the minimum and maximum value for each ufun
    if not ranges:
        if ufuns is None:
            raise ValueError(
                "Either `ufuns` or `ranges` must be given to find the Nash points"
            )
        ranges = [
            u.minmax(outcome_space, above_reserve=False)
            if outcome_space
            else u.minmax(issues=issues, above_reserve=False)
            if issues
            else u.minmax(outcomes=outcomes, above_reserve=False)
            for u in ufuns
        ]
    n_ufuns = len(ufuns) if ufuns else len(frontier[0])
    # find reserved values (zero when no ufuns are given, -inf when a ufun has none)
    reserved = (
        tuple(u.reserved_value for u in ufuns) if ufuns else (0.0,) * n_ufuns
    )
    rs = tuple(float(_) if _ is not None else float("-inf") for _ in reserved)
    # the disagreement point of each ufun is its reserved value when it lies inside the range
    ranges = list(ranges)
    for i, (r, rng) in enumerate(zip(rs, ranges)):
        if any(_ is None or not math.isfinite(_) for _ in rng):
            raise ValueError(f"Cannot find the range for ufun {i}: {rng}")
        if r < rng[0]:
            continue
        ranges[i] = (r, rng[1])
    # if any range is very tiny, return everything as optimal
    if any((rng[1] - rng[0]) <= eps for rng in ranges):
        return tuple(zip(frontier, range(len(frontier))))
    vals = []
    optim_val, optim_indx = float("-inf"), None
    for indx, outcome in enumerate(frontier):
        # irrational points cannot be Nash points; without this check two negative
        # gains would multiply to a (possibly winning) positive product
        if any(u < r for u, r in zip(outcome, rs)):
            vals.append(float("-inf"))
            continue
        val = math.prod(
            (float(u) - low for u, (low, _) in zip(outcome, ranges)), start=1.0
        )
        vals.append(val)
        if val > optim_val:
            optim_val = val
            optim_indx = indx
    if optim_indx is None:
        return tuple()
    return tuple(
        (outcome, indx)
        for indx, (val, outcome) in enumerate(zip(vals, frontier))
        if val >= optim_val - eps
    )
[docs]
def max_welfare_points(
    ufuns: Sequence[UtilityFunction],
    frontier: Sequence[tuple[float, ...]],
    ranges: Sequence[tuple[float, ...]] | None = None,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    eps=1e-12,
) -> tuple[tuple[tuple[float, ...], int], ...]:
    """Finds all the points with maximum welfare (i.e. sum of utilities) on the
    given frontier.

    Args:
        ufuns: A list of ufuns to use (only consulted when `ranges` is not given)
        frontier: a list of tuples each giving the utility values at some outcome on the frontier (usually found by `pareto_frontier`) to search within
        ranges: the minimum and maximum value for each ufun. If not given, outcome_space, issues, or outcomes must be given to calculate it
        outcome_space: The outcome-space to consider
        issues: The issues on which the ufun is defined (outcomes may be passed instead)
        outcomes: The outcomes on which the ufun is defined (outcomes may be passed instead)
        eps: Tolerance used to detect degenerate ranges and to collect ties with the best welfare

    Returns:
        A tuple of tuples (empty if cannot be calculated) each consists of:

        - A tuple of utility values at the maximum welfare point
        - The index of the given frontier corresponding to that point

    Raises:
        ValueError: If any range value is None or not finite

    Remarks:
        - The function searches within the given frontier only.
        - If any range is not wider than `eps`, all frontier points are returned.
    """
    if not frontier:
        return tuple()

    if not ranges:
        # pick the most specific description of the outcomes that was given

        def _range_of(u):
            if outcome_space:
                return u.minmax(outcome_space, above_reserve=False)
            if issues:
                return u.minmax(issues=issues, above_reserve=False)
            return u.minmax(outcomes=outcomes, above_reserve=False)

        ranges = [_range_of(u) for u in ufuns]

    # every range bound must be a finite number
    for i, rng in enumerate(ranges):
        if any(_ is None or not math.isfinite(_) for _ in rng):
            raise ValueError(f"Cannot find the range for ufun {i}: {rng}")

    # a degenerate range for any ufun makes every frontier point optimal
    widths = [rng[1] - rng[0] for rng in ranges]
    if any(width <= eps for width in widths):
        return tuple(zip(frontier, range(len(frontier))))

    welfare = [sum(float(u) for u in point) for point in frontier]
    best, found = float("-inf"), False
    for total in welfare:
        if total > best:
            best, found = total, True
    if not found:
        return tuple()
    return tuple(
        (point, k)
        for k, (total, point) in enumerate(zip(welfare, frontier))
        if total >= best - eps
    )
def dist(x: tuple[float, ...], y: tuple[float, ...]) -> float:
if x is None or y is None:
return float("nan")
return sqrt(sum((a - b) ** 2 for a, b in zip(x, y, strict=True)))
[docs]
def distance_between(w: tuple[float, ...], n: tuple[float, ...]) -> float:
    """Returns the distance between the two given points as computed by `dist`."""
    separation = dist(w, n)
    return separation
[docs]
def distance_to(w: tuple[float, ...], p: Iterable[tuple[float, ...]]) -> float:
    """Returns the smallest distance between `w` and any point in `p` (NaN if `p` is empty)."""
    return min((distance_between(w, x) for x in p), default=float("nan"))
def max_distance_to(w: tuple[float, ...], p: Iterable[tuple[float, ...]]) -> float:
    """Returns the largest distance between `w` and any point in `p` (NaN if `p` is empty)."""
    return max((distance_between(w, x) for x in p), default=float("nan"))
def is_rational(ufuns: Iterable[UtilityFunction], outcome: Outcome) -> bool:
    """Returns True if no ufun gives `outcome` a utility below its reserved value."""
    # `not (a < b)` is used instead of `a >= b` to keep the exact comparison semantics
    return all(not (u(outcome) < u.reserved_value) for u in ufuns)
def is_irrational(utils: Iterable[UtilityFunction], outcome: Outcome) -> bool:
    """Returns True if `is_rational` rejects `outcome` for the given ufuns."""
    rational = is_rational(utils, outcome)
    return not rational
[docs]
def calc_outcome_distances(
    utils: tuple[float, ...], stats: ScenarioStats
) -> OutcomeDistances:
    """Measures how far the given utility point is from each reference set in `stats`.

    Args:
        utils: The utility values (one per ufun) of the point to evaluate
        stats: Scenario statistics providing the Pareto, Nash, Kalai, modified Kalai,
               KS and modified KS utility points

    Returns:
        An `OutcomeDistances` built from the minimum distance to each reference set
        and the welfare (sum) of `utils`.
    """
    reference_sets = (
        stats.pareto_utils,
        stats.nash_utils,
        stats.kalai_utils,
        stats.modified_kalai_utils,
        stats.ks_utils,
        stats.modified_ks_utils,
    )
    pareto, nash, kalai, modified_kalai, ks, modified_ks = (
        distance_to(utils, reference) for reference in reference_sets
    )
    total_welfare = sum(utils)
    # positional order is the one expected by OutcomeDistances (welfare comes fifth)
    return OutcomeDistances(
        pareto, nash, kalai, modified_kalai, total_welfare, ks, modified_ks
    )
def estimate_max_dist(ufuns: Sequence[UtilityFunction]) -> float:
    """Returns the length of the diagonal of the box spanned by the utility ranges.

    The range of every ufun is evaluated over the outcome space of the first ufun
    (ignoring reserved values).
    """
    spans = []
    for u in ufuns:
        low, high = u.minmax(ufuns[0].outcome_space, above_reserve=False)
        spans.append(float(high) - float(low))
    assert all(_ >= 0 for _ in spans)
    return sqrt(sum(span**2 for span in spans))
def estimate_max_dist_using_outcomes(
ufuns: Sequence[UtilityFunction], outcome_utils: Sequence[tuple[float, ...]]
) -> float:
if not outcome_utils:
return estimate_max_dist(ufuns)
ranges = [_.minmax(ufuns[0].outcome_space) for _ in ufuns]
mins = [float(a) for a, _ in ranges]
maxs = [float(a) for a, _ in ranges]
# distances between outcomes and minimum ranges
dists = [(_ - mn for _, mn in zip(p, mins, strict=True)) for p in outcome_utils]
# distances between outcomes and maximum ranges
dists += [(_ - mx for _, mx in zip(p, maxs, strict=True)) for p in outcome_utils]
# distances between outcomes themselves
dists += [
(a - b for a, b in zip(p, p2, strict=True))
for p, p2 in itertools.product(outcome_utils, outcome_utils)
if p != p2
]
return sqrt(max(sum(_**2 for _ in d) for d in dists))
[docs]
def calc_outcome_optimality(
    dists: OutcomeDistances, stats: ScenarioStats, max_dist: float
) -> OutcomeOptimality:
    """Converts distances to reference points into optimality scores.

    Args:
        dists: Distances (and welfare) of the evaluated point
        stats: Scenario statistics providing the reference utility points
        max_dist: The distance used to scale the distance-based scores

    Returns:
        An `OutcomeOptimality` in which each distance-based score is
        `max(0, 1 - distance / max_dist)` and the welfare score is
        `max(0, welfare / best welfare on the Pareto frontier)`. A score is NaN
        when its reference set is empty or its scaling value is (almost) zero.
    """
    nan = float("nan")
    distance_based = {
        "pareto_optimality": (stats.pareto_utils, dists.pareto_dist),
        "nash_optimality": (stats.nash_utils, dists.nash_dist),
        "kalai_optimality": (stats.kalai_utils, dists.kalai_dist),
        "modified_kalai_optimality": (
            stats.modified_kalai_utils,
            dists.modified_kalai_dist,
        ),
        "ks_optimality": (stats.ks_utils, dists.ks_dist),
        "modified_ks_optimality": (
            stats.modified_ks_utils,
            dists.modified_ks_dist,
        ),
    }
    optim = {}
    for name, (reference, distance) in distance_based.items():
        if not reference or abs(max_dist) < 1e-12:
            optim[name] = nan
        else:
            optim[name] = max(0, 1 - distance / max_dist)

    # welfare is scored relative to the best welfare on the Pareto frontier
    name = "max_welfare_optimality"
    frontier, welfare = stats.pareto_utils, dists.max_welfare
    if not frontier:
        optim[name] = nan
    else:
        best_welfare = max(sum(_) for _ in frontier)
        if abs(best_welfare) < 1e-12:
            optim[name] = nan
        else:
            optim[name] = max(0, welfare / best_welfare)
            assert 0 <= optim[name], f"{name=}, {optim[name]=}"
    return OutcomeOptimality(**optim)
[docs]
def calc_scenario_stats(
    ufuns: tuple[UtilityFunction, ...] | list[UtilityFunction],
    outcomes: Sequence[Outcome] | None = None,
    eps=1e-12,
) -> ScenarioStats:
    """Calculates the Pareto frontier and the main solution concepts of a scenario.

    Args:
        ufuns: The utility functions of the scenario. All must share the same
               (not None) outcome space.
        outcomes: The outcomes to consider. If not given, the outcome space of the
                  first ufun is enumerated (or sampled).
        eps: Resolution passed to `pareto_frontier`

    Returns:
        A `ScenarioStats` with the opposition level, utility ranges, and the
        utilities/outcomes of the Pareto frontier, Nash, Kalai, modified Kalai, KS,
        modified KS, max-welfare and max-relative-welfare points.

    Raises:
        ValueError: If no ufuns are given, the outcome space is missing or differs
            between ufuns, or any given outcome is invalid for the outcome space.
    """
    if not ufuns:
        raise ValueError("Must pass the ufuns")
    ufuns = list(ufuns)
    space = ufuns[0].outcome_space
    # validate the outcome space BEFORE using it to evaluate utility ranges
    if space is None:
        raise ValueError(
            "Cannot find stats if the outcome space is not given and the same in all ufuns"
        )
    for i, u in enumerate(ufuns):
        if u.outcome_space is None or u.outcome_space != space:
            raise ValueError(
                f"Ufun {i} has a different outcome space than the first ufun:\n\tos[0]: {space}\n\tos[{i}]={u.outcome_space}"
            )
    ranges = [_.minmax(space, above_reserve=False) for _ in ufuns]
    if outcomes is None:
        outcomes = list(space.enumerate_or_sample(max_cardinality=float("inf")))  # type: ignore
    else:
        for o in outcomes:
            if not space.is_valid(o):
                raise ValueError(f"Outcome {o} is invalid for outcome space {space}")
    pareto_utils, pareto_indices = pareto_frontier(
        ufuns, outcomes, sort_by_welfare=True, eps=eps
    )
    pareto_outcomes = [outcomes[_] for _ in pareto_indices]

    def _split(points):
        # each point is (utilities, index into the Pareto frontier)
        return [_[0] for _ in points], [pareto_outcomes[_[1]] for _ in points]

    nash_utils, nash_outcomes = _split(
        nash_points(ufuns, ranges=ranges, frontier=pareto_utils)
    )
    kalai_utils, kalai_outcomes = _split(
        kalai_points(
            ufuns, ranges=ranges, frontier=pareto_utils, subtract_reserved_value=True
        )
    )
    modified_kalai_utils, modified_kalai_outcomes = _split(
        kalai_points(
            ufuns, ranges=ranges, frontier=pareto_utils, subtract_reserved_value=False
        )
    )
    ks_utils, ks_outcomes = _split(
        ks_points(
            ufuns, ranges=ranges, frontier=pareto_utils, subtract_reserved_value=True
        )
    )
    modified_ks_utils, modified_ks_outcomes = _split(
        ks_points(
            ufuns, ranges=ranges, frontier=pareto_utils, subtract_reserved_value=False
        )
    )
    welfare_utils, welfare_outcomes = _split(
        max_welfare_points(ufuns, ranges=ranges, frontier=pareto_utils)
    )
    relative_welfare_utils, relative_welfare_outcomes = _split(
        max_relative_welfare_points(ufuns, ranges=ranges, frontier=pareto_utils)
    )
    # named `limits` so that the module-level `minmax` function is not shadowed
    limits = [u.minmax() for u in ufuns]
    opposition = opposition_level(
        ufuns,
        max_utils=tuple(_[1] for _ in limits),  # type: ignore
        outcomes=outcomes,
    )
    return ScenarioStats(
        opposition=opposition,
        utility_ranges=ranges,
        pareto_utils=pareto_utils,
        pareto_outcomes=pareto_outcomes,
        nash_utils=nash_utils,
        nash_outcomes=nash_outcomes,
        kalai_utils=kalai_utils,
        kalai_outcomes=kalai_outcomes,
        modified_kalai_utils=modified_kalai_utils,
        modified_kalai_outcomes=modified_kalai_outcomes,
        ks_utils=ks_utils,
        ks_outcomes=ks_outcomes,
        modified_ks_utils=modified_ks_utils,
        modified_ks_outcomes=modified_ks_outcomes,
        max_welfare_utils=welfare_utils,
        max_welfare_outcomes=welfare_outcomes,
        max_relative_welfare_utils=relative_welfare_utils,
        max_relative_welfare_outcomes=relative_welfare_outcomes,
    )
[docs]
def max_relative_welfare_points(
    ufuns: Sequence[UtilityFunction],
    frontier: Sequence[tuple[float, ...]],
    ranges: Sequence[tuple[float, ...]] | None = None,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    eps=1e-12,
) -> tuple[tuple[tuple[float, ...], int], ...]:
    """Finds all the points with maximum relative welfare on the given frontier.

    The relative welfare of a point is the sum over ufuns of
    `(u - range minimum) / (range maximum - max(reserved value, range minimum))`
    (a zero denominator is replaced by one).

    Args:
        ufuns: A list of ufuns to use (their reserved values are always used)
        frontier: a list of tuples each giving the utility values at some outcome on the frontier (usually found by `pareto_frontier`) to search within
        ranges: the minimum and maximum value for each ufun. If not given, outcome_space, issues, or outcomes must be given to calculate it
        outcome_space: The outcome-space to consider
        issues: The issues on which the ufun is defined (outcomes may be passed instead)
        outcomes: The outcomes on which the ufun is defined (outcomes may be passed instead)
        eps: Tolerance used to detect degenerate ranges and to collect ties with the best value

    Returns:
        A tuple of tuples (empty if cannot be calculated) each consists of:

        - A tuple of utility values at the maximum relative welfare point
        - The index of the given frontier corresponding to that point

    Raises:
        ValueError: If any range value is None or not finite

    Remarks:
        - The function searches within the given frontier only.
        - Points with any utility below the corresponding reserved value are never returned.
        - If any range is not wider than `eps`, all frontier points are returned.
    """
    if not frontier:
        return tuple()

    if not ranges:
        # pick the most specific description of the outcomes that was given

        def _range_of(u):
            if outcome_space:
                return u.minmax(outcome_space, above_reserve=False)
            if issues:
                return u.minmax(issues=issues, above_reserve=False)
            return u.minmax(outcomes=outcomes, above_reserve=False)

        ranges = [_range_of(u) for u in ufuns]

    # every range bound must be a finite number
    for i, rng in enumerate(ranges):
        if any(_ is None or not math.isfinite(_) for _ in rng):
            raise ValueError(f"Cannot find the range for ufun {i}: {rng}")

    # a degenerate range for any ufun makes every frontier point optimal
    widths = [rng[1] - rng[0] for rng in ranges]
    if any(width <= eps for width in widths):
        return tuple(zip(frontier, range(len(frontier))))

    # reserved values (a missing reserved value never excludes any point)
    rs = tuple(
        float(rv) if rv is not None else float("-inf")
        for rv in (u.reserved_value for u in ufuns)
    )
    # room available to each ufun above its reserved value (or its minimum)
    diffs = [rng[1] - max(reserve, rng[0]) for rng, reserve in zip(ranges, rs)]

    scores = []
    best, found = float("-inf"), False
    for point in frontier:
        if any(u < reserve for u, reserve in zip(point, rs)):
            scores.append(float("-inf"))
            continue
        score = 0.0
        for u, (low, _), room in zip(point, ranges, diffs):
            score += (float(u) - low) / (room if room else 1.0)
        scores.append(score)
        if score > best:
            best, found = score, True
    if not found:
        return tuple()
    return tuple(
        (point, k)
        for k, (score, point) in enumerate(zip(scores, frontier))
        if score >= best - eps
    )
[docs]
def dominating_points(
    utils: NDArray[np.floating[Any]] | tuple[float, ...],
    points: NDArray[np.floating[Any]] | tuple[tuple[float, ...]],
) -> NDArray[np.integer[Any]]:
    """
    Finds the indices of the rows of `points` that dominate `utils`.

    A row dominates `utils` if it is at least as large in every dimension and
    strictly larger in at least one.
    """
    candidates = np.asarray(points)
    reference = np.asarray(utils)
    strictly_better_somewhere = np.any(candidates > reference, axis=1)
    never_worse = np.all(candidates >= reference, axis=1)
    return np.flatnonzero(strictly_better_somewhere & never_worse)
[docs]
def pareto_frontier(
    ufuns: Sequence[BaseUtilityFunction],
    outcomes: Sequence[Outcome] | None = None,
    issues: Sequence[Issue] | None = None,
    n_discretization: int | None = None,
    max_cardinality: int | float = float("inf"),
    sort_by_welfare=True,
    eps: float = 1e-12,
) -> tuple[tuple[tuple[float, ...], ...], tuple[int, ...]]:
    """Finds all pareto-optimal outcomes in the list.

    Args:
        ufuns: The utility functions
        outcomes: the outcomes to be checked. If None then all possible outcomes from the issues will be checked
        issues: The set of issues (only used when outcomes is None). If also None, the issues of
                the outcome space of the first ufun are used.
        n_discretization: The number of items to discretize each real-dimension into
        max_cardinality: The maximum number of outcomes to enumerate (only used when outcomes is None)
        sort_by_welfare: If True, the results are sorted descendingly by total welfare
        eps: resolution

    Returns:
        Two tuples of the same length. First gives the utilities at Pareto frontier points and
        second gives their indices in `outcomes`. Both are empty when there are no outcomes or
        the issues cannot be found.

    Remarks:
        - Only outcomes that are not below the reserved value of any ufun are considered.
          A ufun with no reserved value (None) never excludes any outcome.
    """
    ufuns = tuple(ufuns)
    if issues:
        issues = tuple(issues)
    if outcomes:
        outcomes = tuple(outcomes)
    # calculate all candidate outcomes
    if outcomes is None:
        if issues is None:
            try:
                issues = ufuns[0].outcome_space.issues  # type: ignore
            except Exception:
                # best effort: no ufuns, no outcome space, or no issues to enumerate
                return ((), ())
        outcomes = discretize_and_enumerate_issues(
            issues,  # type: ignore
            n_discretization=n_discretization,
            max_cardinality=max_cardinality,  # type: ignore
        )
    points = np.asarray(
        [[ufun(outcome) for ufun in ufuns] for outcome in outcomes], dtype=float
    )
    # with no outcomes `points` is not 2-D and there is nothing to search
    if len(points) == 0:
        return ((), ())
    warn_if_slow(len(points), "Too many outcomes in the OS (Pareto Calculation)")
    # a missing reserved value must map to -inf (None would become NaN and
    # every comparison against it would fail, discarding all outcomes)
    reservs = np.asarray(
        [
            float("-inf") if u.reserved_value is None else u.reserved_value
            for u in ufuns
        ],
        dtype=float,
    )
    rational_indices = np.nonzero(np.all(points >= reservs, axis=1))[0]
    points = points[rational_indices]
    pareto_indices = pareto_frontier_active(
        points, sort_by_welfare=sort_by_welfare, eps=eps
    )
    # map positions among rational points back to positions in `outcomes`
    return tuple(map(tuple, points[pareto_indices])), tuple(
        rational_indices[pareto_indices]
    )
[docs]
def scale_max(
    ufun: BaseUtilityFunction,
    to: float = 1.0,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
) -> BaseUtilityFunction:
    """Scales a utility function by delegating to its `scale_max_for` method.

    Args:
        ufun: The utility function to scale
        to: The value passed to `scale_max_for` as the scaling target
        outcome_space: The outcome-space to scale for
        issues: The issues to scale for
        outcomes: A collection of outcomes to scale for

    Returns:
        The utility function returned by `ufun.scale_max_for`
    """
    scaled = ufun.scale_max_for(
        to, outcome_space=outcome_space, issues=issues, outcomes=outcomes
    )
    return scaled
[docs]
def normalize(
    ufun: BaseUtilityFunction,
    to: tuple[float, float] = (0.0, 1.0),
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
) -> BaseUtilityFunction:
    """Normalizes a utility function to the given range.

    Args:
        ufun: The utility function to normalize
        to: range to normalize to. Default is [0, 1]
        outcome_space: The outcome-space to normalize for
        issues: The issues to normalize for
        outcomes: A collection of outcomes to normalize for

    Returns:
        UtilityFunction: The result of `ufun.normalize_for` on the outcome space built from the
        given outcome_space/issues/outcomes, or of `ufun.normalize` if no outcome space can be built
    """
    space = os_or_none(outcome_space, issues, outcomes)
    return (
        ufun.normalize(to)
        if space is None
        else ufun.normalize_for(to, outcome_space=space)
    )
[docs]
def sample_outcome_with_utility(
    ufun: BaseUtilityFunction,
    rng: tuple[float, float],
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    n_trials: int = 100,
) -> Outcome | None:
    """Gets one outcome within the given utility range or None on failure.

    Args:
        ufun: The utility function
        rng: The utility range
        outcome_space: The outcome-space within which to sample
        issues: The issues the utility function is defined on
        outcomes: The outcomes to sample from
        n_trials: The maximum number of trials

    Remarks:
        - Either issues, or outcomes should be given but not both
        - The work is delegated to `ufun.sample_outcome_with_utility`
    """
    sampler_args = (rng, outcome_space, issues, outcomes, n_trials)
    return ufun.sample_outcome_with_utility(*sampler_args)
[docs]
def extreme_outcomes(
    ufun: UtilityFunction,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | None = None,
    outcomes: Sequence[Outcome] | None = None,
    max_cardinality=1000,
) -> tuple[Outcome, Outcome]:
    """Finds the best and worst outcomes.

    Args:
        ufun: The utility function
        outcome_space: An outcome-space to consider
        issues: list of issues (optional)
        outcomes: A collection of outcomes (optional)
        max_cardinality: the maximum number of outcomes to try sampling (if sampling is used and outcomes are not
                         given)

    Returns:
        Outcomes with minimum utility, maximum utility (as returned by `ufun.extreme_outcomes`)
    """
    search_args = dict(
        outcome_space=outcome_space,
        issues=issues,
        outcomes=outcomes,
        max_cardinality=max_cardinality,
    )
    return ufun.extreme_outcomes(**search_args)
[docs]
def minmax(
    ufun: UtilityFunction,
    outcome_space: OutcomeSpace | None = None,
    issues: Sequence[Issue] | tuple[Issue, ...] | None = None,
    outcomes: Sequence[Outcome] | tuple[Outcome, ...] | None = None,
    max_cardinality=1000,
) -> tuple[float, float]:
    """Finds the range of the given utility function for the given outcomes.

    Args:
        ufun: The utility function
        outcome_space: An outcome-space to consider
        issues: list of issues (optional)
        outcomes: A collection of outcomes (optional)
        max_cardinality: the maximum number of outcomes to try sampling (if sampling is used and outcomes are not
                         given)

    Returns:
        Minimum utility, maximum utility (as returned by `ufun.minmax`)
    """
    search_args = dict(
        outcome_space=outcome_space,
        issues=issues,
        outcomes=outcomes,
        max_cardinality=max_cardinality,
    )
    return ufun.minmax(**search_args)
[docs]
def opposition_level(
    ufuns: Sequence[UtilityFunction],
    max_utils: float | tuple[float | int, ...] | list[float | int] = 1.0,
    outcomes: int | Sequence[Outcome] | None = None,
    issues: Sequence[Issue] | None = None,
    max_tests: int = 10000,
) -> float:
    """Finds the opposition level of the ufuns defined as the minimum
    distance between any rational outcome and the point at which every ufun
    gets its maximum utility (after dividing each utility by its maximum).

    Args:
        ufuns: A list of utility functions to use.
        max_utils: A list of maximum utility value for each ufun (or a single number if they are equal).
        outcomes: A list of outcomes (should be the complete issue space) or an integer giving the number
                 of outcomes. In the later case, ufuns should expect a tuple of a single integer.
        issues: The issues (only used if outcomes is None).
        max_tests: The maximum number of outcomes to use. Only used if issues is given and has more
                   outcomes than this value.

    Returns:
        The smallest distance found (infinity if no rational outcome exists).

    Raises:
        ValueError: If neither outcomes nor issues are given or the number of maximum
            utilities does not match the number of ufuns.

    Examples:

        - Opposition level of the same ufun repeated is always 0
        >>> from negmas.preferences.crisp.mapping import MappingUtilityFunction
        >>> from negmas.preferences.ops import opposition_level
        >>> u1, u2 = lambda x: x[0], lambda x: x[0]
        >>> opposition_level([u1, u2], outcomes=10, max_utils=9)
        0.0

        - Opposition level of two ufuns that are zero-sum
        >>> u1, u2 = (
        ...     MappingUtilityFunction(lambda x: x[0]),
        ...     MappingUtilityFunction(lambda x: 9 - x[0]),
        ... )
        >>> opposition_level([u1, u2], outcomes=10, max_utils=9)
        0.7114582486036499
    """
    if outcomes is None:
        if issues is None:
            raise ValueError("You must either give outcomes or issues")
        outcomes = list(enumerate_issues(tuple(issues), max_cardinality=max_tests))
    if isinstance(outcomes, int):
        outcomes = [(_,) for _ in range(outcomes)]
    if not isinstance(max_utils, Iterable):
        max_utils = [max_utils] * len(ufuns)
    if len(ufuns) != len(max_utils):
        raise ValueError(
            f"Cannot use {len(ufuns)} ufuns with only {len(max_utils)} max. utility values"
        )

    def _is_irrational(outcome, ufun: BaseUtilityFunction):
        # Deliberately best-effort: plain callables (with no `is_worse` or that
        # cannot evaluate None) are treated as accepting every outcome.
        try:
            return ufun.is_worse(outcome, None)
        except Exception:
            try:
                return ufun(outcome) < ufun(None)
            except Exception:
                return False

    nearest_val = float("inf")
    for outcome in outcomes:
        if any(_is_irrational(outcome, u) for u in ufuns):
            continue
        # squared distance to the ideal point; a zero/missing maximum means no scaling
        v = sum(
            (1.0 - float(u(outcome)) / max_util) ** 2
            if max_util
            else (1.0 - float(u(outcome))) ** 2
            for max_util, u in zip(max_utils, ufuns)
        )
        if v == float("inf"):
            warnings.warn(
                f"u is infinity: {outcome}, {[_(outcome) for _ in ufuns]}, {max_utils}",
                warnings.NegmasNumericWarning,
            )
        if v < nearest_val:
            nearest_val = v
    return float(sqrt(nearest_val))
[docs]
def conflict_level(
    u1: UtilityFunction,
    u2: UtilityFunction,
    outcomes: int | Sequence[Outcome],
    max_tests: int = 10000,
) -> float:
    """Finds the conflict level in these two ufuns.

    The conflict level is the fraction of compared outcome pairs on which the two
    ufuns move in strictly opposite directions. Pairs on which both ufuns are
    indifferent are ignored.

    Args:
        u1: first utility function
        u2: second utility function
        outcomes: The outcomes to use or an integer `n` standing for the outcomes `(0,)` to `(n-1,)`
        max_tests: The maximum number of outcome pairs to compare (outcomes are randomly ordered first)

    Returns:
        The fraction of conflicting pairs (1.0 if no pair could be compared)

    Raises:
        ValueError: If there are no outcomes

    Examples:
        - A nonlinear strictly zero sum case
        >>> from negmas.preferences.crisp.mapping import MappingUtilityFunction
        >>> from negmas.preferences import conflict_level
        >>> outcomes = [(_,) for _ in range(10)]
        >>> u1 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.random.random(len(outcomes))))
        ... )
        >>> u2 = MappingUtilityFunction(
        ...     dict(zip(outcomes, 1.0 - np.array(list(u1.mapping.values()))))
        ... )
        >>> print(conflict_level(u1=u1, u2=u2, outcomes=outcomes))
        1.0

        - The same ufun
        >>> print(conflict_level(u1=u1, u2=u1, outcomes=outcomes))
        0.0

        - A linear strictly zero sum case
        >>> outcomes = [(i,) for i in range(10)]
        >>> u1 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.linspace(0.0, 1.0, len(outcomes), endpoint=True)))
        ... )
        >>> u2 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.linspace(1.0, 0.0, len(outcomes), endpoint=True)))
        ... )
        >>> print(conflict_level(u1=u1, u2=u2, outcomes=outcomes))
        1.0
    """
    if isinstance(outcomes, int):
        outcomes = [(_,) for _ in range(outcomes)]
    else:
        outcomes = list(outcomes)
    n_outcomes = len(outcomes)
    if n_outcomes == 0:
        raise ValueError("Cannot calculate conflit level with no outcomes")
    points = np.array([[u1(o), u2(o)] for o in outcomes])
    # compare pairs in a random order so that `max_tests` samples them fairly
    order = np.random.permutation(np.array(range(n_outcomes)))
    first, second = points[order, 0], points[order, 1]
    signs = []
    pairs = itertools.combinations(range(n_outcomes), 2)
    for trial, (i, j) in enumerate(pairs):
        if trial >= max_tests:
            break
        a_i, a_j = first[i], first[j]
        b_i, b_j = second[i], second[j]
        if a_j == a_i and b_i == b_j:
            continue
        opposed = (a_j > a_i and b_i > b_j) or (a_j < a_i and b_i < b_j)
        signs.append(int(opposed))
    signs = np.asarray(signs)
    # TODO: confirm this is correct
    if len(signs) == 0:
        return 1.0
    return float(signs.mean())
[docs]
def winwin_level(
    u1: UtilityFunction,
    u2: UtilityFunction,
    outcomes: int | Sequence[Outcome],
    max_tests: int = 10000,
) -> float:
    """Finds the win-win level in these two ufuns.

    Outcomes are randomly ordered and each consecutive pair contributes one value
    (pairs on which both ufuns are indifferent are skipped):

    - the absolute change in `u2` if `u1` is indifferent,
    - otherwise the absolute change in `u1`, plus the change in `u2` from the
      first to the second outcome of the pair if `u2` is not indifferent.

    Args:
        u1: first utility function
        u2: second utility function
        outcomes: The outcomes to use or an integer `n` standing for the outcomes `(0,)` to `(n-1,)`
        max_tests: The maximum number of consecutive pairs to use

    Returns:
        The mean of the per-pair values

    Raises:
        ValueError: If no pair contributed a value

    Examples:
        - A nonlinear same ufun case
        >>> from negmas.preferences.crisp.mapping import MappingUtilityFunction
        >>> outcomes = [(_,) for _ in range(10)]
        >>> u1 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.linspace(1.0, 0.0, len(outcomes), endpoint=True)))
        ... )

        - A linear strictly zero sum case
        >>> outcomes = [(_,) for _ in range(10)]
        >>> u1 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.linspace(0.0, 1.0, len(outcomes), endpoint=True)))
        ... )
        >>> u2 = MappingUtilityFunction(
        ...     dict(zip(outcomes, np.linspace(1.0, 0.0, len(outcomes), endpoint=True)))
        ... )
    """
    if isinstance(outcomes, int):
        outcomes = [(_,) for _ in range(outcomes)]
    else:
        outcomes = list(outcomes)
    n_outcomes = len(outcomes)
    points = np.array([[u1(o), u2(o)] for o in outcomes])
    order = np.random.permutation(np.array(range(n_outcomes)))
    first, second = points[order, 0], points[order, 1]
    signed_diffs = []
    # walk over consecutive outcomes in the shuffled order
    adjacent = zip(first[:-1], first[1:], second[:-1], second[1:])
    for trial, (o11, o12, o21, o22) in enumerate(adjacent):
        if trial >= max_tests:
            break
        u2_indifferent = o21 == o22
        if o11 == o12:
            if u2_indifferent:
                continue
            win = abs(o22 - o21)
        else:
            win = (o12 - o11) if o11 < o12 else (o11 - o12)
            if not u2_indifferent:
                win = win + (o22 - o21)
        signed_diffs.append(win)
    signed_diffs = np.asarray(signed_diffs)
    if len(signed_diffs) == 0:
        raise ValueError("Could not calculate any signs")
    return signed_diffs.mean()
def get_ranks_bf(
    ufun: UtilityFunction, outcomes: Sequence[Outcome | None]
) -> list[float]:
    """Finds the normalized rank of each given outcome by sorting all outcomes (brute force).

    All outcomes of the (discrete) outcome space of `ufun` are sorted from best to
    worst, and None (disagreement) is inserted just before the first outcome with a
    utility below the reserved value (or at the end if there is none). The best
    entry gets rank `n` (the number of outcomes) and each following entry one less.
    Consecutive entries with (almost) equal utilities share the same rank.

    Args:
        ufun: The utility function (must have a discrete outcome space)
        outcomes: The outcomes (possibly including None) to find the ranks of

    Returns:
        The rank of each given outcome divided by the number of outcomes

    Raises:
        ValueError: If any of the given outcomes is not found
    """
    assert ufun.outcome_space is not None
    assert ufun.outcome_space.is_discrete()
    alloutcomes = list(ufun.outcome_space.enumerate_or_sample())
    n = len(alloutcomes)
    warn_if_slow(n, "Calculating Rank UFun is too Slow")
    utils, sorted_outcomes = sort_by_utility(
        ufun, alloutcomes, best_first=True, return_sorted_outcomes=True
    )
    ordered: list[tuple[float, Outcome | None]]
    ordered = list(zip(utils, sorted_outcomes))
    # insert null into its place
    r = ufun.reserved_value
    loc = n
    if r is None:
        r = float("-inf")
    else:
        for k, (util, _) in enumerate(ordered):
            if util < r:
                loc = k
                break
    # inserting at position n is the same as appending
    ordered.insert(loc, (r, None))
    ranked = [
        [rank, util, outcome]
        for rank, (util, outcome) in zip(range(n, -1, -1), ordered, strict=True)
    ]
    # mark outcomes with equal utils with the same rank; the rank of the previous
    # entry is read AFTER it was updated so that a run of ties shares one rank
    for previous, current in zip(ranked, ranked[1:]):
        if abs(previous[1] - current[1]) < 1e-10:
            current[0] = previous[0]
    results = []
    for outcome in outcomes:
        for rank, _, candidate in ranked:
            if candidate == outcome:
                results.append(rank / n)
                break
        else:
            raise ValueError(f"Could not find {outcome}")
    return results
[docs]
def get_ranks(
    ufun: UtilityFunction, outcomes: Sequence[Outcome | None], normalize=False
) -> list[float] | NDArray[np.floating[Any]]:
    """Finds the dense rank of the utility of each outcome and of None.

    Args:
        ufun: The utility function (must have a discrete outcome space)
        outcomes: The outcomes to rank. If empty, all outcomes of the outcome space
                  of `ufun` are used.
        normalize: If True, ranks are divided by the largest rank (when it is positive)

    Returns:
        An array with one rank per outcome followed by the rank of `ufun(None)`.
        The lowest utility gets rank zero and equal utilities share the same rank.

    Remarks:
        - If the ufun has no reserved value, it is temporarily set to `-inf` while
          the utilities are evaluated (presumably this is the value `ufun(None)`
          evaluates to) and is always restored afterwards.
    """
    assert ufun.outcome_space is not None
    assert ufun.outcome_space.is_discrete()
    alloutcomes = (
        list(outcomes)
        if outcomes
        else list(ufun.outcome_space.enumerate_or_sample())
    )
    n = len(alloutcomes)
    warn_if_slow(n, "Calculating Rank UFun is too Slow", lambda x: x * math.log(x))
    missing_reserve = ufun.reserved_value is None
    if missing_reserve:
        ufun.reserved_value = float("-inf")
    try:
        vals = np.asarray([ufun(_) for _ in alloutcomes + [None]])
    finally:
        # never leave the caller's ufun modified, even if evaluation fails
        if missing_reserve:
            ufun.reserved_value = None  # type: ignore
    ranks = rankdata(vals, method="dense") - 1.0
    if normalize:
        top = np.max(ranks)
        # all utilities equal => all ranks are zero; avoid dividing by zero
        if top > 0:
            ranks = ranks / top
    return ranks
# # insert null into its place
# # loc = n
# # r = ufun.reserved_value
# # loc = n
# # if r is None:
# # r = float("-inf")
# # else:
# # for k, (u, o) in enumerate(ordered):
# # if u < r:
# # loc = k
# # break
# # if loc == n:
# # ordered.append((r, None))
# # else:
# # ordered.insert(loc, (r, None))
#
# ordered = list(zip(range(n, -1, -1), ordered, strict=True))
# # mark outcomes with equal utils with the same rank
# for i, (second, first) in enumerate(zip(ordered[1:], ordered[:-1], strict=True)):
# if abs(first[1][0] - second[1][0]) < 1e-10:
# ordered[i + 1] = (first[0], second[1])
# results = []
# for outcome in outcomes:
# for v in ordered:
# k, _ = v
# u, o = _
# if o == outcome:
# results.append(k / n)
# break
# else:
# raise ValueError(f"Could not find {outcome}")
# return results
def make_rank_ufun(ufun: UtilityFunction, normalize: bool = False) -> UtilityFunction:
    """
    Generates a ufun that maps every outcome to the rank `get_ranks` finds for it.

    Args:
        ufun: input ufun (must have a discrete outcome space).
        normalize: if True, the resulting ranks will be normalized between 0 and 1.

    Returns:
        A `MappingUtilityFunction` over the same outcome space whose reserved value
        is the last rank returned by `get_ranks`.
    """
    space = ufun.outcome_space
    assert space is not None
    assert space.is_discrete()
    # None is appended as the last entry passed to `get_ranks`
    candidates = [*space.enumerate_or_sample(), None]
    ranks = get_ranks(ufun, candidates, normalize=normalize)
    outcome_ranks = dict(zip(candidates[:-1], ranks[:-1]))
    return MappingUtilityFunction(
        mapping=outcome_ranks,
        outcome_space=space,
        reserved_value=ranks[-1],
    )
# pareto_frontier_active = pareto_frontier_bf if NUMBA_OK else pareto_frontier_of
# Module-level alias for the Pareto-frontier implementation in use: `pareto_frontier`
# calls `pareto_frontier_active`, which is currently bound to `pareto_frontier_numpy`.
pareto_frontier_active = pareto_frontier_numpy
# pareto_frontier_of = pareto_frontier_numpy