Source code for negmas.genius.bridge

"""
Implements GeniusBridge which manages connections to Genius through Py4J.

The main class is the GeniusBridge class which encapsulates a gateway
(connection) to a JVM running the geniusbridge.jar file.
We can have multiple gateways connected to the same JVM bridge.

The bridge life-cycle is as follows:
start -> connect -> stop/restart

The most important methods that this class provides are:

Status enquiry
--------------
- is_running: Tells you if a bridge is running on a given port
- is_installed: Tells you if the bridge jar is insalled in the default location

Bridge Lifetime Control
-----------------------
- start() starts a bridge and connects to it
- connect() connects to a *running* bridge
- stop() stops a running bridge.
- restart() stops then starts a bridge.

Bridge Control Operations
-------------------------

- clean() removes all agents from the bridge and runs garbage collection. You
  must be sure that no active negotiations are happening when you call this
  method.
- kill_threads() [not recommended] Kills all threads started by the bridge. The
  bridge is most likely going to become unusable after running this command


"""

from __future__ import annotations

import os
import pathlib
import socket
import subprocess
import time
import traceback
from pathlib import Path
from typing import Any

import psutil
from py4j.java_gateway import (
    CallbackServerParameters,
    GatewayParameters,
    JavaGateway,
    JavaObject,
)
from py4j.protocol import Py4JNetworkError

import negmas.warnings as warnings

from ..config import CONFIG_KEY_GENIUS_BRIDGE_JAR, negmas_config
from ..helpers import TimeoutCaller, TimeoutError, unique_name
from .common import DEFAULT_JAVA_PORT, get_free_tcp_port

__all__ = [
    "GeniusBridge",
    "init_genius_bridge",
    "genius_bridge_is_running",
    "genius_bridge_is_installed",
]

GENIUS_LOG_BASE = Path(
    negmas_config("genius_log_base", Path.home() / "negmas" / "geniusbridge" / "logs")  # type: ignore
)


def _kill_process(proc_pid):
    process = psutil.Process(proc_pid)
    for proc in process.children(recursive=True):
        proc.kill()
    process.kill()


