-
-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: reduce conflicts when update configmap in k8s #89 #93
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,27 +18,44 @@ | |||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.core.utils.StringUtils; | ||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.exceptions.ApolloConfigException; | ||||||||||||||||||||||||||||||||||||||||
import com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||||||||||||||||||||||||
import com.google.common.base.Strings; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.ApiClient; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.ApiException; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.apis.CoreV1Api; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ConfigMap; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ObjectMeta; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1Pod; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1PodList; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.util.Config; | ||||||||||||||||||||||||||||||||||||||||
import org.slf4j.Logger; | ||||||||||||||||||||||||||||||||||||||||
import org.slf4j.LoggerFactory; | ||||||||||||||||||||||||||||||||||||||||
import org.springframework.stereotype.Service; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
import java.util.Comparator; | ||||||||||||||||||||||||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||||||||||||||||||||||||
import java.util.Map; | ||||||||||||||||||||||||||||||||||||||||
import java.util.Objects; | ||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.TimeUnit; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||
* Manages Kubernetes ConfigMap operations. | ||||||||||||||||||||||||||||||||||||||||
* Required Kubernetes permissions: | ||||||||||||||||||||||||||||||||||||||||
* - pods: [get, list] - For pod selection and write eligibility | ||||||||||||||||||||||||||||||||||||||||
* - configmaps: [get, create, update] - For ConfigMap operations | ||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||
@Service | ||||||||||||||||||||||||||||||||||||||||
public class KubernetesManager { | ||||||||||||||||||||||||||||||||||||||||
private static final Logger logger = LoggerFactory.getLogger(KubernetesManager.class); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static final String RUNNING_POD_FIELD_SELECTOR = "status.phase=Running"; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static final int MAX_SEARCH_NUM = 100; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private ApiClient client; | ||||||||||||||||||||||||||||||||||||||||
private CoreV1Api coreV1Api; | ||||||||||||||||||||||||||||||||||||||||
private int propertyKubernetesMaxWritePods = 3; | ||||||||||||||||||||||||||||||||||||||||
private String localPodName = System.getenv("HOSTNAME"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public KubernetesManager() { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
|
@@ -51,8 +68,11 @@ public KubernetesManager() { | |||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public KubernetesManager(CoreV1Api coreV1Api) { | ||||||||||||||||||||||||||||||||||||||||
@VisibleForTesting | ||||||||||||||||||||||||||||||||||||||||
public KubernetesManager(CoreV1Api coreV1Api, String localPodName, int propertyKubernetesMaxWritePods) { | ||||||||||||||||||||||||||||||||||||||||
this.coreV1Api = coreV1Api; | ||||||||||||||||||||||||||||||||||||||||
this.localPodName = localPodName; | ||||||||||||||||||||||||||||||||||||||||
this.propertyKubernetesMaxWritePods = propertyKubernetesMaxWritePods; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private V1ConfigMap buildConfigMap(String name, String namespace, Map<String, String> data) { | ||||||||||||||||||||||||||||||||||||||||
|
@@ -132,6 +152,10 @@ public boolean updateConfigMap(String k8sNamespace, String name, Map<String, Str | |||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (!isWritePod(k8sNamespace)) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
int maxRetries = 5; | ||||||||||||||||||||||||||||||||||||||||
int retryCount = 0; | ||||||||||||||||||||||||||||||||||||||||
long waitTime = 100; | ||||||||||||||||||||||||||||||||||||||||
|
@@ -205,4 +229,43 @@ public boolean checkConfigMapExist(String k8sNamespace, String configMapName) { | |||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||
* check pod whether pod can write configmap | ||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||
* @param k8sNamespace config map namespace | ||||||||||||||||||||||||||||||||||||||||
* @return true if this pod can write configmap, false otherwise | ||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||
private boolean isWritePod(String k8sNamespace) { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
if (Strings.isNullOrEmpty(localPodName)) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
V1Pod localPod = coreV1Api.readNamespacedPod(localPodName, k8sNamespace, null); | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
V1ObjectMeta localMetadata = localPod.getMetadata(); | ||||||||||||||||||||||||||||||||||||||||
if (localMetadata == null || localMetadata.getLabels() == null) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
String appName = localMetadata.getLabels().get("app"); | ||||||||||||||||||||||||||||||||||||||||
String labelSelector = "app=" + appName; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
V1PodList v1PodList = coreV1Api.listNamespacedPod(k8sNamespace, null, null, | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the number of pod instances is large, the interface return value may be particularly large, and the limit parameter needs to be used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know what is a large number of instances, so I set it to 1000 . How about your advice? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think 50 or 100 is sufficient. 1000 clients pull 1000 pod lists from the k8s server at the same time, which may affect the performance of the k8s server or network. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
First of all, I agree with you. According to the test, there is little difference between the returned data size of 1000 and 200.Howerer, in the worst scenario, there will be 1000 pod trying to update the pod, which is not as expected.100 may be a good choice. |
||||||||||||||||||||||||||||||||||||||||
null, RUNNING_POD_FIELD_SELECTOR, labelSelector, | ||||||||||||||||||||||||||||||||||||||||
MAX_SEARCH_NUM, null, null | ||||||||||||||||||||||||||||||||||||||||
, null, null); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return v1PodList.getItems().stream() | ||||||||||||||||||||||||||||||||||||||||
.map(V1Pod::getMetadata) | ||||||||||||||||||||||||||||||||||||||||
.filter(Objects::nonNull) | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
//Make each node selects the same write nodes by sorting | ||||||||||||||||||||||||||||||||||||||||
.filter(metadata -> metadata.getCreationTimestamp() != null) | ||||||||||||||||||||||||||||||||||||||||
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp)) | ||||||||||||||||||||||||||||||||||||||||
.map(V1ObjectMeta::getName) | ||||||||||||||||||||||||||||||||||||||||
.limit(propertyKubernetesMaxWritePods) | ||||||||||||||||||||||||||||||||||||||||
.anyMatch(localPodName::equals); | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+257
to
+265
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance pod selection criteria The current pod selection logic only considers the creation timestamp and "Running" status. Consider additional criteria:
Example enhancement: return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
+ .filter(metadata -> metadata.getDeletionTimestamp() == null)
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals); 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||
logger.info("Error determining write pod eligibility:{}", e.getMessage(), e); | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -21,14 +21,30 @@ | |||||||||||||||||
import io.kubernetes.client.openapi.apis.CoreV1Api; | ||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ConfigMap; | ||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ObjectMeta; | ||||||||||||||||||
import io.kubernetes.client.openapi.models.V1Pod; | ||||||||||||||||||
import io.kubernetes.client.openapi.models.V1PodList; | ||||||||||||||||||
import org.junit.Before; | ||||||||||||||||||
import org.junit.Test; | ||||||||||||||||||
import org.mockito.Mockito; | ||||||||||||||||||
|
||||||||||||||||||
import java.time.OffsetDateTime; | ||||||||||||||||||
import java.util.Collections; | ||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||
import java.util.Map; | ||||||||||||||||||
|
||||||||||||||||||
import static org.mockito.Mockito.*; | ||||||||||||||||||
import static org.junit.Assert.*; | ||||||||||||||||||
import static org.junit.Assert.assertEquals; | ||||||||||||||||||
import static org.junit.Assert.assertFalse; | ||||||||||||||||||
import static org.junit.Assert.assertNull; | ||||||||||||||||||
import static org.junit.Assert.assertTrue; | ||||||||||||||||||
import static org.mockito.Mockito.any; | ||||||||||||||||||
import static org.mockito.Mockito.doReturn; | ||||||||||||||||||
import static org.mockito.Mockito.doThrow; | ||||||||||||||||||
import static org.mockito.Mockito.eq; | ||||||||||||||||||
import static org.mockito.Mockito.isNull; | ||||||||||||||||||
import static org.mockito.Mockito.mock; | ||||||||||||||||||
import static org.mockito.Mockito.times; | ||||||||||||||||||
import static org.mockito.Mockito.verify; | ||||||||||||||||||
import static org.mockito.Mockito.when; | ||||||||||||||||||
|
||||||||||||||||||
public class KubernetesManagerTest { | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -38,7 +54,7 @@ public class KubernetesManagerTest { | |||||||||||||||||
@Before | ||||||||||||||||||
public void setUp() { | ||||||||||||||||||
coreV1Api = mock(CoreV1Api.class); | ||||||||||||||||||
kubernetesManager = new KubernetesManager(coreV1Api); | ||||||||||||||||||
kubernetesManager = new KubernetesManager(coreV1Api, "localPodName", 3); | ||||||||||||||||||
|
||||||||||||||||||
MockInjector.setInstance(KubernetesManager.class, kubernetesManager); | ||||||||||||||||||
MockInjector.setInstance(CoreV1Api.class, coreV1Api); | ||||||||||||||||||
|
@@ -58,13 +74,13 @@ public void testCreateConfigMapSuccess() throws Exception { | |||||||||||||||||
.metadata(new V1ObjectMeta().name(name).namespace(namespace)) | ||||||||||||||||||
.data(data); | ||||||||||||||||||
|
||||||||||||||||||
when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(),isNull())).thenReturn(configMap); | ||||||||||||||||||
when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(), isNull())).thenReturn(configMap); | ||||||||||||||||||
|
||||||||||||||||||
// act | ||||||||||||||||||
String result = kubernetesManager.createConfigMap(namespace, name, data); | ||||||||||||||||||
|
||||||||||||||||||
// assert | ||||||||||||||||||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull()); | ||||||||||||||||||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull()); | ||||||||||||||||||
assert name.equals(result); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -82,7 +98,7 @@ public void testCreateConfigMapNullData() throws Exception { | |||||||||||||||||
String result = kubernetesManager.createConfigMap(namespace, name, data); | ||||||||||||||||||
|
||||||||||||||||||
// assert | ||||||||||||||||||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull()); | ||||||||||||||||||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull()); | ||||||||||||||||||
assert name.equals(result); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -135,20 +151,40 @@ public void testUpdateConfigMapSuccess() throws Exception { | |||||||||||||||||
// arrange | ||||||||||||||||||
String namespace = "default"; | ||||||||||||||||||
String name = "testConfigMap"; | ||||||||||||||||||
Map<String, String> data = new HashMap<>(); | ||||||||||||||||||
data.put("key", "value"); | ||||||||||||||||||
|
||||||||||||||||||
V1Pod pod = new V1Pod() | ||||||||||||||||||
.metadata( | ||||||||||||||||||
new V1ObjectMeta() | ||||||||||||||||||
.name("localPodName") | ||||||||||||||||||
.creationTimestamp(OffsetDateTime.now()) | ||||||||||||||||||
.labels(Collections.singletonMap("app", "app"))); | ||||||||||||||||||
V1PodList v1PodList = new V1PodList().addItemsItem(new V1Pod().metadata(pod.getMetadata())); | ||||||||||||||||||
Comment on lines
+155
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add test cases for pod write eligibility scenarios. The current test only covers the basic success case. Add test cases for:
Would you like me to generate these additional test cases? |
||||||||||||||||||
|
||||||||||||||||||
Map<String, String> existData = new HashMap<>(); | ||||||||||||||||||
existData.put("key", "value"); | ||||||||||||||||||
V1ConfigMap configMap = new V1ConfigMap(); | ||||||||||||||||||
configMap.metadata(new V1ObjectMeta().name(name).namespace(namespace)); | ||||||||||||||||||
configMap.data(data); | ||||||||||||||||||
configMap.data(existData); | ||||||||||||||||||
|
||||||||||||||||||
when(coreV1Api.readNamespacedPod("localPodName", namespace, null)).thenReturn(pod); | ||||||||||||||||||
when(coreV1Api.listNamespacedPod(namespace, null, null, | ||||||||||||||||||
null, null, "app=app", | ||||||||||||||||||
null, null, null | ||||||||||||||||||
, null, null)).thenReturn(v1PodList); | ||||||||||||||||||
Comment on lines
+169
to
+173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Test case needs alignment with KubernetesManager implementation The test case doesn't match the actual implementation in KubernetesManager.java:
🔗 Analysis chainVerify the listNamespacedPod parameters. The API call uses hardcoded values that might need adjustment:
🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Search for other test files that might define these constants
rg -t java "app=app" .
rg -t java "status\.phase=Running" .
Length of output: 675 Script: #!/bin/bash
# Search for the implementation and any constants
ast-grep --pattern 'private static final String $_ = "app=$_";'
cat ./apollo-client/src/main/java/com/ctrip/framework/apollo/kubernetes/KubernetesManager.java
Length of output: 11517 |
||||||||||||||||||
when(coreV1Api.readNamespacedConfigMap(name, namespace, null)).thenReturn(configMap); | ||||||||||||||||||
when(coreV1Api.replaceNamespacedConfigMap(name, namespace, configMap, null, null, null, null)).thenReturn(configMap); | ||||||||||||||||||
|
||||||||||||||||||
// act | ||||||||||||||||||
Boolean success = kubernetesManager.updateConfigMap(namespace, name, data); | ||||||||||||||||||
HashMap<String, String> updateData = new HashMap<>(existData); | ||||||||||||||||||
updateData.put("newKey","newValue"); | ||||||||||||||||||
boolean success = kubernetesManager.updateConfigMap(namespace, name, updateData); | ||||||||||||||||||
|
||||||||||||||||||
// assert | ||||||||||||||||||
assertTrue(success); | ||||||||||||||||||
Mockito.verify(coreV1Api, Mockito.times(1)).listNamespacedPod(namespace, null, null, | ||||||||||||||||||
null, "status.phase=Running", "app=app", | ||||||||||||||||||
100, null, null | ||||||||||||||||||
, null, null); | ||||||||||||||||||
Comment on lines
+184
to
+187
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Verify the pod listing parameters. The verification uses different parameters than the actual call:
Apply this diff to align the parameters: - when(coreV1Api.listNamespacedPod(namespace, null, null,
- null, null, "app=app",
- null, null, null
- , null, null)).thenReturn(v1PodList);
+ when(coreV1Api.listNamespacedPod(namespace, null, null,
+ null, "status.phase=Running", "app=app",
+ 100, null, null
+ , null, null)).thenReturn(v1PodList); 📝 Committable suggestion
Suggested change
|
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Reconsider return value for non-write pods
Returning
true
whenisWritePod
returnsfalse
might mask issues by indicating success when no update was attempted. Consider returning a different status or throwing an exception to make it clear that the pod is not eligible to write.