Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to skip blocking pod startup if driver is not ready to create a request yet #20

Merged
merged 6 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ type Options struct {
// If not specified, the current operating system's default implementation
// will be used (i.e. 'mount.New("")')
Mounter mount.Interface
// ContinueOnNotReady will cause the driver's nodeserver to continue
// mounting the volume even if the driver is not ready to create a request yet.
// This is useful if you need to defer requesting a certificate until after
// initialization of the Pod (e.g. IPAM so a pod IP is allocated).
// Enabling this option WILL cause a period of time during pod startup whereby
// certificate data is not available in the volume whilst the process is running.
// An `initContainer` or other special logic in the user application must be
// added to avoid running into CrashLoopBackOff situations which can delay pod
// start time.
ContinueOnNotReady bool
}

func New(endpoint string, log logr.Logger, opts Options) (*Driver, error) {
Expand All @@ -75,11 +85,12 @@ func buildServers(opts Options, log logr.Logger) (*identityServer, *controllerSe
opts.Mounter = mount.New("")
}
return NewIdentityServer(opts.DriverName, opts.DriverVersion), &controllerServer{}, &nodeServer{
log: log,
nodeID: opts.NodeID,
manager: opts.Manager,
store: opts.Store,
mounter: opts.Mounter,
log: log,
nodeID: opts.NodeID,
manager: opts.Manager,
store: opts.Store,
mounter: opts.Mounter,
continueOnNotReady: opts.ContinueOnNotReady,
}
}

Expand Down
23 changes: 19 additions & 4 deletions driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type nodeServer struct {
mounter mount.Interface

log logr.Logger

continueOnNotReady bool
}

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Expand Down Expand Up @@ -83,10 +85,23 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

log.Info("Volume registered for management")

if err := wait.PollUntil(time.Second, func() (done bool, err error) {
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
}, ctx.Done()); err != nil {
return nil, err
// Only wait for the volume to be ready if it is in a state of 'ready to request'
// already. This allows implementors to defer actually requesting certificates
// until later in the pod lifecycle (e.g. after CNI has run & an IP address has been
// allocated, if a user wants to embed pod IPs into their requests).
isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
if !isReadyToRequest {
log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
}
if isReadyToRequest || !ns.continueOnNotReady {
log.Info("Waiting for certificate to be issued...")
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
}, ctx.Done()); err != nil {
return nil, err
}
} else {
log.Info("Skipping waiting for certificate to be issued")
}

