import os
import subprocess
from typing import Optional, Sequence

from loguru import logger
from nicegui import run, ui

# Base argv for querying information about a single Gazebo transport topic.
GZ_TOPIC_INFO = ["gz", "topic", "-i", "-t"]
# Template of the topic that switches the camera image streaming on or off.
ENABLE_STREAMING_TOPIC = (
    "/world/{world_name}/model/{robot_name}"
    "/link/pitch_link/sensor/camera/image/enable_streaming"
)
# Upper bound, in seconds, for any single `gz` CLI invocation.
GZ_COMMAND_TIMEOUT_S = 15


def _run_gz_command(
    command: Sequence[str], timeout: float = GZ_COMMAND_TIMEOUT_S
) -> Optional[subprocess.CompletedProcess]:
    """Run a CLI command, returning its result or ``None`` if it could not complete.

    Args:
        command: The argv list to execute (no shell is involved).
        timeout: Seconds to wait before the child is killed.

    Returns:
        The ``CompletedProcess`` with captured text ``stdout``/``stderr``, or
        ``None`` when the command timed out or could not be launched. Both
        failure modes are logged.
    """
    try:
        return subprocess.run(
            list(command),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() has already killed the child at this point.
        logger.error("Command timed out after {}s: {}", timeout, " ".join(command))
    except OSError as exc:
        # E.g. the executable is not installed or not on PATH.
        logger.error("Could not launch {}: {}", " ".join(command), exc)
    return None


class EnableStreamingButton(ui.element):
    """UI element holding a button that toggles a robot's camera streaming.

    The button reads "Enable Video" (green) while streaming is off and
    "Disable Video" (red) while it is on. Clicking it publishes the opposite
    state on the robot's ``enable_streaming`` topic through the ``gz`` CLI.
    """

    def __init__(self, robot_name: str, state: bool = False) -> None:
        """Build the button.

        Args:
            robot_name: Model name substituted into the streaming topic.
            state: Initial streaming state shown by the button.
        """
        super().__init__()
        self.robot_name = robot_name
        self._state = state
        self.button = None
        with self:
            with ui.row():
                self.button = ui.button(on_click=self.on_click)
                self._apply_state()

    def _apply_state(self) -> None:
        """Make the button's label and colour reflect ``self._state``."""
        label = "Disable Video" if self._state else "Enable Video"
        color = "red" if self._state else "green"
        self.button.set_text(label)
        self.button.props(f"color={color}")

    async def on_click(self) -> None:
        """Toggle streaming in response to a click and report the outcome."""
        spinner = ui.spinner(size="lg")
        try:
            # Off-load the blocking gz CLI calls so the UI stays responsive.
            result = await run.cpu_bound(
                self.enable_streaming, self.robot_name, not self._state
            )
            if result:
                ui.notify("Success", type="positive")
                self.set_state(state=not self._state)
            else:
                # On any failure the button falls back to the "Enable" state.
                self.set_state(state=False)
                ui.notify("No Video Streaming Available..", type="negative")
        finally:
            # Always remove the spinner, even if the call above raised.
            spinner.delete()

    def set_state(self, state: bool) -> None:
        """Record the streaming state and refresh the button."""
        self._state = state
        self.update()

    def update(self) -> None:
        """Refresh the button from the current state, then update this element."""
        # The button does not exist yet while __init__ is still running.
        if self.button is not None:
            self._apply_state()
        super().update()

    def stop_video(self) -> None:
        """Disable streaming if it is currently enabled.

        NOTE: this calls ``enable_streaming`` synchronously, so it blocks the
        caller for the duration of the gz CLI calls. The button is reset to
        the "Enable" state whether or not the publish succeeded.
        """
        if self._state:
            self.enable_streaming(robot_name=self.robot_name, is_streaming=False)
            self.set_state(state=False)

    @staticmethod
    def enable_streaming(robot_name: str, is_streaming: bool) -> bool:
        """Publish the requested streaming state for ``robot_name``.

        The world name is read from the ``WORLD_NAME`` environment variable.
        Before publishing, the topic is queried and the call is abandoned if
        it reports no subscribers.

        Args:
            robot_name: Model name substituted into the streaming topic.
            is_streaming: ``True`` to enable streaming, ``False`` to disable.

        Returns:
            ``True`` if the message was published; ``False`` if the
            environment variable is missing, a gz command failed or timed
            out, or the topic has no subscribers. Failures are logged.
        """
        world_name = os.environ.get("WORLD_NAME")
        if not world_name:
            logger.error("WORLD_NAME environment variable is not set.")
            return False

        topic = ENABLE_STREAMING_TOPIC.format(
            world_name=world_name, robot_name=robot_name
        )

        # Check if this topic has any subscribers i.e. model is up
        info = _run_gz_command(GZ_TOPIC_INFO + [topic])
        if info is None:
            return False
        if info.returncode != 0:
            logger.error(info.stderr)
            return False
        if "No subscribers".casefold() in info.stdout.casefold():
            logger.error("No Subscribers on enable_streaming topic.")
            return False

        published = _run_gz_command(
            [
                "gz",
                "topic",
                "-t",
                topic,
                "-m",
                "gz.msgs.Boolean",
                "-p",
                f"data:{is_streaming}",
            ]
        )
        if published is None:
            return False
        if published.returncode != 0:
            logger.error(published.stderr)
            return False
        return True