Skip to content

Commit

Permalink
feat: introduce snapshot checksum
Browse files Browse the repository at this point in the history
Longhorn 8666, 9488

Signed-off-by: Shuo Wu <[email protected]>
  • Loading branch information
shuo-wu committed Dec 25, 2024
1 parent 8a9085d commit 0b0258a
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Lvol struct {
CreationTime string `json:"creation_time"`
UserCreated bool `json:"user_created"`
SnapshotTimestamp string `json:"snapshot_timestamp"`
SnapshotChecksum string `json:"snapshot_checksum"`
}

func ProtoLvolToLvol(l *spdkrpc.Lvol) *Lvol {
Expand All @@ -56,6 +57,7 @@ func ProtoLvolToLvol(l *spdkrpc.Lvol) *Lvol {
CreationTime: l.CreationTime,
UserCreated: l.UserCreated,
SnapshotTimestamp: l.SnapshotTimestamp,
SnapshotChecksum: l.SnapshotChecksum,
}
}

Expand All @@ -73,6 +75,7 @@ func LvolToProtoLvol(l *Lvol) *spdkrpc.Lvol {
CreationTime: l.CreationTime,
UserCreated: l.UserCreated,
SnapshotTimestamp: l.SnapshotTimestamp,
SnapshotChecksum: l.SnapshotChecksum,
}
}

Expand Down
39 changes: 36 additions & 3 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Replica struct {
State types.InstanceState
ErrorMsg string

IsExposed bool
IsExposed bool
SnapshotChecksumEnabled bool

// constructRequired will be set to true when stopping an errored replica
constructRequired bool
Expand Down Expand Up @@ -190,6 +191,8 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS
SpecSize: roundedSpecSize,
State: types.InstanceStatePending,

SnapshotChecksumEnabled: true,

rebuildingDstCache: RebuildingDstCache{
rebuildingSnapshotMap: map[string]*api.Lvol{},
processedSnapshotList: []string{},
Expand Down Expand Up @@ -230,6 +233,25 @@ func (r *Replica) Sync(spdkClient *spdkclient.Client) (err error) {
return err
}

if r.SnapshotChecksumEnabled {
for _, bdevLvol := range bdevLvolMap {
if !bdevLvol.DriverSpecific.Lvol.Snapshot {
continue
}
if bdevLvol.DriverSpecific.Lvol.Xattrs[spdkclient.SnapshotChecksum] != "" {
continue
}
// TODO: Use a goroutine pool
go func() {
logrus.Debugf("Replica %v is registering checksum for snapshot %v", r.Name, bdevLvol.Aliases[0])
_, err := spdkClient.BdevLvolRegisterSnapshotChecksum(bdevLvol.Aliases[0])
if err != nil {
logrus.Errorf("Replica %v failed to register checksum for snapshot %v: %v", r.Name, bdevLvol.Name, err)
}
}()
}
}

if r.State == types.InstanceStatePending {
return r.construct(bdevLvolMap)
}
Expand Down Expand Up @@ -442,6 +464,16 @@ func compareSvcLvols(prev, cur *Lvol, checkChildren, checkActualSize bool) error
logrus.Warnf("Found mismatching lvol actual size %v with recorded prev lvol actual size %v when validating lvol %s", cur.ActualSize, prev.ActualSize, prev.Name)
}

if prev.SnapshotChecksum == "" {
prev.SnapshotChecksum = cur.SnapshotChecksum
}
if cur.SnapshotChecksum == "" {
prev.SnapshotChecksum = ""
}
if prev.SnapshotChecksum != cur.SnapshotChecksum {
return fmt.Errorf("found mismatching lvol snapshot checksum %v with recorded prev lvol snapshot checksum %v when validating lvol %s", cur.SnapshotChecksum, prev.SnapshotChecksum, prev.Name)
}

return nil
}

Expand Down Expand Up @@ -1086,6 +1118,7 @@ func (r *Replica) SnapshotDelete(spdkClient *spdkclient.Client, snapshotName str
return nil, fmt.Errorf("failed to get the bdev of the only child lvol %s after snapshot %s delete", childSvcLvol.Name, snapshotName)
}
childSvcLvol.ActualSize = bdevLvol[0].DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize
childSvcLvol.SnapshotChecksum = ""
}

