Skip to content

scheduler: Add experimental job definition backup, as first step for job retries and better visibility #1237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import datetime
import time
import shutil

import kernelci
import kernelci.config
Expand All @@ -25,6 +26,8 @@

from base import Service

BACKUP_DIR = '/tmp/kci-backup'


class Scheduler(Service):
"""Service to schedule jobs that match received events"""
Expand Down Expand Up @@ -76,6 +79,38 @@ def _stop(self, sub_id):
self._api_helper.unsubscribe_filters(sub_id)
self._cleanup_paths()

def backup_cleanup(self):
"""
Cleanup the backup directory, removing files older than 24h
"""
if not os.path.exists(BACKUP_DIR):
return
now = datetime.datetime.now()
for f in os.listdir(BACKUP_DIR):
fpath = os.path.join(BACKUP_DIR, f)
if os.path.isfile(fpath):
mtime = datetime.datetime.fromtimestamp(os.path.getmtime(fpath))
if (now - mtime).total_seconds() > 24 * 60 * 60:
os.remove(fpath)

def backup_job(self, filename, nodeid):
"""
Backup filename, rename to nodeid.submission and keep in BACKUP_DIR
Also check if BACKUP_DIR have files older than 24h, delete them
"""
if not os.path.exists(BACKUP_DIR):
os.makedirs(BACKUP_DIR)
# cleanup old files
self.backup_cleanup()
# backup file
new_filename = os.path.join(BACKUP_DIR, f"{nodeid}.submission")
self.log.info(f"Backing up {filename} to {new_filename}")
# copy file to backup directory
try:
shutil.copy2(filename, new_filename)
except Exception as e:
self.log.error(f"Failed to backup {filename} to {new_filename}: {e}")

def _run_job(self, job_config, runtime, platform, input_node):
try:
node = self._api_helper.create_job_node(job_config,
Expand Down Expand Up @@ -184,6 +219,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
return
tmp = tempfile.TemporaryDirectory(dir=self._output)
output_file = runtime.save_file(data, tmp.name, params)
self.backup_job(output_file, node['id'])
try:
running_job = runtime.submit(output_file)
except Exception as e:
Expand Down