From 8d6f0665ecaa87f770ce1fc77ffea94cecb19db9 Mon Sep 17 00:00:00 2001 From: Manuel Finelli Date: Thu, 2 May 2024 11:46:34 +0100 Subject: [PATCH] Added a switch to disable transactions recovery; new state for heuristic transactions; event to signal HEURISTIC transactions; introduced recovery counter --- api/v1alpha1/wildflyserver_types.go | 36 ++- api/v1alpha1/zz_generated.openapi.go | 14 + .../crd/bases/wildfly.org_wildflyservers.yaml | 12 +- controllers/transaction_recovery.go | 293 +++++++++++------- controllers/transaction_recovery_test.go | 31 +- controllers/wildflyserver_controller.go | 9 +- doc/apis.adoc | 2 + doc/user-guide.adoc | 21 +- pkg/util/wildfly_mgmt.go | 2 + 9 files changed, 273 insertions(+), 147 deletions(-) diff --git a/api/v1alpha1/wildflyserver_types.go b/api/v1alpha1/wildflyserver_types.go index 09546ade4..0a781b226 100644 --- a/api/v1alpha1/wildflyserver_types.go +++ b/api/v1alpha1/wildflyserver_types.go @@ -38,8 +38,10 @@ type WildFlyServerSpec struct { // SessionAffinity defines if connections from the same client ip are passed to the same WildFlyServer instance/pod each time (false if omitted) SessionAffinity bool `json:"sessionAffinity,omitempty"` // DisableHTTPRoute disables the creation a route to the HTTP port of the application service (false if omitted) - DisableHTTPRoute bool `json:"disableHTTPRoute,omitempty"` - StandaloneConfigMap *StandaloneConfigMapSpec `json:"standaloneConfigMap,omitempty"` + DisableHTTPRoute bool `json:"disableHTTPRoute,omitempty"` + // DeactivateTransactionRecovery disables the process of recovering transactions (false if omitted) + DeactivateTransactionRecovery bool `json:"deactivateTransactionRecovery,omitempty"` + StandaloneConfigMap *StandaloneConfigMapSpec `json:"standaloneConfigMap,omitempty"` // StorageSpec defines specific storage required for the server own data directory. If omitted, an EmptyDir is used (that will not // persist data across pod restart). Storage *StorageSpec `json:"storage,omitempty"` @@ -164,22 +166,20 @@ type WildFlyServerStatus struct { } const ( - // PodStateActive represents PodStatus.State when pod is active to serve requests - // it's connected in the Service load balancer + // PodStateActive represents an active pod that is connected to the load balancer Service + // and that can serve requests PodStateActive = "ACTIVE" - // PodStateScalingDownRecoveryInvestigation represents the PodStatus.State when pod is in state of scaling down - // and is to be verified if it's dirty and if recovery is needed - // as the pod is under recovery verification it can't be immediately removed - // and it needs to be wait until it's marked as clean to be removed + // PodStateScalingDownRecoveryInvestigation represents a pod that is under investigation + // to find out if there are transactions to be recovered. A pod in this state will be updated to one of + // the following states eventually PodStateScalingDownRecoveryInvestigation = "SCALING_DOWN_RECOVERY_INVESTIGATION" - // PodStateScalingDownRecoveryDirty represents the PodStatus.State when the pod was marked as recovery is needed - // because there are some in-doubt transactions. - // The app server was restarted with the recovery properties to speed-up recovery nad it's needed to wait - // until all ind-doubt transactions are processed. - PodStateScalingDownRecoveryDirty = "SCALING_DOWN_RECOVERY_DIRTY" - // PodStateScalingDownClean represents the PodStatus.State when pod is not active to serve requests - // it's in state of scaling down and it's clean - // 'clean' means it's ready to be removed from the kubernetes cluster + // PodStateScalingDownRecoveryProcessing represents a pod that has transactions to be completed. + // The Operator will wait until all transactions are processed + PodStateScalingDownRecoveryProcessing = "SCALING_DOWN_RECOVERY_PROCESSING" + // PodStateScalingDownRecoveryHeuristic represents a pod that has heuristic transactions. + // The Operator will wait until all heuristic transactions are manually solved + PodStateScalingDownRecoveryHeuristic = "SCALING_DOWN_RECOVERY_HEURISTICS" + // PodStateScalingDownClean represents a pod that is ready to be scaled down PodStateScalingDownClean = "SCALING_DOWN_CLEAN" ) @@ -189,8 +189,10 @@ type PodStatus struct { Name string `json:"name"` PodIP string `json:"podIP"` // Represent the state of the Pod, it is used especially during scale down. - // +kubebuilder:validation:Enum=ACTIVE;SCALING_DOWN_RECOVERY_INVESTIGATION;SCALING_DOWN_RECOVERY_DIRTY;SCALING_DOWN_CLEAN + // +kubebuilder:validation:Enum=ACTIVE;SCALING_DOWN_RECOVERY_INVESTIGATION;SCALING_DOWN_RECOVERY_PROCESSING;SCALING_DOWN_RECOVERY_HEURISTICS;SCALING_DOWN_CLEAN State string `json:"state"` + // Counts the recovery attempts when there are in-doubt/heuristic transactions + RecoveryCounter int32 `json:"recoveryCounter,omitempty"` } // WildFlyServer is the Schema for the wildflyservers API diff --git a/api/v1alpha1/zz_generated.openapi.go b/api/v1alpha1/zz_generated.openapi.go index 78167af96..f9e609d99 100644 --- a/api/v1alpha1/zz_generated.openapi.go +++ b/api/v1alpha1/zz_generated.openapi.go @@ -68,6 +68,13 @@ func schema__api_v1alpha1_PodStatus(ref common.ReferenceCallback) common.OpenAPI Format: "", }, }, + "recoveryCounter": { + SchemaProps: spec.SchemaProps{ + Description: "Counts the recovery attempts when there are in-doubt/heuristic transactions", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, Required: []string{"name", "podIP", "state"}, }, @@ -283,6 +290,13 @@ func schema__api_v1alpha1_WildFlyServerSpec(ref common.ReferenceCallback) common Format: "", }, }, + "deactivateTransactionRecovery": { + SchemaProps: spec.SchemaProps{ + Description: "DeactivateTransactionRecovery disables the process of recovering transactions (false if omitted)", + Type: []string{"boolean"}, + Format: "", + }, + }, "standaloneConfigMap": { SchemaProps: spec.SchemaProps{ Ref: ref("./api/v1alpha1.StandaloneConfigMapSpec"), diff --git a/config/crd/bases/wildfly.org_wildflyservers.yaml b/config/crd/bases/wildfly.org_wildflyservers.yaml index 1d356353d..3f8096630 100644 --- a/config/crd/bases/wildfly.org_wildflyservers.yaml +++ b/config/crd/bases/wildfly.org_wildflyservers.yaml @@ -63,6 +63,10 @@ spec: minItems: 1 type: array x-kubernetes-list-type: set + deactivateTransactionRecovery: + description: DeactivateTransactionRecovery disables the process of + recovering transactions (false if omitted) + type: boolean disableHTTPRoute: description: DisableHTTPRoute disables the creation a route to the HTTP port of the application service (false if omitted) @@ -1072,13 +1076,19 @@ spec: type: string podIP: type: string + recoveryCounter: + description: Counts the recovery attempts when there are in-doubt/heuristic + transactions + format: int32 + type: integer state: description: Represent the state of the Pod, it is used especially during scale down. enum: - ACTIVE - SCALING_DOWN_RECOVERY_INVESTIGATION - - SCALING_DOWN_RECOVERY_DIRTY + - SCALING_DOWN_RECOVERY_PROCESSING + - SCALING_DOWN_RECOVERY_HEURISTICS - SCALING_DOWN_CLEAN type: string required: diff --git a/controllers/transaction_recovery.go b/controllers/transaction_recovery.go index 9915a4f8e..2208b8799 100644 --- a/controllers/transaction_recovery.go +++ b/controllers/transaction_recovery.go @@ -31,136 +31,165 @@ const ( wftcDataDirName = "ejb-xa-recovery" // data directory where WFTC stores transaction runtime data ) -func (r *WildFlyServerReconciler) checkRecovery(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (bool, string, error) { +// Defines a new type to map the result of the checkRecovery routine +type checkResult int + +// Declaration of the possible results of checkRecovery +const ( + clean checkResult = iota + recovery + heuristic + genericError +) + +func (r *WildFlyServerReconciler) checkRecovery(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (checkResult, string, error) { + scaleDownPodName := scaleDownPod.ObjectMeta.Name scaleDownPodIP := scaleDownPod.Status.PodIP scaleDownPodRecoveryPort := defaultRecoveryPort - // Reading timestamp for the latest log record + // Reading timestamp of the latest log record scaleDownPodLogTimestampAtStart, err := wfly.RemoteOps.ObtainLogLatestTimestamp(scaleDownPod) if err != nil { - return false, "", fmt.Errorf("Log of the pod '%s' of WildflyServer '%v' is not ready during scaling down, "+ + return genericError, "", fmt.Errorf("The pod '%s' (of the WildflyServer '%v') is not ready to be scaled down, "+ "please verify its state. Error: %v", scaleDownPodName, w.ObjectMeta.Name, err) } - // If we are in state of recovery is needed the setup of the server has to be already done + // If transactions needs to be recovered, the setup of the server has to be carried out if scaleDownPod.Annotations[markerRecoveryPort] == "" { - reqLogger.Info("Verification the recovery listener is setup to run transaction recovery at " + scaleDownPodName) + reqLogger.Info("Verification that the transaction recovery listener of the pod is ready to recover transactions", "Pod name", scaleDownPodName) // Verify the recovery listener is setup jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnCheckRecoveryListener) if err != nil { - return false, "", fmt.Errorf("Cannot check if the transaction recovery listener is enabled for recovery at pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot check if the transaction recovery listener of the pod %v is enabled, error: %v", scaleDownPodName, err) } if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { - return false, "", fmt.Errorf("Failed to verify if transaction recovery listener is enabled at pod %v. Scaledown processing cannot trigger recovery. "+ + return genericError, "", fmt.Errorf("Failed to verify if the transaction recovery listener of the pod %v is enabled. Scaledown processing cannot trigger recovery. "+ "Management command: %v, JSON response: %v", scaleDownPodName, wfly.MgmtOpTxnCheckRecoveryListener, jsonResult) } - // When listener is not enabled then the pod will be terminated + // When the transaction recovery listener is not enabled then the pod will be terminated isTxRecoveryListenerEnabledAsInterface := wfly.ReadJSONDataByIndex(jsonResult, "result") isTxRecoveryListenerEnabledAsString, _ := wfly.ConvertToString(isTxRecoveryListenerEnabledAsInterface) if txrecoverydefined, err := strconv.ParseBool(isTxRecoveryListenerEnabledAsString); err == nil && !txrecoverydefined { - reqLogger.Info("Transaction recovery listener is not enabled. Transaction recovery cannot proceed at pod " + scaleDownPodName) + reqLogger.Info("The transaction recovery listener of the pod is not enabled", "Pod name", scaleDownPodName) r.Recorder.Event(w, corev1.EventTypeWarning, "WildFlyServerTransactionRecovery", - "Application server at pod "+scaleDownPodName+" does not define transaction recovery listener and recovery processing can't go forward."+ - " Please consider fixing server configuration. The pod is now going to be terminated.") - return true, "", nil + "The transaction recovery listener of the pod "+scaleDownPodName+" is not defined and the recovery process cannot started."+ + " If this is not intentional, fix the server configuration. The pod will be terminated.") + return clean, "", nil } // Reading recovery port from the app server with management port - reqLogger.Info("Query to find the transaction recovery port to force scan at pod " + scaleDownPodName) + reqLogger.Info("Query the app server to find out the transaction recovery port of the pod", "Pod name", scaleDownPodName) queriedScaleDownPodRecoveryPort, err := wfly.GetTransactionRecoveryPort(scaleDownPod) if err == nil && queriedScaleDownPodRecoveryPort != 0 { scaleDownPodRecoveryPort = queriedScaleDownPodRecoveryPort } if err != nil { - reqLogger.Error(err, "Error on reading transaction recovery port with management command. Using default port: "+scaleDownPodName, - "Pod name", scaleDownPodName) + reqLogger.Error(err, "Error reading the transaction recovery port", "Pod name", scaleDownPodName, "Default port", strconv.Itoa(int(scaleDownPodRecoveryPort))) } - // The pod was already searched for the recovery port, marking that into the annotations + // Save the recovery port into the annotations annotations := wfly.MapMerge( scaleDownPod.GetAnnotations(), map[string]string{markerRecoveryPort: strconv.FormatInt(int64(scaleDownPodRecoveryPort), 10)}) patch := client.MergeFrom(scaleDownPod.DeepCopy()) scaleDownPod.SetAnnotations(annotations) if err := resources.Patch(w, r.Client, scaleDownPod, patch); err != nil { - return false, "", fmt.Errorf("Failed to update pod annotations, pod name %v, annotations to be set %v, error: %v", + return genericError, "", fmt.Errorf("Failed to update the annotation of the pod %v, annotations to be set %v, error: %v", scaleDownPodName, scaleDownPod.Annotations, err) } } else { - // pod annotation already contains information on recovery port thus we just use it + // The annotations of the pod already contain information about the recovery port queriedScaleDownPodRecoveryPortString := scaleDownPod.Annotations[markerRecoveryPort] queriedScaleDownPodRecoveryPort, err := strconv.Atoi(queriedScaleDownPodRecoveryPortString) if err != nil { patch := client.MergeFrom(scaleDownPod.DeepCopy()) delete(scaleDownPod.Annotations, markerRecoveryPort) if errUpdate := resources.Patch(w, r.Client, scaleDownPod, patch); errUpdate != nil { - reqLogger.Info("Cannot update scaledown pod while resetting the recovery port annotation", - "Scale down Pod", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) + reqLogger.Info("Cannot update the pod while resetting the recovery port annotation", + "Pod name", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) } - return false, "", fmt.Errorf("Cannot convert recovery port value '%s' to integer for the scaling down pod %v, error: %v", + return genericError, "", fmt.Errorf("Cannot convert recovery port value '%s' to integer for the pod %v, error: %v", queriedScaleDownPodRecoveryPortString, scaleDownPodName, err) } scaleDownPodRecoveryPort = int32(queriedScaleDownPodRecoveryPort) } - // With enabled recovery listener and the port, let's start the recovery scan - reqLogger.Info("Executing recovery scan at "+scaleDownPodName, "Pod IP", scaleDownPodIP, "Recovery port", scaleDownPodRecoveryPort) + // Transaction recovery listener is enabled and the recovery port has been retrieved, the recovery scan can be started + reqLogger.Info("Executing the recovery scan for the pod", "Pod name", scaleDownPodName, "Pod IP", scaleDownPodIP, "Recovery port", scaleDownPodRecoveryPort) _, err = wfly.RemoteOps.SocketConnect(scaleDownPodIP, scaleDownPodRecoveryPort, txnRecoveryScanCommand) if err != nil { patch := client.MergeFrom(scaleDownPod.DeepCopy()) delete(scaleDownPod.Annotations, markerRecoveryPort) if errUpdate := r.Client.Patch(context.TODO(), scaleDownPod, patch); errUpdate != nil { - reqLogger.Info("Cannot update scaledown pod while resetting the recovery port annotation", - "Scale down Pod", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) + reqLogger.Info("Cannot update the pod while resetting the recovery port annotation", + "Pod name", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) } - return false, "", fmt.Errorf("Failed to run transaction recovery scan for scaling down pod %v. "+ - "Please, verify the pod log file. Error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Failed to run the transaction recovery scan for the pod %v. "+ + "Please, verify the pod logs. Error: %v", scaleDownPodName, err) } // No error on recovery scan => all the registered resources were available during the recovery processing foundLogLine, err := wfly.RemoteOps.VerifyLogContainsRegexp(scaleDownPod, scaleDownPodLogTimestampAtStart, recoveryErrorRegExp) if err != nil { - return false, "", fmt.Errorf("Cannot parse log from scaling down pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot parse logs from the pod %v, error: %v", scaleDownPodName, err) } if foundLogLine != "" { - retString := fmt.Sprintf("Scale down transaction recovery processing contains errors in log. The recovery will be retried."+ - "Pod name: %v, log line with error '%v'", scaleDownPod, foundLogLine) - return false, retString, nil + retString := fmt.Sprintf("The transaction recovery process contains errors in the logs. The recovery will be retried."+ + "Pod: %v, log line with error '%v'", scaleDownPod, foundLogLine) + return genericError, retString, nil } - // Probing transaction log to verify there is not in-doubt transaction in the log + // Probing transaction logs to verify there is not any in-doubt transaction in the log _, err = wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnProbe) if err != nil { - return false, "", fmt.Errorf("Error in probing transaction log for scaling down pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Error while probing transaction logs of the pod %v, error: %v", scaleDownPodName, err) } - // Transaction log was probed, now we read the set of transactions which are in-doubt + // Transaction logs were probed, now we read the set of in-doubt transactions jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnRead) if err != nil { - return false, "", fmt.Errorf("Cannot read transactions from the transaction log for pod scaling down %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot read transactions from the log store of the pod %v, error: %v", scaleDownPodName, err) } if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { - return false, "", fmt.Errorf("Cannot get list of the in-doubt transactions at pod %v for transaction scaledown", scaleDownPodName) + return genericError, "", fmt.Errorf("Cannot get the list of in-doubt transactions of the pod %v", scaleDownPodName) } // Is the number of in-doubt transactions equal to zero? transactions := jsonResult["result"] txnMap, isMap := transactions.(map[string]interface{}) // typing the variable to be a map of interfaces if isMap && len(txnMap) > 0 { - retString := fmt.Sprintf("Recovery scan to be invoked as the transaction log storage is not empty for pod scaling down pod %v, "+ + + // Check for HEURISTIC transactions + jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnReadHeuristic) + if err != nil { + return genericError, "", fmt.Errorf("Cannot read HEURISTIC transactions from the log store of the pod %v, error: %v", scaleDownPodName, err) + } + if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { + return genericError, "", fmt.Errorf("Cannot read HEURISTIC transactions from the log store of the pod %v", scaleDownPodName) + } + // Is the number of HEURISTIC transactions equal to zero? + transactions := jsonResult["result"] + heuristicTxnArray, isArray := transactions.([]interface{}) // typing the variable to be an array of interfaces + if isArray && len(heuristicTxnArray) > 0 { + retString := fmt.Sprintf("There are HEURISTIC transactions in the log store of the pod %v. Please, resolve them manually, "+ + "transaction list: %v", scaleDownPodName, heuristicTxnArray) + return heuristic, retString, nil + } + + retString := fmt.Sprintf("A recovery scan is needed as the log store of the pod %v is not empty, "+ "transaction list: %v", scaleDownPodName, txnMap) - return false, retString, nil + return recovery, retString, nil } // Verification of the unfinished data of the WildFly transaction client (verification of the directory content) lsCommand := fmt.Sprintf(`ls ${JBOSS_HOME}/%s/%s/ 2> /dev/null || true`, resources.StandaloneServerDataDirRelativePath, wftcDataDirName) commandResult, err := wfly.RemoteOps.Execute(scaleDownPod, lsCommand) if err != nil { - return false, "", fmt.Errorf("Cannot query filesystem at scaling down pod %v to check existing remote transactions. "+ + return genericError, "", fmt.Errorf("Cannot query the filesystem of the pod %v to check existing remote transactions. "+ "Exec command: %v", scaleDownPodName, lsCommand) } if commandResult != "" { - retString := fmt.Sprintf("WildFly Transaction Client data dir is not empty and scaling down of the pod '%v' will be retried."+ - "Wildfly Transacton Client data dir path '${JBOSS_HOME}/%v/%v', output listing: %v", + retString := fmt.Sprintf("WildFly's data directory is not empty and the scaling down of the pod '%v' will be retried."+ + "Wildfly's data directory path is: '${JBOSS_HOME}/%v/%v', output listing: %v", scaleDownPodName, resources.StandaloneServerDataDirRelativePath, wftcDataDirName, commandResult) - return false, retString, nil + return recovery, retString, nil } - return true, "", nil + return clean, "", nil } func (r *WildFlyServerReconciler) setupRecoveryPropertiesAndRestart(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (mustReconcile int, err error) { @@ -238,28 +267,44 @@ func (r *WildFlyServerReconciler) updatePodLabel(w *wildflyv1alpha1.WildFlyServe return updated, nil } -// processTransactionRecoveryScaleDown runs transaction recovery on provided number of pods -// -// mustReconcile returns int constant; 'requeueNow' if the reconcile requeue loop should be called as soon as possible, -// 'requeueLater' if requeue loop is needed but it could be delayed, 'requeueOff' if requeue loop is not necessary -// err reports error which occurs during method processing +/* + * processTransactionRecoveryScaleDown runs transaction recovery on provided number of pods + * Returns the int contant mustReconcile: + * - 'requeueNow' if the reconcile requeue loop should be called as soon as possible + * - 'requeueLater' if requeue loop is needed but it could be delayed + * - 'requeueOff' if requeue loop is not necessary + * err reports error that occured during the transaction recovery + */ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger logr.Logger, w *wildflyv1alpha1.WildFlyServer, numberOfPodsToScaleDown int, podList *corev1.PodList) (mustReconcile int, err error) { + // podList comes from the wildflyserver_controller.Reconcile method, + // where the list of pods specific to a WildFlyServer CR is created + + // Current number of pods wildflyServerNumberOfPods := len(podList.Items) - scaleDownPodsStates := sync.Map{} // map referring to: pod name - pod state - scaleDownErrors := sync.Map{} // errors occurred during processing the scaledown for the pods + // sync.map to store errors occurred during processing the scaledown of pods + errorsSyncMap := sync.Map{} + // Return value for the reconcile cycle mustReconcile = requeueOff if wildflyServerNumberOfPods == 0 || numberOfPodsToScaleDown == 0 { // no active pods to scale down or no pods are requested to scale down return requeueOff, nil } + // In case the WildFlyServer custom resource has not been updated yet, wait the next reconcile cycle + if len(w.Status.Pods) != wildflyServerNumberOfPods { + return requeueLater, nil + } if w.Spec.BootableJar { reqLogger.Info("Transaction scale down recovery is unsupported for Bootable JAR pods. The pods will be removed without checking pending transactions.", "Number of pods to be scaled down", numberOfPodsToScaleDown) return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList) } + if w.Spec.DeactivateTransactionRecovery { + reqLogger.Info("The 'DeactivateTransactionRecovery' flag is set to true thus the process to recovery transactions will be skipped.") + return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList) + } subsystemsList, err := wfly.ListSubsystems(&podList.Items[0]) if err != nil { reqLogger.Info("Cannot get list of subsystems available in application server", "Pod", podList.Items[0].ObjectMeta.Name) @@ -277,119 +322,129 @@ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger return requeueLater, err } if !isJDBCObjectStore { - reqLogger.Info("Transaction scale down recovery will be skipped as it's unsupported when WildFlyServer Storage uses 'EmptyDir' "+ - "and transaction subsystem does not use the JDBC object store. The recovery processing is unsafe. "+ - "Please configure WildFlyServer.Spec.Storage with a Persistent Volume Claim or use database to store transaction log.", + reqLogger.Info("Transaction scale down recovery will be skipped as it is unsupported when WildFlyServer's Storage is 'EmptyDir' "+ + "and the transaction subsystem does not use the JDBC object store. As a consequence, the transaction recovery processing is unsafe. "+ + "Please, configure WildFlyServer.Spec.Storage with a Persistent Volume Claim or use a database to store transaction logs.", "Number of pods to be scaled down", numberOfPodsToScaleDown) return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList) } } - // Setting-up the pod status - status is used to decide if the pod could be scaled (aka. removed from the statefulset) - updated := abool.New() - for scaleDownIndex := 1; scaleDownIndex <= numberOfPodsToScaleDown; scaleDownIndex++ { - scaleDownPodName := podList.Items[wildflyServerNumberOfPods-scaleDownIndex].ObjectMeta.Name - wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, scaleDownPodName) - if wildflyServerSpecPodStatus == nil { - continue + // Flag to signal that there are pods in need of updating + podStatusNeedsUpdating := abool.New() + // Select pods to be scaled down (starting from the tail of the list) + var podsToScaleDown []corev1.Pod + for index := wildflyServerNumberOfPods - 1; index >= wildflyServerNumberOfPods-numberOfPodsToScaleDown; index-- { + // Only running pods can be considered + if podList.Items[index].Status.Phase == "Running" { + podsToScaleDown = append(podsToScaleDown, podList.Items[index]) } + } + if len(podsToScaleDown) == 0 { + reqLogger.Info("There are not 'Running' pods to scale down") + return requeueLater, nil + } + // PodStatus.State is set to PodStateScalingDownRecoveryInvestigation (SCALING_DOWN_RECOVERY_INVESTIGATION) + // to start the scale down process + for _, corePod := range podsToScaleDown { + // wildflyServerSpecPodStatus points to wildflyv1alpha1.WildFlyServer.Status.Pods[index]; + wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, corePod.ObjectMeta.Name) if wildflyServerSpecPodStatus.State == wildflyv1alpha1.PodStateActive { wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryInvestigation - scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownRecoveryInvestigation) - updated.Set() - } else { - scaleDownPodsStates.Store(scaleDownPodName, wildflyServerSpecPodStatus.State) + podStatusNeedsUpdating.Set() } } - if updated.IsSet() { // updating status of pods as soon as possible + // If there are pods in PodStateScalingDownRecoveryInvestigation state, an update cycle must be run + if podStatusNeedsUpdating.IsSet() { w.Status.ScalingdownPods = int32(numberOfPodsToScaleDown) err := resources.UpdateWildFlyServerStatus(w, r.Client) if err != nil { - return requeueNow, fmt.Errorf("There was trouble to update state of WildflyServer: %v, error: %v", w.Status.Pods, err) + return requeueNow, fmt.Errorf("Failed to update the state of the WildflyServer resource: %v, error: %v", w.Status.Pods, err) } } - updated.UnSet() + // Reset the flag to signal the need to run an update cycle + podStatusNeedsUpdating.UnSet() var wg sync.WaitGroup - for scaleDownIndex := 1; scaleDownIndex <= numberOfPodsToScaleDown; scaleDownIndex++ { - scaleDownPod := podList.Items[wildflyServerNumberOfPods-scaleDownIndex] + for _, corePod := range podsToScaleDown { + // As the variables `wildflyServerSpecPodStatus` and `corePodTemp` are created for each iteration, + // only the goroutine defined in the same iteration can access them. In this way, the situation where + //multiple goroutines access the same variable is avoided. + wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, corePod.ObjectMeta.Name) + corePodTemp := corePod wg.Add(1) go func() { defer wg.Done() - // Scaledown scenario, need to handle transaction recovery - scaleDownPodName := scaleDownPod.ObjectMeta.Name - scaleDownPodIP := scaleDownPod.Status.PodIP - if strings.Contains(scaleDownPodIP, ":") && !strings.HasPrefix(scaleDownPodIP, "[") { - scaleDownPodIP = "[" + scaleDownPodIP + "]" // for IPv6 + podIP := wildflyServerSpecPodStatus.PodIP + if strings.Contains(podIP, ":") && !strings.HasPrefix(podIP, "[") { + podIP = "[" + podIP + "]" // for IPv6 } - podState, ok := scaleDownPodsStates.Load(scaleDownPodName) - if !ok { - scaleDownErrors.Store(scaleDownPodName+"_status-update", - fmt.Errorf("Cannot find pod name '%v' in the list of the active pods for the WildflyServer operator: %v", - scaleDownPodName, w.ObjectMeta.Name)) - _, podsStatus := getPodStatus(podList.Items, w.Status.Pods) - reqLogger.Info("Updating pod status", "Pod Status", podsStatus) - w.Status.Pods = podsStatus - updated.Set() - return - } - - if podState != wildflyv1alpha1.PodStateScalingDownClean { - reqLogger.Info("Transaction recovery scaledown processing", "Pod Name", scaleDownPodName, - "IP Address", scaleDownPodIP, "Pod State", podState, "Pod Phase", scaleDownPod.Status.Phase) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownClean { + reqLogger.Info("Transaction recovery scaledown processing", "Pod Name", corePodTemp.ObjectMeta.Name, + "IP Address", podIP, "Pod State", wildflyServerSpecPodStatus.State, "Pod's Recovery Counter", wildflyServerSpecPodStatus.RecoveryCounter) - // For full and correct recovery we need to first, run two recovery checks and second, having the orphan detection interval set to minimum - needsReconcile, setupErr := r.setupRecoveryPropertiesAndRestart(reqLogger, &scaleDownPod, w) + // For full and correct recovery we need to run two recovery checks with orphan detection interval set to minimum + needsReconcile, setupErr := r.setupRecoveryPropertiesAndRestart(reqLogger, &corePodTemp, w) if needsReconcile > mustReconcile { mustReconcile = needsReconcile } if setupErr != nil { - scaleDownErrors.Store(scaleDownPodName, setupErr) + errorsSyncMap.Store(corePodTemp.ObjectMeta.Name, setupErr) return } //The futher recovery attempts won't succeed until reconcilation loop is repeated if needsReconcile != requeueOff { return } - // Running recovery twice for orphan detection will be kicked-in - _, _, recoveryErr := r.checkRecovery(reqLogger, &scaleDownPod, w) - if recoveryErr != nil { - scaleDownErrors.Store(scaleDownPodName, recoveryErr) - return - } - success, message, recoveryErr := r.checkRecovery(reqLogger, &scaleDownPod, w) - if recoveryErr != nil { - scaleDownErrors.Store(scaleDownPodName, recoveryErr) - return + + // Running the recovery check twice to discover in-doubt transactions + var ( + outcome checkResult + message string + recoveryErr error + ) + for count := 0; count < 2; count++ { + outcome, message, recoveryErr = r.checkRecovery(reqLogger, &corePodTemp, w) + // This if handles the outcome genericError + if recoveryErr != nil { + errorsSyncMap.Store(corePodTemp.ObjectMeta.Name, recoveryErr) + return + } } - if success { + if outcome == clean { // Recovery was processed with success, the pod is clean to go - scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownClean) - } else if message != "" { - // Some in-doubt transaction left in store, the pod is still dirty - reqLogger.Info("In-doubt transactions in object store", "Pod Name", scaleDownPodName, "Message", message) - scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownRecoveryDirty) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownClean { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownClean + podStatusNeedsUpdating.Set() + } + } else if outcome == recovery { + reqLogger.Info("The pod is trying to recover unfinished transactions", "Pod name", corePodTemp.ObjectMeta.Name, "Message", message) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownRecoveryProcessing { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryProcessing + } + wildflyServerSpecPodStatus.RecoveryCounter++ + reqLogger.Info("The recovery counter of the pod is: "+strconv.Itoa(int(wildflyServerSpecPodStatus.RecoveryCounter)), "Pod name", corePodTemp.ObjectMeta.Name) + // As the RecoveryCounter increases at every recovery attempt, podStatusNeedsUpdating must be set to trigger the update of the status + podStatusNeedsUpdating.Set() + } else if outcome == heuristic { + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownRecoveryHeuristic { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryHeuristic + podStatusNeedsUpdating.Set() + } + reqLogger.Info("The pod has heuristic transactions", "Pod name", corePodTemp.ObjectMeta.Name, "Message", message) + r.Recorder.Event(w, corev1.EventTypeWarning, "Transaction Recovery", "There are HEURISTIC transactions in "+corePodTemp.ObjectMeta.Name+"! Please, resolve them manually!") } } }() // execution of the go routine for one pod } wg.Wait() - // Updating the pod state based on the recovery processing when a scale down is in progress - for wildflyServerPodStatusIndex, v := range w.Status.Pods { - if podStateValue, exist := scaleDownPodsStates.Load(v.Name); exist { - if w.Status.Pods[wildflyServerPodStatusIndex].State != podStateValue.(string) { - updated.Set() - } - w.Status.Pods[wildflyServerPodStatusIndex].State = podStateValue.(string) - } - } // Verification if an error happened during the recovery processing var errStrings string numberOfScaleDownErrors := 0 var resultError error - scaleDownErrors.Range(func(k, v interface{}) bool { + errorsSyncMap.Range(func(k, v interface{}) bool { numberOfScaleDownErrors++ errStrings += " [[" + v.(error).Error() + "]]," return true @@ -400,7 +455,7 @@ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger "Errors during transaction recovery scaledown processing. Consult operator log.") } - if updated.IsSet() { // recovery changed the state of the pods + if podStatusNeedsUpdating.IsSet() { // recovery changed the state of the pods w.Status.ScalingdownPods = int32(numberOfPodsToScaleDown) err := resources.UpdateWildFlyServerStatus(w, r.Client) if err != nil { diff --git a/controllers/transaction_recovery_test.go b/controllers/transaction_recovery_test.go index d2c91d806..a623dad1a 100644 --- a/controllers/transaction_recovery_test.go +++ b/controllers/transaction_recovery_test.go @@ -35,7 +35,7 @@ const ( ) var ( - // variables to be setup and re-used in the method over the file + // Variables to be used in the testing methods assert *testifyAssert.Assertions cl client.Client r *WildFlyServerReconciler @@ -347,6 +347,35 @@ func TestSkipRecoveryScaleDownWhenEmptyDirStorage(t *testing.T) { assert.Empty(remoteOpsMock.ExecuteMockReturn) } +func TestSkipRecoveryScaleDownWhenDeactivateTransactionRecoveryIsTrue(t *testing.T) { + wildflyServer := defaultWildflyServerDefinition.DeepCopy() + // Deactivate the Transaction Recovery feature + wildflyServer.Spec.DeactivateTransactionRecovery = true + setupBeforeScaleDown(t, wildflyServer, 1) + + log := ctrl.Log.WithName("TestSkipRecoveryScaleDownWhenDeactivateTransactionRecoveryIsTrue") + + log.Info("WildFly server was reconciled, let's scale it down.", "WildflyServer", wildflyServer) + wildflyServer.Spec.Replicas = 0 + err := cl.Update(context.TODO(), wildflyServer) + + // Reconcile for the scale down - updating the pod labels + _, err = r.Reconcile(context.TODO(), req) + require.NoError(t, err) + + // Reconcile to start the scale down procesing - recovery skipped + _, err = r.Reconcile(context.TODO(), req) + require.NoError(t, err) + // StatefulSet needs to be updated + statefulSet := &appsv1.StatefulSet{} + err = cl.Get(context.TODO(), req.NamespacedName, statefulSet) + assert.Equal(int32(0), *statefulSet.Spec.Replicas) + // WildFlyServer status needs to be updated in sclaed down manner + err = cl.Get(context.TODO(), req.NamespacedName, wildflyServer) + require.NoError(t, err) + assert.Equal(wildflyv1alpha1.PodStateScalingDownClean, wildflyServer.Status.Pods[0].State) +} + // -- // -- Mocking the remote calls and Kubernetes API --- // -- diff --git a/controllers/wildflyserver_controller.go b/controllers/wildflyserver_controller.go index 1dfa09cef..1f84537b8 100644 --- a/controllers/wildflyserver_controller.go +++ b/controllers/wildflyserver_controller.go @@ -503,13 +503,16 @@ func getPodStatus(pods []corev1.Pod, originalPodStatuses []wildflyv1alpha1.PodSt } for _, pod := range pods { podState := wildflyv1alpha1.PodStateActive + recoveryCounter := int32(0) if value, exists := podStatusesOriginalMap[pod.Name]; exists { podState = value.State + recoveryCounter = value.RecoveryCounter } podStatuses = append(podStatuses, wildflyv1alpha1.PodStatus{ - Name: pod.Name, - PodIP: pod.Status.PodIP, - State: podState, + Name: pod.Name, + PodIP: pod.Status.PodIP, + State: podState, + RecoveryCounter: recoveryCounter, }) if pod.Status.PodIP == "" { requeue = true diff --git a/doc/apis.adoc b/doc/apis.adoc index 1b3e2e8e5..857a72c32 100644 --- a/doc/apis.adoc +++ b/doc/apis.adoc @@ -43,6 +43,7 @@ It uses a `StatefulSet` with a pod spec that mounts the volume specified by `sto | `bootableJar` | BootableJar specifies whether the application image is using WildFly S2I Builder/Runtime images or Bootable Jar. If omitted, it defaults to false (application image is expected to use WildFly S2I Builder/Runtime images) | bool | false | `configMaps` | List of ConfigMap names to mount as volumes in the containers. Each config map is mounted as a read-only volume under `/etc/configmaps/` | string[] | false +| `deactivateTransactionRecovery`| DeactivateTransactionRecovery disables the process of recoverying transactions (false if omitted) | bool | false | `disableHTTPRoute` | Disable the creation a route to the HTTP port of the application service (false if omitted) | bool | false | `env` | List of environment variable present in the containers | []https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#envvar-v1-core[corev1.EnvVar] | false | `envFrom` | List of environment variable present in the containers from source (either `ConfigMap` or `Secret`) | []https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#envfromsource-v1-core[corev1.EnvFromSource] |false @@ -145,5 +146,6 @@ transaction, make sure to specify a `volumeClaimTemplate` that so that the same | Field | Description |Scheme| Required | `name` | Name of the Pod | string | true | `podIP` | IP address allocated to the pod | string | true +| `recoveryCounter`| Counts the recovery attempts when there are in-doubt transactions | `state` | State of the pod from perspective of scale down process. By default it's active which means it serves requests. | string | false |======================= diff --git a/doc/user-guide.adoc b/doc/user-guide.adoc index 6bf55e418..9885c07d5 100644 --- a/doc/user-guide.adoc +++ b/doc/user-guide.adoc @@ -344,17 +344,20 @@ The `WildFlyServer.Status.Pods[].State` can be one of the following values: | The pod is active and processing requests. | SCALING_DOWN_RECOVERY_INVESTIGATION -| The pod is about to be scaled down. The scale-down process is under investigation about the state of transactions in WildFly. +| The pod is under investigation to find out if there are transactions that did not complete their lifecycle successfully. -| SCALING_DOWN_RECOVERY_DIRTY -| The WildFly contains some unfinished transactions. The pod cannot be terminated until they are cleaned. - The transaction recovery is periodically run at WildFly and it waits the transactions are finished eventually. +| SCALING_DOWN_RECOVERY_PROCESSING +| There are in-doubt transactions in the log store. The pod cannot be terminated until these transactions are either completed or cleaned. + +| SCALING_DOWN_RECOVERY_HEURISTICS +| There are heuristic transactions in the log store. The pod cannot be terminated until these transactions are either completed or cleaned. | SCALING_DOWN_CLEAN | The pod was processed by transaction scaled down processing and is marked as clean to be removed from the cluster. |=== +For pods in the `SCALING_DOWN_RECOVERY_PROCESSING` status, each recovery cycle is tracked using the `RecoveryCounter` (accessible via `WildFlyServer.Status.Pods[].RecoveryCounter`). This field monitors the operator's recovery attempts until all in-doubt transactions are successfully completed. You can observe the overall state of the active and no-active pods by looking at the `WildFlyServer.Status.'Scalingdown Pods'` and `WildFlyServer.Status.Replicas` fields. @@ -368,6 +371,12 @@ If there are no pods in scaledown process the numbers of `WildFlyServer.Status.R This feature is not supported by a Bootable JAR application image. The transaction recovery facility will be ignored for Bootable JAR application images. ==== +[WARNING] +==== +It's feasible to disable transaction recovery during scale-down by configuring the property `WildFlyServerSpec.DeactivateTransactionRecovery` to `true` (by default, it's set to `false`). When `DeactivateTransactionRecovery` is enabled, in-doubt and heuristic transactions won't be finalized or reported. +Consequently, this could lead to potential data inconsistency or loss when distributed transactions are employed. +==== + === Transaction scaledown special cases ==== Heuristics transactions @@ -375,7 +384,7 @@ This feature is not supported by a Bootable JAR application image. The transacti As it's well-known the transaction may finish either with commit or roll-back. Unfortunately there is a third outcome which is _unknown_. It’s a state when there is no way of automatic transaction recovery and human intervention is needed. -If the transaction is in state of heuristics the pod is marked as `SCALING_DOWN_RECOVERY_DIRTY` +If the transaction is in state of heuristics the pod is marked as `SCALING_DOWN_RECOVERY_HEURISTICS` and the administrator needs to manually connect with the `jboss-cli` to the particular WildFly instance and to resolve the heuristic transaction. @@ -388,7 +397,7 @@ There is a special case coming from the design of the `StatefulSet` that ensures (it does not change on the pod restart). The `StatefulSet` depends on ordering of the pods. The pod are named by the defined order. The `StatefulSet` then requires the pod-0 not being terminated before the pod-1. First pod-1 is terminated and then pod-0. -From that rule we can observe that if the pod-1 is in state `SCALING_DOWN_RECOVERY_DIRTY` (contains some unfinished, e.g. heuristic transactions) +From that rule we can observe that if the pod-1 is in state `SCALING_DOWN_RECOVERY_HEURISTICS` (e.g. contains some heuristic transactions) then if pod-0 is in the state of `SCALING_DOWN_CLEAN` in will be lingering at that state until the pod-1 is terminated. But even the pod is in state `SCALING_DOWN_CLEAN` the pod is not receiving any new requests diff --git a/pkg/util/wildfly_mgmt.go b/pkg/util/wildfly_mgmt.go index 00fdcb014..868954d81 100644 --- a/pkg/util/wildfly_mgmt.go +++ b/pkg/util/wildfly_mgmt.go @@ -30,6 +30,8 @@ var ( MgmtOpTxnProbe = "/subsystem=transactions/log-store=log-store:probe()" // MgmtOpTxnRead is a JBoss CLI command for reading transaction log store MgmtOpTxnRead = "/subsystem=transactions/log-store=log-store:read-children-resources(child-type=transactions,recursive=true,include-runtime=true)" + // MgmtOpTxnReadHeuristic is a JBoss CLI command for scanning the log store in search of transactions in HEURISTIC status + MgmtOpTxnReadHeuristic = "/subsystem=transactions/log-store=log-store/transactions=*/participants=*:query(where={\"status\"=\"HEURISTIC\"}" // MgmtOpTxnRecoverySocketBindingRead is a JBoss CLI command for reading name of recovery socket binding MgmtOpTxnRecoverySocketBindingRead = "/subsystem=transactions:read-attribute(name=socket-binding)" // MgmtOpSocketBindingRead is a JBoss CLI command for reading all data on the socket binding group