Skip to content

Commit

Permalink
Merge pull request #20 from CompEpigen/dev
Browse files Browse the repository at this point in the history
release 0.2.0
  • Loading branch information
KerstenBreuer committed Oct 17, 2019
2 parents e29a528 + 5156ac3 commit cf61029
Show file tree
Hide file tree
Showing 83 changed files with 4,130 additions and 821 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
*.pyc
/migrations
/scratch/database
/scratch/exec
/scratch/CWL
/scratch
.vscode
test
/scratch/temp
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ If you have any question or are experiencing problems with CWLab, please contact
If you would like to contribute to the development and like to extend the functionality of CWLab to meet your requirements, you are more than welcome. We will do our best to support you and your contribution will be acknowledged.

## About Us:
CWLab is developed with love in the Division of Cancer Epigenomics at the German Cancer Research Center (DKFZ) in the beautiful university city of Heidelberg. We are an interdisciplinary team with wet-lab scientists and bioinformaticians working closely together. Our DNA sequencing-drive methodologies produce challenging amounts of data. CWLab helps us by giving all members of our team the ability to perform common bioinformatic analyses autonomously without having to acquire programming skills. This allows our bioinformatic stuff to focus on method development and interpretation of computationally complex data interpretation and integration.
CWLab is developed with love at the Division of Cancer Epigenomics of the German Cancer Research Center (DKFZ) in the beautiful university city of Heidelberg. We are an interdisciplinary team with wet-lab scientists and bioinformaticians working closely together. Our DNA sequencing-driven methodologies produce challenging amounts of data. CWLab helps us by giving all members of our team the ability to perform common bioinformatic analyses autonomously without having to acquire programming skills. This allows our bioinformatic stuff to focus on method development and interpretation of computationally complex data.

If you like to know more about us, please visit our website https://www.dkfz.de/en/CanEpi/contact.html.

Expand Down
57 changes: 39 additions & 18 deletions cwlab/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flask import Flask
from .config import Config
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager

basedir = os.path.abspath(os.path.dirname(__file__))

Expand All @@ -18,31 +19,51 @@

app.config.from_object(Config())
db = SQLAlchemy(app)
login = LoginManager(app)

from .web_app import main, import_cwl, create_job, job_exec
from .web_app import main, import_cwl, create_job, job_exec, users, browse

def up(config_file=None):
# server up
def setup_db():
global app
global db
app.config.from_object(Config(config_file))

# set up the working environment:
if not os.path.isdir(app.config['TEMP_DIR']):
os.makedirs(app.config['TEMP_DIR'])
if not os.path.isdir(app.config['CWL_DIR']):
os.makedirs(app.config['CWL_DIR'])
if not os.path.isdir(app.config['EXEC_DIR']):
os.makedirs(app.config['EXEC_DIR'])
if not os.path.isdir(app.config['INPUT_DIR']):
os.makedirs(app.config['INPUT_DIR'])
if not os.path.isdir(app.config['DB_DIR']):
os.makedirs(app.config['DB_DIR'])

if app.config['ENABLE_USERS']:
from .users.db import User
from .exec.db import Exec
db.init_app(app)
db.create_all()
db.session.commit()
app.run(host=app.config["WEB_SERVER_HOST"], port=app.config["WEB_SERVER_PORT"])
if app.config['ENABLE_USERS']:
from .users.manage import get_users, interactively_add_user
admin_users = get_users(only_admins=True)
if len(admin_users) == 0:
interactively_add_user(
level="admin",
instruction="No admin user was defined yet. Please set the credentials for the first admin user."
)

def setup_working_dirs():
global app
for param in [
'TEMP_DIR',
'CWL_DIR',
'EXEC_DIR',
'DB_DIR'
]:
if not os.path.isdir(app.config[param]):
os.makedirs(app.config[param])


def up(config_file=None, webapp=True):
global app
app.config.from_object(Config(config_file))

setup_working_dirs()
setup_db()

if webapp:
if app.config['ENABLE_USERS']:
global login
login.login_view = 'login'
app.run(host=app.config["WEB_SERVER_HOST"], port=app.config["WEB_SERVER_PORT"])


71 changes: 59 additions & 12 deletions cwlab/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
from platform import system
basedir = os.path.abspath(os.path.dirname(__file__))

def normalize_path(path):
return os.path.realpath(path)

def normalize_path_dict(dict):
norm_dict = {}
for key in dict.keys():
norm_dict[key] = normalize_path(dict[key])
return norm_dict

class Config(object):
def __init__(self,CONFIG_FILE=None):
if system() == "Windows":
Expand Down Expand Up @@ -40,31 +49,43 @@ def __init__(self,CONFIG_FILE=None):
strftime("%Y%m%d%H%M%S", gmtime())
)

