Source code for negmas.outcomes.outcome_space

from __future__ import annotations
from math import isinf
import numbers
from functools import reduce
import random
from itertools import filterfalse
from operator import mul
from typing import TYPE_CHECKING, Callable, Iterable, Sequence, Union

from attrs import define, field

from negmas.helpers import unique_name
from negmas.helpers.types import get_full_type_name
from negmas.outcomes.outcome_ops import (
    cast_value_types,
    outcome_is_valid,
    outcome_types_are_ok,
)
from negmas.protocols import XmlSerializable
from negmas.serialization import PYTHON_CLASS_IDENTIFIER, deserialize, serialize
from negmas.warnings import NegmasSpeedWarning, warn

from .base_issue import DiscreteIssue, Issue
from .categorical_issue import CategoricalIssue
from .common import Outcome
from .contiguous_issue import ContiguousIssue
from .issue_ops import (
    enumerate_discrete_issues,
    issues_from_outcomes,
    issues_from_xml_str,
    issues_to_xml_str,
    sample_issues,
)
from .protocols import DiscreteOutcomeSpace, OutcomeSpace
from .range_issue import RangeIssue

if TYPE_CHECKING:
    from negmas.preferences.protocols import HasReservedOutcome, HasReservedValue

__all__ = [
    "CartesianOutcomeSpace",
    "EnumeratingOutcomeSpace",
    "DiscreteCartesianOutcomeSpace",
    "make_os",
    "DistanceFun",
]

NLEVELS = 5


DistanceFun = Callable[[Outcome, Outcome, Union[OutcomeSpace, None]], float]
"""A callable that can calculate the distance between two outcomes in an outcome-space"""


