Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.

Real Time Messages #64

Merged
merged 13 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions extensions/graphviz_support/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Graphviz for LabGraph graphs

This extension provides an API to generate a graphviz visualization of the LabGraph topology.
This extension provides an API to generate a [graphviz](https://graphviz.org/) visualization of the LabGraph topology.

## Quick Start

Expand All @@ -23,22 +23,33 @@ python setup.py install

To make sure things are working:

1- Move to the root of the LabGraph directory:
```
labgraph\extensions\graphviz_support> cd ../..
1. Move to the root of the LabGraph directory:

```bash
labgraph/extensions/graphviz_support> cd ../..
labgraph>
```
2- Run the following test
```
2. Run the following test

```bash
python -m extensions.graphviz_support.graphviz_support.tests.test_lg_graphviz_api
```
**The output of the file for this test can be found at:**\
extensions\graphviz_support\graphviz_support\tests\output

**The output of the file for this test can be found at:**

`extensions/graphviz_support/graphviz_support/tests/output/test.svg`

### Generating a graphviz file

To generate a graph visualization just call 'generate_graphviz' function and pass the appropriate parameters
```
from extensions/graphviz_support/graphviz_support/generate_graphviz/generate_graphviz.py import generate_graphviz.py

generate_graphviz(graph, output_file)
```python
from extensions.graphviz_support.graphviz_support.generate_graphviz.generate_graphviz import generate_graphviz
from extensions.graphviz_support.graphviz_support.tests.demo_graph.demo import Demo

generate_graphviz(Demo(), "output.png") # you can also use "output.svg"
```

**It will produce the following diagram named `output.png` in your current directory:**

![Graphviz diagram for nodes: RollingAverager, NoiseGenerator, Amplifier, Sink, and Attenuator](https://i.imgur.com/M4yL39x.png)
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ class AmplifierConfig(lg.Config):


class Amplifier(lg.Node):
INPUT = lg.Topic(RandomMessage)
OUTPUT = lg.Topic(RandomMessage)
AMPLIFIER_INPUT = lg.Topic(RandomMessage)
AMPLIFIER_OUTPUT = lg.Topic(RandomMessage)
config: AmplifierConfig

def output(self, _in: float) -> float:
return self.config.out_in_ratio * _in

@lg.subscriber(INPUT)
@lg.publisher(OUTPUT)
@lg.subscriber(AMPLIFIER_INPUT)
@lg.publisher(AMPLIFIER_OUTPUT)
async def amplify(self, message: RandomMessage) -> lg.AsyncPublisher:
current_time = time.time()
output_data = np.array(
[self.output(_in) for _in in message.data]
)
yield self.OUTPUT, RandomMessage(
yield self.AMPLIFIER_OUTPUT, RandomMessage(
timestamp=current_time, data=output_data
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ class AttenuatorConfig(lg.Config):


class Attenuator(lg.Node):
INPUT = lg.Topic(RandomMessage)
OUTPUT = lg.Topic(RandomMessage)
ATTENUATOR_INPUT = lg.Topic(RandomMessage)
ATTENUATOR_OUTPUT = lg.Topic(RandomMessage)
config: AttenuatorConfig

def output(self, _in: float) -> float:
return pow(10, (self.config.attenuation / 20)) * _in

@lg.subscriber(INPUT)
@lg.publisher(OUTPUT)
@lg.subscriber(ATTENUATOR_INPUT)
@lg.publisher(ATTENUATOR_OUTPUT)
async def attenuate(self, message: RandomMessage) -> lg.AsyncPublisher:
current_time = time.time()
output_data = np.array(
[self.output(_in) for _in in message.data]
)
yield self.OUTPUT, RandomMessage(
yield self.ATTENUATOR_OUTPUT, RandomMessage(
timestamp=current_time, data=output_data
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ def setup(self) -> None:

def connections(self) -> lg.Connections:
return (
(self.NOISE_GENERATOR.OUTPUT, self.ROLLING_AVERAGER.INPUT),
(self.NOISE_GENERATOR.OUTPUT, self.AMPLIFIER.INPUT),
(self.NOISE_GENERATOR.OUTPUT, self.ATTENUATOR.INPUT),
(self.ROLLING_AVERAGER.OUTPUT, self.SINK.INPUT_1),
(self.AMPLIFIER.OUTPUT, self.SINK.INPUT_2),
(self.ATTENUATOR.OUTPUT, self.SINK.INPUT_3),
(self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ROLLING_AVERAGER.ROLLING_AVERAGER_INPUT),
(self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.AMPLIFIER.AMPLIFIER_INPUT),
(self.NOISE_GENERATOR.NOISE_GENERATOR_OUTPUT, self.ATTENUATOR.ATTENUATOR_INPUT),
(self.ROLLING_AVERAGER.ROLLING_AVERAGER_OUTPUT, self.SINK.SINK_INPUT_1),
(self.AMPLIFIER.AMPLIFIER_OUTPUT, self.SINK.SINK_INPUT_2),
(self.ATTENUATOR.ATTENUATOR_OUTPUT, self.SINK.SINK_INPUT_3),
)

def process_modules(self) -> Tuple[lg.Module, ...]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ class NoiseGeneratorConfig(lg.Config):


class NoiseGenerator(lg.Node):
OUTPUT = lg.Topic(RandomMessage)
NOISE_GENERATOR_OUTPUT = lg.Topic(RandomMessage)
config: NoiseGeneratorConfig

@lg.publisher(OUTPUT)
@lg.publisher(NOISE_GENERATOR_OUTPUT)
async def generate_noise(self) -> lg.AsyncPublisher:
while True:
yield self.OUTPUT, RandomMessage(
yield self.NOISE_GENERATOR_OUTPUT, RandomMessage(
timestamp=time.time(),
data=np.random.rand(self.config.num_features)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class RollingConfig(lg.Config):


class RollingAverager(lg.Node):
INPUT = lg.Topic(RandomMessage)
OUTPUT = lg.Topic(RandomMessage)
ROLLING_AVERAGER_INPUT = lg.Topic(RandomMessage)
ROLLING_AVERAGER_OUTPUT = lg.Topic(RandomMessage)

state: RollingState
config: RollingConfig

@lg.subscriber(INPUT)
@lg.publisher(OUTPUT)
@lg.subscriber(ROLLING_AVERAGER_INPUT)
@lg.publisher(ROLLING_AVERAGER_OUTPUT)
async def average(self, message: RandomMessage) -> lg.AsyncPublisher:
current_time = time.time()
self.state.messages.append(message)
Expand All @@ -40,7 +40,7 @@ async def average(self, message: RandomMessage) -> lg.AsyncPublisher:
[message.data for message in self.state.messages]
)
mean_data = np.mean(all_data, axis=0)
yield self.OUTPUT, RandomMessage(
yield self.ROLLING_AVERAGER_OUTPUT, RandomMessage(
timestamp=current_time,
data=mean_data
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ class SinkState(lg.State):


class Sink(lg.Node):
INPUT_1 = lg.Topic(RandomMessage)
INPUT_2 = lg.Topic(RandomMessage)
INPUT_3 = lg.Topic(RandomMessage)
SINK_INPUT_1 = lg.Topic(RandomMessage)
SINK_INPUT_2 = lg.Topic(RandomMessage)
SINK_INPUT_3 = lg.Topic(RandomMessage)
state: SinkState

@lg.subscriber(INPUT_1)
@lg.subscriber(SINK_INPUT_1)
def got_message(self, message: RandomMessage) -> None:
self.state.data_1 = message.data

@lg.subscriber(INPUT_2)
@lg.subscriber(SINK_INPUT_2)
def got_message_2(self, message: RandomMessage) -> None:
self.state.data_2 = message.data

@lg.subscriber(INPUT_3)
@lg.subscriber(SINK_INPUT_3)
def got_message_3(self, message: RandomMessage) -> None:
self.state.data_3 = message.data

Expand Down
17 changes: 17 additions & 0 deletions extensions/graphviz_support/graphviz_support/tests/output/test
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
digraph Demo {
center=true rankdir=LR
node [fontsize=12 height=1.5 shape=circle width=1.5]
NoiseGenerator
RollingAverager
Amplifier
Attenuator
Sink
Sink
Sink
NoiseGenerator -> RollingAverager
NoiseGenerator -> Amplifier
NoiseGenerator -> Attenuator
RollingAverager -> Sink
Amplifier -> Sink
Attenuator -> Sink
}
79 changes: 79 additions & 0 deletions extensions/graphviz_support/graphviz_support/tests/output/test.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,47 @@ def test_out_edge_node_mapper(self) -> None:
self.assertEqual(4, len(out_edge_node_map))
self.assertEqual(
'generate_noise',
out_edge_node_map['NOISE_GENERATOR/OUTPUT'].name
out_edge_node_map['NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT'].name
)
self.assertEqual(
'average',
out_edge_node_map['ROLLING_AVERAGER/OUTPUT'].name
out_edge_node_map['ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT'].name
)
self.assertEqual(
'amplify',
out_edge_node_map['AMPLIFIER/OUTPUT'].name
out_edge_node_map['AMPLIFIER/AMPLIFIER_OUTPUT'].name
)
self.assertEqual(
'attenuate',
out_edge_node_map['ATTENUATOR/OUTPUT'].name
out_edge_node_map['ATTENUATOR/ATTENUATOR_OUTPUT'].name
)

def test_in_out_edge_mapper(self) -> None:
in_out_edge_map = in_out_edge_mapper(self.graph.__streams__.values())
self.assertEqual(6, len(in_out_edge_map))
self.assertEqual(
'NOISE_GENERATOR/OUTPUT',
in_out_edge_map['ROLLING_AVERAGER/INPUT']
'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT',
in_out_edge_map['ROLLING_AVERAGER/ROLLING_AVERAGER_INPUT']
)
self.assertEqual(
'NOISE_GENERATOR/OUTPUT',
in_out_edge_map['AMPLIFIER/INPUT']
'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT',
in_out_edge_map['AMPLIFIER/AMPLIFIER_INPUT']
)
self.assertEqual(
'NOISE_GENERATOR/OUTPUT',
in_out_edge_map['ATTENUATOR/INPUT']
'NOISE_GENERATOR/NOISE_GENERATOR_OUTPUT',
in_out_edge_map['ATTENUATOR/ATTENUATOR_INPUT']
)
self.assertEqual(
'ROLLING_AVERAGER/OUTPUT',
in_out_edge_map['SINK/INPUT_1']
'ROLLING_AVERAGER/ROLLING_AVERAGER_OUTPUT',
in_out_edge_map['SINK/SINK_INPUT_1']
)
self.assertEqual(
'AMPLIFIER/OUTPUT',
in_out_edge_map['SINK/INPUT_2']
'AMPLIFIER/AMPLIFIER_OUTPUT',
in_out_edge_map['SINK/SINK_INPUT_2']
)
self.assertEqual(
'ATTENUATOR/OUTPUT',
in_out_edge_map['SINK/INPUT_3']
'ATTENUATOR/ATTENUATOR_OUTPUT',
in_out_edge_map['SINK/SINK_INPUT_3']
)

def test_connect_to_upstream(self) -> None:
Expand Down
Loading