From 9832b016db07e5a6cbfa5234bd28c4b6ebdcc90d Mon Sep 17 00:00:00 2001 From: Matt Myers Date: Mon, 1 Aug 2022 18:01:23 -0400 Subject: [PATCH] Fix bug w/ argument order in run.py phase_snps (#148) * Fix bug w/ argument order in run.py phase_snps * using Worker for count_alleles to further debug issue 150 (#151) Co-authored-by: Vineet Bansal --- .pre-commit-config.yaml | 2 +- pyproject.toml | 2 +- src/hatchet/__init__.py | 2 +- src/hatchet/utils/count_alleles.py | 122 +++++---------------------- src/hatchet/utils/multiprocessing.py | 2 + src/hatchet/utils/run.py | 2 +- 6 files changed, 29 insertions(+), 103 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 98834783..c5c1f821 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: - id: flake8 - repo: https://github.com/grantjenks/blue.git - rev: v0.9.0 + rev: v0.9.1 hooks: - id: blue args: [--line-length=120] diff --git a/pyproject.toml b/pyproject.toml index e0430d6e..05397636 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hatchet" -version = "1.0.1" +version = "1.0.2" authors = [ { name="Simone Zaccaria", email="s.zaccaria@ucl.ac.uk" }, { name="Ben Raphael", email="braphael@cs.princeton.edu" }, diff --git a/src/hatchet/__init__.py b/src/hatchet/__init__.py index 28a119ef..75703b65 100644 --- a/src/hatchet/__init__.py +++ b/src/hatchet/__init__.py @@ -1,4 +1,4 @@ -__version__ = '1.0.1' +__version__ = '1.0.2' import os.path from importlib.resources import path diff --git a/src/hatchet/utils/count_alleles.py b/src/hatchet/utils/count_alleles.py index cd69679a..745161fb 100644 --- a/src/hatchet/utils/count_alleles.py +++ b/src/hatchet/utils/count_alleles.py @@ -3,13 +3,12 @@ import os.path import shlex import subprocess as pr -from multiprocessing import Process, Queue, JoinableQueue, Lock, Value from scipy.stats import beta import tempfile -import hatchet.utils.ProgressBar as pb import hatchet.utils.ArgParsing as ap from hatchet.utils.Supporting import log, logArgs, error, close +from hatchet.utils.multiprocessing import Worker def main(args=None): @@ -172,84 +171,33 @@ def counting( verbose, outdir, ): - # Define a Lock and a shared value for log printing through ProgressBar - err_lock = Lock() - counter = Value('i', 0) - progress_bar = pb.ProgressBar( - total=len(samples) * len(chromosomes), - length=40, - lock=err_lock, - counter=counter, - verbose=verbose, - ) - - # Establish communication queues - tasks = JoinableQueue() - results = Queue() - # Enqueue jobs - jobs_count = 0 + work = [] for bam in samples: for chro in chromosomes: - tasks.put((bam[0], bam[1], chro)) - jobs_count += 1 - - # Setting up the workers - workers = [ - AlleleCounter( - tasks, - results, - progress_bar, - bcftools, - reference, - q, - Q, - mincov, - dp, - E, - snplist, - verbose, - outdir, - ) - for i in range(min(num_workers, jobs_count)) - ] - - # Add a poison pill for each worker - for i in range(len(workers)): - tasks.put(None) - - # Start the workers - for w in workers: - w.start() + work.append((bam[0], bam[1], chro)) - # Wait for all of the tasks to finish - tasks.join() - - # Get the results - sorted_results = {} - for i in range(jobs_count): - res = results.get() - if len(res) > 0: - sorted_results[res[0][0], res[0][1]] = res - - # Close Queues - tasks.close() - results.close() - - # Ensure each worker terminates - for w in workers: - w.terminate() - w.join() + worker = AlleleCounter( + bcftools, + reference, + q, + Q, + mincov, + dp, + E, + snplist, + verbose, + outdir, + ) - return sorted_results + results = worker.run(work=work, n_instances=num_workers) + results = {(v[0][0], v[0][1]): v for v in results} + return results -class AlleleCounter(Process): +class AlleleCounter(Worker): def __init__( self, - task_queue, - result_queue, - progress_bar, bcftools, reference, q, @@ -261,10 +209,6 @@ def __init__( verbose, outdir, ): - Process.__init__(self) - self.task_queue = task_queue - self.result_queue = result_queue - self.progress_bar = progress_bar self.bcftools = bcftools self.reference = reference self.q = q @@ -276,30 +220,10 @@ def __init__( self.verbose = verbose self.outdir = outdir - def run(self): - while True: - next_task = self.task_queue.get() - if next_task is None: - # Poison pill means shutdown - self.task_queue.task_done() - break - - self.progress_bar.progress( - advance=False, - msg='{} starts on {} for {}'.format(self.name, next_task[1], next_task[2]), - ) - snps = self.countAlleles( - bamfile=next_task[0], - samplename=next_task[1], - chromosome=next_task[2], - ) - self.progress_bar.progress( - advance=True, - msg='{} ends on {} for {}'.format(self.name, next_task[1], next_task[2]), - ) - self.task_queue.task_done() - self.result_queue.put(snps) - return + def work(self, bamfile, samplename, chromosome): + # return 42 + snps = self.countAlleles(bamfile=bamfile, samplename=samplename, chromosome=chromosome) + return snps def countAlleles(self, bamfile, samplename, chromosome): cmd_mpileup = '{} mpileup {} -Ou -f {} --skip-indels -a INFO/AD -q {} -Q {} -d {} -T {}'.format( diff --git a/src/hatchet/utils/multiprocessing.py b/src/hatchet/utils/multiprocessing.py index 2e81c4d8..9d71eb43 100644 --- a/src/hatchet/utils/multiprocessing.py +++ b/src/hatchet/utils/multiprocessing.py @@ -57,6 +57,8 @@ def run(self, work, n_instances=None, show_progress=True): n_work = len(work) if n_instances is None: n_instances = min(cpu_count(), n_work) + else: + n_instances = min(n_instances, n_work) if show_progress: progress_bar = pb.ProgressBar( diff --git a/src/hatchet/utils/run.py b/src/hatchet/utils/run.py index 5344e795..9210b72f 100644 --- a/src/hatchet/utils/run.py +++ b/src/hatchet/utils/run.py @@ -158,8 +158,8 @@ def main(args=None): f'{output}/phase/', '-L', ] - + (['-N'] if config.genotype_snps.chr_notation else []) + glob.glob(f'{output}/snps/*.vcf.gz') + + (['-N'] if config.genotype_snps.chr_notation else []) + extra_args )