diff --git a/api/v1alpha1/backup_types.go b/api/v1alpha1/backup_types.go index 430291a1..35cf799c 100644 --- a/api/v1alpha1/backup_types.go +++ b/api/v1alpha1/backup_types.go @@ -64,6 +64,8 @@ type BackupStatus struct { BackupDate string `json:"backupDate,omitempty"` // Get the backup Type BackupType string `json:"backupType,omitempty"` + // Get the Gtid + Gtid string `json:"gtid,omitempty"` // Conditions represents the backup resource conditions list. Conditions []BackupCondition `json:"conditions,omitempty"` } diff --git a/api/v1alpha1/mysqlcluster_types.go b/api/v1alpha1/mysqlcluster_types.go index 7250d5f2..0635cc00 100644 --- a/api/v1alpha1/mysqlcluster_types.go +++ b/api/v1alpha1/mysqlcluster_types.go @@ -405,6 +405,9 @@ type MysqlClusterStatus struct { ReadyNodes int `json:"readyNodes,omitempty"` // State State ClusterState `json:"state,omitempty"` + // LastBackup + LastBackup string `json:"lastbackup,omitempty"` + LastBackupGtid string `json:"lastbackupGtid,omitempty"` // Conditions contains the list of the cluster conditions fulfilled. Conditions []ClusterCondition `json:"conditions,omitempty"` // Nodes contains the list of the node status fulfilled. diff --git a/api/v1alpha1/mysqlcluster_webhook.go b/api/v1alpha1/mysqlcluster_webhook.go index 207b12fc..45b43b0a 100644 --- a/api/v1alpha1/mysqlcluster_webhook.go +++ b/api/v1alpha1/mysqlcluster_webhook.go @@ -88,7 +88,7 @@ func (r *MysqlCluster) ValidateDelete() error { return nil } -// TODO: Add NFSServerAddress webhook & backup schedule. +// Add NFSServerAddress webhook & backup schedule. func (r *MysqlCluster) validateNFSServerAddress(oldCluster *MysqlCluster) error { isIP := net.ParseIP(r.Spec.NFSServerAddress) != nil if len(r.Spec.NFSServerAddress) != 0 && !isIP { diff --git a/backup/cronbackup.go b/backup/cronbackup.go index ca00b238..6d852c5d 100644 --- a/backup/cronbackup.go +++ b/backup/cronbackup.go @@ -57,14 +57,22 @@ func (j *CronJob) scheduledBackupsRunningCount() int { backupsList := &apiv1alpha1.BackupList{} // select all backups with labels recurrent=true and and not completed of the cluster selector := j.backupSelector() - client.MatchingFields{"status.completed": "false"}.ApplyToList(selector) + // Because k8s do not support fieldSelector with custom resources + // https://github.com/kubernetes/kubernetes/issues/51046 + // So this cannot use fields selector. + // client.MatchingFields{"status.completed": "false"}.ApplyToList(selector) - if err := j.Client.List(context.TODO(), backupsList, selector); err != nil { + if err := j.Client.List(context.TODO(), backupsList); err != nil { log.Error(err, "failed getting backups", "selector", selector) return 0 } - - return len(backupsList.Items) + var rest []apiv1alpha1.Backup + for _, b := range backupsList.Items { + if !b.Status.Completed { + rest = append(rest, b) + } + } + return len(rest) } func (j *CronJob) backupSelector() *client.ListOptions { @@ -133,10 +141,12 @@ func (j *CronJob) createBackup() (*apiv1alpha1.Backup, error) { //RemoteDeletePolicy: j.BackupRemoteDeletePolicy, HostName: fmt.Sprintf("%s-mysql-0", j.ClusterName), }, + Status: apiv1alpha1.BackupStatus{Completed: false}, } if len(j.NFSServerAddress) > 0 { backup.Spec.NFSServerAddress = j.NFSServerAddress } + return backup, j.Client.Create(context.TODO(), backup) } diff --git a/backup/syncer/job.go b/backup/syncer/job.go index dde56efc..3396860f 100644 --- a/backup/syncer/job.go +++ b/backup/syncer/job.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" v1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" "github.com/radondb/radondb-mysql-kubernetes/backup" @@ -101,6 +102,9 @@ func (s *jobSyncer) updateStatus(job *batchv1.Job) { if backType := s.job.Annotations[utils.JobAnonationType]; backType != "" { s.backup.Status.BackupType = backType } + if gtid := s.job.Annotations[utils.JobAnonationGtid]; gtid != "" { + s.backup.Status.Gtid = gtid + } } // check for failed condition @@ -152,13 +156,15 @@ func (s *jobSyncer) ensurePodSpec(in corev1.PodSpec) corev1.PodSpec { "/bin/bash", "-c", "--", } backupToDir, DateTime := utils.BuildBackupName(s.backup.Spec.ClusterName) + // add the gtid script strAnnonations := fmt.Sprintf(`curl -X PATCH -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" -H "Content-Type: application/json-patch+json" \ --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/apis/batch/v1/namespaces/%s/jobs/%s \ - -d '[{"op": "add", "path": "/metadata/annotations/backupName", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupDate", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupType", "value": "NFS"}]';`, - s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime) + -d "[{\"op\": \"add\", \"path\": \"/metadata/annotations/backupName\", \"value\": \"%s\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupDate\", \"value\": \"%s\"},{\"op\": \"add\", \"path\": \"/metadata/annotations/gtid\", \"value\": \"$( cat /backup/%s/xtrabackup_binlog_info|awk '{print $3}')\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupType\", \"value\": \"NFS\"}]";`, + s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime, backupToDir) + log.Log.Info(strAnnonations) // Add the check DiskUsage // use expr because shell cannot compare float number - checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(du /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')) -eq '1' ] || { echo disk available may be too small; exit 1;};` + checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(echo $(du -d1 /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')|cut -d. -f1)) -eq '1' ] || { echo disk available may be too small; exit 1;};` in.Containers[0].Args = []string{ checkUsage + fmt.Sprintf("mkdir -p /backup/%s;"+ "curl --user $BACKUP_USER:$BACKUP_PASSWORD %s/download|xbstream -x -C /backup/%s; err1=${PIPESTATUS[0]};"+ diff --git a/charts/mysql-operator/crds/mysql.radondb.com_backups.yaml b/charts/mysql-operator/crds/mysql.radondb.com_backups.yaml index dbe5e0b6..29791d70 100644 --- a/charts/mysql-operator/crds/mysql.radondb.com_backups.yaml +++ b/charts/mysql-operator/crds/mysql.radondb.com_backups.yaml @@ -125,6 +125,9 @@ spec: - type type: object type: array + gtid: + description: Get the Gtid + type: string required: - completed type: object diff --git a/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml b/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml index 98067a36..c5856046 100644 --- a/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml +++ b/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml @@ -1394,6 +1394,11 @@ spec: - type type: object type: array + lastbackup: + description: LastBackup + type: string + lastbackupGtid: + type: string nodes: description: Nodes contains the list of the node status fulfilled. items: diff --git a/config/crd/bases/mysql.radondb.com_backups.yaml b/config/crd/bases/mysql.radondb.com_backups.yaml index dbe5e0b6..29791d70 100644 --- a/config/crd/bases/mysql.radondb.com_backups.yaml +++ b/config/crd/bases/mysql.radondb.com_backups.yaml @@ -125,6 +125,9 @@ spec: - type type: object type: array + gtid: + description: Get the Gtid + type: string required: - completed type: object diff --git a/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml b/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml index 98067a36..c5856046 100644 --- a/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml +++ b/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml @@ -1394,6 +1394,11 @@ spec: - type type: object type: array + lastbackup: + description: LastBackup + type: string + lastbackupGtid: + type: string nodes: description: Nodes contains the list of the node status fulfilled. items: diff --git a/controllers/backupcron_controller.go b/controllers/backupcron_controller.go index ae315e7d..952ff6ab 100644 --- a/controllers/backupcron_controller.go +++ b/controllers/backupcron_controller.go @@ -120,7 +120,7 @@ func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluste log.V(1).Info("cluster already added to cron.", "key", cluster) // change scheduler for already added crons - if !reflect.DeepEqual(entry.Schedule, schedule) { + if !reflect.DeepEqual(entry.Schedule, schedule) || j.NFSServerAddress != cluster.Spec.NFSServerAddress { log.Info("update cluster scheduler", "key", cluster, "scheduler", cluster.Spec.BackupSchedule) diff --git a/mysqlcluster/syncer/status.go b/mysqlcluster/syncer/status.go index c60375a0..298a6167 100644 --- a/mysqlcluster/syncer/status.go +++ b/mysqlcluster/syncer/status.go @@ -19,12 +19,14 @@ package syncer import ( "context" "fmt" + "sort" "strconv" "time" "github.com/presslabs/controller-util/pkg/syncer" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -153,11 +155,44 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { if len(s.Status.Conditions) > maxStatusesQuantity { s.Status.Conditions = s.Status.Conditions[len(s.Status.Conditions)-maxStatusesQuantity:] } - + // update backup Status + s.updateLastBackup() // Update all nodes' status. return syncer.SyncResult{}, s.updateNodeStatus(ctx, s.cli, list.Items) } +func (s *StatusSyncer) updateLastBackup() error { + // 1. fetch all finished backup cr + backupsList := &apiv1alpha1.BackupList{} + labelSet := labels.Set{"cluster": s.Name} + if err := s.cli.List(context.TODO(), backupsList, &client.ListOptions{ + Namespace: s.Namespace, LabelSelector: labelSet.AsSelector(), + }); err != nil { + return err + } + var finisheds []apiv1alpha1.Backup + for _, b := range backupsList.Items { + if b.Status.Completed { + finisheds = append(finisheds, b) + } + } + // 2. sort descent + sort.Slice(finisheds, func(i, j int) bool { + return finisheds[i].ObjectMeta.CreationTimestamp.Before(&finisheds[j].ObjectMeta.CreationTimestamp) + }) + // 3. get first backup which has backup Name + for _, b := range finisheds { + if len(b.Status.BackupName) != 0 { + s.Status.LastBackup = b.Status.BackupName + s.Status.LastBackupGtid = b.Status.Gtid + break + } + + } + + return nil +} + // updateClusterStatus update the cluster status and returns condition. func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition { clusterCondition := apiv1alpha1.ClusterCondition{ diff --git a/sidecar/server.go b/sidecar/server.go index 8eccc654..24e4d67c 100644 --- a/sidecar/server.go +++ b/sidecar/server.go @@ -103,11 +103,11 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "Not authenticated!", http.StatusForbidden) return } - backName, Datetime, err := RunTakeBackupCommand(s.cfg) + backName, Datetime, Gtid, err := RunTakeBackupCommand(s.cfg) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } else { - msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Date: Datetime}) + msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Gtid: Gtid, Date: Datetime}) w.Write(msg) } } @@ -211,7 +211,7 @@ func transportWithTimeout(connectTimeout time.Duration) http.RoundTripper { } } -func setAnnonations(cfg *Config, backname string, DateTime string, BackupType string) error { +func setAnnonations(cfg *Config, backname string, DateTime string, Gtid, BackupType string) error { config, err := rest.InClusterConfig() if err != nil { return err @@ -231,6 +231,7 @@ func setAnnonations(cfg *Config, backname string, DateTime string, BackupType st } job.Annotations[utils.JobAnonationName] = backname job.Annotations[utils.JobAnonationDate] = DateTime + job.Annotations[utils.JobAnonationGtid] = Gtid job.Annotations[utils.JobAnonationType] = BackupType _, err = clientset.BatchV1().Jobs(cfg.NameSpace).Update(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { @@ -266,7 +267,7 @@ func requestABackup(cfg *Config, host string, endpoint string) (*http.Response, var result utils.JsonResult json.NewDecoder(resp.Body).Decode(&result) - err = setAnnonations(cfg, result.BackupName, result.Date, "S3") // set annotation + err = setAnnonations(cfg, result.BackupName, result.Date, result.Gtid, "S3") // set annotation if err != nil { return nil, fmt.Errorf("fail to set annotation: %s", err) } diff --git a/sidecar/takebackup.go b/sidecar/takebackup.go index c06a3ee5..5a84c51e 100644 --- a/sidecar/takebackup.go +++ b/sidecar/takebackup.go @@ -17,36 +17,66 @@ limitations under the License. package sidecar import ( + "bufio" + "fmt" "os" "os/exec" "strings" + "sync" ) // RunTakeBackupCommand starts a backup command -func RunTakeBackupCommand(cfg *Config) (string, string, error) { +func RunTakeBackupCommand(cfg *Config) (string, string, string, error) { // cfg->XtrabackupArgs() xtrabackup := exec.Command(xtrabackupCommand, cfg.XtrabackupArgs()...) var err error backupName, DateTime := cfg.XBackupName() + Gtid := "" xcloud := exec.Command(xcloudCommand, cfg.XCloudArgs(backupName)...) log.Info("xargs ", "xargs", strings.Join(cfg.XCloudArgs(backupName), " ")) if xcloud.Stdin, err = xtrabackup.StdoutPipe(); err != nil { log.Error(err, "failed to pipline") - return "", "", err + return "", "", "", err } - xtrabackup.Stderr = os.Stderr + //xtrabackup.Stderr = os.Stderr xcloud.Stderr = os.Stderr + var wg sync.WaitGroup + Stderr, err := xtrabackup.StderrPipe() + if err != nil { + return "", "", "", fmt.Errorf("RunCommand: cmd.StderrPipe(): %v", err) + } if err := xtrabackup.Start(); err != nil { log.Error(err, "failed to start xtrabackup command") - return "", "", err + return "", "", "", err } if err := xcloud.Start(); err != nil { log.Error(err, "fail start xcloud ") - return "", "", err + return "", "", "", err } + scanner := bufio.NewScanner(Stderr) + //scanner.Split(ScanLinesR) + wg.Add(1) + go func() { + for scanner.Scan() { + text := scanner.Text() + fmt.Println(text) + if index := strings.Index(text, "GTID"); index != -1 { + // Mysql5.7 examples: MySQL binlog position: filename 'mysql-bin.000002', position '588', GTID of the last change '319bd6eb-2ea2-11ed-bf40-7e1ef582b427:1-2' + // MySQL8.0 no gtid: MySQL binlog position: filename 'mysql-bin.000025', position '156' + length := len("GTID of the last change") + Gtid = strings.Trim(text[index+length:], " '") // trim space and \' + if len(Gtid) != 0 { + log.Info("Catch gtid: " + Gtid) + } + } + } + wg.Done() + }() + + wg.Wait() // pipe command fail one, whole things fail errorChannel := make(chan error, 2) go func() { @@ -55,11 +85,18 @@ func RunTakeBackupCommand(cfg *Config) (string, string, error) { go func() { errorChannel <- xtrabackup.Wait() }() + defer xtrabackup.Wait() + defer xcloud.Wait() for i := 0; i < 2; i++ { if err = <-errorChannel; err != nil { - return "", "", err + log.Info("catch error , need to stop") + _ = xtrabackup.Process.Kill() + _ = xcloud.Process.Kill() + + return "", "", "", err } } - return backupName, DateTime, nil + + return backupName, DateTime, Gtid, nil } diff --git a/utils/constants.go b/utils/constants.go index 8cab16bf..b80ec50e 100644 --- a/utils/constants.go +++ b/utils/constants.go @@ -163,6 +163,8 @@ const ( JobAnonationName = "backupName" // Job Annonations date JobAnonationDate = "backupDate" + // Job Anonations Gtid + JobAnonationGtid = "gtid" // Job Annonations type JobAnonationType = "backupType" ) @@ -197,6 +199,7 @@ const ( type JsonResult struct { Status string `json:"status"` BackupName string `json:"backupName"` + Gtid string `json:"gtid"` Date string `json:"date"` }