From 6babf640018eb3a04ec50a3902e502d3ceae159a Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Fri, 25 Mar 2022 10:58:25 -0500 Subject: [PATCH 01/12] WIP real-time messaging example --- .../server/lg_monitor_server.py | 101 ++++++++++++++---- .../server/serializer_node.py | 57 +++++++++- 2 files changed, 132 insertions(+), 26 deletions(-) diff --git a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py index 552226f1..9fda5e20 100644 --- a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py +++ b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py @@ -11,7 +11,20 @@ from ..aliases.aliases import SerializedGraph from .enums.enums import ENUMS -APP_ID = 'LABGRAPH_MONITOR' +# Add additional nodes +from ....graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGenerator, NoiseGeneratorConfig +from ....graphviz_support.graphviz_support.tests.demo_graph.amplifier import Amplifier, AmplifierConfig +from ....graphviz_support.graphviz_support.tests.demo_graph.attenuator import Attenuator, AttenuatorConfig +from ....graphviz_support.graphviz_support.tests.demo_graph.rolling_averager import RollingAverager, RollingConfig + +# Needed test configurations +NUM_FEATURES = 100 +WINDOW = 2.0 +REFRESH_RATE = 2.0 +OUT_IN_RATIO = 1.2 +ATTENUATION = 5.0 + +APP_ID = 'LABGRAPH.MONITOR' WS_SERVER = ENUMS.WS_SERVER STREAM = ENUMS.STREAM DEFAULT_IP = WS_SERVER.DEFAULT_IP @@ -19,40 +32,86 @@ DEFAULT_API_VERSION = WS_SERVER.DEFAULT_API_VERSION SAMPLE_RATE = 5 - -def run_server(data: SerializedGraph) -> None: - """ - A function that creates a Websocket server graph. 
- The server graph streams the lagraph topology to the clients - """ - class WSSenderNode(lg.Graph): +class WSSenderNode(lg.Graph): SERIALIZER: Serializer WS_SERVER_NODE: WSAPIServerNode + NOISE_GENERATOR: NoiseGenerator + ROLLING_AVERAGER: RollingAverager + AMPLIFIER: Amplifier + ATTENUATOR: Attenuator + + def setup_data(self, data) -> None: + self.data = data + def setup(self) -> None: - wsapi_server_config = WSAPIServerConfig( - app_id=APP_ID, - ip=WS_SERVER.DEFAULT_IP, - port=ENUMS.WS_SERVER.DEFAULT_PORT, - api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, - num_messages=-1, - enums=ENUMS(), - sample_rate=SAMPLE_RATE, + self.WS_SERVER_NODE.configure( + WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, + ) ) self.SERIALIZER.configure( SerializerConfig( - data=data, + data=self.data, sample_rate=SAMPLE_RATE, stream_name=STREAM.LABGRAPH_MONITOR, stream_id=STREAM.LABGRAPH_MONITOR_ID ) ) - self.WS_SERVER_NODE.configure(wsapi_server_config) + self.NOISE_GENERATOR.configure( + NoiseGeneratorConfig( + sample_rate=float(SAMPLE_RATE), + num_features=NUM_FEATURES + ) + ) + + self.ROLLING_AVERAGER.configure( + RollingConfig(window=WINDOW) + ) + + self.AMPLIFIER.configure( + AmplifierConfig(out_in_ratio=OUT_IN_RATIO) + ) + + self.ATTENUATOR.configure( + AttenuatorConfig(attenuation=ATTENUATION) + ) def connections(self) -> lg.Connections: - return ((self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic),) + return ( + (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), + (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), + (self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), + (self.NOISE_GENERATOR.OUTPUT, self.SERIALIZER.INPUT_1), + (self.ROLLING_AVERAGER.OUTPUT, self.SERIALIZER.INPUT_2), + (self.AMPLIFIER.OUTPUT, self.SERIALIZER.INPUT_3), + (self.ATTENUATOR.OUTPUT, self.SERIALIZER.INPUT_4), + 
(self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic), + ) def process_modules(self) -> Tuple[lg.Module, ...]: - return (self.SERIALIZER, self.WS_SERVER_NODE, ) + return ( + self.NOISE_GENERATOR, + self.ROLLING_AVERAGER, + self.AMPLIFIER, + self.ATTENUATOR, + self.SERIALIZER, + self.WS_SERVER_NODE, + ) + +def run_server(data: SerializedGraph) -> None: + """ + A function that creates a Websocket server graph. + The server graph streams the lagraph topology to the clients + """ + WS = WSSenderNode() + WS.setup_data(data) - lg.run(WSSenderNode) + runner = lg.ParallelRunner(graph=WS) + runner.run() diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 064294ab..47f8c273 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2004-present Facebook. All Rights Reserve +from typing import Optional import labgraph as lg import asyncio from labgraph.websockets.ws_server.ws_server_stream_message import ( @@ -8,6 +9,9 @@ ) from ..aliases.aliases import SerializedGraph +# Make it work with RandomMessage +from ....graphviz_support.graphviz_support.tests.demo_graph.random_message import RandomMessage +import numpy as np class SerializerConfig(lg.Config): data: SerializedGraph @@ -15,6 +19,11 @@ class SerializerConfig(lg.Config): stream_name: str stream_id: str +class DataState(lg.State): + data_1: Optional[np.ndarray] = None + data_2: Optional[np.ndarray] = None + data_3: Optional[np.ndarray] = None + data_4: Optional[np.ndarray] = None class Serializer(lg.Node): """ @@ -23,15 +32,53 @@ class Serializer(lg.Node): TOPIC = lg.Topic(WSStreamMessage) config: SerializerConfig + state: DataState + INPUT_1 = lg.Topic(RandomMessage) + INPUT_2 = lg.Topic(RandomMessage) + INPUT_3 = lg.Topic(RandomMessage) + INPUT_4 = lg.Topic(RandomMessage) + + 
@lg.subscriber(INPUT_1) + def add_message_1(self, message: RandomMessage) -> None: + self.state.data_1 = message.data + + @lg.subscriber(INPUT_2) + def add_message_2(self, message: RandomMessage) -> None: + self.state.data_2 = message.data + + @lg.subscriber(INPUT_3) + def add_message_3(self, message: RandomMessage) -> None: + self.state.data_3 = message.data + + @lg.subscriber(INPUT_4) + def add_message_4(self, message: RandomMessage) -> None: + self.state.data_4 = message.data + @lg.publisher(TOPIC) async def source(self) -> lg.AsyncPublisher: - await asyncio.sleep(.01) + await asyncio.sleep(.1) while True: - msg = WSStreamMessage( - samples=self.config.data, + output_data = { + key: value for key, value in self.config.data.items() + } + # Populate Serializer Node + output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_1[0]) + output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data_content"] = str(self.state.data_2[0]) + output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data_content"] = str(self.state.data_3[0]) + output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data_content"] = str(self.state.data_4[0]) + + # Populate RollingAverage Node + output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_1[0]) + + # Populate Amplifier Node + output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_2[0]) + + # Populate Attenuator Node + output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_3[0]) + yield self.TOPIC, WSStreamMessage( + samples=output_data, stream_name=self.config.stream_name, stream_id=self.config.stream_id, ) - yield self.TOPIC, msg - await asyncio.sleep(1 / self.config.sample_rate) + await asyncio.sleep(1 / 
self.config.sample_rate), From b4a1c8c97786cbc27b2fa5c3b79e856d20b5029c Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Fri, 25 Mar 2022 14:01:30 -0500 Subject: [PATCH 02/12] WIP updated serialized message representation --- .../lg_monitor_node/lg_monitor_message.py | 2 +- .../server/serializer_node.py | 51 +++++++++++++------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py b/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py index 1e491676..c4024647 100644 --- a/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py +++ b/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py @@ -34,6 +34,6 @@ def serialize(self) -> SerializedMessage: name = annotation[0] type = (annotation[1]).__name__ - serialized_message['fields'][name] = type + serialized_message['fields'][name] = {"type": type} return serialized_message diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 47f8c273..59be6c33 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Copyright 2004-present Facebook. 
All Rights Reserve -from typing import Optional +from typing import Dict, Optional import labgraph as lg import asyncio from labgraph.websockets.ws_server.ws_server_stream_message import ( @@ -20,10 +20,10 @@ class SerializerConfig(lg.Config): stream_id: str class DataState(lg.State): - data_1: Optional[np.ndarray] = None - data_2: Optional[np.ndarray] = None - data_3: Optional[np.ndarray] = None - data_4: Optional[np.ndarray] = None + data_1: Optional[Dict] = None + data_2: Optional[Dict] = None + data_3: Optional[Dict] = None + data_4: Optional[Dict] = None class Serializer(lg.Node): """ @@ -41,19 +41,31 @@ class Serializer(lg.Node): @lg.subscriber(INPUT_1) def add_message_1(self, message: RandomMessage) -> None: - self.state.data_1 = message.data + self.state.data_1 = { + "timestamp": message.timestamp, + "numpy": list(message.data), + } @lg.subscriber(INPUT_2) def add_message_2(self, message: RandomMessage) -> None: - self.state.data_2 = message.data + self.state.data_2 = { + "timestamp": message.timestamp, + "numpy": list(message.data), + } @lg.subscriber(INPUT_3) def add_message_3(self, message: RandomMessage) -> None: - self.state.data_3 = message.data + self.state.data_3 = { + "timestamp": message.timestamp, + "numpy": list(message.data), + } @lg.subscriber(INPUT_4) def add_message_4(self, message: RandomMessage) -> None: - self.state.data_4 = message.data + self.state.data_4 = { + "timestamp": message.timestamp, + "numpy": list(message.data), + } @lg.publisher(TOPIC) async def source(self) -> lg.AsyncPublisher: @@ -63,19 +75,26 @@ async def source(self) -> lg.AsyncPublisher: key: value for key, value in self.config.data.items() } # Populate Serializer Node - output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_1[0]) - output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data_content"] = str(self.state.data_2[0]) - 
output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data_content"] = str(self.state.data_3[0]) - output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data_content"] = str(self.state.data_4[0]) + output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] + output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] + output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] + output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["timestamp"]["content"] = self.state.data_4["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data"]["content"] = self.state.data_4["numpy"] # Populate RollingAverage Node - output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_1[0]) + output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] + output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] # Populate Amplifier Node - output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_2[0]) + output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = 
self.state.data_2["timestamp"] + output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] # Populate Attenuator Node - output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data_content"] = str(self.state.data_3[0]) + output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] + output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] yield self.TOPIC, WSStreamMessage( samples=output_data, stream_name=self.config.stream_name, From f9af520095e018b94023eb4450a4efeee5ea5b72 Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Mon, 28 Mar 2022 15:48:51 -0500 Subject: [PATCH 03/12] WIP separated real-time messaging example from lg_monitor_server.py --- .../generate_lg_monitor.py | 5 +- .../server/serializer_node.py | 46 ++++--- labgraph/examples/labgraph_monitor_example.py | 129 ++++++++++++++++++ 3 files changed, 155 insertions(+), 25 deletions(-) create mode 100644 labgraph/examples/labgraph_monitor_example.py diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 7f67f115..018b0728 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -210,6 +210,5 @@ def generate_labgraph_monitor(graph: lg.Graph) -> None: nodes ) - # Send the serialized graph to Front-End - # using LabGraph Websockets API - run_server(serialized_graph) + # Return the serialized topology of the graph + return serialized_graph \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py 
index 59be6c33..8f057844 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -71,30 +71,32 @@ def add_message_4(self, message: RandomMessage) -> None: async def source(self) -> lg.AsyncPublisher: await asyncio.sleep(.1) while True: - output_data = { - key: value for key, value in self.config.data.items() - } - # Populate Serializer Node - output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["timestamp"]["content"] = self.state.data_4["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data"]["content"] = self.state.data_4["numpy"] + output_data = dict() + if hasattr(self.config, "data"): + output_data = { + key: value for key, value in self.config.data.items() + } + # Populate Serializer Node + output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] + 
output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] + output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] + output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["timestamp"]["content"] = self.state.data_4["timestamp"] + output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data"]["content"] = self.state.data_4["numpy"] - # Populate RollingAverage Node - output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] - output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] - - # Populate Amplifier Node - output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] - output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] + # Populate RollingAverage Node + output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] + output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] + + # Populate Amplifier Node + output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] + output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] - # 
Populate Attenuator Node - output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] - output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] + # Populate Attenuator Node + output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] + output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] yield self.TOPIC, WSStreamMessage( samples=output_data, stream_name=self.config.stream_name, diff --git a/labgraph/examples/labgraph_monitor_example.py b/labgraph/examples/labgraph_monitor_example.py new file mode 100644 index 00000000..5fd7260f --- /dev/null +++ b/labgraph/examples/labgraph_monitor_example.py @@ -0,0 +1,129 @@ +import labgraph as lg +from typing import Tuple + +# Make the imports work when running from LabGraph root directory +import sys +sys.path.append("./") + +# LabGraph WebSockets Components +from labgraph.websockets.ws_server.ws_api_node_server import ( + WSAPIServerConfig, + WSAPIServerNode, +) + +# LabGraph Monitor Components +from extensions.yaml_support.labgraph_monitor.server.enums.enums import ENUMS +from extensions.yaml_support.labgraph_monitor.aliases.aliases import SerializedGraph +from extensions.yaml_support.labgraph_monitor.server.serializer_node import SerializerConfig, Serializer +from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import generate_labgraph_monitor + +# Graph Components +from extensions.graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGeneratorConfig, NoiseGenerator +from extensions.graphviz_support.graphviz_support.tests.demo_graph.amplifier import AmplifierConfig, Amplifier +from extensions.graphviz_support.graphviz_support.tests.demo_graph.attenuator import 
AttenuatorConfig, Attenuator +from extensions.graphviz_support.graphviz_support.tests.demo_graph.rolling_averager import RollingConfig, RollingAverager + +# LabGraph WebSockets Configurations +APP_ID = 'LABGRAPH.MONITOR' +WS_SERVER = ENUMS.WS_SERVER +STREAM = ENUMS.STREAM +DEFAULT_IP = WS_SERVER.DEFAULT_IP +DEFAULT_PORT = WS_SERVER.DEFAULT_PORT +DEFAULT_API_VERSION = WS_SERVER.DEFAULT_API_VERSION +SAMPLE_RATE = 5 + +# Graph Configurations +NUM_FEATURES = 100 +WINDOW = 2.0 +REFRESH_RATE = 2.0 +OUT_IN_RATIO = 1.2 +ATTENUATION = 5.0 + +class Demo(lg.Graph): + # LabGraph WebSockets Component + WS_SERVER_NODE: WSAPIServerNode + + # LabGraph Monitor Component + SERIALIZER: Serializer + + # Graph Components + NOISE_GENERATOR: NoiseGenerator + ROLLING_AVERAGER: RollingAverager + AMPLIFIER: Amplifier + ATTENUATOR: Attenuator + + # Provide graph topology with `generate_labgraph_monitor()` + def set_topology(self, topology: SerializedGraph) -> None: + self._topology = topology + + def setup(self) -> None: + self.WS_SERVER_NODE.configure( + WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, + ) + ) + self.SERIALIZER.configure( + SerializerConfig( + data=self._topology, + sample_rate=SAMPLE_RATE, + stream_name=STREAM.LABGRAPH_MONITOR, + stream_id=STREAM.LABGRAPH_MONITOR_ID, + ) + ) + self.NOISE_GENERATOR.configure( + NoiseGeneratorConfig( + sample_rate=float(SAMPLE_RATE), + num_features=NUM_FEATURES, + ) + ) + self.ROLLING_AVERAGER.configure( + RollingConfig( + window=WINDOW, + ) + ) + self.AMPLIFIER.configure( + AmplifierConfig( + out_in_ratio=OUT_IN_RATIO, + ) + ) + self.ATTENUATOR.configure( + AttenuatorConfig( + attenuation=ATTENUATION, + ) + ) + + def connections(self) -> lg.Connections: + return ( + (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), + (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), 
+ (self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), + (self.NOISE_GENERATOR.OUTPUT, self.SERIALIZER.INPUT_1), + (self.ROLLING_AVERAGER.OUTPUT, self.SERIALIZER.INPUT_2), + (self.AMPLIFIER.OUTPUT, self.SERIALIZER.INPUT_3), + (self.ATTENUATOR.OUTPUT, self.SERIALIZER.INPUT_4), + (self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic), + ) + + def process_modules(self) -> Tuple[lg.Module, ...]: + return ( + self.NOISE_GENERATOR, + self.ROLLING_AVERAGER, + self.AMPLIFIER, + self.ATTENUATOR, + self.SERIALIZER, + self.WS_SERVER_NODE, + ) + +if __name__ == "__main__": + graph = Demo() + topology = generate_labgraph_monitor(graph=graph) + graph.set_topology(topology) + + runner = lg.ParallelRunner(graph=graph) + runner.run() \ No newline at end of file From 33b07e4383c56deff51005e29040669d8e631968 Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Tue, 29 Mar 2022 14:17:57 -0500 Subject: [PATCH 04/12] WIP updated documentation and function names --- extensions/yaml_support/README.md | 79 ++++++++++---- .../generate_lg_monitor.py | 4 +- .../server/lg_monitor_server.py | 101 ++++-------------- 3 files changed, 81 insertions(+), 103 deletions(-) diff --git a/extensions/yaml_support/README.md b/extensions/yaml_support/README.md index 0eeb1b5d..b9d4a40b 100644 --- a/extensions/yaml_support/README.md +++ b/extensions/yaml_support/README.md @@ -44,17 +44,32 @@ python -m extensions.yaml_support.labgraph_yaml_parser.tests.test_lg_yaml_api #### Labgraph Monitor: -**generate_labgraph_monitor(graph: lg.Graph) -> None** : This function can be used to generate a serialized version of the passed graph instance. The serialized version of the graph will be streamed -to the clients using LabGraph Websockets API. +**generate_graph_topology(graph: lg.Graph) -> SerializedGraph** : This function can be used to generate a serialized version of the passed graph instance. 
+**run_topology(data: SerializedGraph) -> None**: This function can be used to send the generated serialized version of the graph instance via LabGraph Websockets API. -1. Call **generate_labgraph_monitor(graph: lg.Graph) -> None** and pass an instance of the graph as a parameter +The serialized version of the graph will be streamed to the clients using LabGraph Websockets API. + +1. Call **generate_graph_topology(graph: lg.Graph) -> SerializedGraph** and pass an instance of the graph as a parameter ``` +from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo + from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( - generate_labgraph_monitor + generate_graph_topology, +) + +from extensions.yaml_support.labgraph_monitor.server.lg_monitor_server import ( + run_topology, ) -generate_labgraph_monitor(graph) +# Initialize a Demo graph +graph = Demo() + +# Serialize its topology +topology = generate_graph_topology(graph) + +# Run the WebSockets API to send the topology to Front-End +run_topology(topology) ``` This will start a websocket server on localhost port 9000 (127.0.0.1:9000) @@ -118,8 +133,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -131,8 +150,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -144,8 +167,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -157,8 +184,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ], @@ -166,8 +197,12 @@ E.g: { "name": "RandomMessage", "fields": { - 
"timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ], @@ -175,8 +210,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -184,11 +223,11 @@ E.g: } } }, - "produced_timestamp_s": 1644931422.141309, - "timestamp_s": 1644931422.141309 + "produced_timestamp_s": 1648581339.9652574, + "timestamp_s": 1648581339.965258 } ], - "batch_num": 54 + "batch_num": 31 } } } diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 018b0728..859298fb 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -7,8 +7,6 @@ from ..lg_monitor_node.lg_monitor_node import LabgraphMonitorNode from ..lg_monitor_node.lg_monitor_message import LabgraphMonitorMessage from ..aliases.aliases import SerializedGraph -from ..server.lg_monitor_server import run_server - def identify_upstream_message( in_edge: str, @@ -186,7 +184,7 @@ def serialize_graph( return serialized_graph -def generate_labgraph_monitor(graph: lg.Graph) -> None: +def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: """ A function that serialize the graph topology and send it using to LabGraphMonitor Front-End diff --git a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py index 9fda5e20..800ef1c5 100644 --- a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py +++ b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py @@ -11,20 +11,7 @@ from ..aliases.aliases import SerializedGraph from .enums.enums import ENUMS -# Add additional nodes -from 
....graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGenerator, NoiseGeneratorConfig -from ....graphviz_support.graphviz_support.tests.demo_graph.amplifier import Amplifier, AmplifierConfig -from ....graphviz_support.graphviz_support.tests.demo_graph.attenuator import Attenuator, AttenuatorConfig -from ....graphviz_support.graphviz_support.tests.demo_graph.rolling_averager import RollingAverager, RollingConfig - -# Needed test configurations -NUM_FEATURES = 100 -WINDOW = 2.0 -REFRESH_RATE = 2.0 -OUT_IN_RATIO = 1.2 -ATTENUATION = 5.0 - -APP_ID = 'LABGRAPH.MONITOR' +APP_ID = 'LABGRAPH_MONITOR' WS_SERVER = ENUMS.WS_SERVER STREAM = ENUMS.STREAM DEFAULT_IP = WS_SERVER.DEFAULT_IP @@ -32,86 +19,40 @@ DEFAULT_API_VERSION = WS_SERVER.DEFAULT_API_VERSION SAMPLE_RATE = 5 -class WSSenderNode(lg.Graph): + +def run_topology(data: SerializedGraph) -> None: + """ + A function that creates a Websocket server graph. + The server graph streams the lagraph topology to the clients + """ + class WSSenderNode(lg.Graph): SERIALIZER: Serializer WS_SERVER_NODE: WSAPIServerNode - NOISE_GENERATOR: NoiseGenerator - ROLLING_AVERAGER: RollingAverager - AMPLIFIER: Amplifier - ATTENUATOR: Attenuator - - def setup_data(self, data) -> None: - self.data = data - def setup(self) -> None: - self.WS_SERVER_NODE.configure( - WSAPIServerConfig( - app_id=APP_ID, - ip=WS_SERVER.DEFAULT_IP, - port=ENUMS.WS_SERVER.DEFAULT_PORT, - api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, - num_messages=-1, - enums=ENUMS(), - sample_rate=SAMPLE_RATE, - ) + wsapi_server_config = WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, ) self.SERIALIZER.configure( SerializerConfig( - data=self.data, + data=data, sample_rate=SAMPLE_RATE, stream_name=STREAM.LABGRAPH_MONITOR, stream_id=STREAM.LABGRAPH_MONITOR_ID ) ) - 
self.NOISE_GENERATOR.configure( - NoiseGeneratorConfig( - sample_rate=float(SAMPLE_RATE), - num_features=NUM_FEATURES - ) - ) - - self.ROLLING_AVERAGER.configure( - RollingConfig(window=WINDOW) - ) - - self.AMPLIFIER.configure( - AmplifierConfig(out_in_ratio=OUT_IN_RATIO) - ) - - self.ATTENUATOR.configure( - AttenuatorConfig(attenuation=ATTENUATION) - ) + self.WS_SERVER_NODE.configure(wsapi_server_config) def connections(self) -> lg.Connections: - return ( - (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.SERIALIZER.INPUT_1), - (self.ROLLING_AVERAGER.OUTPUT, self.SERIALIZER.INPUT_2), - (self.AMPLIFIER.OUTPUT, self.SERIALIZER.INPUT_3), - (self.ATTENUATOR.OUTPUT, self.SERIALIZER.INPUT_4), - (self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic), - ) + return ((self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic),) def process_modules(self) -> Tuple[lg.Module, ...]: - return ( - self.NOISE_GENERATOR, - self.ROLLING_AVERAGER, - self.AMPLIFIER, - self.ATTENUATOR, - self.SERIALIZER, - self.WS_SERVER_NODE, - ) - -def run_server(data: SerializedGraph) -> None: - """ - A function that creates a Websocket server graph. 
- The server graph streams the lagraph topology to the clients - """ - WS = WSSenderNode() - WS.setup_data(data) + return (self.SERIALIZER, self.WS_SERVER_NODE, ) - runner = lg.ParallelRunner(graph=WS) - runner.run() + lg.run(WSSenderNode) From 34fc1452bc9bd4c5765cb5eb97f95322ad5490dd Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Tue, 29 Mar 2022 14:42:23 -0500 Subject: [PATCH 05/12] WIP updated labgraph monitor example --- labgraph/examples/labgraph_monitor_example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/labgraph/examples/labgraph_monitor_example.py b/labgraph/examples/labgraph_monitor_example.py index 5fd7260f..95c35297 100644 --- a/labgraph/examples/labgraph_monitor_example.py +++ b/labgraph/examples/labgraph_monitor_example.py @@ -15,7 +15,7 @@ from extensions.yaml_support.labgraph_monitor.server.enums.enums import ENUMS from extensions.yaml_support.labgraph_monitor.aliases.aliases import SerializedGraph from extensions.yaml_support.labgraph_monitor.server.serializer_node import SerializerConfig, Serializer -from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import generate_labgraph_monitor +from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import generate_graph_topology # Graph Components from extensions.graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGeneratorConfig, NoiseGenerator @@ -122,7 +122,7 @@ def process_modules(self) -> Tuple[lg.Module, ...]: if __name__ == "__main__": graph = Demo() - topology = generate_labgraph_monitor(graph=graph) + topology = generate_graph_topology(graph=graph) graph.set_topology(topology) runner = lg.ParallelRunner(graph=graph) From 40c5126de84c714f86cdef917df0b031bfab1d80 Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Sun, 3 Apr 2022 11:07:49 -0500 Subject: [PATCH 06/12] updated graphviz demo graph for work with labgraph monitor fixed test case values with new graph topic names updated 
graphviz support README added graphviz example --- extensions/graphviz_support/README.md | 33 +++++--- .../tests/demo_graph/amplifier.py | 10 +-- .../tests/demo_graph/attenuator.py | 10 +-- .../graphviz_support/tests/demo_graph/demo.py | 12 +-- .../tests/demo_graph/noise_generator.py | 6 +- .../tests/demo_graph/rolling_averager.py | 10 +-- .../graphviz_support/tests/demo_graph/sink.py | 12 +-- .../graphviz_support/tests/output/test | 17 ++++ .../graphviz_support/tests/output/test.svg | 79 +++++++++++++++++++ .../tests/test_lg_graphviz_api.py | 32 ++++---- 10 files changed, 164 insertions(+), 57 deletions(-) create mode 100644 extensions/graphviz_support/graphviz_support/tests/output/test create mode 100644 extensions/graphviz_support/graphviz_support/tests/output/test.svg diff --git a/extensions/graphviz_support/README.md b/extensions/graphviz_support/README.md index 9bacfeae..c79d81dd 100644 --- a/extensions/graphviz_support/README.md +++ b/extensions/graphviz_support/README.md @@ -1,6 +1,6 @@ # Graphviz for LabGraph graphs -This extension provides an API to generate a graphviz visualization of the LabGraph topology. +This extension provides an API to generate a [graphviz](https://graphviz.org/) visualization of the LabGraph topology. ## Quick Start @@ -23,22 +23,33 @@ python setup.py install To make sure things are working: -1- Move to the root of the LabGraph directory: -``` -labgraph\extensions\graphviz_support> cd ../.. +1. Move to the root of the LabGraph directory: + +```bash +labgraph/extensions/graphviz_support> cd ../.. labgraph> ``` -2- Run the following test -``` +2. 
Run the following test + +```bash python -m extensions.graphviz_support.graphviz_support.tests.test_lg_graphviz_api ``` -**The output of the file for this test can be found at:**\ -extensions\graphviz_support\graphviz_support\tests\output + +**The output of the file for this test can be found at:** + +`extensions/graphviz_support/graphviz_support/tests/output/test.svg` + ### Generating a graphviz file To generate a graph visualization just call 'generate_graphviz' function and pass the appropriate parameters -``` -from extensions/graphviz_support/graphviz_support/generate_graphviz/generate_graphviz.py import generate_graphviz.py -generate_graphviz(graph, output_file) +```python +from extensions.graphviz_support.graphviz_support.generate_graphviz.generate_graphviz import generate_graphviz +from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo + +generate_graphviz(Demo(), "output.png") # you can also use "output.svg" ``` + +**It will produce the following diagram named `output.png` in your current directory:** + +![Graphviz diagram for nodes: RollingAverager, NoiseGenerator, Amplifier, Sink, and Attenuator](https://i.imgur.com/M4yL39x.png) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py index 1ac52cd6..9d5a1067 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py @@ -12,20 +12,20 @@ class AmplifierConfig(lg.Config): class Amplifier(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + AMPLIFIER_INPUT = lg.Topic(RandomMessage) + AMPLIFIER_OUTPUT = lg.Topic(RandomMessage) config: AmplifierConfig def output(self, _in: float) -> float: return self.config.out_in_ratio * _in - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(AMPLIFIER_INPUT) + @lg.publisher(AMPLIFIER_OUTPUT) 
async def amplify(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() output_data = np.array( [self.output(_in) for _in in message.data] ) - yield self.OUTPUT, RandomMessage( + yield self.AMPLIFIER_OUTPUT, RandomMessage( timestamp=current_time, data=output_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py index 55ecdb02..d90a5c6d 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py @@ -12,20 +12,20 @@ class AttenuatorConfig(lg.Config): class Attenuator(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + ATTENUATOR_INPUT = lg.Topic(RandomMessage) + ATTENUATOR_OUTPUT = lg.Topic(RandomMessage) config: AttenuatorConfig def output(self, _in: float) -> float: return pow(10, (self.config.attenuation / 20)) * _in - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(ATTENUATOR_INPUT) + @lg.publisher(ATTENUATOR_OUTPUT) async def attenuate(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() output_data = np.array( [self.output(_in) for _in in message.data] ) - yield self.OUTPUT, RandomMessage( + yield self.ATTENUATOR_OUTPUT, RandomMessage( timestamp=current_time, data=output_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py index 27237c94..3dadf96c 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py @@ -47,12 +47,12 @@ def setup(self) -> None: def connections(self) -> lg.Connections: return ( - (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), - 
(self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), - (self.ROLLING_AVERAGER.OUTPUT, self.SINK.INPUT_1), - (self.AMPLIFIER.OUTPUT, self.SINK.INPUT_2), - (self.ATTENUATOR.OUTPUT, self.SINK.INPUT_3), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ROLLING_AVERAGER.ROLLING_AVERAGER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.AMPLIFIER.AMPLIFIER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ATTENUATOR.ATTENUATOR_INPUT), + (self.ROLLING_AVERAGER.ROLLING_AVERAGER_OUTPUT, self.SINK.SINK_INPUT_1), + (self.AMPLIFIER.AMPLIFIER_OUTPUT, self.SINK.SINK_INPUT_2), + (self.ATTENUATOR.ATTENUATOR_OUTPUT, self.SINK.SINK_INPUT_3), ) def process_modules(self) -> Tuple[lg.Module, ...]: diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py index f42aef9f..31f04e05 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py @@ -14,13 +14,13 @@ class NoiseGeneratorConfig(lg.Config): class NoiseGenerator(lg.Node): - OUTPUT = lg.Topic(RandomMessage) + NOISE_GENERATOR_OUTPUT = lg.Topic(RandomMessage) config: NoiseGeneratorConfig - @lg.publisher(OUTPUT) + @lg.publisher(NOISE_GENERATOR_OUTPUT) async def generate_noise(self) -> lg.AsyncPublisher: while True: - yield self.OUTPUT, RandomMessage( + yield self.NOISE_GENERATOR_OUTPUT, RandomMessage( timestamp=time.time(), data=np.random.rand(self.config.num_features) ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py index 90bc7857..c4d51a46 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py @@ -18,14 +18,14 @@ 
class RollingConfig(lg.Config): class RollingAverager(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + ROLLING_AVERAGER_INPUT = lg.Topic(RandomMessage) + ROLLING_AVERAGER_OUTPUT = lg.Topic(RandomMessage) state: RollingState config: RollingConfig - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(ROLLING_AVERAGER_INPUT) + @lg.publisher(ROLLING_AVERAGER_OUTPUT) async def average(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() self.state.messages.append(message) @@ -40,7 +40,7 @@ async def average(self, message: RandomMessage) -> lg.AsyncPublisher: [message.data for message in self.state.messages] ) mean_data = np.mean(all_data, axis=0) - yield self.OUTPUT, RandomMessage( + yield self.ROLLING_AVERAGER_OUTPUT, RandomMessage( timestamp=current_time, data=mean_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py index 59c340b6..3736482e 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py @@ -14,20 +14,20 @@ class SinkState(lg.State): class Sink(lg.Node): - INPUT_1 = lg.Topic(RandomMessage) - INPUT_2 = lg.Topic(RandomMessage) - INPUT_3 = lg.Topic(RandomMessage) + SINK_INPUT_1 = lg.Topic(RandomMessage) + SINK_INPUT_2 = lg.Topic(RandomMessage) + SINK_INPUT_3 = lg.Topic(RandomMessage) state: SinkState - @lg.subscriber(INPUT_1) + @lg.subscriber(SINK_INPUT_1) def got_message(self, message: RandomMessage) -> None: self.state.data_1 = message.data - @lg.subscriber(INPUT_2) + @lg.subscriber(SINK_INPUT_2) def got_message_2(self, message: RandomMessage) -> None: self.state.data_2 = message.data - @lg.subscriber(INPUT_3) + @lg.subscriber(SINK_INPUT_3) def got_message_3(self, message: RandomMessage) -> None: self.state.data_3 = message.data diff --git 
a/extensions/graphviz_support/graphviz_support/tests/output/test b/extensions/graphviz_support/graphviz_support/tests/output/test new file mode 100644 index 00000000..b7eb1c3b --- /dev/null +++ b/extensions/graphviz_support/graphviz_support/tests/output/test @@ -0,0 +1,17 @@ +digraph Demo { + center=true rankdir=LR + node [fontsize=12 height=1.5 shape=circle width=1.5] + NoiseGenerator + RollingAverager + Amplifier + Attenuator + Sink + Sink + Sink + NoiseGenerator -> RollingAverager + NoiseGenerator -> Amplifier + NoiseGenerator -> Attenuator + RollingAverager -> Sink + Amplifier -> Sink + Attenuator -> Sink +} diff --git a/extensions/graphviz_support/graphviz_support/tests/output/test.svg b/extensions/graphviz_support/graphviz_support/tests/output/test.svg new file mode 100644 index 00000000..daf81fcc --- /dev/null +++ b/extensions/graphviz_support/graphviz_support/tests/output/test.svg @@ -0,0 +1,79 @@ + + + + + + +Demo + + + +NoiseGenerator + +NoiseGenerator + + + +RollingAverager + +RollingAverager + + + +NoiseGenerator->RollingAverager + + + + + +Amplifier + +Amplifier + + + +NoiseGenerator->Amplifier + + + + + +Attenuator + +Attenuator + + + +NoiseGenerator->Attenuator + + + + + +Sink + +Sink + + + +RollingAverager->Sink + + + + + +Amplifier->Sink + + + + + +Attenuator->Sink + + + + + diff --git a/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py b/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py index ea63d439..2711b564 100644 --- a/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py +++ b/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py @@ -33,47 +33,47 @@ def test_out_edge_node_mapper(self) -> None: self.assertEqual(4, len(out_edge_node_map)) self.assertEqual( 'generate_noise', - out_edge_node_map['NOISE_GENERATOR/OUTPUT'].name + out_edge_node_map['NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT'].name ) self.assertEqual( 'average', - 
out_edge_node_map['ROLLING_AVERAGER/OUTPUT'].name + out_edge_node_map['ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT'].name ) self.assertEqual( 'amplify', - out_edge_node_map['AMPLIFIER/OUTPUT'].name + out_edge_node_map['AMPLIFIER/AMPLIFIER_OUTPUT'].name ) self.assertEqual( 'attenuate', - out_edge_node_map['ATTENUATOR/OUTPUT'].name + out_edge_node_map['ATTENUATOR/ATTENUATOR_OUTPUT'].name ) def test_in_out_edge_mapper(self) -> None: in_out_edge_map = in_out_edge_mapper(self.graph.__streams__.values()) self.assertEqual(6, len(in_out_edge_map)) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ROLLING_AVERAGER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['AMPLIFIER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['AMPLIFIER/AMPLIFIER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ATTENUATOR/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ATTENUATOR/ATTENUATOR_INPUT'] ) self.assertEqual( - 'ROLLING_AVERAGER/OUTPUT', - in_out_edge_map['SINK/INPUT_1'] + 'ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_1'] ) self.assertEqual( - 'AMPLIFIER/OUTPUT', - in_out_edge_map['SINK/INPUT_2'] + 'AMPLIFIER/AMPLIFIER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_2'] ) self.assertEqual( - 'ATTENUATOR/OUTPUT', - in_out_edge_map['SINK/INPUT_3'] + 'ATTENUATOR/ATTENUATOR_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_3'] ) def test_connect_to_upstream(self) -> None: From 76e1119c750e006b276eef72c916615f20641c8a Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Fri, 8 Apr 2022 15:50:12 -0500 Subject: [PATCH 07/12] matched subscriber topic paths with their publishers updated labgraph_monitor_example.py with new approach added a function that matches subscribers with their publisher grouping --- .../generate_lg_monitor.py | 45 ++++++++++- 
.../server/serializer_node.py | 80 +++++++++++-------- labgraph/examples/labgraph_monitor_example.py | 26 +++--- 3 files changed, 104 insertions(+), 47 deletions(-) diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 859298fb..471bff1a 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -183,6 +183,44 @@ def serialize_graph( return serialized_graph +def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: + """ + A function that matches subscribers with their publishers + + @params: + graph: An instance of the computational graph + + @return: A dictionary where the key is a publisher grouping + and the value is a dictionary of sets of topic paths subscribed and their groupings + """ + sub_pub_grouping_map: Dict[str, str] = {} + for stream in graph.__streams__.values(): + difference = set(stream.topic_paths).difference(LabgraphMonitorNode.in_edges) + if difference: + upstream_edge = max(difference, key=len) + for edge in stream.topic_paths: + if edge != upstream_edge: + # convert SERIALIZER/SERIALIZER_INPUT_1 to its grouping Serializer + edge_path = "/".join(edge.split("/")[:-1]) + edge_grouping = type(graph.__descendants__[edge_path]).__name__ + + # convert SERIALIZER/SERIALIZER_INPUT_1 to its topic SERIALIZER_INPUT_1 + topic_path = edge.split("/")[-1] + + # convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator + group_path = "/".join(upstream_edge.split("/")[:-1]) + grouping = type(graph.__descendants__[group_path]).__name__ + + if grouping in sub_pub_grouping_map: + sub_pub_grouping_map[grouping]["topics"].add(topic_path) + sub_pub_grouping_map[grouping]["subscribers"].add(edge_grouping) + else: + sub_pub_grouping_map[grouping] = { + "topics": {topic_path}, + 
"subscribers": {edge_grouping}, + } + + return sub_pub_grouping_map def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: """ @@ -208,5 +246,8 @@ def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: nodes ) - # Return the serialized topology of the graph - return serialized_graph \ No newline at end of file + # Match subscribers with their publishers + sub_pub_map = sub_pub_grouping_map(graph) + + # Return the serialized topology of the graph + return serialized_graph, sub_pub_map \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 8f057844..24772481 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -15,6 +15,7 @@ class SerializerConfig(lg.Config): data: SerializedGraph + sub_pub_match: Dict sample_rate: int stream_name: str stream_id: str @@ -30,74 +31,87 @@ class Serializer(lg.Node): Convenience node for sending messages to a `WSAPIServerNode`. 
""" - TOPIC = lg.Topic(WSStreamMessage) + SERIALIZER_OUTPUT = lg.Topic(WSStreamMessage) config: SerializerConfig state: DataState - INPUT_1 = lg.Topic(RandomMessage) - INPUT_2 = lg.Topic(RandomMessage) - INPUT_3 = lg.Topic(RandomMessage) - INPUT_4 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_1 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_2 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_3 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_4 = lg.Topic(RandomMessage) - @lg.subscriber(INPUT_1) + def get_grouping(self, topic: lg.Topic) -> str: + """ + Matches subscriber topics with grouping that produced information + """ + for key, value in self.config.sub_pub_match.items(): + if topic.name in value["topics"]: + return key + return "" + + @lg.subscriber(SERIALIZER_INPUT_1) def add_message_1(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_1) self.state.data_1 = { + "grouping": grouping, "timestamp": message.timestamp, "numpy": list(message.data), } - @lg.subscriber(INPUT_2) + @lg.subscriber(SERIALIZER_INPUT_2) def add_message_2(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_2) self.state.data_2 = { + "grouping": grouping, "timestamp": message.timestamp, "numpy": list(message.data), } - @lg.subscriber(INPUT_3) + @lg.subscriber(SERIALIZER_INPUT_3) def add_message_3(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_3) self.state.data_3 = { + "grouping": grouping, "timestamp": message.timestamp, "numpy": list(message.data), } - @lg.subscriber(INPUT_4) + @lg.subscriber(SERIALIZER_INPUT_4) def add_message_4(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_4) self.state.data_4 = { + "grouping": grouping, "timestamp": message.timestamp, "numpy": list(message.data), } - @lg.publisher(TOPIC) + def output(self, _in: Dict) -> Dict: + """ + Updates serialized message with data according to grouping + + @params: + value 
of a dictionary that represents individual nodes + """ + try: + for node, value in _in.items(): + for state in self.state.__dict__.values(): + if state["grouping"] in value["upstreams"].keys(): + value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"] + value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"] + except: + pass + return _in + + @lg.publisher(SERIALIZER_OUTPUT) async def source(self) -> lg.AsyncPublisher: await asyncio.sleep(.1) while True: output_data = dict() if hasattr(self.config, "data"): output_data = { - key: value for key, value in self.config.data.items() + key: self.output(value) for key, value in self.config.data.items() } - # Populate Serializer Node - output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["RollingAverager"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["Amplifier"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] - output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["timestamp"]["content"] = self.state.data_4["timestamp"] - output_data["nodes"]["Serializer"]["upstreams"]["Attenuator"][0]["fields"]["data"]["content"] = self.state.data_4["numpy"] - - # Populate RollingAverage Node - output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_1["timestamp"] - 
output_data["nodes"]["RollingAverager"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_1["numpy"] - - # Populate Amplifier Node - output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_2["timestamp"] - output_data["nodes"]["Amplifier"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_2["numpy"] - - # Populate Attenuator Node - output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["timestamp"]["content"] = self.state.data_3["timestamp"] - output_data["nodes"]["Attenuator"]["upstreams"]["NoiseGenerator"][0]["fields"]["data"]["content"] = self.state.data_3["numpy"] - yield self.TOPIC, WSStreamMessage( + yield self.SERIALIZER_OUTPUT, WSStreamMessage( samples=output_data, stream_name=self.config.stream_name, stream_id=self.config.stream_id, diff --git a/labgraph/examples/labgraph_monitor_example.py b/labgraph/examples/labgraph_monitor_example.py index 95c35297..ccd57a27 100644 --- a/labgraph/examples/labgraph_monitor_example.py +++ b/labgraph/examples/labgraph_monitor_example.py @@ -1,5 +1,5 @@ import labgraph as lg -from typing import Tuple +from typing import Dict, Tuple # Make the imports work when running from LabGraph root directory import sys @@ -53,8 +53,9 @@ class Demo(lg.Graph): ATTENUATOR: Attenuator # Provide graph topology with `generate_labgraph_monitor()` - def set_topology(self, topology: SerializedGraph) -> None: + def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict) -> None: self._topology = topology + self._sub_pub_match = sub_pub_map def setup(self) -> None: self.WS_SERVER_NODE.configure( @@ -71,6 +72,7 @@ def setup(self) -> None: self.SERIALIZER.configure( SerializerConfig( data=self._topology, + sub_pub_match=self._sub_pub_match, sample_rate=SAMPLE_RATE, stream_name=STREAM.LABGRAPH_MONITOR, stream_id=STREAM.LABGRAPH_MONITOR_ID, @@ -100,14 +102,14 @@ def setup(self) -> None: def 
connections(self) -> lg.Connections: return ( - (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.SERIALIZER.INPUT_1), - (self.ROLLING_AVERAGER.OUTPUT, self.SERIALIZER.INPUT_2), - (self.AMPLIFIER.OUTPUT, self.SERIALIZER.INPUT_3), - (self.ATTENUATOR.OUTPUT, self.SERIALIZER.INPUT_4), - (self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ROLLING_AVERAGER.ROLLING_AVERAGER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.AMPLIFIER.AMPLIFIER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ATTENUATOR.ATTENUATOR_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_1), + (self.ROLLING_AVERAGER.ROLLING_AVERAGER_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_2), + (self.AMPLIFIER.AMPLIFIER_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_3), + (self.ATTENUATOR.ATTENUATOR_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_4), + (self.SERIALIZER.SERIALIZER_OUTPUT, self.WS_SERVER_NODE.topic), ) def process_modules(self) -> Tuple[lg.Module, ...]: @@ -122,8 +124,8 @@ def process_modules(self) -> Tuple[lg.Module, ...]: if __name__ == "__main__": graph = Demo() - topology = generate_graph_topology(graph=graph) - graph.set_topology(topology) + topology, sub_pub_map = generate_graph_topology(graph=graph) + graph.set_topology(topology, sub_pub_map) runner = lg.ParallelRunner(graph=graph) runner.run() \ No newline at end of file From 5dd7a2d4121705c2f475994421bc634661d1b69b Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Sat, 9 Apr 2022 18:05:03 -0500 Subject: [PATCH 08/12] added code review suggestions moved the labgraph monitor example to extensions instead of core library added abstraction to setting up graph topology updated serializer note to only work with right dictionaries fixed a mistake in lg_monitor_server.py --- 
.../examples/labgraph_monitor_example.py | 5 ++-- .../generate_lg_monitor.py | 23 +++++++++++++++---- .../server/lg_monitor_server.py | 2 +- .../server/serializer_node.py | 21 ++++++++--------- 4 files changed, 32 insertions(+), 19 deletions(-) rename {labgraph => extensions/yaml_support/labgraph_monitor}/examples/labgraph_monitor_example.py (96%) diff --git a/labgraph/examples/labgraph_monitor_example.py b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py similarity index 96% rename from labgraph/examples/labgraph_monitor_example.py rename to extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py index ccd57a27..8c04504c 100644 --- a/labgraph/examples/labgraph_monitor_example.py +++ b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py @@ -52,7 +52,7 @@ class Demo(lg.Graph): AMPLIFIER: Amplifier ATTENUATOR: Attenuator - # Provide graph topology with `generate_labgraph_monitor()` + # Used when running `generate_labgraph_monitor(graph)` def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict) -> None: self._topology = topology self._sub_pub_match = sub_pub_map @@ -124,8 +124,7 @@ def process_modules(self) -> Tuple[lg.Module, ...]: if __name__ == "__main__": graph = Demo() - topology, sub_pub_map = generate_graph_topology(graph=graph) - graph.set_topology(topology, sub_pub_map) + generate_graph_topology(graph=graph) runner = lg.ParallelRunner(graph=graph) runner.run() \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 471bff1a..1f07c0f8 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -3,7 +3,7 @@ import labgraph as lg from labgraph.graphs.stream import Stream -from typing 
import List, Dict
+from typing import List, Dict, Tuple
 from ..lg_monitor_node.lg_monitor_node import LabgraphMonitorNode
 from ..lg_monitor_node.lg_monitor_message import LabgraphMonitorMessage
 from ..aliases.aliases import SerializedGraph
@@ -222,7 +222,7 @@ def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]:
     return sub_pub_grouping_map
-def generate_graph_topology(graph: lg.Graph) -> SerializedGraph:
+def generate_graph_topology(graph: lg.Graph) -> None:
     """
     A function that serialize the graph topology
     and send it using to LabGraphMonitor Front-End
@@ -249,5 +249,20 @@ def generate_graph_topology(graph: lg.Graph) -> None:
     # Match subscribers with their publishers
     sub_pub_map = sub_pub_grouping_map(graph)
-    # Return the serialized topology of the graph
-    return serialized_graph, sub_pub_map
\ No newline at end of file
+    # Set graph's topology and real-time messages matching
+    if hasattr(graph, "set_topology"):
+        graph.set_topology(serialized_graph, sub_pub_map)
+    else:
+        raise AttributeError(
+            """
+            Provided graph is missing `set_topology` method to establish
+            its topology and possible real-time messaging
+
+            Please add the following method to your graph
+            ```
+            def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict) -> None:
+                self._topology = topology
+                self._sub_pub_match = sub_pub_map
+            ```
+            """
+        )
\ No newline at end of file
diff --git a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py
index 800ef1c5..b83ed9dd 100644
--- a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py
+++ b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py
@@ -50,7 +50,7 @@ def setup(self) -> None:
         self.WS_SERVER_NODE.configure(wsapi_server_config)
     def connections(self) -> lg.Connections:
-        return ((self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic),)
+        return ((self.SERIALIZER.SERIALIZER_OUTPUT, self.WS_SERVER_NODE.topic),)
def process_modules(self) -> Tuple[lg.Module, ...]: return (self.SERIALIZER, self.WS_SERVER_NODE, ) diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 24772481..0dc787dc 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2004-present Facebook. All Rights Reserve +from dataclasses import field from typing import Dict, Optional import labgraph as lg import asyncio @@ -11,11 +12,10 @@ # Make it work with RandomMessage from ....graphviz_support.graphviz_support.tests.demo_graph.random_message import RandomMessage -import numpy as np class SerializerConfig(lg.Config): data: SerializedGraph - sub_pub_match: Dict + sub_pub_match: Optional[Dict] = field(default_factory=dict) sample_rate: int stream_name: str stream_id: str @@ -92,14 +92,12 @@ def output(self, _in: Dict) -> Dict: @params: value of a dictionary that represents individual nodes """ - try: - for node, value in _in.items(): - for state in self.state.__dict__.values(): - if state["grouping"] in value["upstreams"].keys(): - value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"] - value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"] - except: - pass + for node, value in _in.items(): + for state in self.state.__dict__.values(): + if state["grouping"] in value["upstreams"].keys(): + value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"] + value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"] + return _in @lg.publisher(SERIALIZER_OUTPUT) @@ -108,8 +106,9 @@ async def source(self) -> lg.AsyncPublisher: while True: output_data = dict() if hasattr(self.config, "data"): + # Populate Serialized Graph with real-time data 
output_data = { - key: self.output(value) for key, value in self.config.data.items() + key: self.output(value) for key, value in self.config.data.items() if key == "nodes" } yield self.SERIALIZER_OUTPUT, WSStreamMessage( samples=output_data, From 05a3937a69ca0e6248c7d5c0f8f2c585f6d8a5dd Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Sat, 9 Apr 2022 19:29:53 -0500 Subject: [PATCH 09/12] updated labgraph monitor readme and fixed test cases --- extensions/yaml_support/README.md | 1062 ++++++++++++++++- .../examples/labgraph_monitor_example.py | 4 +- .../generate_lg_monitor.py | 53 +- .../tests/test_lg_monitor_api.py | 38 +- 4 files changed, 1088 insertions(+), 69 deletions(-) diff --git a/extensions/yaml_support/README.md b/extensions/yaml_support/README.md index b9d4a40b..0ef179f5 100644 --- a/extensions/yaml_support/README.md +++ b/extensions/yaml_support/README.md @@ -1,20 +1,21 @@ -# YAML Support for LabGraph +# YAML Support | LabGraph Monitor -This extension provides an API to generate a serialized version of the labgraph topology. The serialized graph topology can be used in different applications E.g: server-client communication or to get a simplified overview of the topology in case of complicated graphs. +**This extension provides an API to generate a serialized version of the graph topology.** + +The serialized graph topology can be used in different applications. But it was specifically designed to work with server-client communications, such as working in sync with [LabGraph Monitor Front-End](https://github.com/facebookresearch/labgraph/tree/main/extensions/prototypes/labgraph_monitor) to get a simplified overview of the topology, along with real-time messaging, in case of complicated graphs. 
-## Quick Start -### Method 1 - building from source code +## Quick Start -**Prerequisites**: +### Prerequisites: - Python3\ Supported python version(s) - _ [Python3.6](https://www.python.org/downloads/) - _ [Python3.8](https://www.python.org/downloads/) (**RECOMMENDED**) -- Make sure to install [labgraph](https://github.com/facebookresearch/labgraph) before proceeding + - [Python3.6](https://www.python.org/downloads/) + - [Python3.8](https://www.python.org/downloads/) (**RECOMMENDED**) +- [LabGraph](https://github.com/facebookresearch/labgraph) -``` +```bash cd labgraph/extensions/yaml_support python setup.py install ``` @@ -32,26 +33,24 @@ labgraph> 2- Run the following tests -``` +```bash python -m extensions.yaml_support.labgraph_monitor.tests.test_lg_monitor_api -``` -``` python -m extensions.yaml_support.labgraph_yaml_parser.tests.test_lg_yaml_api ``` -### API +### LabGraph Monitor API: -#### Labgraph Monitor: +#### Stream Graph Topology Only: -**generate_graph_topology(graph: lg.Graph) -> SerializedGraph** : This function can be used to generate a serialized version of the passed graph instance. -**run_topology(data: SerializedGraph) -> None**: This function can be used to send the generated serialized version of the graph instance via LabGraph Websockets API. + - **`generate_graph_topology(graph: lg.Graph) -> SerializedGraph`** : This function can be used to generate a serialized version of the graph + - **`run_topology(data: SerializedGraph) -> None`**: This function can be used to send the generated serialized version of the graph instance via LabGraph WebSockets API -The serialized version of the graph will be streamed to the clients using LabGraph Websockets API. +The serialized version of the graph will be streamed to the clients using LabGraph's [WebSockets API](https://github.com/facebookresearch/labgraph/blob/main/docs/websockets-api.md). -1. 
Call **generate_graph_topology(graph: lg.Graph) -> SerializedGraph** and pass an instance of the graph as a parameter +1. Call **`generate_graph_topology(graph: lg.Graph) -> None`** to serialize your graph and stream via WebSockets API using **`run_topology(data: SerializedGraph) -> None`** -``` +```python from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( @@ -76,7 +75,7 @@ This will start a websocket server on localhost port 9000 (127.0.0.1:9000) 2. To start receiving data from the server, send the following `StartStreamRequest` to **ws://127.0.0.1:9000** -``` +```bash { "api_version": "0.1", "api_request": { @@ -94,7 +93,7 @@ A serialized representation of the graph should be received each 200ms The graph representation has the following schema: -``` +```python { name: "graph_name", nodes: { @@ -103,7 +102,14 @@ The graph representation has the following schema: "upstream_name":[ { name: "message_name", - type: "message_type", + fields: { + "timestamp": { + "type": "float", + }, + "data": { + "type": "ndarray" + } + } } ] } @@ -112,9 +118,9 @@ The graph representation has the following schema: } ``` -E.g: +An example of a single WebSockets API message -``` +```python { "stream_batch": { "stream_id": "LABGRAPH.MONITOR", @@ -234,11 +240,1015 @@ E.g: ``` The above-serialized representation was generated for the following graph: -https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +[`extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py`](https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +) 3. 
To stop receiving data from the server, send the following `EndStreamRequest` to **ws://127.0.0.1:9000** +```python +{ + "api_version": "0.1", + "api_request": { + "request_id": 1, + "end_stream_request": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + } + } + } +} +``` + +#### Stream Graph Topology AND Real-Time Messages: + +This is used to stream both the graph topology and real-time messages received by nodes. However, it requires certain modifications to your graph outlined below: + +1. Make sure that your graph is compatible with real-time messaging + + - It requires a `set_topology()` method to add attributes to your graph. This function is used by `generate_graph_topology(graph: lg.Graph) -> None` internally + + ```python + def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict[str, str]) -> None: + self._topology = topology + self._sub_pub_match = sub_pub_map + ``` + +- It also requires adding `WS_SERVER_NODE` and `SERIALIZER` nodes to your graph + + ```python + self.WS_SERVER_NODE.configure( + WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, + ) + ) + self.SERIALIZER.configure( + SerializerConfig( + data=self._topology, + sub_pub_match=self._sub_pub_match, + sample_rate=SAMPLE_RATE, + stream_name=STREAM.LABGRAPH_MONITOR, + stream_id=STREAM.LABGRAPH_MONITOR_ID, + ) + ) + ``` + + - As well as establishing connections between those nodes. Learn more in the example at [`yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py`](https://github.com/facebookresearch/labgraph/blob/main/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py) + + 2. 
You can then run your graph + + ```python + from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( + set_graph_topology, + ) + + graph = Demo() + set_graph_topology(graph=graph) + + runner = lg.ParallelRunner(graph=graph) + runner.run() + ``` + +3. For now, you can simply run the example graph LabGraph's root directory to try it put + + ```bash + labgraph> python extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py + ``` + +4. To start receiving data from the server, send the following `StartStreamRequest` to **ws://127.0.0.1:9000** + + ```bash + { + "api_version": "0.1", + "api_request": { + "request_id": 1, + "start_stream_request": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + } + } + } + } + ``` + +The graph representation has the following schema: + +```python +{ + name: "graph_name", + nodes: { + "node_name":{ + upstreams:{ + "upstream_name":[ + { + name: "message_name", + fields: { + "timestamp": { + "type": "float", + "content": "real-time value" + }, + "data": { + "type": "ndarray", + "content" "real-time value" + } + } + } + ] + } + } + } +} +``` + + +
+ +Click to see a full example of a serialized graph + +```python + +{ + "stream_batch": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + "samples": [ + { + "data": { + "nodes": { + "WSAPIServerNode": { + "upstreams": { + "Serializer": [ + { + "name": "WSStreamMessage", + "fields": { + "samples": { + "type": "float64" + }, + "stream_name": { + "type": "str" + }, + "stream_id": { + "type": "str" + } + } + } + ] + } + }, + "Serializer": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 
0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ], + "RollingAverager": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.778323 + }, + "data": { + "type": "ndarray", + "content": [ + 0.4808310849498126, + 0.45765738986698584, + 0.5179472617869983, + 0.4545298437611365, + 0.511475834694006, + 0.6370143504941551, + 0.5240116417975883, + 0.6621674003277452, + 0.39416145544465786, + 0.626802455660307, + 0.4379461617134113, + 0.626555296204266, + 0.5212543973073265, + 0.5634172170782334, + 0.6610150120658533, + 0.5648671072511644, + 0.36228861954728664, + 0.40611552677263923, + 0.528085032194146, + 0.3795449315341961, + 0.5564513831254658, + 0.6020673565819801, + 0.3920276747808733, + 0.5398939864307285, + 0.44071762995863917, + 0.4685674882997472, + 0.523320469875816, + 0.4682660890981684, + 0.4880738071953582, + 0.6247477549472256, + 0.37262642294916287, + 0.5292429080330315, + 0.3560642133686165, + 0.42654180215294774, + 0.48254525230183826, + 0.474625466796003, + 0.566613001586366, + 0.6124545612880383, + 0.521925508913315, + 0.3644889004429638, 
+ 0.5516832937307365, + 0.5487269330238846, + 0.3776278084794831, + 0.45893729288107465, + 0.4358817754996432, + 0.5017929922223582, + 0.6077143546380854, + 0.43706695205815843, + 0.586934260076837, + 0.5603784617106875, + 0.5966846437692761, + 0.38536876311879753, + 0.5872066280396688, + 0.2818328200671419, + 0.5298252461922792, + 0.47645896345478744, + 0.6548108936817794, + 0.4247103938231261, + 0.5238473325787164, + 0.42887801140492804, + 0.5385221653828477, + 0.4554488020724207, + 0.5162366459311485, + 0.47389905275366334, + 0.4999364257892879, + 0.5837869623887693, + 0.5223744126545582, + 0.4912632015305095, + 0.45473824666589113, + 0.49088925869540045, + 0.5301869530654425, + 0.6173096417477979, + 0.4567178020980392, + 0.47735235791449043, + 0.4157221801958209, + 0.4549033636138871, + 0.46876329693604396, + 0.4322555231630121, + 0.5574453194471654, + 0.5911688810594924, + 0.5747328520895914, + 0.5189652304462694, + 0.5323873893686856, + 0.40735471004222557, + 0.5515387981684451, + 0.38342621200700266, + 0.438531088819206, + 0.515587963010385, + 0.32695685320343026, + 0.5585212227871539, + 0.6052757151735452, + 0.5496641665985119, + 0.5069727632398621, + 0.5296129345037791, + 0.5996831098341185, + 0.4823400590757597, + 0.44537321262358665, + 0.46411654667438745, + 0.5707572616456332, + 0.5388152812937068 + ] + } + } + } + ], + "Amplifier": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7791843 + }, + "data": { + "type": "ndarray", + "content": [ + 0.7867965622316418, + 0.2944690951701668, + 0.6217885613132417, + 0.19181257588713677, + 0.4441880131751828, + 0.9236195626153395, + 0.6591513461545676, + 0.2610645648553857, + 0.21090667013004238, + 0.4206178226898203, + 0.5220292131703821, + 1.0039528919248435, + 0.03962971011029399, + 1.0445686230135514, + 0.03243597174126238, + 0.2450564136315156, + 0.3314336010974114, + 0.00311911015655606, + 1.022226095903913, + 1.1637648170957282, + 
0.06966836668004629, + 0.8038283748433855, + 0.48806314091822833, + 0.6010566776291577, + 0.7812766059684811, + 0.631212804888037, + 0.06984619047279468, + 0.3012805643412633, + 0.6171560192041464, + 0.8183869230556443, + 0.017048250187390444, + 0.30420115289707306, + 0.051410444589873185, + 0.5354734525827248, + 0.31806289102436297, + 0.8067989283695666, + 0.47926640984070656, + 1.034355842125948, + 0.681003153480963, + 0.5294071873487055, + 1.076523494403658, + 0.16157285233471733, + 0.39094627902452467, + 0.1114738093779294, + 0.9703331433752653, + 0.2865777116883934, + 0.8619156584835853, + 0.7766182324794452, + 0.14709613450973325, + 0.22326836458290716, + 0.8807218415126187, + 0.40358535306715654, + 1.0236033123509827, + 0.48281698714380783, + 0.0004707446766783363, + 0.8297359029422232, + 1.036071566742531, + 0.5414258328775033, + 0.5787425389745308, + 1.1693301430674283, + 0.21785791101069032, + 1.170371603076593, + 0.3261364482703189, + 0.12638813964541176, + 0.9393226217997521, + 0.8825211612333059, + 0.9517122958633015, + 1.1120154189762954, + 0.8447883833839713, + 0.13914489686398096, + 0.9360657643374276, + 0.7077148903765025, + 0.32143001012764044, + 0.37154919620231974, + 0.26105391286924323, + 0.4491997238330288, + 0.010728834830787237, + 0.7121670006816283, + 0.029591622280117445, + 0.007499650861221285, + 0.7127230023261887, + 1.0839258553090234, + 0.6993921179698958, + 0.49761646258628134, + 0.984102352729517, + 0.8430476067533387, + 0.4275628979722584, + 0.7159762100268545, + 0.33342969923370464, + 0.6223438546697133, + 1.058541845610797, + 0.030123377969919794, + 1.0332960131749906, + 1.041793343992565, + 0.5797628511379451, + 0.06191910327971288, + 0.7649924119373708, + 0.3087152127442952, + 0.49805541889845684, + 0.48466801099347423 + ] + } + } + } + ], + "Attenuator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7761025 + }, + "data": { + "type": "ndarray", + "content": [ + 
1.1659534387549473, + 0.43637360736158304, + 0.9214281633175516, + 0.28424696190551973, + 0.6582419983462712, + 1.3687113757566782, + 0.9767960558050885, + 0.3868714503109195, + 0.3125424907767712, + 0.6233133446539249, + 0.7735948343497585, + 1.4877572969658177, + 0.05872724792912261, + 1.54794572889809, + 0.048066850576742697, + 0.3631489788824215, + 0.49115129052215034, + 0.004622207807539101, + 1.5148363489586678, + 1.7245825103075385, + 0.10324151833180674, + 1.191192873490868, + 0.7232605285781757, + 0.8907055950786112, + 1.1577734182823687, + 0.9353939452377601, + 0.10350503532285629, + 0.44646752017747954, + 0.9145632014435997, + 1.2127671789291337, + 0.02526379357119053, + 0.4507955389224727, + 0.07618511256259868, + 0.7935178461252609, + 0.47133724183839865, + 1.19559493530089, + 0.7102246571191703, + 1.5328114139217033, + 1.0091782383389774, + 0.7845282506573513, + 1.5952996371009802, + 0.23943473044007263, + 0.5793430986838699, + 0.16519298331281304, + 1.437936208118817, + 0.42467937005961726, + 1.277272390559583, + 1.1508701768991823, + 0.21798168941247872, + 0.3308612797090394, + 1.3051412639445443, + 0.5980729362938445, + 1.5168772453344939, + 0.7154862558790507, + 0.0006975963049354294, + 1.2295852266435081, + 1.5353539453875087, + 0.8023386755576991, + 0.8576382839767142, + 1.7328297641288963, + 0.32284353122033543, + 1.73437310320446, + 0.4833014423519436, + 0.18729452200379984, + 1.3919817314418979, + 1.3078076749540237, + 1.4103419833454838, + 1.6478951026761268, + 1.25189149000982, + 0.20619875425433903, + 1.3871553959696619, + 1.0487623481120731, + 0.4763269739821545, + 0.5505985711859143, + 0.3868556651378918, + 0.6656688499062046, + 0.015899055061081146, + 1.0553599281844293, + 0.04385181050865158, + 0.011113728924158739, + 1.0561838667481538, + 1.6062691920873873, + 1.0364288357744824, + 0.7374175912613233, + 1.4583407926914678, + 1.2493118339767106, + 0.6336052483005331, + 1.0610047936403824, + 0.4941093073689731, + 0.9222510522695058, + 
1.5686526405952337, + 0.04463981900394038, + 1.531240853920328, + 1.5438330442813126, + 0.8591502840700574, + 0.0917578887086558, + 1.1336418791535336, + 0.4574849219908048, + 0.7380680804045303, + 0.7182292872118445 + ] + } + } + } + ] + } + }, + "NoiseGenerator": { + "upstreams": {} + }, + "RollingAverager": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 
0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + }, + "Amplifier": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 
0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + }, + "Attenuator": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 
0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + } + } + }, + "produced_timestamp_s": 1649542864.9706066, + "timestamp_s": 1649542864.9706097 + } + ], + "batch_num": 53 + } + } +} + ``` + +
+ +3. To stop receiving data from the server, send the following `EndStreamRequest` to **ws://127.0.0.1:9000** + +```python { "api_version": "0.1", "api_request": { @@ -254,7 +1264,7 @@ https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_suppo #### Labgraph YAML Parser: -**yamlify(python_file: str, yaml_file: str) -> str** : This function can be used to generate a YAMLIFIED version of the passed LabGraph source code(.py). The serialized version will be saved as a YAML file at the specified folder(yml_file) +**`yamlify(python_file: str, yaml_file: str) -> str`** : This function can be used to generate a YAMLIFIED version of the passed LabGraph source code (.py). The serialized version will be saved as a YAML file at the specified folder (yml_file) 1. Call **yamlify(python_file: str, yaml_file: str) -> str** and pass the appropriate parameters diff --git a/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py index 8c04504c..45d6b5ff 100644 --- a/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py +++ b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py @@ -15,7 +15,7 @@ from extensions.yaml_support.labgraph_monitor.server.enums.enums import ENUMS from extensions.yaml_support.labgraph_monitor.aliases.aliases import SerializedGraph from extensions.yaml_support.labgraph_monitor.server.serializer_node import SerializerConfig, Serializer -from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import generate_graph_topology +from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import set_graph_topology # Graph Components from extensions.graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGeneratorConfig, NoiseGenerator @@ -124,7 +124,7 @@ def process_modules(self) -> Tuple[lg.Module, ...]: if __name__ == 
"__main__": graph = Demo() - generate_graph_topology(graph=graph) + set_graph_topology(graph=graph) runner = lg.ParallelRunner(graph=graph) runner.run() \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 1f07c0f8..9c03738d 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -139,10 +139,7 @@ def connect_to_upstream( return nodes -def serialize_graph( - name: str, - nodes: List[LabgraphMonitorNode] -) -> SerializedGraph: +def serialize_graph(graph: lg.Graph) -> SerializedGraph: """ A function that returns a serialized version of the graph topology. @@ -152,8 +149,17 @@ def serialize_graph( @return: A serialized version of the graph topology """ + # List of graph nodes + nodes: List[LabgraphMonitorNode] = [] + + # Identify graph nodes + nodes = identify_graph_nodes(graph) + + # Connect graph edges + nodes = connect_to_upstream(nodes, graph.__streams__.values()) + serialized_graph: SerializedGraph = { - "name": name, + "name": type(graph).__name__, "nodes": {} } @@ -186,7 +192,8 @@ def serialize_graph( def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: """ A function that matches subscribers with their publishers - + to automate assigning real-time messages to serialized graph + @params: graph: An instance of the computational graph @@ -222,29 +229,33 @@ def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: return sub_pub_grouping_map -def generate_graph_topology(graph: lg.Graph) -> None: +def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: """ - A function that serialize the graph topology - and send it using to LabGraphMonitor Front-End - using Labgraph Websocket API + A function that serializes the graph topology + and sends it to 
LabGraph Monitor Front-End + using WebSockets API @params: graph: An instance of the computational graph + + @return: Serialized topology of the graph """ - # Local variables - nodes: List[LabgraphMonitorNode] = [] + serialized_graph = serialize_graph(graph) - # Identify graph nodes - nodes = identify_graph_nodes(graph) - - # Connect graph edges - nodes = connect_to_upstream(nodes, graph.__streams__.values()) + return serialized_graph +def set_graph_topology(graph: lg.Graph) -> None: + """ + A function that serializes the graph topology + and applies the information to serve as graph + attribute for LabGraph Monitor Front-End + real-time messaging using WebSockets API + + @params: + graph: An instance of the computational graph + """ # Serialize the graph topology - serialized_graph = serialize_graph( - type(graph).__name__, - nodes - ) + serialized_graph = serialize_graph(graph) # Match subscribers with their publishers sub_pub_map = sub_pub_grouping_map(graph) diff --git a/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py b/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py index 94870d6f..30407800 100644 --- a/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py +++ b/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py @@ -21,7 +21,7 @@ def setUp(self) -> None: def test_identify_upstream_message(self) -> None: upstream_message = identify_upstream_message( - 'ROLLING_AVERAGER/INPUT', + 'ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT', self.graph.__topics__ ) @@ -38,47 +38,47 @@ def test_out_edge_node_mapper(self) -> None: self.assertEqual(4, len(out_edge_node_map)) self.assertEqual( 'generate_noise', - out_edge_node_map['NOISE_GENERATOR/OUTPUT'].name + out_edge_node_map['NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT'].name ) self.assertEqual( 'average', - out_edge_node_map['ROLLING_AVERAGER/OUTPUT'].name + out_edge_node_map['ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT'].name ) self.assertEqual( 'amplify', - 
out_edge_node_map['AMPLIFIER/OUTPUT'].name + out_edge_node_map['AMPLIFIER/AMPLIFIER_OUTPUT'].name ) self.assertEqual( 'attenuate', - out_edge_node_map['ATTENUATOR/OUTPUT'].name + out_edge_node_map['ATTENUATOR/ATTENUATOR_OUTPUT'].name ) def test_in_out_edge_mapper(self) -> None: in_out_edge_map = in_out_edge_mapper(self.graph.__streams__.values()) self.assertEqual(6, len(in_out_edge_map)) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ROLLING_AVERAGER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['AMPLIFIER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['AMPLIFIER/AMPLIFIER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ATTENUATOR/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ATTENUATOR/ATTENUATOR_INPUT'] ) self.assertEqual( - 'ROLLING_AVERAGER/OUTPUT', - in_out_edge_map['SINK/INPUT_1'] + 'ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_1'] ) self.assertEqual( - 'AMPLIFIER/OUTPUT', - in_out_edge_map['SINK/INPUT_2'] + 'AMPLIFIER/AMPLIFIER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_2'] ) self.assertEqual( - 'ATTENUATOR/OUTPUT', - in_out_edge_map['SINK/INPUT_3'] + 'ATTENUATOR/ATTENUATOR_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_3'] ) def test_connect_to_upstream(self) -> None: @@ -89,9 +89,7 @@ def test_connect_to_upstream(self) -> None: self.assertEqual(expected_node_count, len(nodes)) def test_serialize_graph(self) -> None: - nodes = identify_graph_nodes(self.graph) - nodes = connect_to_upstream(nodes, self.graph.__streams__.values()) - serialized_graph = serialize_graph("Demo", nodes) + serialized_graph = serialize_graph(self.graph) self.assertEqual('Demo', serialized_graph["name"]) self.assertEqual(5, len(serialized_graph["nodes"])) From c33de90f3618ae9e3c915fb1f0d2db4c259df961 Mon Sep 17 00:00:00 2001 From: 
Damir Temir Date: Thu, 14 Apr 2022 10:33:45 -0500 Subject: [PATCH 10/12] built a LabgraphMonitor facade for entry point fixed a mistake in the serializer node, checking whether the graph is in real-time messaging mode or not --- .../labgraph_monitor/labgraph_monitor.py | 29 +++++++++++++++++++ .../server/serializer_node.py | 8 +++-- 2 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 extensions/yaml_support/labgraph_monitor/labgraph_monitor.py diff --git a/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py new file mode 100644 index 00000000..6bc43cd8 --- /dev/null +++ b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py @@ -0,0 +1,29 @@ +import labgraph as lg +from .generate_lg_monitor.generate_lg_monitor import generate_graph_topology, set_graph_topology +from .server.lg_monitor_server import run_topology + +class LabgraphMonitor: + """ + A class that serves as a facade for + LabGraph Monitor's Back-End functions + """ + + def __init__(self, graph: lg.Graph) -> None: + self.graph = graph + + def stream_graph_topology(self): + """ + Stream graph topology via WebSockets + """ + topology = generate_graph_topology(self.graph) + run_topology(topology) + + def stream_real_time_graph(self): + """ + Stream graph topology and real-time + messages via WebSockets + """ + set_graph_topology(self.graph) + + runner = lg.ParallelRunner(self.graph) + runner.run() \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 0dc787dc..235ae604 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -104,12 +104,14 @@ def output(self, _in: Dict) -> Dict: async def source(self) -> lg.AsyncPublisher: await asyncio.sleep(.1) while True: - output_data = dict() - if 
hasattr(self.config, "data"): - # Populate Serialized Graph with real-time data + output_data = self.config.data + # check if the monitor is in real-time mode + if self.config.sub_pub_match: + # populate Serialized Graph with real-time data output_data = { key: self.output(value) for key, value in self.config.data.items() if key == "nodes" } + # otherwise, only send the topology yield self.SERIALIZER_OUTPUT, WSStreamMessage( samples=output_data, stream_name=self.config.stream_name, From 6151edf6952c0daa56165ad35cab16aa7a5454fb Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Thu, 14 Apr 2022 12:44:02 -0500 Subject: [PATCH 11/12] added Facebook Rights Reserved fixed WebSockets "not in the supported list" warning with @bennaaym added return types for LabgraphMonitor facade methods --- .../labgraph_monitor/examples/labgraph_monitor_example.py | 3 +++ .../generate_lg_monitor/generate_lg_monitor.py | 2 +- .../yaml_support/labgraph_monitor/labgraph_monitor.py | 7 +++++-- .../yaml_support/labgraph_monitor/server/enums/enums.py | 6 ++++++ .../labgraph_monitor/server/lg_monitor_server.py | 2 +- .../labgraph_monitor/server/serializer_node.py | 2 +- 6 files changed, 17 insertions(+), 5 deletions(-) diff --git a/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py index 45d6b5ff..59bcc724 100644 --- a/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py +++ b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. 
+ import labgraph as lg from typing import Dict, Tuple diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 9c03738d..7f50c183 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. import labgraph as lg from labgraph.graphs.stream import Stream diff --git a/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py index 6bc43cd8..2c607892 100644 --- a/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. 
All Rights Reserved, + import labgraph as lg from .generate_lg_monitor.generate_lg_monitor import generate_graph_topology, set_graph_topology from .server.lg_monitor_server import run_topology @@ -11,14 +14,14 @@ class LabgraphMonitor: def __init__(self, graph: lg.Graph) -> None: self.graph = graph - def stream_graph_topology(self): + def stream_graph_topology(self) -> None: """ Stream graph topology via WebSockets """ topology = generate_graph_topology(self.graph) run_topology(topology) - def stream_real_time_graph(self): + def stream_real_time_graph(self) -> None: """ Stream graph topology and real-time messages via WebSockets diff --git a/extensions/yaml_support/labgraph_monitor/server/enums/enums.py b/extensions/yaml_support/labgraph_monitor/server/enums/enums.py index 2a39b096..83275601 100644 --- a/extensions/yaml_support/labgraph_monitor/server/enums/enums.py +++ b/extensions/yaml_support/labgraph_monitor/server/enums/enums.py @@ -2,9 +2,15 @@ # Copyright 2004-present Facebook. All Rights Reserved. from .enums_base import ENUMS as ENUMS_base +class STREAM_LISTS: + supported_stream_list = [ + "labgraph.monitor", + ] + sleep_pause_streams = supported_stream_list class ENUMS: API = ENUMS_base.API WS_SERVER = ENUMS_base.WS_SERVER STREAM = ENUMS_base.STREAM + STREAM_LISTS = STREAM_LISTS \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py index b83ed9dd..1f5053b6 100644 --- a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py +++ b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. 
import labgraph as lg from typing import Tuple diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 235ae604..5395bd3e 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. from dataclasses import field from typing import Dict, Optional From 4751118039e1bea0ea0468cc27ef44755d2e224f Mon Sep 17 00:00:00 2001 From: Damir Temir Date: Mon, 18 Apr 2022 17:44:35 -0500 Subject: [PATCH 12/12] updated labgraph monitor readme --- extensions/yaml_support/README.md | 132 ++++++++++++++++++++++++------ 1 file changed, 108 insertions(+), 24 deletions(-) diff --git a/extensions/yaml_support/README.md b/extensions/yaml_support/README.md index 0ef179f5..1aae31ba 100644 --- a/extensions/yaml_support/README.md +++ b/extensions/yaml_support/README.md @@ -26,7 +26,7 @@ To make sure things are working: 1- Move to the root of the LabGraph directory: -``` +```bash labgraph\extensions\yaml_support> cd ../.. 
labgraph> ``` @@ -43,32 +43,28 @@ python -m extensions.yaml_support.labgraph_yaml_parser.tests.test_lg_yaml_api #### Stream Graph Topology Only: - - **`generate_graph_topology(graph: lg.Graph) -> SerializedGraph`** : This function can be used to generate a serialized version of the graph - - **`run_topology(data: SerializedGraph) -> None`**: This function can be used to send the generated serialized version of the graph instance via LabGraph WebSockets API + - **`LabgraphMonitor(graph: lg.Graph)`** : This class serves as a facade for monitor's functions, + such as sending either topology or topology + real-time messages + - **`stream_graph_topology() -> None`**: This function is used to send the generated serialized version of the graph instance via LabGraph WebSockets API The serialized version of the graph will be streamed to the clients using LabGraph's [WebSockets API](https://github.com/facebookresearch/labgraph/blob/main/docs/websockets-api.md). -1. Call **`generate_graph_topology(graph: lg.Graph) -> None`** to serialize your graph and stream via WebSockets API using **`run_topology(data: SerializedGraph) -> None`** +1. 
Instantiate a monitor object with **`LabgraphMonitor(graph: lg.Graph)`** to serialize your graph and stream via WebSockets API using its method **`stream_graph_topology() -> None`** ```python from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo -from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( - generate_graph_topology, -) +from extensions.yaml_support.labgraph_monitor.labgraph_monitor import LabgraphMonitor -from extensions.yaml_support.labgraph_monitor.server.lg_monitor_server import ( - run_topology, -) # Initialize a Demo graph graph = Demo() -# Serialize its topology -topology = generate_graph_topology(graph) +# Initialize a monitor object +monitor = LabgraphMonitor(graph) # Run the WebSockets API to send the topology to Front-End -run_topology(topology) +monitor.stream_graph_topology() ``` This will start a websocket server on localhost port 9000 (127.0.0.1:9000) @@ -303,18 +299,21 @@ This is used to stream both the graph topology and real-time messages received b 2. You can then run your graph ```python - from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( - set_graph_topology, - ) + from extensions.yaml_support.labgraph_monitor.examples.labgraph_monitor_example import Demo + + from extensions.yaml_support.labgraph_monitor.labgraph_monitor import LabgraphMonitor + # Initialize a Demo graph graph = Demo() - set_graph_topology(graph=graph) - runner = lg.ParallelRunner(graph=graph) - runner.run() + # Initialize a monitor object + monitor = LabgraphMonitor(graph) + + # Run the WebSockets API to send the real-time topology to Front-End + monitor.stream_real_time_graph() ``` -3. For now, you can simply run the example graph LabGraph's root directory to try it put +3. 
Alternatively, you can simply run the example graph to try it out ```bash labgraph> python extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py @@ -350,11 +349,11 @@ The graph representation has the following schema: fields: { "timestamp": { "type": "float", - "content": "real-time value" + "content": real-time value }, "data": { "type": "ndarray", - "content" "real-time value" + "content" real-time value } } } @@ -1262,13 +1261,13 @@ The graph representation has the following schema: } ``` -#### Labgraph YAML Parser: +# LabGraph YAML Parser: **`yamlify(python_file: str, yaml_file: str) -> str`** : This function can be used to generate a YAMLIFIED version of the passed LabGraph source code (.py). The serialized version will be saved as a YAML file at the specified folder (yml_file) 1. Call **yamlify(python_file: str, yaml_file: str) -> str** and pass the appropriate parameters -``` +```python from extensions.yaml_support.labgraph_yaml_parser.yamlify import ( yamlify ) @@ -1277,3 +1276,88 @@ yamlify(python_file, output_yaml_file) ``` This will generate a YAML file in the specified location + +Example of a YAML file produced for [`simple_viz.py`](https://github.com/facebookresearch/labgraph/blob/main/labgraph/examples/simple_viz.py) example: + +```python +RandomMessage: + type: Message + fields: + timestamp: float + data: np.ndarray + +NoiseGeneratorConfig: + type: Config + fields: + sample_rate: float + num_features: int + +NoiseGenerator: + type: Node + config: NoiseGeneratorConfig + inputs: [] + outputs: + - RandomMessage + +RollingState: + type: State + fields: + messages: List.RandomMessage + +RollingConfig: + type: Config + fields: + window: float + +RollingAverager: + type: Node + state: RollingState + config: RollingConfig + inputs: + - RandomMessage + outputs: + - RandomMessage + +AveragedNoiseConfig: + type: Config + fields: + sample_rate: float + num_features: int + window: float + +AveragedNoise: + type: Group + config: 
AveragedNoiseConfig + inputs: [] + outputs: + - RandomMessage + connections: + NoiseGenerator: RollingAverager + RollingAverager: AveragedNoise + +PlotState: + type: State + fields: + data: Optional.np.ndarray + +PlotConfig: + type: Config + fields: + refresh_rate: float + num_bars: int + +Plot: + type: Node + state: PlotState + config: PlotConfig + inputs: + - RandomMessage + outputs: [] + +Demo: + type: Graph + inputs: [] + outputs: [] + connections: + AveragedNoise: Plot +```