Source code for negmas.helpers.inout

#!/usr/bin/env python
"""A set of utilities that can be used by agents developed for the platform.

This set of utlities can be extended but must be backward compatible for
at least two versions
"""

from __future__ import annotations

import base64
import itertools
import json
import os
import pathlib
from os import PathLike
from pathlib import Path
from typing import Any, Iterable

import dill as pickle
import inflect
import numpy as np
import pandas as pd
import stringcase
import yaml

from negmas import warnings
from negmas.config import negmas_config

from .types import TYPE_START, get_class, get_full_type_name, is_jsonable

__all__ = [
    "is_nonzero_file",
    "ConfigReader",
    "DEFAULT_DUMP_EXTENSION",
    "dump",
    "load",
    "add_records",
    "TYPE_START",
    "has_needed_files",
]
# conveniently named classes
BYTES_START = "__BYTES__:"
PATH_START = "__PATH__:"
"""Maps from a single issue to a Negotiator function."""

DEFAULT_DUMP_EXTENSION = negmas_config("default_dump_extension", "json")


def convert_numpy(obj):
    """Recursively converts NumPy types to standard Python types."""
    if isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()  # Convert NumPy array to list
    elif isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy(v) for v in obj]
    else:
        return obj  # for all other types, return the object.


def is_nonzero_file(fpath: PathLike) -> bool:
    """Whether or not the path is for an existing nonzero file.

    Args:
        fpath: path to the file to test. It accepts both str and pathlib.Path
    """
    return os.path.isfile(fpath) and os.path.getsize(fpath) > 0


_inflect_engine = inflect.engine()


