spiri-sdk/skel/Desktop/simulated-drone/sim_drone.py

59 lines
1.7 KiB
Python
Executable File

#!/bin/env python3
import typer, os, sys
from loguru import logger
import sh
import atexit
# Logging setup: drop the handler(s) loguru currently has and log to stdout
# instead. The format includes {extra}, so values bound with
# logger.contextualize(...) (e.g. cmd=<name> in outputLogger) show up on
# every line.
logger.remove()
logger.add(sys.stdout, format="<green>{time}</green> <level>{level}</level> {extra} {message}")
# Location of the PX4 SITL binary; the SPIRI_SIM_PX4_PATH environment variable
# overrides the default SDK install path.
px4Path = os.environ.get("SPIRI_SIM_PX4_PATH","/opt/spiri-sdk/PX4-Autopilot/build/px4_sitl_default/bin/px4")
logger.info(f"SPIRI_SIM_PX4_PATH={px4Path}")
# Callable wrapper around the PX4 binary; start() invokes it in the background.
px4 = sh.Command(px4Path)
# Typer application object; CLI commands are registered with @app.command().
app = typer.Typer()
class outputLogger():
    """
    Logs command output to loguru.

    An instance is a callable intended to be passed to ``sh`` as an ``_out``
    callback. Every message it receives is logged with the command name
    bound as ``cmd`` in the loguru context, at a level derived from the
    ``INFO`` / ``WARN`` tag at the start of the message (``info`` when no
    tag is recognised).
    """

    # Level tags recognised at the start of a message, paired with the name
    # of the loguru method used to log the remainder of that message.
    _LEVEL_PREFIXES = (("INFO", "info"), ("WARN", "warning"))

    def __init__(self, name):
        """Store *name*; it is attached to every record as ``cmd``."""
        self.name = name

    @staticmethod
    def _split_level(message):
        """Return ``(logger_method_name, text)`` for one output message.

        A single trailing newline is dropped. If the message starts with a
        known level tag, optionally preceded by ANSI colour codes, the
        colour codes and the tag are removed and the matching loguru method
        name is returned. Otherwise the message is returned unchanged with
        the ``"info"`` method.
        """
        import re  # local import so the block does not depend on file-level imports

        if message.endswith('\n'):
            message = message[:-1]
        # Coloured output puts ANSI SGR sequences (ESC [ ... m) in front of
        # the level tag; skip them for detection only. The pattern can match
        # the empty string, so re.match never returns None here.
        colour_prefix = re.match(r'(?:\x1b\[[0-9;]*m)*', message)
        candidate = message[colour_prefix.end():]
        for tag, method_name in outputLogger._LEVEL_PREFIXES:
            if candidate.startswith(tag):
                # Slice off exactly the tag. str.lstrip(tag) would strip any
                # run of the tag's characters and eat into the message text.
                return method_name, candidate[len(tag):]
        return "info", message

    def __call__(self, message):
        """Log *message* under this command's name at the detected level."""
        method_name, text = self._split_level(message)
        with logger.contextualize(cmd=self.name):
            getattr(logger, method_name)(text)
def _kill_if_alive(process):
    """Kill a background ``sh`` command unless it has already exited."""
    # Signalling a process that was already waited on targets a stale PID,
    # so only kill commands that are still running.
    if process.is_alive():
        process.kill()


@app.command()
def start(sys_id: int =1):
    """Starts the simulated drone with a given sys_id,
    each drone must have its own unique ID.
    """
    # Details are kept in comments rather than the docstring because Typer
    # shows the docstring as the command's help text.
    if sys_id < 1:
        # The PX4 instance index is sys_id - 1 and must not be negative.
        raise typer.BadParameter("sys_id must be 1 or greater")
    instance_id = sys_id-1 #PX4 will add 1 to the instance ID to get sysID
    # sh renders the single-letter keyword as the "-i <instance>" option.
    # _bg=True returns immediately with a handle to the running process.
    px4Instance = px4(i=instance_id,
                      _out=outputLogger("px4"), _bg=True)
    # Registered right after launch so PX4 is cleaned up even if starting
    # the docker stack below fails.
    atexit.register(_kill_if_alive, px4Instance)
    docker_stack = sh.docker.compose(
        "--project-name", f"simulated_robot_{sys_id}",
        "up",
        _out=outputLogger("docker_stack"),
        _bg=True
    )
    atexit.register(_kill_if_alive, docker_stack)
    # Block until both background commands have finished.
    px4Instance.wait()
    docker_stack.wait()
# Script entry point: hand control to the Typer application only when this
# file is executed directly, not when it is imported as a module.
if __name__ == "__main__":
    app()