From 0c297f96ec8879ea8ee2d0b44cfe846cc7f67b6b Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Tue, 8 Jul 2025 17:48:17 +0300 Subject: [PATCH] scheduler: Add experimental job definition backup, as first step for job retries and better visibility Signed-off-by: Denys Fedoryshchenko --- src/scheduler.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/scheduler.py b/src/scheduler.py index 887a2e04a..f7f31e780 100755 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -15,6 +15,7 @@ import re import datetime import time +import shutil import kernelci import kernelci.config @@ -25,6 +26,8 @@ from base import Service +BACKUP_DIR = '/tmp/kci-backup' + class Scheduler(Service): """Service to schedule jobs that match received events""" @@ -76,6 +79,38 @@ def _stop(self, sub_id): self._api_helper.unsubscribe_filters(sub_id) self._cleanup_paths() + def backup_cleanup(self): + """ + Cleanup the backup directory, removing files older than 24h + """ + if not os.path.exists(BACKUP_DIR): + return + now = datetime.datetime.now() + for f in os.listdir(BACKUP_DIR): + fpath = os.path.join(BACKUP_DIR, f) + if os.path.isfile(fpath): + mtime = datetime.datetime.fromtimestamp(os.path.getmtime(fpath)) + if (now - mtime).total_seconds() > 24 * 60 * 60: + os.remove(fpath) + + def backup_job(self, filename, nodeid): + """ + Backup filename, rename to nodeid.submission and keep in BACKUP_DIR + Also check if BACKUP_DIR have files older than 24h, delete them + """ + if not os.path.exists(BACKUP_DIR): + os.makedirs(BACKUP_DIR) + # cleanup old files + self.backup_cleanup() + # backup file + new_filename = os.path.join(BACKUP_DIR, f"{nodeid}.submission") + self.log.info(f"Backing up {filename} to {new_filename}") + # copy file to backup directory + try: + shutil.copy2(filename, new_filename) + except Exception as e: + self.log.error(f"Failed to backup {filename} to {new_filename}: {e}") + def _run_job(self, job_config, runtime, platform, input_node): try: node = self._api_helper.create_job_node(job_config, @@ -184,6 +219,7 @@ def _run_job(self, job_config, runtime, platform, input_node): return tmp = tempfile.TemporaryDirectory(dir=self._output) output_file = runtime.save_file(data, tmp.name, params) + self.backup_job(output_file, node['id']) try: running_job = runtime.submit(output_file) except Exception as e: