Skip to content

Commit

Permalink
Convergence part 2: Passthrough store, cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
clintonk committed Nov 14, 2017
1 parent d3ddf4e commit 3c65693
Show file tree
Hide file tree
Showing 25 changed files with 2,056 additions and 208 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

dvp "github.com/netapp/netappdvp/storage_drivers"
"github.com/netapp/trident/utils"
)

Expand Down Expand Up @@ -84,6 +85,9 @@ var (
TransactionURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/txn"
StorageClassURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/storageclass"
StoreURL = "/" + OrchestratorName + "/store"

UsingPassthroughStore bool
DriverContext dvp.DriverContext
)

func IsValidProtocol(p Protocol) bool {
Expand Down
253 changes: 173 additions & 80 deletions core/orchestrator_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,17 @@ func (o *tridentOrchestrator) bootstrapBackends() error {
if err != nil {
return err
}
_, err = o.AddStorageBackend(serializedConfig)
newBackendExternal, err := o.AddStorageBackend(serializedConfig)
if err != nil {
return err
}

// Note that AddStorageBackend returns an external copy of the newly
// added backend, so we have to go fetch it manually.
newBackend := o.backends[b.Name]
newBackend := o.backends[newBackendExternal.Name]
newBackend.Online = b.Online
log.WithFields(log.Fields{
"backend": b.Name,
"backend": newBackend.Name,
"handler": "Bootstrap",
}).Info("Added an existing backend.")
}
Expand Down Expand Up @@ -164,6 +164,15 @@ func (o *tridentOrchestrator) bootstrapVolumes() error {
return err
}
for _, v := range volumes {

log.WithFields(log.Fields{
"name": v.Config.Name,
"internalName": v.Config.InternalName,
"size": v.Config.Size,
"backend": v.Backend,
"pool": v.Pool,
}).Debug("Bootstrapping volume.")

// TODO: If the API evolves, check the Version field here.
var backend *storage.StorageBackend
var ok bool
Expand Down Expand Up @@ -561,95 +570,29 @@ func (o *tridentOrchestrator) AddVolume(volumeConfig *storage.VolumeConfig) (
volumeConfig.StorageClass)
}

// Check if an addVolume transaction already exists for this name.
// If so, we failed earlier and we need to call the bootstrap cleanup code.
// If this fails, return an error. If it succeeds or no transaction
// existed, log a new transaction in the persistent store and proceed.
volTxn := &persistent_store.VolumeTransaction{
Config: volumeConfig,
Op: persistent_store.AddVolume,
}
oldTxn, err := o.storeClient.GetExistingVolumeTransaction(volTxn)
if err != nil {
log.Warnf("Unable to check for existing volume transactions: %v", err)
return nil, err
}
if oldTxn != nil {
err = o.rollBackTransaction(oldTxn)
if err != nil {
return nil, fmt.Errorf("Unable to roll back existing transaction "+
"for volume %s: %v", volumeConfig.Name, err)
}
}

err = o.storeClient.AddVolumeTransaction(volTxn)
// Add transaction in case the operation must be rolled back later
volTxn, err := o.addVolumeTransaction(volumeConfig)
if err != nil {
return nil, err
}

// Recovery function in case of error
defer func() {
var (
cleanupErr, txErr error
)
if err != nil {
// We failed somewhere. There are two possible cases:
// 1. We failed to allocate on a backend and fell through to the
// end of the function. In this case, we don't need to roll
// anything back.
// 2. We failed to add the volume to etcd. In this case, we need
// to remove the volume from the backend.
// If we succeeded in adding the volume to etcd, err will not be
// nil by the time we get here; we can only fail at removing the
// volume txn at this point.
if backend != nil && vol != nil {
// We succeeded in adding the volume to the backend; now
// delete it
cleanupErr = backend.RemoveVolume(vol)
if cleanupErr != nil {
cleanupErr = fmt.Errorf("Unable to delete volume "+
"from backend during cleanup: %v", cleanupErr)
}
}
}
if cleanupErr == nil {
// Only clean up the volume transaction if we've succeeded at
// cleaning up on the backend or if we didn't need to do so in the
// first place.
txErr = o.storeClient.DeleteVolumeTransaction(volTxn)
if txErr != nil {
fmt.Errorf("Unable to clean up transaction: %v", txErr)
}
}
if cleanupErr != nil || txErr != nil {
// Remove the volume from memory, if it's there, so that the user
// can try to re-add. This will trigger recovery code.
delete(o.volumes, volumeConfig.Name)
externalVol = nil
// Report on all errors we encountered.
errList := make([]string, 0, 3)
for _, e := range []error{err, cleanupErr, txErr} {
if e != nil {
errList = append(errList, e.Error())
}
}
err = fmt.Errorf("%s", strings.Join(errList, "\n\t"))
}
return
}()
defer func() { o.addVolumeCleanup(err, backend, vol, volTxn, volumeConfig) }()

// Randomize the storage pool list for better distribution of load across
// all pools.
// Randomize the storage pool list for better distribution of load across all pools.
rand.Seed(time.Now().UnixNano())

log.WithFields(log.Fields{
"volume": volumeConfig.Name,
}).Debugf("Looking through %d storage pools.", len(pools))

errorMessages := make([]string, 0)

// Choose a pool at random.
for _, num := range rand.Perm(len(pools)) {
backend = pools[num].Backend
if vol, err = backend.AddVolume(
volumeConfig, pools[num], storageClass.GetAttributes(),
); vol != nil && err == nil {
vol, err = backend.AddVolume(volumeConfig, pools[num], storageClass.GetAttributes())
if vol != nil && err == nil {
if vol.Config.Protocol == config.ProtocolAny {
vol.Config.Protocol = backend.GetProtocol()
}
Expand Down Expand Up @@ -688,6 +631,156 @@ func (o *tridentOrchestrator) AddVolume(volumeConfig *storage.VolumeConfig) (
return nil, err
}

// CloneVolume creates a new volume as a clone of a volume the orchestrator
// already tracks. The clone's configuration starts from the source volume's
// configuration (via ConstructClone); only the name, split-on-clone setting,
// source volume name and source snapshot name are taken from the request.
// The clone is created through the backend of the source volume's pool,
// written to the persistent store, and then registered in the in-memory
// volume map. On success the external form of the new volume is returned.
func (o *tridentOrchestrator) CloneVolume(
	volumeConfig *storage.VolumeConfig,
) (*storage.VolumeExternal, error) {

	// Declared up front because the deferred cleanup below reads whatever
	// values these hold when the function exits.
	var backend *storage.StorageBackend
	var vol *storage.Volume

	o.mutex.Lock()
	defer o.mutex.Unlock()

	// Refuse to reuse the name of a volume that is already tracked.
	if _, exists := o.volumes[volumeConfig.Name]; exists {
		return nil, fmt.Errorf("Volume %s already exists.", volumeConfig.Name)
	}
	volumeConfig.Version = config.OrchestratorAPIVersion

	// The source of the clone must be a known volume.
	sourceVolume, known := o.volumes[volumeConfig.SourceName]
	if !known {
		return nil, fmt.Errorf("Source volume not found: %s", volumeConfig.SourceName)
	}

	// Start from the source's configuration, since most of its attributes
	// carry over to the clone.
	srcConfig := sourceVolume.Config
	cloneConfig := new(storage.VolumeConfig)
	srcConfig.ConstructClone(cloneConfig)

	// Overlay the request attributes that influence how the clone is made.
	cloneConfig.Name = volumeConfig.Name
	cloneConfig.SplitOnClone = volumeConfig.SplitOnClone
	cloneConfig.SourceName = volumeConfig.SourceName
	cloneConfig.SourceSnapshotName = volumeConfig.SourceSnapshotName

	// Record a transaction so the operation can be rolled back later.
	volTxn, err := o.addVolumeTransaction(volumeConfig)
	if err != nil {
		return nil, err
	}

	// Cleanup runs on every exit path from here on. The closure evaluates
	// err, backend and vol at exit time, so every assignment below must
	// target these same variables (no shadowing with :=).
	defer func() { o.addVolumeCleanup(err, backend, vol, volTxn, volumeConfig) }()

	backend = sourceVolume.Pool.Backend
	if vol, err = backend.CloneVolume(cloneConfig, sourceVolume.Pool); err != nil {
		return nil, fmt.Errorf("Failed to create cloned volume %s "+
			"on storage pool %s from backend %s: %v",
			cloneConfig.Name, sourceVolume.Pool.Name, backend.Name, err)
	}

	// Persist the new volume first, then track it in memory.
	if err = o.storeClient.AddVolume(vol); err != nil {
		return nil, err
	}
	o.volumes[cloneConfig.Name] = vol

	return vol.ConstructExternal(), nil
}

// addVolumeTransaction is called by the volume create/clone methods to
// record the pending operation in the persistent store, so that it can be
// cleaned up later if it fails.
//
// If a transaction matching this one is already stored, an earlier attempt
// did not finish; it is rolled back before the new transaction is written.
// The newly stored transaction is returned. An error is returned if the
// lookup, the rollback, or the write fails.
func (o *tridentOrchestrator) addVolumeTransaction(
	volumeConfig *storage.VolumeConfig,
) (*persistent_store.VolumeTransaction, error) {

	txn := &persistent_store.VolumeTransaction{
		Config: volumeConfig,
		Op:     persistent_store.AddVolume,
	}

	// Look for a transaction left behind by an earlier failed attempt.
	staleTxn, lookupErr := o.storeClient.GetExistingVolumeTransaction(txn)
	if lookupErr != nil {
		log.Warningf("Unable to check for existing volume transactions: %v", lookupErr)
		return nil, lookupErr
	}

	// Undo the earlier attempt before starting a new one.
	if staleTxn != nil {
		if rollbackErr := o.rollBackTransaction(staleTxn); rollbackErr != nil {
			return nil, fmt.Errorf("Unable to roll back existing transaction "+
				"for volume %s: %v", volumeConfig.Name, rollbackErr)
		}
	}

	// Log the new transaction in the persistent store.
	if addErr := o.storeClient.AddVolumeTransaction(txn); addErr != nil {
		return nil, addErr
	}
	return txn, nil
}

// addVolumeCleanup is used as a deferred method from the volume create/clone
// methods to clean up in case anything goes wrong during the operation.
//
// Parameters:
//   - err: the outcome of the calling operation; nil means it succeeded.
//   - backend, vol: the backend and the volume created on it, if the
//     operation got that far; either may be nil.
//   - volTxn: the transaction recorded for the operation.
//   - volumeConfig: the requested volume configuration; its Name identifies
//     the in-memory entry to drop when cleanup fails.
//
// This function has no return value and receives err by value, so it cannot
// change what the caller returns. Any failure encountered during cleanup is
// therefore logged rather than silently dropped.
func (o *tridentOrchestrator) addVolumeCleanup(
	err error, backend *storage.StorageBackend, vol *storage.Volume,
	volTxn *persistent_store.VolumeTransaction, volumeConfig *storage.VolumeConfig) {

	var cleanupErr, txErr error

	// The operation failed somewhere. There are two possible cases:
	// 1. Nothing was created on a backend (backend or vol is nil), so
	//    there is nothing to roll back.
	// 2. The volume was created on the backend but a later step failed
	//    (e.g. saving it to the persistent store), so the volume must be
	//    removed from the backend again.
	// If the operation succeeded, err is nil and only the transaction
	// removal below can fail.
	if err != nil && backend != nil && vol != nil {
		cleanupErr = backend.RemoveVolume(vol)
		if cleanupErr != nil {
			cleanupErr = fmt.Errorf("Unable to delete volume "+
				"from backend during cleanup: %v", cleanupErr)
		}
	}

	// Only remove the volume transaction if backend cleanup succeeded or
	// was not needed in the first place.
	if cleanupErr == nil {
		txErr = o.storeClient.DeleteVolumeTransaction(volTxn)
		if txErr != nil {
			// Keep the wrapped error; its result was previously discarded.
			txErr = fmt.Errorf("Unable to clean up transaction: %v", txErr)
		}
	}

	if cleanupErr == nil && txErr == nil {
		return
	}

	// Remove the volume from memory, if it's there, so that the user can
	// try to re-add. This will trigger recovery code.
	delete(o.volumes, volumeConfig.Name)

	// Report on all errors we encountered.
	errList := make([]string, 0, 3)
	for _, e := range []error{err, cleanupErr, txErr} {
		if e != nil {
			errList = append(errList, e.Error())
		}
	}
	log.WithFields(log.Fields{
		"volume": volumeConfig.Name,
	}).Errorf("Volume cleanup failed: %s", strings.Join(errList, "\n\t"))
}

func (o *tridentOrchestrator) GetVolume(volume string) *storage.VolumeExternal {
o.mutex.Lock()
defer o.mutex.Unlock()
Expand Down Expand Up @@ -1023,7 +1116,7 @@ func (o *tridentOrchestrator) updateBackendOnPersistentStore(
backend *storage.StorageBackend, newBackend bool,
) error {
// Update the persistent store with the backend information
if o.bootstrapped {
if o.bootstrapped || config.UsingPassthroughStore {
var err error
if newBackend {
err = o.storeClient.AddBackend(backend)
Expand Down
Loading

0 comments on commit 3c65693

Please sign in to comment.