Implemented a node replace flow
RostislavPorohnya committed Jan 3, 2024
1 parent 46b12b2 commit 24db632
Showing 10 changed files with 497 additions and 283 deletions.
10 changes: 5 additions & 5 deletions config/samples/onpremises/clusters_v1beta1_cassandra.yaml
@@ -1,12 +1,15 @@
apiVersion: clusters.instaclustr.com/v1beta1
kind: Cassandra
metadata:
-  name: cassandra-on-prem-cluster
+  name: cassandra-on-prem-cluster-test
spec:
-  name: "danylo-on-prem-cassandra"
+  name: "test-on-prem-cassandra"
  version: "4.0.10"
  privateNetworkCluster: false
  onPremisesSpec:
+    cloudInitScriptRef:
+      name: instaclustr-cloud-init-secret
+      namespace: default
    storageClassName: managed-csi-premium
    osDiskSize: 20Gi
    dataDiskSize: 200Gi
@@ -15,9 +18,6 @@ spec:
    nodeCPU: 2
    nodeMemory: 8192Mi
    osImageURL: "https://s3.amazonaws.com/debian-bucket/debian-11-generic-amd64-20230601-1398.raw"
-    cloudInitScriptRef:
-      namespace: default
-      name: instaclustr-cloud-init-secret
  dataCentres:
    - name: "onPremCassandra"
      region: "CLIENT_DC"
6 changes: 3 additions & 3 deletions controllers/clusters/cadence_controller.go
@@ -302,7 +302,7 @@ func (r *CadenceReconciler) handleCreateCluster(
return reconcile.Result{}, err
}

- err = r.startClusterOnPremisesIPsJob(c, bootstrap)
+ err = r.startClusterOnPremisesIPsJob(ctx, c, bootstrap)
if err != nil {
l.Error(err, "Cannot start on-premises cluster IPs check job",
"cluster ID", c.Status.ID,
@@ -882,8 +882,8 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra
}, nil
}

- func (r *CadenceReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cadence, b *onPremisesBootstrap) error {
- job := newWatchOnPremisesIPsJob(c.Kind, b)
+ func (r *CadenceReconciler) startClusterOnPremisesIPsJob(ctx context.Context, c *v1beta1.Cadence, b *onPremisesBootstrap) error {
+ job := newWatchOnPremisesIPsJob(ctx, c.Kind, b)

err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
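
The recurring change in this file and the controllers below is the same one: each start*Job helper now takes the reconcile context.Context and hands it to the job factory, so the scheduled closures reuse the reconciler's context instead of building context.Background() on every tick. The following is a minimal sketch of that pattern under simplified assumptions: Job mirrors the scheduler.Job type seen in the diff, while newWatchJob, the cluster ID, and the timeout are illustrative stand-ins rather than code from this repository.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Job mirrors the scheduler.Job type from the diff: a closure the
// scheduler runs on a fixed interval (e.g. scheduler.ClusterStatusInterval).
type Job func() error

// newWatchJob (hypothetical) captures ctx once, the way the reworked job
// factories do, so cancelling the reconciler's context also stops the job.
func newWatchJob(ctx context.Context, clusterID string) Job {
	return func() error {
		select {
		case <-ctx.Done():
			// The owning reconcile context is gone; stop instead of
			// issuing API calls against a dead context.
			return ctx.Err()
		default:
		}
		fmt.Println("checking status of cluster", clusterID)
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	job := newWatchJob(ctx, "example-cluster-id")
	if err := job(); err != nil { // a scheduler would invoke this periodically
		fmt.Println("job error:", err)
	}
}
```

The design payoff is cancellation: with one context threaded through, tearing down the reconcile loop propagates to every scheduled closure, which the old per-tick context.Background() calls could never do.
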
111 changes: 84 additions & 27 deletions controllers/clusters/cassandra_controller.go
@@ -227,7 +227,7 @@ func (r *CassandraReconciler) handleCreateCluster(
}

if c.Status.State != models.DeletedStatus {
- err = r.startClusterStatusJob(c)
+ err = r.startClusterStatusJob(ctx, c)
if err != nil {
l.Error(err, "Cannot start cluster status job",
"c cluster ID", c.Status.ID)
@@ -300,7 +300,7 @@ func (r *CassandraReconciler) handleCreateCluster(
return reconcile.Result{}, err
}

- err = r.startClusterOnPremisesIPsJob(c, bootstrap)
+ err = r.startClusterOnPremisesIPsJob(ctx, c, bootstrap)
if err != nil {
l.Error(err, "Cannot start on-premises cluster IPs check job",
"cluster ID", c.Status.ID,
@@ -323,7 +323,7 @@
return models.ExitReconcile, nil
}

- err = r.startClusterBackupsJob(c)
+ err = r.startClusterBackupsJob(ctx, c)
if err != nil {
l.Error(err, "Cannot start cluster backups check job",
"cluster ID", c.Status.ID,
@@ -343,7 +343,7 @@
)

if c.Spec.UserRefs != nil && c.Status.AvailableUsers == nil {
- err = r.startUsersCreationJob(c)
+ err = r.startUsersCreationJob(ctx, c)
if err != nil {
l.Error(err, "Failed to start user creation job")
r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed,
@@ -717,8 +717,8 @@ func (r *CassandraReconciler) handleDeleteCluster(
return models.ExitReconcile, nil
}

- func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error {
- job := r.newWatchStatusJob(c)
+ func (r *CassandraReconciler) startClusterStatusJob(ctx context.Context, c *v1beta1.Cassandra) error {
+ job := r.newWatchStatusJob(ctx, c)

err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
@@ -728,8 +728,8 @@ func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error
return nil
}

- func (r *CassandraReconciler) startClusterBackupsJob(cluster *v1beta1.Cassandra) error {
- job := r.newWatchBackupsJob(cluster)
+ func (r *CassandraReconciler) startClusterBackupsJob(ctx context.Context, cluster *v1beta1.Cassandra) error {
+ job := r.newWatchBackupsJob(ctx, cluster)

err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.BackupsChecker), scheduler.ClusterBackupsInterval, job)
if err != nil {
@@ -739,8 +739,8 @@ func (r *CassandraReconciler) startClusterBackupsJob(cluster *v1beta1.Cassandra)
return nil
}

- func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra) error {
- job := r.newUsersCreationJob(cluster)
+ func (r *CassandraReconciler) startUsersCreationJob(ctx context.Context, cluster *v1beta1.Cassandra) error {
+ job := r.newUsersCreationJob(ctx, cluster)

err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.UserCreator), scheduler.UserCreationInterval, job)
if err != nil {
@@ -750,8 +750,8 @@ func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra)
return nil
}

- func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra, b *onPremisesBootstrap) error {
- job := newWatchOnPremisesIPsJob(c.Kind, b)
+ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(ctx context.Context, c *v1beta1.Cassandra, b *onPremisesBootstrap) error {
+ job := newWatchOnPremisesIPsJob(ctx, c.Kind, b)

err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
@@ -761,11 +761,11 @@ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra,
return nil
}

- func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.Job {
+ func (r *CassandraReconciler) newWatchStatusJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job {
l := log.Log.WithValues("component", "CassandraStatusClusterJob")
return func() error {
namespacedName := client.ObjectKeyFromObject(c)
- err := r.Get(context.Background(), namespacedName, c)
+ err := r.Get(ctx, namespacedName, c)
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", namespacedName)
@@ -779,11 +779,11 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
if c.DeletionTimestamp != nil {
- _, err = r.handleDeleteCluster(context.Background(), l, c)
+ _, err = r.handleDeleteCluster(ctx, l, c)
return err
}

- return r.handleExternalDelete(context.Background(), c)
+ return r.handleExternalDelete(ctx, c)
}

l.Error(err, "Cannot get cluster from the Instaclustr API",
@@ -800,7 +800,68 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
return err
}

+ // TODO https://github.com/instaclustr/operator/issues/661
if !areStatusesEqual(&iCassandra.Status.ClusterStatus, &c.Status.ClusterStatus) {
+ if c.Spec.OnPremisesSpec != nil &&
+     len(c.Status.DataCentres) != 0 &&
+     len(iCassandra.Status.DataCentres) != 0 {
+     bootstrap := newOnPremisesBootstrap(
+         r.Client,
+         c,
+         r.EventRecorder,
+         iCassandra.Status.ClusterStatus,
+         c.Spec.OnPremisesSpec,
+         newExposePorts(c.GetExposePorts()),
+         c.GetHeadlessPorts(),
+         c.Spec.PrivateNetworkCluster,
+     )
+     oldNode, newNode := getReplacedNodes(c.Status.DataCentres[0], iCassandra.Status.DataCentres[0])
+     if oldNode != nil &&
+         newNode != nil {
+         deleteDone, err := deleteReplacedNodeResources(ctx, r.Client, oldNode.ID)
+         if err != nil {
+             l.Error(err, "Cannot delete replaced node resources",
+                 "old node id", oldNode.ID)
+
+             return err
+         }
+
+         if !deleteDone {
+             l.Info("Node replace is in progress. Deleting old vm resources",
+                 "cluster name", c.Name,
+                 "old id", oldNode.ID)
+
+             return nil
+         }
+
+         l.Info("Node replace is in progress. Creating new resources",
+             "bootstrap struct", bootstrap,
+             "new node id", newNode.ID)
+
+         err = handleNodeReplace(
+             ctx,
+             bootstrap,
+             oldNode,
+             newNode)
+         if err != nil {
+             if k8serrors.IsNotFound(err) {
+                 l.Info("Node replace is in progress. Reconciling resources",
+                     "old node id", oldNode.ID,
+                     "new node id", newNode.ID)
+
+                 return nil
+             }
+
+             l.Error(err, "Error while handling node replace operation",
+                 "bootstrap struct", bootstrap,
+                 "old node", oldNode,
+                 "new node", newNode)
+
+             return err
+         }
+     }
+ }
+
l.Info("Updating cluster status",
"status from Instaclustr", iCassandra.Status.ClusterStatus,
"status from k8s", c.Status.ClusterStatus)
@@ -809,7 +870,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.

patch := c.NewPatch()
c.Status.ClusterStatus = iCassandra.Status.ClusterStatus
- err = r.Status().Patch(context.Background(), c, patch)
+ err = r.Status().Patch(ctx, c, patch)
if err != nil {
return err
}
@@ -848,7 +909,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
patch := c.NewPatch()
c.Annotations[models.ExternalChangesAnnotation] = models.True

- err = r.Patch(context.Background(), c, patch)
+ err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch cluster cluster",
"cluster name", c.Spec.Name, "cluster state", c.Status.State)
@@ -865,8 +926,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
r.EventRecorder.Eventf(c, models.Warning, models.ExternalChanges, msgDiffSpecs)
}

- //TODO: change all context.Background() and context.TODO() to ctx from Reconcile
- err = r.reconcileMaintenanceEvents(context.Background(), c)
+ err = r.reconcileMaintenanceEvents(ctx, c)
if err != nil {
l.Error(err, "Cannot reconcile cluster maintenance events",
"cluster name", c.Spec.Name,
@@ -890,7 +950,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
}

dc.ResizeOperations = resizeOperations
- err = r.Status().Patch(context.Background(), c, patch)
+ err = r.Status().Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch data centre resize operations",
"cluster name", c.Spec.Name,
@@ -907,11 +967,10 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.
}
}

- func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler.Job {
+ func (r *CassandraReconciler) newWatchBackupsJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job {
l := log.Log.WithValues("component", "cassandraBackupsClusterJob")

return func() error {
- ctx := context.Background()
err := r.Get(ctx, types.NamespacedName{Namespace: c.Namespace, Name: c.Name}, c)
if err != nil {
if k8serrors.IsNotFound(err) {
@@ -994,7 +1053,7 @@ func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler
patch := backupToAssign.NewPatch()
backupToAssign.Status.Start = iBackup.Start
backupToAssign.Status.UpdateStatus(iBackup)
- err = r.Status().Patch(context.TODO(), backupToAssign, patch)
+ err = r.Status().Patch(ctx, backupToAssign, patch)
if err != nil {
return err
}
@@ -1015,12 +1074,10 @@ func (r *CassandraReconciler) newWatchBackupsJob(c *v1beta1.Cassandra) scheduler
}
}

- func (r *CassandraReconciler) newUsersCreationJob(c *v1beta1.Cassandra) scheduler.Job {
+ func (r *CassandraReconciler) newUsersCreationJob(ctx context.Context, c *v1beta1.Cassandra) scheduler.Job {
l := log.Log.WithValues("component", "cassandraUsersCreationJob")

return func() error {
- ctx := context.Background()
-
err := r.Get(ctx, types.NamespacedName{
Namespace: c.Namespace,
Name: c.Name,
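
The heart of the new flow is the block added to newWatchStatusJob above: when the Instaclustr API reports a data centre whose node set differs from the one recorded in the k8s status, the job treats the difference as a node replacement, first tearing down the old node's VM resources (deleteReplacedNodeResources) and, once that finishes, provisioning resources for the new node (handleNodeReplace). Returning nil while deletion is still in progress lets the scheduled job simply retry on its next tick. Those helpers live in files of this commit that are not rendered here, so the sketch below is only a plausible reading of how getReplacedNodes can pair the old and new node: the Node and DataCentre types are trimmed stand-ins and the function body is an assumption, not the commit's actual implementation.

```go
package main

import "fmt"

// Node and DataCentre are trimmed stand-ins for the operator's status
// types; only the fields this sketch needs are kept. (Assumption.)
type Node struct{ ID string }

type DataCentre struct{ Nodes []Node }

// getReplacedNodesSketch is a hedged reading of the getReplacedNodes
// helper used in the diff: a node recorded in the k8s status but absent
// from the Instaclustr API response is the replaced (old) node, and a
// node present only in the API response is its replacement.
func getReplacedNodesSketch(k8sDC, instaDC DataCentre) (oldNode, newNode *Node) {
	inInsta := make(map[string]bool, len(instaDC.Nodes))
	for _, n := range instaDC.Nodes {
		inInsta[n.ID] = true
	}
	inK8s := make(map[string]bool, len(k8sDC.Nodes))
	for _, n := range k8sDC.Nodes {
		inK8s[n.ID] = true
	}
	for i := range k8sDC.Nodes {
		if !inInsta[k8sDC.Nodes[i].ID] {
			oldNode = &k8sDC.Nodes[i] // gone from the API: replaced
		}
	}
	for i := range instaDC.Nodes {
		if !inK8s[instaDC.Nodes[i].ID] {
			newNode = &instaDC.Nodes[i] // new in the API: replacement
		}
	}
	return oldNode, newNode
}

func main() {
	k8sStatus := DataCentre{Nodes: []Node{{ID: "node-a"}, {ID: "node-b"}}}
	instaStatus := DataCentre{Nodes: []Node{{ID: "node-a"}, {ID: "node-c"}}}

	oldN, newN := getReplacedNodesSketch(k8sStatus, instaStatus)
	fmt.Printf("replacing %s with %s\n", oldN.ID, newN.ID) // replacing node-b with node-c
}
```

Note how the status job stays idempotent: each tick re-derives the old/new pair from the two status snapshots, so a crash between the delete and create phases is recovered on the next interval rather than leaving the flow stuck.
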
6 changes: 3 additions & 3 deletions controllers/clusters/kafka_controller.go
@@ -252,7 +252,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, k *v1beta1.Ka
return reconcile.Result{}, err
}

- err = r.startClusterOnPremisesIPsJob(k, bootstrap)
+ err = r.startClusterOnPremisesIPsJob(ctx, k, bootstrap)
if err != nil {
l.Error(err, "Cannot start on-premises cluster IPs check job",
"cluster ID", k.Status.ID,
@@ -574,8 +574,8 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, k *v1beta1.Ka
return models.ExitReconcile, nil
}

- func (r *KafkaReconciler) startClusterOnPremisesIPsJob(k *v1beta1.Kafka, b *onPremisesBootstrap) error {
- job := newWatchOnPremisesIPsJob(k.Kind, b)
+ func (r *KafkaReconciler) startClusterOnPremisesIPsJob(ctx context.Context, k *v1beta1.Kafka, b *onPremisesBootstrap) error {
+ job := newWatchOnPremisesIPsJob(ctx, k.Kind, b)

err := r.Scheduler.ScheduleJob(k.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
6 changes: 3 additions & 3 deletions controllers/clusters/kafkaconnect_controller.go
@@ -241,7 +241,7 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1
return reconcile.Result{}, err
}

- err = r.startClusterOnPremisesIPsJob(kc, bootstrap)
+ err = r.startClusterOnPremisesIPsJob(ctx, kc, bootstrap)
if err != nil {
l.Error(err, "Cannot start on-premises cluster IPs check job",
"cluster ID", kc.Status.ID,
@@ -574,8 +574,8 @@ func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1
return nil
}

- func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(k *v1beta1.KafkaConnect, b *onPremisesBootstrap) error {
- job := newWatchOnPremisesIPsJob(k.Kind, b)
+ func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(ctx context.Context, k *v1beta1.KafkaConnect, b *onPremisesBootstrap) error {
+ job := newWatchOnPremisesIPsJob(ctx, k.Kind, b)

err := r.Scheduler.ScheduleJob(k.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
(Diffs for the remaining five changed files are not rendered in this view.)