[docs] def make_os( issues: Sequence[Issue] | None = None, outcomes: Sequence[Outcome] | None = None, name: str | None = None, ) -> CartesianOutcomeSpace: """ A factory to create outcome-spaces from lists of `Issue` s or `Outcome` s. Remarks: - must pass one and exactly one of `issues` and `outcomes` """ if issues and outcomes: raise ValueError( "Cannot make an outcome space passing both issues and outcomes" ) if not issues and not outcomes: raise ValueError( "Cannot make an outcome space without passing issues or outcomes" ) if not issues and outcomes: issues_ = issues_from_outcomes(outcomes) else: issues_ = issues if issues_ is None: raise ValueError( "Cannot make an outcome space without passing issues or outcomes" ) issues_ = tuple(issues_) if all(_.is_discrete() for _ in issues_): return DiscreteCartesianOutcomeSpace(issues_, name=name if name else "") return CartesianOutcomeSpace(issues_, name=name if name else "")
@define class OSWithValidity: invalid: set[Outcome] = field(factory=set) _baseset: set[Outcome] = field(factory=set) def __attrs_post_init__(self): self.update() def update(self): self._baseset = set(self.enumerate()).difference(self.invalid)
[docs] @define class EnumeratingOutcomeSpace(DiscreteOutcomeSpace, OSWithValidity): """An outcome space representing the enumeration of some outcomes. No issues defined""" name: str | None = field(eq=False, default=None)
[docs] def invalidate(self, outcome: Outcome) -> None: """Indicates that the outcome is invalid""" self.invalid.add(outcome) self.update()
[docs] def validate(self, outcome: Outcome) -> None: """Indicates that the outcome is invalid""" try: self.invalid.remove(outcome) except Exception: pass self.update()
[docs] def is_valid(self, outcome: Outcome) -> bool: """Checks if the given outcome is valid for that outcome space""" return outcome in self._baseset
[docs] def are_types_ok(self, outcome: Outcome) -> bool: """Checks if the type of each value in the outcome is correct for the given issue""" return True
[docs] def ensure_correct_types(self, outcome: Outcome) -> Outcome: """Returns an outcome that is guaratneed to have correct types or raises an exception""" return outcome
@property def cardinality(self) -> int: """The space cardinality = the number of outcomes""" return len(self._baseset)
[docs] def is_numeric(self) -> bool: """Checks whether all values in all outcomes are numeric""" samples = random.choices(list(self._baseset), k=int(min(self.cardinality, 10))) numeric = [all(isinstance(_, numbers.Number) for _ in s) for s in samples] return all(numeric)
[docs] def is_integer(self) -> bool: """Checks whether all values in all outcomes are integers""" samples = random.choices(list(self._baseset), k=int(min(self.cardinality, 10))) numeric = [all(isinstance(_, numbers.Integral) for _ in s) for s in samples] return all(numeric)
[docs] def is_float(self) -> bool: """Checks whether all values in all outcomes are real""" samples = random.choices(list(self._baseset), k=int(min(self.cardinality, 10))) numeric = [ all( isinstance(_, numbers.Real) and not isinstance(_, numbers.Integral) for _ in s ) for s in samples ] return all(numeric)
[docs] def to_discrete( self, levels: int | float = 5, max_cardinality: int | float = float("inf") ) -> DiscreteOutcomeSpace: """ Returns a **stable** finite outcome space. If the outcome-space is already finite. It shoud return itself. Args: levels: The levels of discretization of any continuous dimension (or subdimension) max_cardintlity: The maximum cardinality allowed for the resulting outcomespace (if the original OS was infinite). This limitation is **NOT** applied for outcome spaces that are alredy discretized. See `limit_cardinality()` for a method to limit the cardinality of an already discrete space If called again, it should return the same discrete outcome space every time. """ return self
[docs] def random_outcome(self) -> Outcome: """Returns a single random outcome.""" return list(self.sample(1))[0]
[docs] def to_largest_discrete( self, levels: int, max_cardinality: int | float = float("inf"), **kwargs ) -> DiscreteOutcomeSpace: return self
[docs] def cardinality_if_discretized( self, levels: int, max_cardinality: int | float = float("inf") ) -> int: """ Returns the cardinality if discretized the given way. """ return self.cardinality
[docs] def enumerate_or_sample( self, levels: int | float = float("inf"), max_cardinality: int | float = float("inf"), ) -> Iterable[Outcome]: """Enumerates all outcomes if possible (i.e. discrete space) or returns `max_cardinality` different outcomes otherwise""" return self.enumerate()
[docs] def is_discrete(self) -> bool: """Checks whether there are no continua components of the space""" return True
[docs] def is_finite(self) -> bool: """Checks whether the space is finite""" return self.is_discrete()
def __contains__(self, item: Outcome | OutcomeSpace | Issue) -> bool: # type: ignore if isinstance(item, Issue): return False if isinstance(item, Outcome): return item in self._baseset if not isinstance(item, OutcomeSpace): return False if isinf(item.cardinality): return False return all(x in self for x in item.enumerate_or_sample())
[docs] def enumerate(self) -> Iterable[Outcome]: """ Enumerates the outcome space returning all its outcomes (or up to max_cardinality for infinite ones) """ return self._baseset
[docs] def sample( self, n_outcomes: int, with_replacement: bool = False, fail_if_not_enough=False ) -> Iterable[Outcome]: """Samples up to n_outcomes with or without replacement""" if self.cardinality < n_outcomes and not with_replacement: return [] if with_replacement: return (random.choice(list(self._baseset)) for _ in range(n_outcomes)) return random.sample(list(self._baseset), k=n_outcomes)
[docs] def limit_cardinality( self, max_cardinality: int | float = float("inf"), levels: int | float = float("inf"), ) -> DiscreteOutcomeSpace: """ Limits the cardinality of the outcome space to the given maximum (or the number of levels for each issue to `levels`) Args: max_cardinality: The maximum number of outcomes in the resulting space levels: The maximum levels allowed per issue (if issues are defined for this outcome space) """ ...
[docs] def to_single_issue( self, numeric: bool = False, stringify: bool = True ) -> CartesianOutcomeSpace: ...
[docs] @define(frozen=True) class CartesianOutcomeSpace(XmlSerializable): """ An outcome-space that is generated by the cartesian product of a tuple of `Issue` s. """ issues: tuple[Issue, ...] = field(converter=tuple) name: str | None = field(eq=False, default=None) def __attrs_post_init__(self): if not self.name: object.__setattr__(self, "name", unique_name("os", add_time=False, sep="")) def __mul__(self, other: CartesianOutcomeSpace) -> CartesianOutcomeSpace: issues = list(self.issues) + list(other.issues) name = f"{self.name}*{other.name}" return CartesianOutcomeSpace(tuple(issues), name=name)
[docs] def contains_issue(self, x: Issue) -> bool: """Cheks that the given issue is in the tuple of issues constituting the outcome space (i.e. it is one of its dimensions)""" return x in self.issues
[docs] def is_valid(self, outcome: Outcome) -> bool: return outcome_is_valid(outcome, self.issues)
[docs] def is_discrete(self) -> bool: """Checks whether all issues are discrete""" return all(_.is_discrete() for _ in self.issues)
[docs] def is_finite(self) -> bool: """Checks whether the space is finite""" return self.is_discrete()
[docs] def contains_os(self, x: OutcomeSpace) -> bool: """Checks whether an outcome-space is contained in this outcome-space""" if isinstance(x, CartesianOutcomeSpace): return len(self.issues) == len(x.issues) and all( b in a for a, b in zip(self.issues, x.issues) ) if self.is_finite() and not x.is_finite(): return False if not self.is_finite() and not x.is_finite(): raise NotImplementedError( "Cannot check an infinite outcome space that is not cartesian for inclusion in an infinite cartesian outcome space!!" ) warn( f"Testing inclusion of a finite non-carteisan outcome space in a cartesian outcome space can be very slow (will do {x.cardinality} checks)", NegmasSpeedWarning, ) return all(self.is_valid(_) for _ in x.enumerate()) # type: ignore If we are here, we know that x is finite
[docs] def to_dict(self, python_class_identifier=PYTHON_CLASS_IDENTIFIER): d = {python_class_identifier: get_full_type_name(type(self))} return dict( **d, name=self.name, issues=serialize( self.issues, python_class_identifier=python_class_identifier ), )
[docs] @classmethod def from_dict(cls, d, python_class_identifier=PYTHON_CLASS_IDENTIFIER): return cls(**deserialize(d, python_class_identifier=python_class_identifier)) # type: ignore
@property def issue_names(self) -> list[str]: """Returns an ordered list of issue names""" return [_.name for _ in self.issues] @property def cardinality(self) -> int | float: """The space cardinality = the number of outcomes""" return reduce(mul, [_.cardinality for _ in self.issues], 1)
[docs] def is_compact(self) -> bool: """Checks whether all issues are complete ranges""" return all(isinstance(_, RangeIssue) for _ in self.issues)
[docs] def is_all_continuous(self) -> bool: """Checks whether all issues are discrete""" return all(_.is_continuous() for _ in self.issues)
[docs] def is_not_discrete(self) -> bool: """Checks whether all issues are discrete""" return any(_.is_continuous() for _ in self.issues)
[docs] def is_numeric(self) -> bool: """Checks whether all issues are numeric""" return all(_.is_numeric() for _ in self.issues)
[docs] def is_integer(self) -> bool: """Checks whether all issues are integer""" return all(_.is_integer() for _ in self.issues)
[docs] def is_float(self) -> bool: """Checks whether all issues are real""" return all(_.is_float() for _ in self.issues)
[docs] def to_discrete( self, levels: int | float = 10, max_cardinality: int | float = float("inf") ) -> DiscreteCartesianOutcomeSpace: """ Discretizes the outcome space by sampling `levels` values for each continuous issue. The result of the discretization is stable in the sense that repeated calls will return the same output. """ if max_cardinality != float("inf"): c = reduce( mul, [_.cardinality if _.is_discrete() else levels for _ in self.issues], 1, ) if c > max_cardinality: raise ValueError( f"Cannot convert OutcomeSpace to a discrete OutcomeSpace with at most {max_cardinality} (at least {c} outcomes are required)" ) issues = tuple( issue.to_discrete( levels if issue.is_continuous() else None, compact=False, grid=True, endpoints=True, ) for issue in self.issues ) return DiscreteCartesianOutcomeSpace(issues=issues, name=self.name)
[docs] @classmethod def from_xml_str( cls, xml_str: str, safe_parsing=True, name=None, **kwargs ) -> CartesianOutcomeSpace: issues, _ = issues_from_xml_str( xml_str, safe_parsing=safe_parsing, n_discretization=None ) if not issues: raise ValueError("Failed to read an issue space from an xml string") issues = tuple(issues) if all(isinstance(_, DiscreteIssue) for _ in issues): return DiscreteCartesianOutcomeSpace(issues, name=name) return cls(issues, name=name)
[docs] @staticmethod def from_outcomes( outcomes: list[Outcome], numeric_as_ranges: bool = False, issue_names: list[str] | None = None, name: str | None = None, ) -> DiscreteCartesianOutcomeSpace: return DiscreteCartesianOutcomeSpace( issues_from_outcomes(outcomes, numeric_as_ranges, issue_names), name=name )
[docs] def to_xml_str(self, **kwargs) -> str: return issues_to_xml_str(self.issues)
[docs] def are_types_ok(self, outcome: Outcome) -> bool: """Checks if the type of each value in the outcome is correct for the given issue""" return outcome_types_are_ok(outcome, self.issues)
[docs] def ensure_correct_types(self, outcome: Outcome) -> Outcome: """Returns an outcome that is guaratneed to have correct types or raises an exception""" return cast_value_types(outcome, self.issues)
[docs] def sample( self, n_outcomes: int, with_replacement: bool = True, fail_if_not_enough=True ) -> Iterable[Outcome]: return sample_issues( self.issues, n_outcomes, with_replacement, fail_if_not_enough )
[docs] def random_outcome(self): return tuple(_.rand() for _ in self.issues)
[docs] def cardinality_if_discretized( self, levels: int, max_cardinality: int | float = float("inf") ) -> int: c = reduce( mul, [_.cardinality if _.is_discrete() else levels for _ in self.issues], 1 ) return min(c, max_cardinality)
[docs] def to_largest_discrete( self, levels: int, max_cardinality: int | float = float("inf"), **kwargs ) -> DiscreteCartesianOutcomeSpace: for level in range(levels, 0, -1): if self.cardinality_if_discretized(levels) < max_cardinality: break else: raise ValueError( f"Cannot discretize with levels <= {levels} keeping the cardinality under {max_cardinality} Outocme space cardinality is {self.cardinality}\nOutcome space: {self}" ) return self.to_discrete(level, max_cardinality, **kwargs)
[docs] def enumerate_or_sample_rational( self, preferences: Iterable[HasReservedValue | HasReservedOutcome], levels: int | float = float("inf"), max_cardinality: int | float = float("inf"), aggregator: Callable[[Iterable[bool]], bool] = any, ) -> Iterable[Outcome]: """ Enumerates all outcomes if possible (i.e. discrete space) or returns `max_cardinality` different outcomes otherwise. Args: preferences: A list of `Preferences` that is used to judge outcomes levels: The number of levels to use for discretization if needed max_cardinality: The maximum cardinality allowed in case of discretization aggregator: A predicate that takes an `Iterable` of booleans representing whether or not an outcome is rational for a given `Preferences` (i.e. better than reservation) and returns a single boolean representing the result for all preferences. Default is any but can be all. """ from negmas.preferences.protocols import HasReservedOutcome, HasReservedValue if ( levels == float("inf") and max_cardinality == float("inf") and not self.is_discrete() ): raise ValueError( "Cannot enumerate-or-sample an outcome space with infinite outcomes without specifying `levels` and/or `max_cardinality`" ) from negmas.outcomes.outcome_space import DiscreteCartesianOutcomeSpace if isinstance(self, DiscreteCartesianOutcomeSpace): results = self.enumerate() # type: ignore We know the outcome space is correct else: if max_cardinality == float("inf"): return self.to_discrete( levels=levels, max_cardinality=max_cardinality ).enumerate() results = self.sample( int(max_cardinality), with_replacement=False, fail_if_not_enough=False ) def is_irrational(x: Outcome): def irrational(u: HasReservedOutcome | HasReservedValue, x: Outcome): if isinstance(u, HasReservedValue): if u.reserved_value is None: return False return u(x) < u.reserved_value # type: ignore if isinstance(u, HasReservedOutcome) and u.reserved_outcome is not None: return u.is_worse(x, u.reserved_outcome) # type: ignore return False return aggregator(irrational(u, x) for u in preferences) return filterfalse(lambda x: is_irrational(x), results)
[docs] def enumerate_or_sample( self, levels: int | float = float("inf"), max_cardinality: int | float = float("inf"), ) -> Iterable[Outcome]: """Enumerates all outcomes if possible (i.e. discrete space) or returns `max_cardinality` different outcomes otherwise""" if ( levels == float("inf") and max_cardinality == float("inf") and not self.is_discrete() ): raise ValueError( "Cannot enumerate-or-sample an outcome space with infinite outcomes without specifying `levels` and/or `max_cardinality`" ) from negmas.outcomes.outcome_space import DiscreteCartesianOutcomeSpace if isinstance(self, DiscreteCartesianOutcomeSpace): return self.enumerate() # type: ignore We know the outcome space is correct if max_cardinality == float("inf"): return self.to_discrete( levels=levels, max_cardinality=max_cardinality ).enumerate() return self.sample( int(max_cardinality), with_replacement=False, fail_if_not_enough=False )
[docs] def to_single_issue( self, numeric=False, stringify=True, levels: int = NLEVELS, max_cardinality: int | float = float("inf"), ) -> DiscreteCartesianOutcomeSpace: """ Creates a new outcome space that is a single-issue version of this one discretizing it as needed Args: numeric: If given, the output issue will be a `ContiguousIssue` otberwise it will be a `CategoricalIssue` stringify: If given, the output issue will have string values. Checked only if `numeric` is `False` levels: Number of levels to discretize any continuous issue max_cardinality: Maximum allowed number of outcomes in the resulting issue. Remarks: - Will discretize inifinte outcome spaces """ if isinstance(self, DiscreteCartesianOutcomeSpace) and len(self.issues) == 1: return self dos = self.to_discrete(levels, max_cardinality) return dos.to_single_issue(numeric, stringify) # type: ignore
def __contains__(self, item): if isinstance(item, OutcomeSpace): return self.contains_os(item) if isinstance(item, Issue): return self.contains_issue(item) if isinstance(item, Outcome): return self.is_valid(item) if not isinstance(item, Sequence): return False if not item: return True if isinstance(item[0], Issue): return len(self.issues) == len(item) and self.contains_os( make_os(issues=item) ) if isinstance(item[0], Outcome): return len(self.issues) == len(item) and self.contains_os( make_os(outcomes=item) ) return False
[docs] @define(frozen=True) class DiscreteCartesianOutcomeSpace(CartesianOutcomeSpace): """ A discrete outcome-space that is generated by the cartesian product of a tuple of `Issue` s (i.e. with finite number of outcomes). """
[docs] def to_largest_discrete( self, levels: int, max_cardinality: int | float = float("inf"), **kwargs ) -> DiscreteCartesianOutcomeSpace: return self
def __attrs_post_init__(self): for issue in self.issues: if not issue.is_discrete(): raise ValueError( f"Issue is not discrete. Cannot be added to a DiscreteOutcomeSpace. You must discretize it first: {issue} " ) @property def cardinality(self) -> int: return reduce(mul, [_.cardinality for _ in self.issues], 1)
[docs] def cardinality_if_discretized( self, levels: int, max_cardinality: int | float = float("inf") ) -> int: return self.cardinality
[docs] def enumerate(self) -> Iterable[Outcome]: return enumerate_discrete_issues( # type: ignore I know that all my issues are actually discrete self.issues # type: ignore I know that all my issues are actually discrete )
[docs] def limit_cardinality( self, max_cardinality: int | float = float("inf"), levels: int | float = float("inf"), ) -> DiscreteCartesianOutcomeSpace: """ Limits the cardinality of the outcome space to the given maximum (or the number of levels for each issue to `levels`) Args: max_cardinality: The maximum number of outcomes in the resulting space levels: The maximum number of levels for each issue/subissue """ if self.cardinality <= max_cardinality or all( _.cardinality < levels for _ in self.issues ): return self new_levels = [_.cardinality for _ in self.issues] # type: ignore will be corrected the next line new_levels = [int(_) if _ < levels else int(levels) for _ in new_levels] new_cardinality = reduce(mul, new_levels, 1) def _reduce_total_cardinality(new_levels, max_cardinality, new_cardinality): sort = reversed(sorted((_, i) for i, _ in enumerate(new_levels))) sorted_levels = [_[0] for _ in sort] indices = [_[1] for _ in sort] needed = new_cardinality - max_cardinality current = 0 n = len(sorted_levels) while needed > 0 and current < n: nxt = n - 1 v = sorted_levels[current] if v == 1: continue for i in range(current + 1, n - 1): if v == sorted_levels[i]: continue nxt = i break diff = v - sorted_levels[nxt] if not diff: diff = 1 new_levels[indices[current]] -= 1 max_cardinality = (max_cardinality // v) * (v - 1) sort = reversed(sorted((_, i) for i, _ in enumerate(new_levels))) sorted_levels = [_[0] for _ in sort] current = 0 needed = new_cardinality - max_cardinality return new_levels if new_cardinality > max_cardinality: new_levels: list[int] = _reduce_total_cardinality( new_levels, max_cardinality, new_cardinality ) issues: list[Issue] = [] for j, i, issue in zip( new_levels, (_.cardinality for _ in self.issues), self.issues ): issues.append(issue if j >= i else issue.to_discrete(j, compact=True)) return DiscreteCartesianOutcomeSpace( tuple(issues), name=f"{self.name}-{max_cardinality}" )
[docs] def is_discrete(self) -> bool: """Checks whether there are no continua components of the space""" return True
[docs] def to_discrete( self, levels: int | float = 10, max_cardinality: int | float = float("inf") ) -> DiscreteCartesianOutcomeSpace: return self
[docs] def to_single_issue( self, numeric=False, stringify=True, levels: int = NLEVELS, max_cardinality: int | float = float("inf"), ) -> DiscreteCartesianOutcomeSpace: """ Creates a new outcome space that is a single-issue version of this one Args: numeric: If given, the output issue will be a `ContiguousIssue` otherwise it will be a `CategoricalIssue` stringify: If given, the output issue will have string values. Checked only if `numeric` is `False` Remarks: - maps the agenda and ufuns to work correctly together - Only works if the outcome space is finite """ outcomes = list(self.enumerate()) values = ( range(len(outcomes)) if numeric else [f"v{_}" for _ in range(len(outcomes))] if stringify else outcomes ) issue = ( ContiguousIssue(len(outcomes), name="-".join(self.issue_names)) if numeric else CategoricalIssue(values, name="-".join(self.issue_names)) ) return DiscreteCartesianOutcomeSpace(issues=(issue,), name=self.name)
# def sample( # self, # n_outcomes: int, # with_replacement: bool = False, # fail_if_not_enough=True, # ) -> Iterable[Outcome]: # """ # Samples up to n_outcomes with or without replacement. # """ # # return sample_issues( # self.issues, n_outcomes, with_replacement, fail_if_not_enough # ) # # # outcomes = self.enumerate() # # outcomes = list(outcomes) # # if with_replacement: # # return random.choices(outcomes, k=n_outcomes) # # if fail_if_not_enough and n_outcomes > self.cardinality: # # raise ValueError("Cannot sample enough") # # random.shuffle(outcomes) # # return outcomes[:n_outcomes] def __iter__(self): return self.enumerate().__iter__() def __len__(self) -> int: return self.cardinality
# def flat_issues( # outcome_spaces: tuple[OutcomeSpace, ...], # add_index_to_issue_names: bool = False, # add_os_to_issue_name: bool = False, # ) -> tuple[Issue, ...]: # """Generates a single outcome-space which is the Cartesian product of input outcome_spaces.""" # # from negmas.outcomes import make_issue # from negmas.outcomes.optional_issue import OptionalIssue # # def _name(i: int, os_name: str | None, issue_name: str | None) -> str: # x = issue_name if issue_name else "" # if add_os_to_issue_name and os_name: # x = f"{os_name}:{x}" # if add_index_to_issue_names: # x = f"{x}:{i}" # return x # # values, names, nissues = [], [], [] # for i, os in enumerate(outcome_spaces): # if isinstance(os, EnumeratingOutcomeSpace): # values.append(list(os.enumerate())) # names.append(_name(i, "", os.name)) # nissues.append(1) # elif isinstance(os, CartesianOutcomeSpace): # for issue in os.issues: # values.append(issue.values) # names.append(_name(i, os.name, issue.name)) # nissues.append(len(os.issues)) # else: # raise TypeError( # f"Outcome space of type {type(os)} cannot be combined with other outcome-spaces" # ) # return tuple(OptionalIssue(make_issue(v, n), n) for v, n in zip(values, names)) # # # @define(frozen=True) # class DiscreteCartesianOutcomeSpaceProduct(DiscreteCartesianOutcomeSpace): # """ # A discrete outcome-space that is by multiplying multiple discrete outcome spaces # """ # # issues: tuple[Issue, ...] = field(init=False) # type: ignore # outcome_spaces: tuple[CartesianOutcomeSpace, ...] = field(init=True, default=None) # _sizes: tuple[int, ...] = field(init=False, default=None) # _extended_sizes: tuple[int, ...] = field(init=False, default=None) # _cardinality: int = field(init=False, default=0) # # def __attrs_post_init__(self): # object.__setattr__(self, "issues", flat_issues(self.outcome_spaces)) # for issue in self.issues: # if not issue.is_discrete(): # raise ValueError( # f"Issue is not discrete. Cannot be added to a DiscreteOutcomeSpace. You must discretize it first: {issue} " # ) # object.__setattr__( # self, "_sizes", tuple(int(_.cardinality) for _ in self.outcome_spaces) # ) # object.__setattr__(self, "_extended_sizes", tuple(_ + 1 for _ in self._sizes)) # object.__setattr__(self, "_cardinality", reduce(mul, self._extended_sizes, 1)) # # @property # def cardinality(self) -> int: # return self._cardinality # # def cardinality_if_discretized( # self, levels: int, max_cardinality: int | float = float("inf") # ) -> int: # return self._cardinality # # def enumerate(self) -> Iterable[Outcome]: # return enumerate_discrete_issues( # type: ignore I know that all my issues are actually discrete # self.issues # type: ignore I know that all my issues are actually discrete # ) # # def limit_cardinality( # self, # max_cardinality: int | float = float("inf"), # levels: int | float = float("inf"), # ) -> DiscreteCartesianOutcomeSpace: # """ # Limits the cardinality of the outcome space to the given maximum (or the number of levels for each issue to `levels`) # # Args: # max_cardinality: The maximum number of outcomes in the resulting space # levels: The maximum number of levels for each issue/subissue # """ # if self.cardinality <= max_cardinality or all( # _.cardinality < levels for _ in self.issues # ): # return self # new_levels = [_.cardinality for _ in self.issues] # type: ignore will be corrected the next line # new_levels = [int(_) if _ < levels else int(levels) for _ in new_levels] # new_cardinality = reduce(mul, new_levels, 1) # # def _reduce_total_cardinality(new_levels, max_cardinality, new_cardinality): # sort = reversed(sorted((_, i) for i, _ in enumerate(new_levels))) # sorted_levels = [_[0] for _ in sort] # indices = [_[1] for _ in sort] # needed = new_cardinality - max_cardinality # current = 0 # n = len(sorted_levels) # while needed > 0 and current < n: # nxt = n - 1 # v = sorted_levels[current] # if v == 1: # continue # for i in range(current + 1, n - 1): # if v == sorted_levels[i]: # continue # nxt = i # break # diff = v - sorted_levels[nxt] # if not diff: # diff = 1 # new_levels[indices[current]] -= 1 # max_cardinality = (max_cardinality // v) * (v - 1) # sort = reversed(sorted((_, i) for i, _ in enumerate(new_levels))) # sorted_levels = [_[0] for _ in sort] # current = 0 # needed = new_cardinality - max_cardinality # return new_levels # # if new_cardinality > max_cardinality: # new_levels: list[int] = _reduce_total_cardinality( # new_levels, max_cardinality, new_cardinality # ) # issues: list[Issue] = [] # for j, i, issue in zip( # new_levels, (_.cardinality for _ in self.issues), self.issues # ): # issues.append(issue if j >= i else issue.to_discrete(j, compact=True)) # return DiscreteCartesianOutcomeSpace( # tuple(issues), name=f"{self.name}-{max_cardinality}" # ) # # def is_discrete(self) -> bool: # """Checks whether there are no continua components of the space""" # return True # # def to_discrete( # self, levels: int | float = 10, max_cardinality: int | float = float("inf") # ) -> DiscreteCartesianOutcomeSpace: # return self # # def to_single_issue( # self, # numeric=False, # stringify=True, # levels: int = NLEVELS, # max_cardinality: int | float = float("inf"), # ) -> DiscreteCartesianOutcomeSpace: # """ # Creates a new outcome space that is a single-issue version of this one # # Args: # numeric: If given, the output issue will be a `ContiguousIssue` otherwise it will be a `CategoricalIssue` # stringify: If given, the output issue will have string values. Checked only if `numeric` is `False` # # Remarks: # - maps the agenda and ufuns to work correctly together # - Only works if the outcome space is finite # """ # outcomes = list(self.enumerate()) # values = ( # range(len(outcomes)) # if numeric # else [f"v{_}" for _ in range(len(outcomes))] # if stringify # else outcomes # ) # issue = ( # ContiguousIssue(len(outcomes), name="-".join(self.issue_names)) # if numeric # else CategoricalIssue(values, name="-".join(self.issue_names)) # ) # return DiscreteCartesianOutcomeSpace(issues=(issue,), name=self.name) # # # def sample( # # self, # # n_outcomes: int, # # with_replacement: bool = False, # # fail_if_not_enough=True, # # ) -> Iterable[Outcome]: # # """ # # Samples up to n_outcomes with or without replacement. # # """ # # # # return sample_issues( # # self.issues, n_outcomes, with_replacement, fail_if_not_enough # # ) # # # # # outcomes = self.enumerate() # # # outcomes = list(outcomes) # # # if with_replacement: # # # return random.choices(outcomes, k=n_outcomes) # # # if fail_if_not_enough and n_outcomes > self.cardinality: # # # raise ValueError("Cannot sample enough") # # # random.shuffle(outcomes) # # # return outcomes[:n_outcomes] # # def __iter__(self): # return self.enumerate().__iter__() # # def __len__(self) -> int: # return self.cardinality