Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

First pass of job lock usage review #233

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hydra-main/src/main/java/com/addthis/hydra/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ public synchronized List<JobTask> getCopyOfTasks() {
return ImmutableList.copyOf(nodes);
}

// FIXME must be synchronized - iterating over nodes is not thread-safe
public List<JobTask> getCopyOfTasksSorted() {
if (nodes == null) {
nodes = new ArrayList<>();
Expand Down
8 changes: 4 additions & 4 deletions hydra-main/src/main/java/com/addthis/hydra/job/JobExpand.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.addthis.hydra.job;

import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -22,8 +24,8 @@
import java.util.regex.Pattern;

import com.addthis.basis.net.HttpUtil;
import com.addthis.basis.util.Parameter;
import com.addthis.basis.util.LessStrings;
import com.addthis.basis.util.Parameter;
import com.addthis.basis.util.TokenReplacer;
import com.addthis.basis.util.TokenReplacerOverflowException;

Expand All @@ -33,7 +35,6 @@
import com.addthis.hydra.job.alias.AliasManager;
import com.addthis.hydra.job.entity.JobEntityManager;
import com.addthis.hydra.job.entity.JobMacro;
import com.addthis.hydra.job.entity.JobMacroManager;
import com.addthis.hydra.job.spawn.Spawn;

import com.google.common.base.Joiner;
Expand All @@ -43,8 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

public class JobExpand {

private static final int maxDepth = Parameter.intValue("spawn.macro.expand.depth", 256);
Expand Down Expand Up @@ -314,6 +313,7 @@ public String replace(Region region, String label) {
List<String> mfn = Lists.newArrayList(Splitter.on(' ').split(label));
String macroName = mfn.get(0);
List<String> tokens = mfn.subList(1, mfn.size());
// TODO delete jobhosts handling - it's only used in old macros that are not used by any job
if (macroName.equals("jobhosts")) {
JobMacro macro = spawn.createJobHostMacro(tokens.get(0), Integer.parseInt(tokens.get(1)));
return macro.getMacro();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class JobRekickTask implements Runnable {
try {
if (!spawn.getSystemManager().isQuiesced()) {
String[] jobids = null;
// Not needed
spawn.acquireJobLock();
try {
jobids = new String[spawn.spawnState.jobs.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public ScheduledTaskKick(Spawn spawn,
}
} catch (Exception e) {
log.warn("failed to kick job {} task {} on host {}", jobId, kick.getNodeID(), kick.getHostUuid(), e);
// Maybe
spawn.acquireJobLock();
try {
job.errorTask(task, JobTaskErrorCode.KICK_ERROR);
Expand Down
23 changes: 23 additions & 0 deletions hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ public Job createJob(String creator,
String minionType,
String command,
boolean defaults) throws Exception {
// Possibly move lock acquire to just before putJobInSpawnState? Does task assignment need to be guarded by job lock?
acquireJobLock();
try {
Job job = new Job(UUID.randomUUID().toString(), creator);
Expand Down Expand Up @@ -765,6 +766,7 @@ public void submitConfigUpdate(String jobId, String user, @Nullable String commi
if (jobUUID == null) {
return null;
}
// Not needed
acquireJobLock();
try {
return spawnState.jobs.get(jobUUID);
Expand All @@ -777,6 +779,7 @@ public void submitConfigUpdate(String jobId, String user, @Nullable String commi
if (jobUUID == null) {
return null;
}
// Why? To prevent concurrent calls to getConfig? Synchronize that method!
acquireJobLock();
try {
return jobConfigManager.getConfig(jobUUID);
Expand Down Expand Up @@ -813,6 +816,7 @@ public Response synchronizeJobState(String jobUUID, String user, String token, S

public Collection<Job> listJobs() {
List<Job> clones = new ArrayList<>(spawnState.jobs.size());
// Not needed
acquireJobLock();
try {
for (Job job : spawnState.jobs.values()) {
Expand Down Expand Up @@ -1133,6 +1137,7 @@ public boolean swapTask(JobTask task, String replicaHostID, boolean kickOnComple
return false;
}
Job job;
// Maybe
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strongly believe that this lock should be kept.

If we somehow tried to swap in two threads at the same time (swaps can be executed via an HTTP call, I believe) then modifications to the replicas and the hostUUID could interleave in a very harmful way -- for example, setting the replica and the host to the same UUID and "losing" a valid replica elsewhere.

acquireJobLock();
try {
job = getJob(task.getJobUUID());
Expand Down Expand Up @@ -1260,6 +1265,8 @@ public List<JobTaskMoveAssignment> executeReallocationAssignments(@Nullable List
* @return True if the task is successfully removed
*/
public boolean deleteTask(String jobUUID, String hostUuid, Integer node, boolean isReplica) {
// Maybe - but probably can be better handled by a lock on the individual job/task?
// Why do spawnMQ.sendControlMessage and queueJobTaskUpdateEvent need to be guarded?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, I think the task.setReplicas call is worthy of consideration.

If a task has replicas A, B, and C, and one thread tries to remove A at the same time as another thread tries to remove B, then it is possible that one removal would be swallowed depending on the timing.

acquireJobLock();
try {
if ((jobUUID == null) || (node == null)) {
Expand Down Expand Up @@ -1313,6 +1320,7 @@ private static List<JobTaskReplica> removeReplicasForHost(String hostUuid, List<
}

public void queueJobTaskUpdateEvent(IJob job) {
// Not needed
acquireJobLock();
try {
jobUpdateQueue.add(job.getId());
Expand All @@ -1339,6 +1347,7 @@ public void queueJobTaskUpdateEvent(IJob job) {
public void updateJob(@Nullable IJob ijob, boolean reviseReplicas) throws Exception {
checkNotNull(ijob, "ijob");
Job job = new Job(ijob);
// Maybe
acquireJobLock();
try {
checkArgument(getJob(job.getId()) != null, "job " + job.getId() + " does not exist");
Expand Down Expand Up @@ -1388,6 +1397,7 @@ public Set<String> getDataSources(String jobId) {
if ((job == null) || (job.getParameters() == null)) {
return dataSources;
}
// Maybe
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that parameters is a non-threadsafe collection that is exposed directly via a getter, I think that some protection is necessary. There is nothing to stop someone from calling job.getParameters.add(something) at an inopportune moment.

Changing the getter to return a copy rather than the actual list might be a more lightweight solution

acquireJobLock();
try {
for (JobParameter param : job.getParameters()) {
Expand Down Expand Up @@ -1480,6 +1490,7 @@ public RebalanceOutcome rebalanceJob(String jobUUID, int tasksToMove, String use
* @return A string description
*/
public JSONObject fixTaskDir(String jobId, int node, boolean ignoreTaskState, boolean orphansOnly) {
// Maybe
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given all the task state checks inside the loop, I think this one should be kept.

acquireJobLock();
try {
Job job = getJob(jobId);
Expand Down Expand Up @@ -1552,6 +1563,7 @@ public boolean resolveJobTaskDirectoryMatches(JobTask task, boolean deleteOrphan

public JSONArray checkTaskDirJSON(String jobId, int node) {
JSONArray resultList = new JSONArray();
// Maybe
acquireJobLock();
try {
Job job = getJob(jobId);
Expand Down Expand Up @@ -1656,6 +1668,7 @@ public boolean checkStatusForMove(String hostID) {
}

public boolean prepareTaskStatesForRebalance(Job job, JobTask task, boolean isMigration) {
// Why?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is conceivable that the task could have suddenly kicked in between the call to isInMovableState and the call to job.setTaskState .

If that were the case, the task state would show up as REBALANCE, but the true state would be RUNNING, which would be a bad outcome.

Holding the job lock here prevents the task from kicking until we have decided whether we are rebalancing this task.

acquireJobLock();
try {
if (!SpawnBalancer.isInMovableState(task)) {
Expand All @@ -1672,6 +1685,7 @@ public boolean prepareTaskStatesForRebalance(Job job, JobTask task, boolean isMi
}

public DeleteStatus forceDeleteJob(String jobUUID) throws Exception {
// job lock is over kill - a better option could be to make setEnabled synchronized if concurrent modification to the job is the concern
acquireJobLock();
Job job;
try {
Expand All @@ -1687,6 +1701,7 @@ public DeleteStatus forceDeleteJob(String jobUUID) throws Exception {
} finally {
releaseJobLock();
}
// If job is deleted in another thread, stopJob and killJob will throw exception. Should catch that and return DeleteStatus.JOB_MISSING
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we are calling both stopJob and killJob in the first place. If we are hoping to let the job shut down cleanly, 100 ms is certainly not enough when we consider that the job almost certainly has to replicate and backup. I think killJob should be sufficient, since our intent is to delete anyway.

while ((job != null) && (job.getCountActiveTasks() > 0)) {
stopJob(jobUUID);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
Expand All @@ -1698,6 +1713,7 @@ public DeleteStatus forceDeleteJob(String jobUUID) throws Exception {
}

public DeleteStatus deleteJob(String jobUUID) throws Exception {
// OK
acquireJobLock();
try {
Job job = getJob(jobUUID);
Expand Down Expand Up @@ -1932,6 +1948,7 @@ public void stopJob(String jobUUID) throws Exception {
public void killJob(String jobUUID) throws Exception {
boolean success = false;
while (!success && !shuttingDown.get()) {
// Maybe
Copy link

@ghost ghost Apr 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this loop is concerning for an unrelated reason -- if the queueLock is locked, we call tryLock over and over again without any sort of delay in between. Maybe we should sleep for a few millis on failure?

acquireJobLock();
try {
if (taskQueuesByPriority.tryLock()) {
Expand Down Expand Up @@ -2099,6 +2116,7 @@ public void handleRebalanceFinish(IJob job, JobTask task, StatusTaskEnd update)
public JobMacro createJobHostMacro(String job, int port) {
String sPort = Integer.valueOf(port).toString();
Set<String> jobHosts = new TreeSet<>();// best set?
// The whole method can be removed
acquireJobLock();
try {
Collection<HostState> hosts = hostManager.listHostStatus(null);
Expand Down Expand Up @@ -2138,6 +2156,7 @@ public JobMacro createJobHostMacro(String job, int port) {
* sent for a while.
*/
public void saveAllJobs() {
// Maybe
acquireJobLock();
try {
for (Job job : listJobs()) {
Expand All @@ -2157,6 +2176,7 @@ public void saveAllJobs() {
* send job update event to registered listeners (usually http clients)
*/
private void sendJobUpdateEvent(Job job) {
// Maybe
acquireJobLock();
try {
jobConfigManager.updateJob(job);
Expand Down Expand Up @@ -2457,6 +2477,7 @@ public void kickJobsOnQueue() {
boolean success = false;
while (!success && !shuttingDown.get()) {
// need the job lock first
// Maybe...
acquireJobLock();
try {
if (taskQueuesByPriority.tryLock()) {
Expand Down Expand Up @@ -2555,6 +2576,7 @@ List<HostState> getHealthyHostStatesHousingTask(JobTask task, boolean allowRepli
}

@VisibleForTesting protected void loadJobs() {
// Probably can do without
acquireJobLock();
try {
for (IJob iJob : jobConfigManager.loadJobs().values()) {
Expand All @@ -2566,6 +2588,7 @@ List<HostState> getHealthyHostStatesHousingTask(JobTask task, boolean allowRepli
releaseJobLock();
}
Thread loadDependencies = new Thread(() -> {
// FIXME just iterate over the map entries...
Set<String> jobIds = spawnState.jobs.keySet();
for (String jobId : jobIds) {
IJob job = getJob(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void run() {
int taskQueuedNoSlot = 0;
long files = 0;
long bytes = 0;
// Why?
spawn.acquireJobLock();
try {
for (Job job : spawn.spawnState.jobs.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ public void fixTasksForFailedHost(List<HostState> hosts, String failedHost) {

private List<JobTask> findAllTasksAssignedToHost(String failedHostUUID) {
List<JobTask> rv = new ArrayList<>();
// Why?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it is prudent to hold the job lock while failing hosts -- which is the only time this method is called. We don't want to, e.g., kick tasks before all the bad hosts have been replaced.

However, this read-only method does not appear to be useful place to get the lock. I would suggest SpawnBalancer's attemptFixTaskForFailedHost as a method that more clearly needs the lock.

spawn.acquireJobLock();
try {
for (Job job : spawn.listJobs()) {
Expand Down Expand Up @@ -1384,6 +1385,7 @@ private Collection<JobTaskMoveAssignment> balanceActiveJobsOnHost(HostState host
List<JobTaskMoveAssignment> rv = purgeMisplacedTasks(host, 1);
String hostID = host.getHostUuid();
for (String jobID : activeJobs) {
// Maybe
spawn.acquireJobLock();
try {
Job job = spawn.getJob(jobID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ private void updateDiskQuotas() {
}
Map<String, List<MinimalJob>> quotas = new HashMap<>();
long totalBytes = 0;
// Why?
spawn.acquireJobLock();
try {
Iterator<Job> jobIterator = spawn.getSpawnState().jobsIterator();
Expand Down