"""
Defines import/export functionality
"""
from __future__ import annotations
from copy import deepcopy
import math
import xml.etree.ElementTree as ET
from os import PathLike, listdir
from pathlib import Path
from random import shuffle
from typing import Any, Callable, Iterable, Sequence
from attrs import define, field
from negmas.helpers.inout import dump, load
from negmas.helpers.types import get_full_type_name
from negmas.outcomes.outcome_space import make_os
from negmas.preferences.crisp.linear import LinearAdditiveUtilityFunction
from negmas.preferences.ops import ScenarioStats, calc_scenario_stats
from negmas.sao.mechanism import SAOMechanism
from negmas.serialization import PYTHON_CLASS_IDENTIFIER, deserialize, serialize
from .mechanisms import Mechanism
from .negotiators import Negotiator
from .outcomes import (
CartesianOutcomeSpace,
Issue,
issues_from_genius,
issues_from_geniusweb,
)
from .preferences import (
DiscountedUtilityFunction,
UtilityFunction,
conflict_level,
make_discounted_ufun,
nash_points,
opposition_level,
pareto_frontier,
winwin_level,
)
from .preferences.value_fun import TableFun
__all__ = [
"Scenario",
"scenario_size",
"load_genius_domain",
"load_genius_domain_from_folder",
"find_genius_domain_and_utility_files",
"load_geniusweb_domain",
"load_geniusweb_domain_from_folder",
"find_geniusweb_domain_and_utility_files",
"get_domain_issues",
]
STATS_MAX_CARDINALITY = 10_000_000_000
GENIUSWEB_UFUN_TYPES = ("LinearAdditiveUtilitySpace",)
INFO_FILE_NAME = "_info"
[docs]
def scenario_size(self: Scenario):
size = self.outcome_space.cardinality
if math.isinf(size):
size = self.outcome_space.cardinality_if_discretized(10)
for key in ("n_steps", "time_limit", "hiddent_time_limit"):
n = self.mechanism_params.get(key, float("inf"))
if n is not None and not math.isinf(n):
size = size * n
return size
[docs]
@define
class Scenario:
"""
A class representing a negotiation domain
"""
outcome_space: CartesianOutcomeSpace
ufuns: tuple[UtilityFunction, ...]
mechanism_type: type[Mechanism] | None = SAOMechanism
mechanism_params: dict = field(factory=dict)
info: dict[str, Any] = field(factory=dict)
def __lt__(self, other: Scenario):
return scenario_size(self) < scenario_size(other)
@property
def issues(self) -> tuple[Issue, ...]:
return self.outcome_space.issues
[docs]
def plot(self, **kwargs):
from negmas.plots.util import plot_2dutils
return plot_2dutils(
[],
self.ufuns, #
tuple(_.name for _ in self.ufuns), #
offering_negotiators=tuple(_.name for _ in self.ufuns), #
issues=self.outcome_space.issues, #
**kwargs,
)
[docs]
def to_genius_files(self, domain_path: Path, ufun_paths: list[Path]):
"""
Save domain and ufun files to the `path` as XML.
"""
domain_path = Path(domain_path)
ufun_paths = [Path(_) for _ in ufun_paths]
if len(self.ufuns) != len(ufun_paths):
raise ValueError(
f"I have {len(self.ufuns)} ufuns but {len(ufun_paths)} paths were passed!!"
)
domain_path.parent.mkdir(parents=True, exist_ok=True)
self.outcome_space.to_genius(domain_path)
for ufun, path in zip(self.ufuns, ufun_paths):
ufun.to_genius(path, issues=self.issues)
return self
[docs]
def to_genius_folder(self, path: Path):
"""
Save domain and ufun files to the `path` as XML.
"""
path.mkdir(parents=True, exist_ok=True)
domain_name = (
self.outcome_space.name.split("/")[-1]
if self.outcome_space.name
else "domain"
)
ufun_names = [_.name.split("/")[-1] for _ in self.ufuns]
self.outcome_space.to_genius(path / domain_name)
for ufun, name in zip(self.ufuns, ufun_names):
ufun.to_genius(path / name, issues=self.issues)
return self
@property
def n_negotiators(self) -> int:
return len(self.ufuns)
@property
def n_issues(self) -> int:
return len(self.outcome_space.issues)
@property
def issue_names(self) -> list[str]:
return self.outcome_space.issue_names
[docs]
def to_numeric(self) -> Scenario:
"""
Forces all issues in the domain to become numeric
Remarks:
- maps the agenda and ufuns to work correctly together
"""
raise NotImplementedError()
def _randomize(self) -> Scenario:
"""
Randomizes the outcomes in a single issue scneario
"""
shuffle(self.outcome_space.issues[0].values)
return self
[docs]
def to_single_issue(
self, numeric=False, stringify=True, randomize=False
) -> Scenario:
"""
Forces the domain to have a single issue with all possible outcomes
Args:
numeric: If given, the output issue will be a `ContiguousIssue` otherwise it will be a `DiscreteCategoricalIssue`
stringify: If given, the output issue will have string values. Checked only if `numeric` is `False`
randomize: Randomize outcome order when creating the single issue
Remarks:
- maps the agenda and ufuns to work correctly together
- Only works if the outcome space is finite
"""
if (
hasattr(self.outcome_space, "issues")
and len(self.outcome_space.issues) == 1
):
return self if not randomize else self._randomize()
outcomes = list(self.outcome_space.enumerate_or_sample())
sos = self.outcome_space.to_single_issue(numeric, stringify)
ufuns = []
souts = list(sos.issues[0].all)
for u in self.ufuns:
if isinstance(u, DiscountedUtilityFunction):
usave = u
v = u.ufun
while isinstance(v, DiscountedUtilityFunction):
u, v = v, v.ufun
u.ufun = LinearAdditiveUtilityFunction(
values=(
TableFun(dict(zip(souts, [v(_) for _ in outcomes]))),
), # (The error comes from having LRU cach for table ufun's minmax which should be OK)
bias=0.0,
reserved_value=v.reserved_value,
name=v.name,
outcome_space=sos,
)
ufuns.append(usave)
continue
ufuns.append(
LinearAdditiveUtilityFunction(
values=(
TableFun(dict(zip(souts, [u(_) for _ in outcomes]))),
), # (The error comes from having LRU cach for table ufun's minmax which should be OK)
bias=0.0,
reserved_value=u.reserved_value,
name=u.name,
outcome_space=sos,
)
)
self.ufuns = tuple(ufuns)
self.outcome_space = sos
return self if not randomize else self._randomize()
[docs]
def make_session(
self,
negotiators: Callable[[], Negotiator]
| type[Negotiator]
| list[Negotiator]
| tuple[Negotiator, ...]
| None = None,
n_steps: int | float | None = None,
time_limit: float | None = None,
roles: list[str] | None = None,
raise_on_failure_to_enter: bool = True,
share_ufuns: bool = False,
share_reserved_values: bool = False,
**kwargs,
):
"""
Generates a ready to run mechanism session for this domain.
"""
if not self.mechanism_type:
raise ValueError(
"Cannot create the domain because it has no `mechanism_type`"
)
args = self.mechanism_params
args.update(kwargs)
if n_steps:
args["n_steps"] = n_steps
if time_limit:
args["time_limit"] = time_limit
m = self.mechanism_type(outcome_space=self.outcome_space, **args)
if not negotiators:
return m
negs: list[Negotiator]
if share_ufuns:
assert (
len(self.ufuns) == 2
), "Sharing ufuns in multilateral negotiations is not yet supported"
opp_ufuns = reversed(deepcopy(self.ufuns))
if not share_reserved_values:
for u in opp_ufuns:
u.reserved_value = float("nan")
else:
opp_ufuns = [None] * len(self.ufuns)
if not isinstance(negotiators, Iterable):
negs = [
negotiators(
name=ufun.name.split("/")[-1] # type: ignore We trust that the class given is a negotiator and has a name
.replace(".xml", "")
.replace(".yml", ""),
private_info=dict(opponent_ufun=opp_ufun) if opp_ufun else None, # type: ignore We trust that the class given is a negotiator and has private_info
)
for ufun, opp_ufun in zip(self.ufuns, opp_ufuns)
]
else:
negs = list(negotiators)
if share_ufuns:
for neg, ou in zip(negs, opp_ufuns):
if share_ufuns and neg.opponent_ufun is None and ou is not None:
neg._private_info["opponent_ufun"] = ou
if len(self.ufuns) != len(negs) or len(negs) < 1:
raise ValueError(
f"Invalid ufuns ({self.ufuns}) or negotiators ({negotiators})"
)
if not roles:
roles = ["negotiator"] * len(negs)
for n, r, u in zip(negs, roles, self.ufuns):
added = m.add(n, preferences=u, role=r)
if not added and raise_on_failure_to_enter:
raise ValueError(
f"{n.name} (of type {get_full_type_name(n.__class__)}) failed to enter the negotiation {m.name}"
)
return m
[docs]
def scale_min(self, to: float = 1.0) -> Scenario:
"""Normalizes a utility function to the given range
Args:
ufun: The utility function to normalize
outcomes: A collection of outcomes to normalize for
rng: range to normalize to. Default is [0, 1]
levels: Number of levels to use for discretizing continuous issues (if any)
max_cardinality: Maximum allowed number of outcomes resulting after all discretization is done
"""
self.ufuns = tuple(_.scale_min(to) for _ in self.ufuns) # The type is correct
return self
[docs]
def scale_max(self, to: float = 1.0) -> Scenario:
"""Normalizes a utility function to the given range
Args:
ufun: The utility function to normalize
outcomes: A collection of outcomes to normalize for
rng: range to normalize to. Default is [0, 1]
levels: Number of levels to use for discretizing continuous issues (if any)
max_cardinality: Maximum allowed number of outcomes resulting after all discretization is done
"""
self.ufuns = tuple(_.scale_max(to) for _ in self.ufuns) # The type is correct
return self
[docs]
def normalize(self, to: tuple[float, float] = (0.0, 1.0)) -> Scenario:
"""Normalizes a utility function to the given range
Args:
rng: range to normalize to. Default is [0, 1]
"""
self.ufuns = tuple(_.normalize(to) for _ in self.ufuns) # The type is correct
return self
[docs]
def is_normalized(
self,
to: tuple[float | None, float | None] = (None, 1.0),
positive: bool = True,
eps: float = 1e-6,
) -> bool:
"""Checks that all ufuns are normalized in the given range"""
mnmx = [_.minmax() for _ in self.ufuns]
return all(
(to[0] is None or abs(a - to[0]) < eps)
and (to[1] is None or abs(b - to[1]) < eps)
and (not positive or a >= -eps)
for a, b in mnmx
)
[docs]
def discretize(self, levels: int = 10):
"""Discretize all issues"""
self.outcome_space = self.outcome_space.to_discrete(levels)
return self
[docs]
def remove_discounting(self):
"""Removes discounting from all ufuns"""
ufuns = []
for u in self.ufuns:
while isinstance(u, DiscountedUtilityFunction):
u = u.ufun
ufuns.append(u)
self.ufuns = tuple(ufuns)
return self
[docs]
def remove_reserved_values(self, r: float = float("-inf")):
"""Removes reserved values from all ufuns replaacing it with `r`"""
for u in self.ufuns:
u.reserved_value = r
return self
[docs]
def calc_stats(self) -> ScenarioStats:
return calc_scenario_stats(self.ufuns)
[docs]
def serialize(
self, python_class_identifier=PYTHON_CLASS_IDENTIFIER
) -> dict[str, Any]:
"""
Converts the current scenario into a serializable dict.
Remarks:
Rturns a dictionary with the following keys:
- domain: The agenda/outcome-space
- ufuns: A list of utility functions
"""
def get_name(x, default):
if not x:
return str(default)
return x.split("/")[-1].replace(".xml", "")
def adjust(
d,
default_name,
remove_dunder=False,
adjust_name=True,
ignored=("id", "n_values", "outcome_space"),
rename={PYTHON_CLASS_IDENTIFIER: "type"},
):
if isinstance(d, list) or isinstance(d, tuple):
return [
adjust(_, default_name, remove_dunder, adjust_name, ignored)
for _ in d
]
if not isinstance(d, dict):
return d
if adjust_name and "name" in d:
d["name"] = get_name(d["name"], default_name)
if d.get(PYTHON_CLASS_IDENTIFIER, "").startswith("negmas."):
d[PYTHON_CLASS_IDENTIFIER] = d[PYTHON_CLASS_IDENTIFIER].split(".")[-1]
for old, new in rename.items():
if old in d.keys():
d[new] = d[old]
del d[old]
for i in ignored:
if i in d.keys():
del d[i]
for k, v in d.items():
d[k] = adjust(
v,
default_name,
remove_dunder=remove_dunder,
adjust_name=False,
ignored=ignored,
)
if not remove_dunder:
return d
d = {k: v for k, v in d.items() if not k.startswith("__")}
return d
domain = adjust(
serialize(
self.outcome_space,
shorten_type_field=True,
add_type_field=True,
python_class_identifier=python_class_identifier,
),
"domain",
)
ufuns = [
adjust(
serialize(
u,
shorten_type_field=True,
add_type_field=True,
python_class_identifier=python_class_identifier,
),
i,
)
for i, u in enumerate(self.ufuns)
]
return dict(domain=domain, ufuns=ufuns)
[docs]
def to_yaml(self, folder: Path | str) -> None:
"""
Saves the scenario as yaml
Args:
folder: The destination path
"""
self.dumpas(folder, "yml")
[docs]
def to_json(self, folder: Path | str) -> None:
"""
Saves the scenario as json
Args:
folder: The destination path
"""
self.dumpas(folder, "json")
[docs]
def dumpas(
self,
folder: Path | str,
type="yml",
compact: bool = False,
python_class_identifier=PYTHON_CLASS_IDENTIFIER,
) -> None:
"""
Dumps the scenario in the given file format.
"""
folder = Path(folder)
folder.mkdir(parents=True, exist_ok=True)
serialized = self.serialize(python_class_identifier=python_class_identifier)
dump(serialized["domain"], folder / f"{serialized['domain']['name']}.{type}")
for u in serialized["ufuns"]:
dump(u, folder / f"{u['name']}.{type}", sort_keys=True, compact=compact)
[docs]
def load_info_file(self, file: Path):
if not file.is_file():
return self
self.info = load(file)
return self
[docs]
def load_info(self, folder: PathLike | str):
for ext in ("yml", "yaml", "json"):
path = Path(folder) / f"{INFO_FILE_NAME}.{ext}"
if not path.is_file():
continue
self.info = load(path)
break
return self
[docs]
@staticmethod
def from_genius_folder(
path: PathLike | str,
ignore_discount=False,
ignore_reserved=False,
safe_parsing=True,
) -> Scenario | None:
s = load_genius_domain_from_folder(
folder_name=str(path),
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
safe_parsing=safe_parsing,
)
if s is None:
return s
return s.load_info(path)
[docs]
@classmethod
def load(cls, folder: Path | str, safe_parsing=False) -> Scenario | None:
"""
Loads the scenario from a folder with supported formats: XML, YML
"""
for finder, loader in (
(find_domain_and_utility_files_yaml, cls.from_yaml_folder),
(find_domain_and_utility_files_xml, cls.from_genius_folder),
(find_domain_and_utility_files_geniusweb, cls.from_geniusweb_folder),
):
domain, _ = finder(folder)
if domain is not None:
s = loader(folder, safe_parsing=safe_parsing)
if s is not None:
return s.load_info(folder)
[docs]
@classmethod
def is_loadable(cls, path: PathLike | str):
if not Path(path).is_dir():
return False
for finder in (
find_domain_and_utility_files_yaml,
find_domain_and_utility_files_xml,
find_domain_and_utility_files_geniusweb,
):
d, _ = finder(Path(path))
if d is not None:
return True
return False
[docs]
@staticmethod
def from_genius_files(
domain: PathLike,
ufuns: Iterable[PathLike],
info: PathLike | None = None,
ignore_discount=False,
ignore_reserved=False,
safe_parsing=True,
) -> Scenario | None:
s = load_genius_domain(
domain,
[_ for _ in ufuns],
safe_parsing=safe_parsing,
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
normalize_utilities=False,
normalize_max_only=False,
)
if not s:
return s
if info is not None:
return s.load_info_file(Path(info))
return s
[docs]
@staticmethod
def from_geniusweb_folder(
path: PathLike | str,
ignore_discount=False,
ignore_reserved=False,
use_reserved_outcome=False,
safe_parsing=True,
) -> Scenario | None:
s = load_geniusweb_domain_from_folder(
folder_name=str(path),
ignore_discount=ignore_discount,
use_reserved_outcome=use_reserved_outcome,
ignore_reserved=ignore_reserved,
safe_parsing=safe_parsing,
)
if not s:
return s
return s.load_info(path)
[docs]
@staticmethod
def from_geniusweb_files(
domain: PathLike,
ufuns: Iterable[PathLike],
info: PathLike | None = None,
ignore_discount=False,
ignore_reserved=False,
use_reserved_outcome=False,
safe_parsing=True,
) -> Scenario | None:
s = load_geniusweb_domain(
domain,
[_ for _ in ufuns],
safe_parsing=safe_parsing,
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
use_reserved_outcome=use_reserved_outcome,
normalize_utilities=False,
normalize_max_only=False,
)
if not s:
return s
if info is not None:
return s.load_info_file(Path(info))
return s
[docs]
@classmethod
def from_yaml_folder(
cls,
path: PathLike | str,
ignore_discount=False,
ignore_reserved=False,
safe_parsing=True,
) -> Scenario | None:
domain, ufuns = find_domain_and_utility_files_yaml(path)
if not domain:
return None
s = cls.from_yaml_files(
domain=domain,
ufuns=ufuns,
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
safe_parsing=safe_parsing,
)
if not s:
return s
return s.load_info(path)
[docs]
@classmethod
def from_yaml_files(
cls,
domain: PathLike,
ufuns: Iterable[PathLike],
info: PathLike | None = None,
ignore_discount=False,
ignore_reserved=False,
safe_parsing=True,
python_class_identifier="type",
) -> Scenario | None:
_ = safe_parsing # yaml parsing is always safe
def adjust_type(d: dict, base: str = "negmas", domain=None) -> dict:
d["type"] = f"{base}.{d['type']}"
if domain is not None:
d["outcome_space"] = domain
return d
os = deserialize(
adjust_type(load(domain)),
base_module="negmas",
python_class_identifier=python_class_identifier,
)
utils = tuple(
deserialize(
adjust_type(load(fname), domain=os),
python_class_identifier=python_class_identifier,
base_module="negmas",
)
for fname in ufuns
)
# d = load(domain)
# type_ = d.pop("type", "")
# assert (
# "OutcomeSpace" in type_
# ), f"Unknown type or no type for domain file: {domain=}\n{d=}"
# type_ = f"negmas.outcomes.{type_}"
# d["issues"]
# os = instantiate(type_, **d)
# utils = []
# for fname in ufuns:
# d = load(fname)
# type_ = d.pop("type", "")
# assert (
# "Fun" in type_
# ), f"Unknown type or no type for ufun file: {domain=}\n{d=}"
# type_ = f"negmas.preferences.{type_}"
# utils.append(instantiate(type_, **d))
assert isinstance(os, CartesianOutcomeSpace)
s = Scenario(outcome_space=os, ufuns=tuple(utils)) # type: ignore
if s and ignore_discount:
s = s.remove_discounting()
if s and ignore_reserved:
s = s.remove_reserved_values()
if info is not None:
return s.load_info_file(Path(info))
return s
[docs]
def get_domain_issues(
domain_file_name: PathLike | str,
n_discretization: int | None = None,
safe_parsing=False,
) -> Sequence[Issue] | None:
"""
Returns the issues of a given XML domain (Genius Format)
Args:
domain_file_name: Name of the file
n_discretization: Number of discrete levels per continuous variable.
max_cardinality: Maximum number of outcomes in the outcome space after discretization. Used only if `n_discretization` is given.
safe_parsing: Apply more checks while parsing
Returns:
List of issues
"""
issues = None
if domain_file_name is not None:
issues, _ = issues_from_genius(
domain_file_name,
safe_parsing=safe_parsing,
n_discretization=n_discretization,
)
return issues
[docs]
def load_genius_domain(
domain_file_name: PathLike,
utility_file_names: Iterable[PathLike] | None = None,
ignore_discount=False,
ignore_reserved=False,
safe_parsing=True,
**kwargs,
) -> Scenario:
"""
Loads a genius domain, creates appropriate negotiators if necessary
Args:
domain_file_name: XML file containing Genius-formatted domain spec
utility_file_names: XML files containing Genius-fromatted ufun spec
ignore_reserved: Sets the reserved_value of all ufuns to -inf
ignore_discount: Ignores discounting
safe_parsing: Applies more stringent checks during parsing
Returns:
A `Domain` ready to run
"""
issues = None
if domain_file_name is not None:
issues, _ = issues_from_genius(domain_file_name, safe_parsing=safe_parsing)
agent_info = []
if utility_file_names is None:
utility_file_names = []
for ufname in utility_file_names:
try:
utility, discount_factor = UtilityFunction.from_genius(
file_name=ufname,
issues=issues,
safe_parsing=safe_parsing,
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
name=str(ufname),
)
except Exception as e:
raise OSError(
f"Ufun named {Path(ufname).name} cannot be read: {e.__class__.__name__}({e})"
)
agent_info.append(
{
"ufun": utility,
"ufun_name": ufname,
"reserved_value_func": utility.reserved_value
if utility is not None
else float("-inf"),
"discount_factor": discount_factor,
}
)
if domain_file_name is not None:
kwargs["dynamic_entry"] = False
kwargs["max_n_agents"] = None
if not ignore_discount:
for info in agent_info:
info["ufun"] = (
info["ufun"]
if info["discount_factor"] is None or info["discount_factor"] == 1.0
else make_discounted_ufun(
ufun=info["ufun"],
discount_per_round=info["discount_factor"],
power_per_round=1.0,
)
)
if issues is None:
raise ValueError(f"Could not load domain {domain_file_name}")
return Scenario(
outcome_space=make_os(issues, name=str(domain_file_name)), #
ufuns=tuple(_["ufun"] for _ in agent_info), #
)
[docs]
def load_genius_domain_from_folder(
folder_name: str | PathLike,
ignore_reserved=False,
ignore_discount=False,
safe_parsing=False,
**kwargs,
) -> Scenario:
"""
Loads a genius domain from a folder. See ``load_genius_domain`` for more details.
Args:
folder_name: A folder containing one XML domain file and one or more ufun files in Genius format
ignore_reserved: Sets the reserved_value of all ufuns to -inf
ignore_discount: Ignores discounting
safe_parsing: Applies more stringent checks during parsing
kwargs: Extra arguments to pass verbatim to SAOMechanism constructor
Returns:
A domain ready for `make_session`
Examples:
>>> import pkg_resources
>>> from negmas import load_genius_domain_from_folder
Try loading and running a domain with predetermined agents:
>>> domain = load_genius_domain_from_folder(
... pkg_resources.resource_filename("negmas", resource_name="tests/data/Laptop")
... )
Try loading a domain and check the resulting ufuns
>>> domain = load_genius_domain_from_folder(
... pkg_resources.resource_filename("negmas", resource_name="tests/data/Laptop")
... )
>>> domain.n_issues, domain.n_negotiators
(3, 2)
>>> [type(_) for _ in domain.ufuns]
[<class 'negmas.preferences.crisp.linear.LinearAdditiveUtilityFunction'>, <class 'negmas.preferences.crisp.linear.LinearAdditiveUtilityFunction'>]
Try loading a domain forcing a single issue space
>>> domain = load_genius_domain_from_folder(
... pkg_resources.resource_filename("negmas", resource_name="tests/data/Laptop")
... ).to_single_issue()
>>> domain.n_issues, domain.n_negotiators
(1, 2)
>>> [type(_) for _ in domain.ufuns]
[<class 'negmas.preferences.crisp.linear.LinearAdditiveUtilityFunction'>, <class 'negmas.preferences.crisp.linear.LinearAdditiveUtilityFunction'>]
Try loading a domain with nonlinear ufuns:
>>> folder_name = pkg_resources.resource_filename(
... "negmas", resource_name="tests/data/10issues"
... )
>>> domain = load_genius_domain_from_folder(folder_name)
>>> print(domain.n_issues)
10
>>> print(domain.n_negotiators)
2
>>> print([type(u) for u in domain.ufuns])
[<class 'negmas.preferences.crisp.nonlinear.HyperRectangleUtilityFunction'>, <class 'negmas.preferences.crisp.nonlinear.HyperRectangleUtilityFunction'>]
>>> u = domain.ufuns[0]
>>> print(u.outcome_ranges[0])
{1: (7.0, 9.0), 3: (2.0, 7.0), 5: (0.0, 8.0), 8: (0.0, 7.0)}
>>> print(u.mappings[0])
97.0
>>> print(u([0.0] * domain.n_issues))
0
>>> print(u([0.5] * domain.n_issues))
186.0
"""
folder_name = str(folder_name)
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
for f in files:
if not f.endswith(".xml") or f.endswith("pareto.xml"):
continue
full_name = folder_name + "/" + f
root = ET.parse(full_name).getroot()
if root.tag == "negotiation_template":
domain_file_name = Path(full_name)
elif root.tag == "utility_space":
utility_file_names.append(full_name)
if domain_file_name is None:
raise ValueError("Cannot find a domain file")
return load_genius_domain(
domain_file_name=domain_file_name,
utility_file_names=utility_file_names,
safe_parsing=safe_parsing,
ignore_reserved=ignore_reserved,
ignore_discount=ignore_discount,
**kwargs,
)
def find_domain_and_utility_files_yaml(
folder_name,
) -> tuple[PathLike | None, list[PathLike]]:
"""Finds the domain and utility_function files in a folder"""
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
folder_name = str(folder_name)
for f in files:
if not f.endswith(".yml") and not f.endswith(".yaml"):
continue
full_name = folder_name + "/" + f
data = load(full_name)
if data and "OutcomeSpace" in data.get("type", ""):
domain_file_name = full_name
elif data and ("fun" in data.get("type", "").lower()):
utility_file_names.append(full_name)
return domain_file_name, utility_file_names
def find_domain_and_utility_files_geniusweb(
folder_name,
) -> tuple[PathLike | None, list[PathLike]]:
"""Finds the domain and utility_function files in a folder"""
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
folder_name = str(folder_name)
for f in files:
if not f.endswith(".json") or f.endswith("specials.json"):
continue
full_name = folder_name + "/" + f
data = load(full_name)
if data and "issuesValues" in data.keys():
domain_file_name = full_name
elif data and "LinearAdditiveUtilitySpace" in data.keys():
utility_file_names.append(full_name)
return domain_file_name, utility_file_names
def find_domain_and_utility_files_xml(
folder_name,
) -> tuple[PathLike | None, list[PathLike]]:
"""Finds the domain and utility_function files in a folder"""
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
folder_name = str(folder_name)
for f in files:
if not f.endswith(".xml") or f.endswith("pareto.xml"):
continue
full_name = folder_name + "/" + f
root = ET.parse(full_name).getroot()
if root.tag == "negotiation_template":
domain_file_name = full_name
elif root.tag == "utility_space":
utility_file_names.append(full_name)
return domain_file_name, utility_file_names #
[docs]
def load_geniusweb_domain_from_folder(
folder_name: str | PathLike,
ignore_reserved=False,
ignore_discount=False,
use_reserved_outcome=False,
safe_parsing=False,
**kwargs,
) -> Scenario:
"""
Loads a genius-web domain from a folder. See ``load_geniusweb_domain`` for more details.
Args:
folder_name: A folder containing one XML domain file and one or more ufun files in Genius format
ignore_reserved: Sets the reserved_value of all ufuns to -inf
ignore_discount: Ignores discounting
safe_parsing: Applies more stringent checks during parsing
kwargs: Extra arguments to pass verbatim to SAOMechanism constructor
Returns:
A domain ready for `make_session`
"""
folder_name = str(folder_name)
domain_file_name, utility_file_names = find_geniusweb_domain_and_utility_files(
folder_name
)
if domain_file_name is None:
raise ValueError("Cannot find a domain file")
return load_geniusweb_domain(
domain_file_name=domain_file_name,
utility_file_names=utility_file_names,
safe_parsing=safe_parsing,
ignore_reserved=ignore_reserved,
ignore_discount=ignore_discount,
use_reserved_outcome=use_reserved_outcome,
**kwargs,
)
[docs]
def find_geniusweb_domain_and_utility_files(
folder_name,
) -> tuple[PathLike | None, list[PathLike]]:
"""Finds the domain and utility_function files in a GeniusWeb formatted json folder"""
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
folder_name = str(folder_name)
for f in files:
if not f.endswith(".json"):
continue
full_name = folder_name + "/" + f
d = load(full_name)
if any(_ in d.keys() for _ in GENIUSWEB_UFUN_TYPES):
utility_file_names.append(full_name)
elif "issuesValues" in d.keys():
domain_file_name = full_name
return domain_file_name, utility_file_names
[docs]
def find_genius_domain_and_utility_files(
folder_name,
) -> tuple[PathLike | None, list[PathLike]]:
"""Finds the domain and utility_function files in a folder"""
files = sorted(listdir(folder_name))
domain_file_name = None
utility_file_names = []
folder_name = str(folder_name)
for f in files:
if not f.endswith(".xml") or f.endswith("pareto.xml"):
continue
full_name = folder_name + "/" + f
root = ET.parse(full_name).getroot()
if root.tag == "negotiation_template":
domain_file_name = full_name
elif root.tag == "utility_space":
utility_file_names.append(full_name)
return domain_file_name, utility_file_names #
[docs]
def load_geniusweb_domain(
domain_file_name: PathLike,
utility_file_names: Iterable[PathLike] | None = None,
ignore_discount=False,
ignore_reserved=False,
use_reserved_outcome=False,
safe_parsing=True,
**kwargs,
) -> Scenario:
"""
Loads a geniusweb domain, creates appropriate negotiators if necessary
Args:
domain_file_name: XML file containing Genius-formatted domain spec
utility_file_names: XML files containing Genius-fromatted ufun spec
ignore_reserved: Sets the reserved_value of all ufuns to -inf
ignore_discount: Ignores discounting
safe_parsing: Applies more stringent checks during parsing
kwargs: Extra arguments to pass verbatim to SAOMechanism constructor
Returns:
A `Domain` ready to run
"""
issues = None
name = load(domain_file_name).get("name", domain_file_name)
if domain_file_name is not None:
issues, _ = issues_from_geniusweb(domain_file_name, safe_parsing=safe_parsing)
agent_info = []
if utility_file_names is None:
utility_file_names = []
for ufname in utility_file_names:
utility, discount_factor = UtilityFunction.from_geniusweb(
file_name=ufname,
issues=issues,
safe_parsing=safe_parsing,
ignore_discount=ignore_discount,
ignore_reserved=ignore_reserved,
use_reserved_outcome=use_reserved_outcome,
name=str(ufname),
)
agent_info.append(
{
"ufun": utility,
"ufun_name": ufname,
"reserved_value_func": utility.reserved_value
if utility is not None
else float("-inf"),
"discount_factor": discount_factor,
}
)
if domain_file_name is not None:
kwargs["dynamic_entry"] = False
kwargs["max_n_agents"] = None
if not ignore_discount:
for info in agent_info:
info["ufun"] = (
info["ufun"]
if info["discount_factor"] is None or info["discount_factor"] == 1.0
else make_discounted_ufun(
ufun=info["ufun"],
discount_per_round=info["discount_factor"],
power_per_round=1.0,
)
)
if issues is None:
raise ValueError(f"Could not load domain {domain_file_name}")
return Scenario(
outcome_space=make_os(issues, name=name),
ufuns=[_["ufun"] for _ in agent_info], # type: ignore We trust that the ufun will be loaded
)