Tmp dir cleaning #82

Merged
merged 3 commits into from
May 11, 2016
17 changes: 15 additions & 2 deletions ariba/cluster.py
@@ -119,6 +119,7 @@ def __init__(self,
if unittest:
self.log_fh = sys.stdout
else:
atexit.register(self._atexit)
self.log_fh = None

if extern_progs is None:
@@ -132,14 +133,26 @@ def __init__(self,
signal.signal(s, self._receive_signal)


def _atexit(self):
print('Error in cluster', self.name, '... Stopping!', file=sys.stderr, flush=True)
if self.log_fh is not None:
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
if self.fail_file is not None:
with open(self.fail_file, 'w') as f:
pass
os._exit(1)


def _receive_signal(self, signum, stack):
print('Signal', signum, 'received in cluster', self.name + '. Stopping!', file=sys.stderr, flush=True)
print('Signal', signum, 'received in cluster', self.name + '... Stopping!', file=sys.stderr, flush=True)
if self.log_fh is not None:
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
if self.fail_file is not None:
with open(self.fail_file, 'w') as f:
pass
sys.exit(1)


def _input_files_exist(self):
@@ -159,7 +172,7 @@ def _set_up_input_files(self):
try:
os.mkdir(self.root_dir)
except:
raise Error('Error making directory ' + seolf.root_dir)
raise Error('Error making directory ' + self.root_dir)
self.read_store.get_reads(self.name, self.all_reads1, self.all_reads2)
self.refdata.write_seqs_to_fasta(self.references_fa, self.reference_names)

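Note on the pattern above: the new _atexit hook covers worker exits that never raise back into the caller, and it ends with os._exit(1) rather than sys.exit(1), because raising SystemExit inside an atexit callback cannot stop an interpreter that is already exiting. Both paths close the log and touch fail_file, a sentinel whose existence tells the parent that this cluster failed. A minimal standalone sketch of the idea (Worker and finish are illustrative names, not the ariba API; the diff does not show where the hook is removed on a successful run):

import atexit
import os
import signal
import sys

class Worker:
    def __init__(self, name, fail_file):
        self.name = name
        self.fail_file = fail_file
        self.log_fh = open(name + '.log', 'w')
        atexit.register(self._atexit)                        # fires on unexpected interpreter exit
        signal.signal(signal.SIGTERM, self._receive_signal)

    def _close_log_and_mark_failed(self):
        if self.log_fh is not None:
            self.log_fh.close()
            self.log_fh = None
        if self.fail_file is not None:
            open(self.fail_file, 'w').close()                # touch the sentinel for the parent

    def _atexit(self):
        print('Error in worker', self.name, '... Stopping!', file=sys.stderr, flush=True)
        self._close_log_and_mark_failed()
        os._exit(1)                                          # hard exit; skips further exit handlers

    def _receive_signal(self, signum, stack):
        print('Signal', signum, 'received in', self.name, file=sys.stderr, flush=True)
        self._close_log_and_mark_failed()
        sys.exit(1)                                          # SystemExit; atexit handlers still run

    def finish(self):
        atexit.unregister(self._atexit)                      # assumed normal-exit path
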
106 changes: 62 additions & 44 deletions ariba/clusters.py
@@ -21,7 +21,7 @@ def _run_cluster(obj, verbose, clean, fails_dir):
failed_clusters = os.listdir(fails_dir)

if len(failed_clusters) > 0:
print('Other clusters failed. Stopping cluster', obj.name, file=sys.stderr)
print('Other clusters failed. Will not start cluster', obj.name, file=sys.stderr)
return obj

if verbose:
@@ -125,6 +125,7 @@ def __init__(self,
self.cluster_base_counts = {} # gene name -> number of bases
self.pool = None
self.fails_dir = os.path.join(self.outdir ,'.fails')
self.clusters_all_ran_ok = True

for d in [self.outdir, self.logs_dir, self.fails_dir]:
try:
@@ -141,14 +142,26 @@ def __init__(self,
if not os.path.exists(tmp_dir):
raise Error('Temporary directory ' + tmp_dir + ' not found. Cannot continue')

self.tmp_dir = tempfile.mkdtemp(prefix='ariba.tmp.', dir=os.path.abspath(tmp_dir))
if self.clean:
self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix='ariba.tmp.', dir=os.path.abspath(tmp_dir))
self.tmp_dir = self.tmp_dir_obj.name
else:
self.tmp_dir_obj = None
self.tmp_dir = os.path.join(self.outdir, 'clusters')
try:
os.mkdir(self.tmp_dir)
except:
raise Error('Error making directory ' + self.tmp_dir)

if self.verbose:
print('Temporary directory:', self.tmp_dir)
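
With clean=True the temporary tree is no longer a bare tempfile.mkdtemp: it becomes a tempfile.TemporaryDirectory, whose finalizer deletes the tree when the object is cleaned up, garbage-collected, or the interpreter exits, so a crashed run no longer strands ariba.tmp.* directories. With --noclean the clusters directory is created inside outdir and left in place for debugging. A rough sketch of the two behaviours (paths are made up):

import os
import tempfile

clean = True    # --noclean would set this False
if clean:
    tmp_dir_obj = tempfile.TemporaryDirectory(prefix='ariba.tmp.', dir='/tmp')
    tmp_dir = tmp_dir_obj.name          # e.g. /tmp/ariba.tmp.abc123; removed automatically
else:
    tmp_dir_obj = None
    tmp_dir = os.path.join('outdir', 'clusters')
    os.mkdir(tmp_dir)                   # plain directory, kept after the process exits

# Early, explicit removal. A second cleanup() can raise FileNotFoundError,
# which is why _emergency_stop() and _clean() both wrap it in try/except.
if tmp_dir_obj is not None:
    tmp_dir_obj.cleanup()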

wanted_signals = [signal.SIGABRT, signal.SIGINT, signal.SIGSEGV, signal.SIGTERM]
for s in wanted_signals:
signal.signal(s, self._receive_signal)
for i in [x for x in dir(signal) if x.startswith("SIG") and x not in {'SIGCHLD', 'SIGCLD'}]:
try:
signum = getattr(signal, i)
signal.signal(signum, self._receive_signal)
except:
pass
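
The hard-coded list of four signals is replaced by a loop over every SIG* name the signal module exposes. SIGCHLD/SIGCLD must be excluded because it fires whenever a multiprocessing pool worker exits normally, and the try/except absorbs the names that cannot be handled at all (SIGKILL, SIGSTOP) or are not real signal numbers (SIG_DFL and SIG_IGN also start with 'SIG'). Roughly, as a standalone sketch:

import signal

def install_everywhere(handler):
    for name in dir(signal):
        if not name.startswith('SIG') or name in {'SIGCHLD', 'SIGCLD'}:
            continue    # normal child exits from the pool must not trip the handler
        try:
            signal.signal(getattr(signal, name), handler)
        except (OSError, RuntimeError, ValueError):
            pass        # SIGKILL/SIGSTOP cannot be caught; SIG_DFL etc. are not signals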


def _stop_pool(self):
@@ -163,11 +176,10 @@ def _stop_pool(self):
def _emergency_stop(self):
self._stop_pool()
if self.clean:
if os.path.exists(self.tmp_dir):
try:
shutil.rmtree(self.tmp_dir)
except:
pass
try:
self.tmp_dir_obj.cleanup()
except:
pass


def _receive_signal(self, signum, stack):
@@ -373,13 +385,18 @@ def _init_and_run_clusters(self):
extern_progs=self.extern_progs,
))

try:
if self.threads > 1:
self.pool = multiprocessing.Pool(self.threads)
cluster_list = self.pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose), itertools.repeat(self.clean), itertools.repeat(self.fails_dir)))
else:
for c in cluster_list:
_run_cluster(c, self.verbose, self.clean, self.fails_dir)
except:
self.clusters_all_ran_ok = False

if self.threads > 1:
self.pool = multiprocessing.Pool(self.threads)
cluster_list = self.pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose), itertools.repeat(self.clean), itertools.repeat(self.fails_dir)))
else:
for c in cluster_list:
_run_cluster(c, self.verbose, self.clean, self.fails_dir)
if len(os.listdir(self.fails_dir)) > 0:
self.clusters_all_ran_ok = False

self.clusters = {c.name: c for c in cluster_list}
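
The worker pool is now wrapped so that an exception from any one cluster no longer aborts the run before cleanup: failures are folded into the single clusters_all_ran_ok flag, and any sentinel file left in .fails also flips the flag, which catches workers that died via os._exit() and therefore never raised into starmap. A condensed sketch of that control flow (run_all and run_one are simplified stand-ins; the real code keeps the pool on self so its signal handler can terminate it):

import itertools
import multiprocessing
import os

def run_all(cluster_list, threads, fails_dir, run_one):
    all_ok = True
    try:
        if threads > 1:
            with multiprocessing.Pool(threads) as pool:
                cluster_list = pool.starmap(run_one, zip(cluster_list, itertools.repeat(fails_dir)))
        else:
            for c in cluster_list:
                run_one(c, fails_dir)
    except Exception:
        all_ok = False              # a worker raised; keep going so cleanup still happens

    if os.listdir(fails_dir):       # sentinels from workers that exited hard
        all_ok = False
    return cluster_list, all_ok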

@@ -430,19 +447,24 @@ def _clean(self):
if self.clean:
shutil.rmtree(self.fails_dir)

if self.verbose:
print('Deleting tmp directory', self.tmp_dir)

if os.path.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)
try:
self.tmp_dir_obj.cleanup()
except:
pass

if self.verbose:
print('Deleting Logs directory', self.logs_dir)
shutil.rmtree(self.logs_dir)
try:
shutil.rmtree(self.logs_dir)
except:
pass

if self.verbose:
print('Deleting reads store files', self.read_store.outfile + '[.tbi]')
self.read_store.clean()
try:
self.read_store.clean()
except:
pass
else:
if self.verbose:
print('Not deleting anything because --noclean used')
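
_clean() now treats each removal as best-effort, with its own try/except, so that an artefact a failed worker already removed (or never created) does not stop the remaining deletions. The shape of the pattern, as a standalone sketch:

def best_effort_clean(*steps):
    """Run each zero-argument cleanup callable, ignoring individual failures."""
    for step in steps:
        try:
            step()
        except Exception:
            pass    # one failed removal should not block the rest

# Usage with names from the diff above (illustrative):
# best_effort_clean(self.tmp_dir_obj.cleanup,
#                   lambda: shutil.rmtree(self.logs_dir),
#                   self.read_store.clean)
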
@@ -502,25 +524,24 @@ def _run(self):
print('No reads mapped. Skipping all assemblies', flush=True)
print('WARNING: no reads mapped to reference genes. Therefore no local assemblies will be run', file=sys.stderr)

failed_clusters = os.listdir(self.fails_dir)
if len(failed_clusters):
print('Failed clusters:', ', '.join(failed_clusters), file=sys.stderr)
else:
if self.verbose:
print('{:_^79}'.format(' Writing reports '), flush=True)
print('Making', self.report_file_all_tsv)
self._write_reports(self.clusters, self.report_file_all_tsv)
if not self.clusters_all_ran_ok:
raise Error('At least one cluster failed! Stopping...')

if self.verbose:
print('Making', self.report_file_filtered_prefix + '.tsv')
rf = report_filter.ReportFilter(infile=self.report_file_all_tsv)
rf.run(self.report_file_filtered_prefix)
if self.verbose:
print('{:_^79}'.format(' Writing reports '), flush=True)
print('Making', self.report_file_all_tsv)
self._write_reports(self.clusters, self.report_file_all_tsv)

if self.verbose:
print()
print('{:_^79}'.format(' Writing fasta of assembled sequences '), flush=True)
print(self.catted_assembled_seqs_fasta)
self._write_catted_assembled_seqs_fasta(self.catted_assembled_seqs_fasta)
if self.verbose:
print('Making', self.report_file_filtered_prefix + '.tsv')
rf = report_filter.ReportFilter(infile=self.report_file_all_tsv)
rf.run(self.report_file_filtered_prefix)

if self.verbose:
print()
print('{:_^79}'.format(' Writing fasta of assembled sequences '), flush=True)
print(self.catted_assembled_seqs_fasta)
self._write_catted_assembled_seqs_fasta(self.catted_assembled_seqs_fasta)

clusters_log_file = os.path.join(self.outdir, 'log.clusters.gz')
if self.verbose:
@@ -534,10 +555,7 @@ def _run(self):
print('{:_^79}'.format(' Cleaning files '), flush=True)
self._clean()

if self.verbose:
if self.clusters_all_ran_ok and self.verbose:
print('\nAll done!\n')

if len(failed_clusters):
raise Error('There were failed clusters: ' + ', '.join(failed_clusters))

os.chdir(cwd)
6 changes: 3 additions & 3 deletions ariba/tests/clusters_test.py
@@ -33,7 +33,7 @@ def setUp(self):

reads1 = os.path.join(data_dir, 'clusters_test_dummy_reads_1.fq')
reads2 = os.path.join(data_dir, 'clusters_test_dummy_reads_2.fq')
self.clusters = clusters.Clusters(self.refdata_dir, reads1, reads2, self.cluster_dir, extern_progs)
self.clusters = clusters.Clusters(self.refdata_dir, reads1, reads2, self.cluster_dir, extern_progs, clean=False)


def tearDown(self):
Expand Down Expand Up @@ -86,7 +86,7 @@ def test_bam_to_clusters_reads_no_reads_map(self):
reads2 = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads_no_reads_map_2.fq')
ref = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.db.fa')
refdata = reference_data.ReferenceData(presence_absence_fa = ref)
c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs)
c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs, clean=False)
shutil.copyfile(os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads_no_reads_map.bam'), c.bam)
c._bam_to_clusters_reads()

@@ -105,7 +105,7 @@ def test_bam_to_clusters_reads(self):
reads2 = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.reads_2.fq')
ref = os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.db.fa')
refdata = reference_data.ReferenceData(presence_absence_fa = ref)
c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs)
c = clusters.Clusters(self.refdata_dir, reads1, reads2, clusters_dir, extern_progs, clean=False)
shutil.copyfile(os.path.join(data_dir, 'clusters_test_bam_to_clusters_reads.bam'), c.bam)
c._bam_to_clusters_reads()
expected = [