Skip to content

Commit

Permalink
Merge pull request #210 from andreyto/at_spades
Browse files Browse the repository at this point in the history
Option to use Spades and multithreading for Bowtie2 and Spades
  • Loading branch information
martinghunt committed Feb 2, 2018
2 parents 7c37d31 + e86c9c1 commit 8478708
Show file tree
Hide file tree
Showing 15 changed files with 914 additions and 133 deletions.
114 changes: 112 additions & 2 deletions ariba/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pymummer
import fermilite_ariba
from ariba import common, faidx, mapping, bam_parse, external_progs, ref_seq_chooser
import shlex

class Error (Exception): pass

Expand All @@ -29,6 +30,9 @@ def __init__(self,
nucmer_breaklen=200,
extern_progs=None,
clean=True,
spades_mode="wgs",
spades_options=None,
threads=1
):
self.reads1 = os.path.abspath(reads1)
self.reads2 = os.path.abspath(reads2)
Expand All @@ -50,6 +54,9 @@ def __init__(self,
self.nucmer_min_len = nucmer_min_len
self.nucmer_breaklen = nucmer_breaklen
self.clean = clean
self.spades_mode = spades_mode
self.spades_options = spades_options
self.threads = threads

if extern_progs is None:
self.extern_progs = external_progs.ExternalProgs()
Expand Down Expand Up @@ -94,6 +101,106 @@ def _assemble_with_fermilite(self):
self.assembled_ok = (got_from_fermilite == 0)
os.chdir(cwd)

@staticmethod
def _check_spades_log_file(logfile):
'''SPAdes can fail with a strange error. Stop everything if this happens'''
f = pyfastaq.utils.open_file_read(logfile)

for line in f:
if line.startswith('== Error == system call for:') and line.rstrip().endswith('finished abnormally, err code: -7'):
pyfastaq.utils.close(f)
print('Error running SPAdes. Cannot continue. This is the error from the log file', logfile, '...', file=sys.stderr)
print(line, file=sys.stderr)
raise Error('Fatal error ("err code: -7") running spades. Cannot continue')

pyfastaq.utils.close(f)
return True

def _assemble_with_spades(self):
cwd = os.getcwd()
self.assembled_ok = False
try:
try:
os.chdir(self.working_dir)
except:
raise Error('Error chdir ' + self.working_dir)
spades_exe = self.extern_progs.exe('spades')
if not spades_exe:
raise Error("Spades executable has not been found")
spades_options = self.spades_options
if spades_options is not None:
spades_options = shlex.split(self.spades_options)
if self.spades_mode == "rna":
spades_options = ["--rna"] + (["-k","127"] if spades_options is None else spades_options)
spades_out_seq_base = "transcripts.fasta"
elif self.spades_mode == "sc":
spades_options = ["--sc"] + (["-k", "33,55,77,99,127","--careful"] if spades_options is None else spades_options)
spades_out_seq_base = "contigs.fasta"
elif self.spades_mode == "wgs":
spades_options = ["-k", "33,55,77,99,127","--careful"] if spades_options is None else spades_options
spades_out_seq_base = "contigs.fasta"
else:
raise ValueError("Unknown spades_mode value: {}".format(self.spades_mode))
asm_cmd = [spades_exe, "-t", str(self.threads), "--pe1-1", self.reads1, "--pe1-2", self.reads2, "-o", self.assembler_dir] + \
spades_options
asm_ok,err = common.syscall(asm_cmd, verbose=True, verbose_filehandle=self.log_fh, shell=False, allow_fail=True)
if not asm_ok:
print('Assembly finished with errors. These are the errors:', file=self.log_fh)
print(err, file=self.log_fh)
print('\nEnd of spades errors\n', file=self.log_fh)
else:

spades_log = os.path.join(self.assembler_dir, 'spades.log')
if os.path.exists(spades_log):
self._check_spades_log_file(spades_log)

with open(spades_log) as f:
print('\n______________ SPAdes log ___________________\n', file=self.log_fh)
for line in f:
print(line.rstrip(), file=self.log_fh)
print('\n______________ End of SPAdes log _________________\n', file=self.log_fh)

spades_warnings = os.path.join(self.assembler_dir, 'warnings.log')
if os.path.exists(spades_warnings):
with open(spades_warnings) as f:
print('\n______________ SPAdes warnings ___________________\n', file=self.log_fh)
for line in f:
print(line.rstrip(), file=self.log_fh)
print('\n______________ End of SPAdes warnings _________________\n', file=self.log_fh)

## fermilight module generates contig names that look like `cluster_1.l15.c17.ctg.1` where 'cluster_1'==self.contig_name_prefix
## the whole structure of the contig name is expected in several places downstream where it is parsed into individual components.
## For example, it is parsed into to l and c parts in ref_seq_chooser (although the parts are not actually used).
## This is the code from fermilight module that generates the contig ID string:
## ofs << ">" << namePrefix << ".l" << overlap << ".c" << minCount << ".ctg." << i + 1 << '\n'
##
## We generate the same contig name structure here using dummy values for overlap and minCount, in order
## to avoid distrupting the downstream code.
## Note that the fermilight module generates multiple versions of the assembly on a grid of l and c values,
## and ref_seq_chooser then picks a single "best" (l,c) version based on coverage/identity of the nucmer
## alignment to the reference. Spades generates a single version of the assembly, so ref_seq_chooser
## can only pick that one version.

spades_out_seq = os.path.join(self.assembler_dir,spades_out_seq_base)
## No need really to use general-purpose pyfastaq.sequences.file_reader here and pay performance cost for
## its multi-format line tests since we are only replacing the IDs in a pre-defined format
if os.path.exists(spades_out_seq):
with open(spades_out_seq,"r") as inp, open(self.all_assembly_contigs_fa,"w") as out:
pref = self.contig_name_prefix
i_cont = 0
for line in inp:
if line.startswith(">"):
i_cont += 1
line = ">{}.l15.c17.ctg.{}\n".format(pref,i_cont)
out.write(line)
if i_cont > 0:
self.assembled_ok = True
if self.clean:
print('Deleting assembly directory', self.assembler_dir, file=self.log_fh)
shutil.rmtree(self.assembler_dir,ignore_errors=True)
finally:
os.chdir(cwd)


@staticmethod
def _fix_contig_orientation(contigs_fa, ref_fa, outfile, min_id=90, min_length=20, breaklen=200):
Expand Down Expand Up @@ -148,7 +255,10 @@ def _parse_bam(sequences, bam, min_scaff_depth, max_insert):


def run(self):
self._assemble_with_fermilite()
if self.assembler == 'fermilite':
self._assemble_with_fermilite()
elif self.assembler == "spades":
self._assemble_with_spades()
print('Finished running assemblies', flush=True, file=self.log_fh)
self.sequences = {}

Expand Down Expand Up @@ -208,7 +318,7 @@ def run(self):
self.reads2,
self.final_assembly_fa,
self.final_assembly_bam[:-4],
threads=1,
threads=self.threads,
sort=True,
bowtie2=self.extern_progs.exe('bowtie2'),
bowtie2_version=self.extern_progs.version('bowtie2'),
Expand Down
95 changes: 66 additions & 29 deletions ariba/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ def __init__(self,
max_allele_freq=0.90,
unique_threshold=0.03,
max_gene_nt_extend=30,
spades_other_options=None,
spades_mode="rna", #["rna","wgs"]
spades_options=None,
clean=True,
extern_progs=None,
random_seed=42,
threads_total=1
):
self.root_dir = os.path.abspath(root_dir)
self.read_store = read_store
Expand All @@ -71,7 +73,8 @@ def __init__(self,
self.sspace_k = sspace_k
self.sspace_sd = sspace_sd
self.reads_insert = reads_insert
self.spades_other_options = spades_other_options
self.spades_mode = spades_mode
self.spades_options = spades_options

self.reads_for_assembly1 = os.path.join(self.root_dir, 'reads_for_assembly_1.fq')
self.reads_for_assembly2 = os.path.join(self.root_dir, 'reads_for_assembly_2.fq')
Expand All @@ -96,6 +99,9 @@ def __init__(self,
self.status_flag = flag.Flag()
self.clean = clean

self.threads_total = threads_total
self.remaining_clusters = None

self.assembly_dir = os.path.join(self.root_dir, 'Assembly')
self.final_assembly_fa = os.path.join(self.root_dir, 'assembly.fa')
self.final_assembly_bam = os.path.join(self.root_dir, 'assembly.reads_mapped.bam')
Expand Down Expand Up @@ -138,13 +144,31 @@ def __init__(self,
for s in wanted_signals:
signal.signal(s, self._receive_signal)

def _update_threads(self):
"""Update available thread count post-construction.
To be called any number of times from run() method"""
if self.remaining_clusters is not None:
self.threads = max(1,self.threads_total//self.remaining_clusters.value)
#otherwise just keep the current (initial) value
print("{} detected {} threads available to it".format(self.name,self.threads), file = self.log_fh)

def _report_completion(self):
"""Update shared counters to signal that we are done with this cluster.
Call just before exiting run() method (in a finally clause)"""
rem_clust = self.remaining_clusters
if rem_clust is not None:
# -= is non-atomic, need to acquire a lock
with self.remaining_clusters_lock:
rem_clust.value -= 1
# we do not need this object anymore
self.remaining_clusters = None
print("{} reported completion".format(self.name), file=self.log_fh)

def _atexit(self):
if self.log_fh is not None:
pyfastaq.utils.close(self.log_fh)
self.log_fh = None


def _receive_signal(self, signum, stack):
print('Signal', signum, 'received in cluster', self.name + '... Stopping!', file=sys.stderr, flush=True)
if self.log_fh is not None:
Expand Down Expand Up @@ -190,7 +214,10 @@ def _set_up_input_files(self):
def _clean_file(self, filename):
if self.clean:
print('Deleting file', filename, file=self.log_fh)
os.unlink(filename)
try: #protect against OSError: [Errno 16] Device or resource busy: '.nfs0000000010f0f04f000003c9' and such
os.unlink(filename)
except:
pass


def _clean(self):
Expand Down Expand Up @@ -269,33 +296,39 @@ def _make_reads_for_assembly(number_of_wanted_reads, total_reads, reads_in1, rea
return total_reads


def run(self):
self._set_up_input_files()
def run(self,remaining_clusters=None,remaining_clusters_lock=None):
try:
self.remaining_clusters = remaining_clusters
self.remaining_clusters_lock = remaining_clusters_lock
self._update_threads()
self._set_up_input_files()

for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')
for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')

original_dir = os.getcwd()
os.chdir(self.root_dir)
original_dir = os.getcwd()
os.chdir(self.root_dir)

try:
self._run()
except Error as err:
os.chdir(original_dir)
print('Error running cluster! Error was:', err, sep='\n', file=self.log_fh)
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
raise Error('Error running cluster ' + self.name + '!')

try:
self._run()
except Error as err:
os.chdir(original_dir)
print('Error running cluster! Error was:', err, sep='\n', file=self.log_fh)
print('Finished', file=self.log_fh, flush=True)
print('{:_^79}'.format(' LOG FILE END ' + self.name + ' '), file=self.log_fh, flush=True)

# This stops multiprocessing complaining with the error:
# multiprocessing.pool.MaybeEncodingError: Error sending result: '[<ariba.cluster.Cluster object at 0x7ffa50f8bcd0>]'. Reason: 'TypeError("cannot serialize '_io.TextIOWrapper' object",)'
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
raise Error('Error running cluster ' + self.name + '!')

os.chdir(original_dir)
print('Finished', file=self.log_fh, flush=True)
print('{:_^79}'.format(' LOG FILE END ' + self.name + ' '), file=self.log_fh, flush=True)

# This stops multiprocessing complaining with the error:
# multiprocessing.pool.MaybeEncodingError: Error sending result: '[<ariba.cluster.Cluster object at 0x7ffa50f8bcd0>]'. Reason: 'TypeError("cannot serialize '_io.TextIOWrapper' object",)'
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
finally:
self._report_completion()


def _run(self):
Expand All @@ -310,6 +343,7 @@ def _run(self):
print('\nUsing', made_reads, 'from a total of', self.total_reads, 'for assembly.', file=self.log_fh, flush=True)
print('Assembling reads:', file=self.log_fh, flush=True)

self._update_threads()
self.assembly = assembly.Assembly(
self.reads_for_assembly1,
self.reads_for_assembly2,
Expand All @@ -323,7 +357,10 @@ def _run(self):
contig_name_prefix=self.name,
assembler=self.assembler,
extern_progs=self.extern_progs,
clean=self.clean
clean=self.clean,
spades_mode=self.spades_mode,
spades_options=self.spades_options,
threads=self.threads
)

self.assembly.run()
Expand All @@ -332,7 +369,7 @@ def _run(self):
self._clean_file(self.reads_for_assembly2)
if self.clean:
print('Deleting Assembly directory', self.assembly_dir, file=self.log_fh, flush=True)
shutil.rmtree(self.assembly_dir)
shutil.rmtree(self.assembly_dir,ignore_errors=True)


if self.assembled_ok and self.assembly.ref_seq_name is not None:
Expand All @@ -342,13 +379,13 @@ def _run(self):
self.is_variant_only = '1' if is_variant_only else '0'

print('\nAssembly was successful\n\nMapping reads to assembly:', file=self.log_fh, flush=True)

self._update_threads()
mapping.run_bowtie2(
self.all_reads1,
self.all_reads2,
self.final_assembly_fa,
self.final_assembly_bam[:-4],
threads=1,
threads=self.threads,
sort=True,
bowtie2=self.extern_progs.exe('bowtie2'),
bowtie2_preset='very-sensitive-local',
Expand Down
Loading

0 comments on commit 8478708

Please sign in to comment.