log.Info("Volume ready for mounting")
Expand Down
11 changes: 11 additions & 0 deletions manager/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ type WriteKeypairFunc func(meta metadata.Metadata, key crypto.PrivateKey, chain
// volume being published. Useful for modifying clients to make use of CSI
// token requests.
type ClientForMetadataFunc func(meta metadata.Metadata) (cmclient.Interface, error)

// ReadyToRequestFunc can be optionally implemented by drivers to indicate whether
// the driver is ready to request a certificate for the given volume/metadata.
// This can be used to 'defer' fetching until later pod initialization events have
// happened (e.g. CNI has allocated an IP if you want to embed a pod IP into the certificate
// request resources).
// The returned string is a human-readable reason explaining why the driver is
// not ready; it is only meaningful when the returned bool is false.
type ReadyToRequestFunc func(meta metadata.Metadata) (bool, string)

// AlwaysReadyToRequest is a ReadyToRequestFunc that always reports the driver
// is ready to request a certificate. It is used as the default when no
// ReadyToRequest function is configured on the manager Options.
func AlwaysReadyToRequest(_ metadata.Metadata) (bool, string) {
	return true, ""
}
20 changes: 20 additions & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Options struct {
GenerateRequest GenerateRequestFunc
SignRequest SignRequestFunc
WriteKeypair WriteKeypairFunc
ReadyToRequest ReadyToRequestFunc
}

// NewManager constructs a new manager used to manage volumes containing
Expand Down Expand Up @@ -117,6 +118,9 @@ func NewManager(opts Options) (*Manager, error) {
if opts.WriteKeypair == nil {
return nil, errors.New("WriteKeypair must be set")
}
if opts.ReadyToRequest == nil {
opts.ReadyToRequest = AlwaysReadyToRequest
}
if opts.MaxRequestsPerVolume == 0 {
opts.MaxRequestsPerVolume = 1
}
Expand Down Expand Up @@ -156,6 +160,7 @@ func NewManager(opts Options) (*Manager, error) {
generateRequest: opts.GenerateRequest,
signRequest: opts.SignRequest,
writeKeypair: opts.WriteKeypair,
readyToRequest: opts.ReadyToRequest,

managedVolumes: map[string]chan struct{}{},
stopInformer: stopCh,
Expand Down Expand Up @@ -218,6 +223,7 @@ type Manager struct {
generateRequest GenerateRequestFunc
signRequest SignRequestFunc
writeKeypair WriteKeypairFunc
readyToRequest ReadyToRequestFunc

lock sync.Mutex
// global view of all volumes managed by this manager
Expand Down Expand Up @@ -252,6 +258,10 @@ func (m *Manager) issue(volumeID string) error {
}
log.V(2).Info("Read metadata", "metadata", meta)

if ready, reason := m.readyToRequest(meta); !ready {
return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason)
}

key, err := m.generatePrivateKey(meta)
if err != nil {
return fmt.Errorf("generating private key: %w", err)
Expand Down Expand Up @@ -526,6 +536,16 @@ func (m *Manager) UnmanageVolume(volumeID string) {
}
}

// IsVolumeReadyToRequest returns whether the driver is ready to request a
// certificate for the volume with the given ID, along with a human-readable
// reason when it is not. If the volume's metadata cannot be read, the volume
// is treated as not ready and the read failure is surfaced as the reason.
func (m *Manager) IsVolumeReadyToRequest(volumeID string) (bool, string) {
	meta, err := m.metadataReader.ReadMetadata(volumeID)
	if err != nil {
		m.log.Error(err, "failed to read metadata", "volume_id", volumeID)
		// Surface the failure as the 'not ready' reason so callers that log
		// the reason (e.g. the nodeserver) are not left with an empty string.
		return false, fmt.Sprintf("failed to read metadata: %v", err)
	}

	return m.readyToRequest(meta)
}

func (m *Manager) IsVolumeReady(volumeID string) bool {
meta, err := m.metadataReader.ReadMetadata(volumeID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion storage/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (f *Filesystem) ListVolumes() ([]string, error) {
return vols, nil
}

// MetadataForVolume will return the metadata for the volume with the given ID.
// ReadMetadata will return the metadata for the volume with the given ID.
// Errors wrapping ErrNotFound will be returned if metadata for the ID cannot
// be found.
func (f *Filesystem) ReadMetadata(volumeID string) (metadata.Metadata, error) {
Expand Down
7 changes: 6 additions & 1 deletion storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,10 @@ func (m *MemoryFS) ReadFiles(volumeID string) (map[string][]byte, error) {
if !ok {
return nil, ErrNotFound
}
return vol, nil
// make a copy of the map to ensure no races can occur
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly using sync.RWMutex and RLock() to avoid the read race condition ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we return this map, it's not possible to do so as we can't enforce call-sites create a mutex. To push this onus onto the caller creates a fair bit of extra complexity and I'm not convinced that outweighs the performance gains.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got your idea. thanks.

cpy := make(map[string][]byte)
for k, v := range vol {
cpy[k] = v
}
return cpy, nil
}
201 changes: 201 additions & 0 deletions test/integration/ready_to_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
Copyright 2022 The cert-manager Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"context"
"crypto"
"crypto/x509"
"errors"
"fmt"
"os"
"reflect"
"testing"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
fakeclock "k8s.io/utils/clock/testing"

"github.com/cert-manager/csi-lib/manager"
"github.com/cert-manager/csi-lib/metadata"
"github.com/cert-manager/csi-lib/storage"
testutil "github.com/cert-manager/csi-lib/test/util"
)

// Test_CompletesIfNotReadyToRequest_ContinueOnNotReadyEnabled verifies that
// when ContinueOnNotReady is enabled, NodePublishVolume returns successfully
// even though the driver reports 'not ready' on the first attempt, and that
// the certificate data is eventually written to the volume in the background.
func Test_CompletesIfNotReadyToRequest_ContinueOnNotReadyEnabled(t *testing.T) {
	store := storage.NewMemoryFS()
	clock := fakeclock.NewFakeClock(time.Now())

	calls := 0
	opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
		Store:              store,
		Clock:              clock,
		ContinueOnNotReady: true,
		ReadyToRequest: func(meta metadata.Metadata) (bool, string) {
			// Report 'not ready' exactly once so NodePublishVolume returns
			// before issuance has happened.
			if calls < 1 {
				calls++
				return false, "calls < 1"
			}
			// only indicate we are ready after issuance has been attempted 1 time
			return calls == 1, "calls == 1"
		},
		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
			return nil, nil
		},
		GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
			return &manager.CertificateRequestBundle{
				Namespace: "certificaterequest-namespace",
			}, nil
		},
		SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
			return []byte{}, nil
		},
		WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
			store.WriteFiles(meta, map[string][]byte{
				"ca":   ca,
				"cert": chain,
			})
			nextIssuanceTime := clock.Now().Add(time.Hour)
			meta.NextIssuanceTime = &nextIssuanceTime
			return store.WriteMetadata(meta.VolumeID, meta)
		},
	})
	defer stop()

	// Setup a routine to issue/sign the request IF it is created
	stopCh := make(chan struct{})
	go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
	defer close(stopCh)

	tmpDir, err := os.MkdirTemp("", "*")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
		VolumeId: "test-vol",
		VolumeContext: map[string]string{
			"csi.storage.k8s.io/ephemeral":     "true",
			"csi.storage.k8s.io/pod.name":      "the-pod-name",
			"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
		},
		TargetPath: tmpDir,
		Readonly:   true,
	})
	if err != nil {
		t.Fatal(err)
	}

	// Wait for the background issuance loop to eventually write the issued
	// certificate data into the volume.
	if err := wait.PollUntil(time.Second, func() (done bool, err error) {
		files, err := store.ReadFiles("test-vol")
		if errors.Is(err, storage.ErrNotFound) {
			// data has not been written yet - retry
			return false, nil
		}
		// Check for unexpected errors BEFORE inspecting 'files': a nil map has
		// len 0, so testing len(files) first would silently swallow real
		// errors and poll until the context deadline with no diagnostics.
		if err != nil {
			return false, err
		}
		if len(files) <= 1 {
			// only the metadata file has been written so far - retry
			return false, nil
		}
		if !reflect.DeepEqual(files["ca"], []byte("ca bytes")) {
			return false, fmt.Errorf("unexpected CA data: %v", files["ca"])
		}
		if !reflect.DeepEqual(files["cert"], []byte("certificate bytes")) {
			return false, fmt.Errorf("unexpected certificate data: %v", files["cert"])
		}
		return true, nil
	}, ctx.Done()); err != nil {
		t.Error(err)
	}
}

// TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled verifies that when
// ContinueOnNotReady is disabled and the driver never becomes ready to
// request, NodePublishVolume blocks until the context deadline is exceeded,
// and that the volume's persisted data is cleaned up afterwards.
func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
	store := storage.NewMemoryFS()
	clock := fakeclock.NewFakeClock(time.Now())

	opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
		Store:              store,
		Clock:              clock,
		ContinueOnNotReady: false,
		ReadyToRequest: func(meta metadata.Metadata) (bool, string) {
			return false, "never ready"
		},
		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
			return nil, nil
		},
		GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
			return &manager.CertificateRequestBundle{
				Namespace: "certificaterequest-namespace",
			}, nil
		},
		SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
			return []byte{}, nil
		},
		WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
			store.WriteFiles(meta, map[string][]byte{
				"ca":   ca,
				"cert": chain,
			})
			nextIssuanceTime := clock.Now().Add(time.Hour)
			meta.NextIssuanceTime = &nextIssuanceTime
			return store.WriteMetadata(meta.VolumeID, meta)
		},
	})
	defer stop()

	// Setup a routine to issue/sign the request IF it is created
	stopCh := make(chan struct{})
	go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
	defer close(stopCh)
	tmpDir, err := os.MkdirTemp("", "*")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// The driver never reports ready, so this call should block until the
	// context deadline and surface DeadlineExceeded to the caller.
	_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
		VolumeId: "test-vol",
		VolumeContext: map[string]string{
			"csi.storage.k8s.io/ephemeral":     "true",
			"csi.storage.k8s.io/pod.name":      "the-pod-name",
			"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
		},
		TargetPath: tmpDir,
		Readonly:   true,
	})
	if status.Code(err) != codes.DeadlineExceeded {
		t.Errorf("Expected timeout to be returned from NodePublishVolume but got: %v", err)
	}

	// allow 1s for the cleanup functions in NodePublishVolume to be run
	// without this pause, the test can flake due to the storage backend not
	// being cleaned up of the persisted metadata file.
	ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
	defer cancel2()
	if err := wait.PollUntil(time.Millisecond*100, func() (bool, error) {
		_, err := store.ReadFiles("test-vol")
		// Use errors.Is rather than '!=' so a wrapped ErrNotFound sentinel is
		// still recognised as the cleaned-up state.
		if !errors.Is(err, storage.ErrNotFound) {
			return false, nil
		}
		return true, nil
	}, ctx.Done()); err != nil {
		t.Errorf("failed to wait for storage backend to return NotFound: %v", err)
	}
}
Loading