From 3863766cb72354681162dfe865c84e26517bdee0 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 14 Jun 2023 23:30:51 -0600 Subject: [PATCH 1/6] Large refactoring of ttbar analysis The goal of the refactoring was to make the actual analysis logic stand out more, making the code easier to follow. The main changes are: - class TtbarAnalysis has been removed: its logic has been split between a few free functions - a few helper functions and types have been added to utils.py, so that analysis.py contains almost only the actual analysis logic - plotting logic has been moved to plotting.py The new implementation is expected to produce exactly the same plots and output histograms as before the refactoring. --- analyses/cms-open-data-ttbar/analysis.py | 849 +++++++----------- .../{helper.cpp => helpers.cpp} | 26 - analyses/cms-open-data-ttbar/plotting.py | 86 ++ analyses/cms-open-data-ttbar/utils.py | 150 +++- 4 files changed, 576 insertions(+), 535 deletions(-) rename analyses/cms-open-data-ttbar/{helper.cpp => helpers.cpp} (50%) create mode 100644 analyses/cms-open-data-ttbar/plotting.py diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index db6d2d5..f0f92e7 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -1,518 +1,359 @@ import argparse -import json import os -import time from pathlib import Path - -from utils import cache_files - -PARSER = argparse.ArgumentParser() -PARSER.add_argument("--ncores", - "-c", - help=("How many cores to use. If choosing a distributed execution, " - "this is the amount of cores per node."), - default = len(os.sched_getaffinity(0)), - type=int) -PARSER.add_argument("--scheduling-mode", - "-s", - help=("The scheduling mode of the analysis. RDataFrame supports " - "both single-node and multi-node parallelization."), - default="imt", - choices=["imt", "dask-local", "dask-ssh"]) -PARSER.add_argument("--nodes", help="String containing the list of hostnames to be used. Useful only in pair with 'dask-ssh'" - " choice for the '--scheduling-mode' parameter", - type=str, default=None) -PARSER.add_argument("--npartitions", help="How many partitions to use.", type=int, default=None) -PARSER.add_argument("--n-files-max-per-sample", "-f", - help="How many files per sample will be processed. Default -1 (all files for all samples).", - type=int, default=-1) -PARSER.add_argument("--storage-location", "-l", - help="Where the data resides. Default downloads original dataset stored at UNL.", - default="unl", choices=["unl", "cern-xrootd"]) -PARSER.add_argument("--histograms-output-file", - help="Name of the output file to store histograms.", default="histograms.root") -PARSER.add_argument("--data-cache", "-d", help="Use the specified directory as a local data cache: required input datasets will be downloaded here and the analysis will read this local copy of the data.") -PARSER.add_argument("-v", "--verbose", action="store_true") -ARGS = PARSER.parse_args() - -# importing ROOT takes a little while, so we only do it if argument parsing succeeded -import ROOT # noqa: {E402} - -if ARGS.scheduling_mode == "imt": - - RDataFrame = ROOT.RDataFrame - RunGraphs = ROOT.RDF.RunGraphs - VariationsFor = ROOT.RDF.Experimental.VariationsFor - - def init_functions(): - ROOT.gSystem.CompileMacro("helper.cpp", "kO") - -else: - - # Dask configuration useful in distributed mode - RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame - RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs - VariationsFor = ROOT.RDF.Experimental.Distributed.VariationsFor - initialize = ROOT.RDF.Experimental.Distributed.initialize - - if ARGS.scheduling_mode == "dask-ssh": - if not ARGS.nodes: - raise ValueError("For SSHCluster deployments, please specify a " - "string with a comma-separated list of hostnames of nodes that will be used.") - - if ARGS.npartitions is None: - n_compute_nodes = len(ARGS.nodes.split(",")) - 1 - ARGS.npartitions = ARGS.ncores * n_compute_nodes - - elif ARGS.scheduling_mode == "dask-local": - if ARGS.npartitions is None: - ARGS.npartitions = ARGS.ncores - - from distributed import Client, LocalCluster, SSHCluster, get_worker - - def create_localcluster_connection(ncores: int) -> Client: - cluster = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True) - client = Client(cluster) - return client - - def create_sshcluster_connection(nodes: str, ncores: int) -> Client: - parsed_nodes = nodes.split(',') - scheduler = parsed_nodes[:1] - workers = parsed_nodes[1:] - - print(f"List of nodes: {scheduler=}, {workers=}") - - # The creation of the SSHCluster object needs to be further configured according to needs. +from time import time +from typing import Optional + +import ROOT +from distributed import Client, LocalCluster, SSHCluster, get_worker +from plotting import save_plots +from utils import ( + AGCInput, + AGCResult, + postprocess_results, + retrieve_inputs, + save_histos, +) + +# Using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data +# as a reference. Values are in pb. +XSEC_INFO = { + "ttbar": 396.87 + 332.97, # nonallhad + allhad, keep same x-sec for all + "single_top_s_chan": 2.0268 + 1.2676, + "single_top_t_chan": (36.993 + 22.175) / 0.252, # scale from lepton filter to inclusive + "single_top_tW": 37.936 + 37.906, + "wjets": 61457 * 0.252, # e/mu+nu final states +} + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser() + p.add_argument( + "--n-max-files-per-sample", + "-n", + help="How many files per sample will be processed (if absent, all files for all samples).", + type=int, + ) + p.add_argument( + "--data-cache", + "-d", + help="Use the specified directory as a local data cache: required input datasets will be downloaded here and the analysis will read this local copy of the data.", + ) + p.add_argument( + "--remote-data-prefix", + help="""The original data is stored at 'https://xrootd-local.unl.edu:1094//store/user/AGC'. + If this option is passed, that prefix is replaced with the argument to this option when accessing + remote data. For example for the version of the input datasets stored on EOS use + `--remote-data-prefix='root://eoscms.cern.ch//eos/cms/store/test/agc'`.""", + ) + p.add_argument( + "--output", + "-o", + help="Name of the file where analysis results will be stored. If it already exists, contents are overwritten.", + default="histograms.root", + ) + p.add_argument( + "--scheduler", + "-s", + help="""The scheduler for RDataFrame parallelization. Will honor the --ncores flag. + The default is `mt`, i.e. multi-thread execution. + If dask-ssh, a list of worker node hostnames to connect to should be provided via the --nodes option.""", + default="mt", + choices=["mt", "dask-local", "dask-ssh"], + ) + p.add_argument( + "--ncores", + "-c", + help=( + "Number of cores to use. In case of distributed execution this is the amount of cores per node." + ), + default=len(os.sched_getaffinity(0)), + type=int, + ) + p.add_argument( + "--npartitions", + help="Number of data partitions. Only used in case of distributed execution. By default it follows RDataFrame defaults.", + type=int, + ) + p.add_argument( + "--hosts", + help="A comma-separated list of worker node hostnames. Only required if --scheduler=dask-ssh, ignored otherwise.", + ) + p.add_argument("-v", "--verbose", help="Turn on verbose execution logs.", action="store_true") + + return p.parse_args() + + +def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client: + """Create a Dask distributed client.""" + if scheduler == "dask-local": + lc = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True) + return Client(lc) + + if scheduler == "dask-ssh": + workers = hosts.split(",") + print(f"Using worker nodes: {workers=}") + # The creation of the SSHCluster object might need to be further configured to fit specific use cases. # For example, in some clusters the "local_directory" key must be supplied in the worker_options dictionary. - cluster = SSHCluster(scheduler + workers, - connect_options={"known_hosts": None}, - worker_options={"nprocs": ncores, "nthreads": 1, "memory_limit": "32GB"}) - - return Client(cluster) - - def create_connection(nodes: str, ncores: int, scheduling_mode: str) -> Client: - if scheduling_mode == "dask-local": - return create_localcluster_connection(ncores) - elif scheduling_mode == "dask-ssh": - return create_sshcluster_connection(nodes, ncores) - # Add more cluster types here to accomodate different deployments - else: - raise ValueError( - f"Unexpected scheduling mode '{scheduling_mode}'. Acceptable values are ['dask-local', 'dask-ssh'].") - - def init_functions(): - try: - localdir = get_worker().local_directory - helper_path = os.path.join(localdir, "helper.cpp") - except ValueError: - # get_worker raises an error in case it is called from the local machine - # for now work around this by silencing the error. - helper_path = "helper.cpp" - - ROOT.gSystem.CompileMacro(helper_path, "kO") - -if ARGS.verbose: - verbosity = ROOT.Experimental.RLogScopedVerbosity( - ROOT.Detail.RDF.RDFLogChannel(), ROOT.Experimental.ELogLevel.kInfo) - - -class TtbarAnalysis(dict): - - def __init__(self, n_files_max_per_sample, cache_dir, storage_location, num_bins=25, bin_low=50, bin_high=550, connection=None): - - # Store input arguments - self.n_files_max_per_sample = n_files_max_per_sample # the number of files to be processed per sample - self.cache_dir = cache_dir - self.storage_location = storage_location - self.ntuples_file = "ntuples.json" - self.num_bins = num_bins - self.bin_low = bin_low - self.bin_high = bin_high - - # Connection handle in case distributed mode was selected - self.connection = connection - - self.variations = {} # serves as temporary storage for all histograms produced by VariationsFor - self._nevts_total = {} - # dictionary assigning file URLs (paths) to each process, variation, and region - self.input_data = self._construct_fileset() - # using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data - # for reference - # x-secs are in pb - self.xsec_info = { - "ttbar": 396.87 + 332.97, # nonallhad + allhad, keep same x-sec for all - "single_top_s_chan": 2.0268 + 1.2676, - "single_top_t_chan": (36.993 + 22.175) / 0.252, # scale from lepton filter to inclusive - "single_top_tW": 37.936 + 37.906, - "wjets": 61457 * 0.252, # e/mu+nu final states - "data": None + sshc = SSHCluster( + workers, + connect_options={"known_hosts": None}, + worker_options={"nprocs": ncores, "nthreads": 1, "memory_limit": "32GB"}, + ) + return Client(sshc) + + raise ValueError( + f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh']." + ) + + +def make_rdf( + files: list[str], client: Optional[Client], npartitions: Optional[int] +) -> ROOT.RDataFrame: + """Construct and return a dataframe or, if a dask client is present, a distributed dataframe.""" + if client is not None: + d = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame( + "events", files, daskclient=client, npartitions=npartitions + ) + d._headnode.backend.distribute_unique_paths( + [ + "helpers.cpp", + ] + ) + return d + + return ROOT.RDataFrame("events", files) + + +def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame: + """Add the trijet_mass observable to the dataframe after applying the appropriate selections.""" + + # First, select events with at least 2 b-tagged jets + df = df.Filter("Sum(jet_btag[jet_pt_mask]>=0.5)>1") + + # Build four-momentum vectors for each jet + df = df.Define( + "jet_p4", + """ + ROOT::VecOps::Construct( + jet_px[jet_pt_mask], jet_py[jet_pt_mask], jet_pz[jet_pt_mask], jet_mass[jet_pt_mask]) + """, + ) + + # Build trijet combinations + df = df.Define("trijet", "ROOT::VecOps::Combinations(jet_pt[jet_pt_mask],3)") + df = df.Define("ntrijet", "trijet[0].size()") + + # Assign four-momentums to each trijet combination + df = df.Define( + "trijet_p4", + """ + ROOT::RVec trijet_p4(ntrijet); + for (int i = 0; i < ntrijet; ++i) + { + int j1 = trijet[0][i]; + int j2 = trijet[1][i]; + int j3 = trijet[2][i]; + trijet_p4[i] = jet_p4[j1] + jet_p4[j2] + jet_p4[j3]; } - - def _construct_fileset(self): - - with open(self.ntuples_file) as f: - file_info = json.load(f) - - fileset = {} - for process in file_info.keys(): - if process == "data": - continue # skip data - fileset[process] = {} - self[process] = {} - self._nevts_total[process] = {} - - for variation in file_info[process].keys(): - file_list = file_info[process][variation]["files"] - if self.n_files_max_per_sample != -1: - file_list = file_list[:self.n_files_max_per_sample] # use partial set of samples - file_paths = [f["path"] for f in file_list] - prefix = "https://xrootd-local.unl.edu:1094//store/user/AGC" - assert all(f.startswith(prefix) for f in file_paths) - if (self.storage_location == "cern-xrootd"): - old_prefix, prefix = prefix, "root://eoscms.cern.ch//eos/cms/store/test/agc" - file_paths = [f.replace(old_prefix, prefix) for f in file_paths] - if self.cache_dir is not None: - cache_files(file_paths, self.cache_dir, prefix) - old_prefix, prefix = prefix, str(Path(self.cache_dir).absolute()) - file_paths = [f.replace(old_prefix, prefix) for f in file_paths] - fileset[process].update({variation: file_paths}) - nevts_total = sum([f["nevts"] for f in file_list]) - self._nevts_total[process].update({variation: nevts_total}) - self[process][variation] = {} - - return fileset - - def fill(self, process: str, variation: str): - - # all operations are handled by RDataFrame class, so the first step is the RDataFrame object instantiating - input_data = self.input_data[process][variation] - if ARGS.scheduling_mode == "imt": - d = RDataFrame("events", input_data) + return trijet_p4; + """, + ) + + # Get trijet transverse momentum values from four-momentum vectors + df = df.Define( + "trijet_pt", + "return ROOT::VecOps::Map(trijet_p4, [](ROOT::Math::PxPyPzMVector v) { return v.Pt(); })", + ) + + # trijet_btag is a helpful array of bool values indicating whether or not the maximum btag value in trijet is larger than 0.5 threshold + df = df.Define( + "trijet_btag", + """ + ROOT::RVecB btag(ntrijet); + for (int i = 0; i < ntrijet; ++i) + { + int j1 = trijet[0][i]; + int j2 = trijet[1][i]; + int j3 = trijet[2][i]; + btag[i] = std::max({jet_btag[j1], jet_btag[j2], jet_btag[j3]}) > 0.5; + } + return btag; + """, + ) + + # Evaluate mass of trijet with maximum pt and btag higher than threshold + df = df.Define( + "trijet_mass", + """ + double mass{}; + double Pt{}; + double indx{}; + for (int i = 0; i < ntrijet; ++i) { + if ((Pt < trijet_pt[i]) && (trijet_btag[i])) { + Pt = trijet_pt[i]; + indx = i; + } + } + mass = trijet_p4[indx].M(); + return mass; + """, + ) + + return df + + +def book_histos( + df: ROOT.RDataFrame, + process: str, + variation: str, + nevents: int, +) -> list[AGCResult]: + """Return the RDataFrame results pertaining to the desired process and variation.""" + # Calculate normalization for MC + x_sec = XSEC_INFO[process] + lumi = 3378 # /pb + xsec_weight = x_sec * lumi / nevents + df = df.Define("weights", str(xsec_weight)) # default weights + + if variation == "nominal": + # Jet_pt variations definition + # pt_scale_up() and pt_res_up(jet_pt) return scaling factors applying to jet_pt + # pt_scale_up() - jet energy scaly systematic + # pt_res_up(jet_pt) - jet resolution systematic + df = df.Vary( + "jet_pt", + "ROOT::RVec{jet_pt*pt_scale_up(), jet_pt*jet_pt_resolution(jet_pt.size())}", + ["pt_scale_up", "pt_res_up"], + ) + + if process == "wjets": + # Flat weight variation definition + df = df.Vary( + "weights", + "weights*flat_variation()", + [f"scale_var_{direction}" for direction in ["up", "down"]], + ) + + # Event selection - the core part of the algorithm applied for both regions + # Selecting events containing at least one lepton and four jets with pT > 25 GeV + # Applying requirement at least one of them must be b-tagged jet (see details in the specification) + df = ( + df.Define("electron_pt_mask", "electron_pt>25") + .Define("muon_pt_mask", "muon_pt>25") + .Define("jet_pt_mask", "jet_pt>25") + .Filter("Sum(electron_pt_mask) + Sum(muon_pt_mask) == 1") + .Filter("Sum(jet_pt_mask) >= 4") + .Filter("Sum(jet_btag[jet_pt_mask]>=0.5)>=1") + ) + + # b-tagging variations for nominal samples + if variation == "nominal": + df = df.Vary( + "weights", + "ROOT::RVecD{weights*btag_weight_variation(jet_pt[jet_pt_mask])}", + [ + f"{weight_name}_{direction}" + for weight_name in [f"btag_var_{i}" for i in range(4)] + for direction in ["up", "down"] + ], + ) + + # Define HT observable for the 4j1b region + # Only one b-tagged region required + # The observable is the total transvesre momentum + # fmt: off + df4j1b = df.Filter("Sum(jet_btag[jet_pt_mask]>=0.5)==1")\ + .Define("HT", "Sum(jet_pt[jet_pt_mask])") + # fmt: on + + # Define trijet_mass observable for the 4j2b region (this one is more complicated) + df4j2b = define_trijet_mass(df) + + # Select the right VariationsFor function depending on RDF or DistRDF + if type(df).__module__ == "DistRDF.Proxy": + variationsfor_func = ROOT.RDF.Experimental.Distributed.VariationsFor + else: + variationsfor_func = ROOT.RDF.Experimental.VariationsFor + + # Book histograms and, if needed, their systematic variations + results = [] + for df, observable, region in zip([df4j1b, df4j2b], ["HT", "trijet_mass"], ["4j1b", "4j2b"]): + histo_model = ROOT.RDF.TH1DModel( + name=f"{region}_{process}_{variation}", title=process, nbinsx=25, xlow=50, xup=550 + ) + nominal_histo = df.Histo1D(histo_model, observable, "weights") + if variation == "nominal": + varied_histos = variationsfor_func(nominal_histo) + results.append(AGCResult(varied_histos, region, process, variation, nominal_histo)) else: - d = RDataFrame("events", input_data, daskclient=self.connection, npartitions=ARGS.npartitions) - d._headnode.backend.distribute_unique_paths(["helper.cpp", ]) + results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo)) + print(f"Booked histogram {histo_model.fName}") - # normalization for MC - x_sec = self.xsec_info[process] - nevts_total = self._nevts_total[process][variation] - lumi = 3378 # /pb - xsec_weight = x_sec * lumi / nevts_total - d = d.Define("weights", str(xsec_weight)) # default weights + # Return the booked results + # Note that no event loop has run yet at this point (RDataFrame is lazy) + return results - if variation == "nominal": - # jet_pt variations definition - # pt_scale_up() and pt_res_up(jet_pt) return scaling factors applying to jet_pt - # pt_scale_up() - jet energy scaly systematic - # pt_res_up(jet_pt) - jet resolution systematic - - d = d.Vary("jet_pt", - "ROOT::RVec{jet_pt*pt_scale_up(), jet_pt*jet_pt_resolution(jet_pt.size())}", - ["pt_scale_up", "pt_res_up"]) - if process == "wjets": - - # flat weight variation definition - d = d.Vary("weights", - "weights*flat_variation()", - [f"scale_var_{direction}" for direction in ["up", "down"]] - ) - - # event selection - the core part of the algorithm applied for both regions - # selecting events containing at least one lepton and four jets with pT > 25 GeV - # applying requirement at least one of them must be b-tagged jet (see details in the specification) - d = d.Define("electron_pt_mask", "electron_pt>25").Define("muon_pt_mask", "muon_pt>25").Define("jet_pt_mask", "jet_pt>25")\ - .Filter("Sum(electron_pt_mask) + Sum(muon_pt_mask) == 1")\ - .Filter("Sum(jet_pt_mask) >= 4")\ - .Filter("Sum(jet_btag[jet_pt_mask]>=0.5)>=1") - - # b-tagging variations for nominal samples - d = d.Vary("weights", - "ROOT::RVecD{weights*btag_weight_variation(jet_pt[jet_pt_mask])}", - [f"{weight_name}_{direction}" for weight_name in [f"btag_var_{i}" for i in range(4)] for direction in [ - "up", "down"]] - ) if variation == "nominal" else d - - # as next steps for different regions are different, there is a fork in the algorithm - # we create another RDF pointer for each region called "fork" - measured = {"4j1b": "HT", "4j2b": "trijet_mass"} # columns names of observables for two regions - for region in ["4j1b", "4j2b"]: - observable = measured[region] - - if region == "4j1b": - - # only one b-tagged region required - # observable is total transvesre momentum - fork = d.Filter("Sum(jet_btag[jet_pt_mask]>=0.5)==1").Define(observable, "Sum(jet_pt[jet_pt_mask])") - - elif region == "4j2b": - - # select events with at least 2 b-tagged jets - # building four-momentum vectors for each jet - fork = ( - d.Filter("Sum(jet_btag[jet_pt_mask]>=0.5)>1") - .Define("jet_p4", - """ - ROOT::VecOps::Construct( - jet_px[jet_pt_mask], jet_py[jet_pt_mask], jet_pz[jet_pt_mask], jet_mass[jet_pt_mask])""") - ) - # building trijet combinations - fork = fork.Define("trijet", - "ROOT::VecOps::Combinations(jet_pt[jet_pt_mask],3)" - ).Define("ntrijet", "trijet[0].size()") - - # assigning four-momentums to each trijet combination - fork = fork.Define("trijet_p4", - """ - ROOT::RVec trijet_p4(ntrijet); - for (int i = 0; i < ntrijet; ++i) - { - int j1 = trijet[0][i]; - int j2 = trijet[1][i]; - int j3 = trijet[2][i]; - trijet_p4[i] = jet_p4[j1] + jet_p4[j2] + jet_p4[j3]; - } - return trijet_p4; - """ - ) - - # getting trijet transverse momentum values from four-momentum vectors - fork = fork.Define("trijet_pt", - "return ROOT::VecOps::Map(trijet_p4, [](ROOT::Math::PxPyPzMVector v) { return v.Pt(); })" - ) - - # trijet_btag is a helpful array of bool values indicating whether or not the maximum btag value in trijet is larger than 0.5 threshold - fork = fork.Define("trijet_btag", - """ - ROOT::RVecB btag(ntrijet); - for (int i = 0; i < ntrijet; ++i) - { - int j1 = trijet[0][i]; - int j2 = trijet[1][i]; - int j3 = trijet[2][i]; - btag[i] = std::max({jet_btag[j1], jet_btag[j2], jet_btag[j3]}) > 0.5; - } - return btag; - """ - ) - # find trijet with maximum pt and higher that threshold btag - # get mass for found jet four-vector - # trijet mass themself is an observable quantity - fork = fork.Define(observable, - """ - double mass{}; - double Pt{}; - double indx{}; - for (int i = 0; i < ntrijet; ++i) { - if ((Pt < trijet_pt[i]) && (trijet_btag[i])) { - Pt = trijet_pt[i]; - indx = i; - } - } - mass = trijet_p4[indx].M(); - return mass; - """ - ) - - # fill histogram for observable column in RDF object - res = fork.Histo1D((f"{process}_{variation}_{region}", process, self.num_bins, - self.bin_low, self.bin_high), observable, "weights") - self.hist.append(res) # save the pointer to further triggering - print(f"histogram {process}_{variation}_{region} has been created") - - # save pointers for variations - # self.variations is a temporary container for all pointers - if variation == "nominal": - self.variations[f"{process}__{region}"] = VariationsFor(res) - else: - self[process][variation][region] = res - - # build 9 Graphs for each data sample - def Fill(self): - self.hist = [] - for process in self: - for variation in self.input_data[process]: - self.fill(process, variation) - - # run 9 Graphs for each data sample - def Accumulate(self): - RunGraphs(self.hist) - - # transform TtbarAnalysis to dictionary (process, variation, region) -> histogram - def TransfToDict(self): - for key in self.variations.keys(): - hist_map = self.variations[key] - key = str(key).split("__") - process = key[0] - region = key[1] - for hist_name in hist_map.GetKeys(): - variation = "nominal" if hist_name == "nominal" else str(hist_name).split(":")[1] - if variation not in self[process]: - self[process][variation] = {} - hist = hist_map[hist_name] - if not isinstance(hist, ROOT.TH1D): - hist = hist.GetValue() - self[process][variation][region] = hist - self.ExportJSON() - - def GetProcStack(self, region, variation="nominal"): - return [self[process][variation][region] for process in self] - - def GetVarStack(self, region, process="ttbar", variations=None): - variations = variations if variations is not None else self[process].keys() - histos = [self[process][variation][region] for variation in variations] - return [h.GetValue() if not isinstance(h, ROOT.TH1D) else h for h in histos] - - # necessary only for sanity checks - def ExportJSON(self): - data = {} - for process in self: - data[process] = {} - for variation in self[process]: - data[process][variation] = [region for region in self[process][variation]] - with open("data.json", "w") as f: - json.dump(data, f) - - -def analyse(connection=None): - - analysisManager = TtbarAnalysis(cache_dir=ARGS.data_cache, - n_files_max_per_sample=ARGS.n_files_max_per_sample, - storage_location=ARGS.storage_location, - connection=connection) - - # At this stage, analysisManager keeps all file URLs: - print(f"processes in fileset: {list(analysisManager.keys())}") - print( - f'\nexample of information inside analysisManager:\n{{\n "urls": [{analysisManager.input_data["ttbar"]["nominal"][0]}, ...],') - - t0 = time.time() - analysisManager.Fill() - t1 = time.time() - - print(f"\npreprocessing took {round(t1 - t0,2)} seconds") - - analysisManager.Accumulate() - t2 = time.time() - - print(f"processing took {round(t2 - t1,2)} seconds") - print(f"execution took {round(t2 - t0,2)} seconds") - - analysisManager.TransfToDict() - - return analysisManager - - -def make_plots(analysisManager): - - width = 2160 - height = 2160 - c = ROOT.TCanvas("c", "c", width, height) - ROOT.gStyle.SetPalette(ROOT.kRainBow) - - # Region 1 stack - hlist = analysisManager.GetProcStack(region="4j1b") - hs = ROOT.THStack("j4b1", ">=4 jets, 1 b-tag; H_{T} [GeV]") - for h in hlist: - h = ROOT.Slice(h, 120, 550) - ptr = h.Rebin(2, h.GetTitle()) - hs.Add(ptr) - hs.Draw("hist pfc plc") - c.Draw() - x = hs.GetXaxis() - x.SetTitleOffset(1.5) - x.CenterTitle() - c.BuildLegend(0.65, 0.7, 0.9, 0.9) - c.SaveAs("reg1.png") - - # Region 2 stack - hlist = analysisManager.GetProcStack(region="4j2b") - hs = ROOT.THStack("j4b1", ">=4 jets, 2 b-tag; H_{T} [GeV]") - for h in hlist: - hs.Add(h) - hs.Draw("hist pfc plc") - c.Draw() - x = hs.GetXaxis() - x.SetTitleOffset(1.5) - x.CenterTitle() - c.BuildLegend(0.65, 0.7, 0.9, 0.9) - c.SaveAs("reg2.png") - - # b-tag variations - btag_variations = ["nominal", "btag_var_0_up", "btag_var_1_up", "btag_var_2_up", "btag_var_3_up"] - freshstack = analysisManager.GetVarStack(region="4j1b", variations=btag_variations) - - hs = ROOT.THStack("j4b1btag", "btag-variations ; H_{T} [GeV]") - for h, name in zip(freshstack, btag_variations): - print(name) - ptr = h.Rebin(2, name) - ptr.SetLineWidth(4) - ptr.SetTitle(name) - hs.Add(ptr) - hs.Draw("hist nostack plc") - c.Draw() - x = hs.GetXaxis() - x.SetRangeUser(120, 500) - x.SetTitleOffset(1.5) - x.CenterTitle() - c.BuildLegend(0.65, 0.7, 0.9, 0.9) - c.SaveAs("btag.png") - - # Jet energy variations - jet_variations = ["nominal", "pt_scale_up", "pt_res_up"] - freshstack = analysisManager.GetVarStack(region="4j2b", variations=jet_variations) - hs = ROOT.THStack("4j2bjet", "Jet energy variations ; m_{bjj} [GeV]") - for h, name in zip(freshstack, jet_variations): - print(name) - h.SetFillColor(0) - h.SetLineWidth(4) - h.SetTitle(name) - hs.Add(h) - hs.Draw("hist nostack plc") - c.Draw() - x = hs.GetXaxis() - x.SetRangeUser(0, 600) - x.SetTitleOffset(1.5) - x.CenterTitle() - c.BuildLegend(0.65, 0.7, 0.9, 0.9) - c.SaveAs("jet.png") - - # Save histograms to disk - with ROOT.TFile.Open(ARGS.histograms_output_file, "RECREATE") as output: - for process in analysisManager: - for variation in analysisManager[process]: - for region in analysisManager[process][variation]: - hist_name = ( - f"{region}_{process}_{variation}" if variation != "nominal" else f"{region}_{process}" - ) - hist = analysisManager[process][variation][region] - if not isinstance(hist, ROOT.TH1D): - hist = hist.GetValue() - if hist.IsZombie(): - raise TypeError(hist_name) - hist_sliced = ROOT.Slice(hist, 120, 550) - hist_binned = hist_sliced.Rebin(2, hist.GetTitle()) - output.WriteObject(hist_binned, hist_name) - - -def main(): - - if ARGS.scheduling_mode == "imt": - init_functions() - ROOT.EnableImplicitMT(ARGS.ncores) - print(f"The num of threads = {ROOT.GetThreadPoolSize()}") - # No handle needed in local mode - connection = None - else: - initialize(init_functions) - # Create connection to the cluster in distributed mode - connection = create_connection(ARGS.nodes, ARGS.ncores, ARGS.scheduling_mode) +def load_cpp(): + """Load C++ helper functions. Works for both local and distributed execution.""" + try: + localdir = get_worker().local_directory + cpp_source = Path(localdir) / "helpers.cpp" + except ValueError: + # must be local execution + cpp_source = "helpers.cpp" + + ROOT.gSystem.CompileMacro(str(cpp_source), "kO") + - results = analyse(connection) - make_plots(results) +def main() -> None: + program_start = time() + args = parse_args() - if connection is not None: - connection.close() + # Do not add histograms to TDirectories automatically: we'll do it ourselves as needed. + ROOT.TH1.AddDirectory(False) + + if args.verbose: + # Set higher RDF verbosity for the rest of the program. + # To only change the verbosity in a given scope, use ROOT.Experimental.RLogScopedVerbosity. + ROOT.Detail.RDF.RDFLogChannel.SetVerbosity(ROOT.Experimental.ELogLevel.kInfo) + + if args.scheduler == "mt": + # Setup for local, multi-thread RDataFrame + ROOT.EnableImplicitMT(args.ncores) + print(f"Number of threads: {ROOT.GetThreadPoolSize()}") + client = None + load_cpp() + run_graphs = ROOT.RDF.RunGraphs + else: + # Setup for distributed RDataFrame + client = create_dask_client(args.scheduler, args.ncores, args.hosts) + ROOT.RDF.Experimental.Distributed.initialize(load_cpp) + run_graphs = ROOT.RDF.Experimental.Distributed.RunGraphs + + # Book RDataFrame results + inputs: list[AGCInput] = retrieve_inputs( + args.n_max_files_per_sample, args.remote_data_prefix, args.data_cache + ) + results: list[AGCResult] = [] + for input in inputs: + df = make_rdf(input.paths, client, args.npartitions) + results += book_histos(df, input.process, input.variation, input.nevents) + print(f"Building the computation graphs took {time() - program_start:.2f} seconds") + + # Run the event loops for all processes and variations here + run_graphs_start = time() + run_graphs([r.nominal_histo for r in results]) + print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds") + if client is not None: + client.close() + + results = postprocess_results(results) + save_plots(results) + save_histos([r.histo for r in results], output_fname=args.output) + print(f"Result histograms saved in file {args.output}") if __name__ == "__main__": - raise SystemExit(main()) + main() diff --git a/analyses/cms-open-data-ttbar/helper.cpp b/analyses/cms-open-data-ttbar/helpers.cpp similarity index 50% rename from analyses/cms-open-data-ttbar/helper.cpp rename to analyses/cms-open-data-ttbar/helpers.cpp index 403646a..e0aa1ec 100644 --- a/analyses/cms-open-data-ttbar/helper.cpp +++ b/analyses/cms-open-data-ttbar/helpers.cpp @@ -4,32 +4,6 @@ #include "TH1D.h" #include "TRandom3.h" -// functions slicing histograms - -// accept numbers of bins as parameters -TH1D SliceHisto(const TH1D &h, int xfirst, int xlast) -{ - - // do slice in xfirst:xlast including xfirst and xlast - TH1D res((std::string("h_sliced_") + h.GetTitle()).c_str(), h.GetTitle(), xlast - xfirst, - h.GetXaxis()->GetBinLowEdge(xfirst), h.GetXaxis()->GetBinUpEdge(xlast - 1)); - // note that histogram arrays are : [ undeflow, bin1, bin2,....., binN, overflow] - std::copy(h.GetArray() + xfirst, h.GetArray() + xlast, res.GetArray() + 1); - // set correct underflow/overflows - res.SetBinContent(0, h.Integral(0, xfirst - 1)); // set underflow value - res.SetBinContent(res.GetNbinsX() + 1, h.Integral(xlast, h.GetNbinsX() + 1)); // set overflow value - - return res; -} - -// accept axis limits as parameters -TH1D Slice(TH1D &h, double low_edge, double high_edge) -{ - int xfirst = h.FindBin(low_edge); - int xlast = h.FindBin(high_edge); - return SliceHisto(h, xfirst, xlast); -} - // functions creating systematic variations inline TRandom &get_thread_local_trandom() { diff --git a/analyses/cms-open-data-ttbar/plotting.py b/analyses/cms-open-data-ttbar/plotting.py new file mode 100644 index 0000000..6efafa6 --- /dev/null +++ b/analyses/cms-open-data-ttbar/plotting.py @@ -0,0 +1,86 @@ +import ROOT +from utils import AGCResult, slice_and_rebin + + +def save_plots(results: list[AGCResult]): + width = 2160 + height = 2160 + c = ROOT.TCanvas("c", "c", width, height) + ROOT.gStyle.SetPalette(ROOT.kRainBow) + + # Region 1 stack + hlist = [r.histo for r in results if r.region == "4j1b" and r.variation == "nominal"] + hlist = [slice_and_rebin(h) for h in hlist] + hs = ROOT.THStack("j4b1", ">=4 jets, 1 b-tag; H_{T} [GeV]") + for h in hlist: + hs.Add(h) + hs.Draw("hist pfc plc") + c.Draw() + x = hs.GetXaxis() + x.SetTitleOffset(1.5) + x.CenterTitle() + c.BuildLegend(0.65, 0.7, 0.9, 0.9) + c.SaveAs("reg1.png") + + # Region 2 stack + hlist = [r.histo for r in results if r.region == "4j2b" and r.variation == "nominal"] + hs = ROOT.THStack("j4b1", ">=4 jets, 2 b-tag; H_{T} [GeV]") + for h in hlist: + hs.Add(h) + hs.Draw("hist pfc plc") + c.Draw() + x = hs.GetXaxis() + x.SetTitleOffset(1.5) + x.CenterTitle() + c.BuildLegend(0.65, 0.7, 0.9, 0.9) + c.SaveAs("reg2.png") + + # b-tag variations + btag_variations = [ + "nominal", + "btag_var_0_up", + "btag_var_1_up", + "btag_var_2_up", + "btag_var_3_up", + ] + + def btag_filter(r): + return r.region == "4j1b" and r.process == "ttbar" and r.variation in btag_variations + + hlist = [r.histo for r in results if btag_filter(r)] + hlist = [h.Clone().Rebin(2) for h in hlist] + hs = ROOT.THStack("j4b1btag", "btag-variations ; H_{T} [GeV]") + for h, name in zip(hlist, btag_variations): + h.SetLineWidth(4) + h.SetTitle(name) + hs.Add(h) + hs.Draw("hist nostack plc") + c.Draw() + x = hs.GetXaxis() + x.SetRangeUser(120, 500) + x.SetTitleOffset(1.5) + x.CenterTitle() + c.BuildLegend(0.65, 0.7, 0.9, 0.9) + c.SaveAs("btag.png") + + # Jet energy variations + jet_variations = ["nominal", "pt_scale_up", "pt_res_up"] + + def jet_filter(r): + return r.region == "4j2b" and r.process == "ttbar" and r.variation in jet_variations + + hlist = [r.histo for r in results if jet_filter(r)] + hs = ROOT.THStack("4j2bjet", "Jet energy variations ; m_{bjj} [GeV]") + for h, name in zip(hlist, jet_variations): + h.SetFillColor(0) + h.SetLineWidth(4) + h.SetTitle(name) + hs.Add(h) + hs.Draw("hist nostack plc") + c.Draw() + x = hs.GetXaxis() + x.SetRangeUser(0, 600) + x.SetTitleOffset(1.5) + x.CenterTitle() + c.BuildLegend(0.65, 0.7, 0.9, 0.9) + c.SaveAs("jet.png") diff --git a/analyses/cms-open-data-ttbar/utils.py b/analyses/cms-open-data-ttbar/utils.py index 21bb482..8fde147 100644 --- a/analyses/cms-open-data-ttbar/utils.py +++ b/analyses/cms-open-data-ttbar/utils.py @@ -1,6 +1,56 @@ -from tqdm import tqdm -from urllib.request import urlretrieve +import json +from dataclasses import dataclass from pathlib import Path +from typing import Optional, Union +from urllib.request import urlretrieve + +import ROOT +from tqdm import tqdm + +# Declare a Slice helper C++ function +ROOT.gInterpreter.Declare( + """ +TH1D Slice(TH1D &h, double low_edge, double high_edge) +{ + int xfirst = h.FindBin(low_edge); + int xlast = h.FindBin(high_edge); + + // do slice in xfirst:xlast including xfirst and xlast + TH1D res(h.GetName(), h.GetTitle(), xlast - xfirst, + h.GetXaxis()->GetBinLowEdge(xfirst), h.GetXaxis()->GetBinUpEdge(xlast - 1)); + // note that histogram arrays are : [ undeflow, bin1, bin2,....., binN, overflow] + std::copy(h.GetArray() + xfirst, h.GetArray() + xlast, res.GetArray() + 1); + // set correct underflow/overflows + res.SetBinContent(0, h.Integral(0, xfirst - 1)); // set underflow value + res.SetBinContent(res.GetNbinsX() + 1, h.Integral(xlast, h.GetNbinsX() + 1)); // set overflow value + + return res; +} +""" +) + + +@dataclass +class AGCInput: + paths: list[str] # paths, http urls or xrootd urls of the input files + process: str + variation: str + nevents: int # total number of events for the sample + + +@dataclass +class AGCResult: + histo: Union[ + ROOT.TH1D, ROOT.RDF.RResultPtr[ROOT.TH1D], ROOT.RDF.Experimental.RResultMap[ROOT.TH1D] + ] + region: str + process: str + variation: str + # We keep around the nominal RResultPtr even when `histo` is a RResultMap: + # in v6.28 we need a RResultPtr to pass to RDF.RunGraphs in order to trigger the event loop. + # In later versions RunGraphs accepts RResultMaps as well, and we don't need this data attribute. + nominal_histo: ROOT.RDF.RResultPtr[ROOT.TH1D] + def _tqdm_urlretrieve_hook(t: tqdm): """From https://github.com/tqdm/tqdm/blob/master/examples/tqdm_wget.py .""" @@ -24,10 +74,100 @@ def update_to(b=1, bsize=1, tsize=None): return update_to -def cache_files(file_paths: list, cache_dir: str, remote_prefix: str): + +def _cache_files(file_paths: list, cache_dir: str, remote_prefix: str): for url in file_paths: - out_path = Path(cache_dir) / url.removeprefix(remote_prefix).lstrip('/') + out_path = Path(cache_dir) / url.removeprefix(remote_prefix).lstrip("/") out_path.parent.mkdir(parents=True, exist_ok=True) if not out_path.exists(): - with tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc=out_path.name) as t: + with tqdm( + unit="B", unit_scale=True, unit_divisor=1024, miniters=1, desc=out_path.name + ) as t: urlretrieve(url, out_path.absolute(), reporthook=_tqdm_urlretrieve_hook(t)) + + +def retrieve_inputs( + max_files_per_sample: Optional[int], + remote_data_prefix: Optional[str], + data_cache: Optional[str], + input_json: Path = Path("ntuples.json"), +) -> list[AGCInput]: + """Return a dictionary of file paths and a corresponding dictionary of event counts. + Both are 2-level dictionaries: there is a dictionary per process per variation. + Each files[process][variation] corresponds to a list of input files. + nevts[process][variation] is the total number of events for that sample. + """ + with open(input_json) as f: + input_spec = json.load(f) + + input_files: list[AGCInput] = [] + + for process, process_spec in input_spec.items(): + if process == "data": + continue # skip data + + for variation, sample_info in process_spec.items(): + sample_info = sample_info["files"] # this is now a list of (filename, nevents) tuples + + if max_files_per_sample is not None: + sample_info = sample_info[:max_files_per_sample] + + file_paths = [f["path"] for f in sample_info] + prefix = "https://xrootd-local.unl.edu:1094//store/user/AGC" + assert all(f.startswith(prefix) for f in file_paths) + + if remote_data_prefix is not None: + old_prefix, prefix = prefix, remote_data_prefix + file_paths = [f.replace(old_prefix, prefix) for f in file_paths] + + if data_cache is not None: + _cache_files(file_paths, data_cache, prefix) + old_prefix, prefix = prefix, str(Path(data_cache).absolute()) + file_paths = [f.replace(old_prefix, prefix) for f in file_paths] + + nevents = sum(f["nevts"] for f in sample_info) + input_files.append(AGCInput(file_paths, process, variation, nevents)) + + return input_files + + +def postprocess_results(results: list[AGCResult]): + """Extract TH1D objects from list of RDF's ResultPtrs and RResultMaps. + The function also gives appropriate names to each varied histogram and slices them and rebins them as needed. + """ + + # Substitute RResultPtrs and RResultMaps of histograms to actual histograms + new_results = [] + for res in results: + if hasattr(res.histo, "GetValue"): # RResultPtr or distrdf equivalent + # just extract the histogram from the RResultPtr + h = res.histo.GetValue() + new_results.append( + AGCResult(h, res.region, res.process, res.variation, res.nominal_histo) + ) + else: + resmap = res.histo + assert hasattr(resmap, "GetKeys") # RResultMap or distrdf equivalent + # extract each histogram in the map + for variation in resmap.GetKeys(): + h = resmap[variation] + # strip the varied variable name: it's always 'weights' + variation_name = str(variation).split(":")[-1] + new_name = h.GetName().replace("nominal", variation_name) + h.SetName(new_name) + new_results.append( + AGCResult(h, res.region, res.process, variation_name, res.nominal_histo) + ) + + return new_results + + +# Apply slicing and rebinning similar to the reference implementation +def slice_and_rebin(h: ROOT.TH1D) -> ROOT.TH1D: + return ROOT.Slice(h, 120.0, 550.0).Rebin(2) + + +def save_histos(results: list[ROOT.TH1D], output_fname: str): + with ROOT.TFile.Open(output_fname, "recreate") as out_file: + for result in results: + out_file.WriteObject(slice_and_rebin(result), result.GetName()) From c138de473fea655bc581cc75564caea177700576 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Tue, 20 Jun 2023 12:41:52 -0600 Subject: [PATCH 2/6] Use SetRangeUser instead of slicing histograms This removes the dependency of plotting.py on the jitted ROOT.Slice function. Slicing and then rebinning is not strictly equivalent to rebinning and then reducing the axis range so this change introduces minor changes in the aspect of the btag.png plot. However the reference implementation does not specify how the histograms should be plotted so I think it is ok: the contents of the histograms saved in output are not modified. --- analyses/cms-open-data-ttbar/plotting.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/analyses/cms-open-data-ttbar/plotting.py b/analyses/cms-open-data-ttbar/plotting.py index 6efafa6..78457b3 100644 --- a/analyses/cms-open-data-ttbar/plotting.py +++ b/analyses/cms-open-data-ttbar/plotting.py @@ -1,5 +1,5 @@ import ROOT -from utils import AGCResult, slice_and_rebin +from utils import AGCResult def save_plots(results: list[AGCResult]): @@ -10,13 +10,14 @@ def save_plots(results: list[AGCResult]): # Region 1 stack hlist = [r.histo for r in results if r.region == "4j1b" and r.variation == "nominal"] - hlist = [slice_and_rebin(h) for h in hlist] + hlist = [h.Clone().Rebin(2) for h in hlist] hs = ROOT.THStack("j4b1", ">=4 jets, 1 b-tag; H_{T} [GeV]") for h in hlist: hs.Add(h) hs.Draw("hist pfc plc") c.Draw() x = hs.GetXaxis() + x.SetRangeUser(120, x.GetXmax()) x.SetTitleOffset(1.5) x.CenterTitle() c.BuildLegend(0.65, 0.7, 0.9, 0.9) @@ -57,7 +58,7 @@ def btag_filter(r): hs.Draw("hist nostack plc") c.Draw() x = hs.GetXaxis() - x.SetRangeUser(120, 500) + x.SetRangeUser(120, x.GetXmax()) x.SetTitleOffset(1.5) x.CenterTitle() c.BuildLegend(0.65, 0.7, 0.9, 0.9) From ef7b7fc5adf174fd8e965fffb992fed333e15529 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Tue, 20 Jun 2023 12:47:19 -0600 Subject: [PATCH 3/6] Write out histograms without slicing and rebinning This post-processing step is only required for the fitting part of the analysis, which we do not implement yet. v1.1 of the Coffea reference implementation also saves the full histograms, differently from v1.0. --- analyses/cms-open-data-ttbar/utils.py | 29 +-------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/analyses/cms-open-data-ttbar/utils.py b/analyses/cms-open-data-ttbar/utils.py index 8fde147..2583337 100644 --- a/analyses/cms-open-data-ttbar/utils.py +++ b/analyses/cms-open-data-ttbar/utils.py @@ -7,28 +7,6 @@ import ROOT from tqdm import tqdm -# Declare a Slice helper C++ function -ROOT.gInterpreter.Declare( - """ -TH1D Slice(TH1D &h, double low_edge, double high_edge) -{ - int xfirst = h.FindBin(low_edge); - int xlast = h.FindBin(high_edge); - - // do slice in xfirst:xlast including xfirst and xlast - TH1D res(h.GetName(), h.GetTitle(), xlast - xfirst, - h.GetXaxis()->GetBinLowEdge(xfirst), h.GetXaxis()->GetBinUpEdge(xlast - 1)); - // note that histogram arrays are : [ undeflow, bin1, bin2,....., binN, overflow] - std::copy(h.GetArray() + xfirst, h.GetArray() + xlast, res.GetArray() + 1); - // set correct underflow/overflows - res.SetBinContent(0, h.Integral(0, xfirst - 1)); // set underflow value - res.SetBinContent(res.GetNbinsX() + 1, h.Integral(xlast, h.GetNbinsX() + 1)); // set overflow value - - return res; -} -""" -) - @dataclass class AGCInput: @@ -162,12 +140,7 @@ def postprocess_results(results: list[AGCResult]): return new_results -# Apply slicing and rebinning similar to the reference implementation -def slice_and_rebin(h: ROOT.TH1D) -> ROOT.TH1D: - return ROOT.Slice(h, 120.0, 550.0).Rebin(2) - - def save_histos(results: list[ROOT.TH1D], output_fname: str): with ROOT.TFile.Open(output_fname, "recreate") as out_file: for result in results: - out_file.WriteObject(slice_and_rebin(result), result.GetName()) + out_file.WriteObject(result, result.GetName()) From ab0fe4aff8b3ee3700e8552b40bcb3f63e4030f5 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Thu, 6 Jul 2023 13:15:31 -0600 Subject: [PATCH 4/6] Disable interactive graphics Avoids canvases flashing on screen before we save them to file. --- analyses/cms-open-data-ttbar/analysis.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index f0f92e7..c8ca594 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -313,6 +313,8 @@ def main() -> None: # Do not add histograms to TDirectories automatically: we'll do it ourselves as needed. ROOT.TH1.AddDirectory(False) + # Disable interactive graphics: avoids canvases flashing on screen before we save them to file + ROOT.gROOT.SetBatch(True) if args.verbose: # Set higher RDF verbosity for the rest of the program. From 4da8d8850f592086370faab851f08f7e88b2c795 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Fri, 7 Jul 2023 17:02:55 -0600 Subject: [PATCH 5/6] [NFC] Add comment about distribution of C++ files --- analyses/cms-open-data-ttbar/analysis.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index c8ca594..dff233e 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -298,6 +298,8 @@ def book_histos( def load_cpp(): """Load C++ helper functions. Works for both local and distributed execution.""" try: + # when using distributed RDataFrame 'helpers.cpp' is copied to the local_directory + # of every worker (via `distribute_unique_paths`) localdir = get_worker().local_directory cpp_source = Path(localdir) / "helpers.cpp" except ValueError: From 2c512c63f4561ae0f169a0c50d123fbb818d73ff Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Fri, 7 Jul 2023 17:05:12 -0600 Subject: [PATCH 6/6] [NFC] Update postprocess_results docs It does not slice/rebin anymore. --- analyses/cms-open-data-ttbar/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyses/cms-open-data-ttbar/utils.py b/analyses/cms-open-data-ttbar/utils.py index 2583337..295a4fc 100644 --- a/analyses/cms-open-data-ttbar/utils.py +++ b/analyses/cms-open-data-ttbar/utils.py @@ -111,7 +111,7 @@ def retrieve_inputs( def postprocess_results(results: list[AGCResult]): """Extract TH1D objects from list of RDF's ResultPtrs and RResultMaps. - The function also gives appropriate names to each varied histogram and slices them and rebins them as needed. + The function also gives appropriate names to each varied histogram. """ # Substitute RResultPtrs and RResultMaps of histograms to actual histograms