83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
from nicegui import run, ui
|
|
import subprocess
|
|
from loguru import logger
|
|
import os
|
|
|
|
# Base argv for the `gz topic` info query; the topic name is appended as the
# value of the trailing `-t` flag by the caller.
GZ_TOPIC_INFO = ["gz", "topic", "-i", "-t"]
|
|
# Template for the camera `enable_streaming` topic of the `spiri-{sysid}` model;
# fill in with `.format(world_name=..., sysid=...)`.
ENABLE_STREAMING_TOPIC = "/world/{world_name}/model/spiri-{sysid}/link/pitch_link/sensor/camera/image/enable_streaming"
|
|
|
|
|
|
class EnableStreamingButton(ui.element):
    """Toggle button that turns camera streaming on or off for one model.

    The button publishes a ``gz.msgs.Boolean`` message on the
    ``enable_streaming`` topic of the ``spiri-{sysid}`` model through the
    ``gz topic`` command-line tool, and mirrors the current state in its
    label ("Enable Video" / "Disable Video") and colour (green / red).
    """

    def __init__(self, sysid: int, state: bool = False) -> None:
        """Build the button.

        Args:
            sysid: Identifier substituted into the topic name (``spiri-{sysid}``).
            state: Initial streaming state; ``True`` means streaming is on.
        """
        super().__init__()
        self.sysid = sysid
        self._state = state
        # Set before entering the UI context so `update()` is safe to call
        # even if the framework triggers it before the button exists.
        self.button = None

        with self:
            with ui.row():
                self.button = ui.button(on_click=self.on_click)
        self._refresh_button()

    def _refresh_button(self) -> None:
        """Make the inner button's label and colour reflect ``self._state``."""
        if self.button is None:
            return
        label = "Disable Video" if self._state else "Enable Video"
        color = "red" if self._state else "green"
        # Use the public text API instead of poking the private `_text` field.
        self.button.set_text(label)
        self.button.props(f"color={color}")

    async def on_click(self) -> None:
        """Toggle streaming when the button is clicked and report the outcome."""
        # Capture the target once: the state may change while we are awaiting.
        target_state = not self._state
        spinner = ui.spinner(size="lg")
        try:
            # Run the blocking `gz` calls off the event loop so the UI stays
            # responsive.
            result = await run.cpu_bound(
                self.enable_streaming, self.sysid, target_state
            )
            if result:
                ui.notify("Success", type="positive")
                self.set_state(state=target_state)
            else:
                self.set_state(state=False)
                ui.notify("No Video Streaming Available..", type="negative")
        finally:
            # Always remove the spinner, even if the call above raised.
            spinner.delete()

    def set_state(self, state: bool):
        """Record the streaming state and refresh the button appearance."""
        self._state = state
        self.update()

    def update(self) -> None:
        """Refresh the inner button, then push this element's own update."""
        self._refresh_button()
        super().update()

    def stop_video(self):
        """Turn streaming off if it is currently on.

        The result of the publish attempt is ignored (best effort); the
        button is reset to the "off" state either way.
        """
        if self._state:
            self.enable_streaming(sysid=self.sysid, is_streaming=False)
            self.set_state(state=False)

    @staticmethod
    def enable_streaming(sysid: int, is_streaming: bool) -> bool:
        """Publish the desired streaming state on the model's topic.

        First queries the topic with ``gz topic -i`` and only publishes when
        that query succeeds and does not report "No subscribers".

        Args:
            sysid: Identifier substituted into the topic name.
            is_streaming: Value to publish (``True`` to enable streaming).

        Returns:
            ``True`` if the message was published successfully; ``False`` if
            the topic query failed, timed out, reported no subscribers, or
            the publish command itself failed.

        Raises:
            KeyError: If the ``WORLD_NAME`` environment variable is not set.
        """
        timeout_s = 15
        world_name = os.environ["WORLD_NAME"]
        topic = ENABLE_STREAMING_TOPIC.format(world_name=world_name, sysid=sysid)

        # Check whether the topic has any subscribers, i.e. the model is up.
        # `subprocess.run` kills the child on timeout, so nothing is leaked.
        try:
            info = subprocess.run(
                GZ_TOPIC_INFO + [topic],
                capture_output=True,
                text=True,
                timeout=timeout_s,
            )
        except subprocess.TimeoutExpired:
            logger.error("Timed out querying topic {}", topic)
            return False
        except OSError as exc:
            # e.g. the `gz` executable is not installed or not on PATH.
            logger.error("Could not run gz topic info: {}", exc)
            return False

        if info.returncode != 0:
            logger.error(info.stderr)
            return False
        if "No subscribers".casefold() in info.stdout.casefold():
            logger.error("No Subscribers on enable_streaming topic.")
            return False

        publish_cmd = [
            "gz",
            "topic",
            "-t",
            topic,
            "-m",
            "gz.msgs.Boolean",
            "-p",
            f"data:{is_streaming}",
        ]
        try:
            published = subprocess.run(publish_cmd, timeout=timeout_s)
        except subprocess.TimeoutExpired:
            logger.error("Timed out publishing to topic {}", topic)
            return False
        except OSError as exc:
            logger.error("Could not run gz topic publish: {}", exc)
            return False

        if published.returncode != 0:
            logger.error(
                "gz topic publish exited with code {}", published.returncode
            )
            return False
        return True
|