From 52b93958e9cea216f960987efe0e7d1216df49f8 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Fri, 26 Jan 2024 13:46:59 +0800 Subject: [PATCH] Fix model not deploy issue under intensive prediction tasks (#1903) Signed-off-by: zane-neo (cherry picked from commit 521b8805472ef62a05e462e49cdcb57516428694) --- .../ml/action/syncup/TransportSyncUpOnNodeAction.java | 4 ++++ .../src/main/java/org/opensearch/ml/task/MLTaskManager.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java b/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java index e705bdb7cd..3afb2cff51 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java @@ -198,6 +198,10 @@ void cleanUpLocalCache(Map> runningDeployModelTasks) { } for (String taskId : allTaskIds) { MLTaskCache mlTaskCache = mlTaskManager.getMLTaskCache(taskId); + // Task could be a prediction task, and it could be completed and removed from cache in predict thread during the cleaning up.
+ if (mlTaskCache == null) { + continue; + } MLTask mlTask = mlTaskCache.getMlTask(); Instant lastUpdateTime = mlTask.getLastUpdateTime(); Instant now = Instant.now(); diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java index 9e9dea5d22..ca5b5b0abb 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java @@ -117,7 +117,7 @@ public synchronized void add(MLTask mlTask, List workerNodes) { throw new IllegalArgumentException("Duplicate taskId"); } taskCaches.put(taskId, new MLTaskCache(mlTask, workerNodes)); - log.debug("add ML task to cache " + taskId); + log.debug("add ML task to cache, taskId: {}, taskType: {} ", taskId, mlTask.getTaskType()); } /**