spiri-sdk/guiTools/spiri_sdk_guitools/sim_drone.py

71 lines
2.5 KiB
Python
Raw Normal View History

from loguru import logger
from pathlib import Path
import contextlib
from typing import List
@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables and values to add/update.
"""
env = os.environ
update = update or {}
remove = remove or []
# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)
try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
class Robot:
def __init__(self, sysid: int, compose_files: List[Path]):
if sysid > 255 or sysid < 0:
raise ValueError("sysid must be between 0 and 255")
self.processes = []
def start(self):
"""Starts the simulated drone with a given sys_id,
each drone must have it's own unique ID.
"""
instance = self.sysid-1
with logger.contextualize(syd_id=sys_id):
env = os.environ
with modified_environ(
SERIAL0_PORT=str(int(env["SERIAL0_PORT"]) + 10 * instance),
MAVROS2_PORT=str(int(env["MAVROS2_PORT"]) + 10 * instance),
MAVROS1_PORT=str(int(env["MAVROS1_PORT"]) + 10 * instance),
FDM_PORT_IN=str(int(env["FDM_PORT_IN"]) + 10 * instance),
SITL_PORT=str(int(env["SITL_PORT"]) + 10 * instance),
INSTANCE=str(instance),
DRONE_SYS_ID=str(self.sys_id),
):
logger.info("Starting drone stack, this may take some time")
docker_stack = sh.docker.compose(
"--profile",
"uav-sim",
"-p",
f"spiri-sdk-{sys_id}",
"up",
_out=outputLogger("docker_stack", sys_id),
_err=sys.stderr,
_bg=True,
)
self.processes.append(docker_stack)