From 03925222d269a13fe35f2aece66cc6a98c854018 Mon Sep 17 00:00:00 2001 From: Ryan Bunney Date: Wed, 18 Sep 2024 12:09:09 +0800 Subject: [PATCH] Node Class: Successful testing with distributed local case (ArrayLoop) --- daliuge-common/dlg/clients.py | 1 + daliuge-engine/dlg/manager/composite_manager.py | 16 +++++++++------- daliuge-engine/dlg/manager/rest.py | 7 ++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index 2cff59f71..d1fd9da39 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -233,6 +233,7 @@ def __init__( super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout) def add_node_subscriptions(self, sessionId, node_subscriptions): + self._post_json( f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions ) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 0c175ab4c..b87c93f45 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -179,7 +179,7 @@ def __init__( # This list is different from the dmHosts, which are the machines that # are directly managed by this manager (which in turn could manage more # machines) - self._use_dmHosts = False + self.use_dmHosts = False self._nodes = [] self.startDMChecker() @@ -237,19 +237,19 @@ def removeDmHost(self, host_str): @property def nodes(self): - if self._use_dmHosts: + if self.use_dmHosts: return [str(n) for n in self._dmHosts[:]] else: return self._nodes def add_node(self, node): - if self._use_dmHosts: + if self.use_dmHosts: return self._dmHosts.append(Node(node)) else: self._nodes.append(node) def remove_node(self, node): - if self._use_dmHosts: + if self.use_dmHosts: self._dmHosts.remove(Node(node)) else: self._nodes.remove(node) @@ -296,6 +296,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable) # if ":" in host: # host, port = host.split(":") # port = int(port) + if isinstance(host, str): + host = Node(host) try: with self.dmAt(host) as dm: @@ -448,8 +450,8 @@ def addGraphSpec(self, sessionId, graphSpec): for rel in inter_partition_rels: # rhn = self._graph[rel.rhs]["node"].split(":")[0] # lhn = self._graph[rel.lhs]["node"].split(":")[0] - rhn = Node(self._graph[rel.rhs]["node"]) - lhn = Node(self._graph[rel.lhs]["node"]) + rhn = (self._graph[rel.rhs]["node"]) + lhn = (self._graph[rel.lhs]["node"]) drop_rels[lhn][rhn].append(rel) drop_rels[rhn][lhn].append(rel) @@ -611,7 +613,7 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10): ) # In the case of the Data Island the dmHosts are the final nodes as well - self.use_dm_hosts = True + self.use_dmHosts = True # self._nodes = dmHosts logger.info("Created DataIslandManager for hosts: %r", self._dmHosts) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 651184ad2..46e7e5446 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -444,7 +444,6 @@ def linkGraphParts(self, sessionId): @daliuge_aware def add_node_subscriptions(self, sessionId): - # TODO translate node information here logger.debug("NM REST call: add_subscriptions %s", bottle.request.json) if bottle.request.content_type != "application/json": bottle.response.status = 415 @@ -453,8 +452,10 @@ def add_node_subscriptions(self, sessionId): self.dm.add_node_subscriptions(sessionId, subscriptions) def _parse_subscriptions(self, json_request): - - return [Node(n) for n in json_request] + relationships = {} + for host, droprels in json_request.items(): + relationships[Node(host)] = droprels + return relationships @daliuge_aware def trigger_drops(self, sessionId):