From b423e8815fa61a207bd8e1885e71c9ec0afcce81 Mon Sep 17 00:00:00 2001 From: Martin Hunt Date: Wed, 11 May 2016 14:27:35 +0100 Subject: [PATCH 1/3] Handle when error is raised --- ariba/cluster.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ariba/cluster.py b/ariba/cluster.py index 5968bfd0..c2d1ff86 100644 --- a/ariba/cluster.py +++ b/ariba/cluster.py @@ -132,14 +132,28 @@ def __init__(self, signal.signal(s, self._receive_signal) + atexit.register(self._atexit) + + def _atexit(self): + print('Error in cluster', self.name, '... Stopping!', file=sys.stderr, flush=True) + if self.log_fh is not None: + pyfastaq.utils.close(self.log_fh) + self.log_fh = None + if self.fail_file is not None: + with open(self.fail_file, 'w') as f: + pass + os._exit(1) + + def _receive_signal(self, signum, stack): - print('Signal', signum, 'received in cluster', self.name + '. Stopping!', file=sys.stderr, flush=True) + print('Signal', signum, 'received in cluster', self.name + '... Stopping!', file=sys.stderr, flush=True) if self.log_fh is not None: pyfastaq.utils.close(self.log_fh) self.log_fh = None if self.fail_file is not None: with open(self.fail_file, 'w') as f: pass + sys.exit(1) def _input_files_exist(self): @@ -159,7 +173,7 @@ def _set_up_input_files(self): try: os.mkdir(self.root_dir) except: - raise Error('Error making directory ' + seolf.root_dir) + raise Error('Error making directory ' + self.root_dir) self.read_store.get_reads(self.name, self.all_reads1, self.all_reads2) self.refdata.write_seqs_to_fasta(self.references_fa, self.reference_names) From 7467ec8b8b155145a1a069a0dd07b0055b40a342 Mon Sep 17 00:00:00 2001 From: Martin Hunt Date: Wed, 11 May 2016 14:28:31 +0100 Subject: [PATCH 2/3] Refactor code that handles signals to always clean tmp dir --- ariba/clusters.py | 106 +++++++++++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 44 deletions(-) diff --git a/ariba/clusters.py b/ariba/clusters.py index 70d1e469..8e272b95 100644 --- a/ariba/clusters.py +++ b/ariba/clusters.py @@ -21,7 +21,7 @@ def _run_cluster(obj, verbose, clean, fails_dir): failed_clusters = os.listdir(fails_dir) if len(failed_clusters) > 0: - print('Other clusters failed. Stopping cluster', obj.name, file=sys.stderr) + print('Other clusters failed. Will not start cluster', obj.name, file=sys.stderr) return obj if verbose: @@ -125,6 +125,7 @@ def __init__(self, self.cluster_base_counts = {} # gene name -> number of bases self.pool = None self.fails_dir = os.path.join(self.outdir ,'.fails') + self.clusters_all_ran_ok = True for d in [self.outdir, self.logs_dir, self.fails_dir]: try: @@ -141,14 +142,26 @@ def __init__(self, if not os.path.exists(tmp_dir): raise Error('Temporary directory ' + tmp_dir + ' not found. Cannot continue') - self.tmp_dir = tempfile.mkdtemp(prefix='ariba.tmp.', dir=os.path.abspath(tmp_dir)) + if self.clean: + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix='ariba.tmp.', dir=os.path.abspath(tmp_dir)) + self.tmp_dir = self.tmp_dir_obj.name + else: + self.tmp_dir_obj = None + self.tmp_dir = os.path.join(self.outdir, 'clusters') + try: + os.mkdir(self.tmp_dir) + except: + raise Error('Error making directory ' + self.tmp_dir) if self.verbose: print('Temporary directory:', self.tmp_dir) - wanted_signals = [signal.SIGABRT, signal.SIGINT, signal.SIGSEGV, signal.SIGTERM] - for s in wanted_signals: - signal.signal(s, self._receive_signal) + for i in [x for x in dir(signal) if x.startswith("SIG") and x not in {'SIGCHLD', 'SIGCLD'}]: + try: + signum = getattr(signal, i) + signal.signal(signum, self._receive_signal) + except: + pass def _stop_pool(self): @@ -163,11 +176,10 @@ def _stop_pool(self): def _emergency_stop(self): self._stop_pool() if self.clean: - if os.path.exists(self.tmp_dir): - try: - shutil.rmtree(self.tmp_dir) - except: - pass + try: + self.tmp_dir_obj.cleanup() + except: + pass def _receive_signal(self, signum, stack): @@ -373,13 +385,18 @@ def _init_and_run_clusters(self): extern_progs=self.extern_progs, )) + try: + if self.threads > 1: + self.pool = multiprocessing.Pool(self.threads) + cluster_list = self.pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose), itertools.repeat(self.clean), itertools.repeat(self.fails_dir))) + else: + for c in cluster_list: + _run_cluster(c, self.verbose, self.clean, self.fails_dir) + except: + self.clusters_all_ran_ok = False - if self.threads > 1: - self.pool = multiprocessing.Pool(self.threads) - cluster_list = self.pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose), itertools.repeat(self.clean), itertools.repeat(self.fails_dir))) - else: - for c in cluster_list: - _run_cluster(c, self.verbose, self.clean, self.fails_dir) + if len(os.listdir(self.fails_dir)) > 0: + self.clusters_all_ran_ok = False self.clusters = {c.name: c for c in cluster_list} @@ -430,19 +447,24 @@ def _clean(self): if self.clean: shutil.rmtree(self.fails_dir) - if self.verbose: - print('Deleting tmp directory', self.tmp_dir) - - if os.path.exists(self.tmp_dir): - shutil.rmtree(self.tmp_dir) + try: + self.tmp_dir_obj.cleanup() + except: + pass if self.verbose: print('Deleting Logs directory', self.logs_dir) - shutil.rmtree(self.logs_dir) + try: + shutil.rmtree(self.logs_dir) + except: + pass if self.verbose: print('Deleting reads store files', self.read_store.outfile + '[.tbi]') - self.read_store.clean() + try: + self.read_store.clean() + except: + pass else: if self.verbose: print('Not deleting anything because --noclean used') @@ -502,25 +524,24 @@ def _run(self): print('No reads mapped. Skipping all assemblies', flush=True) print('WARNING: no reads mapped to reference genes. Therefore no local assemblies will be run', file=sys.stderr) - failed_clusters = os.listdir(self.fails_dir) - if len(failed_clusters): - print('Failed clusters:', ', '.join(failed_clusters), file=sys.stderr) - else: - if self.verbose: - print('{:_^79}'.format(' Writing reports '), flush=True) - print('Making', self.report_file_all_tsv) - self._write_reports(self.clusters, self.report_file_all_tsv) + if not self.clusters_all_ran_ok: + raise Error('At least one cluster failed! Stopping...') - if self.verbose: - print('Making', self.report_file_filtered_prefix + '.tsv') - rf = report_filter.ReportFilter(infile=self.report_file_all_tsv) - rf.run(self.report_file_filtered_prefix) + if self.verbose: + print('{:_^79}'.format(' Writing reports '), flush=True) + print('Making', self.report_file_all_tsv) + self._write_reports(self.clusters, self.report_file_all_tsv) - if self.verbose: - print() - print('{:_^79}'.format(' Writing fasta of assembled sequences '), flush=True) - print(self.catted_assembled_seqs_fasta) - self._write_catted_assembled_seqs_fasta(self.catted_assembled_seqs_fasta) + if self.verbose: + print('Making', self.report_file_filtered_prefix + '.tsv') + rf = report_filter.ReportFilter(infile=self.report_file_all_tsv) + rf.run(self.report_file_filtered_prefix) + + if self.verbose: + print() + print('{:_^79}'.format(' Writing fasta of assembled sequences '), flush=True) + print(self.catted_assembled_seqs_fasta) + self._write_catted_assembled_seqs_fasta(self.catted_assembled_seqs_fasta) clusters_log_file = os.path.join(self.outdir, 'log.clusters.gz') if self.verbose: @@ -534,10 +555,7 @@ def _run(self): print('{:_^79}'.format(' Cleaning files '), flush=True) self._clean() - if self.verbose: + if self.clusters_all_ran_ok and self.verbose: print('\nAll done!\n') - if len(failed_clusters): - raise Error('There were failed clusters: ' + ', '.join(failed_clusters)) - os.chdir(cwd) From 933f0d8eceae2c59ffc6fef41ea49498ce00df1f Mon Sep 17 00:00:00 2001 From: Martin Hunt Date: Wed, 11 May 2016 15:28:11 +0100 Subject: [PATCH 3/3] Stop test suite returning !=0 because of temp file cleaning --- ariba/cluster.py | 3 +-- ariba/tests/clusters_test.py | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ariba/cluster.py b/ariba/cluster.py index c2d1ff86..eb8331b9 100644 --- a/ariba/cluster.py +++ b/ariba/cluster.py @@ -119,6 +119,7 @@ def __init__(self, if unittest: self.log_fh = sys.stdout else: + atexit.register(self._atexit) self.log_fh = None if extern_progs is None: @@ -132,8 +133,6 @@ def __init__(self, signal.signal(s, self._receive_signal) - atexit.register(self._atexit) - def _atexit(self): print('Error in cluster', self.name, '... Stopping!', file=sys.stderr, flush=True) if self.log_fh is not None: diff --git a/ariba/tests/clusters_test.py b/ariba/tests/clusters_test.py index 27c74954..344bf0ce 100644 --- a/ariba/tests/clusters_test.py +++ b/ariba/tests/clusters_test.py @@ -33,7 +33,7 @@ def setUp(self): reads1 = os.path.join(data_dir, 'clusters_test_dummy_reads_1.fq') reads2 = os.path.join(data_dir, 'clusters_test_dummy_reads_2.fq') - self.clusters = clusters.Clusters(self.refdata_dir, reads1, reads2, self.cluster_dir, extern_progs) + self.clusters = clusters.Clusters(self.refdata_dir, reads1, reads2, self.cluster_dir, extern_progs, clean=False) def tearDown(self): @@ -86,7 +86,7 @@ def test_bam_to_clusters_reads_no_reads_map(self): reads2 = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads_no_reads_map_2.fq') ref = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.db.fa') refdata = reference_data.ReferenceData(presence_absence_fa = ref) - c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs) + c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs, clean=False) shutil.copyfile(os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads_no_reads_map.bam'), c.bam) c._bam_to_clusters_reads() @@ -105,7 +105,7 @@ def test_bam_to_clusters_reads(self): reads2 = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.reads_2.fq') ref = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.db.fa') refdata = reference_data.ReferenceData(presence_absence_fa = ref) - c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs) + c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs, clean=False) shutil.copyfile(os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.bam'), c.bam) c._bam_to_clusters_reads() expected = [