forked from spotify/helios
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path ZooKeeperAgentModel.java
229 lines (200 loc) · 8.18 KB
/
ZooKeeperAgentModel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
* Copyright (c) 2014 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.spotify.helios.agent;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.spotify.helios.common.Json;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.servicescommon.KafkaClientProvider;
import com.spotify.helios.servicescommon.KafkaRecord;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.spotify.helios.common.descriptors.Descriptor.parse;
/**
 * The Helios Agent's view into ZooKeeper.
 *
 * <p>Task assignments for this agent are read from {@link Paths#configHostJobs(String)} and task
 * statuses are written under {@link Paths#statusHostJobs(String)}. Both are backed by files in the
 * agent's state directory so the agent can continue to function in the face of a ZK outage. Every
 * status update is additionally handed to a {@link TaskHistoryWriter} and published to Kafka as a
 * {@link TaskStatusEvent}.
 */
public class ZooKeeperAgentModel extends AbstractIdleService implements AgentModel {

  private static final Logger log = LoggerFactory.getLogger(ZooKeeperAgentModel.class);

  private static final String TASK_CONFIG_FILENAME = "task-config.json";
  private static final String TASK_HISTORY_FILENAME = "task-history.json";
  private static final String TASK_STATUS_FILENAME = "task-status.json";

  /** Task configs assigned to this agent, keyed by ZK node path, cached to task-config.json. */
  private final PersistentPathChildrenCache<Task> tasks;
  /** Task statuses keyed by {@code JobId.toString()}, stored as JSON bytes. */
  private final ZooKeeperUpdatingPersistentDirectory taskStatuses;
  /** Records each status update as a history item (failures are logged, not propagated). */
  private final TaskHistoryWriter historyWriter;
  /** Publishes a {@link TaskStatusEvent} for each status update. */
  private final KafkaSender kafkaSender;
  /** The agent (host) name this model reads and writes ZK state for. */
  private final String agent;
  private final CopyOnWriteArrayList<AgentModel.Listener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Creates the agent model. The task cache, status directory and history writer are started by
   * {@link #startUp()} and stopped by {@link #shutDown()}.
   *
   * @param provider supplies the ZooKeeper client used for task configs and history, and is passed
   *                 to the status directory
   * @param kafkaProvider supplies the default producer used to publish task status events
   * @param host the agent name; selects the ZK config/status paths used by this model
   * @param stateDirectory directory holding the local task config, status and history files
   * @throws IOException presumably if the local state files cannot be set up — TODO confirm
   * @throws InterruptedException if interrupted while constructing the underlying components
   */
  public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
                             final KafkaClientProvider kafkaProvider, final String host,
                             final Path stateDirectory) throws IOException, InterruptedException {
    // TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
    // things should be passed in/provider'd/etc.
    // Validate before acquiring any resources so a bad argument fails fast.
    this.agent = checkNotNull(host);
    final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");

    final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
    this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
                                          Json.type(Task.class));
    tasks.addListener(new JobsListener());

