From d2e5304edf49493ad000c14c4d7c5377ffd34ebf Mon Sep 17 00:00:00 2001 From: siranipour Date: Wed, 14 Jul 2021 13:27:17 +0100 Subject: [PATCH] Recreating python psueododata --- validphys2/src/validphys/pseudodata.py | 237 ++++++++++--------------- 1 file changed, 96 insertions(+), 141 deletions(-) diff --git a/validphys2/src/validphys/pseudodata.py b/validphys2/src/validphys/pseudodata.py index 5937f8a35f..8b17b0c121 100644 --- a/validphys2/src/validphys/pseudodata.py +++ b/validphys2/src/validphys/pseudodata.py @@ -12,20 +12,17 @@ import numpy as np import pandas as pd -from validphys.checks import check_cuts_fromfit, check_darwin_single_process +from validphys.checks import check_cuts_fromfit from validphys.covmats import INTRA_DATASET_SYS_NAME from reportengine import collect -from validphys.n3fit_data import replica_mcseed, replica_trvlseed import validphys.n3fit_data_utils as reader log = logging.getLogger(__name__) DataTrValSpec = namedtuple('DataTrValSpec', ['pseudodata', 'tr_idx', 'val_idx']) -fitted_pseudodata = collect('fitted_pseudodata_internal', ('fitcontext',)) - context_index = collect("groups_index", ("fitcontext",)) @check_cuts_fromfit @@ -238,152 +235,110 @@ def indexed_make_replica(groups_index, make_replica): return pd.DataFrame(make_replica, index=groups_index, columns=["data"]) -@check_darwin_single_process -def fitted_pseudodata_internal(fit, experiments, num_fitted_replicas, t0pdfset=None, NPROC=None): - """A function to obtain information about the pseudodata that went - into an N3FIT fit. - - Parameters - ---------- - fit: :py:class:`validphys.core.FitSpec` - experiments: - List of :py:class:`validphys.core.ExeperimentSpec` - num_nnfit_replicas: ``int`` - Provided for by :py:mod:`validphys.fitdata`. Equal to the number of - pre-postfit replicas. - t0pdfset: :py:class:`validphys.core.PDF` - NPROC: ``int`` - Integer specifying how many cores to run on. Default is - ``mp.cpu_count()`` - - Example - ------- - Create a ``YAML`` file say ``runcard_for_pseudodata.yaml`` - - .. code-block:: YAML - :caption: runcard_for_pseudodata.yaml - - pdf: PN3_DIS_130519 - fit: PN3_DIS_130519 - - experiments: - from_: fit - - theory: - from_: fit - - t0pdfset: - from_: datacuts - - datacuts: - from_: fit - - theoryid: - from_: theory - - use_cuts: fromfit - - Then run - - >>> with open("./runcard_for_pseudodata.yaml", 'r') as stream: - ... from reportengine.compat import yaml - ... runcard = yaml.safe_load(stream) - >>> from validphys.api import API - >>> API.get_pseudodata_internal(**runcard) - - Notes - ----- - - This is a wrapper for the ``fitted_pseudodata`` action - which knows that ``experiments``, *must* come from fit - and similarly ``PDF`` and ``theoryid`` *must* be the same as - that of ``fit`` and so on. - - This function returns the pseudodata for the replicas - pre-postfit. Postfit discards some replicas and rearranges - the order. The correpsondence is done by the - :py:func:`get_pseudodata` - function. - - This code runs in parallel to increase efficiency. +def _make_replica_task(dataset_inputs, theoryid, fit, mcseed, genrep, API, replica): + """A global function for + :py:func:`validphys.pseudodata.indexed_make_replica`, to be used in + conjunction with :py:func:`validphys.pseudodata.recreate_fit_pseudodata`. + Defined at top level to be picklable for use with multiprocessing. """ - if t0pdfset is not None: - t0pdfset = t0pdfset.load_t0() + res = API.indexed_make_replica( + dataset_inputs=dataset_inputs, + theoryid=theoryid, + use_cuts="fromfit", + fit=str(fit), + mcseed=mcseed, + genrep=genrep, + replica=replica, + ) + return res + + +def recreate_fit_pseudodata(fit, fitinputcontext, num_fitted_replicas, NPROC=None): + """Function that recreates the pseudodata seen by each Monte Carlo replica. + Returns a dataframe with each column labelled ``replica i`` for ``i`` 1 through + to the number of fitted replicas. + + # TODO: Finish this example + Example + ------- + Notes + ----- + By default this function computes the replicas in parallel using the maximum + number of available CPUs. Consider setting the ``NPROC`` flag to something + smaller to leave resources available. + """ # The + 1 coming from the fact that we wish to # include the last replica - replica = range(1, num_fitted_replicas + 1) - - trvlseed, mcseed, genrep = [ - fit.as_input().get(i) - for i in ["trvlseed", "mcseed", "genrep"] - ] - - # common_data_reader expects None if genrep is False - if genrep: - replicas_mcseed = [ - replica_mcseed(rep, mcseed, genrep) for rep in replica - ] + replicas = range(1, num_fitted_replicas + 1) + + mcseed = fit.as_input()["mcseed"] + genrep = fit.as_input()["genrep"] + + dataset_inputs = fit.as_input()['dataset_inputs'] + theoryid = fitinputcontext["theoryid"].id + + # Because of the fact that only global functions can be + # pickled, we need to define the task as a high level function. + # As such we need to do this annoying repeating of function arguments, + # since only `replica` changes between monte carlo replicas, but + # dataset_inputs, fit, mcseed and genrep stay identical. See: + # https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments + repeated_di = [dataset_inputs] * num_fitted_replicas + repeated_theoryid = [theoryid] * num_fitted_replicas + repeated_fit = [str(fit)] * num_fitted_replicas + repeated_mcseed = [mcseed] * num_fitted_replicas + repeated_genrep = [genrep] * num_fitted_replicas + # Import here to avoid cyclical imports, but we don't + # want to import this in the task function because + # loading the api is expensive. + from validphys.api import API + repeated_api = [API] * num_fitted_replicas + + args = zip( + repeated_di, + repeated_theoryid, + repeated_fit, + repeated_mcseed, + repeated_genrep, + repeated_api, + replicas + ) + + if NPROC is None: + NPROC = mp.cpu_count() + log.warning( + f"Using all {NPROC} cores available, this may be dangerous " + "especially for use on a cluster. Consider setting the NPROC " + "variable to something sensible." + ) + + if NPROC == 1: + res = list(map(lambda x: _make_replica_task(*x), args)) else: - replicas_mcseed = None + with mp.Pool(processes=NPROC) as pool: + res = pool.starmap(_make_replica_task, args) - replicas_trvlseeds = [replica_trvlseed(rep, trvlseed) for rep in replica] + df = pd.concat(res, axis=1) + df.columns = [f"replica {i}" for i in replicas] - def task(d, mcseeds, trvlseeds, replicas): - all_exp_infos = [[] for _ in range(len(mcseeds))] - for exp in experiments: - all_exp_dicts = reader.common_data_reader( - exp, t0pdfset, replica_seeds=mcseeds, trval_seeds=trvlseeds - ) - for i, exp_dict in enumerate(all_exp_dicts): - all_exp_infos[i].append(exp_dict) - for i, j in zip(all_exp_infos, replicas): - d[j] = i + return df - if NPROC == 1: - pseudodata_dicts = dict() - task(pseudodata_dicts, replicas_mcseed, replicas_trvlseeds, replica) - else: - with mp.Manager() as manager: - d = manager.dict() - - if NPROC is None: - NPROC = mp.cpu_count() - log.warning( - f"Using all {NPROC} cores available, this may be dangerous " - "especially for use on a cluster. Consider setting the NPROC " - "variable to something sensible." - ) - processes = [] - - # convert sub arrays back to lists, use tolist to get builtin python - # types. - list_split = lambda lst, n: [ - arr.tolist() for arr in np.array_split(lst, n) - ] - batched_mcseeds = list_split(replicas_mcseed, NPROC) - batched_trvlseeds = list_split(replicas_trvlseeds, NPROC) - batched_replica_num = list_split(replica, NPROC) - for mc_batch, trvl_batch, replica_batch in zip( - batched_mcseeds, batched_trvlseeds, batched_replica_num - ): - p = mp.Process( - target=task, - args=(d, mc_batch, trvl_batch, replica_batch,), - ) - p.start() - processes.append(p) - for p in processes: - p.join() - pseudodata_dicts = dict(d) - return pseudodata_dicts - - -def get_pseudodata(fitted_pseudodata, fitted_replica_indexes): - """Pseudodata used during fitting but correctly accounting for - the postfit reordering. + +def recreate_pdf_pseudodata(pdf, NPROC=None): + """Function that recreates the pseudodata of a PDF set + accounting for the postfit reordering. + + # TODO: finish this example + Example + ------- """ - # By collecting over `fitcontext` we create a list of length - # one. - fitted_pseudodata = fitted_pseudodata[0] - return [fitted_pseudodata[i] for i in fitted_replica_indexes] + # Import here to avoid cyclical imports + from validphys.api import API + + fit_pseudodata = API.recreate_fit_pseudodata(fit=str(pdf), NPROC=NPROC) + fitted_replica_indexes = API.fitted_replica_indexes(pdf=str(pdf)) + return fit_pseudodata.loc[:, fitted_replica_indexes] def _datasets_mask(experiment_list):