-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask_states.c
193 lines (154 loc) · 3.87 KB
/
task_states.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*-------------------------------------------------------------------------
*
* src/task_states.c
*
* Logic for storing and manipulating cron task states.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "cron.h"
#include "pg_cron.h"
#include "task_states.h"
#include "access/hash.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/* forward declarations */
static HTAB * CreateCronTaskHash(void);
static CronTask * GetCronTask(int64 jobId);
/* global variables */
static MemoryContext CronTaskContext = NULL;
static HTAB *CronTaskHash = NULL;
/* settings */
bool LaunchActiveJobs = true;
/*
* InitializeTaskStateHash initializes the hash for storing task states.
*/
void
InitializeTaskStateHash(void)
{
CronTaskContext = AllocSetContextCreate(CurrentMemoryContext,
"pg_cron task context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
CronTaskHash = CreateCronTaskHash();
}
/*
* CreateCronTaskHash creates the hash for storing cron task states.
*/
static HTAB *
CreateCronTaskHash(void)
{
HTAB *taskHash = NULL;
HASHCTL info;
int hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(CronTask);
info.hash = tag_hash;
info.hcxt = CronTaskContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
taskHash = hash_create("pg_cron tasks", 32, &info, hashFlags);
return taskHash;
}
/*
* RefreshTaskHash reloads the cron jobs from the cron.job table.
* If a job that has an active task has been removed, the task
* is marked as inactive by this function.
*/
void
RefreshTaskHash(void)
{
List *jobList = NIL;
ListCell *jobCell = NULL;
CronTask *task = NULL;
HASH_SEQ_STATUS status;
ResetJobMetadataCache();
hash_seq_init(&status, CronTaskHash);
/* mark all tasks as inactive */
while ((task = hash_seq_search(&status)) != NULL)
{
task->isActive = false;
}
jobList = LoadCronJobList();
/* mark tasks that still have a job as active */
foreach(jobCell, jobList)
{
CronJob *job = (CronJob *) lfirst(jobCell);
task = GetCronTask(job->jobId);
task->isActive = LaunchActiveJobs && job->active;
task->secondsInterval = job->schedule.secondsInterval;
}
CronJobCacheValid = true;
}
/*
* GetCronTask gets the current task with the given job ID.
*/
static CronTask *
GetCronTask(int64 jobId)
{
CronTask *task = NULL;
int64 hashKey = jobId;
bool isPresent = false;
task = hash_search(CronTaskHash, &hashKey, HASH_ENTER, &isPresent);
if (!isPresent)
{
InitializeCronTask(task, jobId);
/*
* We only initialize last run when entering into the hash.
* The net effect is that the timer for the first run of an
* interval job starts when pg_cron first learns about the job.
*/
task->lastStartTime = GetCurrentTimestamp();
}
return task;
}
/*
* InitializeCronTask intializes a CronTask struct.
*/
void
InitializeCronTask(CronTask *task, int64 jobId)
{
task->runId = 0;
task->jobId = jobId;
task->state = CRON_TASK_WAITING;
task->pendingRunCount = 0;
task->connection = NULL;
task->pollingStatus = 0;
task->startDeadline = 0;
task->isSocketReady = false;
task->isActive = true;
task->errorMessage = NULL;
task->freeErrorMessage = false;
}
/*
* CurrentTaskList extracts the current list of tasks from the
* cron task hash.
*/
List *
CurrentTaskList(void)
{
List *taskList = NIL;
CronTask *task = NULL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, CronTaskHash);
while ((task = hash_seq_search(&status)) != NULL)
{
taskList = lappend(taskList, task);
}
return taskList;
}
/*
* RemoveTask remove the task for the given job ID.
*/
void
RemoveTask(int64 jobId)
{
bool isPresent = false;
hash_search(CronTaskHash, &jobId, HASH_REMOVE, &isPresent);
}