Skip to content

Commit

Permalink
Node Class: Successful testing with distributed local case (ArrayLoop)
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Sep 18, 2024
1 parent 09960c1 commit 0392522
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
1 change: 1 addition & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def __init__(
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):

self._post_json(
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)
Expand Down
16 changes: 9 additions & 7 deletions daliuge-engine/dlg/manager/composite_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def __init__(
# This list is different from the dmHosts, which are the machines that
# are directly managed by this manager (which in turn could manage more
# machines)
self._use_dmHosts = False
self.use_dmHosts = False
self._nodes = []

self.startDMChecker()
Expand Down Expand Up @@ -237,19 +237,19 @@ def removeDmHost(self, host_str):

@property
def nodes(self):
if self._use_dmHosts:
if self.use_dmHosts:
return [str(n) for n in self._dmHosts[:]]
else:
return self._nodes

def add_node(self, node):
if self._use_dmHosts:
if self.use_dmHosts:
return self._dmHosts.append(Node(node))
else:
self._nodes.append(node)

def remove_node(self, node):
if self._use_dmHosts:
if self.use_dmHosts:
self._dmHosts.remove(Node(node))
else:
self._nodes.remove(node)
Expand Down Expand Up @@ -296,6 +296,8 @@ def _do_in_host(self, action, sessionId, exceptions, f, collect, port, iterable)
# if ":" in host:
# host, port = host.split(":")
# port = int(port)
if isinstance(host, str):
host = Node(host)

try:
with self.dmAt(host) as dm:
Expand Down Expand Up @@ -448,8 +450,8 @@ def addGraphSpec(self, sessionId, graphSpec):
for rel in inter_partition_rels:
# rhn = self._graph[rel.rhs]["node"].split(":")[0]
# lhn = self._graph[rel.lhs]["node"].split(":")[0]
rhn = Node(self._graph[rel.rhs]["node"])
lhn = Node(self._graph[rel.lhs]["node"])
rhn = (self._graph[rel.rhs]["node"])
lhn = (self._graph[rel.lhs]["node"])
drop_rels[lhn][rhn].append(rel)
drop_rels[rhn][lhn].append(rel)

Expand Down Expand Up @@ -611,7 +613,7 @@ def __init__(self, dmHosts: list[str] = None, pkeyPath=None, dmCheckTimeout=10):
)

# In the case of the Data Island the dmHosts are the final nodes as well
self.use_dm_hosts = True
self.use_dmHosts = True
# self._nodes = dmHosts
logger.info("Created DataIslandManager for hosts: %r", self._dmHosts)

Expand Down
7 changes: 4 additions & 3 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ def linkGraphParts(self, sessionId):

@daliuge_aware
def add_node_subscriptions(self, sessionId):
# TODO translate node information here
logger.debug("NM REST call: add_subscriptions %s", bottle.request.json)
if bottle.request.content_type != "application/json":
bottle.response.status = 415
Expand All @@ -453,8 +452,10 @@ def add_node_subscriptions(self, sessionId):
self.dm.add_node_subscriptions(sessionId, subscriptions)

def _parse_subscriptions(self, json_request):

return [Node(n) for n in json_request]
relationships = {}
for host, droprels in json_request.items():
relationships[Node(host)] = droprels
return relationships

@daliuge_aware
def trigger_drops(self, sessionId):
Expand Down

0 comments on commit 0392522

Please sign in to comment.