class ConfigReader:
    @classmethod
    def _split_key(cls, key: str) -> tuple[str, str | None]:
        """Splits the key into a key name and a class name.

        Remarks:

            - Note that if the given key has multiple colons the first two will be parsed as key name: class name and
              the rest will be ignored. This can be used to add comments
        """
        keys = key.split(":")
        if len(keys) == 1:
            return keys[0], None
        else:
            return keys[0], keys[1]

    @classmethod
    def _parse_children_config(cls, children, scope):
        """Parses children in the given scope."""
        remaining_children = {}
        myconfig = {}
        setters = []
        for key, v in children.items():
            k, class_name = cls._split_key(key)
            if isinstance(v, dict):
                if class_name is None:
                    class_name = stringcase.pascalcase(k)
                the_class = get_class(class_name=class_name, scope=scope)
                assert hasattr(
                    the_class, "from_config"
                ), f"{the_class} does not have a from_config attribute"
                obj, obj_children = the_class.from_config(  # type: ignore
                    config=v,
                    ignore_children=False,
                    try_parsing_children=True,
                    scope=scope,
                )
                if obj_children is not None and len(obj_children) > 0:
                    remaining_children[k] = obj_children
                setter_name = "set_" + k
                if hasattr(cls, setter_name):
                    setters.append((setter_name, obj))
                else:
                    myconfig[k] = obj
            elif isinstance(v, Iterable) and not isinstance(v, str):
                singular = _inflect_engine.singular_noun(k)
                if singular is False:
                    singular = k
                if class_name is None:
                    class_name = stringcase.pascalcase(singular)
                setter_name = "set_" + k
                objs = []
                for current in list(v):
                    the_class = get_class(class_name=class_name, scope=scope)
                    assert hasattr(
                        the_class, "from_config"
                    ), f"{the_class} does not have a from_config attribute"
                    obj = the_class.from_config(  # type: ignore
                        config=current,
                        ignore_children=True,
                        try_parsing_children=True,
                        scope=scope,
                    )
                    objs.append(obj)
                if hasattr(cls, setter_name):
                    setters.append((setter_name, objs))
                else:
                    myconfig[k] = objs
            else:
                # not a dictionary and not an iterable.
                remaining_children[k] = v

        return myconfig, remaining_children, setters

    @classmethod
    def read_config(
        cls, config: str | dict, section: str | None = None
    ) -> dict[str, Any]:
        """Reads the configuration from a file or a dict and prepares it for
        parsing.

        Args:
            config: Either a file name or a dictionary
            section: A section in the file or a key in the dictionary to use for loading params

        Returns:

            A dict ready to be parsed by from_config

        Remarks:
        """
        if isinstance(config, str):
            # If config is a string, assume it is a file and read it from the appropriate location
            def exists(nm):
                return os.path.exists(nm) and not os.path.isdir(nm)

            if not exists(config):
                name = pathlib.Path("./") / pathlib.Path(config)
                if exists(name):
                    config = str(name.absolute())
                else:
                    name = (pathlib.Path("./.negmas") / config).absolute()
                    if exists(name):
                        config = str(name)
                    else:
                        name = (
                            pathlib.Path(os.path.expanduser("~/.negmas")) / config
                        ).absolute()
                        if exists(name):
                            config = str(name)
                        else:
                            raise ValueError(f"Cannot find config in {config}.")
            with open(config) as f:
                if config.endswith(".json"):
                    config = json.load(f)
                elif config.endswith(".cfg"):
                    config = eval(f.read())
                elif config.endswith(".yaml") or config.endswith(".yml"):
                    config = yaml.safe_load(f)
                else:
                    raise ValueError(f"Cannot parse {config}")

        if section is not None:
            config = config[section]  # type: ignore

        return config  # type: ignore

    @classmethod
    def from_config(
        cls,
        config: str | dict,
        section: str | None = None,
        ignore_children: bool = True,
        try_parsing_children: bool = True,
        scope=None,
    ):
        """Creates an object of this class given the configuration info.

        Args:
            config: Either a file name or a dictionary
            section: A section in the file or a key in the dictionary to use for loading params
            ignore_children: If true then children will be ignored and there will be a single return
            try_parsing_children: If true the children will first be parsed as `ConfigReader` classes if they are not
            simple types (e.g. int, str, float, Iterable[int|str|float]
            scope: The scope at which to evaluate any child classes. This MUST be passed as scope=globals() if you are
            having any children that are to be parsed.

        Returns:

            An object of cls if ignore_children is True or a tuple with an object of cls and a dictionary with children
            that were not parsed.

        Remarks:

            - This function will return an object of its class after passing the key-value pairs found in the config to
              the init function.

            - Requiring passing scope=globals() to this function is to get around the fact that in python eval() will be
              called with a globals dictionary based on the module in which the function is defined not called. This means
              that in general when eval() is called to create the children, it will not have access to the class
              definitions of these children (except if they happen to be imported in this file). To avoid this problem
              causing an undefined_name exception, the caller must pass her globals() as the scope.
        """
        config = cls.read_config(config=config, section=section)

        if config is None:
            if ignore_children:
                return None
            else:
                return None, {}

        # now we have a dict called config which has our configuration

        myconfig = {}  # parts of the config that can directly be parsed
        children = {}  # parts of the config that need further parsing
        setters = []  # the setters are those configs that have a set_ function for them.

        def _is_simple(x):
            """Tests whether the input can directly be parsed."""
            return (
                x is None
                or isinstance(x, int)
                or isinstance(x, str)
                or isinstance(x, float)
                or (
                    isinstance(x, Iterable)
                    and not isinstance(x, dict)
                    and all(_is_simple(_) for _ in list(x))
                )
            )

        def _set_simple_config(key, v) -> dict[str, Any] | None:
            """Sets a simple value v for key taken into accout its class and
            the class we are constructing."""
            key_name, class_name = cls._split_key(key)
            _setter = "set_" + key_name
            params = {}
            if hasattr(cls, _setter):
                setters.append((_setter, v))
                return None
            params[key_name] = (
                v
                if class_name is None
                else get_class(class_name=class_name, scope=scope)(v)
            )
            return params

        # read the configs key by key and try to parse anything that is simple enough to parse

        for k, v in config.items():  # type: ignore
            if isinstance(v, dict):
                children[k] = v
            elif isinstance(v, Iterable) and not isinstance(v, str):
                lst = list(v)
                if all(_is_simple(_) for _ in lst):
                    # that is a simple value of the form k:class_name = v. We construct class_name (if it exists) with v
                    # notice that we need to remove class_name when setting the key in myconfig
                    val = _set_simple_config(k, v)
                    if val is not None:
                        myconfig.update(val)
                else:
                    children[k] = v  # type: ignore
            else:
                # that is a simple value of the form k:class_name = v. We construct class_name (if it exists) with v
                val = _set_simple_config(k, v)
                if val is not None:
                    myconfig.update(val)

        # now myconfig has all simply parsed parts and children has all non-parsed parts

        if len(children) > 0 and try_parsing_children:
            if scope is None:
                ValueError(
                    "scope is None but that is not allowed. You must pass scope=globals() or scope=locals() to "
                    "from_config. If your classes are defined in the global scope pass globals() and if they "
                    "are defined in local scope then pass locals(). You can only pass scope=None if you are "
                    "sure that all of the constructor parameters of the class you are creating are simple "
                    "values like ints floats and strings."
                )
            parsed_conf, remaining_children, setters = cls._parse_children_config(
                children=children, scope=scope
            )
            myconfig.update(parsed_conf)
            children = remaining_children

        main_object = cls(**myconfig)  # type: ignore

        if try_parsing_children:
            # we will only have setters if we have children
            for setter, value in setters:
                getattr(main_object, setter)(value)

        if ignore_children:
            return main_object
        return main_object, children


class NpEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            encoded = base64.b64encode(
                obj
            )  # b'ZGF0YSB0byBiZSBlbmNvZGVk' (notice the "b")
            return BYTES_START + encoded.decode("ascii")  #
        elif isinstance(obj, Path):
            return PATH_START + str(obj)
        elif not is_jsonable(obj):
            # it may be a type. Always convert types to full names when saving to json
            try:
                obj = TYPE_START + get_full_type_name(obj)
            except Exception:
                return obj
            return obj
        else:
            return super().default(obj)


class NpDecorder(json.JSONDecoder):
    def default(self, obj):
        if isinstance(obj, str):
            if obj.startswith(BYTES_START):
                return base64.b64decode(
                    obj[BYTES_START:]
                )  # b'ZGF0YSB0byBiZSBlbmNvZGVk' (notice the "b")
            elif obj.startswith(TYPE_START):
                return get_class(
                    obj[TYPE_START:]
                )  # b'ZGF0YSB0byBiZSBlbmNvZGVk' (notice the "b")
            elif obj.startswith(PATH_START):
                return Path(
                    obj[PATH_START:]
                )  # b'ZGF0YSB0byBiZSBlbmNvZGVk' (notice the "b")
        return super().default(obj)  #  type: ignore


[docs] def dump( d: Any, file_name: str | os.PathLike | pathlib.Path, sort_keys=True, compact=False ) -> None: """Saves an object depending on the extension of the file given. If the filename given has no extension, `DEFAULT_DUMP_EXTENSION` will be used. Args: d: Object to save file_name: file name sort_keys: If true, the keys will be sorted before saving compact: If given, a compact representation will be tried Remarks: - Supported formats are json, yaml - If None is given, the file will be created but will be empty - Numpy arrays will be converted to lists before being dumped """ file_name = pathlib.Path(file_name).expanduser().absolute() if file_name.suffix == "": file_name = pathlib.Path(str(file_name) + "." + str(DEFAULT_DUMP_EXTENSION)) file_name.parent.mkdir(parents=True, exist_ok=True) if d is None: with open(file_name, "w") as f: pass if file_name.suffix == ".json": with open(file_name, "w") as f: json.dump( d, f, sort_keys=sort_keys, indent=2 if not compact else None, cls=NpEncoder, ) elif file_name.suffix in (".yaml", ".yml"): with open(file_name, "w") as f: yaml.safe_dump( convert_numpy(d), f, default_flow_style=compact, sort_keys=sort_keys ) elif file_name.suffix == ".pickle": with open(file_name, "wb") as f: pickle.dump(d, f) elif file_name.suffix == ".csv": if not isinstance(d, pd.DataFrame): try: d = pd.DataFrame(d) except Exception as e: raise ValueError(f"Failed to convert to a dataframe: {str(e)}") d.to_csv(file_name, index_label="index") else: raise ValueError(f"Unknown extension {file_name.suffix} for {file_name}")
[docs] def load(file_name: str | os.PathLike | pathlib.Path) -> Any: """Loads an object depending on the extension of the file given. If the filename given has no extension, `DEFAULT_DUMP_EXTENSION` will be used. Args: file_name: file name Remarks: - Supported formats are json, yaml - If None is given, the file will be created but will be empty """ file_name = pathlib.Path(file_name).expanduser().absolute() if file_name.suffix == "": file_name = pathlib.Path(str(file_name) + "." + str(DEFAULT_DUMP_EXTENSION)) d = {} if not file_name.exists() or os.stat(file_name).st_size < 2: return d if file_name.suffix == ".json": with open(file_name) as f: d = json.load(f) elif file_name.suffix in (".yaml", ".yml"): with open(file_name) as f: d = yaml.safe_load(f) elif file_name.suffix == ".pickle": with open(file_name, "rb") as f: d = pickle.load(f) elif file_name.suffix == ".csv": d = pd.read_csv(file_name).to_dict() # type: ignore else: raise ValueError(f"Unknown extension {file_name.suffix} for {file_name}") return d
def add_records( file_name: str | os.PathLike, data: Any, col_names: list[str] | None = None, raise_exceptions=False, ) -> None: """Adds records to a csv file. Args: file_name: file name data: data to use for creating the record col_names: Names in the data. raise_exceptions: If given, exceptions are raised on failure Returns: None Remarks: - If col_names are not given, the function will try to normalize the input data if it was a dict or a list of dicts """ if col_names is None and ( isinstance(data, dict) or (isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict)) ): data = pd.json_normalize(data) else: data = pd.DataFrame(data=data, columns=col_names) if len(data) < 1: return file_name = pathlib.Path(file_name) file_name.parent.mkdir(parents=True, exist_ok=True) new_file = True mode = "a" # if file_name.exists(): if is_nonzero_file(file_name): new_file = False with open(file_name) as f: header = f.readline().strip().strip("\n") cols = header.split(",") for col in cols: if len(col) > 0 and col not in data.columns: data[col] = None if {_ for _ in data.columns} == set(cols): data = data.loc[:, cols] else: try: old_data = pd.read_csv(file_name, index_col=None) data = pd.concat((old_data, data), axis=0, ignore_index=True) # type: ignore except Exception as e: if raise_exceptions: raise e warnings.warn( f"Failed to read data from file {str(file_name)} will override it\n{e}", warnings.NegmasIOWarning, ) mode = "w" new_file = True data.to_csv(str(file_name), index=False, index_label="", mode=mode, header=new_file) StrOrTwo = tuple[str, str] | str def has_needed_files( path: Path, needed_files: Iterable[StrOrTwo] = tuple(), needed_nonzero_files: Iterable[StrOrTwo] = tuple(), verbose: bool = False, ) -> list[Path]: """Checks if the given path is a folder that has the given needed files. Args: path: The Path to check needed_files: The files/folders needed to be inside the given folder. needed_nonzero_files: The files needed to be inside the given folder and of nonzero size. Returns: An empty list if any condition is not matched or a list giving the corresponding matching file/folder for each condition (specified by needed_files then needed_nonzero_files) Remarks: - `needed_files` and `Needed_nonzero_files` is an `Iterable` where each element is either - A string giving the name of of a file/folder that MUST exist (AND) - A tuple of strings that gives names of files/folders one of which needs to exist (OR) """ if not path.exists() or not path.is_dir(): if verbose: print(f"Cannot find {path} or is not a directory") return [] found = [] needed_nonzero_files = list(needed_nonzero_files) for x in itertools.chain(needed_files, needed_nonzero_files): if isinstance(x, str): if not (path / x).exists(): if verbose: print(f"Cannot find {path / x}") return [] found.append(path / x) else: for p in x: if (path / p).exists(): found.append(path / p) break else: if verbose: print(f"Cannot find any of {x} under {path}") return [] if needed_nonzero_files: found = [_ for _ in found if is_nonzero_file(path / _)] return found