    final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
    this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
                                                                    provider,
                                                                    taskStatusFile,
                                                                    Paths.statusHostJobs(host));

    this.historyWriter = new TaskHistoryWriter(
        host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
    this.kafkaSender = new KafkaSender(kafkaProvider.getDefaultProducer());
  }

  /** Starts the task cache, the status directory and the history writer, in that order. */
  @Override
  protected void startUp() throws Exception {
    tasks.startAsync().awaitRunning();
    taskStatuses.startAsync().awaitRunning();
    historyWriter.startAsync().awaitRunning();
  }

  /** Stops the task cache, the status directory and the history writer, in that order. */
  @Override
  protected void shutDown() throws Exception {
    tasks.stopAsync().awaitTerminated();
    taskStatuses.stopAsync().awaitTerminated();
    historyWriter.stopAsync().awaitTerminated();
  }

  /**
   * Extracts the {@link JobId} from a task node path by stripping this agent's jobs prefix.
   *
   * <p>The prefix is stripped literally. It is derived from the agent name and must not be
   * interpreted as a regular expression, which {@code String.replaceFirst} would do. A path
   * without the prefix is passed to {@link JobId#fromString(String)} unchanged.
   */
  private JobId jobIdFromTaskPath(final String path) {
    final String prefix = Paths.configHostJobs(agent) + "/";
    final String name = path.startsWith(prefix) ? path.substring(prefix.length()) : path;
    return JobId.fromString(name);
  }

  /**
   * Returns the tasks (basically, a pair of {@link JobId} and {@link Task}) for the current agent.
   *
   * @return a new, mutable map from job id to task config; changes do not affect this model
   */
  @Override
  public Map<JobId, Task> getTasks() {
    final Map<JobId, Task> result = Maps.newHashMap();
    for (final Map.Entry<String, Task> entry : tasks.getNodes().entrySet()) {
      result.put(jobIdFromTaskPath(entry.getKey()), entry.getValue());
    }
    return result;
  }

  /**
   * Returns the {@link TaskStatus}es for all tasks assigned to the current agent.
   *
   * @return a new, mutable map from job id to its last stored status
   * @throws RuntimeException wrapping the {@link IOException} if a stored status cannot be parsed
   */
  @Override
  public Map<JobId, TaskStatus> getTaskStatuses() {
    final Map<JobId, TaskStatus> statuses = Maps.newHashMap();
    for (final Map.Entry<String, byte[]> entry : taskStatuses.entrySet()) {
      try {
        final JobId id = JobId.fromString(entry.getKey());
        final TaskStatus status = Json.read(entry.getValue(), TaskStatus.class);
        statuses.put(id, status);
      } catch (IOException e) {
        throw Throwables.propagate(e);
      }
    }
    return statuses;
  }

  /**
   * Set the {@link TaskStatus} for the job identified by {@code jobId}.
   *
   * <p>The status is stored first. It is then handed to the history writer, where failures are
   * logged but not propagated. Finally it is published to Kafka as a {@link TaskStatusEvent}.
   *
   * @param jobId the job whose status is being set
   * @param status the new status
   * @throws InterruptedException if interrupted while storing the status
   */
  @Override
  public void setTaskStatus(final JobId jobId, final TaskStatus status)
      throws InterruptedException {
    log.debug("setting task status: {}", status);
    taskStatuses.put(jobId.toString(), status.toJsonBytes());
    try {
      historyWriter.saveHistoryItem(status);
    } catch (Exception e) {
      // Log error here and keep going as saving task history is not critical.
      // This is to prevent bad data in the queue from screwing up the actually important Helios
      // agent operations.
      // The exception is passed as a trailing argument (no placeholder) so SLF4J logs its stack
      // trace instead of formatting it into the message.
      log.error("Error saving task status {} to ZooKeeper", status, e);
    }
    final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
    kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));
  }

  /**
   * Get the {@link TaskStatus} for the job identified by {@code jobId}.
   *
   * @return the stored status, or {@code null} if none is stored for {@code jobId}
   * @throws RuntimeException wrapping the {@link IOException} if the stored status cannot be parsed
   */
  @Override
  public TaskStatus getTaskStatus(final JobId jobId) {
    final byte[] data = taskStatuses.get(jobId.toString());
    if (data == null) {
      return null;
    }
    try {
      return parse(data, TaskStatus.class);
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }

  /**
   * Remove the {@link TaskStatus} for the job identified by {@code jobId}.
   *
   * @throws InterruptedException if interrupted while removing the status
   */
  @Override
  public void removeTaskStatus(final JobId jobId) throws InterruptedException {
    taskStatuses.remove(jobId.toString());
  }

  /**
   * Add a listener that will be notified when tasks are changed.
   *
   * <p>The listener is also invoked once immediately, on the calling thread. Unlike later
   * notifications, an exception thrown from this first call propagates to the caller.
   */
  @Override
  public void addListener(final AgentModel.Listener listener) {
    listeners.add(listener);
    listener.tasksChanged(this);
  }

  /**
   * Remove a listener that will be notified when tasks are changed.
   */
  @Override
  public void removeListener(final AgentModel.Listener listener) {
    listeners.remove(listener);
  }

  /**
   * Notifies every registered listener that the task set changed. An exception from one listener
   * is logged and does not prevent the remaining listeners from being notified.
   */
  protected void fireTasksUpdated() {
    for (final AgentModel.Listener listener : listeners) {
      try {
        listener.tasksChanged(this);
      } catch (Exception e) {
        log.error("listener threw exception", e);
      }
    }
  }

  /** Forwards task-cache node changes to this model's listeners. */
  private class JobsListener implements PersistentPathChildrenCache.Listener {

    @Override
    public void nodesChanged(final PersistentPathChildrenCache<?> cache) {
      fireTasksUpdated();
    }

    @Override
    public void connectionStateChanged(final ConnectionState state) {
      // ignore
    }
  }
}