self.TEMP_DIR = (
self.TEMP_DIR = normalize_path(
os.environ.get('CWLAB_TEMP_DIR') or
self.CONFIG_FILE_content.get('TEMP_DIR') or
os.path.join( cwlab_fallback_dir, "temp")
)
self.CWL_DIR = (
self.CWL_DIR = normalize_path(
os.environ.get('CWLAB_CWL_DIR') or
self.CONFIG_FILE_content.get('CWL_DIR') or
os.path.join( cwlab_fallback_dir, "CWL")
)
self.EXEC_DIR = (
self.EXEC_DIR = normalize_path(
os.environ.get('CWLAB_EXEC_DIR') or
self.CONFIG_FILE_content.get('EXEC_DIR') or
os.path.join( cwlab_fallback_dir, "exec")
)
self.INPUT_DIR = (
os.environ.get('CWLAB_INPUT_DIR') or
self.CONFIG_FILE_content.get('INPUT_DIR') or
os.path.join( cwlab_fallback_dir, "input")
)
self.DB_DIR = (
self.DB_DIR = normalize_path(
os.environ.get('CWLAB_DB_DIR') or
self.CONFIG_FILE_content.get('DB_DIR') or
os.path.join( cwlab_fallback_dir, "database")
)
self.ADD_INPUT_DIRS = normalize_path_dict(
self.CONFIG_FILE_content.get('ADD_INPUT_DIRS') or
{"ROOT": "/"}
)
self.ADD_INPUT_UPLOAD_DIRS = normalize_path_dict(
self.CONFIG_FILE_content.get('ADD_INPUT_UPLOAD_DIRS') or
{"ROOT": "/"}
)

self.UPLOAD_ALLOWED = (
self.CONFIG_FILE_content.get('UPLOAD_ALLOWED') or
True
)
self.DOWNLOAD_ALLOWED = (
self.CONFIG_FILE_content.get('DOWNLOAD_ALLOWED') or
True
)

self.DEBUG = (
os.environ.get('CWLAB_DEBUG') == "True" or
Expand Down Expand Up @@ -98,6 +119,12 @@ def __init__(self,CONFIG_FILE=None):
1
)

self.ENABLE_USERS = (
os.environ.get('CWLAB_ENABLE_USERS') or
self.CONFIG_FILE_content.get('ENABLE_USERS') or
False
)

# execution profile:
self.EXEC_PROFILES = self.CONFIG_FILE_content.get('EXEC_PROFILES') or {}

Expand All @@ -109,9 +136,11 @@ def __init__(self,CONFIG_FILE=None):
"post_exec": 120
}
general_defaults = {
"max_retries_default": 3,
"max_parallel_runs_default": 3, # if exceeded, jobs will be queued
"wait_when_queued": 10, # When beeing queued, wait this long before trying to start again
"max_retries": 2,
"max_parallel_exec": 4, # if exceeded, jobs will be queued
"allow_user_decrease_max_parallel_exec": True,
"max_queue_duration": 864000,
"wait_in_queue_period": 4
}
for exec_profile in self.EXEC_PROFILES.keys():
timeout = timeout_defaults
Expand All @@ -120,6 +149,9 @@ def __init__(self,CONFIG_FILE=None):
self.EXEC_PROFILES[exec_profile]["timeout"] = timeout
general = general_defaults
general.update(self.EXEC_PROFILES[exec_profile])
self.EXEC_PROFILES[exec_profile] = general




# Configure web server:
Expand All @@ -134,6 +166,21 @@ def __init__(self,CONFIG_FILE=None):
"5000"
)

# custumatize messages:
self.LOGIN_INSTRUCTION = (
os.environ.get('CWLAB_LOGIN_INSTRUCTION') or
self.CONFIG_FILE_content.get('LOGIN_INSTRUCTION') or
""
)
self.REGISTRATION_INSTRUCTION = (
os.environ.get('CWLAB_REGISTRATION_INSTRUCTION') or
self.CONFIG_FILE_content.get('REGISTRATION_INSTRUCTION') or
"Please fill in the following fields. " +
"Your registration request will need approval by the administrator to acitivate your account."
)

# not accessible by user:
self.SEND_FILE_MAX_AGE_DEFAULT = 0 # disables caching



2 changes: 2 additions & 0 deletions cwlab/default_config_windows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ WEB_SERVER_PORT: 5000

DEBUG: False

ENABLE_USERS: True

EXEC_PROFILES:
cwltool_windows:

Expand Down
70 changes: 60 additions & 10 deletions cwlab/exec/cwlab_bg_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
else:
db_retry_delays = [1, 5, 20, 60, 600]

# wait random time:
sleep(1 + 1*random())

# open connection to database
for db_retry_delay in db_retry_delays:
try:
Expand All @@ -51,17 +54,22 @@
else:
sleep(db_retry_delay + db_retry_delay*random())

def query_info_from_db(what):
for db_retry_delay in db_retry_delays:
def query_info_from_db(what, db_retry_delays_=None, no_exit=False):
db_retry_delays_ = db_retry_delays if db_retry_delays_ is None else db_retry_delays_
db_request = ""
for db_retry_delay in db_retry_delays_:
try:
if what == "run_info":
db_request = session.query(Exec).get(int(exec_db_id))
else:
pass #! no other options yet
db_request = session.query(Exec).get(exec_db_id)
elif what == "next_in_queue":
db_request = session.query(Exec).filter(Exec.status == "queued").order_by(Exec.id.asc()).first()
elif what == "running_exec":
db_request = session.query(Exec).filter(Exec.time_finished == None).filter(Exec.status != "queued")
break
except:
if db_retry_delay == db_retry_delays[-1]:
sys.exit("Exception query to database: \n" + str(e))
if not no_exit:
sys.exit("Exception query to database: \n" + str(e))
else:
sleep(db_retry_delay + db_retry_delay*random())
return db_request
Expand All @@ -76,28 +84,70 @@ def query_info_from_db(what):
out_dir = exec_db_entry.out_dir
global_temp_dir = exec_db_entry.global_temp_dir
log = exec_db_entry.log
time_started = exec_db_entry.time_started
exec_profile = exec_db_entry.exec_profile
exec_profile_name = exec_db_entry.exec_profile_name



# retry on commit:
def commit():
for db_retry_delay in db_retry_delays:
def commit(db_retry_delays_=None, no_exit=False):
db_retry_delays_ = db_retry_delays if db_retry_delays_ is None else db_retry_delays_
for db_retry_delay in db_retry_delays_:
try:
session.commit()
break
except Exception as e:
print(">>> retry db commit: " + str(db_retry_delay))
if db_retry_delay == db_retry_delays[-1]:
sys.exit("Exception during commit to database: \n" + str(e))
if not no_exit:
sys.exit("Exception during commit to database: \n" + str(e))
else:
sleep(db_retry_delay + db_retry_delay*random())

# set pid:
pid = os.getpid()
print(">>> Run's pid: " + str(pid))
exec_db_entry.pid = pid
commit()
print(">>> Run's pid: " + str(pid))

# wait until number of running jobs decreases below max_parallel_exec:
db_retry_delay_queue = [1]
wait = True
def wait_queue():
sleep(exec_profile["wait_in_queue_period"] + exec_profile["wait_in_queue_period"]*random())
def calc_duration(time_a, time_b):
delta = time_b - time_a
delta_second = delta.seconds + delta.days*86400
return delta_second
while wait:
if calc_duration(time_started, datetime.now()) > exec_profile["max_queue_duration"]:
exec_db_entry.status = "queued too long"
exec_db_entry.err_message = "Max queueing duration exceeded."
exec_db_entry.time_finished = datetime.now()
commit()
sys.exit(exec_db_entry.err_message)
running_exec = query_info_from_db("running_exec", db_retry_delay_queue, no_exit=True)
if running_exec == "":
wait_queue()
continue
if not running_exec is None:
number_running_exec = running_exec.count()
max_parallel_exec_running = 0
for exec_ in running_exec.all():
if exec_.exec_profile["max_parallel_exec"] > max_parallel_exec_running:
max_parallel_exec_running = exec_.exec_profile["max_parallel_exec"]
if number_running_exec >= max(exec_profile["max_parallel_exec"], max_parallel_exec_running):
wait_queue()
continue
next_in_queue = query_info_from_db("next_in_queue", db_retry_delay_queue, no_exit=True)
if next_in_queue == "" or next_in_queue.id != exec_db_id:
wait_queue()
continue
wait=False

exec_db_entry.time_started = datetime.now()
commit()

# create out_dir:
if not os.path.exists(out_dir):
Expand Down
1 change: 1 addition & 0 deletions cwlab/exec/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Exec(db.Model):
time_finished = db.Column(db.DateTime())
timeout_limit = db.Column(db.DateTime())
pid = db.Column(db.Integer())
user_id = db.Column(db.Integer())
exec_profile = db.Column(db.JSON(none_as_null=True))
exec_profile_name = db.Column(db.String(64))

Expand Down
Loading

0 comments on commit cf61029

Please sign in to comment.