From 55b44306e4f599a83365282e494b74a23c459674 Mon Sep 17 00:00:00 2001 From: Rostislav Porohnya Date: Thu, 21 Dec 2023 15:44:44 +0200 Subject: [PATCH] Implemented a node replace flow --- .../clusters_v1beta1_cassandra.yaml | 10 +- controllers/clusters/cadence_controller.go | 6 +- controllers/clusters/cassandra_controller.go | 110 +++- controllers/clusters/kafka_controller.go | 6 +- .../clusters/kafkaconnect_controller.go | 6 +- controllers/clusters/on_premises.go | 614 +++++++++++------- controllers/clusters/postgresql_controller.go | 6 +- controllers/clusters/redis_controller.go | 6 +- pkg/models/errors.go | 1 + pkg/models/on_premises.go | 12 +- 10 files changed, 496 insertions(+), 281 deletions(-) diff --git a/config/samples/onpremises/clusters_v1beta1_cassandra.yaml b/config/samples/onpremises/clusters_v1beta1_cassandra.yaml index 8b7968872..adf7b7729 100644 --- a/config/samples/onpremises/clusters_v1beta1_cassandra.yaml +++ b/config/samples/onpremises/clusters_v1beta1_cassandra.yaml @@ -1,12 +1,15 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Cassandra metadata: - name: cassandra-on-prem-cluster + name: cassandra-on-prem-cluster-test spec: - name: "danylo-on-prem-cassandra" + name: "test-on-prem-cassandra" version: "4.0.10" privateNetworkCluster: false onPremisesSpec: + cloudInitScriptRef: + name: instaclustr-cloud-init-secret + namespace: default storageClassName: managed-csi-premium osDiskSize: 20Gi dataDiskSize: 200Gi @@ -15,9 +18,6 @@ spec: nodeCPU: 2 nodeMemory: 8192Mi osImageURL: "https://s3.amazonaws.com/debian-bucket/debian-11-generic-amd64-20230601-1398.raw" - cloudInitScriptRef: - namespace: default - name: instaclustr-cloud-init-secret dataCentres: - name: "onPremCassandra" region: "CLIENT_DC" diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index ce4b61f91..8a7c5e127 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -302,7 +302,7 @@ 
func (r *CadenceReconciler) handleCreateCluster( return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(c, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, c, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", c.Status.ID, @@ -882,8 +882,8 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra }, nil } -func (r *CadenceReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cadence, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(c.Kind, b) +func (r *CadenceReconciler) startClusterOnPremisesIPsJob(ctx context.Context, c *v1beta1.Cadence, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, c.Kind, b) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index a1230b7b7..6e987b1e3 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -227,7 +227,7 @@ func (r *CassandraReconciler) handleCreateCluster( } if c.Status.State != models.DeletedStatus { - err = r.startClusterStatusJob(c) + err = r.startClusterStatusJob(ctx, c) if err != nil { l.Error(err, "Cannot start cluster status job", "c cluster ID", c.Status.ID) @@ -300,7 +300,7 @@ func (r *CassandraReconciler) handleCreateCluster( return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(c, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, c, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", c.Status.ID, @@ -323,7 +323,7 @@ func (r *CassandraReconciler) handleCreateCluster( return models.ExitReconcile, nil } - err = r.startClusterBackupsJob(c) + err = r.startClusterBackupsJob(ctx, c) if err != nil { l.Error(err, "Cannot start cluster backups check job", "cluster ID", 
c.Status.ID, @@ -343,7 +343,7 @@ func (r *CassandraReconciler) handleCreateCluster( ) if c.Spec.UserRefs != nil && c.Status.AvailableUsers == nil { - err = r.startUsersCreationJob(c) + err = r.startUsersCreationJob(ctx, c) if err != nil { l.Error(err, "Failed to start user creation job") r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, @@ -717,8 +717,8 @@ func (r *CassandraReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error { - job := r.newWatchStatusJob(c) +func (r *CassandraReconciler) startClusterStatusJob(ctx context.Context, c *v1beta1.Cassandra) error { + job := r.newWatchStatusJob(ctx, c) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -728,8 +728,8 @@ func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error return nil } -func (r *CassandraReconciler) startClusterBackupsJob(cluster *v1beta1.Cassandra) error { - job := r.newWatchBackupsJob(cluster) +func (r *CassandraReconciler) startClusterBackupsJob(ctx context.Context, cluster *v1beta1.Cassandra) error { + job := r.newWatchBackupsJob(ctx, cluster) err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.BackupsChecker), scheduler.ClusterBackupsInterval, job) if err != nil { @@ -739,8 +739,8 @@ func (r *CassandraReconciler) startClusterBackupsJob(cluster *v1beta1.Cassandra) return nil } -func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra) error { - job := r.newUsersCreationJob(cluster) +func (r *CassandraReconciler) startUsersCreationJob(ctx context.Context, cluster *v1beta1.Cassandra) error { + job := r.newUsersCreationJob(ctx, cluster) err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.UserCreator), scheduler.UserCreationInterval, job) if err != nil { @@ -750,8 +750,8 @@ func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra) return nil 
} -func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(c.Kind, b) +func (r *CassandraReconciler) startClusterOnPremisesIPsJob(ctx context.Context, c *v1beta1.Cassandra, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, c.Kind, b) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -761,11 +761,11 @@ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra, return nil } -func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.Job { +func (r *CassandraReconciler) newWatchStatusJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "CassandraStatusClusterJob") return func() error { namespacedName := client.ObjectKeyFromObject(c) - err := r.Get(context.Background(), namespacedName, c) + err := r.Get(ctx, namespacedName, c) if k8serrors.IsNotFound(err) { l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", "namespaced name", namespacedName) @@ -779,11 +779,11 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. if err != nil { if errors.Is(err, instaclustr.NotFound) { if c.DeletionTimestamp != nil { - _, err = r.handleDeleteCluster(context.Background(), l, c) + _, err = r.handleDeleteCluster(ctx, l, c) return err } - return r.handleExternalDelete(context.Background(), c) + return r.handleExternalDelete(ctx, c) } l.Error(err, "Cannot get cluster from the Instaclustr API", @@ -800,7 +800,67 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. 
return err } + // TODO move everything not related to the watch status job to other places if !areStatusesEqual(&iCassandra.Status.ClusterStatus, &c.Status.ClusterStatus) { + if len(c.Status.DataCentres) != 0 && + len(iCassandra.Status.DataCentres) != 0 { + bootstrap := newOnPremisesBootstrap( + r.Client, + c, + r.EventRecorder, + iCassandra.Status.ClusterStatus, + c.Spec.OnPremisesSpec, + newExposePorts(c.GetExposePorts()), + c.GetHeadlessPorts(), + c.Spec.PrivateNetworkCluster, + ) + oldNode, newNode := getReplacedNodes(c.Status.DataCentres[0], iCassandra.Status.DataCentres[0]) + if oldNode != nil && + newNode != nil { + err = deleteReplacedNodeResources(ctx, r.Client, oldNode.ID) + if err != nil { + if !k8serrors.IsNotFound(err) { + l.Info("Node replace is in progress. Deleting old vm resources", + "cluster name", c.Name, + "old id", oldNode.ID) + + return nil + } + + l.Error(err, "Cannot delete replaced node resources", + "old node id", oldNode.ID) + + return err + } + + l.Info("Node replace is in progress. Creating new resources", + "bootstrap struct", bootstrap, + "new node id", newNode.ID) + + err = handleNodeReplace( + ctx, + bootstrap, + oldNode, + newNode) + if err != nil { + if k8serrors.IsNotFound(err) { + l.Info("Node replace is in progress. Reconciling resources", + "old node id", oldNode.ID, + "new node id", newNode.ID) + + return nil + } + + l.Error(err, "Error while handling node replace operation", + "bootstrap struct", bootstrap, + "old node", oldNode, + "new node", newNode) + + return err + } + } + } + l.Info("Updating cluster status", "status from Instaclustr", iCassandra.Status.ClusterStatus, "status from k8s", c.Status.ClusterStatus) @@ -809,7 +869,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. 
patch := c.NewPatch() c.Status.ClusterStatus = iCassandra.Status.ClusterStatus - err = r.Status().Patch(context.Background(), c, patch) + err = r.Status().Patch(ctx, c, patch) if err != nil { return err } @@ -848,7 +908,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. patch := c.NewPatch() c.Annotations[models.ExternalChangesAnnotation] = models.True - err = r.Patch(context.Background(), c, patch) + err = r.Patch(ctx, c, patch) if err != nil { l.Error(err, "Cannot patch cluster cluster", "cluster name", c.Spec.Name, "cluster state", c.Status.State) @@ -865,8 +925,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. r.EventRecorder.Eventf(c, models.Warning, models.ExternalChanges, msgDiffSpecs) } - //TODO: change all context.Background() and context.TODO() to ctx from Reconcile - err = r.reconcileMaintenanceEvents(context.Background(), c) + err = r.reconcileMaintenanceEvents(ctx, c) if err != nil { l.Error(err, "Cannot reconcile cluster maintenance events", "cluster name", c.Spec.Name, @@ -890,7 +949,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. } dc.ResizeOperations = resizeOperations - err = r.Status().Patch(context.Background(), c, patch) + err = r.Status().Patch(ctx, c, patch) if err != nil { l.Error(err, "Cannot patch data centre resize operations", "cluster name", c.Spec.Name, @@ -907,11 +966,10 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. 
} } -func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler.Job { +func (r *CassandraReconciler) newWatchBackupsJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "cassandraBackupsClusterJob") return func() error { - ctx := context.Background() err := r.Get(ctx, types.NamespacedName{Namespace: c.Namespace, Name: c.Name}, c) if err != nil { if k8serrors.IsNotFound(err) { @@ -994,7 +1052,7 @@ func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler patch := backupToAssign.NewPatch() backupToAssign.Status.Start = iBackup.Start backupToAssign.Status.UpdateStatus(iBackup) - err = r.Status().Patch(context.TODO(), backupToAssign, patch) + err = r.Status().Patch(ctx, backupToAssign, patch) if err != nil { return err } @@ -1015,12 +1073,10 @@ func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler } } -func (r *CassandraReconciler) newUsersCreationJob(c *v1beta1.Cassandra) scheduler.Job { +func (r *CassandraReconciler) newUsersCreationJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "cassandraUsersCreationJob") return func() error { - ctx := context.Background() - err := r.Get(ctx, types.NamespacedName{ Namespace: c.Namespace, Name: c.Name, diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index a7ec26e3a..284afd91e 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -252,7 +252,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, k *v1beta1.Ka return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(k, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, k, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", k.Status.ID, @@ -574,8 +574,8 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, k 
*v1beta1.Ka return models.ExitReconcile, nil } -func (r *KafkaReconciler) startClusterOnPremisesIPsJob(k *v1beta1.Kafka, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(k.Kind, b) +func (r *KafkaReconciler) startClusterOnPremisesIPsJob(ctx context.Context, k *v1beta1.Kafka, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, k.Kind, b) err := r.Scheduler.ScheduleJob(k.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index 423688fdd..cbf6709d9 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -241,7 +241,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(kc, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, kc, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", kc.Status.ID, @@ -574,8 +574,8 @@ func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1 return nil } -func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(k *v1beta1.KafkaConnect, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(k.Kind, b) +func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(ctx context.Context, k *v1beta1.KafkaConnect, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, k.Kind, b) err := r.Scheduler.ScheduleJob(k.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { diff --git a/controllers/clusters/on_premises.go b/controllers/clusters/on_premises.go index 18fefaf67..58bbf66b7 100644 --- a/controllers/clusters/on_premises.go +++ b/controllers/clusters/on_premises.go @@ -19,10 +19,7 @@ package clusters import ( "context" "fmt" - "strings" - k8scorev1 
"k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -93,102 +90,134 @@ func handleCreateOnPremisesClusterResources(ctx context.Context, b *onPremisesBo return nil } +func handleNodeReplace(ctx context.Context, b *onPremisesBootstrap, newNode, oldNode *v1beta1.Node) error { + objectName := b.K8sObject.GetName() + vm := &virtcorev1.VirtualMachine{} + + for i := 0; i < models.VMNameMaxIndex; i++ { + nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, objectName) + err := b.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: b.K8sObject.GetNamespace(), + Name: nodeName, + }, vm) + if client.IgnoreNotFound(err) != nil { + return err + } + if err == nil { + continue + } + + osDVName := fmt.Sprintf("%s-%d-%s", models.NodeOSDVPrefix, i, objectName) + osDV, err := reconcileDV(ctx, b, newNode.ID, osDVName, models.OSDVLabel) + if err != nil { + return err + } + + storageDVList := &cdiv1beta1.DataVolumeList{} + err = b.K8sClient.List(ctx, storageDVList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: oldNode.ID, + models.DVRoleLabel: models.StorageDVLabel, + }), + }) + if err != nil { + return err + } + + storageDV := &cdiv1beta1.DataVolume{} + storageDVName := fmt.Sprintf("%s-%d-%s", models.NodeDVPrefix, i, objectName) + err = b.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: b.K8sObject.GetNamespace(), + Name: storageDVName, + }, storageDV) + if err != nil { + return err + } + + patch := client.MergeFrom(storageDV.DeepCopy()) + storageDV.Labels[models.NodeIDLabel] = newNode.ID + err = b.K8sClient.Patch(ctx, storageDV, patch) + if err != nil { + return err + } + + vm, err = reconcileVM(ctx, b, newNode.ID, nodeName, newNode.Rack, osDV.Name, storageDV.Name) + if err != nil { + return err + } + + return nil + } + + return models.ErrGenerateAvailableVMNameFailed +} + func 
reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap) error { - gatewayDVSize, err := resource.ParseQuantity(b.OnPremisesSpec.OSDiskSize) + gatewayDVName := fmt.Sprintf("%s-%s", models.GatewayDVPrefix, b.K8sObject.GetName()) + gatewayDV, err := reconcileDV(ctx, b, b.ClusterStatus.DataCentres[0].ID, gatewayDVName, models.OSDVLabel) if err != nil { return err } - gatewayDVName := fmt.Sprintf("%s-%s", models.GatewayDVPrefix, strings.ToLower(b.K8sObject.GetName())) - gatewayDV, err := createDV( + gatewayName := fmt.Sprintf("%s-%s", models.GatewayVMPrefix, b.K8sObject.GetName()) + gatewayVM, err := reconcileVM( ctx, b, - gatewayDVName, b.ClusterStatus.DataCentres[0].ID, - gatewayDVSize, - true, - ) + gatewayName, + models.GatewayRack, + gatewayDV.Name) if err != nil { return err } - gatewayCPU := resource.Quantity{} - gatewayCPU.Set(b.OnPremisesSpec.SSHGatewayCPU) - - gatewayMemory, err := resource.ParseQuantity(b.OnPremisesSpec.SSHGatewayMemory) + _, err = reconcileService(ctx, b, b.ClusterStatus.DataCentres[0].ID, gatewayVM.Name, models.ExposeServiceRoleLabel) if err != nil { return err } - gatewayName := fmt.Sprintf("%s-%s", models.GatewayVMPrefix, strings.ToLower(b.K8sObject.GetName())) - - gatewayVM := &virtcorev1.VirtualMachine{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: gatewayName, - }, gatewayVM) - if client.IgnoreNotFound(err) != nil { + _, err = reconcileService(ctx, b, b.ClusterStatus.DataCentres[0].ID, gatewayVM.Name, models.ClusterIPServiceRoleLabel) + if err != nil { return err } - if k8serrors.IsNotFound(err) { - gatewayVM, err = newVM( - ctx, - b, - gatewayName, - b.ClusterStatus.DataCentres[0].ID, - models.GatewayRack, - gatewayDV.Name, - gatewayCPU, - gatewayMemory) + + return nil +} + +func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap) error { + for i, node := range b.ClusterStatus.DataCentres[0].Nodes { + objectName := b.K8sObject.GetName() + 
newOSDVName := fmt.Sprintf("%s-%d-%s", models.NodeOSDVPrefix, i, objectName) + nodeOSDV, err := reconcileDV(ctx, b, node.ID, newOSDVName, models.OSDVLabel) if err != nil { return err } - err = b.K8sClient.Create(ctx, gatewayVM) + + nodeDataDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeDVPrefix, i, objectName) + nodeDataDV, err := reconcileDV(ctx, b, node.ID, nodeDataDiskDVName, models.StorageDVLabel) if err != nil { return err } - } - gatewaySvcName := fmt.Sprintf("%s-%s", models.GatewaySvcPrefix, gatewayName) - gatewayExposeService := &k8scorev1.Service{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: gatewaySvcName, - }, gatewayExposeService) + nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, objectName) + nodeVM, err := reconcileVM(ctx, b, node.ID, nodeName, node.Rack, nodeOSDV.Name, nodeDataDV.Name) + if err != nil { + return err + } - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - gatewayExposeService = newExposeService( - b, - gatewaySvcName, - gatewayName, - b.ClusterStatus.DataCentres[0].ID, - ) - err = b.K8sClient.Create(ctx, gatewayExposeService) + if !b.PrivateNetworkCluster { + _, err = reconcileService(ctx, b, node.ID, nodeVM.Name, models.ExposeServiceRoleLabel) + if err != nil { + return err + } + } + + _, err = reconcileService(ctx, b, node.ID, nodeVM.Name, models.HeadlessServiceRoleLabel) if err != nil { return err } - } - clusterIPServiceName := fmt.Sprintf("cluster-ip-%s", gatewayName) - nodeExposeService := &k8scorev1.Service{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: clusterIPServiceName, - }, nodeExposeService) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeExposeService = newClusterIPService( - b, - clusterIPServiceName, - gatewayName, - b.ClusterStatus.DataCentres[0].ID, - ) - err = b.K8sClient.Create(ctx, nodeExposeService) + 
_, err = reconcileService(ctx, b, node.ID, nodeVM.Name, models.ClusterIPServiceRoleLabel) if err != nil { return err } @@ -197,198 +226,220 @@ func reconcileSSHGatewayResources(ctx context.Context, b *onPremisesBootstrap) e return nil } -func reconcileNodesResources(ctx context.Context, b *onPremisesBootstrap) error { - for i, node := range b.ClusterStatus.DataCentres[0].Nodes { - nodeOSDiskSize, err := resource.ParseQuantity(b.OnPremisesSpec.OSDiskSize) +func reconcileVM( + ctx context.Context, + b *onPremisesBootstrap, + nodeID, + nodeName, + rack, + osDVName string, + storageDVName ...string) (*virtcorev1.VirtualMachine, error) { + nodeCPU := resource.Quantity{} + nodeMemory := resource.Quantity{} + var err error + + switch rack { + case models.GatewayRack: + nodeCPU.Set(b.OnPremisesSpec.SSHGatewayCPU) + nodeMemory, err = resource.ParseQuantity(b.OnPremisesSpec.SSHGatewayMemory) if err != nil { - return err + return nil, err } - - nodeOSDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeOSDVPrefix, i, strings.ToLower(b.K8sObject.GetName())) - nodeOSDV, err := createDV( - ctx, - b, - nodeOSDiskDVName, - node.ID, - nodeOSDiskSize, - true, - ) + default: + nodeCPU.Set(b.OnPremisesSpec.NodeCPU) + nodeMemory, err = resource.ParseQuantity(b.OnPremisesSpec.NodeMemory) if err != nil { - return err + return nil, err } + } - nodeDataDiskDVSize, err := resource.ParseQuantity(b.OnPremisesSpec.DataDiskSize) - if err != nil { - return err - } + vmList := &virtcorev1.VirtualMachineList{} + err = b.K8sClient.List(ctx, vmList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: nodeID, + }), + }) - nodeDataDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeDVPrefix, i, strings.ToLower(b.K8sObject.GetName())) - nodeDataDV, err := createDV( + vm := &virtcorev1.VirtualMachine{} + if len(vmList.Items) == 0 { + vm, err = newVM( ctx, b, - nodeDataDiskDVName, - node.ID, - nodeDataDiskDVSize, - false, + nodeName, + nodeID, + rack, + 
osDVName, + nodeCPU, + nodeMemory, + storageDVName..., ) if err != nil { - return err + return nil, err } - nodeCPU := resource.Quantity{} - nodeCPU.Set(b.OnPremisesSpec.NodeCPU) - - nodeMemory, err := resource.ParseQuantity(b.OnPremisesSpec.NodeMemory) + err = b.K8sClient.Create(ctx, vm) if err != nil { - return err + + return nil, err } - nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, strings.ToLower(b.K8sObject.GetName())) + return vm, nil + } - nodeVM := &virtcorev1.VirtualMachine{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: nodeName, - }, nodeVM) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeVM, err = newVM( - ctx, - b, - nodeName, - node.ID, - node.Rack, - nodeOSDV.Name, - nodeCPU, - nodeMemory, - nodeDataDV.Name, - ) + return &vmList.Items[0], nil +} + +func reconcileDV(ctx context.Context, b *onPremisesBootstrap, nodeID, diskName, diskRole string) (*cdiv1beta1.DataVolume, error) { + dvList := &cdiv1beta1.DataVolumeList{} + err := b.K8sClient.List(ctx, dvList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: nodeID, + models.DVRoleLabel: diskRole, + }), + }) + if err != nil { + return nil, err + } + + dv := &cdiv1beta1.DataVolume{} + if len(dvList.Items) == 0 { + dvSize := resource.Quantity{} + switch diskRole { + case models.OSDVLabel: + dvSize, err = resource.ParseQuantity(b.OnPremisesSpec.OSDiskSize) if err != nil { - return err + return nil, err } - err = b.K8sClient.Create(ctx, nodeVM) + case models.StorageDVLabel: + dvSize, err = resource.ParseQuantity(b.OnPremisesSpec.DataDiskSize) if err != nil { - return err + return nil, err } } - if !b.PrivateNetworkCluster { - nodeExposeName := fmt.Sprintf("%s-%s", models.NodeSvcPrefix, nodeName) - nodeExposeService := &k8scorev1.Service{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: 
nodeExposeName, - }, nodeExposeService) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeExposeService = newExposeService( - b, - nodeExposeName, - nodeName, - node.ID, - ) - err = b.K8sClient.Create(ctx, nodeExposeService) - if err != nil { - return err - } - } + dv, err = createDV( + ctx, + b, + diskName, + nodeID, + diskRole, + dvSize, + ) + if err != nil { + return nil, err } - headlessServiceName := fmt.Sprintf("%s-%s", models.KubevirtSubdomain, strings.ToLower(b.K8sObject.GetName())) - headlessSVC := &k8scorev1.Service{} - err = b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: headlessServiceName, - }, headlessSVC) + return dv, nil + } - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - headlessSVC = newHeadlessService( + return &dvList.Items[0], nil +} + +func reconcileService(ctx context.Context, b *onPremisesBootstrap, nodeID, nodeName, role string) (*k8scorev1.Service, error) { + svcList := &k8scorev1.ServiceList{} + svcLabels := map[string]string{ + models.ServiceRoleLabel: role, + models.ClusterIDLabel: b.ClusterStatus.ID, + } + + var svcName string + if role == models.HeadlessServiceRoleLabel { + svcName = fmt.Sprintf("%s-%s", role, b.K8sObject.GetName()) + } else { + svcName = fmt.Sprintf("%s-%s", role, nodeName) + svcLabels[models.NodeIDLabel] = nodeID + } + + err := b.K8sClient.List(ctx, svcList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(svcLabels), + }) + if err != nil { + return nil, err + } + + svc := &k8scorev1.Service{} + if len(svcList.Items) == 0 { + switch role { + case models.ExposeServiceRoleLabel: + svc = newExposeService( b, - headlessServiceName, + svcName, + nodeName, + nodeID, ) - err = b.K8sClient.Create(ctx, headlessSVC) - if err != nil { - return err - } - } - - clusterIPServiceName := fmt.Sprintf("cluster-ip-%s", nodeName) - nodeExposeService := &k8scorev1.Service{} - err = 
b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: clusterIPServiceName, - }, nodeExposeService) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeExposeService = newClusterIPService( + case models.HeadlessServiceRoleLabel: + svc = newHeadlessService( + b, + svcName, + ) + case models.ClusterIPServiceRoleLabel: + svc = newClusterIPService( b, - clusterIPServiceName, + svcName, nodeName, - node.ID, + nodeID, ) - err = b.K8sClient.Create(ctx, nodeExposeService) - if err != nil { - return err - } + } + err = b.K8sClient.Create(ctx, svc) + if err != nil { + return nil, err } + return svc, nil } - return nil + + err = b.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: b.K8sObject.GetNamespace(), + Name: svcName, + }, svc) + if err != nil { + return nil, err + } + + return svc, nil } func createDV( ctx context.Context, b *onPremisesBootstrap, name, - nodeID string, + nodeID, + diskRole string, size resource.Quantity, - isOSDisk bool, ) (*cdiv1beta1.DataVolume, error) { - dv := &cdiv1beta1.DataVolume{} - err := b.K8sClient.Get(ctx, types.NamespacedName{ - Namespace: b.K8sObject.GetNamespace(), - Name: name, - }, dv) - if client.IgnoreNotFound(err) != nil { + dv := newDataDiskDV( + b, + name, + nodeID, + diskRole, + size, + ) + err := b.K8sClient.Create(ctx, dv) + if err != nil { return nil, err } - if k8serrors.IsNotFound(err) { - dv = newDataDiskDV( - b, - name, - nodeID, - size, - isOSDisk, - ) - err = b.K8sClient.Create(ctx, dv) - if err != nil { - return nil, err - } - } + return dv, nil } func newDataDiskDV( b *onPremisesBootstrap, name, - nodeID string, + nodeID, + diskRole string, storageSize resource.Quantity, - isOSDisk bool, ) *cdiv1beta1.DataVolume { dvSource := &cdiv1beta1.DataVolumeSource{} + dvLabels := map[string]string{ + models.ClusterIDLabel: b.ClusterStatus.ID, + models.NodeIDLabel: nodeID, + models.DVRoleLabel: diskRole, + } - if isOSDisk { + switch diskRole { 
+ case models.OSDVLabel: dvSource.HTTP = &cdiv1beta1.DataVolumeSourceHTTP{URL: b.OnPremisesSpec.OSImageURL} - } else { + case models.StorageDVLabel: dvSource.Blank = &cdiv1beta1.DataVolumeBlankImage{} } @@ -398,12 +449,9 @@ func newDataDiskDV( APIVersion: models.CDIKubevirtV1beta1APIVersion, }, ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: b.K8sObject.GetNamespace(), - Labels: map[string]string{ - models.ClusterIDLabel: b.ClusterStatus.ID, - models.NodeIDLabel: nodeID, - }, + Name: name, + Namespace: b.K8sObject.GetNamespace(), + Labels: dvLabels, Finalizers: []string{models.DeletionFinalizer}, }, Spec: cdiv1beta1.DataVolumeSpec{ @@ -517,10 +565,8 @@ func newVM( { Name: models.Boot, VolumeSource: virtcorev1.VolumeSource{ - PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ - PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ - ClaimName: OSDiskDVName, - }, + DataVolume: &virtcorev1.DataVolumeSource{ + Name: OSDiskDVName, }, }, }, @@ -566,10 +612,8 @@ func newVM( vm.Spec.Template.Spec.Volumes = append(vm.Spec.Template.Spec.Volumes, virtcorev1.Volume{ Name: diskName, VolumeSource: virtcorev1.VolumeSource{ - PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ - PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ - ClaimName: dvName, - }, + DataVolume: &virtcorev1.DataVolumeSource{ + Name: dvName, }, }, }) @@ -593,8 +637,9 @@ func newExposeService( Name: svcName, Namespace: b.K8sObject.GetNamespace(), Labels: map[string]string{ - models.ClusterIDLabel: b.ClusterStatus.ID, - models.NodeIDLabel: nodeID, + models.ClusterIDLabel: b.ClusterStatus.ID, + models.NodeIDLabel: nodeID, + models.ServiceRoleLabel: models.ExposeServiceRoleLabel, }, Finalizers: []string{models.DeletionFinalizer}, }, @@ -622,7 +667,8 @@ func newHeadlessService( Name: svcName, Namespace: b.K8sObject.GetNamespace(), Labels: map[string]string{ - models.ClusterIDLabel: b.ClusterStatus.ID, + 
models.ClusterIDLabel: b.ClusterStatus.ID, + models.ServiceRoleLabel: models.HeadlessServiceRoleLabel, }, Finalizers: []string{models.DeletionFinalizer}, }, @@ -758,12 +804,12 @@ func newExposePorts(sp []k8scorev1.ServicePort) []k8scorev1.ServicePort { return ports } -func newWatchOnPremisesIPsJob(kind string, b *onPremisesBootstrap) scheduler.Job { +func newWatchOnPremisesIPsJob(ctx context.Context, kind string, b *onPremisesBootstrap) scheduler.Job { l := log.Log.WithValues("component", fmt.Sprintf("%sOnPremisesIPsCheckerJob", kind)) return func() error { allNodePods := &k8scorev1.PodList{} - err := b.K8sClient.List(context.Background(), allNodePods, &client.ListOptions{ + err := b.K8sClient.List(ctx, allNodePods, &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ models.ClusterIDLabel: b.ClusterStatus.ID, models.NodeLabel: models.WorkerNode, @@ -801,7 +847,7 @@ func newWatchOnPremisesIPsJob(kind string, b *onPremisesBootstrap) scheduler.Job for _, node := range b.ClusterStatus.DataCentres[0].Nodes { nodePods := &k8scorev1.PodList{} - err = b.K8sClient.List(context.Background(), nodePods, &client.ListOptions{ + err = b.K8sClient.List(ctx, nodePods, &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ models.ClusterIDLabel: b.ClusterStatus.ID, models.NodeIDLabel: node.ID, @@ -837,7 +883,7 @@ func newWatchOnPremisesIPsJob(kind string, b *onPremisesBootstrap) scheduler.Job b.EventRecorder.Eventf( b.K8sObject, models.Warning, models.CreationFailed, - "The private IP addresses of the node are not matching. Reason: %v", + "The private IP addresses of the node are not matching. Contact Instaclustr support to resolve the issue. 
Reason: %v", err, ) return err @@ -846,7 +892,7 @@ func newWatchOnPremisesIPsJob(kind string, b *onPremisesBootstrap) scheduler.Job if !b.PrivateNetworkCluster { nodeSVCs := &k8scorev1.ServiceList{} - err = b.K8sClient.List(context.Background(), nodeSVCs, &client.ListOptions{ + err = b.K8sClient.List(ctx, nodeSVCs, &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ models.ClusterIDLabel: b.ClusterStatus.ID, models.NodeIDLabel: node.ID, @@ -882,7 +928,7 @@ func newWatchOnPremisesIPsJob(kind string, b *onPremisesBootstrap) scheduler.Job b.EventRecorder.Eventf( b.K8sObject, models.Warning, models.CreationFailed, - "The public IP addresses of the node are not matching. Reason: %v", + "The public IP addresses of the node are not matching. Contact Instaclustr support to resolve the issue. Reason: %v", err, ) return err @@ -910,8 +956,9 @@ func newClusterIPService( Name: svcName, Namespace: b.K8sObject.GetNamespace(), Labels: map[string]string{ - models.ClusterIDLabel: b.ClusterStatus.ID, - models.NodeIDLabel: nodeID, + models.ClusterIDLabel: b.ClusterStatus.ID, + models.NodeIDLabel: nodeID, + models.ServiceRoleLabel: models.ClusterIPServiceRoleLabel, }, Finalizers: []string{models.DeletionFinalizer}, }, @@ -925,3 +972,106 @@ func newClusterIPService( }, } } + +// getReplacedNodes returns the old node whose ID is missing from newNodesDC and the new node that has no private address yet +func getReplacedNodes(oldNodesDC, newNodesDC *v1beta1.DataCentreStatus) (removedNode, addedNode *v1beta1.Node) { + newIDs := make(map[string]struct{}, len(newNodesDC.Nodes)) + for _, newNode := range newNodesDC.Nodes { + newIDs[newNode.ID] = struct{}{} + if newNode.PrivateAddress == "" { + addedNode = newNode + } + } + for _, oldNode := range oldNodesDC.Nodes { + if _, ok := newIDs[oldNode.ID]; !ok { + removedNode = oldNode + } + } + + return +} + +// Tries to delete old node resources and returns NotFound error if all resources already deleted +func deleteReplacedNodeResources(ctx context.Context, k8sClient client.Client, oldID string) error { + vmList := &virtcorev1.VirtualMachineList{} + err := k8sClient.List(ctx, vmList, &client.ListOptions{ +
LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: oldID, + }), + }) + if err != nil { + return err + } + + if len(vmList.Items) != 0 { + err = k8sClient.Delete(ctx, &vmList.Items[0]) + if err != nil { + if client.IgnoreNotFound(err) != nil { + return err + } + } else { + patch := client.MergeFrom(vmList.Items[0].DeepCopy()) + controllerutil.RemoveFinalizer(&vmList.Items[0], models.DeletionFinalizer) + err = k8sClient.Patch(ctx, &vmList.Items[0], patch) + if err != nil { + return err + } + } + } + + dvList := &cdiv1beta1.DataVolumeList{} + err = k8sClient.List(ctx, dvList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: oldID, + models.DVRoleLabel: models.OSDVLabel, + }), + }) + if err != nil { + return err + } + + if len(dvList.Items) != 0 { + err = k8sClient.Delete(ctx, &dvList.Items[0]) + if err != nil { + if client.IgnoreNotFound(err) != nil { + return err + } + } else { + patch := client.MergeFrom(dvList.Items[0].DeepCopy()) + controllerutil.RemoveFinalizer(&dvList.Items[0], models.DeletionFinalizer) + err = k8sClient.Patch(ctx, &dvList.Items[0], patch) + if err != nil { + return err + } + } + } + + svcList := &k8scorev1.ServiceList{} + err = k8sClient.List(ctx, svcList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.NodeIDLabel: oldID, + models.ServiceRoleLabel: models.ClusterIPServiceRoleLabel, + }), + }) + if err != nil { + return err + } + + if len(svcList.Items) != 0 { + err = k8sClient.Delete(ctx, &svcList.Items[0]) + if err != nil { + if client.IgnoreNotFound(err) != nil { + return err + } + } else { + patch := client.MergeFrom(svcList.Items[0].DeepCopy()) + controllerutil.RemoveFinalizer(&svcList.Items[0], models.DeletionFinalizer) + err = k8sClient.Patch(ctx, &svcList.Items[0], patch) + if err != nil { + return err + } + } + } + + return err +} diff --git a/controllers/clusters/postgresql_controller.go
b/controllers/clusters/postgresql_controller.go index e1a25c0f7..32713f588 100644 --- a/controllers/clusters/postgresql_controller.go +++ b/controllers/clusters/postgresql_controller.go @@ -311,7 +311,7 @@ func (r *PostgreSQLReconciler) handleCreateCluster( return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(pg, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, pg, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", pg.Status.ID, @@ -1151,8 +1151,8 @@ func (r *PostgreSQLReconciler) handleUpdateDefaultUserPassword( return models.ExitReconcile, nil } -func (r *PostgreSQLReconciler) startClusterOnPremisesIPsJob(pg *v1beta1.PostgreSQL, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(pg.Kind, b) +func (r *PostgreSQLReconciler) startClusterOnPremisesIPsJob(ctx context.Context, pg *v1beta1.PostgreSQL, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, pg.Kind, b) err := r.Scheduler.ScheduleJob(pg.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { diff --git a/controllers/clusters/redis_controller.go b/controllers/clusters/redis_controller.go index ef1296507..17919af1c 100644 --- a/controllers/clusters/redis_controller.go +++ b/controllers/clusters/redis_controller.go @@ -295,7 +295,7 @@ func (r *RedisReconciler) handleCreateCluster( return reconcile.Result{}, err } - err = r.startClusterOnPremisesIPsJob(redis, bootstrap) + err = r.startClusterOnPremisesIPsJob(ctx, redis, bootstrap) if err != nil { l.Error(err, "Cannot start on-premises cluster IPs check job", "cluster ID", redis.Status.ID, @@ -711,8 +711,8 @@ func (r *RedisReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *RedisReconciler) startClusterOnPremisesIPsJob(redis *v1beta1.Redis, b *onPremisesBootstrap) error { - job := newWatchOnPremisesIPsJob(redis.Kind, b) +func (r *RedisReconciler) startClusterOnPremisesIPsJob(ctx 
context.Context, redis *v1beta1.Redis, b *onPremisesBootstrap) error { + job := newWatchOnPremisesIPsJob(ctx, redis.Kind, b) err := r.Scheduler.ScheduleJob(redis.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) if err != nil { diff --git a/pkg/models/errors.go b/pkg/models/errors.go index a86d5ca0f..09e49b284 100644 --- a/pkg/models/errors.go +++ b/pkg/models/errors.go @@ -70,4 +70,5 @@ var ( ErrDebeziumImmutable = errors.New("debezium array is immutable") ErrCreateClusterWithMultiDC = errors.New("Multiple data center is still not supported. Please create a cluster with one data centre and add a second one when the cluster is in the running state") ErrOnPremicesWithMultiDC = errors.New("on-premises cluster can be provisioned with only one data centre") + ErrGenerateAvailableVMNameFailed = errors.New("failed to generate available VM name") ) diff --git a/pkg/models/on_premises.go b/pkg/models/on_premises.go index 7766ae2ca..ae0e43319 100644 --- a/pkg/models/on_premises.go +++ b/pkg/models/on_premises.go @@ -15,8 +15,15 @@ const ( NodeIDLabel = "nodeID" NodeRackLabel = "nodeRack" NodeLabel = "node" - NodeOSDVPrefix = "node-os-data-volume-pvc" - NodeDVPrefix = "node-data-volume-pvc" + ServiceRoleLabel = "service-role" + ExposeServiceRoleLabel = "expose" + HeadlessServiceRoleLabel = "headless" + ClusterIPServiceRoleLabel = "cluster-ip" + DVRoleLabel = "role" + OSDVLabel = "osdv" + StorageDVLabel = "storagedv" + NodeOSDVPrefix = "os-volume" + NodeDVPrefix = "data-volume" NodeVMPrefix = "node-vm" NodeSvcPrefix = "node-service" WorkerNode = "worker-node" @@ -26,6 +33,7 @@ const ( GatewayRack = "ssh-gateway-rack" IgnitionScriptSecretPrefix = "ignition-script-secret" DataDisk = "data-disk" + VMNameMaxIndex = 20 Boot = "boot" Storage = "storage"