Skip to content

Commit

Permalink
feat(auto-balance): implement file local sync to added replica
Browse files Browse the repository at this point in the history
longhorn/longhorn-4105

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Jun 21, 2024
1 parent 98397fe commit f6920e2
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 9 deletions.
56 changes: 54 additions & 2 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"path/filepath"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -1909,17 +1910,22 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add
return
}

localSync, err := ec.getFileLocalSync(replica, e)
if err != nil {
ec.logger.WithError(err).Errorf("Failed to initiate file local sync for replica %v, use remote sync", replicaName)
}

// start rebuild
if e.Spec.RequestedBackupRestore != "" {
if e.Spec.NodeID != "" {
ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonRebuilding,
"Start rebuilding replica %v with Address %v for restore engine %v and volume %v", replicaName, addr, e.Name, e.Spec.VolumeName)
err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, true, fastReplicaRebuild, fileSyncHTTPClientTimeout, 0)
err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, true, fastReplicaRebuild, localSync, fileSyncHTTPClientTimeout, 0)
}
} else {
ec.eventRecorder.Eventf(e, corev1.EventTypeNormal, constant.EventReasonRebuilding,
"Start rebuilding replica %v with Address %v for normal engine %v and volume %v", replicaName, addr, e.Name, e.Spec.VolumeName)
err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, false, fastReplicaRebuild, fileSyncHTTPClientTimeout, grpcTimeoutSeconds)
err = engineClientProxy.ReplicaAdd(e, replicaName, replicaURL, false, fastReplicaRebuild, localSync, fileSyncHTTPClientTimeout, grpcTimeoutSeconds)
}
if err != nil {
replicaRebuildErrMsg := err.Error()
Expand Down Expand Up @@ -1993,6 +1999,52 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add
return nil
}

// getFileLocalSync retrieves details for local file sync between the target replica
// and another eligible replica on the same node.
//
// A replica qualifies as the sync source when it belongs to the engine's volume,
// is grouped under the target replica's node, is not the target replica itself,
// has no deletion timestamp, and its current state is running.
//
// It returns the source and target replica data directory paths for the local
// sync, or nil if no eligible replica is found. Callers treat a nil result as
// "use remote sync". Any error encountered while listing replicas is returned.
func (ec *EngineController) getFileLocalSync(targetReplica *longhorn.Replica, engine *longhorn.Engine) (*etypes.FileLocalSync, error) {
	// Retrieve a map of replicas grouped by node for the engine's volume.
	replicaMapByNode, err := ec.ds.ListVolumeReplicasROMapByNode(engine.Spec.VolumeName)
	if err != nil {
		return nil, err
	}

	// The target replica itself is expected to be in this group, so a group of
	// one (or none) means there is nothing to sync locally from.
	nodeReplicas := replicaMapByNode[targetReplica.Spec.InstanceSpec.NodeID]
	if len(nodeReplicas) <= 1 {
		return nil, nil
	}

	// Map iteration order is unspecified, so when several replicas are
	// eligible the one chosen as the source is arbitrary.
	for _, nodeReplica := range nodeReplicas {
		// Skip the target replica itself.
		if nodeReplica.Name == targetReplica.Name {
			continue
		}

		// Skip replicas marked for deletion.
		if nodeReplica.DeletionTimestamp != nil {
			continue
		}

		// Skip replicas that are not in the running state.
		if nodeReplica.Status.CurrentState != longhorn.InstanceStateRunning {
			continue
		}

		return &etypes.FileLocalSync{
			Source: filepath.Join(nodeReplica.Spec.DiskPath, "replicas", nodeReplica.Spec.DataDirectoryName),
			Target: filepath.Join(targetReplica.Spec.DiskPath, "replicas", targetReplica.Spec.DataDirectoryName),
		}, nil
	}

	// Every other replica on the node was filtered out. Return nil rather than
	// an empty FileLocalSync so the caller does not attempt a local sync with
	// blank source and target paths.
	return nil, nil
}

