from __future__ import annotations
import random
from functools import lru_cache, partial
from typing import Any, Callable, Iterable, Mapping, TYPE_CHECKING
from negmas import warnings
from negmas.helpers import get_full_type_name
from negmas.helpers.numeric import make_range
from negmas.helpers.prob import EPSILON
from negmas.outcomes import Issue, Outcome
from negmas.outcomes.base_issue import DiscreteIssue
from negmas.outcomes.common import check_one_at_most, os_or_none
from negmas.outcomes.outcome_space import CartesianOutcomeSpace
from negmas.outcomes.protocols import IndependentIssuesOS, OutcomeSpace
from negmas.preferences.protocols import SingleIssueFun
from negmas.serialization import PYTHON_CLASS_IDENTIFIER, deserialize, serialize
from ..crisp_ufun import UtilityFunction
from ..mixins import StationaryMixin
from ..value_fun import IdentityFun, LambdaFun, TableFun
if TYPE_CHECKING:
from negmas.preferences.crisp.const import ConstUtilityFunction
__all__ = [
"LinearUtilityAggregationFunction",
"LinearAdditiveUtilityFunction",
"LinearUtilityFunction",
"AffineUtilityFunction",
]
NLEVELS = 20
def _rand_mapping(x, r):
return (r - 0.5) * x
def _rand_mapping_normalized(x, mx, mn, r):
if mx == mn:
return mx
return r * (x - mn) / (mx - mn)
def _random_mapping(issue: Issue, normalized=False):
r = random.random()
if issue.is_numeric():
return (
partial(
_rand_mapping_normalized, mx=issue.max_value, mn=issue.min_value, r=r
)
if normalized
else partial(_rand_mapping, r=r)
)
if isinstance(issue, DiscreteIssue):
return dict(
zip(
issue.all,
[
random.random() - (0.5 if not normalized else 0.0)
for _ in range(issue.cardinality)
],
)
)
return (
partial(_rand_mapping_normalized, mx=issue.max_value, mn=issue.min_value)
if normalized
else partial(_rand_mapping, r=r)
)
[docs]
class AffineUtilityFunction(StationaryMixin, UtilityFunction):
r"""
An affine utility function for multi-issue negotiations.
Models a linear utility function using predefined weights.
Args:
weights: weights for combining `values`
bias: The offset added
name: name of the utility function. If None a random name will be generated.
Notes:
The utility value is calculated as:
.. math::
u = \alpha + \sum_{i=0}^{n_{outcomes}-1} {\alpha_i * \omega_i}
where $\alpha$ is the bias term and $\alpha_i$ is the weight of issue $i$,
and $\omega_i$ is the value of issue $i$ in the input outcome $\omega$.
Examples:
>>> from negmas.outcomes import make_issue
>>> issues = [make_issue((10.0, 20.0), "price"), make_issue(5, "quality")]
>>> print(list(map(str, issues)))
['price: (10.0, 20.0)', 'quality: (0, 4)']
>>> f = AffineUtilityFunction({"price": 1.0, "quality": 4.0}, issues=issues)
>>> f((2, 14.0)) - (2 * 1.0 + 14.0 * 4)
0.0
>>> f = LinearUtilityFunction([1.0, 2.0])
>>> f((2, 14)) - (2 * 1.0 + 14 * 2.0)
0.0
Remarks:
- If an outcome contains combinations of strings and numeric values that have corresponding weights, an
exception will be raised when its utility is calculated
- If you pass weights as a dictionary (mapping issue names to wieght
values), you **must** pass the issues as well ( using `outcome_space`
or `issues` ) and issue names must match the keys of the weights
dict exactly (i.e. all issues are
represented and all keys in the dict are in the issues).
- If you pass the weights as a tuple, you need not pass the issues (but
you still should to help with things like scaling and shifting ufun
values)
"""
def __init__(
self,
weights: dict[str, float] | list[float] | tuple[float, ...] | None = None,
bias: float = 0,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
if self.outcome_space and not isinstance(
self.outcome_space, IndependentIssuesOS
):
raise ValueError(
f"Cannot create {self.type} ufun with an outcomespace without indpendent issues"
f".\n Given OS: {self.outcome_space} of type {type(self.outcome_space)}\n"
f"Given args {kwargs}"
)
self.issues: list[Issue] | None = (
list(self.outcome_space.issues) if self.outcome_space else None # type: ignore
)
if weights is None:
if not self.issues:
raise ValueError(
"Cannot initializes with no weights if you are not specifying issues"
)
weights = [1.0 / len(self.issues)] * len(self.issues)
if isinstance(weights, dict):
if not self.issues:
raise ValueError(
"Cannot initializes with dict of weights if you are not specifying issues"
)
weights = [
weights[_]
for _ in [i.name if isinstance(i, Issue) else i for i in self.issues]
]
self._weights: list[float] = list(weights)
self._bias = bias
self._values = [IdentityFun() for _ in self.issues] if self.issues else []
@property
def bias(self):
return self._bias
@property
def weights(self):
return self._weights
@property
def values(self):
return self._values
[docs]
def eval(self, offer: Outcome | None) -> float:
if offer is None:
return self.reserved_value
return self._bias + sum(w * v for w, v in zip(self._weights, offer))
[docs]
def xml(self, issues: list[Issue] | None = None) -> str:
"""Generates an XML string representing the utility function
Args:
issues:
Examples:
>>> from negmas.outcomes import make_issue
>>> issues = [make_issue(values=10, name="i1"), make_issue(values=4, name="i2")]
>>> f = LinearUtilityFunction(weights=[1.0, 4.0], issues=issues)
>>> print(f.xml(issues))
<issue index="1" etype="discrete" type="discrete" vtype="integer" name="i1">
<item index="1" value="0" evaluation="0.0" />
<item index="2" value="1" evaluation="1.0" />
<item index="3" value="2" evaluation="2.0" />
<item index="4" value="3" evaluation="3.0" />
<item index="5" value="4" evaluation="4.0" />
<item index="6" value="5" evaluation="5.0" />
<item index="7" value="6" evaluation="6.0" />
<item index="8" value="7" evaluation="7.0" />
<item index="9" value="8" evaluation="8.0" />
<item index="10" value="9" evaluation="9.0" />
</issue>
<issue index="2" etype="discrete" type="discrete" vtype="integer" name="i2">
<item index="1" value="0" evaluation="0.0" />
<item index="2" value="1" evaluation="1.0" />
<item index="3" value="2" evaluation="2.0" />
<item index="4" value="3" evaluation="3.0" />
</issue>
<weight index="1" value="1.0">
</weight>
<weight index="2" value="4.0">
</weight>
<BLANKLINE>
"""
# todo save for continuous issues using evaluator ftype linear offset slope (see from_xml_str)
output = ""
if not issues:
if not self.issues:
raise ValueError(
"Cannot convert the ufn to xml as its outcome-space is unknown"
)
issues = list(self.issues)
for i, (issue, vfun, weight) in enumerate(
zip(issues, self._values, self._weights)
):
if not issue.is_numeric():
raise ValueError(
f"Issue {issue} is not numeric. Cannot use a LinearUtilityFunction. Try a LinearAdditiveUtilityFunction"
)
bias = self._bias / weight if weight else 0.0
output += vfun.xml(i, issue, bias)
for i, w in enumerate(self._weights):
output += f'<weight index="{i+1}" value="{w}">\n</weight>\n'
if abs(self._bias) > EPSILON:
output += f'<weight index="{len(self._weights) + 1}" value="{self._bias}">\n</weight>\n'
return output
[docs]
@classmethod
def random(
cls,
issues: list[Issue] | tuple[Issue, ...],
reserved_value=(0.0, 1.0),
normalized=True,
):
# from negmas.preferences.ops import normalize
for issue in issues:
if not issue.is_numeric():
raise ValueError(
f"Issue {issue} is not numeric. Cannot use a LinearUtilityFunction. Try a LinearAdditiveUtilityFunction"
)
reserved_value = make_range(reserved_value)
n_issues = len(issues)
reserved_value = (
reserved_value
if reserved_value is not None
else tuple([random.random()] * 2)
)
if normalized:
weights = [random.random() for _ in range(n_issues)]
m = sum(weights)
if m:
weights = [_ / m for _ in weights]
for i, issue in enumerate(issues):
weights[i] /= issue.max_value # type: ignore (we know that all numeric issues has a maximum value)
bias = 0.0
else:
weights = [2 * (random.random() - 0.5) for _ in range(n_issues)]
bias = sum(2 * (random.random() - 0.5) * _ for _ in weights)
ufun = cls(
weights=weights,
bias=bias,
issues=issues,
reserved_value=random.random() * (reserved_value[1] - reserved_value[0])
+ reserved_value[0],
)
return ufun
[docs]
def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d = {python_class_identifier: get_full_type_name(type(self))}
d.update(super().to_dict(python_class_identifier=PYTHON_CLASS_IDENTIFIER))
return dict(**d, weights=self._weights, bias=self._bias)
[docs]
@classmethod
def from_dict(cls, d: dict, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
if isinstance(d, cls):
return d
d.pop(python_class_identifier, None)
# d["values"]=deserialize(d["values"]), # type: ignore (deserialize can return anything but it should be OK)
d = deserialize(
d,
deep=True,
remove_type_field=True,
python_class_identifier=python_class_identifier,
) # type: ignore
return cls(**d) # type: ignore I konw that d will be a dict with string keys
[docs]
def shift_by(
self, offset: float, shift_reserved: bool = True
) -> AffineUtilityFunction:
return AffineUtilityFunction(
self._weights,
self._bias + offset,
outcome_space=self.outcome_space,
name=self.name,
reserved_value=self.reserved_value
if not shift_reserved
else (self.reserved_value + offset),
)
[docs]
def scale_by(
self, scale: float, scale_reserved: bool = True
) -> AffineUtilityFunction:
if scale < 0:
raise ValueError(f"Cannot have a negative scale: {scale}")
weights = [_ * scale for _ in self._weights]
return AffineUtilityFunction(
weights=weights,
bias=self._bias * scale,
outcome_space=self.outcome_space,
name=self.name,
reserved_value=self.reserved_value
if not scale_reserved
else (self.reserved_value * scale),
)
[docs]
def normalize_for(
self,
to: tuple[float, float] = (0.0, 1.0),
outcome_space: OutcomeSpace | None = None,
) -> ConstUtilityFunction | AffineUtilityFunction:
"""
Creates a new utility function that is normalized based on input conditions.
Args:
to: The minimum and maximum value to normalize to. If either is None, it is ignored.
This means that passing `(None, 1.0)` will normalize the ufun so that the maximum
is `1` but will not guarantee any limit for the minimum and so on.
outcome_space: the outcome space to normalize within
"""
epsilon: float = 1e-8
if outcome_space is None:
outcome_space = self.outcome_space
mn, mx = self.minmax(outcome_space)
if sum(self._weights) < epsilon:
raise ValueError(
"Cannot normalize a ufun with zero weights to have a non-zero range"
)
if abs(mx - to[1]) < epsilon and abs(mn - to[0]) < epsilon:
return self
if abs(mx - mn) < epsilon:
if (to[1] - to[0]) < epsilon:
scale = 1.0
else:
from negmas.preferences.crisp.const import ConstUtilityFunction
return ConstUtilityFunction(
to[1], outcome_space=outcome_space, name=self.name
)
else:
scale = (to[1] - to[0]) / (mx - mn)
if scale < 0:
raise ValueError(
f"Cannot have a negative scale: max, min = ({mx}, {mn}) and rng = {to}"
)
bias = to[1] - scale * mx + self._bias * scale
weights = [_ * scale for _ in self._weights]
if abs(bias) < epsilon:
return LinearUtilityFunction(
weights, outcome_space=outcome_space, name=self.name
)
return AffineUtilityFunction(
weights, bias, outcome_space=outcome_space, name=self.name
)
[docs]
def normalize(
self, to: tuple[float, float] = (0.0, 1.0), normalize_weights: bool = False
) -> ConstUtilityFunction | AffineUtilityFunction | LinearUtilityFunction:
return self.normalize_for(to, self.outcome_space)
[docs]
def extreme_outcomes(
self,
outcome_space: OutcomeSpace | None = None,
issues: Iterable[Issue] | None = None,
outcomes: Iterable[Outcome] | None = None,
max_cardinality=1000,
) -> tuple[Outcome, Outcome]:
return self._extreme_outcomes(outcome_space, issues, outcomes, max_cardinality)
@lru_cache
def _extreme_outcomes(
self,
outcome_space: OutcomeSpace | None = None,
issues: list[Issue] | None = None,
outcomes: list[Outcome] | None = None,
max_cardinality=1000,
) -> tuple[Outcome, Outcome]:
"""Finds the best and worst outcomes
Args:
ufun: The utility function
issues: list of issues (optional)
outcomes: A collection of outcomes (optional)
max_cardinality: the maximum number of outcomes to try sampling (if sampling is used and outcomes are not
given)
Returns:
(worst, best) outcomes
"""
# The minimum and maximum must be at one of the edges of the outcome space. Just enumerate them
original_os = outcome_space
check_one_at_most(outcome_space, issues, outcomes)
outcome_space = os_or_none(outcome_space, issues, outcomes)
if outcome_space is None:
outcome_space = self.outcome_space
if outcome_space is not None:
if not isinstance(outcome_space, IndependentIssuesOS):
return super().extreme_outcomes(
original_os, issues, outcomes, max_cardinality
)
if outcomes is not None:
warnings.warn(
"Passing outcomes and issues (or having known issues) to linear ufuns is redundant. The outcomes passed will be used which is much slower than if you do not pass them",
warnings.NegmasSpeedWarning,
)
return super().extreme_outcomes(
outcome_space=original_os,
issues=issues,
outcomes=outcomes,
max_cardinality=max_cardinality,
)
uranges = []
if not self.issues:
raise ValueError(
"Cannot find extreme outcomes of a ufun without knowing its outcome-space"
)
issues = self.issues
for issue in issues:
mx, mn = float("-inf"), float("inf")
for v in issue.value_generator(n=max_cardinality):
if v >= mx:
mx = v
if v < mn:
mn = v
uranges.append((mn, mx))
best_outcome, worst_outcome = [], []
for w, urng in zip(self._weights, uranges):
if w > 0:
best_outcome.append(urng[1])
worst_outcome.append(urng[0])
else:
best_outcome.append(urng[0])
worst_outcome.append(urng[1])
return tuple(worst_outcome), tuple(best_outcome)
return super().extreme_outcomes(original_os, issues, outcomes, max_cardinality)
def __str__(self):
return f"w: {self._weights}, b: {self._bias}"
[docs]
class LinearUtilityFunction(AffineUtilityFunction):
r"""
A special case of the `AffineUtilityFunciton` for which the bias is zero.
Args:
weights: weights for combining `values`
bias: The offset added
name: name of the utility function. If None a random name will be generated.
Notes:
The utility value is calculated as:
.. math::
u = \sum_{i=0}^{n_{outcomes}-1} {\alpha_i * \omega_i}
where $\alpha_i$ is the weight of issue $i$, and $\omega_i$ is the value of issue $i$ in the input outcome $\omega$.
"""
def __init__(
self,
weights: dict[str, float] | list[float] | tuple[float, ...] | None = None,
*args,
**kwargs,
) -> None:
kwargs["bias"] = 0
super().__init__(weights, *args, **kwargs)
[docs]
class LinearAdditiveUtilityFunction( # type: ignore
StationaryMixin, UtilityFunction
):
r"""A linear aggregation utility function for multi-issue negotiations.
Models a linear utility function using predefined weights:\.
Args:
values: utility functions for individual issues
weights: weights for combining `values`
name: name of the utility function. If None a random name will be generated.
Notes:
The utility value is calculated as:
.. math::
u = \sum_{i=0}^{n_{outcomes}-1} {w_i * u_i(\omega_i)}
Examples:
>>> from negmas.outcomes import dict2outcome, make_issue
>>> issues = [
... make_issue((10.0, 20.0), "price"),
... make_issue(["delivered", "not delivered"], "delivery"),
... make_issue(5, "quality"),
... ]
>>> print(list(map(str, issues)))
['price: (10.0, 20.0)', "delivery: ['delivered', 'not delivered']", 'quality: (0, 4)']
>>> f = LinearAdditiveUtilityFunction(
... {
... "price": lambda x: 2.0 * x,
... "delivery": {"delivered": 10, "not delivered": -10},
... "quality": lambda x: x - 3,
... },
... weights={"price": 1.0, "delivery": 2.0, "quality": 4.0},
... issues=issues,
... )
>>> float(f((2, "delivered", 14.0)))
68.0
You can confirm this yourself by calculating the ufun manually:
>>> 68.0 == (1.0 * 2.0 * 2 + 2.0 * 10 + 4.0 * (14.0 - 3))
True
Yout can use a dictionary to represent the outcome for more readability:
>>> float(
... f(
... dict2outcome(
... dict(price=2, quality=14, delivery="delivered"), issues=issues
... )
... )
... )
68.0
You can use lists instead of dictionaries for defining outcomes, weights
but that is less readable. The advantage here is that you do not need to pass the issues
>>> f = LinearAdditiveUtilityFunction(
... [
... lambda x: 2.0 * x,
... {"delivered": 10, "not delivered": -10},
... LambdaFun(lambda x: x - 3),
... ],
... weights=[1.0, 2.0, 4.0],
... issues=issues,
... )
>>> float(f((14.0, "delivered", 2)))
44.0
Remarks:
The mapping need not use all the issues in the output as the last example show.
"""
def __init__(
self,
values: dict[str, SingleIssueFun]
| tuple[SingleIssueFun, ...]
| list[SingleIssueFun],
weights: Mapping[Any, float] | list[float] | tuple[float, ...] | None = None,
bias: float = 0.0,
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self._bias = bias
if self.outcome_space and not isinstance(
self.outcome_space, IndependentIssuesOS
):
raise ValueError(
f"Cannot create {self.type} ufun with an outcomespace without indpendent "
f"issues.\n Given OS: {self.outcome_space} of type "
f"{type(self.outcome_space)}\nGiven args {kwargs}"
)
self.issues: list[Issue] | None = (
list(self.outcome_space.issues) if self.outcome_space else None # type: ignore
)
if isinstance(values, dict):
if self.issues is None:
raise ValueError(
"Must specify issues when passing `values` or `weights` is a dict"
)
values = [
values.get(_, IdentityFun()) # type: ignore
for _ in [i.name if isinstance(i, Issue) else i for i in self.issues]
]
else:
values = list(values)
if weights is None:
weights = [1.0] * len(values)
if isinstance(weights, dict):
if self.issues is None:
raise ValueError(
"Must specify issues when passing `values` or `weights` is a dict"
)
weights = [
weights.get(_, 1.0)
# weights[_]
for _ in [i.name if isinstance(i, Issue) else i for i in self.issues]
]
self.values = []
for i, v in enumerate(values):
if isinstance(v, SingleIssueFun):
self.values.append(v)
elif isinstance(v, dict):
self.values.append(TableFun(v))
elif isinstance(v, Callable):
self.values.append(LambdaFun(v))
elif isinstance(v, Iterable):
if (
not self.issues
or len(self.issues) < i + 1
or not self.issues[i].is_discrete()
):
raise TypeError(
"When passing an iterable as the value function for an issue, "
"the issue MUST be discrete"
)
d = dict(zip(self.issues[i].enumerate(), v)) # type: ignore We know the issue is discrete
self.values.append(TableFun(d))
else:
raise TypeError(
f"Mapping {v} is not supported: Itis of type ({type(v)}) but we only support SingleIssueFun, Dict or Lambda mappings"
)
self._weights = list(weights) # type: ignore
@property
def weights(self):
return self._weights
[docs]
def eval(self, offer: Outcome | None) -> float:
if offer is None:
return self.reserved_value
u = self._bias
for v, w, iu in zip(offer, self.weights, self.values):
current_utility = iu(v)
if current_utility is None:
return float("nan")
try:
u += w * current_utility
except FloatingPointError:
continue
return u
[docs]
def xml(self, issues: list[Issue] | None = None) -> str:
"""Generates an XML string representing the utility function
Args:
issues:
Examples:
>>> from negmas.outcomes import make_issue
>>> issues = [
... make_issue(values=10, name="i1"),
... make_issue(values=["delivered", "not delivered"], name="i2"),
... make_issue(values=4, name="i3"),
... ]
>>> f = LinearAdditiveUtilityFunction(
... [
... lambda x: 2.0 * x,
... {"delivered": 10, "not delivered": -10},
... LambdaFun(lambda x: x - 3),
... ],
... weights=[1.0, 2.0, 4.0],
... issues=issues,
... )
>>> print(f.xml(issues))
<issue index="1" etype="discrete" type="discrete" vtype="integer" name="i1">
<item index="1" value="0" evaluation="0.0" />
<item index="2" value="1" evaluation="2.0" />
<item index="3" value="2" evaluation="4.0" />
<item index="4" value="3" evaluation="6.0" />
<item index="5" value="4" evaluation="8.0" />
<item index="6" value="5" evaluation="10.0" />
<item index="7" value="6" evaluation="12.0" />
<item index="8" value="7" evaluation="14.0" />
<item index="9" value="8" evaluation="16.0" />
<item index="10" value="9" evaluation="18.0" />
</issue>
<issue index="2" etype="discrete" type="discrete" vtype="discrete" name="i2">
<item index="1" value="delivered" evaluation="10.0" />
<item index="2" value="not delivered" evaluation="-10.0" />
</issue>
<issue index="3" etype="discrete" type="discrete" vtype="integer" name="i3">
<item index="1" value="0" evaluation="-3.0" />
<item index="2" value="1" evaluation="-2.0" />
<item index="3" value="2" evaluation="-1.0" />
<item index="4" value="3" evaluation="0.0" />
</issue>
<weight index="1" value="1.0">
</weight>
<weight index="2" value="2.0">
</weight>
<weight index="3" value="4.0">
</weight>
<BLANKLINE>
"""
output = ""
if not issues:
issues = self.issues
if not issues:
raise ValueError(
"Cannot convert a ufun to xml() without konwing its outcome-space"
)
# <issue vtype="integer" lowerbound="1" upperbound="17" name="Charging Speed" index="3" etype="integer" type="integer">
# <evaluator ftype="linear" offset="0.4" slope="0.0375">
# </evaluator>
for i, (issue, vfun) in enumerate(zip(issues, self.values)):
output += vfun.xml(i, issue, 0.0)
for i, w in enumerate(self.weights):
output += f'<weight index="{i+1}" value="{w}">\n</weight>\n'
# if we have a bias, just add one extra issue with a weight equal to the bias (this issue will implicitly be assumed to be numeric with a single value of 1)
if abs(self._bias) > EPSILON:
output += f'<weight index="{len(self.weights) + 1}" value="{self._bias}">\n</weight>\n'
return output
[docs]
def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
d = {python_class_identifier: get_full_type_name(type(self))}
d.update(super().to_dict(python_class_identifier=python_class_identifier))
return dict(
**d,
weights=self.weights,
values=serialize(
self.values, python_class_identifier=python_class_identifier
),
)
[docs]
@classmethod
def from_dict(cls, d: dict, python_class_identifier=PYTHON_CLASS_IDENTIFIER):
if isinstance(d, cls):
return d
d.pop(python_class_identifier, None)
# d["values"]=deserialize(d["values"]), # type: ignore (deserialize can return anything but it should be OK)
d = deserialize(
d,
deep=True,
remove_type_field=True,
python_class_identifier=python_class_identifier,
) # type: ignore
return cls(**d) # type: ignore I konw that d will be a dict with string keys
[docs]
def extreme_outcomes(
self,
outcome_space: OutcomeSpace | None = None,
issues: Iterable[Issue] | None = None,
outcomes: Iterable[Outcome] | None = None,
max_cardinality=1000,
) -> tuple[Outcome, Outcome]:
return self._extreme_outcomes(outcome_space, issues, outcomes, max_cardinality)
@lru_cache
def _extreme_outcomes(
self,
outcome_space: OutcomeSpace | None = None,
issues: list[Issue] | None = None,
outcomes: list[Outcome] | None = None,
max_cardinality=1000,
) -> tuple[Outcome, Outcome]:
"""Finds the best and worst outcomes
Args:
ufun: The utility function
issues: list of issues (optional)
outcomes: A collection of outcomes (optional)
max_cardinality: the maximum number of outcomes to try sampling (if sampling is used and outcomes are not
given)
Returns:
(worst, best) outcomes
"""
# The minimum and maximum must be at one of the edges of the outcome space. Just enumerate them
original_os = outcome_space
check_one_at_most(outcome_space, issues, outcomes)
outcome_space = os_or_none(outcome_space, issues, outcomes)
if outcome_space is None:
outcome_space = self.outcome_space
if outcome_space is None or not isinstance(
outcome_space, CartesianOutcomeSpace
):
return super().extreme_outcomes(
original_os, issues, outcomes, max_cardinality
)
if outcomes is not None:
warnings.warn(
"Passing outcomes and issues (or having known issues) to linear ufuns is redundant. The outcomes passed will be used which is much slower than if you do not pass them",
warnings.NegmasSpeedWarning,
)
return super().extreme_outcomes(
original_os,
issues,
outcomes,
max_cardinality, # type:ignore
)
uranges, vranges = [], []
myissues: list[Issue] = outcome_space.issues # type: ignore We checked earlier that this is an CartesianOutcomeSpace. It MUST have issues
for i, issue in enumerate(myissues):
fn = self.values[i]
mx, mn = float("-inf"), float("inf")
mxv, mnv = None, None
for v in issue.value_generator(n=max_cardinality):
uval = fn(v)
if uval >= mx:
mx, mxv = uval, v
if uval < mn:
mn, mnv = uval, v
vranges.append((mnv, mxv))
uranges.append((mn, mx))
best_outcome, worst_outcome = [], []
best_util, worst_util = 0.0, 0.0
for w, urng, vrng in zip(self.weights, uranges, vranges):
if w > 0:
best_util += w * urng[1]
best_outcome.append(vrng[1])
worst_util += w * urng[0]
worst_outcome.append(vrng[0])
else:
best_util += w * urng[0]
best_outcome.append(vrng[0])
worst_util += w * urng[1]
worst_outcome.append(vrng[1])
return tuple(worst_outcome), tuple(best_outcome)
[docs]
@classmethod
def random(
cls,
outcome_space: CartesianOutcomeSpace | None = None,
issues: list[Issue] | tuple[Issue, ...] | None = None,
reserved_value=(0.0, 1.0),
normalized=True,
**kwargs,
):
# from negmas.preferences.ops import normalize
if not issues and outcome_space:
issues = outcome_space.issues
if not issues:
raise ValueError("Cannot generate a random ufun withot knowing the issues")
reserved_value = make_range(reserved_value)
n_issues = len(issues)
# r = reserved_value if reserved_value is not None else random.random()
rand_weights = [random.random() for _ in range(n_issues)]
if normalized:
m = sum(rand_weights)
if m:
rand_weights = [_ / m for _ in rand_weights]
weights = rand_weights
values = [_random_mapping(issue, normalized) for issue in issues]
ufun = cls(
weights=weights,
values=values, # type: ignore
issues=issues,
reserved_value=random.random() * (reserved_value[1] - reserved_value[0])
+ reserved_value[0],
**kwargs,
)
return ufun
[docs]
def shift_by(
self, offset: float, shift_reserved: bool = True, change_bias_only: bool = False
) -> LinearAdditiveUtilityFunction:
if change_bias_only:
return LinearAdditiveUtilityFunction(
values=self.values, # type: ignore
weights=self.weights,
name=self.name,
bias=self._bias + offset,
reserved_value=self.reserved_value
if not shift_reserved
else (self.reserved_value + offset),
outcome_space=self.outcome_space,
)
values = [v.shift_by(offset) for v in self.values]
return LinearAdditiveUtilityFunction(
values=values,
weights=self.weights,
name=self.name,
bias=self._bias,
reserved_value=self.reserved_value
if not shift_reserved
else (self.reserved_value + offset),
outcome_space=self.outcome_space,
)
[docs]
def scale_by(
self,
scale: float,
scale_reserved: bool = True,
change_weights_only: bool = False,
normalize_weights: bool = True,
) -> LinearAdditiveUtilityFunction:
if scale < 0:
raise ValueError(f"Cannot have a negative scale: {scale}")
if change_weights_only and normalize_weights:
raise ValueError(
"Cannot normalize weights only and in the same time change weights only"
)
wscale = 1.0
if normalize_weights:
w = sum(self.weights)
if w > 1e-6:
wscale = 1.0 / w
else:
wscale = 1.0
if change_weights_only:
wscale *= scale
return LinearAdditiveUtilityFunction(
values=self.values, # type: ignore
weights=[wscale * _ for _ in self.weights],
outcome_space=self.outcome_space,
reserved_value=self.reserved_value
if not scale_reserved
else (self.reserved_value * wscale),
name=self.name,
)
return LinearAdditiveUtilityFunction(
values=[_.scale_by(scale / wscale) for _ in self.values],
weights=[wscale * _ for _ in self.weights],
outcome_space=self.outcome_space,
reserved_value=self.reserved_value
if not scale_reserved
else (self.reserved_value * wscale * scale),
name=self.name,
)
def __str__(self):
return f"u: {self.values}\n w: {self.weights}"
LinearUtilityAggregationFunction = LinearAdditiveUtilityFunction
"""An alias for `LinearAdditiveUtilityFunction`"""