from loguru import logger from pathlib import Path import contextlib from typing import List import os import sh import subprocess from nicegui import ui, run, app, binding import yaml import docker import aiodocker import asyncio from spiri_sdk_guitools.video_button import EnableStreamingButton from collections import defaultdict import importlib.util docker_client = docker.from_env() from rclpy.executors import ExternalShutdownException from rclpy.node import Node import rclpy import threading async def ros_loop(): rclpy.init() node = rclpy.create_node('async_subscriber') while rclpy.ok(): rclpy.spin_once(node, timeout_sec=0) await asyncio.sleep(0.1) app.on_startup(ros_loop()) @contextlib.contextmanager def modified_environ(*remove, **update): """ Temporarily updates the ``os.environ`` dictionary in-place. The ``os.environ`` dictionary is updated in-place so that the modification is sure to work in all situations. :param remove: Environment variables to remove. :param update: Dictionary of environment variables and values to add/update. """ env = os.environ update = update or {} remove = remove or [] # List of environment variables being updated or removed. stomped = (set(update.keys()) | set(remove)) & set(env.keys()) # Environment variables and values to restore on exit. update_after = {k: env[k] for k in stomped} # Environment variables and values to remove on exit. remove_after = frozenset(k for k in update if k not in env) try: env.update(update) [env.pop(k, None) for k in remove] yield finally: env.update(update_after) [env.pop(k) for k in remove_after] robots = set() async def container_logs(container, element): adocker = aiodocker.Docker() with element: with ui.scroll_area(): acontainer = await adocker.containers.get(container.id) async for log in acontainer.log(stdout=True, stderr=True, follow=True): for line in log.splitlines(): ui.label(line) # ui.html(conv.convert(bytes(log,'utf-8').decode('utf-8', 'xmlcharrefreplace'), full=False)) robot_types = {} class Robot: def __init_subclass__(self): #Register sub-classes as plugins robot_types[self.robot_type] = self for file in Path("/robots").glob("**/robot_plugins.py"): logger.info(f"Loading plugin {file}") spec = importlib.util.spec_from_file_location("robot_plugins", file) plugin = importlib.util.module_from_spec(spec) spec.loader.exec_module(plugin) logger.info(f"Loaded plugin {file}") async def topic_subscriber(element, topic_name): while True: with element: element.clear() ui.label(topic_name) await asyncio.sleep(1) class Spirimu(Robot): robot_type = "spiri_mu" def __init__(self, sysid: int, compose_files: List[Path] | str): if sysid > 255 or sysid < 0: raise ValueError("sysid must be between 0 and 255") self.sysid = int(sysid) if isinstance(compose_files, str): compose_files = [ Path(file) for file in compose_files.replace("\n", ",").split(",") ] self.compose_files = compose_files self.processes = [] self.video_button = None robots.add(self) #Ros doesn't like dashes in node names self.robot_name = f"{self.robot_type}_{self.sysid}".replace("-","_") # self.wold_name = os.environ.get("WORLD_NAME", "citadel_hill") @classmethod async def launch_widget(cls, robots_widget): newRobotParams = defaultdict(binding.BindableProperty) ui.number(value=1, label="SysID", min=1, max=254, ).bind_value(newRobotParams, 'sysid') default_robot_compose = "" for compose_file in Path("/robots").glob("**/docker-compose.yaml"): specialArgs = "" try: data = yaml.safe_load(compose_file.read_text()) except yaml.YAMLError: default_robot_compose += f"#{compose_file} not valid \n" continue specialArgs = data.get("x-spiri-sdk-default-args", "") enabled = str(data.get("x-spiri-sdk-default-enabled", True)) enabled = enabled.lower() in ["true", "yes", "1"] docstring = data.get("x-spiri-sdk-doc", "") if docstring: for line in docstring.split("\n"): if line: default_robot_compose += f"# {line}\n" if not enabled: default_robot_compose += "##" default_robot_compose += f"{compose_file} {specialArgs}\n" ui.label("Compose files").classes("text-xl") ui.codemirror(value=default_robot_compose, language="bash", theme="basicDark").bind_value(newRobotParams, 'compose_files') async def new_robot(): compose_files = [] #Split on comma or newline, and remove comments for line in newRobotParams['compose_files'].split('\n'): line = line.split('#')[0].strip() if line: compose_files.append(line) current_robot = newRobotParams.copy() current_robot['compose_files'] = compose_files robot = cls(**current_robot) asyncio.tasks.create_task(robot.ui(robots_widget)) newRobotParams['sysid'] += 1 ui.button("Add", on_click=new_robot) async def ui_containers(self, element): docker_elements = {} container_status = {} with element: while True: with logger.catch(): # Poll for data that changes for container in self.containers(): try: health = container.attrs["State"]["Health"]["Status"] except KeyError: health = "Unknown" container_status[container] = ( f"{container.name} {container.status} {health}" ) if container not in docker_elements: docker_elements[container] = ui.element().classes("w-full") with docker_elements[container]: ui.label().bind_text(container_status, container).classes( "text-2xl" ) #Show the command the container is running # ui.label(container.attrs["Config"]["Cmd"]) cmd_widget = ui.codemirror(" ".join(container.attrs["Config"]["Cmd"]), language="bash",theme="basicDark").classes('h-auto max-h-32') cmd_widget.enabled = False with ui.expansion("Env Variables").classes("w-full outline outline-1").style("margin: 10px;"): env_widget = ui.codemirror("\n".join(container.attrs["Config"]["Env"]), language="bash",theme="basicDark") env_widget.enabled = False logelement = ( ui.expansion("Logs") .style("margin: 10px;") .classes("w-full outline outline-1") ) asyncio.create_task(container_logs(container, logelement)) with ui.expansion("Full details").classes("w-full outline outline-1").style("margin: 10px;"): details_widget = ui.codemirror(yaml.dump(container.attrs), language="yaml",theme="basicDark") details_widget.enabled = False # Check for containers that have been removed removed = set(docker_elements.keys()) - set(self.containers()) for container in removed: self.robot_ui.remove(docker_elements[container]) docker_elements.pop(container) await asyncio.sleep(1) async def ui_ros(self, element): with element: node_dummy = Node("_ros2cli_dummy_to_show_topic_list") scroll_area = ui.scroll_area() def refresh_topics(): with scroll_area: scroll_area.clear() #Filter for topics that start with self.robot_name for topic in node_dummy.get_topic_names_and_types(): if self.robot_name in topic[0]: ui.label(topic[0]) # expander = ui.expansion(topic[0]).classes('w-full') # topic_subscriber(expander, topic[0]) ui.button("Refresh topics", on_click=refresh_topics).classes("m-2") async def ui_actions(self, element): pass with element: node_dummy = Node("_ros2cli_dummy_to_show_service_list") scroll_area = ui.scroll_area() def refresh_topics(): with scroll_area: scroll_area.clear() #Filter for topics that start with self.robot_name for topic in node_dummy.get_action_names_and_types(): if self.robot_name in topic[0]: ui.label(topic[0]) ui.button("Refresh topics", on_click=refresh_topics).classes("m-2") async def ui(self, element): adocker = aiodocker.Docker() with element: self.robot_ui = ui.element().classes("w-full outline p-4") with self.robot_ui: ui.label(f"{self.robot_type} {self.sysid}").classes("text-2xl") ui.label(f"""Sysid: {self.sysid}""") ui.button("Start", on_click=self.async_start).classes("m-2") ui.button("Stop", on_click=self.async_stop).classes("m-2") self.video_button = EnableStreamingButton(robot_name=self.robot_name).classes( "m-2" ) async def delete_robot(): await self.async_stop() robots.remove(self) element.remove(self.robot_ui) ui.button("Delete", on_click=delete_robot).classes("m-2") with ui.tabs() as tabs: tab_containers = ui.tab("Containers") tab_ros = ui.tab("ROS Topics") # tab_actions = ui.tab("Actions") with ui.tab_panels(tabs, value=tab_containers): tab = ui.tab_panel(tab_containers).classes("w-full") asyncio.create_task(self.ui_containers(tab)) tab = ui.tab_panel(tab_ros).classes("w-full") asyncio.create_task(self.ui_ros(tab)) # tab = ui.tab_panel(tab_actions).classes("w-full") # asyncio.create_task(self.ui_actions(tab)) async def async_stop(self): return await run.io_bound(self.stop) def stop(self): # If video was enabled, and we stop. Button state is wrong. # Stop video if it is enabled. if isinstance(self.video_button, EnableStreamingButton): self.video_button.stop_video() # Delete gazebo model self.delete_gz_model() # Signal all processes to stop for process in self.processes: process.terminate() process.kill() self.processes = [] for container in self.containers(): # container.stop() container.remove(force=True) def containers(self): return docker_client.containers.list( all=True, filters={"name": f"robot-sim-{self.robot_name}"} ) async def async_start(self): return await run.io_bound(self.start) def start(self): """Starts the simulated drone with a given sysid, each drone must have it's own unique ID. """ instance = self.sysid - 1 sysid = self.sysid with logger.contextualize(syd_id=sysid): env = os.environ with modified_environ( SERIAL0_PORT=str(int(env["SERIAL0_PORT"]) + 10 * instance), MAVROS2_PORT=str(int(env["MAVROS2_PORT"]) + 10 * instance), MAVROS1_PORT=str(int(env["MAVROS1_PORT"]) + 10 * instance), FDM_PORT_IN=str(int(env["FDM_PORT_IN"]) + 10 * instance), GSTREAMER_UDP_PORT=str(int(env["GSTREAMER_UDP_PORT"]) + 10 * instance), SITL_PORT=str(int(env["SITL_PORT"]) + 10 * instance), INSTANCE=str(instance), DRONE_SYS_ID=str(self.sysid), ROBOT_NAME=self.robot_name, WORLD_NAME=env["WORLD_NAME"], ): self.spawn_gz_model() logger.info("Starting drone stack, this may take some time") for compose_file in self.compose_files: arguments = compose_file.split(" ") arguments = [arg.strip() for arg in arguments] compose_file = arguments[0] arguments = arguments[1:] if not isinstance(compose_file, Path): compose_file = Path(compose_file) if not compose_file.exists(): raise FileNotFoundError(f"File {compose_file} does not exist") #Get the folder the compose file is in compose_folder = compose_file.parent args = [ "docker-compose", "--profile", "uav-sim", "-p", f"robot-sim-{self.robot_name}-{compose_folder.name}", "-f", compose_file.as_posix(), "up", *arguments, ] command = " ".join(args) logger.info(f"Starting drone stack with command: {command}") docker_stack = subprocess.Popen( args, # stdout=subprocess.PIPE, # stderr=subprocess.PIPE, ) logger.info(f"Started drone stack with PID: {docker_stack.pid}") @logger.catch def spawn_gz_model(self): sysid = self.sysid logger.info("") env = os.environ GSTREAMER_UDP_PORT = env["GSTREAMER_UDP_PORT"] FDM_PORT_IN = env["FDM_PORT_IN"] WORLD_NAME = env["WORLD_NAME"] DRONE_MODEL = env["DRONE_MODEL"] XACRO_CMD = [ "xacro", f"gstreamer_udp_port:={GSTREAMER_UDP_PORT}", f"fdm_port_in:={FDM_PORT_IN}", "model.xacro.sdf", "-o", "model.sdf", ] # This path is breaking if this drone_model folder doesnt exist! # TODO: fix this model path for minimal code maintenance ROS2_CMD = f"ros2 run ros_gz_sim create -world {WORLD_NAME} -file /ardupilot_gazebo/models/{DRONE_MODEL}/model.sdf -name {self.robot_name} -x {sysid - 1} -y 0 -z 0.195" xacro_proc = subprocess.Popen( XACRO_CMD, cwd=f"/ardupilot_gazebo/models/{DRONE_MODEL}", stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) ros2_gz_create_proc = subprocess.Popen( ROS2_CMD.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) out, err = ros2_gz_create_proc.communicate(timeout=3) ros2_gz_create_proc.kill() return @logger.catch def delete_gz_model(self): sysid = self.sysid env = os.environ WORLD_NAME = env["WORLD_NAME"] # http://osrf-distributions.s3.amazonaws.com/gazebo/api/7.1.0/classgazebo_1_1physics_1_1Entity.html ENTITY_TYPE_MODEL = 0x00000002 REQUEST_ARG = f"name: '{self.robot_name}' type: {ENTITY_TYPE_MODEL}" GZ_SERVICE_CMD = [ "gz", "service", "-s", f"/world/{WORLD_NAME}/remove", "--reqtype", "gz.msgs.Entity", "--reptype", "gz.msgs.Boolean", "--timeout", "5000", "--req", REQUEST_ARG, ] remove_entity_proc = subprocess.Popen( GZ_SERVICE_CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) out, err = remove_entity_proc.communicate(timeout=3) remove_entity_proc.kill()