Skip to content

Commit

Permalink
Using collect approach
Browse files Browse the repository at this point in the history
  • Loading branch information
siranipour committed Jul 14, 2021
1 parent 9d9ae85 commit 70ae840
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 85 deletions.
26 changes: 26 additions & 0 deletions validphys2/src/validphys/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SimilarCuts,
ThCovMatSpec,
)
from validphys.fitdata import fitted_replica_indexes, num_fitted_replicas
from validphys.loader import (
Loader,
LoaderError,
Expand Down Expand Up @@ -228,6 +229,31 @@ def parse_fit(self, fit: str):
except LoadFailedError as e:
raise ConfigError(str(e), fit, self.loader.available_fits)

def produce_fitreplicas(self, fit):
num_replicas = num_fitted_replicas(fit)
return NSList(range(1, num_replicas + 1), nskey='replica')

def produce_pdfreplicas(self, fitpdf):
pdf = fitpdf['pdf']
replicas = fitted_replica_indexes(pdf)
return NSList(replicas, nskey='replica')

def produce_fitenvironment(self, fitinputcontext):
theoryid = fitinputcontext['theoryid']
data_input = fitinputcontext['data_input']

_, fitting_ns = self.parse_from_("fit", "fitting", write=False)
mcseed = fitting_ns['mcseed']
genrep = fitting_ns['genrep']

return {
"dataset_inputs":data_input,
"theoryid":theoryid,
"use_cuts":CutsPolicy.FROMFIT,
"mcseed":mcseed,
"genrep":genrep
}

def produce_fitcontext(self, fitinputcontext, fitpdf):
"""Set PDF, theory ID and data input from the fit config"""

Expand Down
96 changes: 11 additions & 85 deletions validphys2/src/validphys/pseudodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,94 +253,20 @@ def _make_replica_task(dataset_inputs, theoryid, fit, mcseed, genrep, API, repli
return res


def recreate_fit_pseudodata(fit, fitinputcontext, num_fitted_replicas, NPROC=None):
"""Function that recreates the pseudodata seen by each Monte Carlo replica.
Returns a dataframe with each column labelled ``replica i`` for ``i`` 1 through
to the number of fitted replicas.
# TODO: Finish this example
Example
-------
Notes
-----
By default this function computes the replicas in parallel using the maximum
number of available CPUs. Consider setting the ``NPROC`` flag to something
smaller to leave resources available.
"""
# The + 1 coming from the fact that we wish to
# include the last replica
replicas = range(1, num_fitted_replicas + 1)

mcseed = fit.as_input()["mcseed"]
genrep = fit.as_input()["genrep"]

dataset_inputs = fit.as_input()['dataset_inputs']
theoryid = fitinputcontext["theoryid"].id

# Because of the fact that only global functions can be
# pickled, we need to define the task as a high level function.
# As such we need to do this annoying repeating of function arguments,
# since only `replica` changes between monte carlo replicas, but
# dataset_inputs, fit, mcseed and genrep stay identical. See:
# https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments
repeated_di = [dataset_inputs] * num_fitted_replicas
repeated_theoryid = [theoryid] * num_fitted_replicas
repeated_fit = [str(fit)] * num_fitted_replicas
repeated_mcseed = [mcseed] * num_fitted_replicas
repeated_genrep = [genrep] * num_fitted_replicas
# Import here to avoid cyclical imports, but we don't
# want to import this in the task function because
# loading the api is expensive.
from validphys.api import API
repeated_api = [API] * num_fitted_replicas

args = zip(
repeated_di,
repeated_theoryid,
repeated_fit,
repeated_mcseed,
repeated_genrep,
repeated_api,
replicas
)

if NPROC is None:
NPROC = mp.cpu_count()
log.warning(
f"Using all {NPROC} cores available, this may be dangerous "
"especially for use on a cluster. Consider setting the NPROC "
"variable to something sensible."
)

if NPROC == 1:
res = list(map(lambda x: _make_replica_task(*x), args))
else:
with mp.Pool(processes=NPROC) as pool:
res = pool.starmap(_make_replica_task, args)

df = pd.concat(res, axis=1)
df.columns = [f"replica {i}" for i in replicas]
_recreate_fit_pseudodata = collect('indexed_make_replica', ('fitreplicas', 'fitenvironment'))
_recreate_pdf_pseudodata = collect('indexed_make_replica', ('pdfreplicas', 'fitenvironment'))

def recreate_fit_pseudodata(_recreate_fit_pseudodata, fitreplicas):
columns = [f'replica {i}' for i in fitreplicas]
df = pd.concat(_recreate_fit_pseudodata, axis=1)
df.columns = columns
return df


def recreate_pdf_pseudodata(pdf, NPROC=None):
"""Function that recreates the pseudodata of a PDF set
accounting for the postfit reordering.
# TODO: finish this example
Example
-------
"""
# Import here to avoid cyclical imports
from validphys.api import API

fit_pseudodata = API.recreate_fit_pseudodata(fit=str(pdf), NPROC=NPROC)
fitted_replica_indexes = API.fitted_replica_indexes(pdf=str(pdf))
cols = [f"replica {i}" for i in fitted_replica_indexes]
return fit_pseudodata.loc[:, cols]

def recreate_pdf_pseudodata(_recreate_pdf_pseudodata, pdfreplicas):
columns = [f'replica {i}' for i in pdfreplicas]
df = pd.concat(_recreate_pdf_pseudodata, axis=1)
df.columns = columns
return df

def _datasets_mask(experiment_list):
"""Function to obtain a per datasets training/validation
Expand Down

0 comments on commit 70ae840

Please sign in to comment.