Skip to content

Commit

Permalink
test/infamy: update get_data
Browse files Browse the repository at this point in the history
Refactor NETCONF functions (get_data) to accommodate the upgrade
in the netconf_client library within the Docker container image.

Fixes #390
  • Loading branch information
axkar committed Oct 3, 2024
1 parent f453504 commit 143e475
Showing 1 changed file with 50 additions and 58 deletions.
108 changes: 50 additions & 58 deletions test/infamy/netconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,26 +204,40 @@ def _modules_in_xpath(self, xpath):
return list(filter(lambda m: m["name"] in modnames,
self.modules.values()))

def _build_xpath_filter(self, xpath, get_data_xpath=False):
"""Helper function to build the XPath filter with the necessary xmlns."""
if xpath:
xmlns = " ".join([f"xmlns:{m['name']}=\"{m['namespace']}\""
for m in self._modules_in_xpath(xpath)])
if get_data_xpath is True:
return f'<xpath-filter {xmlns}>{xpath}</xpath-filter>'
else:
return f"<filter type=\"xpath\" select=\"{xpath}\" {xmlns} />"
return None

def _parse_response(self, response, parse):
"""Helper function to handle XML response parsing."""
if not response:
return None

if not parse:
return response.raw_reply

if len(response.data_ele) == 0:
return None

cfg = lxml.etree.tostring(response.data_ele[0])
parsed_data = self.ly.parse_data_mem(cfg, "xml", parse_only=True)

return parsed_data

def _ncc_make_rpc(self, guts, msg_id=None):
if not msg_id:
msg_id = uuid.uuid4()

return '<rpc message-id="{id}" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">{guts}</rpc>' \
.format(guts=guts, id=msg_id).encode("utf-8")

def _ncc_get_data_rpc(self, datastore="operational", filter=None, msg_id=None):
pieces = []
pieces.append('<get-data xmlns="urn:ietf:params:xml:ns:yang:ietf-netconf-nmda">')
pieces.append(f'<datastore xmlns:ds="urn:ietf:params:xml:ns:yang:ietf-datastores">'
f'ds:{datastore}'
f'</datastore>')
if filter:
xmlns = " ".join([f"xmlns:{m['name']}=\"{m['namespace']}\""
for m in self._modules_in_xpath(filter)])
pieces.append(f'<xpath-filter {xmlns}>{filter}</xpath-filter>')
pieces.append("</get-data>")
return self._ncc_make_rpc("".join(pieces), msg_id=msg_id)

def copy(self, source, target):
cmd = f'''<copy-config>
<target>
Expand All @@ -241,39 +255,10 @@ def reboot(self):
"system-restart": {}
})

def _get(self, xpath, getter, parse=True):
# Figure out which modules we are referencing
mods = self._modules_in_xpath(xpath)

# Fetch the data
xmlns = " ".join([f"xmlns:{m['name']}=\"{m['namespace']}\""
for m in mods])
filt = f"<filter type=\"xpath\" select=\"{xpath}\" {xmlns} />"
# pylint: disable=c-extension-no-member
data = getter(filter=filt).data_ele[0]
if parse==False:
return data
cfg = lxml.etree.tostring(data)
return self.ly.parse_data_mem(cfg, "xml", parse_only=True)

def _get_data(self, xpath, parse=True):
"""Local member wrapper for netconf-client <get-data> RPC"""
# pylint: disable=protected-access
(raw, ele) = self.ncc._send_rpc(self._ncc_get_data_rpc(filter=xpath))
data = NccGetDataReply(raw, ele)
if(parse==False):
return data

if len(data.data_ele) == 0:
return None

# pylint: disable=c-extension-no-member
cfg = lxml.etree.tostring(data.data_ele[0])
return self.ly.parse_data_mem(cfg, "xml", parse_only=True)

def get(self, xpath, parse=True):
"""RPC <get> (legacy NETCONF) fetches config:false data"""
return self._get(xpath, self.ncc.get, parse)
xpath_filter = self._build_xpath_filter(xpath)
response = self.ncc.get(filter=xpath_filter)
return self._parse_response(response, parse)

def get_dict(self, xpath):
"""Return Python dictionary of <get> RPC data"""
Expand All @@ -285,20 +270,19 @@ def get_dict(self, xpath):
return data.print_dict()

def get_data(self, xpath=None, parse=True):
"""RPC <get-data> to fetch operational data"""

if parse is False:
return self._get_data(xpath, parse)

data = self._get_data(xpath)
if not data:
return None

return data.print_dict()
xpath_filter = self._build_xpath_filter(xpath, get_data_xpath=True)
response = self.ncc.get_data(datastore="ds:operational", filter=xpath_filter)
parsed_data = self._parse_response(response, parse)

if parse and parsed_data:
return parsed_data.print_dict()

return parsed_data

def get_config(self, xpath):
"""Get NETCONF XML configuration for a given XPath"""
return self._get(xpath, self.ncc.get_config)
xpath_filter = self._build_xpath_filter(xpath)
response = self.ncc.get_config(source="running", filter=xpath_filter)
return self._parse_response(response, True)

def get_config_dict(self, xpath):
"""Get Python dictionary version of XML configuration"""
Expand Down Expand Up @@ -435,5 +419,13 @@ def get_current_time_with_offset(self):
unadjusted time.
"""
data = self.get_data("/ietf-system:system-state/clock", parse=False)
parsed_data = lxml.etree.fromstring(data)

xpath = './/{urn:ietf:params:xml:ns:yang:ietf-system}current-datetime'
return data.data_ele.find(xpath).text
current_datetime = parsed_data.find(xpath)

if current_datetime is not None:
return current_datetime.text
else:
raise ValueError("current-datetime element not found in the response")

0 comments on commit 143e475

Please sign in to comment.