From f6920e22ed085eee9768bf9e13012dbb302e1a3f Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Tue, 16 Jan 2024 13:44:03 +0800 Subject: [PATCH] feat(auto-balance): implement file local sync to added replica longhorn/longhorn-4105 Signed-off-by: Chin-Ya Huang --- controller/engine_controller.go | 56 +++++++++++++++++++++++++++++++-- datastore/longhorn.go | 36 +++++++++++++++++++++ engineapi/engine.go | 4 ++- engineapi/enginesim.go | 6 ++-- engineapi/enginesim_test.go | 2 +- engineapi/proxy_replica.go | 6 ++-- engineapi/types.go | 4 ++- 7 files changed, 105 insertions(+), 9 deletions(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index 886823724e..7e88ad0611 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "path/filepath" "reflect" "regexp" "strconv" @@ -1909,17 +1910,22 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add return } + localSync, err := ec.getFileLocalSync(replica, e) + if err != nil { + ec.logger.WithError(err).Errorf("Failed to initiate file local sync for replica %v, use remote sync", replicaName) + } + // start rebuild if e.Spec.RequestedBackupRestore != "" { if e.Spec.NodeID != "" { ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonRebuilding, "Start rebuilding replica %v with Address %v for restore engine %v and volume %v", replicaName, addr, e.Name, e.Spec.VolumeName) - err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, true, fastReplicaRebuild, fileSyncHTTPClientTimeout, 0) + err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, true, fastReplicaRebuild, localSync, fileSyncHTTPClientTimeout, 0) } } else { ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonRebuilding, "Start rebuilding replica %v with Address %v for normal engine %v and volume %v", replicaName, addr, e.Name, e.Spec.VolumeName) - err = 
engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, false, fastReplicaRebuild, fileSyncHTTPClientTimeout, grpcTimeoutSeconds) + err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, false, fastReplicaRebuild, localSync, fileSyncHTTPClientTimeout, grpcTimeoutSeconds) } if err != nil { replicaRebuildErrMsg := err.Error() @@ -1993,6 +1999,52 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add return nil } +// getFileLocalSync retrieves details for local file sync between the target replica +// and another eligible replica on the same node. It returns an object with the source +// and target paths, nil if the target's node holds at most one replica of the volume, +// or an empty object if no other replica there is eligible. Errors are returned as-is. +func (ec *EngineController) getFileLocalSync(targetReplica *longhorn.Replica, engine *longhorn.Engine) (*etypes.FileLocalSync, error) { + // Retrieve a map of replicas grouped by node for the engine's volume + replicaMapByNode, err := ec.ds.ListVolumeReplicasROMapByNode(engine.Spec.VolumeName) + if err != nil { + return nil, err + } + + // Check if there are multiple replicas on the target replica's node, + // do nothing if there is no other available replica to sync locally from. 
+ if len(replicaMapByNode[targetReplica.Spec.InstanceSpec.NodeID]) <= 1 { + return nil, nil + } + + localSync := &etypes.FileLocalSync{} + + for _, nodeReplica := range replicaMapByNode[targetReplica.Spec.InstanceSpec.NodeID] { + // Skip the target replica itself + if nodeReplica.Name == targetReplica.Name { + continue + } + + // Skip replicas marked for deletion + if nodeReplica.DeletionTimestamp != nil { + continue + } + + // Skip replicas that are not in the running state + if nodeReplica.Status.CurrentState != longhorn.InstanceStateRunning { + continue + } + + // Set the local sync details + localSync = &etypes.FileLocalSync{ + Source: filepath.Join(nodeReplica.Spec.DiskPath, "replicas", nodeReplica.Spec.DataDirectoryName), + Target: filepath.Join(targetReplica.Spec.DiskPath, "replicas", targetReplica.Spec.DataDirectoryName), + } + break + } + + return localSync, nil +} + // updateReplicaRebuildFailedCondition updates the rebuild failed condition if replica rebuilding failed func (ec *EngineController) updateReplicaRebuildFailedCondition(replica *longhorn.Replica, errMsg string) (*longhorn.Replica, error) { replicaRebuildFailedReason, conditionStatus, err := ec.getReplicaRebuildFailedReason(replica.Spec.NodeID, errMsg) diff --git a/datastore/longhorn.go b/datastore/longhorn.go index b936ad2dde..099bded69f 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -1515,6 +1515,42 @@ func (s *DataStore) ListVolumeReplicasRO(volumeName string) (map[string]*longhor return rMap, nil } +// ListVolumeReplicasROMapByNode returns a map of read-only replicas grouped by +// the node ID for the given volume. +// The function organizes the replicas into a map where the keys are node IDs, +// and the values are maps of replica names to the replica objects. +// If successful, the function returns the map of replicas by node. Otherwise, +// an error is returned. 
+func (s *DataStore) ListVolumeReplicasROMapByNode(volumeName string) (map[string]map[string]*longhorn.Replica, error) { + // Get volume selector + selector, err := getVolumeSelector(volumeName) + if err != nil { + return nil, err + } + + // List replicas based on the volume selector + replicaList, err := s.replicaLister.Replicas(s.namespace).List(selector) + if err != nil { + return nil, err + } + + // Organize the replicas by node + replicaMapByNode := make(map[string]map[string]*longhorn.Replica) + for _, replica := range replicaList { + nodeID := replica.Spec.NodeID + + // Create the map if it doesn't exist for the current replica's node + if _, exists := replicaMapByNode[nodeID]; !exists { + replicaMapByNode[nodeID] = make(map[string]*longhorn.Replica) + } + + // Add the replica to the map + replicaMapByNode[nodeID][replica.Name] = replica + } + + return replicaMapByNode, nil +} + // ReplicaAddressToReplicaName will directly return the address if the format // is invalid or the replica is not found. 
func ReplicaAddressToReplicaName(address string, rs []*longhorn.Replica) string { diff --git a/engineapi/engine.go b/engineapi/engine.go index b14e7e74be..7769ccf828 100644 --- a/engineapi/engine.go +++ b/engineapi/engine.go @@ -10,6 +10,8 @@ import ( "github.com/pkg/errors" + etypes "github.com/longhorn/longhorn-engine/pkg/types" + lhexec "github.com/longhorn/go-common-libs/exec" lhtypes "github.com/longhorn/go-common-libs/types" imutil "github.com/longhorn/longhorn-instance-manager/pkg/util" @@ -133,7 +135,7 @@ func (e *EngineBinary) ReplicaList(*longhorn.Engine) (map[string]*Replica, error // ReplicaAdd calls engine binary // TODO: Deprecated, replaced by gRPC proxy -func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error { +func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error { // Ignore grpcTimeoutSeconds because we expect that longhorn manager should use proxy gRPC to communicate with // engine/replica who understands this field if err := ValidateReplicaURL(url); err != nil { diff --git a/engineapi/enginesim.go b/engineapi/enginesim.go index e6c0815a9b..7b0ea7d0d6 100644 --- a/engineapi/enginesim.go +++ b/engineapi/enginesim.go @@ -6,6 +6,8 @@ import ( "github.com/sirupsen/logrus" + etypes "github.com/longhorn/longhorn-engine/pkg/types" + "github.com/longhorn/longhorn-manager/datastore" longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) @@ -45,7 +47,7 @@ func (c *EngineSimulatorCollection) CreateEngineSimulator(request *EngineSimulat mutex: &sync.RWMutex{}, } for _, addr := range request.ReplicaAddrs { - if err := s.ReplicaAdd(&longhorn.Engine{}, "", addr, false, false, 30, 0); err != nil { + if err := s.ReplicaAdd(&longhorn.Engine{}, "", addr, 
false, false, nil, 30, 0); err != nil { return err } } @@ -121,7 +123,7 @@ func (e *EngineSimulator) ReplicaList(*longhorn.Engine) (map[string]*Replica, er return ret, nil } -func (e *EngineSimulator) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout int64, grpcTimeoutSeconds int64) error { +func (e *EngineSimulator) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout int64, grpcTimeoutSeconds int64) error { e.mutex.Lock() defer e.mutex.Unlock() diff --git a/engineapi/enginesim_test.go b/engineapi/enginesim_test.go index 4cc0f5c386..cb6d2f2ef8 100644 --- a/engineapi/enginesim_test.go +++ b/engineapi/enginesim_test.go @@ -64,7 +64,7 @@ func (s *TestSuite) TestBasic(c *C) { c.Assert(replicas, HasLen, 1) c.Assert(replicas[Replica1Addr].Mode, Equals, longhorn.ReplicaModeRW) - _ = sim.ReplicaAdd(e, "", Replica3Addr, false, false, 30, 0) + _ = sim.ReplicaAdd(e, "", Replica3Addr, false, false, nil, 30, 0) replicas, err = sim.ReplicaList(e) c.Assert(err, IsNil) c.Assert(replicas, HasLen, 2) diff --git a/engineapi/proxy_replica.go b/engineapi/proxy_replica.go index 5664d9a197..bb0892a03d 100644 --- a/engineapi/proxy_replica.go +++ b/engineapi/proxy_replica.go @@ -1,13 +1,15 @@ package engineapi import ( + etypes "github.com/longhorn/longhorn-engine/pkg/types" + longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) -func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) (err error) { +func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) (err error) { return p.grpcClient.ReplicaAdd(string(e.Spec.DataEngine), e.Name, 
e.Spec.VolumeName, p.DirectToURL(e), replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize, - int(replicaFileSyncHTTPClientTimeout), fastSync, grpcTimeoutSeconds) + int(replicaFileSyncHTTPClientTimeout), fastSync, localSync, grpcTimeoutSeconds) } func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) { diff --git a/engineapi/types.go b/engineapi/types.go index 8947a1360c..f44fc342e1 100644 --- a/engineapi/types.go +++ b/engineapi/types.go @@ -9,6 +9,8 @@ import ( spdkdevtypes "github.com/longhorn/go-spdk-helper/pkg/types" emeta "github.com/longhorn/longhorn-engine/pkg/meta" + etypes "github.com/longhorn/longhorn-engine/pkg/types" + "github.com/longhorn/longhorn-manager/types" longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" @@ -84,7 +86,7 @@ type EngineClient interface { VolumeSnapshotMaxSizeSet(engine *longhorn.Engine) error ReplicaList(*longhorn.Engine) (map[string]*Replica, error) - ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error + ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error ReplicaRemove(engine *longhorn.Engine, url string) error ReplicaRebuildStatus(*longhorn.Engine) (map[string]*longhorn.RebuildStatus, error) ReplicaRebuildVerify(engine *longhorn.Engine, replicaName, url string) error