diff --git a/extensions/graphviz_support/README.md b/extensions/graphviz_support/README.md index 9bacfeae..c79d81dd 100644 --- a/extensions/graphviz_support/README.md +++ b/extensions/graphviz_support/README.md @@ -1,6 +1,6 @@ # Graphviz for LabGraph graphs -This extension provides an API to generate a graphviz visualization of the LabGraph topology. +This extension provides an API to generate a [graphviz](https://graphviz.org/) visualization of the LabGraph topology. ## Quick Start @@ -23,22 +23,33 @@ python setup.py install To make sure things are working: -1- Move to the root of the LabGraph directory: -``` -labgraph\extensions\graphviz_support> cd ../.. +1. Move to the root of the LabGraph directory: + +```bash +labgraph/extensions/graphviz_support> cd ../.. labgraph> ``` -2- Run the following test -``` +2. Run the following test + +```bash python -m extensions.graphviz_support.graphviz_support.tests.test_lg_graphviz_api ``` -**The output of the file for this test can be found at:**\ -extensions\graphviz_support\graphviz_support\tests\output + +**The output of the file for this test can be found at:** + +`extensions/graphviz_support/graphviz_support/tests/output/test.svg` + ### Generating a graphviz file To generate a graph visualization just call 'generate_graphviz' function and pass the appropriate parameters -``` -from extensions/graphviz_support/graphviz_support/generate_graphviz/generate_graphviz.py import generate_graphviz.py -generate_graphviz(graph, output_file) +```python +from extensions.graphviz_support.graphviz_support.generate_graphviz.generate_graphviz import generate_graphviz +from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo + +generate_graphviz(Demo(), "output.png") # you can also use "output.svg" ``` + +**It will produce the following diagram named `output.png` in your current directory:** + +![Graphviz diagram for nodes: RollingAverager, NoiseGenerator, Amplifier, Sink, and Attenuator](https://i.imgur.com/M4yL39x.png) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py index 1ac52cd6..9d5a1067 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/amplifier.py @@ -12,20 +12,20 @@ class AmplifierConfig(lg.Config): class Amplifier(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + AMPLIFIER_INPUT = lg.Topic(RandomMessage) + AMPLIFIER_OUTPUT = lg.Topic(RandomMessage) config: AmplifierConfig def output(self, _in: float) -> float: return self.config.out_in_ratio * _in - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(AMPLIFIER_INPUT) + @lg.publisher(AMPLIFIER_OUTPUT) async def amplify(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() output_data = np.array( [self.output(_in) for _in in message.data] ) - yield self.OUTPUT, RandomMessage( + yield self.AMPLIFIER_OUTPUT, RandomMessage( timestamp=current_time, data=output_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py index 55ecdb02..d90a5c6d 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/attenuator.py @@ -12,20 +12,20 @@ class AttenuatorConfig(lg.Config): class Attenuator(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + ATTENUATOR_INPUT = lg.Topic(RandomMessage) + ATTENUATOR_OUTPUT = lg.Topic(RandomMessage) config: AttenuatorConfig def output(self, _in: float) -> float: return pow(10, (self.config.attenuation / 20)) * _in - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(ATTENUATOR_INPUT) + @lg.publisher(ATTENUATOR_OUTPUT) async def attenuate(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() output_data = np.array( [self.output(_in) for _in in message.data] ) - yield self.OUTPUT, RandomMessage( + yield self.ATTENUATOR_OUTPUT, RandomMessage( timestamp=current_time, data=output_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py index 27237c94..3dadf96c 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py @@ -47,12 +47,12 @@ def setup(self) -> None: def connections(self) -> lg.Connections: return ( - (self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT), - (self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT), - (self.ROLLING_AVERAGER.OUTPUT, self.SINK.INPUT_1), - (self.AMPLIFIER.OUTPUT, self.SINK.INPUT_2), - (self.ATTENUATOR.OUTPUT, self.SINK.INPUT_3), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ROLLING_AVERAGER.ROLLING_AVERAGER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.AMPLIFIER.AMPLIFIER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ATTENUATOR.ATTENUATOR_INPUT), + (self.ROLLING_AVERAGER.ROLLING_AVERAGER_OUTPUT, self.SINK.SINK_INPUT_1), + (self.AMPLIFIER.AMPLIFIER_OUTPUT, self.SINK.SINK_INPUT_2), + (self.ATTENUATOR.ATTENUATOR_OUTPUT, self.SINK.SINK_INPUT_3), ) def process_modules(self) -> Tuple[lg.Module, ...]: diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py index f42aef9f..31f04e05 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/noise_generator.py @@ -14,13 +14,13 @@ class NoiseGeneratorConfig(lg.Config): class NoiseGenerator(lg.Node): - OUTPUT = lg.Topic(RandomMessage) + NOISE_GENERATOR_OUTPUT = lg.Topic(RandomMessage) config: NoiseGeneratorConfig - @lg.publisher(OUTPUT) + @lg.publisher(NOISE_GENERATOR_OUTPUT) async def generate_noise(self) -> lg.AsyncPublisher: while True: - yield self.OUTPUT, RandomMessage( + yield self.NOISE_GENERATOR_OUTPUT, RandomMessage( timestamp=time.time(), data=np.random.rand(self.config.num_features) ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py index 90bc7857..c4d51a46 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/rolling_averager.py @@ -18,14 +18,14 @@ class RollingConfig(lg.Config): class RollingAverager(lg.Node): - INPUT = lg.Topic(RandomMessage) - OUTPUT = lg.Topic(RandomMessage) + ROLLING_AVERAGER_INPUT = lg.Topic(RandomMessage) + ROLLING_AVERAGER_OUTPUT = lg.Topic(RandomMessage) state: RollingState config: RollingConfig - @lg.subscriber(INPUT) - @lg.publisher(OUTPUT) + @lg.subscriber(ROLLING_AVERAGER_INPUT) + @lg.publisher(ROLLING_AVERAGER_OUTPUT) async def average(self, message: RandomMessage) -> lg.AsyncPublisher: current_time = time.time() self.state.messages.append(message) @@ -40,7 +40,7 @@ async def average(self, message: RandomMessage) -> lg.AsyncPublisher: [message.data for message in self.state.messages] ) mean_data = np.mean(all_data, axis=0) - yield self.OUTPUT, RandomMessage( + yield self.ROLLING_AVERAGER_OUTPUT, RandomMessage( timestamp=current_time, data=mean_data ) diff --git a/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py b/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py index 59c340b6..3736482e 100644 --- a/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py +++ b/extensions/graphviz_support/graphviz_support/tests/demo_graph/sink.py @@ -14,20 +14,20 @@ class SinkState(lg.State): class Sink(lg.Node): - INPUT_1 = lg.Topic(RandomMessage) - INPUT_2 = lg.Topic(RandomMessage) - INPUT_3 = lg.Topic(RandomMessage) + SINK_INPUT_1 = lg.Topic(RandomMessage) + SINK_INPUT_2 = lg.Topic(RandomMessage) + SINK_INPUT_3 = lg.Topic(RandomMessage) state: SinkState - @lg.subscriber(INPUT_1) + @lg.subscriber(SINK_INPUT_1) def got_message(self, message: RandomMessage) -> None: self.state.data_1 = message.data - @lg.subscriber(INPUT_2) + @lg.subscriber(SINK_INPUT_2) def got_message_2(self, message: RandomMessage) -> None: self.state.data_2 = message.data - @lg.subscriber(INPUT_3) + @lg.subscriber(SINK_INPUT_3) def got_message_3(self, message: RandomMessage) -> None: self.state.data_3 = message.data diff --git a/extensions/graphviz_support/graphviz_support/tests/output/test b/extensions/graphviz_support/graphviz_support/tests/output/test new file mode 100644 index 00000000..b7eb1c3b --- /dev/null +++ b/extensions/graphviz_support/graphviz_support/tests/output/test @@ -0,0 +1,17 @@ +digraph Demo { + center=true rankdir=LR + node [fontsize=12 height=1.5 shape=circle width=1.5] + NoiseGenerator + RollingAverager + Amplifier + Attenuator + Sink + Sink + Sink + NoiseGenerator -> RollingAverager + NoiseGenerator -> Amplifier + NoiseGenerator -> Attenuator + RollingAverager -> Sink + Amplifier -> Sink + Attenuator -> Sink +} diff --git a/extensions/graphviz_support/graphviz_support/tests/output/test.svg b/extensions/graphviz_support/graphviz_support/tests/output/test.svg new file mode 100644 index 00000000..daf81fcc --- /dev/null +++ b/extensions/graphviz_support/graphviz_support/tests/output/test.svg @@ -0,0 +1,79 @@ + + + + + + +Demo + + + +NoiseGenerator + +NoiseGenerator + + + +RollingAverager + +RollingAverager + + + +NoiseGenerator->RollingAverager + + + + + +Amplifier + +Amplifier + + + +NoiseGenerator->Amplifier + + + + + +Attenuator + +Attenuator + + + +NoiseGenerator->Attenuator + + + + + +Sink + +Sink + + + +RollingAverager->Sink + + + + + +Amplifier->Sink + + + + + +Attenuator->Sink + + + + + diff --git a/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py b/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py index ea63d439..2711b564 100644 --- a/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py +++ b/extensions/graphviz_support/graphviz_support/tests/test_lg_graphviz_api.py @@ -33,47 +33,47 @@ def test_out_edge_node_mapper(self) -> None: self.assertEqual(4, len(out_edge_node_map)) self.assertEqual( 'generate_noise', - out_edge_node_map['NOISE_GENERATOR/OUTPUT'].name + out_edge_node_map['NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT'].name ) self.assertEqual( 'average', - out_edge_node_map['ROLLING_AVERAGER/OUTPUT'].name + out_edge_node_map['ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT'].name ) self.assertEqual( 'amplify', - out_edge_node_map['AMPLIFIER/OUTPUT'].name + out_edge_node_map['AMPLIFIER/AMPLIFIER_OUTPUT'].name ) self.assertEqual( 'attenuate', - out_edge_node_map['ATTENUATOR/OUTPUT'].name + out_edge_node_map['ATTENUATOR/ATTENUATOR_OUTPUT'].name ) def test_in_out_edge_mapper(self) -> None: in_out_edge_map = in_out_edge_mapper(self.graph.__streams__.values()) self.assertEqual(6, len(in_out_edge_map)) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ROLLING_AVERAGER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['AMPLIFIER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['AMPLIFIER/AMPLIFIER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ATTENUATOR/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ATTENUATOR/ATTENUATOR_INPUT'] ) self.assertEqual( - 'ROLLING_AVERAGER/OUTPUT', - in_out_edge_map['SINK/INPUT_1'] + 'ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_1'] ) self.assertEqual( - 'AMPLIFIER/OUTPUT', - in_out_edge_map['SINK/INPUT_2'] + 'AMPLIFIER/AMPLIFIER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_2'] ) self.assertEqual( - 'ATTENUATOR/OUTPUT', - in_out_edge_map['SINK/INPUT_3'] + 'ATTENUATOR/ATTENUATOR_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_3'] ) def test_connect_to_upstream(self) -> None: diff --git a/extensions/yaml_support/README.md b/extensions/yaml_support/README.md index 0eeb1b5d..1aae31ba 100644 --- a/extensions/yaml_support/README.md +++ b/extensions/yaml_support/README.md @@ -1,20 +1,21 @@ -# YAML Support for LabGraph +# YAML Support | LabGraph Monitor -This extension provides an API to generate a serialized version of the labgraph topology. The serialized graph topology can be used in different applications E.g: server-client communication or to get a simplified overview of the topology in case of complicated graphs. +**This extension provides an API to generate a serialized version of the graph topology.** + +The serialized graph topology can be used in different applications. But it was specifically designed to work with server-client communications, such as working in sync with [LabGraph Monitor Front-End](https://github.com/facebookresearch/labgraph/tree/main/extensions/prototypes/labgraph_monitor) to get a simplified overview of the topology, along with real-time messaging, in case of complicated graphs. -## Quick Start -### Method 1 - building from source code +## Quick Start -**Prerequisites**: +### Prerequisites: - Python3\ Supported python version(s) - _ [Python3.6](https://www.python.org/downloads/) - _ [Python3.8](https://www.python.org/downloads/) (**RECOMMENDED**) -- Make sure to install [labgraph](https://github.com/facebookresearch/labgraph) before proceeding + - [Python3.6](https://www.python.org/downloads/) + - [Python3.8](https://www.python.org/downloads/) (**RECOMMENDED**) +- [LabGraph](https://github.com/facebookresearch/labgraph) -``` +```bash cd labgraph/extensions/yaml_support python setup.py install ``` @@ -25,43 +26,52 @@ To make sure things are working: 1- Move to the root of the LabGraph directory: -``` +```bash labgraph\extensions\yaml_support> cd ../.. labgraph> ``` 2- Run the following tests -``` +```bash python -m extensions.yaml_support.labgraph_monitor.tests.test_lg_monitor_api -``` -``` python -m extensions.yaml_support.labgraph_yaml_parser.tests.test_lg_yaml_api ``` -### API +### LabGraph Monitor API: -#### Labgraph Monitor: +#### Stream Graph Topology Only: -**generate_labgraph_monitor(graph: lg.Graph) -> None** : This function can be used to generate a serialized version of the passed graph instance. The serialized version of the graph will be streamed -to the clients using LabGraph Websockets API. + - **`LabgraphMonitor(graph: lg.Graph)`** : This class serves as a facade for monitor's functions, + such as sending either topology or topology + real-time messages + - **`stream_graph_topology() -> None`**: This function is used to send the generated serialized version of the graph instance via LabGraph WebSockets API -1. Call **generate_labgraph_monitor(graph: lg.Graph) -> None** and pass an instance of the graph as a parameter +The serialized version of the graph will be streamed to the clients using LabGraph's [WebSockets API](https://github.com/facebookresearch/labgraph/blob/main/docs/websockets-api.md). -``` -from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import ( - generate_labgraph_monitor -) +1. Instantiate a monitor object with **`LabgraphMonitor(graph: lg.Graph)`** to serialize your graph and stream via WebSockets API using its method **`stream_graph_topology() -> None`** + +```python +from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo + +from extensions.yaml_support.labgraph_monitor.labgraph_monitor import LabgraphMonitor -generate_labgraph_monitor(graph) + +# Initialize a Demo graph +graph = Demo() + +# Initialize a monitor object +monitor = LabgraphMonitor(graph) + +# Run the WebSockets API to send the topology to Front-End +monitor.stream_graph_topology() ``` This will start a websocket server on localhost port 9000 (127.0.0.1:9000) 2. To start receiving data from the server, send the following `StartStreamRequest` to **ws://127.0.0.1:9000** -``` +```bash { "api_version": "0.1", "api_request": { @@ -79,7 +89,7 @@ A serialized representation of the graph should be received each 200ms The graph representation has the following schema: -``` +```python { name: "graph_name", nodes: { @@ -88,7 +98,14 @@ The graph representation has the following schema: "upstream_name":[ { name: "message_name", - type: "message_type", + fields: { + "timestamp": { + "type": "float", + }, + "data": { + "type": "ndarray" + } + } } ] } @@ -97,9 +114,9 @@ The graph representation has the following schema: } ``` -E.g: +An example of a single WebSockets API message -``` +```python { "stream_batch": { "stream_id": "LABGRAPH.MONITOR", @@ -118,8 +135,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -131,8 +152,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -144,8 +169,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -157,8 +186,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ], @@ -166,8 +199,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ], @@ -175,8 +212,12 @@ E.g: { "name": "RandomMessage", "fields": { - "timestamp": "float", - "data": "ndarray" + "timestamp": { + "type": "float" + }, + "data": { + "type": "ndarray" + } } } ] @@ -184,22 +225,1029 @@ E.g: } } }, - "produced_timestamp_s": 1644931422.141309, - "timestamp_s": 1644931422.141309 + "produced_timestamp_s": 1648581339.9652574, + "timestamp_s": 1648581339.965258 } ], - "batch_num": 54 + "batch_num": 31 } } } ``` The above-serialized representation was generated for the following graph: -https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +[`extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py`](https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_support/graphviz_support/tests/demo_graph/demo.py +) 3. To stop receiving data from the server, send the following `EndStreamRequest` to **ws://127.0.0.1:9000** +```python +{ + "api_version": "0.1", + "api_request": { + "request_id": 1, + "end_stream_request": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + } + } + } +} +``` + +#### Stream Graph Topology AND Real-Time Messages: + +This is used to stream both the graph topology and real-time messages received by nodes. However, it requires certain modifications to your graph outlined below: + +1. Make sure that your graph is compatible with real-time messaging + + - It requires a `set_topology()` method to add attributes to your graph. This function is used by `generate_graph_topology(graph: lg.Graph) -> None` internally + + ```python + def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict[str, str]) -> None: + self._topology = topology + self._sub_pub_match = sub_pub_map + ``` + +- It also requires adding `WS_SERVER_NODE` and `SERIALIZER` nodes to your graph + + ```python + self.WS_SERVER_NODE.configure( + WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, + ) + ) + self.SERIALIZER.configure( + SerializerConfig( + data=self._topology, + sub_pub_match=self._sub_pub_match, + sample_rate=SAMPLE_RATE, + stream_name=STREAM.LABGRAPH_MONITOR, + stream_id=STREAM.LABGRAPH_MONITOR_ID, + ) + ) + ``` + + - As well as establishing connections between those nodes. Learn more in the example at [`yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py`](https://github.com/facebookresearch/labgraph/blob/main/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py) + + 2. You can then run your graph + + ```python + from extensions.yaml_support.labgraph_monitor.examples.labgraph_monitor_example import Demo + + from extensions.yaml_support.labgraph_monitor.labgraph_monitor import LabgraphMonitor + + # Initialize a Demo graph + graph = Demo() + + # Initialize a monitor object + monitor = LabgraphMonitor(graph) + + # Run the WebSockets API to send the real-time topology to Front-End + monitor.stream_real_time_graph() + ``` + +3. Alternatively, you can simply run the example graph to try it out + + ```bash + labgraph> python extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py + ``` + +4. To start receiving data from the server, send the following `StartStreamRequest` to **ws://127.0.0.1:9000** + + ```bash + { + "api_version": "0.1", + "api_request": { + "request_id": 1, + "start_stream_request": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + } + } + } + } + ``` + +The graph representation has the following schema: + +```python +{ + name: "graph_name", + nodes: { + "node_name":{ + upstreams:{ + "upstream_name":[ + { + name: "message_name", + fields: { + "timestamp": { + "type": "float", + "content": real-time value + }, + "data": { + "type": "ndarray", + "content" real-time value + } + } + } + ] + } + } + } +} +``` + + +
+ +Click to see a full example of a serialized graph + +```python + +{ + "stream_batch": { + "stream_id": "LABGRAPH.MONITOR", + "labgraph.monitor": { + "samples": [ + { + "data": { + "nodes": { + "WSAPIServerNode": { + "upstreams": { + "Serializer": [ + { + "name": "WSStreamMessage", + "fields": { + "samples": { + "type": "float64" + }, + "stream_name": { + "type": "str" + }, + "stream_id": { + "type": "str" + } + } + } + ] + } + }, + "Serializer": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ], + "RollingAverager": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.778323 + }, + "data": { + "type": "ndarray", + "content": [ + 0.4808310849498126, + 0.45765738986698584, + 0.5179472617869983, + 0.4545298437611365, + 0.511475834694006, + 0.6370143504941551, + 0.5240116417975883, + 0.6621674003277452, + 0.39416145544465786, + 0.626802455660307, + 0.4379461617134113, + 0.626555296204266, + 0.5212543973073265, + 0.5634172170782334, + 0.6610150120658533, + 0.5648671072511644, + 0.36228861954728664, + 0.40611552677263923, + 0.528085032194146, + 0.3795449315341961, + 0.5564513831254658, + 0.6020673565819801, + 0.3920276747808733, + 0.5398939864307285, + 0.44071762995863917, + 0.4685674882997472, + 0.523320469875816, + 0.4682660890981684, + 0.4880738071953582, + 0.6247477549472256, + 0.37262642294916287, + 0.5292429080330315, + 0.3560642133686165, + 0.42654180215294774, + 0.48254525230183826, + 0.474625466796003, + 0.566613001586366, + 0.6124545612880383, + 0.521925508913315, + 0.3644889004429638, + 0.5516832937307365, + 0.5487269330238846, + 0.3776278084794831, + 0.45893729288107465, + 0.4358817754996432, + 0.5017929922223582, + 0.6077143546380854, + 0.43706695205815843, + 0.586934260076837, + 0.5603784617106875, + 0.5966846437692761, + 0.38536876311879753, + 0.5872066280396688, + 0.2818328200671419, + 0.5298252461922792, + 0.47645896345478744, + 0.6548108936817794, + 0.4247103938231261, + 0.5238473325787164, + 0.42887801140492804, + 0.5385221653828477, + 0.4554488020724207, + 0.5162366459311485, + 0.47389905275366334, + 0.4999364257892879, + 0.5837869623887693, + 0.5223744126545582, + 0.4912632015305095, + 0.45473824666589113, + 0.49088925869540045, + 0.5301869530654425, + 0.6173096417477979, + 0.4567178020980392, + 0.47735235791449043, + 0.4157221801958209, + 0.4549033636138871, + 0.46876329693604396, + 0.4322555231630121, + 0.5574453194471654, + 0.5911688810594924, + 0.5747328520895914, + 0.5189652304462694, + 0.5323873893686856, + 0.40735471004222557, + 0.5515387981684451, + 0.38342621200700266, + 0.438531088819206, + 0.515587963010385, + 0.32695685320343026, + 0.5585212227871539, + 0.6052757151735452, + 0.5496641665985119, + 0.5069727632398621, + 0.5296129345037791, + 0.5996831098341185, + 0.4823400590757597, + 0.44537321262358665, + 0.46411654667438745, + 0.5707572616456332, + 0.5388152812937068 + ] + } + } + } + ], + "Amplifier": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7791843 + }, + "data": { + "type": "ndarray", + "content": [ + 0.7867965622316418, + 0.2944690951701668, + 0.6217885613132417, + 0.19181257588713677, + 0.4441880131751828, + 0.9236195626153395, + 0.6591513461545676, + 0.2610645648553857, + 0.21090667013004238, + 0.4206178226898203, + 0.5220292131703821, + 1.0039528919248435, + 0.03962971011029399, + 1.0445686230135514, + 0.03243597174126238, + 0.2450564136315156, + 0.3314336010974114, + 0.00311911015655606, + 1.022226095903913, + 1.1637648170957282, + 0.06966836668004629, + 0.8038283748433855, + 0.48806314091822833, + 0.6010566776291577, + 0.7812766059684811, + 0.631212804888037, + 0.06984619047279468, + 0.3012805643412633, + 0.6171560192041464, + 0.8183869230556443, + 0.017048250187390444, + 0.30420115289707306, + 0.051410444589873185, + 0.5354734525827248, + 0.31806289102436297, + 0.8067989283695666, + 0.47926640984070656, + 1.034355842125948, + 0.681003153480963, + 0.5294071873487055, + 1.076523494403658, + 0.16157285233471733, + 0.39094627902452467, + 0.1114738093779294, + 0.9703331433752653, + 0.2865777116883934, + 0.8619156584835853, + 0.7766182324794452, + 0.14709613450973325, + 0.22326836458290716, + 0.8807218415126187, + 0.40358535306715654, + 1.0236033123509827, + 0.48281698714380783, + 0.0004707446766783363, + 0.8297359029422232, + 1.036071566742531, + 0.5414258328775033, + 0.5787425389745308, + 1.1693301430674283, + 0.21785791101069032, + 1.170371603076593, + 0.3261364482703189, + 0.12638813964541176, + 0.9393226217997521, + 0.8825211612333059, + 0.9517122958633015, + 1.1120154189762954, + 0.8447883833839713, + 0.13914489686398096, + 0.9360657643374276, + 0.7077148903765025, + 0.32143001012764044, + 0.37154919620231974, + 0.26105391286924323, + 0.4491997238330288, + 0.010728834830787237, + 0.7121670006816283, + 0.029591622280117445, + 0.007499650861221285, + 0.7127230023261887, + 1.0839258553090234, + 0.6993921179698958, + 0.49761646258628134, + 0.984102352729517, + 0.8430476067533387, + 0.4275628979722584, + 0.7159762100268545, + 0.33342969923370464, + 0.6223438546697133, + 1.058541845610797, + 0.030123377969919794, + 1.0332960131749906, + 1.041793343992565, + 0.5797628511379451, + 0.06191910327971288, + 0.7649924119373708, + 0.3087152127442952, + 0.49805541889845684, + 0.48466801099347423 + ] + } + } + } + ], + "Attenuator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7761025 + }, + "data": { + "type": "ndarray", + "content": [ + 1.1659534387549473, + 0.43637360736158304, + 0.9214281633175516, + 0.28424696190551973, + 0.6582419983462712, + 1.3687113757566782, + 0.9767960558050885, + 0.3868714503109195, + 0.3125424907767712, + 0.6233133446539249, + 0.7735948343497585, + 1.4877572969658177, + 0.05872724792912261, + 1.54794572889809, + 0.048066850576742697, + 0.3631489788824215, + 0.49115129052215034, + 0.004622207807539101, + 1.5148363489586678, + 1.7245825103075385, + 0.10324151833180674, + 1.191192873490868, + 0.7232605285781757, + 0.8907055950786112, + 1.1577734182823687, + 0.9353939452377601, + 0.10350503532285629, + 0.44646752017747954, + 0.9145632014435997, + 1.2127671789291337, + 0.02526379357119053, + 0.4507955389224727, + 0.07618511256259868, + 0.7935178461252609, + 0.47133724183839865, + 1.19559493530089, + 0.7102246571191703, + 1.5328114139217033, + 1.0091782383389774, + 0.7845282506573513, + 1.5952996371009802, + 0.23943473044007263, + 0.5793430986838699, + 0.16519298331281304, + 1.437936208118817, + 0.42467937005961726, + 1.277272390559583, + 1.1508701768991823, + 0.21798168941247872, + 0.3308612797090394, + 1.3051412639445443, + 0.5980729362938445, + 1.5168772453344939, + 0.7154862558790507, + 0.0006975963049354294, + 1.2295852266435081, + 1.5353539453875087, + 0.8023386755576991, + 0.8576382839767142, + 1.7328297641288963, + 0.32284353122033543, + 1.73437310320446, + 0.4833014423519436, + 0.18729452200379984, + 1.3919817314418979, + 1.3078076749540237, + 1.4103419833454838, + 1.6478951026761268, + 1.25189149000982, + 0.20619875425433903, + 1.3871553959696619, + 1.0487623481120731, + 0.4763269739821545, + 0.5505985711859143, + 0.3868556651378918, + 0.6656688499062046, + 0.015899055061081146, + 1.0553599281844293, + 0.04385181050865158, + 0.011113728924158739, + 1.0561838667481538, + 1.6062691920873873, + 1.0364288357744824, + 0.7374175912613233, + 1.4583407926914678, + 1.2493118339767106, + 0.6336052483005331, + 1.0610047936403824, + 0.4941093073689731, + 0.9222510522695058, + 1.5686526405952337, + 0.04463981900394038, + 1.531240853920328, + 1.5438330442813126, + 0.8591502840700574, + 0.0917578887086558, + 1.1336418791535336, + 0.4574849219908048, + 0.7380680804045303, + 0.7182292872118445 + ] + } + } + } + ] + } + }, + "NoiseGenerator": { + "upstreams": {} + }, + "RollingAverager": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + }, + "Amplifier": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + }, + "Attenuator": { + "upstreams": { + "NoiseGenerator": [ + { + "name": "RandomMessage", + "fields": { + "timestamp": { + "type": "float", + "content": 1649542864.7681 + }, + "data": { + "type": "ndarray", + "content": [ + 0.6556638018597015, + 0.24539091264180568, + 0.5181571344277014, + 0.15984381323928065, + 0.3701566776459857, + 0.7696829688461163, + 0.5492927884621397, + 0.21755380404615476, + 0.175755558441702, + 0.3505148522415169, + 0.4350243443086518, + 0.8366274099373696, + 0.03302475842524499, + 0.8704738525112928, + 0.027029976451051985, + 0.204213678026263, + 0.27619466758117617, + 0.0025992584637967164, + 0.8518550799199274, + 0.9698040142464401, + 0.05805697223337192, + 0.6698569790361546, + 0.40671928409852365, + 0.5008805646909648, + 0.6510638383070676, + 0.5260106707400308, + 0.0582051587273289, + 0.25106713695105276, + 0.5142966826701221, + 0.6819891025463702, + 0.014206875156158705, + 0.2535009607475609, + 0.04284203715822765, + 0.44622787715227075, + 0.26505240918696915, + 0.6723324403079721, + 0.3993886748672555, + 0.8619632017716233, + 0.5675026279008025, + 0.4411726561239213, + 0.8971029120030483, + 0.13464404361226445, + 0.3257885658537706, + 0.09289484114827451, + 0.8086109528127211, + 0.23881475974032784, + 0.7182630487363211, + 0.6471818603995376, + 0.12258011209144437, + 0.18605697048575598, + 0.7339348679271822, + 0.3363211275559638, + 0.8530027602924856, + 0.40234748928650654, + 0.00039228723056528025, + 0.691446585785186, + 0.8633929722854425, + 0.4511881940645861, + 0.48228544914544236, + 0.9744417858895236, + 0.18154825917557527, + 0.9753096692304941, + 0.2717803735585991, + 0.1053234497045098, + 0.7827688514997935, + 0.735434301027755, + 0.7930935798860846, + 0.9266795158135795, + 0.7039903194866428, + 0.11595408071998414, + 0.7800548036145231, + 0.5897624086470854, + 0.2678583417730337, + 0.3096243301685998, + 0.21754492739103604, + 0.37433310319419066, + 0.008940695692322698, + 0.5934725005680236, + 0.024659685233431206, + 0.006249709051017738, + 0.5939358352718239, + 0.9032715460908528, + 0.5828267649749131, + 0.4146803854885678, + 0.8200852939412642, + 0.7025396722944489, + 0.356302414976882, + 0.5966468416890455, + 0.27785808269475387, + 0.5186198788914278, + 0.8821182046756642, + 0.025102814974933163, + 0.861080010979159, + 0.8681611199938043, + 0.483135709281621, + 0.05159925273309407, + 0.6374936766144756, + 0.25726267728691266, + 0.4150461824153807, + 0.4038900091612285 + ] + } + } + } + ] + } + } + } + }, + "produced_timestamp_s": 1649542864.9706066, + "timestamp_s": 1649542864.9706097 + } + ], + "batch_num": 53 + } + } +} + ``` + +
+ +3. To stop receiving data from the server, send the following `EndStreamRequest` to **ws://127.0.0.1:9000** + +```python { "api_version": "0.1", "api_request": { @@ -213,13 +1261,13 @@ https://github.com/facebookresearch/labgraph/blob/main/extensions/graphviz_suppo } ``` -#### Labgraph YAML Parser: +# LabGraph YAML Parser: -**yamlify(python_file: str, yaml_file: str) -> str** : This function can be used to generate a YAMLIFIED version of the passed LabGraph source code(.py). The serialized version will be saved as a YAML file at the specified folder(yml_file) +**`yamlify(python_file: str, yaml_file: str) -> str`** : This function can be used to generate a YAMLIFIED version of the passed LabGraph source code (.py). The serialized version will be saved as a YAML file at the specified folder (yml_file) 1. Call **yamlify(python_file: str, yaml_file: str) -> str** and pass the appropriate parameters -``` +```python from extensions.yaml_support.labgraph_yaml_parser.yamlify import ( yamlify ) @@ -228,3 +1276,88 @@ yamlify(python_file, output_yaml_file) ``` This will generate a YAML file in the specified location + +Example of a YAML file produced for [`simple_viz.py`](https://github.com/facebookresearch/labgraph/blob/main/labgraph/examples/simple_viz.py) example: + +```python +RandomMessage: + type: Message + fields: + timestamp: float + data: np.ndarray + +NoiseGeneratorConfig: + type: Config + fields: + sample_rate: float + num_features: int + +NoiseGenerator: + type: Node + config: NoiseGeneratorConfig + inputs: [] + outputs: + - RandomMessage + +RollingState: + type: State + fields: + messages: List.RandomMessage + +RollingConfig: + type: Config + fields: + window: float + +RollingAverager: + type: Node + state: RollingState + config: RollingConfig + inputs: + - RandomMessage + outputs: + - RandomMessage + +AveragedNoiseConfig: + type: Config + fields: + sample_rate: float + num_features: int + window: float + +AveragedNoise: + type: Group + config: AveragedNoiseConfig + inputs: [] + outputs: + - RandomMessage + connections: + NoiseGenerator: RollingAverager + RollingAverager: AveragedNoise + +PlotState: + type: State + fields: + data: Optional.np.ndarray + +PlotConfig: + type: Config + fields: + refresh_rate: float + num_bars: int + +Plot: + type: Node + state: PlotState + config: PlotConfig + inputs: + - RandomMessage + outputs: [] + +Demo: + type: Graph + inputs: [] + outputs: [] + connections: + AveragedNoise: Plot +``` diff --git a/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py new file mode 100644 index 00000000..59bcc724 --- /dev/null +++ b/extensions/yaml_support/labgraph_monitor/examples/labgraph_monitor_example.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. + +import labgraph as lg +from typing import Dict, Tuple + +# Make the imports work when running from LabGraph root directory +import sys +sys.path.append("./") + +# LabGraph WebSockets Components +from labgraph.websockets.ws_server.ws_api_node_server import ( + WSAPIServerConfig, + WSAPIServerNode, +) + +# LabGraph Monitor Components +from extensions.yaml_support.labgraph_monitor.server.enums.enums import ENUMS +from extensions.yaml_support.labgraph_monitor.aliases.aliases import SerializedGraph +from extensions.yaml_support.labgraph_monitor.server.serializer_node import SerializerConfig, Serializer +from extensions.yaml_support.labgraph_monitor.generate_lg_monitor.generate_lg_monitor import set_graph_topology + +# Graph Components +from extensions.graphviz_support.graphviz_support.tests.demo_graph.noise_generator import NoiseGeneratorConfig, NoiseGenerator +from extensions.graphviz_support.graphviz_support.tests.demo_graph.amplifier import AmplifierConfig, Amplifier +from extensions.graphviz_support.graphviz_support.tests.demo_graph.attenuator import AttenuatorConfig, Attenuator +from extensions.graphviz_support.graphviz_support.tests.demo_graph.rolling_averager import RollingConfig, RollingAverager + +# LabGraph WebSockets Configurations +APP_ID = 'LABGRAPH.MONITOR' +WS_SERVER = ENUMS.WS_SERVER +STREAM = ENUMS.STREAM +DEFAULT_IP = WS_SERVER.DEFAULT_IP +DEFAULT_PORT = WS_SERVER.DEFAULT_PORT +DEFAULT_API_VERSION = WS_SERVER.DEFAULT_API_VERSION +SAMPLE_RATE = 5 + +# Graph Configurations +NUM_FEATURES = 100 +WINDOW = 2.0 +REFRESH_RATE = 2.0 +OUT_IN_RATIO = 1.2 +ATTENUATION = 5.0 + +class Demo(lg.Graph): + # LabGraph WebSockets Component + WS_SERVER_NODE: WSAPIServerNode + + # LabGraph Monitor Component + SERIALIZER: Serializer + + # Graph Components + NOISE_GENERATOR: NoiseGenerator + ROLLING_AVERAGER: RollingAverager + AMPLIFIER: Amplifier + ATTENUATOR: Attenuator + + # Used when running `generate_labgraph_monitor(graph)` + def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict) -> None: + self._topology = topology + self._sub_pub_match = sub_pub_map + + def setup(self) -> None: + self.WS_SERVER_NODE.configure( + WSAPIServerConfig( + app_id=APP_ID, + ip=WS_SERVER.DEFAULT_IP, + port=ENUMS.WS_SERVER.DEFAULT_PORT, + api_version=ENUMS.WS_SERVER.DEFAULT_API_VERSION, + num_messages=-1, + enums=ENUMS(), + sample_rate=SAMPLE_RATE, + ) + ) + self.SERIALIZER.configure( + SerializerConfig( + data=self._topology, + sub_pub_match=self._sub_pub_match, + sample_rate=SAMPLE_RATE, + stream_name=STREAM.LABGRAPH_MONITOR, + stream_id=STREAM.LABGRAPH_MONITOR_ID, + ) + ) + self.NOISE_GENERATOR.configure( + NoiseGeneratorConfig( + sample_rate=float(SAMPLE_RATE), + num_features=NUM_FEATURES, + ) + ) + self.ROLLING_AVERAGER.configure( + RollingConfig( + window=WINDOW, + ) + ) + self.AMPLIFIER.configure( + AmplifierConfig( + out_in_ratio=OUT_IN_RATIO, + ) + ) + self.ATTENUATOR.configure( + AttenuatorConfig( + attenuation=ATTENUATION, + ) + ) + + def connections(self) -> lg.Connections: + return ( + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ROLLING_AVERAGER.ROLLING_AVERAGER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.AMPLIFIER.AMPLIFIER_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ATTENUATOR.ATTENUATOR_INPUT), + (self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_1), + (self.ROLLING_AVERAGER.ROLLING_AVERAGER_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_2), + (self.AMPLIFIER.AMPLIFIER_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_3), + (self.ATTENUATOR.ATTENUATOR_OUTPUT, self.SERIALIZER.SERIALIZER_INPUT_4), + (self.SERIALIZER.SERIALIZER_OUTPUT, self.WS_SERVER_NODE.topic), + ) + + def process_modules(self) -> Tuple[lg.Module, ...]: + return ( + self.NOISE_GENERATOR, + self.ROLLING_AVERAGER, + self.AMPLIFIER, + self.ATTENUATOR, + self.SERIALIZER, + self.WS_SERVER_NODE, + ) + +if __name__ == "__main__": + graph = Demo() + set_graph_topology(graph=graph) + + runner = lg.ParallelRunner(graph=graph) + runner.run() \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py index 7f67f115..7f50c183 100644 --- a/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py +++ b/extensions/yaml_support/labgraph_monitor/generate_lg_monitor/generate_lg_monitor.py @@ -1,14 +1,12 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. import labgraph as lg from labgraph.graphs.stream import Stream -from typing import List, Dict +from typing import List, Dict, Tuple from ..lg_monitor_node.lg_monitor_node import LabgraphMonitorNode from ..lg_monitor_node.lg_monitor_message import LabgraphMonitorMessage from ..aliases.aliases import SerializedGraph -from ..server.lg_monitor_server import run_server - def identify_upstream_message( in_edge: str, @@ -141,10 +139,7 @@ def connect_to_upstream( return nodes -def serialize_graph( - name: str, - nodes: List[LabgraphMonitorNode] -) -> SerializedGraph: +def serialize_graph(graph: lg.Graph) -> SerializedGraph: """ A function that returns a serialized version of the graph topology. @@ -154,8 +149,17 @@ def serialize_graph( @return: A serialized version of the graph topology """ + # List of graph nodes + nodes: List[LabgraphMonitorNode] = [] + + # Identify graph nodes + nodes = identify_graph_nodes(graph) + + # Connect graph edges + nodes = connect_to_upstream(nodes, graph.__streams__.values()) + serialized_graph: SerializedGraph = { - "name": name, + "name": type(graph).__name__, "nodes": {} } @@ -185,31 +189,91 @@ def serialize_graph( return serialized_graph - -def generate_labgraph_monitor(graph: lg.Graph) -> None: +def sub_pub_grouping_map(graph: lg.Graph) -> Dict[str, str]: """ - A function that serialize the graph topology - and send it using to LabGraphMonitor Front-End - using Labgraph Websocket API + A function that matches subscribers with their publishers + to automate assigning real-time messages to serialized graph @params: graph: An instance of the computational graph + + @return: A dictionary where the key is a publisher grouping + and the value is a dictionary of sets of topic paths subscribed and their groupings """ - # Local variables - nodes: List[LabgraphMonitorNode] = [] + sub_pub_grouping_map: Dict[str, str] = {} + for stream in graph.__streams__.values(): + difference = set(stream.topic_paths).difference(LabgraphMonitorNode.in_edges) + if difference: + upstream_edge = max(difference, key=len) + for edge in stream.topic_paths: + if edge != upstream_edge: + # convert SERIALIZER/SERIALIZER_INPUT_1 to its grouping Serializer + edge_path = "/".join(edge.split("/")[:-1]) + edge_grouping = type(graph.__descendants__[edge_path]).__name__ + + # convert SERIALIZER/SERIALIZER_INPUT_1 to its topic SERIALIZER_INPUT_1 + topic_path = edge.split("/")[-1] + + # convert NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT to its grouping NoiseGenerator + group_path = "/".join(upstream_edge.split("/")[:-1]) + grouping = type(graph.__descendants__[group_path]).__name__ + + if grouping in sub_pub_grouping_map: + sub_pub_grouping_map[grouping]["topics"].add(topic_path) + sub_pub_grouping_map[grouping]["subscribers"].add(edge_grouping) + else: + sub_pub_grouping_map[grouping] = { + "topics": {topic_path}, + "subscribers": {edge_grouping}, + } + + return sub_pub_grouping_map + +def generate_graph_topology(graph: lg.Graph) -> SerializedGraph: + """ + A function that serializes the graph topology + and sends it to LabGraph Monitor Front-End + using WebSockets API - # Identify graph nodes - nodes = identify_graph_nodes(graph) + @params: + graph: An instance of the computational graph + + @return: Serialized topology of the graph + """ + serialized_graph = serialize_graph(graph) - # Connect graph edges - nodes = connect_to_upstream(nodes, graph.__streams__.values()) + return serialized_graph +def set_graph_topology(graph: lg.Graph) -> None: + """ + A function that serializes the graph topology + and applies the information to serve as graph + attribute for LabGraph Monitor Front-End + real-time messaging using WebSockets API + + @params: + graph: An instance of the computational graph + """ # Serialize the graph topology - serialized_graph = serialize_graph( - type(graph).__name__, - nodes - ) - - # Send the serialized graph to Front-End - # using LabGraph Websockets API - run_server(serialized_graph) + serialized_graph = serialize_graph(graph) + + # Match subscribers with their publishers + sub_pub_map = sub_pub_grouping_map(graph) + + # Set graph's topology and real-time messages matching + if hasattr(graph, "set_topology"): + graph.set_topology(serialized_graph, sub_pub_map) + else: + raise AttributeError( + """ + Provided graph is missing `set_topology` method to establish + its topology and possible real-time messsaging + + Please add the following method to your graph + ``` + def set_topology(self, topology: SerializedGraph, sub_pub_map: Dict) -> None: + self._topology = topology + self._sub_pub_match = sub_pub_map + ``` + """ + ) \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py new file mode 100644 index 00000000..2c607892 --- /dev/null +++ b/extensions/yaml_support/labgraph_monitor/labgraph_monitor.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved, + +import labgraph as lg +from .generate_lg_monitor.generate_lg_monitor import generate_graph_topology, set_graph_topology +from .server.lg_monitor_server import run_topology + +class LabgraphMonitor: + """ + A class that serves as a facade for + LabGraph Monitor's Back-End functions + """ + + def __init__(self, graph: lg.Graph) -> None: + self.graph = graph + + def stream_graph_topology(self) -> None: + """ + Stream graph topology via WebSockets + """ + topology = generate_graph_topology(self.graph) + run_topology(topology) + + def stream_real_time_graph(self) -> None: + """ + Stream graph topology and real-time + messages via WebSockets + """ + set_graph_topology(self.graph) + + runner = lg.ParallelRunner(self.graph) + runner.run() \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py b/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py index 1e491676..c4024647 100644 --- a/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py +++ b/extensions/yaml_support/labgraph_monitor/lg_monitor_node/lg_monitor_message.py @@ -34,6 +34,6 @@ def serialize(self) -> SerializedMessage: name = annotation[0] type = (annotation[1]).__name__ - serialized_message['fields'][name] = type + serialized_message['fields'][name] = {"type": type} return serialized_message diff --git a/extensions/yaml_support/labgraph_monitor/server/enums/enums.py b/extensions/yaml_support/labgraph_monitor/server/enums/enums.py index 2a39b096..83275601 100644 --- a/extensions/yaml_support/labgraph_monitor/server/enums/enums.py +++ b/extensions/yaml_support/labgraph_monitor/server/enums/enums.py @@ -2,9 +2,15 @@ # Copyright 2004-present Facebook. All Rights Reserved. from .enums_base import ENUMS as ENUMS_base +class STREAM_LISTS: + supported_stream_list = [ + "labgraph.monitor", + ] + sleep_pause_streams = supported_stream_list class ENUMS: API = ENUMS_base.API WS_SERVER = ENUMS_base.WS_SERVER STREAM = ENUMS_base.STREAM + STREAM_LISTS = STREAM_LISTS \ No newline at end of file diff --git a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py index 552226f1..1f5053b6 100644 --- a/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py +++ b/extensions/yaml_support/labgraph_monitor/server/lg_monitor_server.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. import labgraph as lg from typing import Tuple @@ -20,7 +20,7 @@ SAMPLE_RATE = 5 -def run_server(data: SerializedGraph) -> None: +def run_topology(data: SerializedGraph) -> None: """ A function that creates a Websocket server graph. The server graph streams the lagraph topology to the clients @@ -50,7 +50,7 @@ def setup(self) -> None: self.WS_SERVER_NODE.configure(wsapi_server_config) def connections(self) -> lg.Connections: - return ((self.SERIALIZER.TOPIC, self.WS_SERVER_NODE.topic),) + return ((self.SERIALIZER.SERIALIZER_OUTPUT, self.WS_SERVER_NODE.topic),) def process_modules(self) -> Tuple[lg.Module, ...]: return (self.SERIALIZER, self.WS_SERVER_NODE, ) diff --git a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py index 064294ab..5395bd3e 100644 --- a/extensions/yaml_support/labgraph_monitor/server/serializer_node.py +++ b/extensions/yaml_support/labgraph_monitor/server/serializer_node.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 -# Copyright 2004-present Facebook. All Rights Reserve +# Copyright 2004-present Facebook. All Rights Reserved. +from dataclasses import field +from typing import Dict, Optional import labgraph as lg import asyncio from labgraph.websockets.ws_server.ws_server_stream_message import ( @@ -8,30 +10,111 @@ ) from ..aliases.aliases import SerializedGraph +# Make it work with RandomMessage +from ....graphviz_support.graphviz_support.tests.demo_graph.random_message import RandomMessage class SerializerConfig(lg.Config): data: SerializedGraph + sub_pub_match: Optional[Dict] = field(default_factory=dict) sample_rate: int stream_name: str stream_id: str +class DataState(lg.State): + data_1: Optional[Dict] = None + data_2: Optional[Dict] = None + data_3: Optional[Dict] = None + data_4: Optional[Dict] = None class Serializer(lg.Node): """ Convenience node for sending messages to a `WSAPIServerNode`. """ - TOPIC = lg.Topic(WSStreamMessage) + SERIALIZER_OUTPUT = lg.Topic(WSStreamMessage) config: SerializerConfig + state: DataState - @lg.publisher(TOPIC) + SERIALIZER_INPUT_1 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_2 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_3 = lg.Topic(RandomMessage) + SERIALIZER_INPUT_4 = lg.Topic(RandomMessage) + + def get_grouping(self, topic: lg.Topic) -> str: + """ + Matches subscriber topics with grouping that produced information + """ + for key, value in self.config.sub_pub_match.items(): + if topic.name in value["topics"]: + return key + return "" + + @lg.subscriber(SERIALIZER_INPUT_1) + def add_message_1(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_1) + self.state.data_1 = { + "grouping": grouping, + "timestamp": message.timestamp, + "numpy": list(message.data), + } + + @lg.subscriber(SERIALIZER_INPUT_2) + def add_message_2(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_2) + self.state.data_2 = { + "grouping": grouping, + "timestamp": message.timestamp, + "numpy": list(message.data), + } + + @lg.subscriber(SERIALIZER_INPUT_3) + def add_message_3(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_3) + self.state.data_3 = { + "grouping": grouping, + "timestamp": message.timestamp, + "numpy": list(message.data), + } + + @lg.subscriber(SERIALIZER_INPUT_4) + def add_message_4(self, message: RandomMessage) -> None: + grouping = self.get_grouping(self.SERIALIZER_INPUT_4) + self.state.data_4 = { + "grouping": grouping, + "timestamp": message.timestamp, + "numpy": list(message.data), + } + + def output(self, _in: Dict) -> Dict: + """ + Updates serialized message with data according to grouping + + @params: + value of a dictionary that represents individual nodes + """ + for node, value in _in.items(): + for state in self.state.__dict__.values(): + if state["grouping"] in value["upstreams"].keys(): + value["upstreams"][state["grouping"]][0]["fields"]["timestamp"]["content"] = state["timestamp"] + value["upstreams"][state["grouping"]][0]["fields"]["data"]["content"] = state["numpy"] + + return _in + + @lg.publisher(SERIALIZER_OUTPUT) async def source(self) -> lg.AsyncPublisher: - await asyncio.sleep(.01) + await asyncio.sleep(.1) while True: - msg = WSStreamMessage( - samples=self.config.data, + output_data = self.config.data + # check if the monitor is in real-time mode + if self.config.sub_pub_match: + # populate Serialized Graph with real-time data + output_data = { + key: self.output(value) for key, value in self.config.data.items() if key == "nodes" + } + # otherwise, only send the topology + yield self.SERIALIZER_OUTPUT, WSStreamMessage( + samples=output_data, stream_name=self.config.stream_name, stream_id=self.config.stream_id, ) - yield self.TOPIC, msg - await asyncio.sleep(1 / self.config.sample_rate) + await asyncio.sleep(1 / self.config.sample_rate), diff --git a/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py b/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py index 94870d6f..30407800 100644 --- a/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py +++ b/extensions/yaml_support/labgraph_monitor/tests/test_lg_monitor_api.py @@ -21,7 +21,7 @@ def setUp(self) -> None: def test_identify_upstream_message(self) -> None: upstream_message = identify_upstream_message( - 'ROLLING_AVERAGER/INPUT', + 'ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT', self.graph.__topics__ ) @@ -38,47 +38,47 @@ def test_out_edge_node_mapper(self) -> None: self.assertEqual(4, len(out_edge_node_map)) self.assertEqual( 'generate_noise', - out_edge_node_map['NOISE_GENERATOR/OUTPUT'].name + out_edge_node_map['NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT'].name ) self.assertEqual( 'average', - out_edge_node_map['ROLLING_AVERAGER/OUTPUT'].name + out_edge_node_map['ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT'].name ) self.assertEqual( 'amplify', - out_edge_node_map['AMPLIFIER/OUTPUT'].name + out_edge_node_map['AMPLIFIER/AMPLIFIER_OUTPUT'].name ) self.assertEqual( 'attenuate', - out_edge_node_map['ATTENUATOR/OUTPUT'].name + out_edge_node_map['ATTENUATOR/ATTENUATOR_OUTPUT'].name ) def test_in_out_edge_mapper(self) -> None: in_out_edge_map = in_out_edge_mapper(self.graph.__streams__.values()) self.assertEqual(6, len(in_out_edge_map)) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ROLLING_AVERAGER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['AMPLIFIER/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['AMPLIFIER/AMPLIFIER_INPUT'] ) self.assertEqual( - 'NOISE_GENERATOR/OUTPUT', - in_out_edge_map['ATTENUATOR/INPUT'] + 'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT', + in_out_edge_map['ATTENUATOR/ATTENUATOR_INPUT'] ) self.assertEqual( - 'ROLLING_AVERAGER/OUTPUT', - in_out_edge_map['SINK/INPUT_1'] + 'ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_1'] ) self.assertEqual( - 'AMPLIFIER/OUTPUT', - in_out_edge_map['SINK/INPUT_2'] + 'AMPLIFIER/AMPLIFIER_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_2'] ) self.assertEqual( - 'ATTENUATOR/OUTPUT', - in_out_edge_map['SINK/INPUT_3'] + 'ATTENUATOR/ATTENUATOR_OUTPUT', + in_out_edge_map['SINK/SINK_INPUT_3'] ) def test_connect_to_upstream(self) -> None: @@ -89,9 +89,7 @@ def test_connect_to_upstream(self) -> None: self.assertEqual(expected_node_count, len(nodes)) def test_serialize_graph(self) -> None: - nodes = identify_graph_nodes(self.graph) - nodes = connect_to_upstream(nodes, self.graph.__streams__.values()) - serialized_graph = serialize_graph("Demo", nodes) + serialized_graph = serialize_graph(self.graph) self.assertEqual('Demo', serialized_graph["name"]) self.assertEqual(5, len(serialized_graph["nodes"]))