import contextlib
import os
import subprocess
from pathlib import Path
from typing import List

import docker
import sh
from loguru import logger
from nicegui import ui

docker_client = docker.from_env()


@contextlib.contextmanager
def modified_environ(*remove, **update):
    """
    Temporarily updates the ``os.environ`` dictionary in-place.

    The ``os.environ`` dictionary is updated in-place so that the modification
    is sure to work in all situations (including child processes spawned while
    the context is active, which inherit the current environment).

    :param remove: Environment variables to remove.
    :param update: Dictionary of environment variables and values to add/update.
    """
    env = os.environ

    # Variables that already exist and are about to be overwritten or removed;
    # their current values must be put back on exit.
    stomped = (set(update) | set(remove)) & set(env)
    update_after = {k: env[k] for k in stomped}
    # Variables that do not exist yet and must be deleted again on exit.
    remove_after = frozenset(k for k in update if k not in env)

    try:
        env.update(update)
        for k in remove:
            env.pop(k, None)
        yield
    finally:
        env.update(update_after)
        # pop(k, None): the managed body may already have removed the key, and
        # cleanup must not raise (it would mask an in-flight exception).
        for k in remove_after:
            env.pop(k, None)


class Robot:
    """A simulated robot backed by one or more docker-compose stacks.

    Each robot is identified by ``sysid``; the compose project name and the
    port environment variables passed to the stack are derived from it.
    """

    robot_type = "spiri-mu"
    # Seconds to wait for a docker-compose process to exit after terminate()
    # before it is killed.
    stop_timeout = 10
    # Environment variables holding base port numbers. Each one is shifted by
    # ``port_stride * (sysid - 1)`` for the launched stack.
    port_env_vars = (
        "SERIAL0_PORT",
        "MAVROS2_PORT",
        "MAVROS1_PORT",
        "FDM_PORT_IN",
        "SITL_PORT",
    )
    port_stride = 10

    def __init__(self, sysid: int, compose_files: List[Path] | str):
        """
        :param sysid: System id of the robot, between 0 and 255 inclusive.
        :param compose_files: Compose files to launch, either a list of paths
            or a single string with entries separated by commas and/or
            newlines.
        :raises ValueError: If ``sysid`` is outside 0..255.
        """
        if sysid > 255 or sysid < 0:
            raise ValueError("sysid must be between 0 and 255")
        self.sysid = int(sysid)
        if isinstance(compose_files, str):
            # Strip whitespace and drop empty entries so that input such as
            # "a.yml, b.yml\n" does not yield Path(" b.yml") or Path("")
            # (the latter is "." and would pass the existence check).
            compose_files = [
                Path(entry.strip())
                for entry in compose_files.replace("\n", ",").split(",")
                if entry.strip()
            ]
        self.compose_files = compose_files
        self.processes = []

    @property
    def project_name(self) -> str:
        """Compose project name used for every stack of this robot."""
        return f"robot-sim-{self.robot_type}-{self.sysid}"

    def stop(self):
        """Stop the docker-compose processes started by this robot, then stop
        and remove all containers belonging to its compose project."""
        for process in self.processes:
            process.terminate()
            try:
                # communicate() drains and closes the stdout/stderr pipes and
                # reaps the child, so no zombie or open pipe is left behind.
                process.communicate(timeout=self.stop_timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.communicate()
        self.processes = []
        for container in self.containers():
            container.stop()
            container.remove(force=True)

    def containers(self):
        """Return all containers (running or not) of this robot's compose
        project.

        The lookup uses the ``com.docker.compose.project`` label rather than a
        ``name`` filter: the name filter is a substring match, so the project
        of sysid 1 would also match the containers of sysid 10, 11, 100, ...
        """
        return docker_client.containers.list(
            all=True,
            filters={"label": f"com.docker.compose.project={self.project_name}"},
        )

    def start(self):
        """Starts the simulated drone with a given sysid, each drone must have
        its own unique ID.

        One ``docker-compose ... up`` process is launched per compose file and
        recorded in ``self.processes`` so that :meth:`stop` can terminate it.

        :raises KeyError: If one of the base port environment variables is
            not set.
        :raises FileNotFoundError: If any compose file does not exist; nothing
            is launched in that case.
        """
        instance = self.sysid - 1
        sysid = self.sysid
        with logger.contextualize(syd_id=sysid):
            env = os.environ
            overrides = {
                name: str(int(env[name]) + self.port_stride * instance)
                for name in self.port_env_vars
            }
            overrides["INSTANCE"] = str(instance)
            overrides["DRONE_SYS_ID"] = str(self.sysid)

            # The child processes inherit os.environ, so the overrides only
            # need to be in place while Popen is being called.
            with modified_environ(**overrides):
                logger.info("Starting drone stack, this may take some time")

                # Validate every file before launching anything, so a missing
                # file cannot leave a partially started set of stacks behind.
                compose_files = [Path(entry) for entry in self.compose_files]
                for compose_file in compose_files:
                    if not compose_file.exists():
                        raise FileNotFoundError(
                            f"File {compose_file} does not exist"
                        )

                for compose_file in compose_files:
                    ui.label(f"Starting drone stack {compose_file}")
                    args = [
                        "docker-compose",
                        "--profile",
                        "uav-sim",
                        "-p",
                        self.project_name,
                        "-f",
                        compose_file.as_posix(),
                        "up",
                    ]
                    command = " ".join(args)
                    logger.info(f"Starting drone stack with command: {command}")
                    # NOTE(review): stdout/stderr are piped but nothing in this
                    # file reads them; a consumer should drain them, otherwise
                    # the child can block once the OS pipe buffer fills.
                    docker_stack = subprocess.Popen(
                        args,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                    )
                    # Track the process so stop() can actually terminate it.
                    self.processes.append(docker_stack)