spiri-sdk/guiTools/spiri_sdk_guitools/video_button.py

83 lines
2.9 KiB
Python

from nicegui import run, ui
import subprocess
from loguru import logger
import os
# argv prefix for the `gz topic` info query; the topic name is appended as the
# final argument by the caller, and the output is scanned for "No subscribers".
GZ_TOPIC_INFO = ["gz", "topic", "-i", "-t"]
# Template for the camera "enable_streaming" topic. It is filled in with
# str.format(world_name=..., robot_name=...) before being passed to `gz topic`.
ENABLE_STREAMING_TOPIC = "/world/{world_name}/model/{robot_name}/link/pitch_link/sensor/camera/image/enable_streaming"
class EnableStreamingButton(ui.element):
    """Toggle button that turns a robot's camera streaming on or off.

    The widget wraps a single ``ui.button`` whose label and colour reflect the
    current state ("Enable Video"/green when off, "Disable Video"/red when on).
    Clicking it publishes a boolean on the robot's ``enable_streaming`` topic
    through the ``gz topic`` command line tool.
    """

    # Upper bound, in seconds, for each ``gz topic`` invocation.
    _GZ_TIMEOUT_S = 15

    def __init__(self, robot_name: str, state: bool = False) -> None:
        """Create the button.

        Args:
            robot_name: Model name substituted into the streaming topic path.
            state: Initial state; ``True`` means streaming is considered on.
        """
        super().__init__()
        self.robot_name = robot_name
        self._state = state
        self.button = None
        with self:
            with ui.row():
                self.button = ui.button(on_click=self.on_click)
        self._refresh_button()

    def _refresh_button(self) -> None:
        """Sync the inner button's label and colour with ``self._state``."""
        # getattr guards against being called before __init__ created the button.
        button = getattr(self, "button", None)
        if button is None:
            return
        button.set_text(f'{"Disable" if self._state else "Enable"}' + " Video")
        button.props(f'color={"red" if self._state else "green"}')

    async def on_click(self) -> None:
        """Toggle streaming, showing a spinner while the request is in flight."""
        spinner = ui.spinner(size="lg")
        try:
            # Run in a worker so the UI event loop is not blocked.
            result = await run.cpu_bound(
                self.enable_streaming, self.robot_name, not self._state
            )
            if result:
                ui.notify("Success", type="positive")
                self.set_state(state=not self._state)
            else:
                self.set_state(state=False)
                ui.notify("No Video Streaming Available..", type="negative")
        finally:
            # Always remove the spinner, even if the worker call raised.
            spinner.delete()

    def set_state(self, state: bool) -> None:
        """Record the new streaming state and refresh the button."""
        self._state = state
        self.update()

    def update(self) -> None:
        """Refresh the inner button, then update this element on the client."""
        self._refresh_button()
        # Delegate so changes to the container itself are still propagated.
        super().update()

    def stop_video(self) -> None:
        """Disable streaming synchronously if it is currently enabled."""
        if self._state:
            self.enable_streaming(robot_name=self.robot_name, is_streaming=False)
            self.set_state(state=False)

    @staticmethod
    def enable_streaming(robot_name: str, is_streaming: bool) -> bool:
        """Publish ``is_streaming`` on the robot's ``enable_streaming`` topic.

        The world name is read from the ``WORLD_NAME`` environment variable.
        Before publishing, the topic is queried and the request is rejected
        when the output reports no subscribers.

        Args:
            robot_name: Model name substituted into the topic path.
            is_streaming: Value to publish (``True`` enables streaming).

        Returns:
            ``True`` if the message was published successfully; ``False`` if
            ``WORLD_NAME`` is unset, ``gz`` could not be run or timed out, the
            topic query failed, the topic has no subscribers, or the publish
            command exited with a non-zero status.
        """
        world_name = os.environ.get("WORLD_NAME")
        if not world_name:
            logger.error("WORLD_NAME environment variable is not set.")
            return False

        topic = ENABLE_STREAMING_TOPIC.format(
            world_name=world_name, robot_name=robot_name
        )
        timeout = EnableStreamingButton._GZ_TIMEOUT_S

        # Check if this topic has any subscribers i.e. model is up.
        try:
            info = subprocess.run(
                GZ_TOPIC_INFO + [topic],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=timeout,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # OSError: gz binary missing/not executable; TimeoutExpired: gz hung
            # (subprocess.run kills the child before re-raising).
            logger.error("Failed to query topic {}: {}", topic, exc)
            return False
        if info.returncode != 0:
            logger.error(info.stderr)
            return False
        if "No subscribers".casefold() in info.stdout.casefold():
            logger.error("No Subscribers on enable_streaming topic.")
            return False

        gz_topic_pub = [
            "gz",
            "topic",
            "-t",
            topic,
            "-m",
            "gz.msgs.Boolean",
            "-p",
            f"data:{is_streaming}",
        ]
        try:
            published = subprocess.run(gz_topic_pub, timeout=timeout)
        except (OSError, subprocess.TimeoutExpired) as exc:
            logger.error("Failed to publish on topic {}: {}", topic, exc)
            return False
        if published.returncode != 0:
            logger.error(
                "Publishing on topic {} exited with status {}",
                topic,
                published.returncode,
            )
            return False
        return True