// updateReplicaRebuildFailedCondition updates the rebuild failed condition if replica rebuilding failed
func (ec *EngineController) updateReplicaRebuildFailedCondition(replica *longhorn.Replica, errMsg string) (*longhorn.Replica, error) {
replicaRebuildFailedReason, conditionStatus, err := ec.getReplicaRebuildFailedReason(replica.Spec.NodeID, errMsg)
Expand Down
36 changes: 36 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,42 @@ func (s *DataStore) ListVolumeReplicasRO(volumeName string) (map[string]*longhor
return rMap, nil
}

// ListVolumeReplicasROMapByNode lists the replicas of the given volume and
// groups them by node.
// The outer map is keyed by node ID (the replica's Spec.NodeID); each inner
// map is keyed by replica name and holds the replica objects as returned by
// the replica lister. No copy is made here, so, as the RO suffix suggests,
// callers should treat the objects as read-only.
// An error is returned if the volume selector cannot be built or the
// replicas cannot be listed.
func (s *DataStore) ListVolumeReplicasROMapByNode(volumeName string) (map[string]map[string]*longhorn.Replica, error) {
	volumeSelector, err := getVolumeSelector(volumeName)
	if err != nil {
		return nil, err
	}

	replicas, err := s.replicaLister.Replicas(s.namespace).List(volumeSelector)
	if err != nil {
		return nil, err
	}

	byNode := map[string]map[string]*longhorn.Replica{}
	for _, r := range replicas {
		// Lazily create the per-node group the first time a node is seen.
		group, found := byNode[r.Spec.NodeID]
		if !found {
			group = map[string]*longhorn.Replica{}
			byNode[r.Spec.NodeID] = group
		}
		group[r.Name] = r
	}

	return byNode, nil
}

// ReplicaAddressToReplicaName will directly return the address if the format
// is invalid or the replica is not found.
func ReplicaAddressToReplicaName(address string, rs []*longhorn.Replica) string {
Expand Down
4 changes: 3 additions & 1 deletion engineapi/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/pkg/errors"

etypes "github.com/longhorn/longhorn-engine/pkg/types"

lhexec "github.com/longhorn/go-common-libs/exec"
lhtypes "github.com/longhorn/go-common-libs/types"
imutil "github.com/longhorn/longhorn-instance-manager/pkg/util"
Expand Down Expand Up @@ -133,7 +135,7 @@ func (e *EngineBinary) ReplicaList(*longhorn.Engine) (map[string]*Replica, error

// ReplicaAdd calls engine binary
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error {
func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error {
// Ignore grpcTimeoutSeconds because we expect that longhorn manager should use proxy gRPC to communicate with
// engine/replica who understands this field
if err := ValidateReplicaURL(url); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/sirupsen/logrus"

etypes "github.com/longhorn/longhorn-engine/pkg/types"

"github.com/longhorn/longhorn-manager/datastore"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)
Expand Down Expand Up @@ -45,7 +47,7 @@ func (c *EngineSimulatorCollection) CreateEngineSimulator(request *EngineSimulat
mutex: &sync.RWMutex{},
}
for _, addr := range request.ReplicaAddrs {
if err := s.ReplicaAdd(&longhorn.Engine{}, "", addr, false, false, 30, 0); err != nil {
if err := s.ReplicaAdd(&longhorn.Engine{}, "", addr, false, false, nil, 30, 0); err != nil {
return err
}
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (e *EngineSimulator) ReplicaList(*longhorn.Engine) (map[string]*Replica, er
return ret, nil
}

func (e *EngineSimulator) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout int64, grpcTimeoutSeconds int64) error {
func (e *EngineSimulator) ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout int64, grpcTimeoutSeconds int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion engineapi/enginesim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *TestSuite) TestBasic(c *C) {
c.Assert(replicas, HasLen, 1)
c.Assert(replicas[Replica1Addr].Mode, Equals, longhorn.ReplicaModeRW)

_ = sim.ReplicaAdd(e, "", Replica3Addr, false, false, 30, 0)
_ = sim.ReplicaAdd(e, "", Replica3Addr, false, false, nil, 30, 0)
replicas, err = sim.ReplicaList(e)
c.Assert(err, IsNil)
c.Assert(replicas, HasLen, 2)
Expand Down
6 changes: 4 additions & 2 deletions engineapi/proxy_replica.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package engineapi

import (
etypes "github.com/longhorn/longhorn-engine/pkg/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) (err error) {
func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) (err error) {
return p.grpcClient.ReplicaAdd(string(e.Spec.DataEngine), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize,
int(replicaFileSyncHTTPClientTimeout), fastSync, grpcTimeoutSeconds)
int(replicaFileSyncHTTPClientTimeout), fastSync, localSync, grpcTimeoutSeconds)
}

func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) {
Expand Down
4 changes: 3 additions & 1 deletion engineapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
spdkdevtypes "github.com/longhorn/go-spdk-helper/pkg/types"

emeta "github.com/longhorn/longhorn-engine/pkg/meta"
etypes "github.com/longhorn/longhorn-engine/pkg/types"

"github.com/longhorn/longhorn-manager/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
Expand Down Expand Up @@ -84,7 +86,7 @@ type EngineClient interface {
VolumeSnapshotMaxSizeSet(engine *longhorn.Engine) error

ReplicaList(*longhorn.Engine) (map[string]*Replica, error)
ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error
ReplicaAdd(engine *longhorn.Engine, replicaName, url string, isRestoreVolume, fastSync bool, localSync *etypes.FileLocalSync, replicaFileSyncHTTPClientTimeout, grpcTimeoutSeconds int64) error
ReplicaRemove(engine *longhorn.Engine, url string) error
ReplicaRebuildStatus(*longhorn.Engine) (map[string]*longhorn.RebuildStatus, error)
ReplicaRebuildVerify(engine *longhorn.Engine, replicaName, url string) error
Expand Down

0 comments on commit f6920e2

Please sign in to comment.