
Alascan parallel tests #1322

Draft · wants to merge 5 commits into base: alascan-parallel
206 changes: 206 additions & 0 deletions src/haddock/libs/libparallel.py
@@ -1,6 +1,10 @@
"""Module in charge of parallelizing the execution of tasks."""

import time
import signal
import math
import subprocess
import os
from multiprocessing import Process, Queue

from haddock import log
@@ -206,3 +210,205 @@ def terminate(self) -> None:
worker.terminate()

log.info("The workers terminated in a controlled way")

# Using the normal Scheduler and Worker inside alascan causes a semaphore
# leak if a keyboard interrupt arrives at the wrong moment.
# The new AlascanWorker/AlascanScheduler classes prevent multiprocessing-level
# semaphore leaks.
# TODO: look into CNSJob.run() in libsubprocess to address subprocess-level
# semaphore leaks.
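# A minimal usage sketch (hypothetical job list; assumes each task exposes a
# .run() method, as the Worker classes expect):
#
#     with AlascanScheduler(alascan_jobs, ncores=4) as scheduler:
#         scheduler.run()
#
# __exit__ calls safe_terminate() on KeyboardInterrupt and always calls
# _enhanced_cleanup(), so the result Queue's semaphores are released even on
# an interrupt.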
class AlascanWorker(Worker):
"""Worker with signal handling and subprocess cleanup for graceful shutdown during alascan."""

def __init__(self, tasks, results):
super().__init__(tasks, results)
self._should_stop = False

def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully."""
self._should_stop = True
# Also clean up any child processes
self._cleanup_child_processes()

def _cleanup_child_processes(self):
"""Clean up any child processes that may have been spawned."""
try:
# Kill all child processes of this worker
pid = os.getpid()
            # Use pkill -P to kill the direct children of this worker process
try:
subprocess.run(['pkill', '-P', str(pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2,
check=False) # Don't raise exception on non-zero exit
except (subprocess.TimeoutExpired, FileNotFoundError):
                # pkill is unavailable or it timed out
pass
except Exception as e:
log.debug(f"Error during child process cleanup: {e}")

def run(self):
"""Execute tasks with signal handling and subprocess cleanup."""
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
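        # The handlers are registered inside run(), i.e. in the child
        # process, so each worker replaces the inherited default handler
        # and can stop cleanly between tasks instead of dying mid-queue-write.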
results = []
try:
for task in self.tasks:
# Check if we should stop before starting each task
if self._should_stop:
log.debug(f"{self.name} stopping early due to signal")
break
r = None

try:
r = task.run()
except Exception as e:
log.warning(f"Exception in task execution: {e}")
results.append(r)

# Check again after each task in case we got interrupted
if self._should_stop:
log.debug(f"{self.name} stopping after task completion")
break

except (KeyboardInterrupt, SystemExit):
log.debug(f"{self.name} interrupted during task execution")
self._cleanup_child_processes()

finally:
# Always clean up child processes on exit
self._cleanup_child_processes()

# Put results into the queue (even if partial)
try:
self.result_queue.put(results)
# Signal completion
self.result_queue.put(f"{self.name}_done")
        except Exception:
            # The queue may already be closed during shutdown; just exit.
            pass


class AlascanScheduler(Scheduler):
"""
    Specialized scheduler for alascan that prevents semaphore leaks
    without breaking the existing Scheduler interface.
"""

def __init__(self, tasks, ncores=None, max_cpus=False):
# Handle empty tasks case BEFORE calling parent init
if not tasks:
# Set up minimal state for empty tasks
self.max_cpus = max_cpus
self.num_tasks = 0
self._ncores = 0 # Set directly to avoid property setter
self.queue = Queue()
self.results = []
self.worker_list = []
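            # Mirror the parent's attributes so run()/terminate() remain
            # safe no-ops when there is nothing to do.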
log.info("Using 0 cores")
log.debug("0 tasks ready.")
return

# Initialize parent class normally
super().__init__(tasks, ncores, max_cpus)

# Replace workers with AlascanWorkers (only if we have workers)
if self.num_processes > 0:
job_list = self._get_job_list(tasks)
self.worker_list = [AlascanWorker(jobs, self.queue) for jobs in job_list]
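            # The plain Worker instances built by the parent __init__ are
            # discarded before they are ever started, so only AlascanWorkers
            # run.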

def _get_job_list(self, tasks):
"""Get the same job distribution as parent class."""
# Replicate parent's task sorting and splitting logic
if all(hasattr(t, "input_file") for t in tasks):
task_name_dic = {}
for i, t in enumerate(tasks):
task_name_dic[i] = (t.input_file, len(str(t.input_file)))

            # Sort by the (input_file, length) value; keying on the index
            # first would simply preserve submission order.
            sorted_task_list = []
            for e in sorted(task_name_dic.items(), key=lambda x: x[1]):
idx = e[0]
sorted_task_list.append(tasks[idx])
else:
sorted_task_list = tasks

# Use the split_tasks function from this module
return list(split_tasks(sorted_task_list, self.num_processes))

def run(self):
"""Run with enhanced cleanup but same interface as parent."""
try:
super().run() # Use parent's run logic
except KeyboardInterrupt:
log.info("Alascan interrupted. Performing enhanced cleanup...")
self.safe_terminate()
raise
finally:
# Always perform enhanced cleanup
self._enhanced_cleanup()

def safe_terminate(self):
"""Enhanced termination that prevents semaphore leaks."""
log.info("Performing safe termination...")

# Step 1: Send SIGTERM to allow graceful shutdown
for worker in self.worker_list:
if worker.is_alive():
try:
worker.terminate()
                except Exception:
                    # The worker may have already exited.
                    pass

# Step 2: Give workers time to finish current tasks
shutdown_timeout = 3.0
start_time = time.time()

while time.time() - start_time < shutdown_timeout:
alive_workers = [w for w in self.worker_list if w.is_alive()]
if not alive_workers:
break
time.sleep(0.1)

# Step 3: Force kill any remaining workers
for worker in self.worker_list:
if worker.is_alive():
try:
worker.kill()
worker.join(timeout=1.0)
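                    # kill() sends SIGKILL, which cannot be trapped; the
                    # join() above reaps the process so it does not linger
                    # as a zombie.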
                except Exception:
                    # The worker may have died between the check and the kill.
                    pass

def _enhanced_cleanup(self):
"""Enhanced cleanup to prevent semaphore leaks."""
try:
# Ensure all workers are properly joined
for worker in self.worker_list:
if worker.is_alive():
worker.join(timeout=0.5)

# Clean up the queue - critical for preventing semaphore leaks
if hasattr(self, 'queue') and self.queue:
try:
# Drain remaining items
while True:
try:
self.queue.get_nowait()
                        except Exception:
                            # queue.Empty once drained, or the queue is
                            # already closed.
                            break

# Properly close the queue
self.queue.close()
self.queue.join_thread()
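                # close() plus join_thread() let the queue's feeder thread
                # flush and exit; skipping this step is what leaves the
                # queue's semaphores dangling at interpreter shutdown.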
            except Exception:
                # Best-effort close; the queue may already be closed.
                pass

except Exception as e:
log.debug(f"Warning during enhanced cleanup: {e}")

def __enter__(self):
"""Context manager support."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager cleanup."""
        if exc_type is KeyboardInterrupt:
self.safe_terminate()
self._enhanced_cleanup()
28 changes: 20 additions & 8 deletions src/haddock/modules/analysis/alascan/__init__.py
@@ -47,10 +47,11 @@
from haddock import log
from haddock.core.defaults import MODULE_DEFAULT_YAML
from haddock.libs.libparallel import get_index_list
from haddock.libs.libparallel import AlascanScheduler
from haddock.libs.libutil import parse_ncores
from haddock.modules import BaseHaddockModule
from haddock.modules import get_engine
from haddock.modules.analysis import get_analysis_exec_mode
#from haddock.modules import get_engine
#from haddock.modules.analysis import get_analysis_exec_mode
from haddock.modules.analysis.alascan.scan import (
Scan,
ScanJob,
@@ -130,14 +131,25 @@ def _run(self):
        # Next, libutil and libparallel will log info about per-model
        # cores/tasks, which is misleading when per-residue parallelization
        # is in use; the log line below keeps the overall log coherent.
log.info(f"Model-level parallelization:")

        ## Actually, this can cause semaphore leaks:
# log.info(f"Model-level parallelization:")

exec_mode = get_analysis_exec_mode(self.params["mode"])
# exec_mode = get_analysis_exec_mode(self.params["mode"])

# Engine = get_engine(exec_mode, self.params)
# engine = Engine(alascan_jobs)
# engine.run()
        ## So it is replaced with AlascanScheduler:

log.info("Using AlascanScheduler for model-level parallelization")
#try:
with AlascanScheduler(alascan_jobs, ncores=model_cores) as scheduler:
scheduler.run()
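        # scheduler.__exit__ handles safe termination and queue cleanup,
        # even if run() is interrupted.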

#except Exception as e:
# self.finish_with_error(f"Alascan model-level parallelization failed: {e}")

Engine = get_engine(exec_mode, self.params)
engine = Engine(alascan_jobs)
engine.run()

# cluster-based analysis
clt_alascan = alascan_cluster_analysis(models)
# now plot the data