spiri-sdk/skel/Desktop/simulated-drone/sim_drone.py
2024-06-18 15:09:29 -03:00

187 lines
5.8 KiB
Python
Executable File

#!/bin/env python3
import typer
import os
import sys
import contextlib
import pathlib
import time
import functools
from typing import List
from loguru import logger
import sh
import atexit
logger.remove()
logger.add(
sys.stdout, format="<green>{time}</green> <level>{level}</level> {extra} {message}"
)
px4Path = os.environ.get(
"SPIRI_SIM_PX4_PATH", "/opt/spiri-sdk/PX4-Autopilot/build/px4_sitl_default/bin/px4"
)
logger.info(f"SPIRI_SIM_PX4_PATH={px4Path}")
px4 = sh.Command(px4Path)
app = typer.Typer()
# This is a list of processes that we need to .kill and .wait for on exit
processes = []
class outputLogger:
"""
Logs command output to loguru
"""
def __init__(self, name, instance):
self.name = name
self.instance = instance
def __call__(self, message):
with logger.contextualize(cmd=self.name, instance=self.instance):
if message.endswith("\n"):
message = message[:-1]
# ToDo, this doesn't work because the output is coloured
if message.startswith("INFO"):
message = message.lstrip("INFO")
logger.info(message)
elif message.startswith("WARN"):
message = message.lstrip("WARN")
logger.warning(message)
elif message.startswith("ERROR"):
message = message.lstrip("ERROR")
logger.error(message)
elif message.startswith("DEBUG"):
message = message.lstrip("DEBUG")
logger.debug(message)
else:
logger.info(message)
@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables and values to add/update.
"""
env = os.environ
update = update or {}
remove = remove or []
# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)
try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
def wait_for_gazebo(timeout=60, interval=1):
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Run the 'gz topic list' command
topics = sh.gz("topic", "-l").strip()
if topics:
logger.info("Gazebo Ignition is running.")
return True
except sh.ErrorReturnCode:
# If the command fails, Gazebo might not be running yet
pass
logger.info(
f"Gazebo Ignition is not running. Retrying in {interval} seconds..."
)
time.sleep(interval)
logger.error("Timeout reached. Gazebo Ignition is not running.")
return False
@app.command()
def start(sys_id: int = 1, extra_apps: List[pathlib.Path] = []):
"""Starts the simulated drone with a given sys_id,
each drone must have it's own unique ID.
"""
with logger.contextualize(syd_id=sys_id):
with modified_environ(
PX4_SIM_MODEL=os.environ.get("PX4_SIM_MODEL") or "gz_x500",
PX4_GZ_MODEL_POSE=os.environ.get("PX4_GZ_MODEL_POSE") or f"0,{(sys_id-1)}",
PX4_INSTANCE="intance_id",
ROS_MASTER_PORT=str(11310 + sys_id),
MAVROS_SIM_GCS_PORT=str(14556 + sys_id),
MAVROS_SIM_FCU_PORT=str(14539 + sys_id),
DRONE_SYS_ID=str(sys_id),
):
instance_id = sys_id - 1 # PX4 will add 1 to the instance ID to get sysID
px4Instance = px4(
"-d", i=instance_id, _out=outputLogger("px4", instance_id), _bg=True
)
processes.append(px4Instance)
logger.info("Starting drone stack, this may take some time")
docker_stack = sh.docker.compose(
"--project-name",
f"simulated_robot_{sys_id}",
"up",
_out=outputLogger("docker_stack", sys_id),
_err=sys.stderr,
_bg=True,
)
processes.append(docker_stack)
for extra_app in extra_apps:
logger.info(f"Starting app {extra_app}, this may take some time")
app_name = extra_app.stem
app_stack = sh.docker.compose(
"--project-name",
f"simulated_robot_{sys_id}_{app_name}",
"up",
_out=outputLogger(app_name, sys_id),
_err=sys.stderr,
_bg=True,
)
processes.append(app_stack)
@app.command()
def start_group(
count: int = typer.Argument(min=1, max=10), extra_apps: List[pathlib.Path] = []
):
"""Start a group of robots"""
for i in range(count):
logger.info(f"start robot {i}")
start(sys_id=i + 1, extra_apps=extra_apps)
if i == 0:
wait_for_gazebo()
def cleanup():
# Wait for all subprocesses to exit
logger.info("Waiting for commands to exit")
print(processes)
for waitable in processes:
waitable.kill()
waitable.wait()
atexit.register(cleanup)
if __name__ == "__main__":
try:
app()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt caught, exiting...")
cleanup()
sys.exit(0)