updateRequired = true
Expand Down Expand Up @@ -1968,9 +2001,9 @@ func (r *Replica) rebuildingDstShallowCopyPrepare(spdkClient *spdkclient.Client,
dstSnapBdevLvol := bdevLvolMap[dstSnapshotLvolName]
snaplvolSnapshotTimestamp := dstSnapBdevLvol.DriverSpecific.Lvol.Xattrs[spdkclient.SnapshotTimestamp]
snaplvolActualSize := dstSnapBdevLvol.DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize
// TODO: Verify the checksum
isIntactSnap := srcSnapSvcLvol.SnapshotTimestamp == snaplvolSnapshotTimestamp &&
srcSnapSvcLvol.ActualSize == snaplvolActualSize
srcSnapSvcLvol.ActualSize == snaplvolActualSize &&
srcSnapSvcLvol.SnapshotChecksum != "" && srcSnapSvcLvol.SnapshotChecksum == dstSnapBdevLvol.DriverSpecific.Lvol.Xattrs[spdkclient.SnapshotChecksum]
// For now directly delete the corrupted or outdated snapshot lvol and start a full shallow copy since we cannot validate existing data during the shallow copy
if !isIntactSnap {
for _, childLvolName := range dstSnapBdevLvol.DriverSpecific.Lvol.Clones {
Expand Down
3 changes: 3 additions & 0 deletions pkg/spdk/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Lvol struct {
CreationTime string
UserCreated bool
SnapshotTimestamp string
SnapshotChecksum string
}

func ServiceBackingImageLvolToProtoBackingImageLvol(lvol *Lvol) *spdkrpc.Lvol {
Expand Down Expand Up @@ -117,6 +118,7 @@ func ServiceLvolToProtoLvol(replicaName string, lvol *Lvol) *spdkrpc.Lvol {
CreationTime: lvol.CreationTime,
UserCreated: lvol.UserCreated,
SnapshotTimestamp: lvol.SnapshotTimestamp,
SnapshotChecksum: lvol.SnapshotChecksum,
}

if lvol.Name == replicaName {
Expand Down Expand Up @@ -150,6 +152,7 @@ func BdevLvolInfoToServiceLvol(bdev *spdktypes.BdevInfo) *Lvol {
CreationTime: bdev.CreationTime,
UserCreated: bdev.DriverSpecific.Lvol.Xattrs[spdkclient.UserCreated] == strconv.FormatBool(true),
SnapshotTimestamp: bdev.DriverSpecific.Lvol.Xattrs[spdkclient.SnapshotTimestamp],
SnapshotChecksum: bdev.DriverSpecific.Lvol.Xattrs[spdkclient.SnapshotChecksum],
}
}

Expand Down
50 changes: 47 additions & 3 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ var (

defaultTestExecuteTimeout = 10 * time.Second

defaultTestRebuildingWaitInterval = 3 * time.Second
defaultTestRebuildingWaitCount = 60
defaultTestRebuildingWaitInterval = 3 * time.Second
defaultTestRebuildingWaitCount = 60
defaultTestSnapChecksumWaitInterval = 1 * time.Second
defaultTestSnapChecksumWaitCount = 60
)

func Test(t *testing.T) { TestingT(t) }
Expand Down Expand Up @@ -1382,7 +1384,10 @@ func (s *TestSuite) TestSPDKMultipleThreadFastRebuilding(c *C) {
},
nil)

// Test online rebuilding twice
waitReplicaSnapshotChecksum(c, spdkCli, replicaName1, "")
waitReplicaSnapshotChecksum(c, spdkCli, replicaName2, "")

// Test online rebuilding

// Crash replica1
err = spdkCli.ReplicaDelete(replicaName1, false)
Expand Down Expand Up @@ -1616,6 +1621,45 @@ func checkReplicaSnapshots(c *C, spdkCli *client.SPDKClient, engineName string,
}
}

// waitReplicaSnapshotChecksum is the default-timeout variant of
// waitReplicaSnapshotChecksumTimeout: it forwards all arguments unchanged and
// passes defaultTestSnapChecksumWaitCount as the timeout in seconds.
//
// An empty targetSnapName selects every snapshot of the replica.
func waitReplicaSnapshotChecksum(c *C, spdkCli *client.SPDKClient, replicaName, targetSnapName string) {
	timeoutInSecond := defaultTestSnapChecksumWaitCount
	waitReplicaSnapshotChecksumTimeout(c, spdkCli, replicaName, targetSnapName, timeoutInSecond)
}

// waitReplicaSnapshotChecksumTimeout polls the replica named replicaName once
// per defaultTestSnapChecksumWaitInterval until every selected snapshot carries
// a non-empty SnapshotChecksum, and fails the test if that has not happened
// within timeoutInSecond seconds.
//
// An empty targetSnapName selects every snapshot of the replica. A non-empty
// targetSnapName selects only that snapshot; if the replica does not list it,
// there is nothing to wait for and the helper returns after that poll.
//
// A ReplicaGet error fails the test immediately.
func waitReplicaSnapshotChecksumTimeout(c *C, spdkCli *client.SPDKClient, replicaName, targetSnapName string, timeoutInSecond int) {
	ticker := time.NewTicker(defaultTestSnapChecksumWaitInterval)
	defer ticker.Stop()
	timer := time.NewTimer(time.Duration(timeoutInSecond) * time.Second)
	defer timer.Stop()

	// hasChecksum holds the result of the most recent poll. It starts as false
	// and is only set by a poll, so a timeout can never be mistaken for
	// success.
	hasChecksum := false
	for !hasChecksum {
		select {
		case <-timer.C:
			// The loop is only re-entered while hasChecksum is false, so this
			// assertion fails the test and reports the timeout.
			c.Assert(hasChecksum, Equals, true)
			return
		case <-ticker.C:
			replica, err := spdkCli.ReplicaGet(replicaName)
			c.Assert(err, IsNil)

			hasChecksum = true
			if targetSnapName == "" || replica.Snapshots[targetSnapName] != nil {
				for snapName, snap := range replica.Snapshots {
					if targetSnapName != "" && snapName != targetSnapName {
						continue
					}
					if snap.SnapshotChecksum == "" {
						// At least one selected snapshot is still waiting for
						// its checksum; poll again on the next tick.
						hasChecksum = false
						break
					}
				}
			}
		}
	}
}

func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, engineName string, replicaAddressMap map[string]string) {
ip, err := commonnet.GetAnyExternalIP()
c.Assert(err, IsNil)
Expand Down

0 comments on commit 0b0258a

Please sign in to comment.