diff --git a/analysis_templates/cms_minimal/law.cfg b/analysis_templates/cms_minimal/law.cfg
index 3fce2ce3e..0dcbc6d15 100644
--- a/analysis_templates/cms_minimal/law.cfg
+++ b/analysis_templates/cms_minimal/law.cfg
@@ -56,6 +56,7 @@ skip_ensure_proxy: False
 
 # some remote workflow parameter defaults
 htcondor_flavor: $CF_HTCONDOR_FLAVOR
 htcondor_share_software: False
+htcondor_disk: -1
 slurm_flavor: $CF_SLURM_FLAVOR
 slurm_partition: $CF_SLURM_PARTITION
diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py
index 43002d099..85bd05e86 100644
--- a/columnflow/tasks/framework/base.py
+++ b/columnflow/tasks/framework/base.py
@@ -142,7 +142,7 @@ def req_params(cls, inst: AnalysisTask, **kwargs) -> dict:
         _prefer_cli = law.util.make_set(kwargs.get("_prefer_cli", [])) | {
             "version", "workflow", "job_workers", "poll_interval", "walltime", "max_runtime",
             "retries", "acceptance", "tolerance", "parallel_jobs", "shuffle_jobs", "htcondor_cpus",
-            "htcondor_gpus", "htcondor_memory", "htcondor_pool", "pilot",
+            "htcondor_gpus", "htcondor_memory", "htcondor_disk", "htcondor_pool", "pilot",
         }
         kwargs["_prefer_cli"] = _prefer_cli
 
diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py
index be7e8f747..8a747118e 100644
--- a/columnflow/tasks/framework/remote.py
+++ b/columnflow/tasks/framework/remote.py
@@ -528,6 +528,11 @@ def common_destination_info(self, info: dict[str, str]) -> dict[str, str]:
 
 _default_htcondor_flavor = law.config.get_expanded("analysis", "htcondor_flavor", law.NO_STR)
 _default_htcondor_share_software = law.config.get_expanded_boolean("analysis", "htcondor_share_software", False)
+_default_htcondor_disk = law.util.parse_bytes(
+    law.config.get_expanded_float("analysis", "htcondor_disk", law.NO_FLOAT),
+    input_unit="GB",
+    unit="GB",
+)
 
 
 class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkflowMixin):
@@ -565,9 +570,16 @@ class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkflowMixin):
         default=law.NO_FLOAT,
         unit="MB",
         significant=False,
-        description="requested memeory in MB; empty value leads to the cluster default setting; "
+        description="requested memory in MB; empty value leads to the cluster default setting; "
         "empty default",
     )
+    htcondor_disk = law.BytesParameter(
+        default=_default_htcondor_disk,
+        unit="GB",
+        significant=False,
+        description="requested disk space in GB; empty value leads to the cluster default setting; "
+        f"{'empty default' if _default_htcondor_disk <= 0 else 'default: ' + str(_default_htcondor_disk)}",
+    )
     htcondor_flavor = luigi.ChoiceParameter(
         default=_default_htcondor_flavor,
         choices=(
@@ -697,6 +709,12 @@ def htcondor_job_config(self, config, job_num, branches):
         if self.htcondor_memory is not None and self.htcondor_memory > 0:
             config.custom_content.append(("Request_Memory", self.htcondor_memory))
 
+        # request disk space
+        if self.htcondor_disk is not None and self.htcondor_disk > 0:
+            # TODO: the exact conversion might be flavor dependent in the future, use kB for now
+            # e.g. https://confluence.desy.de/pages/viewpage.action?pageId=128354529
+            config.custom_content.append(("RequestDisk", self.htcondor_disk * 1024**2))
+
         # render variables
         config.render_variables["cf_bootstrap_name"] = "htcondor_standalone"
         if self.htcondor_flavor not in ("", law.NO_STR):
diff --git a/columnflow/tasks/reduction.py b/columnflow/tasks/reduction.py
index dca0ae49d..6ee9c852a 100644
--- a/columnflow/tasks/reduction.py
+++ b/columnflow/tasks/reduction.py
@@ -260,6 +260,8 @@ class MergeReductionStats(
     SelectorStepsMixin,
     CalibratorsMixin,
     DatasetTask,
+    law.LocalWorkflow,
+    RemoteWorkflow,
 ):
 
     n_inputs = luigi.IntParameter(
@@ -279,6 +281,7 @@ class MergeReductionStats(
 
     # upstream requirements
     reqs = Requirements(
+        RemoteWorkflow.reqs,
         ReduceEvents=ReduceEvents,
     )
 
@@ -298,10 +301,31 @@ def resolve_param_values(cls, params):
 
         return params
 
+    def create_branch_map(self):
+        # single branch without payload
+        return {0: None}
+
+    def workflow_requires(self):
+        reqs = super().workflow_requires()
+        if self.merged_size == 0:
+            return reqs
+
+        reqs["events"] = self.reqs.ReduceEvents.req_different_branching(
+            self,
+            branches=((0, self.n_inputs),),
+        )
+        return reqs
+
     def requires(self):
         if self.merged_size == 0:
             return []
-        return self.reqs.ReduceEvents.req(self, branches=((0, self.n_inputs),))
+
+        return self.reqs.ReduceEvents.req_different_branching(
+            self,
+            workflow="local",
+            branches=((0, self.n_inputs),),
+            _exclude={"branch"},
+        )
 
     def output(self):
         return {"stats": self.target(f"stats_n{self.n_inputs}.json")}
@@ -429,7 +453,7 @@ def is_sandboxed(self):
     @law.workflow_property(setter=True, cache=True, empty_value=0)
     def file_merging(self):
         # check if the merging stats are present
-        stats = self.reqs.MergeReductionStats.req(self).output()["stats"]
+        stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"]
         return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0
 
     @law.dynamic_workflow_condition
@@ -444,14 +468,14 @@ def create_branch_map(self):
 
     def merge_workflow_requires(self):
         return {
-            "stats": self.reqs.MergeReductionStats.req(self),
+            "stats": self.reqs.MergeReductionStats.req_different_branching(self),
             "events": self.reqs.ReduceEvents.req_different_branching(self, branches=((0, -1),)),
         }
 
     def merge_requires(self, start_branch, end_branch):
         return {
-            "stats": self.reqs.MergeReductionStats.req(self),
-            "events": self.reqs.ReduceEvents.req(
+            "stats": self.reqs.MergeReductionStats.req_different_branching(self, branch=0),
+            "events": self.reqs.ReduceEvents.req_different_branching(
                 self,
                 branches=((start_branch, end_branch),),
                 workflow="local",
@@ -508,6 +532,7 @@ class ProvideReducedEvents(
     CalibratorsMixin,
     DatasetTask,
     law.LocalWorkflow,
+    RemoteWorkflow,
 ):
 
     skip_merging = luigi.BoolParameter(
@@ -524,18 +549,26 @@ class ProvideReducedEvents(
 
     # upstream requirements
    reqs = Requirements(
+        RemoteWorkflow.reqs,
         ReduceEvents=ReduceEvents,
         MergeReductionStats=MergeReductionStats,
         MergeReducedEvents=MergeReducedEvents,
     )
 
+    @classmethod
+    def _resolve_workflow_parameters(cls, params):
+        # always fall back to local workflows
+        params["effective_workflow"] = "local"
+
+        return super()._resolve_workflow_parameters(params)
+
     @law.workflow_property(setter=True, cache=True, empty_value=0)
     def file_merging(self):
         if self.skip_merging or self.dataset_info_inst.n_files == 1:
             return 1
 
         # check if the merging stats are present
-        stats = self.reqs.MergeReductionStats.req(self).output()["stats"]
+        stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"]
         return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0
 
     @law.dynamic_workflow_condition
@@ -576,7 +609,7 @@ def workflow_requires(self):
             reqs["events"] = self._req_reduced_events()
         else:
             # here, the merging is unclear so require the stats
-            reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self)
+            reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self)
 
             if self.force_merging:
                 # require merged events when forced
@@ -598,7 +631,7 @@ def requires(self):
         if self.skip_merging or (not self.force_merging and self.dataset_info_inst.n_files == 1):
             reqs["events"] = self._req_reduced_events()
         else:
-            reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self)
+            reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self, branch=0)
 
             if self.force_merging:
                 reqs["events"] = self._req_merged_reduced_events()
diff --git a/columnflow/tasks/selection.py b/columnflow/tasks/selection.py
index 24746b007..a59080bd9 100644
--- a/columnflow/tasks/selection.py
+++ b/columnflow/tasks/selection.py
@@ -286,6 +286,7 @@ class MergeSelectionStats(
     CalibratorsMixin,
     DatasetTask,
     law.tasks.ForestMerge,
+    RemoteWorkflow,
 ):
 
     # flag that sets the *hists* output to optional if True
     selection_hists_optional = default_selection_hists_optional
@@ -296,11 +297,9 @@ class MergeSelectionStats(
     # merge 25 stats files into 1 at every step of the merging cascade
     merge_factor = 25
 
-    # skip receiving some parameters via req
-    exclude_params_req_get = {"workflow"}
-
     # upstream requirements
     reqs = Requirements(
+        RemoteWorkflow.reqs,
         SelectEvents=SelectEvents,
     )
 
@@ -309,10 +308,10 @@ def create_branch_map(self):
         return law.tasks.ForestMerge.create_branch_map(self)
 
     def merge_workflow_requires(self):
-        return self.reqs.SelectEvents.req(self, _exclude={"branches"})
+        return self.reqs.SelectEvents.req_different_branching(self, _exclude={"branches"})
 
     def merge_requires(self, start_branch, end_branch):
-        return self.reqs.SelectEvents.req(
+        return self.reqs.SelectEvents.req_different_branching(
             self,
             branches=((start_branch, end_branch),),
             workflow="local",
diff --git a/law.cfg b/law.cfg
index 1d07fb288..6650da0ff 100644
--- a/law.cfg
+++ b/law.cfg
@@ -52,6 +52,7 @@ skip_ensure_proxy: False
 
 # some remote workflow parameter defaults
 htcondor_flavor: $CF_HTCONDOR_FLAVOR
 htcondor_share_software: False
+htcondor_disk: -1
 slurm_flavor: $CF_SLURM_FLAVOR
 slurm_partition: $CF_SLURM_PARTITION
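
Usage note (illustrative sketch, not part of the patch): the new htcondor_disk parameter can be set through the [analysis] section of law.cfg, which _default_htcondor_disk reads above, or per invocation on the command line, where luigi renders it as --htcondor-disk. The value is interpreted in GB and translated to a RequestDisk entry in kB as implemented in htcondor_job_config above. Task, dataset, and version names below are placeholders.

    # law.cfg: request 10 GB of job scratch space by default (-1 keeps the cluster default)
    [analysis]
    htcondor_disk: 10

    # command line: request 10 GB for the jobs of a single remote workflow
    law run cf.ReduceEvents --version v1 --dataset some_dataset --workflow htcondor --htcondor-disk 10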