
Commit 0e94461

add quantile models to monte cover
1 parent dcfef4c

4 files changed: +665, -0 lines changed

monte-cover/src/montecover/irm/__init__.py

Lines changed: 6 additions & 0 deletions
@@ -2,20 +2,26 @@
 
 from montecover.irm.apo import APOCoverageSimulation
 from montecover.irm.apos import APOSCoverageSimulation
+from montecover.irm.cvar import CVARCoverageSimulation
 from montecover.irm.irm_ate import IRMATECoverageSimulation
 from montecover.irm.irm_ate_sensitivity import IRMATESensitivityCoverageSimulation
 from montecover.irm.irm_atte import IRMATTECoverageSimulation
 from montecover.irm.irm_atte_sensitivity import IRMATTESensitivityCoverageSimulation
 from montecover.irm.irm_cate import IRMCATECoverageSimulation
 from montecover.irm.irm_gate import IRMGATECoverageSimulation
+from montecover.irm.lpq import LPQCoverageSimulation
+from montecover.irm.pq import PQCoverageSimulation
 
 __all__ = [
     "APOCoverageSimulation",
     "APOSCoverageSimulation",
+    "CVARCoverageSimulation",
     "IRMATECoverageSimulation",
     "IRMATESensitivityCoverageSimulation",
     "IRMATTECoverageSimulation",
     "IRMATTESensitivityCoverageSimulation",
     "IRMCATECoverageSimulation",
     "IRMGATECoverageSimulation",
+    "LPQCoverageSimulation",
+    "PQCoverageSimulation",
 ]
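With these exports in place, the three new quantile-type simulation classes can be imported directly from the subpackage namespace alongside the existing IRM simulations, for example:

    from montecover.irm import CVARCoverageSimulation, LPQCoverageSimulation, PQCoverageSimulation
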
monte-cover/src/montecover/irm/cvar.py

Lines changed: 214 additions & 0 deletions

@@ -0,0 +1,214 @@
from typing import Any, Dict, Optional

import doubleml as dml
import numpy as np
import pandas as pd

from montecover.base import BaseSimulation
from montecover.utils import create_learner_from_config


# define loc-scale model
def f_loc(D, X):
    loc = 0.5 * D + 2 * D * X[:, 4] + 2.0 * (X[:, 1] > 0.1) - 1.7 * (X[:, 0] * X[:, 2] > 0) - 3 * X[:, 3]
    return loc


def f_scale(D, X):
    scale = np.sqrt(0.5 * D + 0.3 * D * X[:, 1] + 2)
    return scale


def dgp(n=200, p=5):
    X = np.random.uniform(-1, 1, size=[n, p])
    D = ((X[:, 1] - X[:, 3] + 1.5 * (X[:, 0] > 0) + np.random.normal(size=n)) > 0) * 1.0
    epsilon = np.random.normal(size=n)

    Y = f_loc(D, X) + f_scale(D, X) * epsilon
    return Y, X, D, epsilon


class CVARCoverageSimulation(BaseSimulation):
    """Simulation class for coverage properties of DoubleMLCVAR for Conditional Value at Risk estimation."""

    def __init__(
        self,
        config_file: str,
        suppress_warnings: bool = True,
        log_level: str = "INFO",
        log_file: Optional[str] = None,
    ):
        super().__init__(
            config_file=config_file,
            suppress_warnings=suppress_warnings,
            log_level=log_level,
            log_file=log_file,
        )

        # Calculate oracle values
        self._calculate_oracle_values()

    def _process_config_parameters(self):
        """Process simulation-specific parameters from config"""
        # Process ML models in parameter grid
        assert "learners" in self.dml_parameters, "No learners specified in the config file"

        required_learners = ["ml_g", "ml_m"]
        for learner in self.dml_parameters["learners"]:
            for ml in required_learners:
                assert ml in learner, f"No {ml} specified in the config file"

    def _calculate_oracle_values(self):
        """Calculate oracle values for the simulation."""
        self.logger.info("Calculating oracle values")

        # Parameters
        n_true = int(10e6)
        tau_vec = self.dml_parameters["tau_vec"][0]
        p = self.dgp_parameters["dim_x"][0]

        _, X_true, _, epsilon_true = dgp(n=n_true, p=p)
        D1 = np.ones(n_true)
        D0 = np.zeros(n_true)

        Y1 = f_loc(D1, X_true) + f_scale(D1, X_true) * epsilon_true
        Y0 = f_loc(D0, X_true) + f_scale(D0, X_true) * epsilon_true

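        # Oracle benchmark: for each treatment level d, the CVaR at quantile tau is
        # E[Y(d) | Y(d) >= q_tau(Y(d))]. It is approximated below by taking the
        # empirical tau-quantile of the simulated potential outcomes and averaging
        # the draws at or above that quantile.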
        Y1_quant = np.quantile(Y1, q=tau_vec)
        Y0_quant = np.quantile(Y0, q=tau_vec)
        Y1_cvar = [Y1[Y1 >= quant].mean() for quant in Y1_quant]
        Y0_cvar = [Y0[Y0 >= quant].mean() for quant in Y0_quant]
        effect_cvar = np.array(Y1_cvar) - np.array(Y0_cvar)

        self.oracle_values = dict()
        self.oracle_values["effect_cvar"] = effect_cvar
        self.oracle_values["Y1_cvar"] = Y1_cvar
        self.oracle_values["Y0_cvar"] = Y0_cvar

        self.logger.info(f"Oracle values: {self.oracle_values}")

    def run_single_rep(self, dml_data: dml.DoubleMLData, dml_params: Dict[str, Any]) -> Dict[str, Any]:
        """Run a single repetition with the given parameters."""
        # Extract parameters
        learner_config = dml_params["learners"]
        learner_g_name, ml_g = create_learner_from_config(learner_config["ml_g"])
        learner_m_name, ml_m = create_learner_from_config(learner_config["ml_m"])
        tau_vec = dml_params["tau_vec"]
        trimming_threshold = dml_params["trimming_threshold"]
        Y0_cvar = self.oracle_values["Y0_cvar"]
        Y1_cvar = self.oracle_values["Y1_cvar"]
        effect_cvar = self.oracle_values["effect_cvar"]

        # Model
        dml_model = dml.DoubleMLQTE(
            obj_dml_data=dml_data,
            ml_g=ml_g,
            ml_m=ml_m,
            score="CVaR",
            quantiles=tau_vec,
            trimming_threshold=trimming_threshold,
        )
        dml_model.fit()
        dml_model.bootstrap(n_rep_boot=2000)

        result = {
            "Y0_coverage": [],
            "Y1_coverage": [],
            "effect_coverage": [],
        }
        for level in self.confidence_parameters["level"]:
            level_result = dict()
            level_result["effect_coverage"] = self._compute_coverage(
                thetas=dml_model.coef,
                oracle_thetas=effect_cvar,
                confint=dml_model.confint(level=level),
                joint_confint=dml_model.confint(level=level, joint=True),
            )

            Y0_estimates = np.full(len(tau_vec), np.nan)
            Y1_estimates = np.full(len(tau_vec), np.nan)

            Y0_confint = np.full((len(tau_vec), 2), np.nan)
            Y1_confint = np.full((len(tau_vec), 2), np.nan)

            for tau_idx in range(len(tau_vec)):
                model_Y0 = dml_model.modellist_0[tau_idx]
                model_Y1 = dml_model.modellist_1[tau_idx]

                Y0_estimates[tau_idx] = model_Y0.coef
                Y1_estimates[tau_idx] = model_Y1.coef

                Y0_confint[tau_idx, :] = model_Y0.confint(level=level)
                Y1_confint[tau_idx, :] = model_Y1.confint(level=level)

            Y0_confint_df = pd.DataFrame(Y0_confint, columns=["lower", "upper"])
            Y1_confint_df = pd.DataFrame(Y1_confint, columns=["lower", "upper"])

            level_result["Y0_coverage"] = self._compute_coverage(
                thetas=Y0_estimates,
                oracle_thetas=Y0_cvar,
                confint=Y0_confint_df,
                joint_confint=None,
            )

            level_result["Y1_coverage"] = self._compute_coverage(
                thetas=Y1_estimates,
                oracle_thetas=Y1_cvar,
                confint=Y1_confint_df,
                joint_confint=None,
            )

            # add parameters to the result
            for res_metric in level_result.values():
                res_metric.update(
                    {
                        "Learner g": learner_g_name,
                        "Learner m": learner_m_name,
                        "level": level,
                    }
                )
            for key, res in level_result.items():
                result[key].append(res)

        return result

    def summarize_results(self):
        """Summarize the simulation results."""
        self.logger.info("Summarizing simulation results")

        # Group by parameter combinations
        groupby_cols = ["Learner g", "Learner m", "level"]
        aggregation_dict = {
            "Coverage": "mean",
            "CI Length": "mean",
            "Bias": "mean",
            "repetition": "count",
        }

        result_summary = dict()
        # Aggregate results for Y0 and Y1
        for result_name in ["Y0_coverage", "Y1_coverage"]:
            df = self.results[result_name]
            result_summary[result_name] = df.groupby(groupby_cols).agg(aggregation_dict).reset_index()
            self.logger.debug(f"Summarized {result_name} results")

        uniform_aggregation_dict = {
            "Coverage": "mean",
            "CI Length": "mean",
            "Bias": "mean",
            "Uniform Coverage": "mean",
            "Uniform CI Length": "mean",
            "repetition": "count",
        }
        result_summary["effect_coverage"] = (
            self.results["effect_coverage"].groupby(groupby_cols).agg(uniform_aggregation_dict).reset_index()
        )
        self.logger.debug("Summarized effect_coverage results")

        return result_summary

    def _generate_dml_data(self, dgp_params: Dict[str, Any]) -> dml.DoubleMLData:
        """Generate data for the simulation."""
        Y, X, D, _ = dgp(n=dgp_params["n_obs"], p=dgp_params["dim_x"])
        dml_data = dml.DoubleMLData.from_arrays(X, Y, D)
        return dml_data
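For context, a minimal driver sketch for the new class is shown below. The config path and the run_simulation / save_results methods are assumptions about the surrounding BaseSimulation framework and do not appear in this diff, so treat them as placeholders rather than the commit's actual entry point:

    from montecover.irm import CVARCoverageSimulation

    # Hypothetical driver script; config path and driver methods are assumed,
    # not part of this commit.
    sim = CVARCoverageSimulation(
        config_file="scripts/irm/cvar_config.yml",
        log_level="INFO",
        log_file="logs/irm/cvar_sim.log",
    )
    sim.run_simulation()
    sim.save_results(output_path="results/irm/", file_prefix="cvar")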
