From 8905a18f9668ae422c28935bc9ad74503c7db16d Mon Sep 17 00:00:00 2001
From: Lara
Date: Fri, 29 Nov 2024 11:56:10 +0100
Subject: [PATCH 1/5] Introduce combining of processes in ML and reweighting
 strategies

---
 hbw/config/processes.py |  21 ++++++
 hbw/ml/base.py          |   1 +
 hbw/ml/data_loader.py   | 163 +++++++++++++++++++++++++++++++++++++---
 hbw/ml/derived/dl.py    |  66 +++++++++++++++-
 hbw/ml/tf_util.py       |   2 +-
 hbw/tasks/ml.py         |  23 +++++-
 law.cfg                 |  11 ++-
 law.nocert.cfg          |   1 +
 8 files changed, 275 insertions(+), 13 deletions(-)

diff --git a/hbw/config/processes.py b/hbw/config/processes.py
index fab2b61e..ac624a65 100644
--- a/hbw/config/processes.py
+++ b/hbw/config/processes.py
@@ -10,6 +10,7 @@
 from scinum import Number

 from cmsdb.util import add_decay_process
+from columnflow.util import DotDict

 from hbw.config.styling import color_palette

@@ -199,3 +200,23 @@ def configure_hbw_processes(config: od.Config):
         if config.has_process(bg):
             bg = config.get_process(bg)
             background.add_process(bg)
+
+
+from random import randint
+
+
+def create_combined_proc_forML(config: od.Config, proc_name: str, proc_dict: dict, color=None):
+
+    combining_proc = []
+    for proc in proc_dict.sub_processes:
+        combining_proc.append(config.get_process(proc, default=None))
+    proc_inst = add_parent_process(
+        config,
+        combining_proc,
+        name=proc_name,
+        # TODO: a random number could by chance be an already used id --> should be checked
+        id=randint(10000000, 99999999),
+        label=proc_dict.get("label", "combined custom process"),
+        color=proc_dict.get("color", None),
+    )
+    ml_config = DotDict({"weighting": proc_dict.get("weighting", None), "sub_processes": proc_dict.sub_processes})
+    proc_inst.x.ml_config = ml_config
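The TODO above could be closed by drawing ids until one is free. A minimal sketch, relying on the get_process lookup already used in this patch; the helper name unused_process_id is hypothetical and not part of the patch:

    from random import randint

    import order as od


    def unused_process_id(config: od.Config, low: int = 10000000, high: int = 99999999) -> int:
        # draw until the id is unknown to the config (get_process then returns the default)
        while True:
            candidate = randint(low, high)
            if config.get_process(candidate, default=None) is None:
                return candidate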
diff --git a/hbw/ml/base.py b/hbw/ml/base.py
index 1e13a1a5..c1774524 100644
--- a/hbw/ml/base.py
+++ b/hbw/ml/base.py
@@ -273,6 +273,7 @@ def uses(self, config_inst: od.Config) -> set[Route | str]:
         columns = {"mli_*"}
         # TODO: switch to full event weight
         # TODO: this might not work with data, to be checked
+        columns.add("process_id")
         columns.add("normalization_weight")
         columns.add("stitched_normalization_weight")
         columns.add("event_weight")
diff --git a/hbw/ml/data_loader.py b/hbw/ml/data_loader.py
index b96874f0..dd72394a 100644
--- a/hbw/ml/data_loader.py
+++ b/hbw/ml/data_loader.py
@@ -19,6 +19,60 @@
 logger = law.logger.get_logger(__name__)


+def get_proc_mask(
+    events: ak.Array,
+    proc: str | od.Process,
+    config_inst: od.Config | None = None,
+) -> np.ndarray:
+    """
+    Creates a list of the ids of all subprocesses and the corresponding mask for all events.
+
+    :param events: Event array
+    :param config_inst: An instance of the Config, can be None if a Process instance is given.
+    :param proc: Either string or process instance.
+    """
+    # get process instance
+    if config_inst:
+        proc_inst = config_inst.get_process(proc)
+    elif isinstance(proc, od.Process):
+        proc_inst = proc
+
+    proc_id = events.process_id
+    unique_proc_ids = set(proc_id)
+
+    # get list of ids that belong to the process and are present in the event array
+    sub_id = [
+        proc_inst.id
+        for proc_inst, _, _ in proc_inst.walk_processes(include_self=True)
+        if proc_inst.id in unique_proc_ids
+    ]
+
+    # create process mask
+    proc_mask = np.isin(proc_id, sub_id)
+    return proc_mask, sub_id
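As a quick illustration of what get_proc_mask computes (toy process ids, not from the analysis):

    import numpy as np

    proc_id = np.array([21001, 5000, 21002, 6000])  # per-event process ids (made up)
    sub_id = [21001, 21002]  # ids gathered via proc_inst.walk_processes(include_self=True)

    proc_mask = np.isin(proc_id, sub_id)
    # array([ True, False,  True, False]) -> keeps the first and third event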
+
+
+def del_sub_proc_stats(
+    stats: dict,
+    proc: str,
+    sub_id: list,
+) -> None:
+    """
+    Deletes dict keys which are not part of the requested process.
+
+    :param stats: Dictionary containing ML stats for each process.
+    :param proc: String of the process.
+    :param sub_id: List of ids of sub processes that should be retained (!).
+    """
+    id_list = list(stats[proc]["num_events_per_process"].keys())
+    item_list = list(stats[proc].keys())
+    for id in id_list:
+        if int(id) not in sub_id:
+            for item in item_list:
+                if "per_process" in item:
+                    del stats[proc][item][id]
+
+
 def input_features_sanity_checks(ml_model_inst: MLModel, input_features: list[str]):
     """
     Perform sanity checks on the input features.
@@ -61,6 +115,8 @@ class MLDatasetLoader:
     shuffle: bool = True

     input_arrays: tuple = ("features", "weights", "train_weights", "equal_weights")
+    # input_arrays: tuple = ("features", "weights", _
+    # _ "equal_train_weights", "xsec_train_weights", "train_weights", "equal_weights")
     evaluation_arrays: tuple = ("prediction",)

     def __init__(self, ml_model_inst: MLModel, process: "str", events: ak.Array, stats: dict | None = None):
@@ -78,8 +134,12 @@ def __init__(self, ml_model_inst: MLModel, process: "str", events: ak.Array, sta
         """
         self._ml_model_inst = ml_model_inst
         self._process = process
+
+        proc_mask, _ = get_proc_mask(events, process, ml_model_inst.config_inst)
+        # TODO: the stats without "_per_process" still need to be handled; per-fold stats no longer make sense then -> DISCUSS
         self._stats = stats
-        self._events = events
+        # del_sub_proc_stats(process, sub_id)
+        self._events = events[proc_mask]

     def __repr__(self):
         return f"{self.__class__.__name__}({self.ml_model_inst.cls_name}, {self.process})"
@@ -185,10 +245,70 @@ def shuffle_indices(self) -> np.ndarray:
         self._shuffle_indices = np.random.permutation(self.n_events)
         return self._shuffle_indices

+    def get_xsec_train_weights(self) -> np.ndarray:
+        """
+        Weighting such that each event has roughly the same weight,
+        sub processes are weighted according to their cross section
+        """
+        if hasattr(self, "_xsec_train_weights"):
+            return self._xsec_train_weights
+
+        if not self.stats:
+            raise Exception("cannot determine train weights without stats")
+
+        _, sub_id = get_proc_mask(self._events, self.process, self.ml_model_inst.config_inst)
+        sum_abs_weights = np.sum([self.stats[self.process]["sum_abs_weights_per_process"][str(id)] for id in sub_id])
+        num_events = np.sum([self.stats[self.process]["num_events_per_process"][str(id)] for id in sub_id])
+
+        xsec_train_weights = self.weights / sum_abs_weights * num_events
+
+        self._xsec_train_weights = xsec_train_weights
+        return xsec_train_weights
+
+    def get_equal_train_weights(self) -> np.ndarray:
+        """
+        Weighting such that the events of each sub process are weighted equally
+        """
+        if hasattr(self, "_equal_train_weights"):
+            return self._equal_train_weights
+
+        if not self.stats:
+            raise Exception("cannot determine train weights without stats")
+
+        combined_proc_inst = self.ml_model_inst.config_inst.get_process(self.process)
+        targeted_sum_of_weights_per_process = (
+            self.stats[self.process]["num_events"] / len(combined_proc_inst.x.ml_config.sub_processes)
+        )
+        equal_train_weights = ak.full_like(self.weights, 1.)
+        sub_class_factors = {}
+
+        for proc in combined_proc_inst.x.ml_config.sub_processes:
+            proc_mask, sub_id = get_proc_mask(self._events, proc, self.ml_model_inst.config_inst)
+            sum_pos_weights_per_sub_proc = 0.
+            sum_pos_weights_per_proc = self.stats[self.process]["sum_pos_weights_per_process"]
+
+            for id in sub_id:
+                id = str(id)
+                if id in self.stats[self.process]["num_events_per_process"]:
+                    sum_pos_weights_per_sub_proc += sum_pos_weights_per_proc[id]
+
+            if sum_pos_weights_per_sub_proc == 0:
+                norm_const_per_proc = 1.
+                logger.info(
+                    f"No weight sum found in stats for sub process {proc}. "
+                    f"Normalization constant set to 1 but results are probably not correct.")
+            else:
+                norm_const_per_proc = targeted_sum_of_weights_per_process / sum_pos_weights_per_sub_proc
+                logger.info(f"Normalization constant for {proc} is {norm_const_per_proc}")
+
+            sub_class_factors[proc] = norm_const_per_proc
+            equal_train_weights = np.where(proc_mask, self.weights * norm_const_per_proc, equal_train_weights)
+
+        self._equal_train_weights = equal_train_weights
+        return equal_train_weights
+
     @property
     def train_weights(self) -> np.ndarray:
         """
-        Weighting such that each event has roughly the same weight
+        Weighting according to the parameters set in the ML model config
         """
         if hasattr(self, "_train_weights"):
             return self._train_weights
@@ -196,10 +316,19 @@ def train_weights(self) -> np.ndarray:
         if not self.stats:
             raise Exception("cannot determine train weights without stats")

-        sum_abs_weights = self.stats[self.process]["sum_abs_weights"]
-        num_events = self.stats[self.process]["num_events"]
+        # TODO: cast to np.float32 here
+        proc = self.process
+        proc_inst = self.ml_model_inst.config_inst.get_process(proc)
+        if proc_inst.x("ml_config", None) and proc_inst.x.ml_config.weighting == "equal":
+            train_weights = self.get_equal_train_weights()
+        else:
+            train_weights = self.get_xsec_train_weights()
+        # self._train_weights = self.get_equal_train_weights()
+        # else:
+        #     self._train_weights = self.get_xsec_train_weights()
+
+        self._train_weights = ak.to_numpy(train_weights).astype(np.float32)

-        self._train_weights = self.weights / sum_abs_weights * num_events
         return self._train_weights

     @property
@@ -213,11 +342,26 @@ def equal_weights(self) -> np.ndarray:
         if not self.stats:
             raise Exception("cannot determine val weights without stats")

+        # TODO: tidy up (the per-process handling is done)
         processes = self.ml_model_inst.processes
-        sum_abs_weights = self.stats[self.process]["sum_abs_weights"]
-        num_events_per_process = {proc: self.stats[proc]["num_events"] for proc in processes}
-
-        self._validation_weights = self.weights / sum_abs_weights * max(num_events_per_process.values())
+        num_events_per_process = {}
+        for proc in processes:
+            id_list = list(self.stats[proc]["num_events_per_process"].keys())
+            proc_inst = self.ml_model_inst.config_inst.get_process(proc)
+            sub_id = [
+                p_inst.id
+                for p_inst, _, _ in proc_inst.walk_processes(include_self=True)
+                if str(p_inst.id) in id_list
+            ]
+            if proc == self.process:
+                sum_abs_weights = np.sum([self.stats[self.process]["sum_abs_weights_per_process"][str(id)] for id in sub_id])
+            num_events_per_proc = np.sum([self.stats[proc]["num_events_per_process"][str(id)] for id in sub_id])
+            num_events_per_process[proc] = num_events_per_proc
+
+        # sum_abs_weights = self.stats[self.process]["sum_abs_weights"]
+        # num_events_per_process = {proc: self.stats[proc]["num_events"] for proc in processes}
+        validation_weights = self.weights / sum_abs_weights * max(num_events_per_process.values())
+        self._validation_weights = ak.to_numpy(validation_weights).astype(np.float32)

         return self._validation_weights
@@ -544,6 +688,7 @@ def target(self) -> np.ndarray:
         if self._ml_model_inst.negative_weights == "handle":
             target[self.m_negative_weights] = 1 - target[self.m_negative_weights]

+        # NOTE: the targets might be float64 here; maybe check that
         self._target = target
         return self._target
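The two weighting schemes above, get_xsec_train_weights and get_equal_train_weights, in a small worked example with made-up stats for a combined process with two sub-processes (ids "1" and "2"):

    num_events = {"1": 900, "2": 100}        # events per sub-process
    sum_pos_weights = {"1": 50.0, "2": 5.0}  # positive weight sums per sub-process

    # "equal": every sub-process is scaled to the same targeted weight sum
    targeted_sum = sum(num_events.values()) / 2  # 500.0
    norm_const = {p: targeted_sum / s for p, s in sum_pos_weights.items()}
    # {"1": 10.0, "2": 100.0} -> both sub-processes end up with a total train weight of 500

    # "xsec": weights / sum_abs_weights * num_events instead keeps the relative
    # cross-section weighting, so sub-process "2" stays a factor ~10 smaller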
diff --git a/hbw/ml/derived/dl.py b/hbw/ml/derived/dl.py
index bd5d9ff5..5266107b 100644
--- a/hbw/ml/derived/dl.py
+++ b/hbw/ml/derived/dl.py
@@ -10,11 +10,13 @@

 import law

-from columnflow.util import maybe_import
+from columnflow.util import maybe_import, DotDict

 from hbw.ml.base import MLClassifierBase
 from hbw.ml.mixins import DenseModelMixin, ModelFitMixin

+from hbw.config.processes import create_combined_proc_forML
+
 np = maybe_import("numpy")
 ak = maybe_import("awkward")

@@ -23,6 +25,8 @@

 class DenseClassifierDL(DenseModelMixin, ModelFitMixin, MLClassifierBase):

+    combine_processes = ()
+
     processes = (
         "sig",
         "tt",
@@ -135,6 +139,12 @@ def setup(self):
         # dynamically add variables for the quantities produced by this model
         # NOTE: since these variables are only used in ConfigTasks,
         #       we do not need to add these variables to all configs
+        for proc in self.combine_processes:
+            if proc not in self.config_inst.processes:
+                proc_name = str(proc)
+                proc_dict = DotDict(self.combine_processes[proc])
+                create_combined_proc_forML(self.config_inst, proc_name, proc_dict)
+
         for proc in self.processes:
             for config_inst in self.config_insts:
                 if f"mlscore.{proc}" not in config_inst.variables:
@@ -180,6 +190,60 @@ def setup(self):
 #     "processes": ["hh_ggf_hbb_hvv2l2nu_kl1_kt1", "tt"],
 })

+dl_22post_testproc = dl_22post.derive("dl_22post_testproc", cls_dict={
+    "training_configs": lambda self, requested_configs: ["l22post"],
+    "combine_processes": {
+        "test1": {
+            # "name": "tt_and_st",
+            "label": "my label",
+            "sub_processes": ["st", "hh_ggf_hbb_hvv2l2nu_kl1_kt1"],
+            "weighting": "equal",
+        },
+    },
+    "processes": ["test1", "tt"],
+})
+dl_22post_sigtest = dl_22post.derive("dl_22post_sigtest", cls_dict={
+    "training_configs": lambda self, requested_configs: ["c22post"],
+    "combine_processes": {
+        "sig_01": {
+            # "name": "tt_and_st",
+            "label": "signal_kl0_1",
+            "sub_processes": ["hh_ggf_hbb_hvv2l2nu_kl0_kt1", "hh_ggf_hbb_hvv2l2nu_kl1_kt1"],
+            "weighting": "equal",
+        },
+        "sig_25": {
+            # "name": "tt_and_st",
+            "label": "signal_kl2p45_5",
+            "sub_processes": ["hh_ggf_hbb_hvv2l2nu_kl2p45_kt1", "hh_ggf_hbb_hvv2l2nu_kl5_kt1"],
+            "weighting": "xsec",
+        },
+    },
+    "processes": ["sig_01", "sig_25"],
+})
+
+dl_22post_test = dl_22post.derive("dl_22post_sigtest", cls_dict={
+    "training_configs": lambda self, requested_configs: ["c22post"],
+    "combine_processes": {
+        "signal": {
+            # "name": "tt_and_st",
+            "label": "signal_kl0_1",
+            "sub_processes": [
+                "hh_ggf_hbb_hvv2l2nu_kl0_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl1_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl5_kt1",
+            ],
+            "weighting": "xsec",
+        },
+        "top": {
+            # "name": "tt_and_st",
+            "label": "signal_kl2p45_5",
+            "sub_processes": ["st", "tt"],
+            "weighting": "equal",
+        },
+    },
+    "processes": ["signal", "top"],
+})

 #
 # setups with different processes (0: baseline, 1: add SM vbf + single H, 2: add SL+all HH variations)
 # NOTE: we should decide which signal processes exactly to use:
diff --git a/hbw/ml/tf_util.py b/hbw/ml/tf_util.py
index 844ab8d4..d187bfcf 100644
--- a/hbw/ml/tf_util.py
+++ b/hbw/ml/tf_util.py
@@ -41,6 +41,7 @@ def __init__(

         for proc_inst, arrays in data.items():
             arrays = (arrays.features, arrays.target, arrays.train_weights)
+            # ML WEIGHTING
             self.tuple_length = len(arrays)
             self.datasets.append(tf.data.Dataset.from_tensor_slices(arrays))
             self.counts.append(len(arrays[0]))
@@ -135,7 +136,6 @@ def __iter__(self):
                     continue
             if do_break:
                 break
-
             yield tuple(tf.concat([batch[i] for batch in dataset_batches], axis=0) for i in range(self.tuple_length))
             self.batches_seen += 1
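For context, the yield above concatenates one batch per process into a single training batch. A stripped-down, runnable sketch of that pattern with toy data (not the actual MultiDataset logic):

    import tensorflow as tf

    # two toy per-process datasets of (features, target) tuples
    ds_a = tf.data.Dataset.from_tensor_slices((tf.ones([4, 3]), tf.zeros([4]))).batch(2)
    ds_b = tf.data.Dataset.from_tensor_slices((tf.zeros([4, 3]), tf.ones([4]))).batch(2)

    for dataset_batches in zip(ds_a, ds_b):
        # concatenate the i-th tuple element of every per-process batch
        merged = tuple(tf.concat([b[i] for b in dataset_batches], axis=0) for i in range(2))
        # merged[0].shape == (4, 3), merged[1].shape == (4,)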
diff --git a/hbw/tasks/ml.py b/hbw/tasks/ml.py
index 6730733a..8001aa42 100644
--- a/hbw/tasks/ml.py
+++ b/hbw/tasks/ml.py
@@ -13,6 +13,9 @@

 from collections import defaultdict

+import numpy as np
+
+from hbw.ml.data_loader import get_proc_mask
 import law
 import luigi

@@ -161,6 +164,7 @@ def run(self):
         self.output()["model_summary"].dump(model_summary, formatter="yaml")


+# TODO: cross-check if the train weights work as intended --> by reading out process_id
 class MLPreTraining(
     HBWTask,
     MLModelTrainingMixin,
@@ -380,6 +384,22 @@ def run(self):

             # initialize the DatasetLoader
             ml_dataset = self.ml_model_inst.data_loader(self.ml_model_inst, process, events, stats)
+            # NOTE: almost closure: the weight sum should be close to the event count
+            sum_train_weights = np.sum(ml_dataset.train_weights)
+            n_events_per_fold = len(ml_dataset.train_weights)
+            logger.info(f"Sum of training weights is: {sum_train_weights} for {n_events_per_fold} {process} events")
+
+            if self.ml_model_inst.config_inst.get_process(process).x("ml_config", None):
+
+                if self.ml_model_inst.config_inst.get_process(process).x.ml_config.weighting == "equal":
+                    for sub_proc in self.ml_model_inst.config_inst.get_process(process).x.ml_config.sub_processes:
+                        proc_mask, sub_id = get_proc_mask(events, sub_proc, self.ml_model_inst.config_inst)
+                        xcheck_weight_sum = np.sum(ml_dataset.train_weights[proc_mask])
+                        xcheck_n_events = len(ml_dataset.train_weights[proc_mask])
+                        logger.info(
+                            f"For the equal weighting method the sum of weights for {sub_proc} is {xcheck_weight_sum} "
+                            f"(No. of events: {xcheck_n_events})",
+                        )

             for input_array in self.ml_model_inst.data_loader.input_arrays:
                 logger.info(f"loading data for input array {input_array}")
@@ -392,7 +412,6 @@ def run(self):
                 outputs[input_array]["test"][process][fold].dump(test, formatter="numpy")

             outputs["input_features"][process][fold].dump(ml_dataset.input_features, formatter="pickle")
-
         # dump parameters of the DatasetLoader
         outputs["parameters"].dump(ml_dataset.parameters, formatter="yaml")
@@ -670,6 +689,8 @@ def run(self):
             "test": MLProcessData(self.ml_model_inst, input_files, "test", self.ml_model_inst.processes, self.fold),
         })

+        # ML WEIGHTING data.train.equal_weights
+
         # create plots
         # NOTE: this is currently hard-coded, could be made customizable and could also be parallelized since
         # input reading is quite fast, while producing certain plots takes a long time
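The "almost closure" check above boils down to: after the xsec-style scaling, the train weights should sum to roughly the event count of the fold. A toy version with made-up weights:

    import numpy as np

    weights = np.array([0.4, 0.6, 0.5, 0.5])  # made-up per-event weights
    train_weights = weights / np.sum(np.abs(weights)) * len(weights)
    print(np.sum(train_weights), len(weights))  # 4.0 vs 4
    # with negative weights present, the sum only approximately matches the event count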
diff --git a/law.cfg b/law.cfg
index 75d32b48..9614c9f2 100644
--- a/law.cfg
+++ b/law.cfg
@@ -56,7 +56,7 @@
 slurm_flavor: $CF_SLURM_FLAVOR
 slurm_partition: $CF_SLURM_PARTITION

 # ChunkedIOHandler defaults
-chunked_io_chunk_size: 100000
+chunked_io_chunk_size: 50000
 chunked_io_pool_size: 2
 chunked_io_debug: False
@@ -79,6 +79,15 @@
 wlcg_file_systems: wlcg_fs, wlcg_fs_desy, wlcg_fs_cernbox, wlcg_fs_desy_store, w
 # look for the correct fs per nano input file (in that order)
 lfn_sources: local_desy_dcache, wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirector

+shared_hbw_location: /nfs/dust/cms/user/frahmmat/public/hh2bbww/data/hbw_store
+
+cf.SelectEvents: local, %(shared_hbw_location)s, hbw_parts
+cf.MergeSelectionStats: local, %(shared_hbw_location)s, hbw_parts
+cf.ReduceEvents: local, %(shared_hbw_location)s, hbw_parts
+cf.MergeReductionStats: local, %(shared_hbw_location)s, hbw_parts
+cf.MergeReducedEvents: local, %(shared_hbw_location)s, hbw_parts
+cf.GetDatasetLFNs: local, %(shared_hbw_location)s, hbw_parts
+cf.CalibrateEvents: local, %(shared_hbw_location)s, hbw_parts
+
 # output locations per task family
 # for local targets : "local[, STORE_PATH]"
 # for remote targets: "wlcg[, WLCG_FS_NAME]"
diff --git a/law.nocert.cfg b/law.nocert.cfg
index 16b90b03..32545e0a 100644
--- a/law.nocert.cfg
+++ b/law.nocert.cfg
@@ -32,6 +32,7 @@
 cf.BundleCMSSWSandbox: local
 cf.BundleExternalFiles: local
 # GetDatasetLFNs requires a Grid certificate -> use a common space to store the output
 cf.GetDatasetLFNs: local, /nfs/dust/cms/user/frahmmat/public/hh2bbww/data/common_store
+#cf.GetDatasetLFNs: /nfs/dust/cms/user/frahmmat/public/hh2bbww/data/common_store
 cf.CalibrateEvents: local
 cf.SelectEvents: local
 cf.CreateCutflowHistograms: local

From e06606a86265974c0e2bb71237178d690b72a7d7 Mon Sep 17 00:00:00 2001
From: Lara
Date: Mon, 9 Dec 2024 13:18:36 +0100
Subject: [PATCH 2/5] cleanup and model configuration for ml weighting studies

---
 hbw/categorization/categories.py |   1 +
 hbw/config/categories.py         |   5 +-
 hbw/inference/dl.py              |  84 +++++++++++++++++++++++
 hbw/ml/data_loader.py            |   8 ++-
 hbw/ml/derived/dl.py             | 111 +++++++++++++++++++++++++++++--
 hbw/ml/stats.py                  |  23 +++++++
 hbw/production/ml_inputs.py      |   5 +-
 hbw/tasks/ml.py                  |  23 +++++++
 hbw/weight/default.py            |   4 +-
 9 files changed, 250 insertions(+), 14 deletions(-)

diff --git a/hbw/categorization/categories.py b/hbw/categorization/categories.py
index 33ab591c..c5799e36 100644
--- a/hbw/categorization/categories.py
+++ b/hbw/categorization/categories.py
@@ -271,6 +271,7 @@ def catid_2b(self: Categorizer, events: ak.Array, **kwargs) -> tuple[ak.Array, a

 # TODO: not hard-coded -> use config?
 ml_processes = [
+    "signal_ggf", "signal_ggf2", "signal_vbf", "signal_vbf2",
     "hh_ggf_hbb_hvv_kl1_kt1", "hh_vbf_hbb_hvv_kv1_k2v1_kl1",
     "hh_ggf_hbb_hvvqqlnu_kl1_kt1", "hh_vbf_hbb_hvvqqlnu_kv1_k2v1_kl1",
     "hh_ggf_hbb_hvv2l2nu_kl1_kt1", "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1",
diff --git a/hbw/config/categories.py b/hbw/config/categories.py
index def5b635..a3f2fcbb 100644
--- a/hbw/config/categories.py
+++ b/hbw/config/categories.py
@@ -323,16 +323,17 @@ def add_categories_ml(config, ml_model_inst):
     # add ml categories directly to the config
     # NOTE: this is a bit dangerous, because our ID depends on the MLModel, but
     #       we can reconfigure our MLModel after having created these categories
+    # TODO: config is empty and therefore fails
     ml_categories = []
     for i, proc in enumerate(ml_model_inst.processes):
-        cat_label = config.get_process(proc).x.ml_label
+        # cat_label = config.get_process(proc).x.ml_label
         ml_categories.append(config.add_category(
             # NOTE: name and ID is unique as long as we don't use
             #       multiple ml_models simutaneously
             name=f"ml_{proc}",
             id=(i + 1) * 1000,
             selection=f"catid_ml_{proc}",
-            label=f"{cat_label} category",
+            # label=f"{cat_label} category",
             aux={"ml_proc": proc},
         ))
diff --git a/hbw/inference/dl.py b/hbw/inference/dl.py
index 2ce1ea9e..9bde981f 100644
--- a/hbw/inference/dl.py
+++ b/hbw/inference/dl.py
@@ -142,6 +142,90 @@

 dl = HBWInferenceModelBase.derive("dl", cls_dict=default_cls_dict)

+# "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1",
+# "hh_vbf_hbb_hvv2l2nu_kv1_k2v0_kl1",
+# "hh_vbf_hbb_hvv2l2nu_kvm0p962_k2v0p959_klm1p43",
+# "hh_vbf_hbb_hvv2l2nu_kvm1p21_k2v1p94_klm0p94",
+# "hh_vbf_hbb_hvv2l2nu_kvm1p6_k2v2p72_klm1p36",
+# "hh_vbf_hbb_hvv2l2nu_kvm1p83_k2v3p57_klm3p39",
+# "hh_ggf_hbb_hvv2l2nu_kl0_kt1",
+# "hh_ggf_hbb_hvv2l2nu_kl1_kt1",
+# "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1",
+# "hh_ggf_hbb_hvv2l2nu_kl5_kt1",
+
+
+dl.derive("dl_ml_study_1", cls_dict={
+    "ml_model_name": "dl_22post_ml_study_1",
+    "config_categories": [
"sr__1b__ml_signal_ggf", + "sr__1b__ml_signal_vbf", + "sr__1b__ml_tt", + "sr__1b__ml_st", + "sr__1b__ml_dy", + "sr__1b__ml_h", + "sr__2b__ml_signal_ggf", + "sr__2b__ml_signal_vbf", + "sr__2b__ml_tt", + "sr__2b__ml_st", + "sr__2b__ml_dy", + "sr__2b__ml_h", + ], + "processes": [ + "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1", + "hh_vbf_hbb_hvv2l2nu_kv1_k2v0_kl1", + "hh_vbf_hbb_hvv2l2nu_kvm0p962_k2v0p959_klm1p43", + "hh_vbf_hbb_hvv2l2nu_kvm1p21_k2v1p94_klm0p94", + "hh_vbf_hbb_hvv2l2nu_kvm1p6_k2v2p72_klm1p36", + "hh_vbf_hbb_hvv2l2nu_kvm1p83_k2v3p57_klm3p39", + "hh_ggf_hbb_hvv2l2nu_kl0_kt1", + "hh_ggf_hbb_hvv2l2nu_kl1_kt1", + "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1", + "hh_ggf_hbb_hvv2l2nu_kl5_kt1", + "tt", + "dy", + "w_lnu", + "vv", + "h_ggf", "h_vbf", "zh", "wh", "zh_gg", "tth", + ], + "systematics": rate_systematics, +}) + +dl.derive("dl_ml_study_2", cls_dict={ + "ml_model_name": "dl_22post_ml_study_2", + "config_categories": [ + "sr__1b__ml_signal_ggf2", + "sr__1b__ml_signal_vbf2", + "sr__1b__ml_tt", + "sr__1b__ml_st", + "sr__1b__ml_dy", + "sr__1b__ml_h", + "sr__2b__ml_signal_ggf2", + "sr__2b__ml_signal_vbf2", + "sr__2b__ml_tt", + "sr__2b__ml_st", + "sr__2b__ml_dy", + "sr__2b__ml_h", + ], + "processes": [ + "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1", + "hh_vbf_hbb_hvv2l2nu_kv1_k2v0_kl1", + "hh_vbf_hbb_hvv2l2nu_kvm0p962_k2v0p959_klm1p43", + "hh_vbf_hbb_hvv2l2nu_kvm1p21_k2v1p94_klm0p94", + "hh_vbf_hbb_hvv2l2nu_kvm1p6_k2v2p72_klm1p36", + "hh_vbf_hbb_hvv2l2nu_kvm1p83_k2v3p57_klm3p39", + "hh_ggf_hbb_hvv2l2nu_kl0_kt1", + "hh_ggf_hbb_hvv2l2nu_kl1_kt1", + "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1", + "hh_ggf_hbb_hvv2l2nu_kl5_kt1", + "tt", + "dy", + "w_lnu", + "vv", + "h_ggf", "h_vbf", "zh", "wh", "zh_gg", "tth", + ], + "systematics": rate_systematics, +}) + dl.derive("dl_hww_and_hzz", cls_dict={ "processes": [ "hh_ggf_hbb_hww_kl0_kt1", diff --git a/hbw/ml/data_loader.py b/hbw/ml/data_loader.py index dd72394a..55f7b11f 100644 --- a/hbw/ml/data_loader.py +++ b/hbw/ml/data_loader.py @@ -275,8 +275,10 @@ def get_equal_train_weights(self) -> np.ndarray: raise Exception("cannot determine train weights without stats") combined_proc_inst = self.ml_model_inst.config_inst.get_process(self.process) + _, sub_id_proc = get_proc_mask(self._events, self.process, self.ml_model_inst.config_inst) + num_events = np.sum([self.stats[self.process]["num_events_per_process"][str(id)] for id in sub_id_proc]) targeted_sum_of_weights_per_process = ( - self.stats[self.process]["num_events"] / len(combined_proc_inst.x.ml_config.sub_processes) + num_events / len(combined_proc_inst.x.ml_config.sub_processes) ) equal_train_weights = ak.full_like(self.weights, 1.) 
         sub_class_factors = {}
@@ -354,7 +356,9 @@ def equal_weights(self) -> np.ndarray:
                 if str(p_inst.id) in id_list
             ]
             if proc == self.process:
-                sum_abs_weights = np.sum([self.stats[self.process]["sum_abs_weights_per_process"][str(id)] for id in sub_id])
+                sum_abs_weights = np.sum([
+                    self.stats[self.process]["sum_abs_weights_per_process"][str(id)] for id in sub_id
+                ])
             num_events_per_proc = np.sum([self.stats[proc]["num_events_per_process"][str(id)] for id in sub_id])
             num_events_per_process[proc] = num_events_per_proc
diff --git a/hbw/ml/derived/dl.py b/hbw/ml/derived/dl.py
index 5266107b..59b4a631 100644
--- a/hbw/ml/derived/dl.py
+++ b/hbw/ml/derived/dl.py
@@ -153,7 +153,7 @@ def setup(self):
                         expression=f"mlscore.{proc}",
                         null_value=-1,
                         binning=(1000, 0., 1.),
-                        x_title=f"DNN output score {config_inst.get_process(proc).x.ml_label}",
+                        x_title=f"DNN output score {config_inst.get_process(proc).x('ml_label', '')}",
                         aux={"rebin": 40},
                     )
                     config_inst.add_variable(
@@ -161,7 +161,7 @@ def setup(self):
                         expression=f"mlscore.{proc}",
                         null_value=-1,
                         binning=(40, 0., 1.),
-                        x_title=f"DNN output score {config_inst.get_process(proc).x.ml_label}",
+                        x_title=f"DNN output score {config_inst.get_process(proc).x('ml_label', '')}",
                     )
@@ -221,12 +221,12 @@ def setup(self):
     "processes": ["sig_01", "sig_25"],
 })

-dl_22post_test = dl_22post.derive("dl_22post_sigtest", cls_dict={
+dl_22post_test = dl_22post.derive("dl_22post_test", cls_dict={
     "training_configs": lambda self, requested_configs: ["c22post"],
     "combine_processes": {
         "signal": {
             # "name": "tt_and_st",
-            "label": "signal_kl0_1",
+            "label": "signal",
             "sub_processes": [
                 "hh_ggf_hbb_hvv2l2nu_kl0_kt1",
                 "hh_ggf_hbb_hvv2l2nu_kl1_kt1",
@@ -237,13 +237,114 @@ def setup(self):
         },
         "top": {
             # "name": "tt_and_st",
-            "label": "signal_kl2p45_5",
+            "label": "top_induced",
             "sub_processes": ["st", "tt"],
             "weighting": "equal",
         },
     },
     "processes": ["signal", "top"],
 })
+
+dl_22post_test2 = dl_22post.derive("dl_22post_test2", cls_dict={
+    "training_configs": lambda self, requested_configs: ["c22post"],
+    "combine_processes": {
+        "signal": {
+            # "name": "tt_and_st",
+            "label": "signal",
+            "sub_processes": [
+                "hh_ggf_hbb_hvv2l2nu_kl0_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl1_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl5_kt1",
+            ],
+            "weighting": "xsec",
+        },
+        "top": {
+            # "name": "tt_and_st",
+            "label": "top_induced",
+            "sub_processes": ["st", "tt"],
+            "weighting": "xsec",
+        },
+    },
+    "processes": ["signal", "top"],
+})
+
+dl_22post_ml_study_1 = dl_22post.derive("dl_22post_ml_study_1", cls_dict={
+    "training_configs": lambda self, requested_configs: ["c22post"],
+    "combine_processes": {
+        "signal_ggf": {
+            # "name": "tt_and_st",
+            "label": "Signal GGF",
+            "sub_processes": [
+                "hh_ggf_hbb_hvv2l2nu_kl0_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl1_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1",
+                "hh_ggf_hbb_hvv2l2nu_kl5_kt1",
+            ],
+            "weighting": "xsec",
+        },
+        "signal_vbf": {
+            # "name": "tt_and_st",
+            "label": "Signal VBF",
+            "sub_processes": [
+                "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1",
+                "hh_vbf_hbb_hvv2l2nu_kv1_k2v0_kl1",
+                "hh_vbf_hbb_hvv2l2nu_kvm0p962_k2v0p959_klm1p43",
+                "hh_vbf_hbb_hvv2l2nu_kvm1p21_k2v1p94_klm0p94",
+                "hh_vbf_hbb_hvv2l2nu_kvm1p6_k2v2p72_klm1p36",
+                "hh_vbf_hbb_hvv2l2nu_kvm1p83_k2v3p57_klm3p39",
+            ],
+            "weighting": "xsec",
+        },
+    },
+    "processes": [
+        "signal_ggf",
+        "signal_vbf",
+        "tt",
+        "st",
+        "dy",
+        "h",
+    ],
+})
+
+dl_22post_ml_study_2 = dl_22post.derive("dl_22post_ml_study_2", cls_dict={
+    "training_configs": lambda self, requested_configs: ["c22post"],
"combine_processes": { + "signal_ggf2": { + # "name": "tt_and_st", + "label": "Signal GGF", + "sub_processes": [ + "hh_ggf_hbb_hvv2l2nu_kl0_kt1", + "hh_ggf_hbb_hvv2l2nu_kl1_kt1", + "hh_ggf_hbb_hvv2l2nu_kl2p45_kt1", + "hh_ggf_hbb_hvv2l2nu_kl5_kt1", + ], + "weighting": "equal", + }, + "signal_vbf2": { + # "name": "tt_and_st", + "label": "Signal VBF", + "sub_processes": [ + "hh_vbf_hbb_hvv2l2nu_kv1_k2v1_kl1", + "hh_vbf_hbb_hvv2l2nu_kv1_k2v0_kl1", + "hh_vbf_hbb_hvv2l2nu_kvm0p962_k2v0p959_klm1p43", + "hh_vbf_hbb_hvv2l2nu_kvm1p21_k2v1p94_klm0p94", + "hh_vbf_hbb_hvv2l2nu_kvm1p6_k2v2p72_klm1p36", + "hh_vbf_hbb_hvv2l2nu_kvm1p83_k2v3p57_klm3p39", + + ], + "weighting": "xsec", + }, + }, + "processes": [ + "signal_ggf2", + "signal_vbf2", + "tt", + "st", + "dy", + "h", + ], +}) # # setups with different processes (0: baseline, 1: add SM vbf + single H, 2: add SL+all HH variations) # NOTE: we should decide which signal processes exactly to use: diff --git a/hbw/ml/stats.py b/hbw/ml/stats.py index c80b55e0..1a4b0a88 100644 --- a/hbw/ml/stats.py +++ b/hbw/ml/stats.py @@ -27,6 +27,22 @@ logger = law.logger.get_logger(__name__) +def del_sub_proc_stats( + stats: dict, + proc: str, +) -> np.ndarray: + """ + Function deletes dict keys which are not part of the requested process + + :param stats: Dictionaire containing ML stats for each process. + :param proc: String of the process. + :param sub_id: List of ids of sub processes that should be reatined (!). + """ + item_list = list(stats.weight_map.keys()) + for item in item_list: + stats[item].pop() + + @producer( uses={IF_SL(catid_sr), IF_DL(catid_mll_low), increment_stats, "process_id", "fold_indices"}, produces={IF_MC("event_weight")}, @@ -78,6 +94,7 @@ def ml_preparation( "fold": { "values": events.fold_indices, "mask_fn": (lambda v: events.fold_indices == v), + "combinations_only": True, }, } @@ -92,6 +109,12 @@ def ml_preparation( group_combinations=group_combinations, **kwargs, ) + + key_list = list(weight_map.keys()) + for key in key_list: + stats.pop(key, None) + # TODO: pop 'num_fold_events' + return events diff --git a/hbw/production/ml_inputs.py b/hbw/production/ml_inputs.py index 813a3cb4..c56f4f9e 100644 --- a/hbw/production/ml_inputs.py +++ b/hbw/production/ml_inputs.py @@ -127,7 +127,7 @@ def common_ml_inputs(self: Producer, events: ak.Array, **kwargs) -> ak.Array: events = set_ak_column_f32(events, "mli_mindr_jj", ak.min(dr, axis=1)) # hbb features - hbb = events.Bjet[:, 0] + events.Bjet[:, 1] + hbb = (events.Bjet[:, 0] + events.Bjet[:, 1]) * 1 # NOTE: *1 so it is a Lorentzvector not a candidate vector events = set_ak_column_f32(events, "mli_bb_pt", hbb.pt) events = set_ak_column_f32(events, "mli_mbb", hbb.mass) @@ -318,8 +318,7 @@ def dl_ml_inputs(self: Producer, events: ak.Array, **kwargs) -> ak.Array: events = set_ak_column_f32(events, "mli_min_dr_llbb", min_dr_llbb) # hh system - hbb = events.Bjet[:, 0] + events.Bjet[:, 1] - + hbb = (events.Bjet[:, 0] + events.Bjet[:, 1]) * 1 # NOTE: *1 so it is a Lorentzvector not a candidate vector events = set_ak_column_f32(events, "mli_mbbllMET", (hll + hbb + events.MET[:]).mass) events = set_ak_column_f32(events, "mli_dr_bb_llMET", hbb.delta_r(hll + events.MET[:])) events = set_ak_column_f32(events, "mli_dphi_bb_nu", abs(hbb.delta_phi(events.MET))) diff --git a/hbw/tasks/ml.py b/hbw/tasks/ml.py index 8001aa42..5e45e9d7 100644 --- a/hbw/tasks/ml.py +++ b/hbw/tasks/ml.py @@ -211,6 +211,10 @@ def fold(self): return None return self.branch_data["fold"] + @property + def config_inst(self): + return 
+        return self.config_insts[0]
+
     def create_branch_map(self):
         return [
             DotDict({"fold": fold, "process": process})
@@ -341,6 +345,17 @@ def merge_stats(self, inputs) -> dict:
             # gather stats per ml process
             stats = inputs["stats"][config_inst.name][dataset]["stats"].load(formatter="json")
             process = config_inst.get_dataset(dataset).x.ml_process
+            proc_inst = config_inst.get_process(process)
+            sub_id = [
+                proc_inst.id
+                for proc_inst, _, _ in proc_inst.walk_processes(include_self=True)
+            ]
+
+            for id in list(stats["num_events_per_process"].keys()):
+                if int(id) not in sub_id:
+                    for key in list(stats.keys()):
+                        stats[key].pop(id, None)
+
             MergeMLStats.merge_counts(merged_stats[process], stats)

         return merged_stats
@@ -452,6 +467,10 @@ class MLEvaluationSingleFold(
         description="the data split to evaluate; must be one of 'train', 'val', 'test'; default: 'test'",
     )

+    @property
+    def config_inst(self):
+        return self.config_insts[0]
+
     def create_branch_map(self):
         return [
             DotDict({"process": process})
@@ -594,6 +613,10 @@ class PlotMLResultsSingleFold(

     data_splits = ("test", "val", "train")

+    @property
+    def config_inst(self):
+        return self.config_insts[0]
+
     def create_branch_map(self):
         return {0: None}
diff --git a/hbw/weight/default.py b/hbw/weight/default.py
index 7cc71d12..501ef584 100644
--- a/hbw/weight/default.py
+++ b/hbw/weight/default.py
@@ -139,7 +139,7 @@ def base_init(self: WeightProducer) -> None:
     "muon_id_weight": ["mu_id_sf"],
     "muon_iso_weight": ["mu_iso_sf"],
     "electron_weight": ["e_sf"],
-    "normalized_ht_njet_btag_weight": [f"btag_{unc}" for unc in btag_uncs],
+    "normalized_ht_njet_nhf_btag_weight": [f"btag_{unc}" for unc in btag_uncs],
     "normalized_murmuf_envelope_weight": ["murf_envelope"],
     "normalized_mur_weight": ["mur"],
     "normalized_muf_weight": ["muf"],
@@ -155,7 +155,7 @@ def base_init(self: WeightProducer) -> None:
 base.derive("unstitched", cls_dict={"weight_columns": {**default_correction_weights, "normalization_weight": []}})

 weight_columns_execpt_btag = default_weight_columns.copy()
-weight_columns_execpt_btag.pop("normalized_ht_njet_btag_weight")
+weight_columns_execpt_btag.pop("normalized_ht_njet_nhf_btag_weight")
 base.derive("no_btag_weight", cls_dict={"weight_columns": weight_columns_execpt_btag})

 base.derive("btag_not_normalized", cls_dict={"weight_columns": {

From 3febce2ca38c77b3ec314d3b65627434176dffe2 Mon Sep 17 00:00:00 2001
From: lamarkus <67378765+Lara813@users.noreply.github.com>
Date: Tue, 10 Dec 2024 13:26:36 +0100
Subject: [PATCH 3/5] Update hbw/ml/data_loader.py

Co-authored-by: Mathis Frahm <49306645+mafrahm@users.noreply.github.com>
---
 hbw/ml/data_loader.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/hbw/ml/data_loader.py b/hbw/ml/data_loader.py
index 55f7b11f..677e0f04 100644
--- a/hbw/ml/data_loader.py
+++ b/hbw/ml/data_loader.py
@@ -115,8 +115,6 @@ class MLDatasetLoader:
     shuffle: bool = True

     input_arrays: tuple = ("features", "weights", "train_weights", "equal_weights")
-    # input_arrays: tuple = ("features", "weights", _
-    # _ "equal_train_weights", "xsec_train_weights", "train_weights", "equal_weights")
     evaluation_arrays: tuple = ("prediction",)

     def __init__(self, ml_model_inst: MLModel, process: "str", events: ak.Array, stats: dict | None = None):

From fc83b520b58de9ba6b64cb7aa3efb86b3dc37332 Mon Sep 17 00:00:00 2001
From: lamarkus <67378765+Lara813@users.noreply.github.com>
Date: Tue, 10 Dec 2024 13:30:20 +0100
Subject: [PATCH 4/5] Apply suggestions from code review

Co-authored-by: Mathis Frahm <49306645+mafrahm@users.noreply.github.com>
---
 hbw/ml/data_loader.py | 37 +++++--------------------------------
 hbw/tasks/ml.py       |  4 +---
 2 files changed, 6 insertions(+), 35 deletions(-)

diff --git a/hbw/ml/data_loader.py b/hbw/ml/data_loader.py
index 677e0f04..445ccaba 100644
--- a/hbw/ml/data_loader.py
+++ b/hbw/ml/data_loader.py
@@ -23,13 +23,14 @@ def get_proc_mask(
     events: ak.Array,
     proc: str | od.Process,
     config_inst: od.Config | None = None,
-) -> np.ndarray:
+) -> tuple[np.ndarray, list]:
     """
-    Creates a list of the ids of all subprocesses and the corresponding mask for all events.
+    Creates the mask selecting events belonging to the process *proc* and a list of all ids belonging to this process.

     :param events: Event array
-    :param config_inst: An instance of the Config, can be None if a Process instance is given.
     :param proc: Either string or process instance.
+    :param config_inst: An instance of the Config, can be None if a Process instance is given.
+    :return: process mask and the corresponding process ids
     """
     # get process instance
     if config_inst:
         proc_inst = config_inst.get_process(proc)
@@ -52,27 +53,6 @@ def get_proc_mask(
     return proc_mask, sub_id


-def del_sub_proc_stats(
-    stats: dict,
-    proc: str,
-    sub_id: list,
-) -> None:
-    """
-    Deletes dict keys which are not part of the requested process.
-
-    :param stats: Dictionary containing ML stats for each process.
-    :param proc: String of the process.
-    :param sub_id: List of ids of sub processes that should be retained (!).
-    """
-    id_list = list(stats[proc]["num_events_per_process"].keys())
-    item_list = list(stats[proc].keys())
-    for id in id_list:
-        if int(id) not in sub_id:
-            for item in item_list:
-                if "per_process" in item:
-                    del stats[proc][item][id]
-
-
 def input_features_sanity_checks(ml_model_inst: MLModel, input_features: list[str]):
     """
     Perform sanity checks on the input features.
@@ -134,9 +114,7 @@ def __init__(self, ml_model_inst: MLModel, process: "str", events: ak.Array, sta
         self._process = process

         proc_mask, _ = get_proc_mask(events, process, ml_model_inst.config_inst)
-        # TODO: the stats without "_per_process" still need to be handled; per-fold stats no longer make sense then -> DISCUSS
         self._stats = stats
-        # del_sub_proc_stats(process, sub_id)
         self._events = events[proc_mask]

     def __repr__(self):
@@ -323,9 +301,6 @@ def train_weights(self) -> np.ndarray:
             train_weights = self.get_equal_train_weights()
         else:
             train_weights = self.get_xsec_train_weights()
-        # self._train_weights = self.get_equal_train_weights()
-        # else:
-        #     self._train_weights = self.get_xsec_train_weights()

         self._train_weights = ak.to_numpy(train_weights).astype(np.float32)
@@ -360,8 +335,6 @@ def equal_weights(self) -> np.ndarray:
             num_events_per_proc = np.sum([self.stats[proc]["num_events_per_process"][str(id)] for id in sub_id])
             num_events_per_process[proc] = num_events_per_proc

-        # sum_abs_weights = self.stats[self.process]["sum_abs_weights"]
-        # num_events_per_process = {proc: self.stats[proc]["num_events"] for proc in processes}
         validation_weights = self.weights / sum_abs_weights * max(num_events_per_process.values())
         self._validation_weights = ak.to_numpy(validation_weights).astype(np.float32)
diff --git a/hbw/tasks/ml.py b/hbw/tasks/ml.py
index 5e45e9d7..ac2eb876 100644
--- a/hbw/tasks/ml.py
+++ b/hbw/tasks/ml.py
@@ -404,8 +404,8 @@ def run(self):
             n_events_per_fold = len(ml_dataset.train_weights)
             logger.info(f"Sum of training weights is: {sum_train_weights} for {n_events_per_fold} {process} events")

+            # check that equal weighting works as intended
             if self.ml_model_inst.config_inst.get_process(process).x("ml_config", None):
-
                 if self.ml_model_inst.config_inst.get_process(process).x.ml_config.weighting == "equal":
                     for sub_proc in self.ml_model_inst.config_inst.get_process(process).x.ml_config.sub_processes:
                         proc_mask, sub_id = get_proc_mask(events, sub_proc, self.ml_model_inst.config_inst)
@@ -712,8 +712,6 @@ def run(self):
             "test": MLProcessData(self.ml_model_inst, input_files, "test", self.ml_model_inst.processes, self.fold),
         })

-        # ML WEIGHTING data.train.equal_weights
-
         # create plots
         # NOTE: this is currently hard-coded, could be made customizable and could also be parallelized since
         # input reading is quite fast, while producing certain plots takes a long time

From 4e3b009746dedbe544bef1bc65cd9963ec56cc9c Mon Sep 17 00:00:00 2001
From: Mathis Frahm <49306645+mafrahm@users.noreply.github.com>
Date: Wed, 11 Dec 2024 09:59:10 +0100
Subject: [PATCH 5/5] Apply suggestions from code review

---
 law.cfg | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/law.cfg b/law.cfg
index 9614c9f2..75d32b48 100644
--- a/law.cfg
+++ b/law.cfg
@@ -56,7 +56,7 @@
 slurm_flavor: $CF_SLURM_FLAVOR
 slurm_partition: $CF_SLURM_PARTITION

 # ChunkedIOHandler defaults
-chunked_io_chunk_size: 50000
+chunked_io_chunk_size: 100000
 chunked_io_pool_size: 2
 chunked_io_debug: False
@@ -79,15 +79,6 @@
 wlcg_file_systems: wlcg_fs, wlcg_fs_desy, wlcg_fs_cernbox, wlcg_fs_desy_store, w
 # look for the correct fs per nano input file (in that order)
 lfn_sources: local_desy_dcache, wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirector

-shared_hbw_location: /nfs/dust/cms/user/frahmmat/public/hh2bbww/data/hbw_store
-
-cf.SelectEvents: local, %(shared_hbw_location)s, hbw_parts
-cf.MergeSelectionStats: local, %(shared_hbw_location)s, hbw_parts
-cf.ReduceEvents: local, %(shared_hbw_location)s, hbw_parts
-cf.MergeReductionStats: local, %(shared_hbw_location)s, hbw_parts
-cf.MergeReducedEvents: local, %(shared_hbw_location)s, hbw_parts
-cf.GetDatasetLFNs: local, %(shared_hbw_location)s, hbw_parts
-cf.CalibrateEvents: local, %(shared_hbw_location)s, hbw_parts
-
 # output locations per task family
 # for local targets : "local[, STORE_PATH]"
 # for remote targets: "wlcg[, WLCG_FS_NAME]"