diff --git a/labgraph/devices/protocols/socket/.DS_Store b/labgraph/devices/protocols/socket/.DS_Store new file mode 100644 index 000000000..ac3a5bc20 Binary files /dev/null and b/labgraph/devices/protocols/socket/.DS_Store differ diff --git a/labgraph/devices/protocols/socket/README.md b/labgraph/devices/protocols/socket/README.md new file mode 100644 index 000000000..39f9c70b7 --- /dev/null +++ b/labgraph/devices/protocols/socket/README.md @@ -0,0 +1,13 @@ +# How to run the tests + +from root dir labraph run + +`python3 -m labgraph.devices.protocols.socket.tests.test_socket_sender.py` + +# Sender Node + +server file + +# Poller Node + +client file diff --git a/labgraph/devices/protocols/socket/__init__.py b/labgraph/devices/protocols/socket/__init__.py new file mode 100644 index 000000000..3a01ba008 --- /dev/null +++ b/labgraph/devices/protocols/socket/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. + + +__all__ = [ + "SOCKETMessage", +] + +from .socket_message import SOCKETMessage diff --git a/labgraph/devices/protocols/socket/client.py b/labgraph/devices/protocols/socket/client.py new file mode 100644 index 000000000..918df35d3 --- /dev/null +++ b/labgraph/devices/protocols/socket/client.py @@ -0,0 +1,14 @@ +import socket + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# connect, client connect +s.connect((socket.gethostname(), 1234)) + +# 1024 is our buffer, stream of data how big of a chunk of data we want to receive +full_msg = '' +while True: + msg = s.recv(8) + if(len(msg) <= 0): + break + full_msg += msg.decode("utf-8") +print(full_msg) diff --git a/labgraph/devices/protocols/socket/server.py b/labgraph/devices/protocols/socket/server.py new file mode 100644 index 000000000..837e71b62 --- /dev/null +++ b/labgraph/devices/protocols/socket/server.py @@ -0,0 +1,16 @@ +from http import client +import socket + +# AF: Adress family, IPv4?? +# s: socket object +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.bind((socket.gethostname(), 1234)) # Tie +s.listen(5) + +while True: + clientsocket, address = s.accept() + print(f"Connection from {address} has been established!") + # client socket is our local version of the client's socket, + # so we send information to the client + clientsocket.send(bytes("Welcome to the server!", "utf-8")) + clientsocket.close() diff --git a/labgraph/devices/protocols/socket/socket_message.py b/labgraph/devices/protocols/socket/socket_message.py new file mode 100644 index 000000000..b3c9e32d8 --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_message.py @@ -0,0 +1,9 @@ +from labgraph.messages import Message + + +class SOCKETMessage(Message): + """ + A message representing data that was/will be communicate to SOCKET + """ + + data: bytes diff --git a/labgraph/devices/protocols/socket/socket_poller_node.py b/labgraph/devices/protocols/socket/socket_poller_node.py new file mode 100644 index 000000000..2f53e6ee4 --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_poller_node.py @@ -0,0 +1,45 @@ +from email.base64mime import header_length +import socket +import pickle +from labgraph.graphs import Config, Node +from labgraph.graphs.method import background +from labgraph.util.logger import get_logger + +# client + +logger = get_logger(__name__) + + +class SOCKETPollerConfig(Config): + read_addr: str + socket_topic: str + + +class SOCKETPollerNode(Node): + """ + Represents a node in a Labgraph graph that subscribes to messages in a + Labgraph topic and forwards them by writing to a SOCKET object. + """ + config: SOCKETPollerConfig + + def setup(self) -> None: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((socket.gethostname(), self.config.read_addr)) + + self.socket_open = False + + def cleanup(self) -> None: + self.socket.close() + + @background + def socket_monitor(self) -> None: + data = '' + while True: + # What's the size of the socket + msg = self.socket.recv(header_length) + if not len(msg): + break + data += msg + data = pickle.loads(data) + event = data["event"] + logger.debug(f"{self}:{event.name}") diff --git a/labgraph/devices/protocols/socket/socket_sender_node.py b/labgraph/devices/protocols/socket/socket_sender_node.py new file mode 100644 index 000000000..246e168bb --- /dev/null +++ b/labgraph/devices/protocols/socket/socket_sender_node.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# Copyright 2004-present Facebook. All Rights Reserved. +import pickle +import asyncio +import socket +from labgraph.graphs.method import subscriber +from socket_message import SOCKETMessage +from labgraph.graphs import Config, Node, Topic, background +from labgraph.util.logger import get_logger + + +# server + +# lookup what is a logger +STARTUP_WAIT_TIME = 0.1 + +logger = get_logger(__name__) + + +class SOCKETSenderConfig(Config): + write_addr: str + # The message: in our case it was (Welcome to the Server) + socket_topic: str + + +class SOCKETSenderNode(Node): + """ + Represents a node in the graph which recieves data from SOCKET. + Data polled from SOCKET is subsequently pushed to rest of the graph + as as SOCKETMessage + + Args: + read_addr: The address from which ZMQ data should be polled. + socket_topic: The SOCKET topic being sent. + """ + + topic = Topic(SOCKETMessage) + config: SOCKETSenderConfig + + def setup(self) -> None: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + logger.debug(f"{self}:binding to {self.config.write_addr}") + self.socket.bind((socket.gethostname(), self.config.write_addr)) + self.socket.listen(STARTUP_WAIT_TIME) + self.has_subscrivers = False + + def cleanup(self, clientsocket) -> None: + clientsocket.close() + + @background + async def socket_monitor(self) -> None: + while True: + clientsocket, address = self.socket.accept() + logger.debug(f"Connection from {address} has been established!") + # client socket is our local version of the client's socket, + # so we send information to the client + clientsocket.send(bytes(self.config.socket_topic, "utf-8")) + self.cleanup(clientsocket) + + @subscriber(topic) + async def socket_subscriber(self) -> None: + while not self.has_subscrivers: + await asyncio.sleep(STARTUP_WAIT_TIME) diff --git a/labgraph/devices/protocols/socket/tests/README.md b/labgraph/devices/protocols/socket/tests/README.md new file mode 100644 index 000000000..38f0ae94c --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/README.md @@ -0,0 +1 @@ +from root dir labraph run python3 -m labgraph.devices.protocols.socket.tests.test_socket_sender.py diff --git a/labgraph/devices/protocols/socket/tests/__init__.py b/labgraph/devices/protocols/socket/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/labgraph/devices/protocols/socket/tests/test_socket_poller.py b/labgraph/devices/protocols/socket/tests/test_socket_poller.py new file mode 100644 index 000000000..af3c6aa77 --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/test_socket_poller.py @@ -0,0 +1,7 @@ +from socket_poller_node import SOCKETPollerNode + +# Intial test to verify if the poller_node works +mySocketPoller = SOCKETPollerNode() +mySocketPoller.setup() + +mySocketPoller.socket_monitor() diff --git a/labgraph/devices/protocols/socket/tests/test_socket_sender.py b/labgraph/devices/protocols/socket/tests/test_socket_sender.py new file mode 100644 index 000000000..f70bd4fe2 --- /dev/null +++ b/labgraph/devices/protocols/socket/tests/test_socket_sender.py @@ -0,0 +1,7 @@ +from socket_sender_node import SOCKETSenderNode + +# Initial test to verify if the poller_node works +mySocketSender = SOCKETSenderNode() +mySocketSender.setup() + +mySocketSender.socket_monitor()