Skip to content

Commit

Permalink
feat: enable tenant option to disable job scheduling (#304)
Browse files Browse the repository at this point in the history
* add handling for disable job scheduling

* fix clause order

* fix handle datetime
  • Loading branch information
ahmadnaufal authored Dec 3, 2024
1 parent 33cdee2 commit 2adb974
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 21 deletions.
16 changes: 13 additions & 3 deletions core/tenant/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
const (
EntityProject = "project"

ProjectStoragePathKey = "STORAGE_PATH"
ProjectSchedulerHost = "SCHEDULER_HOST"
ProjectSchedulerVersion = "SCHEDULER_VERSION"
ProjectStoragePathKey = "STORAGE_PATH"
ProjectSchedulerHost = "SCHEDULER_HOST"
ProjectSchedulerVersion = "SCHEDULER_VERSION"
ProjectDisableJobScheduling = "DISABLE_JOB_SCHEDULING"
)

type ProjectName string
Expand Down Expand Up @@ -78,6 +79,15 @@ func (p *Project) GetVariables() map[string]string {
return vars
}

func (p *Project) IsJobSchedulingDisabled() bool {
disable, err := p.GetConfig(ProjectDisableJobScheduling)
if err != nil {
return false
}

return strings.ToLower(disable) == "true"
}

func (p *Project) SetPresets(presets map[string]Preset) {
if presets == nil {
p.presets = make(map[string]Preset)
Expand Down
7 changes: 6 additions & 1 deletion ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ def lookup_non_standard_cron_expression(expr: str) -> str:
return expr

def get_scheduled_at(context):
job_cron_iter = croniter(context.get("dag").schedule_interval, context.get('execution_date'))
interval = context.get("dag").schedule_interval
if interval is None:
# pendulum.Datetime cannot work with serializer used by airflow, so need to convert to datetime
return datetime.fromtimestamp(context.get('logical_date').timestamp(), tz=utc)

job_cron_iter = croniter(interval, context.get('execution_date'))
return job_cron_iter.get_next(datetime)

class SuperKubernetesPodOperator(KubernetesPodOperator):
Expand Down
27 changes: 14 additions & 13 deletions ext/scheduler/airflow/dag/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,20 @@ func (c *Compiler) Compile(project *tenant.Project, jobDetails *scheduler.JobWit
upstreams := SetupUpstreams(jobDetails.Upstreams, c.hostname)

templateContext := TemplateContext{
JobDetails: jobDetails,
Tenant: jobDetails.Job.Tenant,
Version: config.BuildVersion,
SLAMissDuration: slaDuration,
Hostname: c.hostname,
GRPCHostName: c.grpcHost,
ExecutorTask: scheduler.ExecutorTask.String(),
ExecutorHook: scheduler.ExecutorHook.String(),
Task: task,
Hooks: hooks,
RuntimeConfig: runtimeConfig,
Priority: jobDetails.Priority,
Upstreams: upstreams,
JobDetails: jobDetails,
Tenant: jobDetails.Job.Tenant,
Version: config.BuildVersion,
SLAMissDuration: slaDuration,
Hostname: c.hostname,
GRPCHostName: c.grpcHost,
ExecutorTask: scheduler.ExecutorTask.String(),
ExecutorHook: scheduler.ExecutorHook.String(),
Task: task,
Hooks: hooks,
RuntimeConfig: runtimeConfig,
Priority: jobDetails.Priority,
Upstreams: upstreams,
DisableJobScheduling: project.IsJobSchedulingDisabled(),
}

airflowVersion, err := project.GetConfig(tenant.ProjectSchedulerVersion)
Expand Down
2 changes: 2 additions & 0 deletions ext/scheduler/airflow/dag/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type TemplateContext struct {
Hooks Hooks
Priority int
Upstreams Upstreams

DisableJobScheduling bool
}

type Task struct {
Expand Down
2 changes: 1 addition & 1 deletion ext/scheduler/airflow/dag/template/dag.2.1.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ default_args = {
dag = DAG(
dag_id={{.JobDetails.Name.String | quote}},
default_args=default_args,
schedule_interval={{ if eq .JobDetails.Schedule.Interval "" }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
schedule_interval={{ if or .DisableJobScheduling (eq .JobDetails.Schedule.Interval "") }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
catchup={{ if .JobDetails.Schedule.CatchUp -}}True{{- else -}}False{{- end }},
dagrun_timeout=timedelta(seconds=DAGRUN_TIMEOUT_IN_SECS),
tags=[
Expand Down
2 changes: 1 addition & 1 deletion ext/scheduler/airflow/dag/template/dag.2.4.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ default_args = {
dag = DAG(
dag_id={{.JobDetails.Name.String | quote}},
default_args=default_args,
schedule_interval={{ if eq .JobDetails.Schedule.Interval "" }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
schedule_interval={{ if or .DisableJobScheduling (eq .JobDetails.Schedule.Interval "") }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
catchup={{ if .JobDetails.Schedule.CatchUp -}}True{{- else -}}False{{- end }},
dagrun_timeout=timedelta(seconds=DAGRUN_TIMEOUT_IN_SECS),
tags=[
Expand Down
2 changes: 1 addition & 1 deletion ext/scheduler/airflow/dag/template/dag.2.6.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ default_args = {
dag = DAG(
dag_id={{.JobDetails.Name.String | quote}},
default_args=default_args,
schedule_interval={{ if eq .JobDetails.Schedule.Interval "" }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
schedule_interval={{ if or .DisableJobScheduling (eq .JobDetails.Schedule.Interval "") }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
catchup={{ if .JobDetails.Schedule.CatchUp -}}True{{- else -}}False{{- end }},
dagrun_timeout=timedelta(seconds=DAGRUN_TIMEOUT_IN_SECS),
tags=[
Expand Down
2 changes: 1 addition & 1 deletion ext/scheduler/airflow/dag/template/dag.2.9.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ default_args = {
dag = DAG(
dag_id={{.JobDetails.Name.String | quote}},
default_args=default_args,
schedule_interval={{ if eq .JobDetails.Schedule.Interval "" }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
schedule_interval={{ if or .DisableJobScheduling (eq .JobDetails.Schedule.Interval "") }}None{{- else -}} {{ .JobDetails.Schedule.Interval | quote}}{{end}},
catchup={{ if .JobDetails.Schedule.CatchUp -}}True{{- else -}}False{{- end }},
dagrun_timeout=timedelta(seconds=DAGRUN_TIMEOUT_IN_SECS),
tags=[
Expand Down

0 comments on commit 2adb974

Please sign in to comment.