from __future__ import annotations
import json
import math
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable, Sequence, TypeVar
from negmas import warnings
from negmas.common import Value
from negmas.helpers.prob import Distribution, Real, ScipyDistribution
from negmas.helpers.types import get_full_type_name
from negmas.outcomes import Issue, Outcome, dict2outcome
from negmas.outcomes.common import check_one_at_most, os_or_none
from negmas.outcomes.issue_ops import issues_from_geniusweb_json
from negmas.outcomes.outcome_space import make_os
from negmas.outcomes.protocols import IndependentIssuesOS, OutcomeSpace
from negmas.preferences.value_fun import TableFun
from negmas.serialization import PYTHON_CLASS_IDENTIFIER, deserialize, serialize
from negmas.warnings import warn_if_slow
from .preferences import Preferences
from .protocols import InverseUFun
from .value_fun import make_fun_from_xml
if TYPE_CHECKING:
from negmas.preferences import (
ConstUtilityFunction,
ProbUtilityFunction,
UtilityFunction,
WeightedUtilityFunction,
)
__all__ = ["BaseUtilityFunction"]
MAX_CARDINALITY = 10_000_000_000
T = TypeVar("T", bound="BaseUtilityFunction")
# PartiallyScalable,
# HasRange,
# HasReservedValue,
# StationaryConvertible,
# OrdinalRanking,
# CardinalRanking,
# BasePref,
[docs]
class BaseUtilityFunction(Preferences, ABC):
"""
Base class for all utility functions in negmas
"""
def __init__(
self,
*args,
reserved_value: float = float("-inf"),
invalid_value: float | None = None,
**kwargs,
):
super().__init__(*args, **kwargs)
self.reserved_value = reserved_value
self._cached_inverse: InverseUFun | None = None
self._cached_inverse_type: type[InverseUFun] | None = None
self._invalid_value = invalid_value
[docs]
@abstractmethod
def eval(self, offer: Outcome) -> Value:
...
[docs]
def to_stationary(self: T) -> T:
raise NotImplementedError(
f"I do not know how to convert a ufun of type {self.type_name} to a stationary ufun."
)
[docs]
def extreme_outcomes(
self,
outcome_space: OutcomeSpace | None = None,
issues: Iterable[Issue] | None = None,
outcomes: Iterable[Outcome] | None = None,
max_cardinality=100_000,
) -> tuple[Outcome, Outcome]:
check_one_at_most(outcome_space, issues, outcomes)
outcome_space = os_or_none(outcome_space, issues, outcomes)
if not outcome_space:
outcome_space = self.outcome_space
if outcome_space and not outcomes:
outcomes = outcome_space.enumerate_or_sample(
max_cardinality=max_cardinality
)
if not outcomes:
raise ValueError("Cannot find outcomes to use for finding extremes")
mn, mx = float("inf"), float("-inf")
worst, best = None, None
warn_if_slow(len(list(outcomes)), "Extreme Outcomes too Slow")
for o in outcomes:
u = self(o)
if u < mn:
worst, mn = o, u
if u > mx:
best, mx = o, u
if worst is None or best is None:
raise ValueError(f"Cound not find worst and best outcomes for {self}")
return worst, best
[docs]
def minmax(
self,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
max_cardinality=1000,
above_reserve=False,
) -> tuple[float, float]:
"""Finds the range of the given utility function for the given outcomes
Args:
self: The utility function
issues: List of issues (optional)
outcomes: A collection of outcomes (optional)
max_cardinality: the maximum number of outcomes to try sampling (if sampling is used and outcomes are not given)
above_reserve: If given, the minimum and maximum will be set to reserved value if they were less than it.
Returns:
(lowest, highest) utilities in that order
"""
(worst, best) = self.extreme_outcomes(
outcome_space, issues, outcomes, max_cardinality
)
w, b = self(worst), self(best)
if isinstance(w, Distribution):
w = w.min
if isinstance(b, Distribution):
b = b.max
if above_reserve:
r = self.reserved_value
if r is None:
return w, b
if b < r:
b, w = r, r
elif w < r:
w = r
return w, b
@property
def reserved_distribution(self) -> Distribution:
return ScipyDistribution(type="uniform", loc=self.reserved_value, scale=0.0)
[docs]
def max(self) -> Value:
_, mx = self.minmax()
return mx
[docs]
def min(self) -> Value:
mn, _ = self.minmax()
return mn
[docs]
def best(self) -> Outcome:
_, mx = self.extreme_outcomes()
return mx
[docs]
def worst(self) -> Outcome:
mn, _ = self.extreme_outcomes()
return mn
[docs]
def eval_normalized(
self,
offer: Outcome | None,
above_reserve: bool = True,
expected_limits: bool = True,
) -> Value:
"""
Evaluates the ufun normalizing the result between zero and one
Args:
offer (Outcome | None): offer
above_reserve (bool): If True, zero corresponds to the reserved value not the minimum
expected_limits (bool): If True, the expectation of the utility limits will be used for normalization instead of the maximum range and minimum lowest limit
Remarks:
- If the maximum and the minium are equal, finite and above reserve, will return 1.0.
- If the maximum and the minium are equal, initinte or below reserve, will return 0.0.
- For probabilistic ufuns, a distribution will still be returned.
- The minimum and maximum will be evaluated freshly every time. If they are already caached in the ufun, the cache will be used.
"""
r = self.reserved_value
u = self.eval(offer) if offer else r
mn, mx = self.minmax()
if above_reserve:
if mx < r:
mx = mn = float("-inf")
elif mn < r:
mn = r
d = mx - mn
if isinstance(d, Distribution):
d = float(d) if expected_limits else d.max
if isinstance(mn, Distribution):
mn = float(mn) if expected_limits else mn.min
if d < 1e-5:
warnings.warn(
f"Ufun has equal max and min. The outcome will be normalized to zero if they were finite otherwise 1.0: {mn=}, {mx=}, {r=}, {u=}"
)
return 1.0 if math.isfinite(mx) else 0.0
d = 1 / d
return (u - mn) * d
[docs]
def invert(
self, inverter: type[InverseUFun] | None = None, **kwargs
) -> InverseUFun:
"""
Inverts the ufun, initializes it and caches the result.
"""
from .inv_ufun import PresortingInverseUtilityFunction
if self._cached_inverse and (
inverter is None or self._cached_inverse_type == inverter
):
return self._cached_inverse
if inverter is None:
inverter = PresortingInverseUtilityFunction
self._cached_inverse_type = inverter
self._cached_inverse = inverter(self, **kwargs)
self._cached_inverse.init()
return self._cached_inverse
[docs]
def forget_inverter(self):
"""Deletes the cached inverter."""
self._cached_inverse = None
[docs]
def is_volatile(self) -> bool:
return True
[docs]
def is_session_dependent(self) -> bool:
return True
[docs]
def is_state_dependent(self) -> bool:
return True
[docs]
def scale_by(
self: T, scale: float, scale_reserved=True
) -> WeightedUtilityFunction | T:
if scale < 0:
raise ValueError(f"Cannot scale with a negative multiplier ({scale})")
from negmas.preferences.complex import WeightedUtilityFunction
r = (scale * self.reserved_value) if scale_reserved else self.reserved_value
return WeightedUtilityFunction(
ufuns=[self], weights=[scale], name=self.name, reserved_value=r
)
[docs]
def scale_min_for(
self: T,
to: float,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
rng: tuple[float, float] | None = None,
) -> T:
if rng is None:
mn, _ = self.minmax(outcome_space, issues, outcomes)
else:
mn, _ = rng
scale = to / mn
return self.scale_by(scale)
[docs]
def scale_min(self: T, to: float, rng: tuple[float, float] | None = None) -> T:
return self.scale_min_for(to, outcome_space=self.outcome_space, rng=rng)
[docs]
def scale_max_for(
self: T,
to: float,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
rng: tuple[float, float] | None = None,
) -> T:
if rng is None:
_, mx = self.minmax(outcome_space, issues, outcomes)
else:
_, mx = rng
scale = to / mx
return self.scale_by(scale)
[docs]
def scale_max(self: T, to: float, rng: tuple[float, float] | None = None) -> T:
return self.scale_max_for(to, outcome_space=self.outcome_space, rng=rng)
[docs]
def normalize_for(
self: T,
to: tuple[float, float] = (0.0, 1.0),
outcome_space: OutcomeSpace | None = None,
) -> T | ConstUtilityFunction:
max_cardinality: int = MAX_CARDINALITY
if not outcome_space:
outcome_space = self.outcome_space
if not outcome_space:
raise ValueError(
"Cannot find the outcome-space to normalize for. "
"You must pass outcome_space, issues or outcomes or have the ufun being constructed with one of them"
)
mn, mx = self.minmax(outcome_space, max_cardinality=max_cardinality)
d = float(mx - mn)
if d < 1e-7:
from negmas.preferences.crisp.const import ConstUtilityFunction
return ConstUtilityFunction(
0.0 if mx < self.reserved_value else 1.0,
outcome_space=self.outcome_space,
name=self.name,
reserved_value=1.0 if mn < self.reserved_value else 0.0,
)
scale = float(to[1] - to[0]) / d
u = self.scale_by(scale, scale_reserved=True)
return u.shift_by(to[0] - scale * mn, shift_reserved=True)
[docs]
def normalize(
self: T, to: tuple[float, float] = (0.0, 1.0), normalize_weights: bool = False
) -> T | ConstUtilityFunction:
_ = normalize_weights
from negmas.preferences import ConstUtilityFunction
if not self.outcome_space:
raise ValueError("Cannot normalize a ufun without an outcome-space")
mn, mx = self.minmax(self.outcome_space, max_cardinality=MAX_CARDINALITY)
d = float(mx - mn)
if d < 1e-8:
return ConstUtilityFunction(
0.0 if mx < self.reserved_value else 1.0,
name=self.name,
reserved_value=1.0 if mn < self.reserved_value else 0.0,
)
scale = float(to[1] - to[0]) / d
# u = self.shift_by(-mn, shift_reserved=True)
u = self.scale_by(scale, scale_reserved=True)
return u.shift_by(to[0] - scale * mn, shift_reserved=True)
[docs]
def shift_by(
self: T, offset: float, shift_reserved=True
) -> WeightedUtilityFunction | T:
from negmas.preferences.complex import WeightedUtilityFunction
from negmas.preferences.crisp.const import ConstUtilityFunction
r = (self.reserved_value + offset) if shift_reserved else self.reserved_value
return WeightedUtilityFunction(
ufuns=[self, ConstUtilityFunction(offset)],
weights=[1, 1],
name=self.name,
reserved_value=r,
)
[docs]
def shift_min_for(
self: T,
to: float,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
rng: tuple[float, float] | None = None,
) -> T:
if rng is None:
mn, _ = self.minmax(outcome_space, issues, outcomes)
else:
mn, _ = rng
offset = to - mn
return self.shift_by(offset)
[docs]
def shift_max_for(
self: T,
to: float,
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
rng: tuple[float, float] | None = None,
) -> T:
if rng is None:
_, mx = self.minmax(outcome_space, issues, outcomes)
else:
_, mx = rng
offset = to - mx
return self.shift_by(offset)
def _do_rank(self, vals, descending):
vals = sorted(vals, key=lambda x: x[1], reverse=descending)
if not vals:
return []
ranks = [([vals[0][0]], vals[0][1])]
for w, v in vals[1:]:
if v == ranks[-1][1]:
ranks[-1][0].append(w)
continue
ranks.append(([w], v))
return ranks
[docs]
def argrank_with_weights(
self, outcomes: Sequence[Outcome | None], descending=True
) -> list[tuple[list[Outcome | None], float]]:
"""
Ranks the given list of outcomes with weights. None stands for the null outcome.
Returns:
- A list of tuples each with two values:
- an list of integers giving the index in the input array (outcomes) of an outcome (at the given utility level)
- the weight of that outcome
- The list is sorted by weights descendingly
"""
vals = zip(range(len(list(outcomes))), (self(_) for _ in outcomes))
return self._do_rank(vals, descending)
[docs]
def argrank(
self, outcomes: Sequence[Outcome | None], descending=True
) -> list[list[Outcome | None]]:
"""
Ranks the given list of outcomes with weights. None stands for the null outcome.
Returns:
A list of lists of integers giving the outcome index in the input. The list is sorted by utlity value
"""
ranks = self.argrank_with_weights(outcomes, descending)
return [_[0] for _ in ranks]
[docs]
def rank_with_weights(
self, outcomes: Sequence[Outcome | None], descending=True
) -> list[tuple[list[Outcome | None], float]]:
"""
Ranks the given list of outcomes with weights. None stands for the null outcome.
Returns:
- A list of tuples each with two values:
- an list of integers giving the index in the input array (outcomes) of an outcome (at the given utility level)
- the weight of that outcome
- The list is sorted by weights descendingly
"""
vals = zip(outcomes, (self(_) for _ in outcomes))
return self._do_rank(vals, descending)
[docs]
def rank(
self, outcomes: Sequence[Outcome | None], descending=True
) -> list[list[Outcome | None]]:
"""
Ranks the given list of outcomes with weights. None stands for the null outcome.
Returns:
A list of lists of integers giving the outcome index in the input. The list is sorted by utlity value
"""
ranks = self.rank_with_weights(outcomes, descending)
return [_[0] for _ in ranks]
[docs]
def eu(self, offer: Outcome | None) -> float:
"""
calculates the **expected** utility value of the input outcome
"""
return float(self(offer))
[docs]
def to_crisp(self) -> UtilityFunction:
from negmas.preferences.crisp_ufun import CrispAdapter
return CrispAdapter(self)
[docs]
def to_prob(self) -> ProbUtilityFunction:
from negmas.preferences.prob_ufun import ProbAdapter
return ProbAdapter(self)
[docs]
def to_dict(
self, python_class_identifier=PYTHON_CLASS_IDENTIFIER
) -> dict[str, Any]:
d = {python_class_identifier: get_full_type_name(type(self))}
return dict(
**d,
outcome_space=serialize(
self.outcome_space, python_class_identifier=python_class_identifier
),
reserved_value=self.reserved_value,
name=self.name,
id=self.id,
)
[docs]
@classmethod
def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d.pop(python_class_identifier, None)
d["outcome_space"] = deserialize(
d.get("outcome_space", None),
python_class_identifier=python_class_identifier,
)
return cls(**d)
[docs]
def sample_outcome_with_utility(
self,
rng: tuple[float, float],
outcome_space: OutcomeSpace | None = None,
issues: Sequence[Issue] | None = None,
outcomes: Sequence[Outcome] | None = None,
n_trials: int = 100,
) -> Outcome | None:
"""
Samples an outcome in the given utiltity range or return None if not possible
Args:
rng (Tuple[float, float]): rng
outcome_space (OutcomeSpace | None): outcome_space
issues (Sequence[Issue] | None): issues
outcomes (Sequence[Outcome] | None): outcomes
n_trials (int): n_trials
Returns:
Optional["Outcome"]:
"""
if rng[0] is None:
rng = (float("-inf"), rng[1])
if rng[1] is None:
rng = (rng[0], float("inf"))
outcome_space = os_or_none(outcome_space, issues, outcomes)
if not outcome_space:
outcome_space = self.outcome_space
if not outcome_space:
raise ValueError("No outcome-space is given or defined for the ufun")
if outcome_space.cardinality < n_trials:
n_trials = outcome_space.cardinality # type: ignore I know that it is an int (see the if)
for o in outcome_space.sample(n_trials, with_replacement=False):
if o is None:
continue
assert (
o in outcome_space
), f"Sampled outcome {o} which is not in the outcome-space {outcome_space}"
if rng[0] - 1e-6 <= float(self(o)) <= rng[1] + 1e-6:
return o
return None
[docs]
@classmethod
def from_xml_str(
cls,
xml_str: str,
issues: Iterable[Issue] | Sequence[Issue],
safe_parsing=True,
ignore_discount=False,
ignore_reserved=False,
name: str | None = None,
) -> tuple[BaseUtilityFunction | None, float | None]:
"""Imports a utility function from a GENIUS XML string.
Args:
xml_str (str): The string containing GENIUS style XML utility function definition
issues (Sequence[Issue] | None): Optional issue space to confirm that the utility function is valid
product of all issues in the input
safe_parsing (bool): Turn on extra checks
Returns:
A utility function object (depending on the input file)
Examples:
>>> from negmas.preferences import UtilityFunction
>>> import pkg_resources
>>> from negmas.inout import load_genius_domain
>>> domain = load_genius_domain(
... pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml"
... )
... )
>>> with open(
... pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-prof1.xml"
... ),
... "r",
... ) as ff:
... u, _ = UtilityFunction.from_xml_str(ff.read(), issues=domain.issues)
>>> with open(
... pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-prof1.xml"
... ),
... "r",
... ) as ff:
... u, _ = UtilityFunction.from_xml_str(ff.read(), issues=domain.issues)
>>> assert abs(u(("Dell", "60 Gb", "19'' LCD")) - 21.987727736172488) < 0.000001
>>> assert abs(u(("HP", "80 Gb", "20'' LCD")) - 22.68559475583014) < 0.000001
"""
from negmas.preferences.complex import WeightedUtilityFunction
from negmas.preferences.crisp.linear import (
AffineUtilityFunction,
LinearAdditiveUtilityFunction,
)
from negmas.preferences.crisp.nonlinear import HyperRectangleUtilityFunction
root = ET.fromstring(xml_str)
if safe_parsing and root.tag != "utility_space":
raise ValueError(f"Root tag is {root.tag}: Expected utility_space")
issues = list(issues)
ordered_issues: list[Issue] = []
domain_issues_dict: dict[str, Issue] | None = None
ordered_issues = issues
domain_issues_dict = dict(zip([_.name for _ in issues], issues))
# issue_indices = dict(zip([_.name for _ in issues], range(len(issues))))
objective = None
reserved_value = 0.0
discount_factor = 0.0
for child in root:
if child.tag == "objective":
objective = child
elif child.tag == "reservation":
reserved_value = float(child.attrib["value"])
elif child.tag == "discount_factor":
discount_factor = float(child.attrib["value"])
if objective is None:
objective = root
weights = {}
found_issues = {}
issue_info = {}
issue_keys = {}
rects, rect_utils = [], []
all_numeric = True
global_bias = 0
def _get_hyperrects(ufun, max_utility, utiltype=float):
utype = ufun.attrib.get("type", "none")
uweight = float(ufun.attrib.get("weight", 1))
uagg = ufun.attrib.get("aggregation", "sum")
if uagg != "sum":
raise ValueError(
f"Hypervolumes combined using {uagg} are not supported (only sum is supported)"
)
total_util = utiltype(0)
rects = []
rect_utils = []
if utype == "PlainUfun":
for rect in ufun:
util = utiltype(rect.attrib.get("utility", 0))
total_util += util if util > 0 else 0
ranges = {}
rect_utils.append(util * uweight)
for r in rect:
ii = int(r.attrib["index"]) - 1
# key = issue_keys[ii]
ranges[ii] = (
utiltype(r.attrib["min"]),
utiltype(r.attrib["max"]),
)
rects.append(ranges)
else:
raise ValueError(f"Unknown ufun type {utype}")
total_util = total_util if not max_utility else max_utility
return rects, rect_utils
for child in objective:
if child.tag == "weight":
indx = int(child.attrib["index"]) - 1
if indx < 0 or indx >= len(issues):
global_bias += float(child.attrib["value"])
continue
weights[issues[indx].name] = float(child.attrib["value"])
elif child.tag == "utility_function" or child.tag == "utility":
utility_tag = child
max_utility = child.attrib.get("maxutility", None)
if max_utility is not None:
max_utility = float(max_utility)
ufun_found = False
for ufun in utility_tag:
if ufun.tag == "ufun":
ufun_found = True
_r, _u = _get_hyperrects(ufun, max_utility)
rects += _r
rect_utils += _u
if not ufun_found:
raise ValueError(
"Cannot find ufun tag inside a utility_function tag"
)
elif child.tag == "issue":
indx = int(child.attrib["index"]) - 1
issue_key = child.attrib["name"]
if (
domain_issues_dict is not None
and issue_key not in domain_issues_dict.keys()
):
raise ValueError(
f"Issue {issue_key} is not in the input issue names ({domain_issues_dict.keys()})"
)
issue_info[issue_key] = {"name": issue_key, "index": indx}
issue_keys[indx] = issue_key
info = {"type": "discrete", "etype": "discrete", "vtype": "discrete"}
for a in ("type", "etype", "vtype"):
info[a] = child.attrib.get(a, info[a])
issue_info[issue_key].update(info)
mytype = info["type"]
# vtype = info["vtype"]
if domain_issues_dict is None:
raise ValueError("unknown domain-issue-dict!!!")
current_issue = domain_issues_dict[issue_key]
if mytype == "discrete":
found_issues[issue_key] = dict()
if current_issue.is_continuous():
raise ValueError(
f"Got a {mytype} issue but expected a continuous valued issue"
)
elif mytype in ("integer", "real"):
lower = current_issue.min_value
upper = current_issue.max_value
lower, upper = (
child.attrib.get("lowerbound", lower),
child.attrib.get("upperbound", upper),
)
for rng_child in child:
if rng_child.tag == "range":
lower, upper = (
rng_child.attrib.get("lowerbound", lower),
rng_child.attrib.get("upperbound", upper),
)
if mytype == "integer":
if current_issue.is_continuous():
raise ValueError(
f"Got a {mytype} issue but expected a continuous valued issue"
)
lower, upper = int(lower), int(upper) # type: ignore
else:
lower, upper = float(lower), float(upper) # type: ignore
if (
lower < current_issue.min_value
or upper > current_issue.max_value
): # type: ignore
raise ValueError(
f"Bounds ({lower}, {upper}) are invalid for issue {issue_key} with bounds: "
f"{current_issue.values}"
)
else:
raise ValueError(f"Unknown type: {mytype}")
# now we found ranges for range issues and will find values for all issues
found_values = False
for item in child:
if item.tag == "item":
if mytype != "discrete":
raise ValueError(
f"cannot specify item utilities for not-discrete type: {mytype}"
)
all_numeric = False
item_indx = int(item.attrib["index"]) - 1
item_name = item.attrib.get("value", None)
if item_name is None:
warnings.warn(
f"An item without a value at index {item_indx} for issue {issue_key}",
warnings.NegmasIOWarning,
)
continue
# may be I do not need this
if current_issue.is_integer():
item_name = int(item_name)
if current_issue.is_float():
item_name = float(item_name)
if not current_issue.is_valid(item_name):
raise ValueError(
f"Value {item_name} is not in the domain issue values: "
f"{current_issue.values}"
)
val = item.attrib.get("evaluation", None)
if val is None:
raise ValueError(
f"Item {item_name} of issue {issue_key} has no evaluation attribute!!"
)
float(val)
found_issues[issue_key][item_name] = float(val)
found_values = True
issue_info[issue_key]["map_type"] = "dict"
elif item.tag == "evaluator":
_f, _name = make_fun_from_xml(item)
found_issues[issue_key] = _f
issue_info[issue_key]["map_type"] = _name
found_values = True
if not found_values and issue_key in found_issues.keys():
found_issues.pop(issue_key, None)
# add utilities specified not as hyper-rectangles
if not all_numeric and all(_.is_numeric() for _ in issues):
raise ValueError(
"Some found issues are not numeric but all input issues are"
)
u = None
if len(found_issues) > 0:
if all_numeric:
slopes, biases, ws = [], [], []
for key in (_.name for _ in issues):
if key in found_issues:
slopes.append(found_issues[key].slope)
biases.append(found_issues[key].bias)
else:
slopes.append(0.0)
biases.append(0.0)
ws.append(weights.get(key, 1.0))
bias = 0.0
for b, w in zip(biases, ws):
bias += b * w
for i, s in enumerate(slopes):
ws[i] *= s
u = AffineUtilityFunction(
weights=ws,
outcome_space=make_os(ordered_issues),
bias=bias + global_bias,
)
else:
u = LinearAdditiveUtilityFunction(
values=found_issues,
weights=weights,
outcome_space=make_os(ordered_issues),
bias=global_bias,
)
if len(rects) > 0:
uhyper = HyperRectangleUtilityFunction(
outcome_ranges=rects,
utilities=rect_utils,
name=name,
outcome_space=make_os(ordered_issues),
bias=global_bias,
)
if u is None:
u = uhyper
else:
u = WeightedUtilityFunction(
ufuns=[u, uhyper],
weights=[1.0, 1.0],
name=name,
outcome_space=make_os(ordered_issues),
)
if u is None:
raise ValueError("No issues found")
if not ignore_reserved:
u.reserved_value = reserved_value
u.name = name
# if not ignore_discount and discount_factor != 0.0:
# from negmas.preferences.discounted import ExpDiscountedUFun
# u = ExpDiscountedUFun(ufun=u, discount=discount_factor, name=name)
if ignore_discount:
discount_factor = None
return u, discount_factor
[docs]
@classmethod
def from_genius(
cls, file_name: PathLike | str, **kwargs
) -> tuple[BaseUtilityFunction | None, float | None]:
"""Imports a utility function from a GENIUS XML file.
Args:
file_name (str): File name to import from
Returns:
A utility function object (depending on the input file)
Examples:
>>> from negmas.preferences import UtilityFunction
>>> import pkg_resources
>>> from negmas.inout import load_genius_domain
>>> domain = load_genius_domain(
... pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml"
... )
... )
>>> u, d = UtilityFunction.from_genius(
... file_name=pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-prof1.xml"
... ),
... issues=domain.issues,
... )
>>> u.__class__.__name__
'LinearAdditiveUtilityFunction'
>>> u.reserved_value
0.0
>>> d
1.0
Remarks:
See ``from_xml_str`` for all the parameters
"""
kwargs["name"] = str(file_name)
with open(file_name) as f:
xml_str = f.read()
return cls.from_xml_str(xml_str=xml_str, **kwargs)
[docs]
@classmethod
def from_geniusweb_json_str(
cls,
json_str: str | dict,
safe_parsing=True,
issues: Iterable[Issue] | Sequence[Issue] | None = None,
ignore_discount=False,
ignore_reserved=False,
use_reserved_outcome=False,
name: str | None = None,
) -> tuple[BaseUtilityFunction | None, float | None]:
"""Imports a utility function from a GeniusWeb JSON string.
Args:
json_str (str): The string containing GENIUS style XML utility function definition
issues (Sequence[Issue] | None): Optional issue space to confirm that the utility function is valid
product of all issues in the input
safe_parsing (bool): Turn on extra checks
Returns:
A utility function object (depending on the input file)
"""
from negmas.preferences.crisp.linear import LinearAdditiveUtilityFunction
_ = safe_parsing
if isinstance(json_str, str):
d = json.loads(json_str)
else:
d = json_str
reserved_outcome, discount_factor, u = None, 1.0, None
if "LinearAdditiveUtilitySpace" in d.keys():
udict = d["LinearAdditiveUtilitySpace"]
domain = (
make_os(issues_from_geniusweb_json(udict["domain"])[0])
if "domain" in udict.keys()
else None
)
if domain is None and issues is not None:
domain = make_os(tuple(issues))
discount_factor = (
udict.get("discount_factor", 1.0) if not ignore_discount else 1.0
)
reserved_value = udict.get("reserved_value", None)
uname = udict.get("name", name)
reserved_dict = udict.get("reservationBid", dict()).get("issuevalues", None)
if reserved_dict and not ignore_reserved:
reserved_outcome = dict2outcome(reserved_dict, issues=domain.issues) # type: ignore
weights = udict.get("issueWeights", None)
utils = udict.get("issueUtilities", dict())
values = dict()
for iname, idict in utils.items():
vals = idict.get("discreteutils", dict()).get("valueUtilities", dict())
values[iname] = TableFun(vals)
u = LinearAdditiveUtilityFunction(
values=values,
weights=weights,
bias=0.0,
name=uname,
reserved_outcome=None,
reserved_value=None,
outcome_space=domain,
)
if not ignore_reserved:
if reserved_outcome and not use_reserved_outcome:
reserved_value = u(reserved_outcome)
if use_reserved_outcome:
u.reserved_outcome = reserved_outcome # type: ignore
u.reserved_value = reserved_value # type: ignore
return u, discount_factor
[docs]
@classmethod
def from_geniusweb(
cls, file_name: PathLike | str, **kwargs
) -> tuple[BaseUtilityFunction | None, float | None]:
"""Imports a utility function from a GeniusWeb json file.
Args:
file_name (str): File name to import from
Returns:
A utility function object (depending on the input file)
Remarks:
See ``from_geniusweb_json_str`` for all the parameters
"""
kwargs["name"] = str(file_name)
with open(file_name) as f:
xml_str = f.read()
return cls.from_geniusweb_json_str(json_str=xml_str, **kwargs)
[docs]
def to_xml_str(
self, issues: Iterable[Issue] | None = None, discount_factor=None
) -> str:
"""
Exports a utility function to a well formatted string
"""
if not hasattr(self, "xml"):
raise ValueError(
f"ufun of type {self.__class__.__name__} has no xml() member and cannot be saved to XML string\nThe ufun params: {self.to_dict()}"
)
if issues is None:
if not isinstance(self.outcome_space, IndependentIssuesOS):
raise ValueError(
"Cannot convert to xml because the outcome-space of the ufun is not a cartesian outcome space"
)
issues = self.outcome_space.issues
n_issues = 0
else:
issues = list(issues)
n_issues = len(issues)
output = (
f'<utility_space type="any" number_of_issues="{n_issues}">\n'
f'<objective index="1" etype="objective" type="objective" description="" name="any">\n'
)
output += self.xml(issues=issues) # type: ignore
if "</objective>" not in output:
output += "</objective>\n"
if discount_factor is not None:
output += f'<discount_factor value="{discount_factor}" />\n'
if (
self.reserved_value is not None
and self.reserved_value != float("-inf")
and "<reservation value" not in output
):
output += f'<reservation value="{self.reserved_value}" />\n'
if "</utility_space>" not in output:
output += "</utility_space>\n"
return output
[docs]
def to_genius(
self, file_name: PathLike | str, issues: Iterable[Issue] | None = None, **kwargs
):
"""
Exports a utility function to a GENIUS XML file.
Args:
file_name (str): File name to export to
u: utility function
issues: The issues being considered as defined in the domain
Returns:
None
Examples:
>>> from negmas.preferences import UtilityFunction
>>> from negmas.inout import load_genius_domain
>>> import pkg_resources
>>> domain = load_genius_domain(
... domain_file_name=pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml"
... )
... )
>>> u, d = UtilityFunction.from_genius(
... file_name=pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/Laptop/Laptop-C-prof1.xml"
... ),
... issues=domain.issues,
... )
>>> u.to_genius(
... discount_factor=d,
... file_name=pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/LaptopConv/Laptop-C-prof1.xml"
... ),
... issues=domain.issues,
... )
Remarks:
See ``to_xml_str`` for all the parameters
"""
file_name = Path(file_name).absolute()
if file_name.suffix == "":
file_name = file_name.parent / f"{file_name.stem}.xml"
with open(file_name, "w") as f:
f.write(self.to_xml_str(issues=issues, **kwargs))
[docs]
def difference_prob(
self, first: Outcome | None, second: Outcome | None
) -> Distribution:
"""
Returns a numeric difference between the utility of the two given outcomes
"""
f, s = self(first), self(second)
if not isinstance(f, Distribution):
f = Real(f)
if not isinstance(s, Distribution):
s = Real(s)
return f - s
[docs]
def is_not_worse(self, first: Outcome | None, second: Outcome | None) -> bool:
return self.difference_prob(first, second) >= 0.0
[docs]
def difference(self, first: Outcome | None, second: Outcome | None) -> float:
"""
Returns a numeric difference between the utility of the two given outcomes
"""
return float(self(first)) - float(self(second))
[docs]
def __call__(self, offer: Outcome | None) -> Value:
"""
Calculate the utility for a given outcome at the given negotiation state.
Args:
offer: The offer to be evaluated.
Remarks:
- It calls the abstract method `eval` after opationally adjusting the
outcome type.
- It is preferred to override eval instead of directly overriding this method
- You cannot return None from overriden eval() functions but raise an exception (ValueError) if it was
not possible to calculate the Value.
- Return a float from your `eval` implementation.
- Return the reserved value if the offer was None
Returns:
The utility of the given outcome
"""
if offer is None:
return self.reserved_value # type: ignore I know that concrete subclasses will be returning the correct type
if (
self._invalid_value is not None
and self.outcome_space
and offer not in self.outcome_space
):
return self._invalid_value
return self.eval(offer)
class _FullyStatic:
"""
Used internally to indicate that the ufun can **NEVER** change due to anything.
"""
def is_session_dependent(self) -> bool:
return False
def is_volatile(self) -> bool:
return False
def is_state_dependent(self) -> bool:
return False
def is_stationary(self) -> bool:
return True
class _ExtremelyDynamic:
"""
Used internally to indicate that the ufun can change due to anything.
"""
def is_session_dependent(self) -> bool:
return True
def is_volatile(self) -> bool:
return True
def is_state_dependent(self) -> bool:
return True
def is_stationary(self) -> bool:
return False