Recreating python pseudodata
siranipour committed Jul 14, 2021
1 parent 74d9183 commit d2e5304
Showing 1 changed file with 96 additions and 141 deletions.
237 changes: 96 additions & 141 deletions validphys2/src/validphys/pseudodata.py
@@ -12,20 +12,17 @@
import numpy as np
import pandas as pd

from validphys.checks import check_cuts_fromfit, check_darwin_single_process
from validphys.checks import check_cuts_fromfit
from validphys.covmats import INTRA_DATASET_SYS_NAME

from reportengine import collect

from validphys.n3fit_data import replica_mcseed, replica_trvlseed
import validphys.n3fit_data_utils as reader

log = logging.getLogger(__name__)

DataTrValSpec = namedtuple('DataTrValSpec', ['pseudodata', 'tr_idx', 'val_idx'])

fitted_pseudodata = collect('fitted_pseudodata_internal', ('fitcontext',))

context_index = collect("groups_index", ("fitcontext",))

@check_cuts_fromfit
@@ -238,152 +235,110 @@ def indexed_make_replica(groups_index, make_replica):
return pd.DataFrame(make_replica, index=groups_index, columns=["data"])
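For context, ``indexed_make_replica`` attaches the fit's ``groups_index`` labels to the bare ``make_replica`` array. A minimal sketch of the idea, with a hypothetical two-point MultiIndex standing in for the real ``groups_index``:

.. code-block:: python

    import numpy as np
    import pandas as pd

    # Hypothetical stand-in for groups_index: (group, dataset, point) labels.
    idx = pd.MultiIndex.from_tuples(
        [("NMC", "NMC", 0), ("NMC", "NMC", 1)],
        names=["group", "dataset", "id"],
    )
    replica = np.array([0.12, 0.34])  # toy stand-in for one make_replica draw
    print(pd.DataFrame(replica, index=idx, columns=["data"]))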


@check_darwin_single_process
def fitted_pseudodata_internal(fit, experiments, num_fitted_replicas, t0pdfset=None, NPROC=None):
"""A function to obtain information about the pseudodata that went
into an N3FIT fit.
Parameters
----------
fit: :py:class:`validphys.core.FitSpec`
experiments:
List of :py:class:`validphys.core.ExperimentSpec`
num_fitted_replicas: ``int``
Provided by :py:mod:`validphys.fitdata`. Equal to the number of
pre-postfit replicas.
t0pdfset: :py:class:`validphys.core.PDF`
NPROC: ``int``
Integer specifying how many cores to run on. Default is
``mp.cpu_count()``
Example
-------
Create a ``YAML`` file, say ``runcard_for_pseudodata.yaml``:
.. code-block:: YAML
:caption: runcard_for_pseudodata.yaml
pdf: PN3_DIS_130519
fit: PN3_DIS_130519
experiments:
from_: fit
theory:
from_: fit
t0pdfset:
from_: datacuts
datacuts:
from_: fit
theoryid:
from_: theory
use_cuts: fromfit
Then run
>>> with open("./runcard_for_pseudodata.yaml", 'r') as stream:
... from reportengine.compat import yaml
... runcard = yaml.safe_load(stream)
>>> from validphys.api import API
>>> API.fitted_pseudodata_internal(**runcard)
Notes
-----
- This is a wrapper for the ``fitted_pseudodata`` action,
which knows that ``experiments`` *must* come from the fit,
and similarly that ``PDF`` and ``theoryid`` *must* be the same
as those of the ``fit``, and so on.
- This function returns the pseudodata for the replicas
pre-postfit. Postfit discards some replicas and rearranges
the order. The correspondence is handled by the
:py:func:`get_pseudodata` function.
- This code runs in parallel to increase efficiency.
"""
def _make_replica_task(dataset_inputs, theoryid, fit, mcseed, genrep, API, replica):
"""A global function for
:py:func:`validphys.pseudodata.indexed_make_replica`, to be used in
conjunction with :py:func:`validphys.pseudodata.recreate_fit_pseudodata`.
Defined at top level to be picklable for use with multiprocessing.
"""
if t0pdfset is not None:
t0pdfset = t0pdfset.load_t0()
res = API.indexed_make_replica(
dataset_inputs=dataset_inputs,
theoryid=theoryid,
use_cuts="fromfit",
fit=str(fit),
mcseed=mcseed,
genrep=genrep,
replica=replica,
)
return res
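The picklability remark in the docstring above is the operative constraint: ``multiprocessing.Pool`` serialises the task callable with pickle, which can only resolve module-level names. A minimal sketch, using a toy ``square`` function that is not part of this module:

.. code-block:: python

    import multiprocessing as mp

    def square(x):
        # Module-level, so pickle can locate it by name in worker processes.
        return x * x

    if __name__ == "__main__":
        with mp.Pool(processes=2) as pool:
            print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
        # A lambda or nested function cannot be pickled, so passing one
        # to pool.map raises a PicklingError instead.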


def recreate_fit_pseudodata(fit, fitinputcontext, num_fitted_replicas, NPROC=None):
"""Function that recreates the pseudodata seen by each Monte Carlo replica.
Returns a dataframe with each column labelled ``replica i`` for ``i`` running
from 1 up to the number of fitted replicas.
Example
-------
>>> from validphys.api import API
>>> df = API.recreate_fit_pseudodata(fit="fit_name", NPROC=4)
where ``fit_name`` is a placeholder for the name of an existing fit.
Notes
-----
By default this function computes the replicas in parallel using the maximum
number of available CPUs. Consider setting the ``NPROC`` flag to something
smaller to leave resources available.
"""
# The +1 comes from the fact that we wish to
# include the last replica
replica = range(1, num_fitted_replicas + 1)

trvlseed, mcseed, genrep = [
fit.as_input().get(i)
for i in ["trvlseed", "mcseed", "genrep"]
]

# common_data_reader expects None if genrep is False
if genrep:
replicas_mcseed = [
replica_mcseed(rep, mcseed, genrep) for rep in replica
]
replicas = range(1, num_fitted_replicas + 1)

mcseed = fit.as_input()["mcseed"]
genrep = fit.as_input()["genrep"]

dataset_inputs = fit.as_input()['dataset_inputs']
theoryid = fitinputcontext["theoryid"].id

# Because only module-level functions can be pickled, we need to
# define the task at the top level. As a result we have to repeat
# the function arguments: only `replica` changes between Monte Carlo
# replicas, while dataset_inputs, fit, mcseed and genrep stay
# identical (see the starmap sketch below). See also:
# https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments
repeated_di = [dataset_inputs] * num_fitted_replicas
repeated_theoryid = [theoryid] * num_fitted_replicas
repeated_fit = [str(fit)] * num_fitted_replicas
repeated_mcseed = [mcseed] * num_fitted_replicas
repeated_genrep = [genrep] * num_fitted_replicas
# Import here to avoid cyclic imports; we also avoid importing this
# in the task function because loading the API is expensive.
from validphys.api import API
repeated_api = [API] * num_fitted_replicas

args = zip(
repeated_di,
repeated_theoryid,
repeated_fit,
repeated_mcseed,
repeated_genrep,
repeated_api,
replicas
)

if NPROC is None:
NPROC = mp.cpu_count()
log.warning(
f"Using all {NPROC} cores available, this may be dangerous "
"especially for use on a cluster. Consider setting the NPROC "
"variable to something sensible."
)

if NPROC == 1:
res = list(map(lambda x: _make_replica_task(*x), args))
else:
replicas_mcseed = None
with mp.Pool(processes=NPROC) as pool:
res = pool.starmap(_make_replica_task, args)

replicas_trvlseeds = [replica_trvlseed(rep, trvlseed) for rep in replica]
df = pd.concat(res, axis=1)
df.columns = [f"replica {i}" for i in replicas]

def task(d, mcseeds, trvlseeds, replicas):
all_exp_infos = [[] for _ in range(len(mcseeds))]
for exp in experiments:
all_exp_dicts = reader.common_data_reader(
exp, t0pdfset, replica_seeds=mcseeds, trval_seeds=trvlseeds
)
for i, exp_dict in enumerate(all_exp_dicts):
all_exp_infos[i].append(exp_dict)
for i, j in zip(all_exp_infos, replicas):
d[j] = i
return df

if NPROC == 1:
pseudodata_dicts = dict()
task(pseudodata_dicts, replicas_mcseed, replicas_trvlseeds, replica)
else:
with mp.Manager() as manager:
d = manager.dict()

if NPROC is None:
NPROC = mp.cpu_count()
log.warning(
f"Using all {NPROC} cores available, this may be dangerous "
"especially for use on a cluster. Consider setting the NPROC "
"variable to something sensible."
)
processes = []

# Convert sub-arrays back to lists; use tolist to get builtin
# Python types.
list_split = lambda lst, n: [
arr.tolist() for arr in np.array_split(lst, n)
]
batched_mcseeds = list_split(replicas_mcseed, NPROC)
batched_trvlseeds = list_split(replicas_trvlseeds, NPROC)
batched_replica_num = list_split(replica, NPROC)
for mc_batch, trvl_batch, replica_batch in zip(
batched_mcseeds, batched_trvlseeds, batched_replica_num
):
p = mp.Process(
target=task,
args=(d, mc_batch, trvl_batch, replica_batch,),
)
p.start()
processes.append(p)
for p in processes:
p.join()
pseudodata_dicts = dict(d)
return pseudodata_dicts
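The zip-of-repeated-lists pattern fed to ``pool.starmap`` above can be seen in isolation below; the ``scale`` task is a hypothetical stand-in with one varying and one fixed argument:

.. code-block:: python

    import multiprocessing as mp
    from itertools import repeat

    def scale(value, factor):
        # Only `value` varies between tasks; `factor` is fixed.
        return value * factor

    if __name__ == "__main__":
        values = [1, 2, 3]
        # Zip the varying argument with repeated copies of the fixed one;
        # starmap unpacks each tuple into positional arguments.
        args = zip(values, repeat(10))
        with mp.Pool(processes=2) as pool:
            print(pool.starmap(scale, args))  # [10, 20, 30]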


def get_pseudodata(fitted_pseudodata, fitted_replica_indexes):
"""Pseudodata used during fitting but correctly accounting for
the postfit reordering.
"""

def recreate_pdf_pseudodata(pdf, NPROC=None):
"""Function that recreates the pseudodata of a PDF set
accounting for the postfit reordering.
Example
-------
>>> from validphys.api import API
>>> df = API.recreate_pdf_pseudodata(pdf="pdf_name")
where ``pdf_name`` is a placeholder for the name of a fitted PDF set.
"""
# By collecting over `fitcontext` we create a list of length
# one.
fitted_pseudodata = fitted_pseudodata[0]
return [fitted_pseudodata[i] for i in fitted_replica_indexes]
# Import here to avoid cyclical imports
from validphys.api import API

fit_pseudodata = API.recreate_fit_pseudodata(fit=str(pdf), NPROC=NPROC)
fitted_replica_indexes = API.fitted_replica_indexes(pdf=str(pdf))
return fit_pseudodata.loc[:, fitted_replica_indexes]
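The closing ``.loc[:, fitted_replica_indexes]`` both drops the replicas discarded by postfit and reorders the surviving columns. A toy illustration, assuming string column labels of the ``replica i`` form built above:

.. code-block:: python

    import pandas as pd

    df = pd.DataFrame(
        {"replica 1": [0.1, 0.2], "replica 2": [0.3, 0.4], "replica 3": [0.5, 0.6]}
    )
    # Suppose postfit kept replicas 3 and 1, in that order.
    kept = ["replica 3", "replica 1"]
    print(df.loc[:, kept])  # columns selected and reordered to match postfit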


def _datasets_mask(experiment_list):
