#!/bin/env python3 import typer import os import sys import contextlib from dotenv import load_dotenv from typing import List from loguru import logger import sh import atexit load_dotenv() logger.remove() logger.add( sys.stdout, format="{time} {level} {extra} {message}" ) # px4Path = os.environ.get("SPIRI_SIM_PX4_PATH", "/opt/spiri-sdk/PX4-Autopilot/") # logger.info(f"SPIRI_SIM_PX4_PATH={px4Path}") app = typer.Typer() # This is a list of processes that we need to .kill and .wait for on exit processes = [] class outputLogger: """ Logs command output to loguru """ def __init__(self, name, instance): self.name = name self.instance = instance def __call__(self, message): with logger.contextualize(cmd=self.name, instance=self.instance): if message.endswith("\n"): message = message[:-1] # ToDo, this doesn't work because the output is coloured if message.startswith("INFO"): message = message.lstrip("INFO") logger.info(message) elif message.startswith("WARN"): message = message.lstrip("WARN") logger.warning(message) elif message.startswith("ERROR"): message = message.lstrip("ERROR") logger.error(message) elif message.startswith("DEBUG"): message = message.lstrip("DEBUG") logger.debug(message) else: logger.info(message) @contextlib.contextmanager def modified_environ(*remove, **update): """ Temporarily updates the ``os.environ`` dictionary in-place. The ``os.environ`` dictionary is updated in-place so that the modification is sure to work in all situations. :param remove: Environment variables to remove. :param update: Dictionary of environment variables and values to add/update. """ env = os.environ update = update or {} remove = remove or [] # List of environment variables being updated or removed. stomped = (set(update.keys()) | set(remove)) & set(env.keys()) # Environment variables and values to restore on exit. update_after = {k: env[k] for k in stomped} # Environment variables and values to remove on exit. remove_after = frozenset(k for k in update if k not in env) try: env.update(update) [env.pop(k, None) for k in remove] yield finally: env.update(update_after) [env.pop(k) for k in remove_after] # @app.command() def start(tracker_instance: int = 0, sys_id: int = 1): """Starts the simulated drone with a given sys_id, each drone must have it's own unique ID. """ if sys_id < 1 or sys_id > 254: logger.error("sys_id must be between 1 and 254") raise typer.Exit(code=1) with logger.contextualize(syd_id=sys_id): env = os.environ with modified_environ( SERIAL0_PORT=str(int(env["SERIAL0_PORT"]) + 10 * tracker_instance), MAVROS2_PORT=str(int(env["MAVROS2_PORT"]) + 10 * tracker_instance), MAVROS1_PORT=str(int(env["MAVROS1_PORT"]) + 10 * tracker_instance), FDM_PORT_IN=str(int(env["FDM_PORT_IN"]) + 10 * tracker_instance), SITL_PORT=str(int(env["SITL_PORT"]) + 10 * tracker_instance), TRACKER_INSTANCE=str(tracker_instance), DRONE_SYS_ID=str(sys_id), ): logger.info("Starting drone stack, this may take some time") docker_stack = sh.docker.compose( "--profile", "uav-sim", "up", _out=outputLogger("docker_stack", sys_id), _err=sys.stderr, _bg=True, ) processes.append(docker_stack) @app.command() def start_group(): env = os.environ sim_drone_count = int(env["SIM_DRONE_COUNT"]) start_ros_master() """Start a group of robots""" for i in range(sim_drone_count): logger.info(f"start robot {i}") start(tracker_instance=i, sys_id=i + 1) # if i == 0: # wait_for_gazebo() def start_ros_master(): docker_stack = sh.docker.compose( "--profile", "ros-master", "up", _out=outputLogger("docker_stack", "ros-master"), _err=sys.stderr, _bg=True, ) processes.append(docker_stack) def cleanup(): # Wait for all subprocesses to exit logger.info("Waiting for commands to exit") try: if processes: print(processes) for waitable in processes: waitable.kill() waitable.wait() except Exception as e: print(e) atexit.register(cleanup) if __name__ == "__main__": try: app() except KeyboardInterrupt: logger.info("KeyboardInterrupt caught, exiting...") cleanup() sys.exit(0)