diff --git a/Unified/recoveror.py b/Unified/recoveror.py index 9714ec81..711000f6 100755 --- a/Unified/recoveror.py +++ b/Unified/recoveror.py @@ -125,6 +125,11 @@ def singleRecovery(url, task , initial, actions, do=False): else: payload['TrustPUSitelists'] = False + if action.startswith('tpe'): + tpe = float(action.split("-")[1]) + if tpe: + payload['TimePerEvent'] = tpe + acdc_round = 0 initial_string = payload['RequestString'] if initial_string.startswith('ACDC'): diff --git a/makeACDC.py b/makeACDC.py index 00d2d239..5526d043 100755 --- a/makeACDC.py +++ b/makeACDC.py @@ -7,65 +7,73 @@ import logging import os, sys from optparse import OptionParser -#from reqmgr import ReqMgrClient + +# from reqmgr import ReqMgrClient logging.basicConfig(level=logging.WARNING) import reqMgrClient from utils import workflowInfo -from collections import defaultdict +from collections import defaultdict prod_url = 'cmsweb.cern.ch' testbed_url = 'cmsweb-testbed.cern.ch' from Unified.recoveror import singleRecovery from utils import workflowInfo + + def makeACDC(**args): url = args.get('url') wfi = args.get('wfi') task = args.get('task') initial = wfi actions = [] - memory = args.get('memory',None) + memory = args.get('memory', None) if memory: - #increment = initial.request['Memory'] - memory - #actions.append( 'mem-%d'% increment ) - actions.append( 'mem-%s'% memory ) - mcore = args.get('mcore',None) + # increment = initial.request['Memory'] - memory + # actions.append( 'mem-%d'% increment ) + actions.append('mem-%s' % memory) + mcore = args.get('mcore', None) if mcore: - actions.append( 'core-%s'% mcore) - xrootd = args.get('xrootd',None) + actions.append('core-%s' % mcore) + xrootd = args.get('xrootd', None) if xrootd: - actions.append( 'xrootd-%s'% xrootd) - + actions.append('xrootd-%s' % xrootd) + tpe = args.get('tpe', None) + if tpe: + actions.append('tpe-%s' % tpe) + acdc = singleRecovery(url, task, initial.request, actions, do=True) if acdc: return acdc else: - print("Issue while creating the acdc for",task) + print("Issue while creating the acdc for", task) return None -def main(): - #Create option parser +def main(): + # Create option parser usage = "usage: %prog (-w workflow|-f filelist) (-t TASK|--all) [--tesbed]" parser = OptionParser(usage=usage) - parser.add_option("-f","--file", dest="file", default=None, - help="Text file with a list of workflows") - parser.add_option("-w","--workflow", default=None, + parser.add_option("-f", "--file", dest="file", default=None, + help="Text file with a list of workflows") + parser.add_option("-w", "--workflow", default=None, help="Coma separated list of wf to handle") - parser.add_option("-t","--task", default=None, + parser.add_option("-t", "--task", default=None, help="Coma separated task to be recovered") - parser.add_option("-p","--path", default=None, + parser.add_option("-p", "--path", default=None, help="Coma separated list of paths to recover") - parser.add_option("-a","--all", - help="Make acdc for all tasks to be recovered",default=False, action='store_true') - parser.add_option("-m","--memory", dest="memory", default=None, type=int, - help="Memory to override the original request memory") - parser.add_option("-c","--mcore", dest="mcore", default=None, + parser.add_option("-a", "--all", + help="Make acdc for all tasks to be recovered", default=False, action='store_true') + parser.add_option("-m", "--memory", dest="memory", default=None, type=int, + help="Memory to override the original request memory") + parser.add_option("-c", "--mcore", dest="mcore", default=None, help="Multicore to override the original request multicore") - parser.add_option("-x","--xrootd", dest="xrootd", default=None, type=int, + parser.add_option("-x", "--xrootd", dest="xrootd", default=None, type=int, help="Enable xrootd") - parser.add_option("-o","--out", dest="out", default='acdc_wf_list.txt', type=str, + parser.add_option("-o", "--out", dest="out", default='acdc_wf_list.txt', type=str, help="Output file, to be filled with workflows for which ACDC was submitted.") + parser.add_option("--tpe", dest="tpe", default=None, + help="Time per event") parser.add_option("--testbed", default=False, action="store_true") (options, args) = parser.parse_args() @@ -77,7 +85,7 @@ def main(): if os.path.isfile(outACDClist): sys.exit("Make a new name for output file.") - if options.all : options.task = 'all' + if options.all: options.task = 'all' if not options.task: parser.error("Provide the -t Task Name or --all") @@ -86,7 +94,7 @@ def main(): if not ((options.workflow) or (options.path) or (options.file)): parser.error("Provide the -w Workflow Name or the -p path or the -f workflow filelist") sys.exit(1) - + wfs = None wf_and_task = defaultdict(set) if options.file: @@ -97,36 +105,36 @@ def main(): ## self contained paths = options.path.split(',') for p in paths: - _,wf,t = p.split('/',2) - wf_and_task[wf].add('/%s/%s'%(wf,t)) + _, wf, t = p.split('/', 2) + wf_and_task[wf].add('/%s/%s' % (wf, t)) else: parser.error("Either provide a -f filelist or a -w workflow or -p path") sys.exit(1) if not wf_and_task: if options.task == 'all': - for wfname in wfs: + for wfname in wfs: wf_and_task[wfname] = None else: - for wfname in wfs: - wf_and_task[wfname].update( [('/%s/%s'%(wfname,task)).replace('//','/') for task in options.task.split(',')] ) + for wfname in wfs: + wf_and_task[wfname].update( + [('/%s/%s' % (wfname, task)).replace('//', '/') for task in options.task.split(',')]) if not wf_and_task: parser.error("Provide the -w Workflow Name and the -t Task Name or --all") - sys.exit(1) + sys.exit(1) - - for wfname,tasks in wf_and_task.items(): + for wfname, tasks in wf_and_task.items(): wfi = workflowInfo(url, wfname) if tasks == None: - where,how_much,how_much_where = wfi.getRecoveryInfo() + where, how_much, how_much_where = wfi.getRecoveryInfo() tasks = sorted(how_much.keys()) else: tasks = sorted(tasks) created = {} - print("Workflow:",wfname) - print("Tasks:",tasks) + print("Workflow:", wfname) + print("Tasks:", tasks) # FIXME: eventually, we want to be able to target each task # with different options @@ -137,23 +145,24 @@ def main(): # create an ACDC workflow for task in tasks: r = makeACDC(url=url, wfi=wfi, task=task, - memory = options.memory, - mcore = options.mcore, - xrootd = options.xrootd) - if not r: - print("Error in creating ACDC for",task,"on",wfname) + memory=options.memory, + mcore=options.mcore, + xrootd=options.xrootd, + tpe=options.tpe) + if not r: + print("Error in creating ACDC for", task, "on", wfname) break created[task] = r - if len(created)!=len(tasks): + if len(created) != len(tasks): print("Error in creating all required ACDCs") sys.exit(1) print("Created:") for task in created: - print(created[task],"for",task) - with open(outACDClist, 'a') as f: f.write(str(created[task])+"\n") + print(created[task], "for", task) + with open(outACDClist, 'a') as f: f.write(str(created[task]) + "\n") -if __name__ == '__main__': - main() +if __name__ == '__main__': + main() \ No newline at end of file