diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/Job.java b/hydra-main/src/main/java/com/addthis/hydra/job/Job.java index d20dc53ee..fba75ad06 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/Job.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/Job.java @@ -577,6 +577,7 @@ public synchronized List getCopyOfTasks() { return ImmutableList.copyOf(nodes); } + // FIXME must be synchronized - iterating over nodes is not thread-safe public List getCopyOfTasksSorted() { if (nodes == null) { nodes = new ArrayList<>(); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/JobExpand.java b/hydra-main/src/main/java/com/addthis/hydra/job/JobExpand.java index 841d6413c..c25b8da5f 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/JobExpand.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/JobExpand.java @@ -13,6 +13,8 @@ */ package com.addthis.hydra.job; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -22,8 +24,8 @@ import java.util.regex.Pattern; import com.addthis.basis.net.HttpUtil; -import com.addthis.basis.util.Parameter; import com.addthis.basis.util.LessStrings; +import com.addthis.basis.util.Parameter; import com.addthis.basis.util.TokenReplacer; import com.addthis.basis.util.TokenReplacerOverflowException; @@ -33,7 +35,6 @@ import com.addthis.hydra.job.alias.AliasManager; import com.addthis.hydra.job.entity.JobEntityManager; import com.addthis.hydra.job.entity.JobMacro; -import com.addthis.hydra.job.entity.JobMacroManager; import com.addthis.hydra.job.spawn.Spawn; import com.google.common.base.Joiner; @@ -43,8 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - public class JobExpand { private static final int maxDepth = Parameter.intValue("spawn.macro.expand.depth", 256); @@ -314,6 +313,7 @@ public String replace(Region region, String label) { List mfn = Lists.newArrayList(Splitter.on(' 
').split(label)); String macroName = mfn.get(0); List tokens = mfn.subList(1, mfn.size()); + // TODO delete jobhosts handling - it's only used in old macros that are not used by any job if (macroName.equals("jobhosts")) { JobMacro macro = spawn.createJobHostMacro(tokens.get(0), Integer.parseInt(tokens.get(1))); return macro.getMacro(); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/JobRekickTask.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/JobRekickTask.java index 130ee15e5..7f2da6d83 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/JobRekickTask.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/JobRekickTask.java @@ -41,6 +41,7 @@ class JobRekickTask implements Runnable { try { if (!spawn.getSystemManager().isQuiesced()) { String[] jobids = null; + // Not needed spawn.acquireJobLock(); try { jobids = new String[spawn.spawnState.jobs.size()]; diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/ScheduledTaskKick.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/ScheduledTaskKick.java index e6ad970f4..84d4d0d51 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/ScheduledTaskKick.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/ScheduledTaskKick.java @@ -71,6 +71,7 @@ public ScheduledTaskKick(Spawn spawn, } } catch (Exception e) { log.warn("failed to kick job {} task {} on host {}", jobId, kick.getNodeID(), kick.getHostUuid(), e); + // Maybe spawn.acquireJobLock(); try { job.errorTask(task, JobTaskErrorCode.KICK_ERROR); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java index 6e6bf70e0..b565588f6 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java @@ -681,6 +681,7 @@ public Job createJob(String creator, String minionType, String command, boolean defaults) throws 
Exception { + // Possibly move lock acquire to just before putJobInSpawnState? Does task assignment need to be guarded by job lock? acquireJobLock(); try { Job job = new Job(UUID.randomUUID().toString(), creator); @@ -765,6 +766,7 @@ public void submitConfigUpdate(String jobId, String user, @Nullable String commi if (jobUUID == null) { return null; } + // Not needed acquireJobLock(); try { return spawnState.jobs.get(jobUUID); @@ -777,6 +779,7 @@ public void submitConfigUpdate(String jobId, String user, @Nullable String commi if (jobUUID == null) { return null; } + // Why? To prevent concurrent calls to getConfig? Synchronize that method! acquireJobLock(); try { return jobConfigManager.getConfig(jobUUID); @@ -813,6 +816,7 @@ public Response synchronizeJobState(String jobUUID, String user, String token, S public Collection listJobs() { List clones = new ArrayList<>(spawnState.jobs.size()); + // Not needed acquireJobLock(); try { for (Job job : spawnState.jobs.values()) { @@ -1133,6 +1137,7 @@ public boolean swapTask(JobTask task, String replicaHostID, boolean kickOnComple return false; } Job job; + // Maybe acquireJobLock(); try { job = getJob(task.getJobUUID()); @@ -1260,6 +1265,8 @@ public List executeReallocationAssignments(@Nullable List * @return True if the task is successfully removed */ public boolean deleteTask(String jobUUID, String hostUuid, Integer node, boolean isReplica) { + // Maybe - but probably can be better handled by a lock on the individual job/task? + // Why do spawnMQ.sendControlMessage and queueJobTaskUpdateEvent need to be guarded? 
acquireJobLock(); try { if ((jobUUID == null) || (node == null)) { @@ -1313,6 +1320,7 @@ private static List removeReplicasForHost(String hostUuid, List< } public void queueJobTaskUpdateEvent(IJob job) { + // Not needed acquireJobLock(); try { jobUpdateQueue.add(job.getId()); @@ -1339,6 +1347,7 @@ public void queueJobTaskUpdateEvent(IJob job) { public void updateJob(@Nullable IJob ijob, boolean reviseReplicas) throws Exception { checkNotNull(ijob, "ijob"); Job job = new Job(ijob); + // Maybe acquireJobLock(); try { checkArgument(getJob(job.getId()) != null, "job " + job.getId() + " does not exist"); @@ -1388,6 +1397,7 @@ public Set getDataSources(String jobId) { if ((job == null) || (job.getParameters() == null)) { return dataSources; } + // Maybe acquireJobLock(); try { for (JobParameter param : job.getParameters()) { @@ -1480,6 +1490,7 @@ public RebalanceOutcome rebalanceJob(String jobUUID, int tasksToMove, String use * @return A string description */ public JSONObject fixTaskDir(String jobId, int node, boolean ignoreTaskState, boolean orphansOnly) { + // Maybe acquireJobLock(); try { Job job = getJob(jobId); @@ -1552,6 +1563,7 @@ public boolean resolveJobTaskDirectoryMatches(JobTask task, boolean deleteOrphan public JSONArray checkTaskDirJSON(String jobId, int node) { JSONArray resultList = new JSONArray(); + // Maybe acquireJobLock(); try { Job job = getJob(jobId); @@ -1656,6 +1668,7 @@ public boolean checkStatusForMove(String hostID) { } public boolean prepareTaskStatesForRebalance(Job job, JobTask task, boolean isMigration) { + // Why? 
acquireJobLock(); try { if (!SpawnBalancer.isInMovableState(task)) { @@ -1672,6 +1685,7 @@ public boolean prepareTaskStatesForRebalance(Job job, JobTask task, boolean isMi } public DeleteStatus forceDeleteJob(String jobUUID) throws Exception { + // job lock is overkill - a better option could be to make setEnabled synchronized if concurrent modification to the job is the concern acquireJobLock(); Job job; try { @@ -1687,6 +1701,7 @@ public DeleteStatus forceDeleteJob(String jobUUID) throws Exception { } finally { releaseJobLock(); } + // If job is deleted in another thread, stopJob and killJob will throw an exception. Should catch that and return DeleteStatus.JOB_MISSING while ((job != null) && (job.getCountActiveTasks() > 0)) { stopJob(jobUUID); Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -1698,6 +1713,7 @@ public DeleteStatus forceDeleteJob(String jobUUID) throws Exception { } public DeleteStatus deleteJob(String jobUUID) throws Exception { + // OK acquireJobLock(); try { Job job = getJob(jobUUID); @@ -1932,6 +1948,7 @@ public void stopJob(String jobUUID) throws Exception { public void killJob(String jobUUID) throws Exception { boolean success = false; while (!success && !shuttingDown.get()) { + // Maybe acquireJobLock(); try { if (taskQueuesByPriority.tryLock()) { @@ -2099,6 +2116,7 @@ public void handleRebalanceFinish(IJob job, JobTask task, StatusTaskEnd update) public JobMacro createJobHostMacro(String job, int port) { String sPort = Integer.valueOf(port).toString(); Set jobHosts = new TreeSet<>();// best set? + // The whole method can be removed acquireJobLock(); try { Collection hosts = hostManager.listHostStatus(null); @@ -2138,6 +2156,7 @@ public JobMacro createJobHostMacro(String job, int port) { * sent for a while. 
*/ public void saveAllJobs() { + // Maybe acquireJobLock(); try { for (Job job : listJobs()) { @@ -2157,6 +2176,7 @@ public void saveAllJobs() { * send job update event to registered listeners (usually http clients) */ private void sendJobUpdateEvent(Job job) { + // Maybe acquireJobLock(); try { jobConfigManager.updateJob(job); @@ -2457,6 +2477,7 @@ public void kickJobsOnQueue() { boolean success = false; while (!success && !shuttingDown.get()) { // need the job lock first + // Maybe... acquireJobLock(); try { if (taskQueuesByPriority.tryLock()) { @@ -2555,6 +2576,7 @@ List getHealthyHostStatesHousingTask(JobTask task, boolean allowRepli } @VisibleForTesting protected void loadJobs() { + // Probably can do without acquireJobLock(); try { for (IJob iJob : jobConfigManager.loadJobs().values()) { @@ -2566,6 +2588,7 @@ List getHealthyHostStatesHousingTask(JobTask task, boolean allowRepli releaseJobLock(); } Thread loadDependencies = new Thread(() -> { + // FIXME just iterate over the map entries... Set jobIds = spawnState.jobs.keySet(); for (String jobId : jobIds) { IJob job = getJob(jobId); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/UpdateEventRunnable.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/UpdateEventRunnable.java index ba1c15d5f..a272caac5 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/UpdateEventRunnable.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/UpdateEventRunnable.java @@ -52,6 +52,7 @@ public void run() { int taskQueuedNoSlot = 0; long files = 0; long bytes = 0; + // Why? 
spawn.acquireJobLock(); try { for (Job job : spawn.spawnState.jobs.values()) { diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/balancer/SpawnBalancer.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/balancer/SpawnBalancer.java index 8b7be25ed..a1d7a1563 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/balancer/SpawnBalancer.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/balancer/SpawnBalancer.java @@ -808,6 +808,7 @@ public void fixTasksForFailedHost(List hosts, String failedHost) { private List findAllTasksAssignedToHost(String failedHostUUID) { List rv = new ArrayList<>(); + // Why? spawn.acquireJobLock(); try { for (Job job : spawn.listJobs()) { @@ -1384,6 +1385,7 @@ private Collection balanceActiveJobsOnHost(HostState host List rv = purgeMisplacedTasks(host, 1); String hostID = host.getHostUuid(); for (String jobID : activeJobs) { + // Maybe spawn.acquireJobLock(); try { Job job = spawn.getJob(jobID); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/GroupsResource.java b/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/GroupsResource.java index eaf0a5e2c..31cb621ab 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/GroupsResource.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/GroupsResource.java @@ -205,6 +205,7 @@ private void updateDiskQuotas() { } Map> quotas = new HashMap<>(); long totalBytes = 0; + // Why? spawn.acquireJobLock(); try { Iterator jobIterator = spawn.getSpawnState().jobsIterator();