from __future__ import annotations
from typing import Callable, Iterable
from negmas.generics import GenericMapping, gmap, ikeys
from negmas.helpers import get_full_type_name
from negmas.outcomes import Issue, Outcome, OutcomeRange, outcome_in_range
from negmas.outcomes.outcome_space import CartesianOutcomeSpace
from negmas.serialization import PYTHON_CLASS_IDENTIFIER, deserialize, serialize
from ..base import OutcomeUtilityMapping
from ..crisp_ufun import UtilityFunction
from ..mixins import StationaryMixin
__all__ = [
"NonLinearAggregationUtilityFunction",
"HyperRectangleUtilityFunction",
"NonlinearHyperRectangleUtilityFunction",
]
[docs]
class NonLinearAggregationUtilityFunction(StationaryMixin, UtilityFunction):
r"""A nonlinear utility function.
Allows for the modeling of a single nonlinear utility function that combines the utilities of different issues.
Args:
values: A set of mappings from issue values to utility functions. These are generic mappings so
`Callable` (s) and `Mapping` (s) are both accepted
f: A nonlinear function mapping from a dict of utility_function-per-issue to a float
name: name of the utility function. If None a random name will be generated.
Notes:
The utility is calculated as:
.. math::
u = f\left(u_0\left(i_0\right), u_1\left(i_1\right), ..., u_n\left(i_n\right)\right)
where :math:`u_j()` is the utility function for issue :math:`j` and :math:`i_j` is value of issue :math:`j` in the
evaluated outcome.
Examples:
>>> from negmas.outcomes import make_issue
>>> from negmas.preferences.crisp.mapping import MappingUtilityFunction
>>> issues = [
... make_issue((10.0, 20.0), "price"),
... make_issue(["delivered", "not delivered"], "delivery"),
... make_issue(5, "quality"),
... ]
>>> print(list(map(str, issues)))
['price: (10.0, 20.0)', "delivery: ['delivered', 'not delivered']", 'quality: (0, 4)']
>>> g = NonLinearAggregationUtilityFunction(
... {
... "price": lambda x: 2.0 * x,
... "delivery": {"delivered": 10, "not delivered": -10},
... "quality": MappingUtilityFunction(lambda x: x - 3),
... },
... f=lambda u: u[0] + 2.0 * u[-1],
... issues=issues,
... )
>>> g((14.0, "delivered", 2)) - ((2.0 * 14.0) + 2.0 * (2 - 3))
0.0
You must pass a value for each issue in the outcome. If some issues are not used for the ufun, you can pass them as any value that is acceptable to the corresponding value function
>>> g = NonLinearAggregationUtilityFunction(
... {
... "price": lambda x: 2.0 * x,
... "delivery": {"delivered": 10, "not delivered": -10},
... },
... f=lambda u: 2.0 * u[0],
... issues=issues[:2],
... )
>>> g((14.0, "delivered")) - (2.0 * (2.0 * 14))
0.0
"""
def __init__(
self,
values: dict[str, GenericMapping] | list[GenericMapping] | None,
f: Callable[[tuple[float]], float],
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
if isinstance(values, dict):
if not isinstance(self.outcome_space, CartesianOutcomeSpace):
raise ValueError(
f"Cannot create a {self.__class__.__name__} with an outcome-space that is not Cartesian while passing values as a dict"
)
if not self.outcome_space.issues:
raise ValueError(
"Cannot initializes with dict of values if you are not specifying issues"
)
values = [
values[_]
for _ in [
i.name if isinstance(i, Issue) else i
for i in self.outcome_space.issues
]
]
self.values = values
self.f = f
[docs]
def xml(self, issues: list[Issue]) -> str:
raise NotImplementedError(f"Cannot convert {self.__class__.__name__} to xml")
[docs]
def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d = {python_class_identifier: get_full_type_name(type(self))}
return dict(
**d,
values=serialize(
self.values, python_class_identifier=python_class_identifier
),
f=serialize(self.f),
)
[docs]
@classmethod
def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d.pop(python_class_identifier, None)
for k in ("values", "f"):
d[k] = deserialize(
d.get(k, None), python_class_identifier=python_class_identifier
)
return cls(**d)
[docs]
def eval(self, offer: Outcome | None) -> float:
if offer is None:
return self.reserved_value
if self.values is None:
raise ValueError("No issue utilities were set.")
u = tuple(gmap(v, w) for w, v in zip(offer, self.values))
return self.f(u)
[docs]
class HyperRectangleUtilityFunction(StationaryMixin, UtilityFunction):
"""A utility function defined as a set of hyper-volumes.
The utility function that is calulated by combining linearly a set of *probably nonlinear* functions applied in
predefined hyper-volumes of the outcome space.
Args:
outcome_ranges: The outcome_ranges for which the `mappings` are defined
weights: The *optional* weights to use for combining the outputs of the `mappings`
ignore_issues_not_in_input: If a hyper-volumne local function is defined for some issue
that is not in the outcome being evaluated ignore it.
ignore_failing_range_utilities: If a hyper-volume local function fails, just assume it
did not exist for this outcome.
name: name of the utility function. If None a random name will be generated.
Examples:
We will use the following issue space of cardinality :math:`10 \times 5 \times 4`:
>>> from negmas.outcomes import make_issue
>>> issues = [make_issue(10), make_issue(5), make_issue(4)]
Now create the utility function with
>>> f = HyperRectangleUtilityFunction(
... outcome_ranges=[
... {0: (1.0, 2.0), 1: (1.0, 2.0)},
... {0: (1.4, 2.0), 2: (2.0, 3.0)},
... ],
... utilities=[2.0, lambda x: 2 * x[2] + x[0]],
... )
>>> g = HyperRectangleUtilityFunction(
... outcome_ranges=[
... {0: (1.0, 2.0), 1: (1.0, 2.0)},
... {0: (1.4, 2.0), 2: (2.0, 3.0)},
... ],
... utilities=[2.0, lambda x: 2 * x[2] + x[0]],
... ignore_issues_not_in_input=True,
... )
>>> h = HyperRectangleUtilityFunction(
... outcome_ranges=[
... {0: (1.0, 2.0), 1: (1.0, 2.0)},
... {0: (1.4, 2.0), 2: (2.0, 3.0)},
... ],
... utilities=[2.0, lambda x: 2 * x[2] + x[0]],
... ignore_failing_range_utilities=True,
... )
We can now calcualte the utility_function of some outcomes:
* An outcome that belongs to the both outcome_ranges:
>>> [
... f({0: 1.5, 1: 1.5, 2: 2.5}),
... g({0: 1.5, 1: 1.5, 2: 2.5}),
... h({0: 1.5, 1: 1.5, 2: 2.5}),
... ]
[8.5, 8.5, 8.5]
* An outcome that belongs to the first hypervolume only:
>>> [
... f({0: 1.5, 1: 1.5, 2: 1.0}),
... g({0: 1.5, 1: 1.5, 2: 1.0}),
... h({0: 1.5, 1: 1.5, 2: 1.0}),
... ]
[2.0, 2.0, 2.0]
* An outcome that belongs to and has the first hypervolume only:
>>> [f({0: 1.5}), g({0: 1.5}), h({0: 1.5})]
[nan, 0.0, nan]
* An outcome that belongs to the second hypervolume only:
>>> [f({0: 1.5, 2: 2.5}), g({0: 1.5, 2: 2.5}), h({0: 1.5, 2: 2.5})]
[nan, 6.5, nan]
* An outcome that has and belongs to the second hypervolume only:
>>> [f({2: 2.5}), g({2: 2.5}), h({2: 2.5})]
[nan, 0.0, nan]
* An outcome that belongs to no outcome_ranges:
>>> [
... f({0: 11.5, 1: 11.5, 2: 12.5}),
... g({0: 11.5, 1: 11.5, 2: 12.5}),
... h({0: 11.5, 1: 11.5, 2: 12.5}),
... ]
[0.0, 0.0, 0.0]
Remarks:
- The number of outcome_ranges, mappings, and weights must be the same
- if no weights are given they are all assumed to equal unity
- mappings can either by an `OutcomeUtilityMapping` or a constant.
"""
[docs]
def adjust_params(self):
if self.weights is None:
self.weights = [1.0] * len(self.outcome_ranges)
def __init__(
self,
outcome_ranges: Iterable[OutcomeRange | None],
utilities: list[float] | list[OutcomeUtilityMapping],
weights: list[float] | None = None,
ignore_issues_not_in_input=False,
ignore_failing_range_utilities=False,
bias: float = 0.0,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.outcome_ranges = list(outcome_ranges)
self.mappings = list(utilities)
self.weights = list(weights) if weights else ([1.0] * len(self.outcome_ranges))
self.ignore_issues_not_in_input = ignore_issues_not_in_input
self.ignore_failing_range_utilities = ignore_failing_range_utilities
self.bias = bias
self.adjust_params()
[docs]
def xml(self, issues: list[Issue]) -> str:
"""Represents the function as XML
Args:
issues:
Examples:
>>> from negmas.outcomes import make_issue
>>> f = HyperRectangleUtilityFunction(
... outcome_ranges=[
... {0: (1.0, 2.0), 1: (1.0, 2.0)},
... {0: (1.4, 2.0), 2: (2.0, 3.0)},
... ],
... utilities=[2.0, 9.0 + 4.0],
... )
>>> print(
... f.xml(
... [
... make_issue((0.0, 4.0), name="0"),
... make_issue((0.0, 9.0), name="1"),
... make_issue((0.0, 9.0), name="2"),
... ]
... ).strip()
... )
<issue index="1" name="0" vtype="real" type="real" etype="real">
<range lowerbound="0.0" upperbound="4.0"></range>
</issue><issue index="2" name="1" vtype="real" type="real" etype="real">
<range lowerbound="0.0" upperbound="9.0"></range>
</issue><issue index="3" name="2" vtype="real" type="real" etype="real">
<range lowerbound="0.0" upperbound="9.0"></range>
</issue><utility_function maxutility="-1.0">
<ufun type="PlainUfun" weight="1" aggregation="sum">
<hyperRectangle utility_function="2.0">
<INCLUDES index="1" min="1.0" max="2.0" />
<INCLUDES index="2" min="1.0" max="2.0" />
</hyperRectangle>
<hyperRectangle utility_function="13.0">
<INCLUDES index="1" min="1.4" max="2.0" />
<INCLUDES index="3" min="2.0" max="3.0" />
</hyperRectangle>
</ufun>
</utility_function>
"""
output = ""
for i, issue in enumerate(issues):
name = issue.name
if isinstance(issue.values, tuple):
output += (
f'<issue index="{i+1}" name="{name}" vtype="real" type="real" etype="real">\n'
f' <range lowerbound="{issue.values[0]}" upperbound="{issue.values[1]}"></range>\n'
f"</issue>"
)
elif isinstance(issue.values, int):
output += (
f'<issue index="{i+1}" name="{name}" vtype="integer" type="integer" etype="integer" '
f'lowerbound="0" upperbound="{issue.values - 1}"/>\n'
)
else:
output += (
f'<issue index="{i+1}" name="{name}" vtype="integer" type="integer" etype="integer" '
f'lowerbound="{min(issue.values)}" upperbound="{max(issue.values)}"/>\n'
)
# todo find the real maxutility
output += '<utility_function maxutility="-1.0">\n <ufun type="PlainUfun" weight="1" aggregation="sum">\n'
for rect, u, w in zip(self.outcome_ranges, self.mappings, self.weights):
if not isinstance(u, float):
raise ValueError(
"Only hyper-rectangles with constant utility per rectangle can be convereted to xml"
)
output += f' <hyperRectangle utility_function="{u * w}">\n'
if not rect:
continue
for key in rect.keys():
# indx = [i for i, _ in enumerate(issues) if _.name == key][0] + 1
indx = key + 1
values = rect.get(key, None)
if values is None:
continue
if isinstance(values, float) or isinstance(values, int):
mn, mx = values, values
elif isinstance(values, tuple):
mn, mx = values
else:
mn, mx = min(values), max(values)
output += (
f' <INCLUDES index="{indx}" min="{mn}" max="{mx}" />\n'
)
output += " </hyperRectangle>\n"
output += " </ufun>\n</utility_function>"
return output
[docs]
def to_stationary(self):
return self
[docs]
@classmethod
def random(
cls, outcome_space, reserved_value, normalized=True, rectangles=(1, 4), **kwargs
) -> HyperRectangleUtilityFunction:
"""Generates a random ufun of the given type"""
raise NotImplementedError("random hyper-rectangle ufuns are not implemented")
[docs]
def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d = {python_class_identifier: get_full_type_name(type(self))}
d.update(super().to_dict(python_class_identifier=python_class_identifier))
return dict(
**d,
outcome_ranges=serialize(
self.outcome_ranges, python_class_identifier=python_class_identifier
),
utilities=serialize(
self.mappings, python_class_identifier=python_class_identifier
),
weights=self.weights,
ignore_issues_not_in_input=self.ignore_issues_not_in_input,
ignore_failing_range_utilities=self.ignore_failing_range_utilities,
)
[docs]
@classmethod
def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d.pop(python_class_identifier, None)
for k in ("oucome_ranges", "utilities"):
d[k] = deserialize(
d.get(k, None), python_class_identifier=python_class_identifier
)
return cls(**d)
[docs]
def eval(self, offer: Outcome | None) -> float:
if offer is None:
return self.reserved_value
u = self.bias
for weight, outcome_range, mapping in zip(
self.weights, self.outcome_ranges, self.mappings
):
# fail on any outcome_range that constrains issues not in the presented outcome
if (
outcome_range is not None
and set(ikeys(outcome_range)) - set(ikeys(offer)) != set()
):
if self.ignore_issues_not_in_input:
continue
return float("nan")
elif outcome_range is None or outcome_in_range(offer, outcome_range):
if isinstance(mapping, float) or isinstance(mapping, int):
u += weight * mapping
else:
# fail if any outcome_range utility_function cannot be calculated from the input
try:
u += weight * gmap(mapping, offer)
except KeyError:
if self.ignore_failing_range_utilities:
continue
return float("nan")
return u
[docs]
class NonlinearHyperRectangleUtilityFunction(StationaryMixin, UtilityFunction):
"""A utility function defined as a set of outcome_ranges.
Args:
hypervolumes: see `HyperRectangleUtilityFunction`
mappings: see `HyperRectangleUtilityFunction`
f: A nonlinear function to combine the results of `mappings`
name: name of the utility function. If None a random name will be generated
"""
def __init__(
self,
hypervolumes: Iterable[OutcomeRange],
mappings: list[OutcomeUtilityMapping],
f: Callable[[list[float]], float],
name: str | None = None,
reserved_value: float = float("-inf"),
id=None,
) -> None:
super().__init__(name=name, reserved_value=reserved_value, id=id)
self.hypervolumes = hypervolumes
self.mappings = mappings
self.f = f
[docs]
def xml(self, issues: list[Issue]) -> str:
raise NotImplementedError(f"Cannot convert {self.__class__.__name__} to xml")
[docs]
def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d = {python_class_identifier: get_full_type_name(type(self))}
d.update(super().to_dict(python_class_identifier=python_class_identifier))
return dict(
**d,
hypervolumes=serialize(
self.hypervolumes, python_class_identifier=python_class_identifier
),
mappings=serialize(
self.mappings, python_class_identifier=python_class_identifier
),
f=serialize(self.f, python_class_identifier=python_class_identifier),
)
[docs]
@classmethod
def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d.pop(python_class_identifier, None)
for k in ("hypervolumes", "mapoints", "f"):
d[k] = deserialize(
d.get(k, None), python_class_identifier=python_class_identifier
)
return cls(**d)
[docs]
def eval(self, offer: Outcome | None) -> float:
if offer is None:
return self.reserved_value
if not isinstance(self.hypervolumes, Iterable):
raise ValueError(
"Hypervolumes are not set. Call set_params() or pass them through the constructor."
)
u = []
for hypervolume, mapping in zip(self.hypervolumes, self.mappings):
if outcome_in_range(offer, hypervolume):
u.append(gmap(mapping, offer))
return self.f(u)