[docs] def genius_bridge_is_running(port: int = DEFAULT_JAVA_PORT) -> bool: """ Checks whether a Genius Bridge is running. A genius bridge allows you to use `GeniusNegotiator` objects. Remarks: You can start a Genius Bridge in at least three ways: - execute the python function `init_genius_bridge()` in this module - run "negmas genius" on the terminal - execute `GeniusBridge.start()` """ s = socket.socket() s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.connect(("127.0.0.1", port)) s.close() # we know someone is listening. Now we check that it is us gateway = JavaGateway( gateway_parameters=GatewayParameters(port=port, auto_close=True), callback_server_parameters=CallbackServerParameters( port=0, daemonize=True, daemonize_connections=True ), ) gateway.jvm.System.currentTimeMillis() # type: ignore return True except ConnectionRefusedError: # try: # s.shutdown(2) # except Exception: # pass s.close() return False except IndexError: # try: # s.shutdown(2) # except Exception: # pass s.close() return False except Py4JNetworkError: # try: # s.shutdown(2) # except Exception: # pass s.close() return False
[docs] def init_genius_bridge( path: Path | str | None = None, port: int = DEFAULT_JAVA_PORT, debug: bool = False, timeout: float = 0, force_timeout: bool = True, save_logs: bool = False, log_path: os.PathLike | None = None, die_on_exit: bool = False, use_shell: bool = False, verbose: bool = False, allow_agent_print: bool = False, # capture_output: bool = False, ) -> int: """Initializes a genius connection Args: path: The path to a JAR file that runs negloader port: port number to use debug: If true, passes --debug to the bridge timeout: If positive and nonzero, passes it as the global timeout for the bridge. Note that currently, the bridge supports only integer timeout values and the fraction will be truncated. Returns: Port if started. -1 if a bridge is already running on this port. 0 if failed """ if port <= 0: port = get_free_tcp_port() if genius_bridge_is_running(port): return -1 port = GeniusBridge.start( port=port, debug=debug, timeout=timeout, path=str(path) if path else path, force_timeout=force_timeout, save_logs=save_logs, log_path=log_path, die_on_exit=die_on_exit, use_shell=use_shell, verbose=verbose, allow_agent_print=allow_agent_print, # capture_output=capture_output, ) return port
# # if not force and common_gateway is not None and common_port == port: # # print("Java already initialized") # # return True # # if not path: # path = negmas_config( # type: ignore # CONFIG_KEY_GENIUS_BRIDGE_JAR, # pathlib.Path.home() / "negmas" / "files" / "geniusbridge.jar", # ) # if path is None: # warnings.warn( # "Cannot find the path to genius bridge jar. Download the jar somewhere in your machine and add its path" # 'to ~/negmas/config.json under the key "genius_bridge_jar".\n\nFor example, if you downloaded the jar' # " to /path/to/your/jar then edit ~/negmas/config.json to read something like\n\n" # '{\n\t"genius_bridge_jar": "/path/to/your/jar",\n\t.... rest of the config\n}\n\n' # "You can find the jar at https://yasserfarouk.github.io/files/geniusbridge.jar", # warnings.NegmasBridgePathWarning, # ) # return False # path = Path(path).expanduser().absolute() # if debug: # params = " --debug" # else: # params = " --silent --no-logs" # if timeout >= 0: # params += f" --timeout={int(timeout)}" # # try: # subprocess.Popen( # ['java', '-jar', path, '--die-on-exit', f'{port}'] # f"java -jar {path} --die-on-exit {params} {port}", shell=True # ) # except (OSError, TimeoutError, RuntimeError, ValueError): # return False # time.sleep(0.5) # gateway = JavaGateway( # gateway_parameters=GatewayParameters(port=port, auto_close=True), # callback_server_parameters=CallbackServerParameters( # port=0, daemonize=True, daemonize_connections=True # ), # ) # python_port = gateway.get_callback_server().get_listening_port() # type: ignore # gateway.java_gateway_server.resetCallbackClient( # type: ignore # gateway.java_gateway_server.getCallbackClient().getAddress(), # type: ignore # python_port, # type: ignore # ) # return True
[docs] def genius_bridge_is_installed() -> bool: """ Checks if geniusbridge is available in the default path location """ return (Path.home() / "negmas" / "files" / "geniusbridge.jar").exists()
[docs] class GeniusBridge: gateways: dict[int, JavaGateway] = dict() java_processes: dict[int, Any] = dict() python_ports: dict[int, int] = dict() def __init__(self): raise RuntimeError("Cannot create objects of type GeniusBridge.") """Gateways to different genius bridges""" """handles to the java processes running the bridges""" """The port used by python's Gateway to connect to the bridge"""
[docs] @classmethod def is_running(cls, port: int) -> bool: """Returns true if a geniusbridge.jar is running on the given port""" return genius_bridge_is_running(port)
[docs] @classmethod def is_installed(cls) -> bool: """Returns true if a geniusbridge.jar is available""" return genius_bridge_is_installed()
[docs] @classmethod def gateway(cls, port=DEFAULT_JAVA_PORT, force=False) -> JavaGateway | None: """ Finds and returns a gateway for a genius bridge on the given port Args: port: The port used by the Jave genius bridge. force: If true, a new gateway is created even if one exists in the list of gateways available in `GeniusBridge`.gateways. Returns: The gateway if found otherwise an exception will be thrown Remarks: - this method does NOT start a bridge. It only connects to a running bridge. """ assert port > 0 gateway = cls.gateways.get(port, None) if not force else None if gateway is not None: return gateway try: gateway = JavaGateway( gateway_parameters=GatewayParameters(port=port, auto_close=True), callback_server_parameters=CallbackServerParameters( port=0, daemonize=True, daemonize_connections=True ), ) python_port = gateway.get_callback_server().get_listening_port() # type: ignore gateway.java_gateway_server.resetCallbackClient( # type: ignore gateway.java_gateway_server.getCallbackClient().getAddress(), # type: ignore python_port, ) except Exception: if gateway is not None: gateway.shutdown() gateway.shutdown_callback_server() return None cls.python_ports[port] = python_port cls.gateways[port] = gateway return gateway
[docs] @classmethod def wait_until_listening( cls, port: int = DEFAULT_JAVA_PORT, timeout: float = 0.5 ) -> bool: """ waits until a genius bridge is listening to the given port Args: port: The port to test timeout: Maximum time to wait before returning (in seconds) Returns: True if the genius bridge is running any more (success). """ if genius_bridge_is_running(port): return True time.sleep(timeout) return TimeoutCaller.run( lambda: genius_bridge_is_running(port), timeout=timeout )
[docs] @classmethod def start( cls, port: int = DEFAULT_JAVA_PORT, path: str | None = None, debug: bool = False, timeout: float = 0, force_timeout: bool = True, save_logs: bool = False, log_path: os.PathLike | None = None, die_on_exit: bool = False, use_shell: bool = False, verbose: bool = False, allow_agent_print: bool = False, # capture_output: bool = False, ) -> int: """Initializes a genius connection Args: port: port number to use. A value <= 0 means get any free tcp port. path: The path to a JAR file that runs negloader debug: If true, passes --debug to the bridge timeout: If positive and nonzero, passes it as the global timeout for the bridge. Note that currently, the bridge supports only integer timeout values and the fraction will be truncated. force_timeout: if false, no timeout will be forced by the bridge save_logs: If false, the brige is instructed not to save any logs log_path: the path to store logs from the bridge. Onle effective if `save_logs` If not given, defaults to ~/negmas/geniusbridge/logs/{port}-{datetime}.txt die_on_exit: If given, the bridge will be closed when this process is ended use_shell: If given, the bridge will be started in a subshell. Returns: The port number used by the java process. 0 for failure Remarks: - if a bridge is running, it will return its port and it does not matter whether or not the bridge is started from this process or any other way. - it is recommended not to change the defaults for this function. """ if port <= 0: port = get_free_tcp_port() if cls.is_running(port): return 0 if cls.gateway(port) is None else port path = ( # type: ignore negmas_config( CONFIG_KEY_GENIUS_BRIDGE_JAR, pathlib.Path.home() / "negmas" / "files" / "geniusbridge.jar", ) if path is None or not path else path ) if path is None: warnings.warn( "Cannot find the path to genius bridge jar. Download the jar somewhere in your machine and add its path" 'to ~/negmas/config.json under the key "genius_bridge_jar".\n\nFor example, if you downloaded the jar' " to /path/to/your/jar then edit ~/negmas/config.json to read something like\n\n" '{\n\t"genius_bridge_jar": "/path/to/your/jar",\n\t.... rest of the config\n}\n\n' "You can find the jar at https://yasserfarouk.github.io/files/geniusbridge.jar", warnings.NegmasBridgePathWarning, ) return 0 path = Path(path).expanduser().absolute() # type: ignore if log_path is None or not log_path: log_path = ( GENIUS_LOG_BASE / f"{unique_name(str(port), add_time=True, rand_digits=4, sep='.')}.txt" ).absolute() log_path.parent.mkdir(parents=True, exist_ok=True) else: log_path = Path(log_path).absolute() log_path.parent.mkdir(parents=True, exist_ok=True) # if die_on_exit: # if debug: # params = " --die-on-exit --debug" # else: # params = " --die-on-exit" # if force_timeout: # params += " --force-timeout" # if timeout >= 0: # params += f" --timeout={int(timeout)}" # else: # params += " --no-timeout" # params += " --with-logs" if save_logs else " --no-logs" # if save_logs: # params += f"--log-file={str(log_path)}" # else: if debug: params = ["--debug"] else: params = [] params.append("--verbose" if verbose else "--silent") if allow_agent_print: params.append("--allow-agent-print") if die_on_exit: params.append("--die-on-exit") if force_timeout: params.append("--force-timeout") if timeout >= 0: params.append(f"--timeout={int(timeout)}") else: params.append("--no-timeout") params.append("--with-logs" if save_logs else "--no-logs") if save_logs: params.append(f"--logfile={str(log_path)}") try: # if die_on_exit: # cls.java_processes[port] = subprocess.Popen( # f"java -jar {str(path)} {params} {port}", # shell=use_shell, # cwd=path.parent, # ) # else: cls.java_processes[port] = subprocess.Popen( ["java", "-jar", str(path)] + params + [f"{port}"], shell=use_shell, # capture_output=capture_output, cwd=path.parent, # type: ignore ) except (OSError, TimeoutError, RuntimeError, ValueError) as e: warnings.warn(str(e), warnings.NegmasBridgeProcessWarning) return 0 cls.wait_until_listening(port, timeout=0.1) return port if cls.gateway(port, force=True) is not None else 0
@classmethod def _close_gateway(cls, port): """ Closes the gateway and removes it from all class dicts. """ gateway = cls.gateways.get(port, None) if gateway is None: cls.java_processes.pop(port, None) cls.python_ports.pop(port, None) gateway.shutdown() # type: ignore gateway.shutdown_callback_server() # type: ignore cls.gateways.pop(port, None) cls.java_processes.pop(port, None) cls.python_ports.pop(port, None)
[docs] @classmethod def close_gateway(cls, port=DEFAULT_JAVA_PORT): """ Closes the gateway. Args: port: The port the gateway is connected to. If None, DEFAULT_JAVA_PORT is used. """ if port is None: port = DEFAULT_JAVA_PORT cls._close_gateway(port)
[docs] @classmethod def close_gateways(cls): """ Closes all open gateways. """ keys = list(cls.gateways.keys()) for p in keys: cls._close_gateway(p)
[docs] @classmethod def wait_until_not_listening( cls, port: int = DEFAULT_JAVA_PORT, timeout: float = 0.5 ) -> bool: """ waits until the genius bridge is not listening to the given port Args: port: The port to test max_sleep: Maximum time to wait before returning (in seconds) Returns: True if the genius bridge is NOT running any more (success). """ if not genius_bridge_is_running(port): return False time.sleep(timeout) return not genius_bridge_is_running(port)
[docs] @classmethod def shutdown(cls, port: int = DEFAULT_JAVA_PORT, wait: bool = True) -> bool: """ Attempts to shutdown the bridge on that port. Args: port: The port to shutdown. Remarks: - This is the cleanest way to close a java bridge and it simply sends a message to the bridge to shut itself down and cleanly shuts down the py4j bridge. """ assert port > 0 try: gateway = cls.gateway(port) except Exception: return not genius_bridge_is_running(port) if gateway is None: return True try: gateway.entry_point.shutdown() # type: ignore except Exception: pass if wait: cls.wait_until_not_listening(port) cls._close_gateway(port) return True
[docs] @classmethod def kill(cls, port: int = DEFAULT_JAVA_PORT, wait: bool = True) -> bool: """Kills the java bridge connected to this port by asking it to exit""" assert port > 0 try: gateway = cls.gateway(port, force=True) except Exception: return False if gateway is None: return False try: gateway.entry_point.kill() # type: ignore except Exception: pass if wait: cls.wait_until_not_listening(port) cls._close_gateway(port) return True
[docs] @classmethod def kill_forced(cls, port: int = DEFAULT_JAVA_PORT, wait: bool = True) -> bool: """Kills the java bridge connected to this port forcibly. Remarks: - The java bridge process must have been started by this process. """ assert port > 0 p = cls.java_processes.pop(port, None) if p is None: warnings.warn( "Attempting to force-kill a genius brdige we did not start " "at port {port}", warnings.NegmasBridgeProcessWarning, ) return False _kill_process(p.pid) if wait: cls.wait_until_not_listening(port) cls._close_gateway(port) return True
[docs] @classmethod def stop(cls, port: int = DEFAULT_JAVA_PORT) -> bool: """Stops a running bridge Args: port: port number to use Returns: True if successful Remarks: - You should use this method to stop bridges. - It tries the following in order: 1. shutdown the java bridge by calling its shutdown() method. 2. killing the java bridge by calling its kill() method. 3. killing the java bridge forcibly by killing the process - This method always waits for a short time to allow each process to complete. If it returns True then the bridge is no longer listening on the given port. """ assert port > 0 if not genius_bridge_is_running(port): return True cls.shutdown(port, wait=True) if not genius_bridge_is_running(port): return True cls.kill(port, wait=True) if not genius_bridge_is_running(port): return True cls.kill_forced(port, wait=True) if not genius_bridge_is_running(port): return True return False
[docs] @classmethod def restart(cls, port: int = DEFAULT_JAVA_PORT, *args, **kwargs) -> bool: """Starts or restarts the genius bridge Args: port: port number to use Returns: True if successful """ assert port > 0 if not cls.stop(port): return False cls.wait_until_not_listening(port, 1) if not cls.start(port, *args, **kwargs): return False cls.wait_until_listening(port, 1) return genius_bridge_is_running(port)
[docs] @classmethod def kill_threads( cls, port: int = DEFAULT_JAVA_PORT, wait_time: float = 0.5 ) -> bool: """kills all threads in the given java bridge""" assert port > 0 try: gateway = cls.gateway(port, force=True) except Exception as e: warnings.warn( f"Failed to kill threads at port {port} with error: {str(e)}\n" f"{traceback.format_exc()}", warnings.NegmasBridgeProcessWarning, ) return False if gateway is None: return False try: gateway.entry_point.kill_threads(int(wait_time * 1000)) # type: ignore except Exception: pass return True
[docs] @classmethod def clean(cls, port=DEFAULT_JAVA_PORT) -> bool: """ Removes all agents and runs garbage collection on the bridge """ assert port > 0 try: gateway = cls.gateway(port, force=True) except Exception as e: warnings.warn( f"Failed to kill threads at port {port} with error: {str(e)}\n" f"{traceback.format_exc()}", warnings.NegmasBridgeProcessWarning, ) return False if gateway is None: return False gateway.entry_point.clean() # type: ignore return True
[docs] @classmethod def clean_all(cls) -> bool: """ Removes all agents and runs garbage collection on all bridges. """ success = True for port in cls.gateways.keys(): success = success and cls.clean(port) return success
[docs] @classmethod def connect(cls, port: int = DEFAULT_JAVA_PORT) -> JavaObject: """ Connects to a running genius-bridge Args: port: The port at which the bridge in listening in Java Remarks: - The difference between this method and start() is that this one does not attempt to start a java bridge if one does not exist. """ assert port > 0 try: gateway = cls.gateway(port, force=True) except Exception as e: warnings.warn( f"Failed to kill threads at port {port} with error: {str(e)}\n" f"{traceback.format_exc()}", warnings.NegmasBridgeProcessWarning, ) raise RuntimeError(e) if gateway is None: raise RuntimeError("Got None as the gateway!!") return gateway.entry_point