Source code for negmas.outcomes.issue_ops

from __future__ import annotations
import copy
import itertools
import json
import numbers
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

from negmas.outcomes.contiguous_issue import ContiguousIssue
from negmas.warnings import warn_if_slow

if TYPE_CHECKING:
    from .common import Outcome

import numpy as np

from negmas.helpers import PathLike

from .base_issue import DiscreteIssue, Issue, make_issue
from .outcome_ops import outcome2dict

__all__ = [
    "generate_issues",
    "issues_from_genius",
    "issues_from_geniusweb",
    "issues_from_xml_str",
    "issues_from_geniusweb_json_str",
    "issues_to_genius",
    "issues_to_xml_str",
    "issues_from_outcomes",
    "num_outcomes",
    "enumerate_issues",
    "enumerate_discrete_issues",
    "discretize_and_enumerate_issues",
    "sample_issues",
    "sample_outcomes",
    "combine_issues",
]


DUMMY_ISSUE_NAME = "DUMMY_ISSUE"


[docs] def num_outcomes(issues: Sequence[Issue]) -> int | float: """ Returns the total number of outcomes in a set of issues. """ n = 1 for issue in issues: n *= issue.cardinality return n
[docs] def enumerate_discrete_issues(issues: Sequence[DiscreteIssue]) -> list[Outcome]: """ Enumerates all outcomes of this set of discrete issues if possible. Args: issues: A list of issues Returns: list of outcomes """ n = num_outcomes(issues) warn_if_slow(n, "Enumerating large OS (discrete)") return list(itertools.product(*(_.all for _ in issues)))
[docs] def sample_outcomes( issues: Sequence[Issue], n_outcomes: int | None = None, min_per_dim: int = 5, expansion_policy=None, ) -> list[Outcome]: """ Discretizes the issue space and returns either a predefined number of outcomes or uniform samples. Args: issues: The issues describing the issue space to be discretized n_outcomes: If None then exactly `min_per_dim` bins will be used for every continuous dimension and all outcomes will be returned min_per_dim: Max levels of discretization per dimension expansion_policy: None or 'repeat' or 'null' or 'no'. If repeat, then some of the outcomes will be repeated if None or 'no' then no expansion will happen if the total number of outcomes is less than n_outcomes: If 'null' then expansion will be with None values Returns: list of outcomes Examples: enumberate the whole space >>> from negmas.outcomes import make_issue >>> issues = [ ... make_issue(values=(0.0, 1.0), name="Price"), ... make_issue(values=["a", "b"], name="Name"), ... ] >>> sample_outcomes(issues=issues) [(0.0, 'a'), (0.0, 'b'), (0.25, 'a'), (0.25, 'b'), (0.5, 'a'), (0.5, 'b'), (0.75, 'a'), (0.75, 'b'), (1.0, 'a'), (1.0, 'b')] enumerate with sampling for very large space (we have 10 outcomes in the discretized space) >>> from negmas.outcomes import make_issue >>> issues = [ ... make_issue(values=(0.0, 1.0), name="Price"), ... make_issue(values=["a", "b"], name="Name"), ... ] >>> issues[0].is_continuous() True >>> sampled = sample_outcomes(issues=issues, n_outcomes=5) >>> len(sampled) 5 >>> len(set(sampled)) 5 >>> from negmas.outcomes import make_issue >>> issues = [ ... make_issue(values=(0, 1), name="Price"), ... make_issue(values=["a", "b"], name="Name"), ... ] >>> issues[0].is_continuous() False >>> sampled = sample_outcomes(issues=issues, n_outcomes=5) >>> len(sampled) 4 >>> len(set(sampled)) 4 """ from negmas.outcomes import enumerate_discrete_issues issues = list(issues) issues = [copy.deepcopy(_) for _ in issues] continuous = [] indx = [] discrete = [] n_disc = 0 for i, issue in enumerate(issues): if issue.is_continuous(): continuous.append(issue) indx.append(i) else: discrete.append(issue) n_disc += issue.cardinality if len(continuous) > 0: if n_outcomes is not None: n_per_issue = max(min_per_dim, int(n_outcomes - n_disc) // len(continuous)) else: n_per_issue = min_per_dim for i, issue in enumerate(continuous): issues[indx[i]] = make_issue( name=issue.name, values=list( np.linspace( issue.min_value, issue.max_value, num=n_per_issue, endpoint=True ).tolist() ), ) cardinality = 1 for issue in issues: cardinality *= issue.cardinality if cardinality == n_outcomes or n_outcomes is None: return list( enumerate_discrete_issues(issues) # type: ignore I am sure that the issues are all discrete by this point ) if cardinality < n_outcomes: cardinality = int(cardinality) outcomes = list( enumerate_discrete_issues(issues) # type: ignore I am sure that the issues are all discrete by this point ) if expansion_policy == "no" or expansion_policy is None: return outcomes elif expansion_policy == "null": return outcomes + [None] * (n_outcomes - cardinality) elif expansion_policy == "repeat": n_reps = n_outcomes // cardinality n_rem = n_outcomes % cardinality if n_reps > 1: for _ in range(n_reps): outcomes += outcomes if n_rem > 0: outcomes += outcomes[:n_rem] return outcomes n_per_issue = 1 + int(n_outcomes ** (1 / len(issues))) vals = [] n_found = 1 for issue in issues: if n_per_issue < 2: vals.append([issue.rand()]) continue if issue.cardinality < n_per_issue: vals.append(issue.all) n_found *= issue.cardinality else: vals.append(issue.rand_outcomes(n=n_per_issue, with_replacement=False)) n_found *= n_per_issue if n_found >= n_outcomes: n_per_issue = 1 outcomes = itertools.product(*vals) return list(outcomes)[:n_outcomes]
def _sample_issues( issues: Sequence[Issue], n: int, with_replacement, n_total, old_values, trial, max_trials, ) -> Iterable[Outcome]: if trial > max_trials: return old_values remaining = n - len(old_values) if remaining < 0: return set(list(old_values)[:n]) if remaining == 0: return old_values samples = [] for issue in issues: samples.append( issue.rand_outcomes( n=remaining, with_replacement=True, fail_if_not_enough=True ) ) _v = [] for i in range(remaining): _v.append([s[i] for s in samples]) new_values = [] for value in _v: new_values.append(tuple(value)) new_values = set(new_values) values = old_values.union(new_values) remaining = n - len(values) if remaining < 0: return set(list(values)[:n]) if remaining < 1: return values return values.union( _sample_issues( issues, remaining, with_replacement, n_total, values, trial + 1, max_trials ) )
[docs] def sample_issues( issues: Sequence[Issue], n_outcomes: int, with_replacement: bool = True, fail_if_not_enough=True, ) -> Iterable[Outcome]: """ Samples some outcomes from the outcome space defined by the list of issues. Args: issues: list of issues to sample from n_outcomes: The number of outcomes required with_replacement: Whether sampling is with replacement (allowing repetition) fail_if_not_enough: IF given then an exception is raised if not enough outcomes are available Returns: a list of outcomes Examples: >>> from negmas.outcomes import make_issue >>> issues = [ ... make_issue(name="price", values=(0.0, 3.0)), ... make_issue(name="quantity", values=10), ... ] Sampling outcomes as tuples >>> samples = sample_issues(issues=issues, n_outcomes=10) >>> len(samples) == 10 True >>> type(samples[0]) == tuple True """ n_total = num_outcomes(issues) if ( n_total is not None and n_total != float("inf") and n_outcomes is not None and n_total < n_outcomes and fail_if_not_enough and not with_replacement ): raise ValueError( f"Cannot sample {n_outcomes} from a total of possible {n_total} outcomes" ) if n_total is not None and n_total != float("inf") and n_outcomes is None: return enumerate_discrete_issues(issues=issues) # type: ignore I know that these issues are discrete if n_total is None and n_outcomes is None: raise ValueError( "Cannot sample unknown number of outcomes from continuous outcome spaces" ) return list( _sample_issues(issues, n_outcomes, with_replacement, n_total, set(), 0, 10) )
[docs] def enumerate_issues( issues: Sequence[Issue], max_cardinality: int | float | None = None ) -> list[Outcome]: """ Enumerates the outcomes of a list of issues. Args: issues: The list of issues. max_cardinality: The maximum number of outcomes to return Returns: list of outcomes of the given type. """ n = num_outcomes(issues) if isinstance(max_cardinality, float) and max_cardinality == float("inf"): max_cardinality = None if n is None and max_cardinality is not None: warn_if_slow(max_cardinality, "Enumerating a large OS") if max_cardinality is None: warn_if_slow(n, "Enumerating a continuous OS") if n is None: if max_cardinality is None: raise ValueError( "Cannot enumerate continuous issues without specifying `max_cardinality`" ) return list( sample_issues( issues=issues, n_outcomes=int(max_cardinality), fail_if_not_enough=False, with_replacement=False, ) ) if max_cardinality is not None and n > max_cardinality: return sample_outcomes(issues=issues, n_outcomes=int(max_cardinality)) else: return list(tuple(_) for _ in itertools.product(*(_.all for _ in issues)))
[docs] def issues_from_outcomes( outcomes: Sequence[Outcome] | int, numeric_as_ranges: bool = True, issue_names: list[str] | None = None, ) -> tuple[DiscreteIssue]: """ Create a set of issues given some outcomes. Args: outcomes: A list of outcomes or the number of outcomes issue_names: If given, will be used as issue names, otherwise random issue names will be used numeric_as_ranges: If True, all numeric issues generated will have ranges that are defined by the minimum and maximum values of that issue in the given outcomes instead of a list of the values that appeared in them. Returns: a list of issues that include the given outcomes. Remarks: - The outcome space spanned by the generated issues can in principle contain many more possible outcomes than the ones given """ if isinstance(outcomes, int): outcomes = [(_,) for _ in range(outcomes)] def convert_type(v, old, values): if isinstance(v, numbers.Integral) and not isinstance(old, numbers.Integral): return float(v) if not isinstance(v, numbers.Integral) and isinstance(old, numbers.Integral): for i, _ in enumerate(values): values[i] = float(_) return v if isinstance(v, str) and (isinstance(old, numbers.Number)): raise ValueError("a string after a number") if isinstance(old, str) and (isinstance(v, numbers.Number)): raise ValueError("a number after a string") return v names = None n_issues = None values = defaultdict(list) for i, o in enumerate(outcomes): if o is None: continue if issue_names is None: if isinstance(o, dict): issue_names = list(o.keys()) else: issue_names = [f"i{_}" for _ in range(len(o))] if n_issues is not None and len(o) != n_issues: raise ValueError( f"Outcome {o} at {i} has {len(o)} issues but an earlier outcome had {n_issues} issues" ) n_issues = len(o) if len(issue_names) != n_issues: raise ValueError( f"Outcome {i} ({o}) has {len(o)} values but we have {len(issue_names)} issue names" ) o_dict = outcome2dict(o, issue_names) if names is not None and not all(a == b for a, b in zip(names, o_dict.keys())): raise ValueError( f"Outcome {o} at {i} has issues {list(o_dict.keys())} but an earlier outcome had issues {names}" ) for i, v in enumerate(o): k = issue_names[i] if len(values[k]) > 0: try: v = convert_type(v, values[k][-1], values[k]) except ValueError as e: raise ValueError( f"Outcome {o} at {i} has value {v} for issue {k} which is incompatible with an earlier " f"value {values[k][-1]} ({str(e)})" ) values[k].append(v) for k, vals in values.items(): values[k] = sorted(list(set(vals))) if numeric_as_ranges: return tuple( # type: ignore (seems ok but not sure) make_issue(values=(v[0], v[-1]), name=n) if len(v) > 0 and (isinstance(v[0], int)) and all(a == b + 1 for a, b in zip(v[1:], v[:-1])) else make_issue(values=v, name=n) for n, v in values.items() ) else: return tuple(make_issue(values=v, name=n) for n, v in values.items()) # type: ignore (seems ok but not sure)
[docs] def issues_to_xml_str(issues: Sequence[Issue]) -> str: """ Converts the list of issues into a well-formed xml string. Examples: >>> issues = [ ... make_issue(values=10, name="i1"), ... make_issue(values=["a", "b", "c"], name="i2"), ... make_issue(values=(2.5, 3.5), name="i3"), ... ] >>> s = issues_to_xml_str(issues) >>> print(s.strip()) <negotiation_template> <utility_space number_of_issues="3"> <objective description="" etype="objective" index="0" name="root" type="objective"> <issue etype="discrete" index="1" name="i1" type="discrete" vtype="integer"> <item index="1" value="0" cost="0" description="0"> </item> <item index="2" value="1" cost="0" description="1"> </item> <item index="3" value="2" cost="0" description="2"> </item> <item index="4" value="3" cost="0" description="3"> </item> <item index="5" value="4" cost="0" description="4"> </item> <item index="6" value="5" cost="0" description="5"> </item> <item index="7" value="6" cost="0" description="6"> </item> <item index="8" value="7" cost="0" description="7"> </item> <item index="9" value="8" cost="0" description="8"> </item> <item index="10" value="9" cost="0" description="9"> </item> </issue> <issue etype="discrete" index="2" name="i2" type="discrete" vtype="discrete"> <item index="1" value="a" cost="0" description="a"> </item> <item index="2" value="b" cost="0" description="b"> </item> <item index="3" value="c" cost="0" description="c"> </item> </issue> <issue etype="real" index="3" name="i3" type="real" vtype="real"> <range lowerbound="2.5" upperbound="3.5"></range> </issue> </objective> </utility_space> </negotiation_template> >>> issues2, _ = issues_from_xml_str(s) >>> print([_.__class__.__name__ for _ in issues2]) ['CategoricalIssue', 'CategoricalIssue', 'ContinuousIssue'] >>> print(len(issues2)) 3 >>> print([str(_) for _ in issues2]) ["i1: ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']", "i2: ['a', 'b', 'c']", 'i3: (2.5, 3.5)'] >>> print([_.values for _ in issues2]) [['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'], ['a', 'b', 'c'], (2.5, 3.5)] """ output = ( f'<negotiation_template>\n<utility_space number_of_issues="{len(issues)}">\n' f'<objective description="" etype="objective" index="0" name="root" type="objective">\n' ) for indx, issue in enumerate(issues): output += issue._to_xml_str(indx) output += "</objective>\n</utility_space>\n</negotiation_template>" return output
[docs] def issues_to_genius(issues: Sequence[Issue], file_name: PathLike | str) -> None: """ Exports a the domain issues to a GENIUS XML file. Args: issues: The issues to be exported file_name (str): File name to export to Returns: A tuple[Issue, ...] or dict[Issue] Examples: >>> import pkg_resources >>> issues, _ = issues_from_genius( ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml" ... ) ... ) >>> issues_to_genius( ... issues=issues, ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/LaptopConv/Laptop-C-domain.xml" ... ), ... ) >>> issues2, _ = issues_from_genius( ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/LaptopConv/Laptop-C-domain.xml" ... ) ... ) >>> print("\\n".join([" ".join(list(issue.all)) for issue in issues])) Dell Macintosh HP 60 Gb 80 Gb 120 Gb 19'' LCD 20'' LCD 23'' LCD >>> print("\\n".join([" ".join(list(issue.all)) for issue in issues2])) Dell Macintosh HP 60 Gb 80 Gb 120 Gb 19'' LCD 20'' LCD 23'' LCD - Forcing Single outcome >>> issues, _ = issues_from_genius( ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml" ... ) ... ) >>> print([list(issue.all) for issue in issues]) [['Dell', 'Macintosh', 'HP'], ['60 Gb', '80 Gb', '120 Gb'], ["19'' LCD", "20'' LCD", "23'' LCD"]] Remarks: See ``from_xml_str`` for all the parameters """ with open(file_name, "w") as f: f.write(issues_to_xml_str(issues=issues))
def issues_from_geniusweb_json( d: dict, safe_parsing=True, n_discretization: int | None = None ) -> tuple[Sequence[Issue] | None, Sequence[str] | None]: """ Exports a list of issues from a GeniusWeb loaded dict Args: d: the loaded dict from GeniusWeb json file safe_parsing (bool): Turn on extra checks n_discretization (Optional[int]): If not None, real valued issues are discretized with the given number of values max_cardinality (int): Maximum number of outcomes allowed (effective only if force_single_issue is True) Returns: - tuple[Issue, ...] The issues (note that issue names will be stored in the name attribute of each issue if keep_issue_names) - list[dict] A list of agent information dicts each contains 'agent', 'class', 'utility_file_name' """ issue_values = d["issuesValues"] issues = [ make_issue(name=name, values=vals["values"]) for name, vals in issue_values.items() ] return tuple(issues), tuple()
[docs] def issues_from_geniusweb_json_str( json_str: str, safe_parsing=True, n_discretization: int | None = None ) -> tuple[Sequence[Issue] | None, Sequence[str] | None]: """ Exports a list/dict of issues from a GeniusWeb json file. Args: json_str (str): The string containing GENIUS style XML domain issue definitions safe_parsing (bool): Turn on extra checks n_discretization (Optional[int]): If not None, real valued issues are discretized with the given number of values max_cardinality (int): Maximum number of outcomes allowed (effective only if force_single_issue is True) Returns: - tuple[Issue, ...] The issues (note that issue names will be stored in the name attribute of each issue if keep_issue_names) - list[dict] A list of agent information dicts each contains 'agent', 'class', 'utility_file_name' """ d = json.loads(json_str) return issues_from_geniusweb_json(d, safe_parsing, n_discretization)
[docs] def issues_from_xml_str( xml_str: str, safe_parsing=True, n_discretization: int | None = None ) -> tuple[Sequence[Issue] | None, Sequence[str] | None]: """ Exports a list/dict of issues from a GENIUS XML file. Args: xml_str (str): The string containing GENIUS style XML domain issue definitions safe_parsing (bool): Turn on extra checks n_discretization (Optional[int]): If not None, real valued issues are discretized with the given number of values max_cardinality (int): Maximum number of outcomes allowed (effective only if force_single_issue is True) Returns: - tuple[Issue, ...] The issues (note that issue names will be stored in the name attribute of each issue if keep_issue_names) - list[dict] A list of agent information dicts each contains 'agent', 'class', 'utility_file_name' Examples: >>> import pkg_resources >>> domain_file_name = pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml" ... ) >>> with open(domain_file_name, "r") as ff: ... issues, _ = issues_from_xml_str(ff.read()) >>> print([_.cardinality for _ in issues]) [3, 3, 3] >>> domain_file_name = pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/fuzzyagent/single_issue_domain.xml" ... ) >>> with open(domain_file_name, "r") as ff: ... issues, _ = issues_from_xml_str(ff.read()) >>> len(issues) 1 >>> type(issues) <class 'tuple'> >>> str(issues[0]).split(": ")[-1] '(10.0, 40.0)' >>> print([_.cardinality for _ in issues]) [inf] """ root = ET.fromstring(xml_str) if safe_parsing and root.tag != "negotiation_template": raise ValueError(f"Root tag is {root.tag}: negotiation_template") utility_space = None agents = [] for child in root: if child.tag == "utility_space": utility_space = child for _ in utility_space: if _.tag == "objective": utility_space = _ break elif child.tag == "agent": agents.append(child.attrib) if utility_space is None: if safe_parsing: raise ValueError("No objective child was found in the root") utility_space = root issues_dict: dict[int | str, Any] = {} issue_info = {} for child in utility_space: if child.tag == "issue": indx = int(child.attrib["index"]) - 1 issue_name = str(child.attrib["name"]) issue_info[issue_name] = {"name": issue_name, "index": indx} info = {"type": "discrete", "etype": "discrete", "vtype": "discrete"} for a in ("type", "etype", "vtype"): info[a] = child.attrib.get(a, info[a]) mytype = info["type"] if mytype == "discrete": issues_dict[issue_name] = [] for item in child: if item.tag == "item": item_name = item.attrib.get("value", None) if ( item_name not in issues_dict[issue_name] ): # ignore repeated items issues_dict[issue_name].append(item_name) elif mytype in ("integer", "real"): lower_, upper_ = ( child.attrib.get("lowerbound", None), child.attrib.get("upperbound", None), ) for rng_child in child: if rng_child.tag == "range": lower_, upper_ = ( rng_child.attrib.get("lowerbound", lower_), rng_child.attrib.get("upperbound", upper_), ) if lower_ is None: if upper_ is not None and float(upper_) < 0: lower_ = str( -(sys.maxsize // 2) if mytype == "integer" else float("-inf") ) else: lower_ = "0" if upper_ is None: upper_ = str( (sys.maxsize // 2) if mytype == "integer" else float("-inf") ) if mytype == "integer": lower, upper = int(lower_), int(upper_) issues_dict[issue_name] = lower, upper else: lower, upper = float(lower_), float(upper_) if n_discretization is None: issues_dict[issue_name] = lower, upper else: issues_dict[issue_name] = n_discretization else: # I should add the real-valued issues_dict code here raise ValueError(f"Unknown type: {mytype}") else: raise ValueError(f"Unknown child for objective: {child.tag}") issues_by_index = dict() for key, value in issues_dict.items(): indx = issue_info[key]["index"] issues_by_index[indx] = make_issue(values=value, name=issue_info[key]["name"]) n_issues = max(issues_by_index.keys()) + 1 issues = [] for i in range(n_issues): if i not in issues_by_index.keys(): if safe_parsing: raise ValueError( f"No issue with index {i} is found even though we have issues with higher indices" ) issues.append(ContiguousIssue((1, 1), name=DUMMY_ISSUE_NAME)) continue issues.append(issues_by_index[i]) return tuple(issues), tuple(agents)
[docs] def issues_from_geniusweb( file_name: PathLike | str, safe_parsing=True, n_discretization: int | None = None ) -> tuple[Sequence[Issue] | None, Sequence[str] | None]: """ Imports a the domain issues from a GENIUS XML file. Args: file_name (str): File name to import from safe_parsing: Add more checks to parsing n_discretization: Number of discretization levels per issue Returns: A tuple of two optional lists: - tuple[Issue, ...] containing the issues - list[str] containing agent names (that are sometimes stored in the genius domain) Examples: >>> import pkg_resources >>> issues, _ = issues_from_genius( ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml" ... ) ... ) >>> print([_.name for _ in issues]) ['Laptop', 'Harddisk', 'External Monitor'] Remarks: See ``from_xml_str`` for all the parameters """ with open(file_name, encoding="utf-8") as f: json_str = f.read() return issues_from_geniusweb_json_str( json_str=json_str, safe_parsing=safe_parsing, n_discretization=n_discretization, )
[docs] def issues_from_genius( file_name: PathLike | str, safe_parsing=True, n_discretization: int | None = None ) -> tuple[Sequence[Issue] | None, Sequence[str] | None]: """ Imports a the domain issues from a GENIUS XML file. Args: file_name (str): File name to import from safe_parsing: Add more checks to parsing n_discretization: Number of discretization levels per issue Returns: A tuple of two optional lists: - tuple[Issue, ...] containing the issues - list[str] containing agent names (that are sometimes stored in the genius domain) Examples: >>> import pkg_resources >>> issues, _ = issues_from_genius( ... file_name=pkg_resources.resource_filename( ... "negmas", resource_name="tests/data/Laptop/Laptop-C-domain.xml" ... ) ... ) >>> print([_.name for _ in issues]) ['Laptop', 'Harddisk', 'External Monitor'] Remarks: See ``from_xml_str`` for all the parameters """ with open(file_name, encoding="utf-8") as f: xml_str = f.read() return issues_from_xml_str( xml_str=xml_str, safe_parsing=safe_parsing, n_discretization=n_discretization, )
[docs] def generate_issues( params: Sequence[ int | list[str] | tuple[int, int] | Callable | tuple[float, float] ], counts: list[int] | None = None, names: list[str] | None = None, ) -> tuple[Issue, ...]: """ Generates a set of issues with given parameters. Each is optionally repeated. Args: params: The parameters of the issues counts: The number of times to repeat each of the `issues` names: The names to assign to the issues. If None, then string representations of integers starting from zero will be used. Returns: list['Issue']: The list of issues with given conditions """ one_each = counts is None int_names = names is None result: list[Issue] = [] nxt = 0 for i, issue in enumerate(params): count = 1 if one_each else counts[i] # type: ignore for _ in range(count): name = str(nxt) if int_names else names[i] # type: ignore # if count > 1: # name = name + f' {j}' nxt += 1 result.append(make_issue(values=issue, name=name)) return tuple(result)
[docs] def discretize_and_enumerate_issues( issues: Iterable[Issue], n_discretization: int | None = 10, max_cardinality: int | float | None = None, ) -> list[Outcome]: """ Enumerates the outcomes of a list of issues. Args: issues: The list of issues. max_cardinality: The maximum number of outcomes to return Returns: list of outcomes of the given type. """ issues = [ _ if _.is_finite() else make_issue(values=list(_.value_generator(n_discretization)), name=_.name) for _ in issues ] return enumerate_issues(issues, max_cardinality=max_cardinality)
[docs] def combine_issues( issues: Sequence[Issue], name: str | None = None, keep_value_names=True, issue_sep="_", value_sep="-", ) -> Issue | None: """ Combines multiple issues into a single issue. Args: issues: The issues to be combined name: The name of the resulting issue (If not given, combines input issue names) keep_value_names: If true, the values for the generated issue will be a concatenation of values from earlier issues separated by `value_sep`. issue_sep: Separator for the issue name (used only if `keep_issue_names`) value_sep: Separator for the issue name (used only if `keep_value_names`) Remarks: - Only works if the issues have finite cardinality """ n_outcomes = num_outcomes(issues) if n_outcomes == float("inf"): return None name = issue_sep.join([_.name for _ in issues]) if name is None else name if keep_value_names: values = [ value_sep.join([str(_) for _ in outcomes]) for outcomes in enumerate_issues(issues) ] else: values = n_outcomes return make_issue(name=name, values=values)