From f7abaf199c94f34cb07c0dfe296dd4459cf57ba8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:46:01 -0400 Subject: [PATCH 01/14] Drop runner related hooks --- pysipp/hookspec.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pysipp/hookspec.py b/pysipp/hookspec.py index 5bc9e57..ddec3a5 100644 --- a/pysipp/hookspec.py +++ b/pysipp/hookspec.py @@ -65,18 +65,3 @@ def pysipp_conf_scen(agents, scen): socket arguments. It it the recommended hook for applying a default scenario configuration. """ - - -@hookspec(firstresult=True) -def pysipp_new_runner(): - """Create and return a runner instance to be used for invoking - multiple SIPp commands. The runner must be callable and support both a - `block` and `timeout` kwarg. - """ - - -@hookspec(firstresult=True) -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """Perform steps to execute all SIPp commands usually by calling a - preconfigured command launcher/runner. - """ From 6ca3d490ed355272d75b67dcae6d62139cebdc93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:47:12 -0400 Subject: [PATCH 02/14] Use `trio` for process and scenario launching! After attempting to find an OS portable way to spawn subprocesses using the stdlib and coming out unsatisfied, I've decided use the new subprocess launching support in `trio`! This will of course require that the project moves to python 3.6+ giving us access to a lot of neat features of modern python including async/await support and adherence to the structured concurrency principles prominent in `trio`. It turns out this is a good fit since SIPp already has a built in cancellation mechanism via the SIGUSR1 signal. There's a lot of "core" changes to go over in this commit: - drop the "run protocol" and "runner creation" related hooks since they really shouldn't be overridden until there's some need for it and it's likely smarter to keep those "machinery" details strictly internal for now - the run "protocol" has now been relegated to an async function: `pysipp.launch.run_all_agents()` - many routines have been converted to async functions particularly at the runner (`pysipp.TrioRunner.run()`, `.get()`) and scenario (`pysipp.Scenario.arun()`) levels allowing us to expose both a sync and async interface for running subprocesses / agents - drop all the epoll/select loop stuff as this is entirely delegated to `trio.open_process()` and it's underlying machinery and APIs Resolves #53 --- pysipp/__init__.py | 60 +------------ pysipp/agent.py | 61 +++++++------ pysipp/launch.py | 210 ++++++++++++++++++++++++--------------------- pysipp/report.py | 3 +- 4 files changed, 154 insertions(+), 180 deletions(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 440090f..445613b 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -14,17 +14,17 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Authors : Tyler Goodlet + """ -pysipp - a python wrapper for launching SIPp +pysipp - a Python wrapper for launching SIPp + """ import sys from os.path import dirname from . import agent -from . import launch from . import netplug from . import plugin -from . import report from .agent import client from .agent import server from .load import iter_scen_dirs @@ -202,59 +202,5 @@ def pysipp_conf_scen(agents, scen): ua.rtp_echo = True -@plugin.hookimpl -def pysipp_new_runner(): - """Provision and assign a default cmd runner""" - return launch.PopenRunner() - - -@plugin.hookimpl -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """ "Run all rendered commands with the provided runner or the built-in - PopenRunner which runs commands locally. - """ - # use provided runner or default provided by hook - runner = runner or plugin.mng.hook.pysipp_new_runner() - agents = scen.prepare() - - def finalize(cmds2procs=None, timeout=180, raise_exc=True): - """Wait for all remaining agents in the scenario to finish executing - and perform error and logfile reporting. - """ - cmds2procs = cmds2procs or runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - report.emit_logfiles(agents2procs) - if raise_exc: - # raise RuntimeError on agent failure(s) - # (HINT: to rerun type `scen()` from the debugger) - raise SIPpFailure(msg) - - return cmds2procs - - try: - # run all agents (raises RuntimeError on timeout) - cmds2procs = runner( - (ua.render() for ua in agents), block=block, timeout=timeout - ) - except launch.TimeoutError: # sucessful timeout - cmds2procs = finalize(timeout=0, raise_exc=False) - if raise_exc: - raise - else: - # async - if not block: - # XXX async run must bundle up results for later processing - scen.finalize = finalize - return finalize - - # sync - finalize(cmds2procs, raise_exc=raise_exc) - - return runner - - # register the default hook set plugin.mng.register(sys.modules[__name__]) diff --git a/pysipp/agent.py b/pysipp/agent.py index fe1b4d8..75c84b4 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -6,14 +6,17 @@ import tempfile from collections import namedtuple from collections import OrderedDict +from functools import partial from copy import deepcopy from os import path - from distutils import spawn +import trio + from . import command from . import plugin from . import utils +from . import launch log = utils.get_logger() @@ -66,8 +69,13 @@ def name(self): ipcaddr = tuple_property(("ipc_host", "ipc_port")) call_load = tuple_property(("rate", "limit", "call_count")) - def __call__( - self, block=True, timeout=180, runner=None, raise_exc=True, **kwargs + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + def run( + self, + timeout=180, + **kwargs ): # create and configure a temp scenario @@ -76,16 +84,7 @@ def __call__( confpy=None, scenkwargs={}, ) - # run the standard protocol - # (attach allocted runner for reuse/post-portem) - return plugin.mng.hook.pysipp_run_protocol( - scen=scen, - block=block, - timeout=timeout, - runner=runner, - raise_exc=raise_exc, - **kwargs - ) + return scen.run(timeout=timeout, **kwargs) def is_client(self): return "uac" in self.name.lower() @@ -277,6 +276,9 @@ def __init__( confpy=None, enable_screen_file=True, ): + # placeholder for process "runner" + self._runner = None + # agents iterable in launch-order self._agents = agents ua_attrs = UserAgent.keys() @@ -452,21 +454,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): self.prepare(agents), self._defaults, confpy=self.mod ) - def __call__( + async def arun( self, - agents=None, - block=True, timeout=180, runner=None, - raise_exc=True, - copy_agents=False, + ): + agents = self.prepare() + runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout) + + def run( + self, + timeout=180, **kwargs ): - return plugin.mng.hook.pysipp_run_protocol( - scen=self, - block=block, - timeout=timeout, - runner=runner, - raise_exc=raise_exc, - **kwargs + """Run scenario blocking to completion.""" + return trio.run( + partial( + self.arun, + timeout=timeout, + **kwargs + ) ) + + def __call__(self, *args, **kwargs): + # TODO: deprecation warning here + return self.run(*args, **kwargs) diff --git a/pysipp/launch.py b/pysipp/launch.py index d6098cd..1f3938a 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -1,12 +1,9 @@ """ Launchers for invoking SIPp user agents """ -import os -import select import shlex import signal import subprocess -import threading import time from collections import namedtuple from collections import OrderedDict @@ -14,6 +11,10 @@ from . import utils +import trio + +from . import report + log = utils.get_logger() Streams = namedtuple("Streams", "stdout stderr") @@ -23,31 +24,30 @@ class TimeoutError(Exception): "SIPp process timeout exception" -class PopenRunner(object): - """Run a sequence of SIPp agents asynchronously. If any process terminates - with a non-zero exit code, immediately kill all remaining processes and - collect std streams. +class SIPpFailure(RuntimeError): + """SIPp commands failed + """ + - Adheres to an interface similar to `multiprocessing.pool.AsyncResult`. +class TrioRunner(object): + """Run a sequence of SIPp cmds asynchronously. If any process terminates + with a non-zero exit code, immediately cancel all remaining processes and + collect std streams. """ def __init__( self, - subprocmod=subprocess, - osmod=os, - poller=select.epoll, ): - # these could optionally be rpyc proxy objs - self.spm = subprocmod - self.osm = osmod - self.poller = poller() - # collector thread placeholder - self._waiter = None # store proc results self._procs = OrderedDict() - def __call__(self, cmds, block=True, rate=300, **kwargs): - if self._waiter and self._waiter.is_alive(): + async def run( + self, + cmds, + rate=300, + **kwargs + ): + if self.is_alive(): raise RuntimeError( "Not all processes from a prior run have completed" ) @@ -55,80 +55,78 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): raise RuntimeError( "Process results have not been cleared from previous run" ) - sp = self.spm - os = self.osm - DEVNULL = open(os.devnull, "wb") - fds2procs = OrderedDict() - # run agent commands in sequence for cmd in cmds: - log.debug('launching cmd:\n"{}"\n'.format(cmd)) - proc = sp.Popen(shlex.split(cmd), stdout=DEVNULL, stderr=sp.PIPE) - fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid)) - fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + log.debug( + "launching cmd:\n\"{}\"\n".format(cmd)) + proc = await trio.open_process( + shlex.split(cmd), + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE + ) + self._procs[cmd] = proc + # limit launch rate time.sleep(1.0 / rate) - # launch waiter - self._waiter = threading.Thread(target=self._wait, args=(fds2procs,)) - self._waiter.daemon = True - self._waiter.start() - - return self.get(**kwargs) if block else self._procs + return self._procs - def _wait(self, fds2procs): - log.debug("started waiter for procs {}".format(fds2procs)) + async def get(self, timeout=180): + """Block up to `timeout` seconds for all agents to complete. + Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + """ signalled = None - left = len(fds2procs) - collected = 0 - while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: - collected += 1 - proc = fds2procs[fd] - # attach streams so they can be read more then once - log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) - if proc.returncode != 0 and not signalled: + + # taken mostly verbatim from ``trio.run_process()`` + async def read_output(stream): + chunks = [] + async with stream: + try: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + except trio.ClosedResourceError: + pass + + return b"".join(chunks) + + async def wait_on_proc(proc): + nonlocal signalled + async with proc as proc: + rc = await proc.wait() + if rc != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop() - log.debug("terminating waiter thread") + # collect stderr output + proc.stderr_output = await read_output(proc.stderr) + + try: + with trio.fail_after(timeout): + async with trio.open_nursery() as n: + for cmd, proc in self._procs.items(): + # async wait on each process to complete + n.start_soon(wait_on_proc, proc) + + return self._procs + + except trio.TooSlowError: + # kill all SIPp processes + signalled = self.stop() + # all procs were killed by SIGUSR1 + raise TimeoutError( + "pids '{}' failed to complete after '{}' seconds".format( + pformat([p.pid for p in signalled.values()]), timeout) + ) - def get(self, timeout=180): - """Block up to `timeout` seconds for all agents to complete. - Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + def iterprocs(self): + """Iterate all processes which are still alive yielding + (cmd, proc) pairs """ - if self._waiter.is_alive(): - self._waiter.join(timeout=timeout) - - if self._waiter.is_alive(): - # kill them mfin SIPps - signalled = self.stop() - self._waiter.join(timeout=10) - - if self._waiter.is_alive(): - # try to stop a few more times - for _ in range(3): - signalled = self.stop() - self._waiter.join(timeout=1) - - if self._waiter.is_alive(): - # some procs failed to terminate via signalling - raise RuntimeError("Unable to kill all agents!?") - - # all procs were killed by SIGUSR1 - raise TimeoutError( - "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout - ) - ) - - return self._procs + return ((cmd, proc) for cmd, proc in self._procs.items() + if proc and proc.poll() is None) def stop(self): """Stop all agents with SIGUSR1 as per SIPp's signal handling""" @@ -150,25 +148,43 @@ def _signalall(self, signum): signalled[cmd] = proc return signalled - def iterprocs(self): - """Iterate all processes which are still alive yielding - (cmd, proc) pairs - """ - return ( - (cmd, proc) - for cmd, proc in self._procs.items() - if proc and proc.poll() is None - ) - def is_alive(self): """Return bool indicating whether some agents are still alive""" return any(self.iterprocs()) - def ready(self): - """Return bool indicating whether all agents have completed""" - return not self.is_alive() - def clear(self): - """Clear all processes from the last run""" - assert self.ready(), "Not all processes have completed" + """Clear all processes from the last run + """ + assert not self.is_alive(), "Not all processes have completed" self._procs.clear() + + +async def run_all_agents(runner, agents, timeout=180): + """Run a sequencec of agents using a ``TrioRunner``. + """ + async def finalize(): + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs + + try: + await runner.run( + (ua.render() for ua in agents), + timeout=timeout + ) + await finalize() + return runner + except TimeoutError as terr: + # print error logs even when we timeout + try: + await finalize() + except SIPpFailure as err: + assert 'exit code -9' in str(err) + raise terr diff --git a/pysipp/report.py b/pysipp/report.py index ef76c22..9f36c1c 100644 --- a/pysipp/report.py +++ b/pysipp/report.py @@ -17,6 +17,7 @@ 99: "Normal exit without calls processed", -1: "Fatal error", -2: "Fatal error binding a socket", + -9: "Signalled to stop with SIGUSR1", -10: "Signalled to stop with SIGUSR1", 254: "Connection Error: socket already in use", 255: "Command or syntax error: check stderr output", @@ -52,7 +53,7 @@ def emit_logfiles(agents2procs, level="warning", max_lines=100): # print stderr emit( "stderr for '{}' @ {}\n{}\n".format( - ua.name, ua.srcaddr, proc.streams.stderr + ua.name, ua.srcaddr, proc.stderr_output ) ) # FIXME: no idea, but some logs are not being printed without this From ef1a19bac7efb56a694975fe28565ebc260121db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:16:23 -0400 Subject: [PATCH 03/14] Adjust tests to match new `trio` apis --- tests/test_agent.py | 11 +++++++++-- tests/test_launcher.py | 33 ++++++++++++++++++++++----------- tests/test_scenario.py | 3 +++ tests/test_stack.py | 2 +- 4 files changed, 35 insertions(+), 14 deletions(-) create mode 100644 tests/test_scenario.py diff --git a/tests/test_agent.py b/tests/test_agent.py index 0b7b551..764fe78 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -124,9 +124,16 @@ def test_server(): ) def test_failures(ua, retcode, kwargs, exc): """Test failure cases for all types of agents""" + runner = launch.TrioRunner() + # run it without raising - runner = ua(raise_exc=False, **kwargs) - cmds2procs = runner.get(timeout=0) + if exc: + with pytest.raises(exc): + ua(runner=runner, **kwargs) + + # runner = ua(raise_exc=False, **kwargs) + + cmds2procs = runner._procs assert not runner.is_alive() assert len(list(runner.iterprocs())) == 0 # tests transparency of the defaults config pipeline diff --git a/tests/test_launcher.py b/tests/test_launcher.py index 021ab05..de43f03 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -1,15 +1,22 @@ """ Basic agent/scenario launching + """ from pysipp.agent import client from pysipp.agent import server -from pysipp.launch import PopenRunner +from pysipp.launch import ( + TrioRunner, + run_all_agents, + SIPpFailure, +) + +import trio +import pytest -def run_blocking(*agents): - runner = PopenRunner() +def run_blocking(runner, agents): assert not runner.is_alive() - runner(ua.render() for ua in agents) + trio.run(run_all_agents, runner, agents) assert not runner.is_alive() return runner @@ -23,24 +30,28 @@ def test_agent_fails(): uac.recv_timeout = 1 # avoids SIPp issue #176 uac.call_count = 1 # avoids SIPp issue #176 - runner = run_blocking(uas, uac) + runner = TrioRunner() + with pytest.raises(SIPpFailure): + run_blocking(runner, (uas, uac)) # fails due to invalid ip - uasproc = runner.get(timeout=0)[uas.render()] - assert uasproc.streams.stderr + uasproc = runner._procs[uas.render()] + print(uasproc.stderr_output) + assert uasproc.stderr_output assert uasproc.returncode == 255, uasproc.streams.stderr # killed by signal - uacproc = runner.get(timeout=0)[uac.render()] - # assert not uacproc.streams.stderr # sometimes this has a log msg? + uacproc = runner._procs[uac.render()] + # assert not uacproc.stderr_output # sometimes this has a log msg? ret = uacproc.returncode # killed by SIGUSR1 or terminates before it starts (racy) assert ret == -10 or ret == 0 def test_default_scen(default_agents): - runner = run_blocking(*default_agents) + runner = TrioRunner() + runner = run_blocking(runner, default_agents) # both agents should be successful - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert not proc.returncode diff --git a/tests/test_scenario.py b/tests/test_scenario.py new file mode 100644 index 0000000..60c7d65 --- /dev/null +++ b/tests/test_scenario.py @@ -0,0 +1,3 @@ +''' +pysipp.agent module tests +''' diff --git a/tests/test_stack.py b/tests/test_stack.py index 15d6d7c..a84ede0 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -77,7 +77,7 @@ def test_sync_run(scenwalk): synchronous mode""" for path, scen in scenwalk(): runner = scen(timeout=6) - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert proc.returncode == 0 From 363ce2f9a8de0c41ea9b6ff25b28d4a257373bc6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:20:18 -0400 Subject: [PATCH 04/14] Drop py2.7 from CI --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1acac4a..8454dd0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ cache: - pip python: - - 2.7 - 3.5 - 3.6 # - 3.7 From 84e4e123209352f33a49472feb81b265c4353801 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:19:58 -0400 Subject: [PATCH 05/14] Prepare setup script for 1.0 release --- setup.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 3aa1c21..e747b5b 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version="0.1.0", + version='1.0.0.dev', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", @@ -36,7 +36,10 @@ url="https://github.com/SIPp/pysipp", platforms=["linux"], packages=["pysipp", "pysipp.cli"], - install_requires=["pluggy>=1.0.0"], + install_requires=[ + "pluggy>=1.0.0", + 'trio>=0.11.0', + ], tests_require=["pytest"], entry_points={ "console_scripts": ["sippfmt=pysipp.cli.sippfmt:main"], @@ -46,7 +49,7 @@ "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", "Operating System :: POSIX :: Linux", - "Programming Language :: Python :: 2.7", + 'Programming Language :: Python :: 3.6', "Topic :: Software Development", "Topic :: Software Development :: Testing", "Topic :: Software Development :: Quality Assurance", From ac24688b8f89f8ae5ff1ffe1744dd6a155c42abd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 Jul 2019 17:35:20 -0400 Subject: [PATCH 06/14] trio.open_process() isn't released yet --- pysipp/__init__.py | 4 +++- pysipp/launch.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 445613b..c890200 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -107,7 +107,9 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, **scenkwargs): # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( - agents=[uas, uac], confpy=None, scenkwargs=scenkwargs + agents=[uas, uac], + confpy=None, + scenkwargs=scenkwargs, ) if proxyaddr: diff --git a/pysipp/launch.py b/pysipp/launch.py index 1f3938a..455dfd1 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -59,7 +59,8 @@ async def run( for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = await trio.open_process( + # proc = await trio.open_process( + proc = trio.Process( shlex.split(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE From 22ba429a17231dfed4a375769b0d042ba483d0e1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 Feb 2021 10:39:21 -0500 Subject: [PATCH 07/14] Be explicit on dev version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e747b5b..7238fa8 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='1.0.0.dev', + version='1.0.0.dev0', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", From cd9fd0c5f7572f1c3d4295b3f310519d9fce19ac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 13 Feb 2021 11:56:02 -0500 Subject: [PATCH 08/14] Update readme to reflect python version requirement --- README.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6967e5c..291dbd2 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,12 @@ but (want to) use it for automated testing because it gets the job done... ## What is it? -Python configuring and launching the infamous -[SIPp](http://sipp.sourceforge.net/) using an api inspired by -[requests](http://docs.python-requests.org/) +Python 3.6+ configuring and launching the infamous +[SIPp](http://sipp.sourceforge.net/) using a simple API to +generate commands and spawn them in subprocesses. + +Command subprocess launching now uses +[`trio`](https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses)! ## It definitely lets you @@ -25,7 +28,7 @@ Python configuring and launching the infamous ## Basic Usage -Launching the default UAC scenario is a short line: +Launching the default UAC script is a short line: ```python import pysipp From 15aeb382e33631c6913e39fc9df0037645a075b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 18 Dec 2022 16:25:20 -0500 Subject: [PATCH 09/14] Raise a runtime error when `sipp` binary can not beinvoked --- pysipp/agent.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pysipp/agent.py b/pysipp/agent.py index 75c84b4..fc34c65 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -173,8 +173,12 @@ def ua(logdir=None, **kwargs): """Default user agent factory. Returns a command string instance with sensible default arguments. """ + bin_path = spawn.find_executable("sipp") + if not bin_path: + raise RuntimeError("SIPp binary (sipp) does not seem to be accessible") + defaults = { - "bin_path": spawn.find_executable("sipp"), + "bin_path": bin_path, } # drop any built-in scen if a script file is provided if "scen_file" in kwargs: @@ -462,7 +466,11 @@ async def arun( agents = self.prepare() runner = runner or launch.TrioRunner() - return await launch.run_all_agents(runner, agents, timeout=timeout) + return await launch.run_all_agents( + runner, + agents, + timeout=timeout, + ) def run( self, From 995e8050f9a92f60718e92b616168b59a0f6f613 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 18 Dec 2022 16:26:09 -0500 Subject: [PATCH 10/14] Use the new `trio.Nursery.start(trio.run_process, ..)` API --- pysipp/launch.py | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/pysipp/launch.py b/pysipp/launch.py index 455dfd1..332a917 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -7,6 +7,7 @@ import time from collections import namedtuple from collections import OrderedDict +from functools import partial from pprint import pformat from . import utils @@ -29,7 +30,7 @@ class SIPpFailure(RuntimeError): """ -class TrioRunner(object): +class TrioRunner: """Run a sequence of SIPp cmds asynchronously. If any process terminates with a non-zero exit code, immediately cancel all remaining processes and collect std streams. @@ -43,6 +44,7 @@ def __init__( async def run( self, + nursery, cmds, rate=300, **kwargs @@ -58,12 +60,16 @@ async def run( # run agent commands in sequence for cmd in cmds: log.debug( - "launching cmd:\n\"{}\"\n".format(cmd)) - # proc = await trio.open_process( - proc = trio.Process( - shlex.split(cmd), - stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE + "launching cmd:\n\"{}\"\n".format(cmd) + ) + + proc = await nursery.start( + partial( + trio.run_process, + shlex.split(cmd), + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE + ) ) self._procs[cmd] = proc @@ -160,7 +166,12 @@ def clear(self): self._procs.clear() -async def run_all_agents(runner, agents, timeout=180): +async def run_all_agents( + runner, + agents, + timeout=180, + +) -> TrioRunner: """Run a sequencec of agents using a ``TrioRunner``. """ async def finalize(): @@ -176,12 +187,15 @@ async def finalize(): return cmds2procs try: - await runner.run( - (ua.render() for ua in agents), - timeout=timeout - ) - await finalize() - return runner + async with trio.open_nursery() as nurse: + await runner.run( + nurse, + (ua.render() for ua in agents), + timeout=timeout + ) + await finalize() + return runner + except TimeoutError as terr: # print error logs even when we timeout try: From fb73483e748929304d9e406aa65874e9003acc0b Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Tue, 20 Dec 2022 00:05:51 +0100 Subject: [PATCH 11/14] trio: don't check return value of cmd by trio.run_process --- pysipp/launch.py | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/pysipp/launch.py b/pysipp/launch.py index 332a917..fd115c7 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -10,11 +10,10 @@ from functools import partial from pprint import pformat -from . import utils - import trio from . import report +from . import utils log = utils.get_logger() @@ -26,8 +25,7 @@ class TimeoutError(Exception): class SIPpFailure(RuntimeError): - """SIPp commands failed - """ + """SIPp commands failed""" class TrioRunner: @@ -42,13 +40,7 @@ def __init__( # store proc results self._procs = OrderedDict() - async def run( - self, - nursery, - cmds, - rate=300, - **kwargs - ): + async def run(self, nursery, cmds, rate=300, **kwargs): if self.is_alive(): raise RuntimeError( "Not all processes from a prior run have completed" @@ -59,16 +51,15 @@ async def run( ) # run agent commands in sequence for cmd in cmds: - log.debug( - "launching cmd:\n\"{}\"\n".format(cmd) - ) + log.debug('launching cmd:\n"{}"\n'.format(cmd)) proc = await nursery.start( partial( trio.run_process, shlex.split(cmd), stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, + check=False, ) ) self._procs[cmd] = proc @@ -125,15 +116,19 @@ async def wait_on_proc(proc): # all procs were killed by SIGUSR1 raise TimeoutError( "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout) + pformat([p.pid for p in signalled.values()]), timeout + ) ) def iterprocs(self): """Iterate all processes which are still alive yielding (cmd, proc) pairs """ - return ((cmd, proc) for cmd, proc in self._procs.items() - if proc and proc.poll() is None) + return ( + (cmd, proc) + for cmd, proc in self._procs.items() + if proc and proc.poll() is None + ) def stop(self): """Stop all agents with SIGUSR1 as per SIPp's signal handling""" @@ -160,8 +155,7 @@ def is_alive(self): return any(self.iterprocs()) def clear(self): - """Clear all processes from the last run - """ + """Clear all processes from the last run""" assert not self.is_alive(), "Not all processes have completed" self._procs.clear() @@ -170,10 +164,9 @@ async def run_all_agents( runner, agents, timeout=180, - ) -> TrioRunner: - """Run a sequencec of agents using a ``TrioRunner``. - """ + """Run a sequencec of agents using a ``TrioRunner``.""" + async def finalize(): # this might raise TimeoutError cmds2procs = await runner.get(timeout=timeout) @@ -189,9 +182,7 @@ async def finalize(): try: async with trio.open_nursery() as nurse: await runner.run( - nurse, - (ua.render() for ua in agents), - timeout=timeout + nurse, (ua.render() for ua in agents), timeout=timeout ) await finalize() return runner @@ -201,5 +192,5 @@ async def finalize(): try: await finalize() except SIPpFailure as err: - assert 'exit code -9' in str(err) + assert "exit code -9" in str(err) raise terr From 44dffbddc234abefe094616bb4875faa5e5a9035 Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Tue, 20 Dec 2022 00:08:07 +0100 Subject: [PATCH 12/14] trio: remove deprecated block argument at __call__ --- pysipp/agent.py | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/pysipp/agent.py b/pysipp/agent.py index fc34c65..7e460fe 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -6,17 +6,17 @@ import tempfile from collections import namedtuple from collections import OrderedDict -from functools import partial from copy import deepcopy +from functools import partial from os import path -from distutils import spawn import trio +from distutils import spawn from . import command +from . import launch from . import plugin from . import utils -from . import launch log = utils.get_logger() @@ -72,11 +72,7 @@ def name(self): def __call__(self, *args, **kwargs): return self.run(*args, **kwargs) - def run( - self, - timeout=180, - **kwargs - ): + def run(self, timeout=180, **kwargs): # create and configure a temp scenario scen = plugin.mng.hook.pysipp_conf_scen_protocol( @@ -472,20 +468,11 @@ async def arun( timeout=timeout, ) - def run( - self, - timeout=180, - **kwargs - ): + def run(self, timeout=180, **kwargs): """Run scenario blocking to completion.""" - return trio.run( - partial( - self.arun, - timeout=timeout, - **kwargs - ) - ) + return trio.run(partial(self.arun, timeout=timeout, **kwargs)) def __call__(self, *args, **kwargs): # TODO: deprecation warning here + kwargs.pop("block", None) return self.run(*args, **kwargs) From 8146211dbeb0f59941050fbc7d306e60c8cc2a7c Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Tue, 20 Dec 2022 00:09:32 +0100 Subject: [PATCH 13/14] trio: don't use await for report.emit_logfiles() --- pysipp/launch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pysipp/launch.py b/pysipp/launch.py index fd115c7..f7a1c63 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -174,7 +174,7 @@ async def finalize(): msg = report.err_summary(agents2procs) if msg: # report logs and stderr - await report.emit_logfiles(agents2procs) + report.emit_logfiles(agents2procs) raise SIPpFailure(msg) return cmds2procs From 2e93c9a540e0f953cdf1de73c62169904a4ad395 Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Tue, 20 Dec 2022 00:10:37 +0100 Subject: [PATCH 14/14] trio: fix test_server(), add new returncode for timeout --- tests/test_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_agent.py b/tests/test_agent.py index 764fe78..213a667 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -118,7 +118,7 @@ def test_server(): # test client failure on bad remote destination (agent.client(destaddr=("99.99.99.99", 5060)), 1, {}, RuntimeError), # test if server times out it is signalled - (agent.server(), 0, {"timeout": 1}, launch.TimeoutError), + (agent.server(), -9, {"timeout": 1}, launch.TimeoutError), ], ids=["ua", "uac", "uas"], )