Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Add tpe option to the makeACDC.py #1138

Open
wants to merge 3 commits into
base: python3-migration-fast
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Unified/recoveror.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ def singleRecovery(url, task , initial, actions, do=False):
else:
payload['TrustPUSitelists'] = False

if action.startswith('tpe'):
tpe = float(action.split("-")[1])
if tpe:
payload['TimePerEvent'] = tpe

acdc_round = 0
initial_string = payload['RequestString']
if initial_string.startswith('ACDC'):
Expand Down
107 changes: 58 additions & 49 deletions makeACDC.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,73 @@
import logging
import os, sys
from optparse import OptionParser
#from reqmgr import ReqMgrClient

# from reqmgr import ReqMgrClient
logging.basicConfig(level=logging.WARNING)
import reqMgrClient
from utils import workflowInfo
from collections import defaultdict
from collections import defaultdict

prod_url = 'cmsweb.cern.ch'
testbed_url = 'cmsweb-testbed.cern.ch'

from Unified.recoveror import singleRecovery
from utils import workflowInfo


def makeACDC(**args):
url = args.get('url')
wfi = args.get('wfi')
task = args.get('task')
initial = wfi
actions = []
memory = args.get('memory',None)
memory = args.get('memory', None)
if memory:
#increment = initial.request['Memory'] - memory
#actions.append( 'mem-%d'% increment )
actions.append( 'mem-%s'% memory )
mcore = args.get('mcore',None)
# increment = initial.request['Memory'] - memory
# actions.append( 'mem-%d'% increment )
actions.append('mem-%s' % memory)
mcore = args.get('mcore', None)
if mcore:
actions.append( 'core-%s'% mcore)
xrootd = args.get('xrootd',None)
actions.append('core-%s' % mcore)
xrootd = args.get('xrootd', None)
if xrootd:
actions.append( 'xrootd-%s'% xrootd)

actions.append('xrootd-%s' % xrootd)
tpe = args.get('tpe', None)
if tpe:
actions.append('tpe-%s' % tpe)

acdc = singleRecovery(url, task, initial.request, actions, do=True)
if acdc:
return acdc
else:
print("Issue while creating the acdc for",task)
print("Issue while creating the acdc for", task)
return None

def main():

#Create option parser
def main():
# Create option parser
usage = "usage: %prog (-w workflow|-f filelist) (-t TASK|--all) [--tesbed]"
parser = OptionParser(usage=usage)
parser.add_option("-f","--file", dest="file", default=None,
help="Text file with a list of workflows")
parser.add_option("-w","--workflow", default=None,
parser.add_option("-f", "--file", dest="file", default=None,
help="Text file with a list of workflows")
parser.add_option("-w", "--workflow", default=None,
help="Coma separated list of wf to handle")
parser.add_option("-t","--task", default=None,
parser.add_option("-t", "--task", default=None,
help="Coma separated task to be recovered")
parser.add_option("-p","--path", default=None,
parser.add_option("-p", "--path", default=None,
help="Coma separated list of paths to recover")
parser.add_option("-a","--all",
help="Make acdc for all tasks to be recovered",default=False, action='store_true')
parser.add_option("-m","--memory", dest="memory", default=None, type=int,
help="Memory to override the original request memory")
parser.add_option("-c","--mcore", dest="mcore", default=None,
parser.add_option("-a", "--all",
help="Make acdc for all tasks to be recovered", default=False, action='store_true')
parser.add_option("-m", "--memory", dest="memory", default=None, type=int,
help="Memory to override the original request memory")
parser.add_option("-c", "--mcore", dest="mcore", default=None,
help="Multicore to override the original request multicore")
parser.add_option("-x","--xrootd", dest="xrootd", default=None, type=int,
parser.add_option("-x", "--xrootd", dest="xrootd", default=None, type=int,
help="Enable xrootd")
parser.add_option("-o","--out", dest="out", default='acdc_wf_list.txt', type=str,
parser.add_option("-o", "--out", dest="out", default='acdc_wf_list.txt', type=str,
help="Output file, to be filled with workflows for which ACDC was submitted.")
parser.add_option("--tpe", dest="tpe", default=None,
help="Time per event")
parser.add_option("--testbed", default=False, action="store_true")

(options, args) = parser.parse_args()
Expand All @@ -77,7 +85,7 @@ def main():
if os.path.isfile(outACDClist):
sys.exit("Make a new name for output file.")

if options.all : options.task = 'all'
if options.all: options.task = 'all'

if not options.task:
parser.error("Provide the -t Task Name or --all")
Expand All @@ -86,7 +94,7 @@ def main():
if not ((options.workflow) or (options.path) or (options.file)):
parser.error("Provide the -w Workflow Name or the -p path or the -f workflow filelist")
sys.exit(1)

wfs = None
wf_and_task = defaultdict(set)
if options.file:
Expand All @@ -97,36 +105,36 @@ def main():
## self contained
paths = options.path.split(',')
for p in paths:
_,wf,t = p.split('/',2)
wf_and_task[wf].add('/%s/%s'%(wf,t))
_, wf, t = p.split('/', 2)
wf_and_task[wf].add('/%s/%s' % (wf, t))
else:
parser.error("Either provide a -f filelist or a -w workflow or -p path")
sys.exit(1)

if not wf_and_task:
if options.task == 'all':
for wfname in wfs:
for wfname in wfs:
wf_and_task[wfname] = None
else:
for wfname in wfs:
wf_and_task[wfname].update( [('/%s/%s'%(wfname,task)).replace('//','/') for task in options.task.split(',')] )
for wfname in wfs:
wf_and_task[wfname].update(
[('/%s/%s' % (wfname, task)).replace('//', '/') for task in options.task.split(',')])

if not wf_and_task:
parser.error("Provide the -w Workflow Name and the -t Task Name or --all")
sys.exit(1)
sys.exit(1)


for wfname,tasks in wf_and_task.items():
for wfname, tasks in wf_and_task.items():
wfi = workflowInfo(url, wfname)
if tasks == None:
where,how_much,how_much_where = wfi.getRecoveryInfo()
where, how_much, how_much_where = wfi.getRecoveryInfo()
tasks = sorted(how_much.keys())
else:
tasks = sorted(tasks)

created = {}
print("Workflow:",wfname)
print("Tasks:",tasks)
print("Workflow:", wfname)
print("Tasks:", tasks)

# FIXME: eventually, we want to be able to target each task
# with different options
Expand All @@ -137,23 +145,24 @@ def main():
# create an ACDC workflow
for task in tasks:
r = makeACDC(url=url, wfi=wfi, task=task,
memory = options.memory,
mcore = options.mcore,
xrootd = options.xrootd)
if not r:
print("Error in creating ACDC for",task,"on",wfname)
memory=options.memory,
mcore=options.mcore,
xrootd=options.xrootd,
tpe=options.tpe)
if not r:
print("Error in creating ACDC for", task, "on", wfname)
break
created[task] = r

if len(created)!=len(tasks):
if len(created) != len(tasks):
print("Error in creating all required ACDCs")
sys.exit(1)

print("Created:")
for task in created:
print(created[task],"for",task)
with open(outACDClist, 'a') as f: f.write(str(created[task])+"\n")
print(created[task], "for", task)
with open(outACDClist, 'a') as f: f.write(str(created[task]) + "\n")

if __name__ == '__main__':
main()

if __name__ == '__main__':
main()