diff --git a/driver/driver.go b/driver/driver.go index 7195302..8bf6fe7 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -52,6 +52,16 @@ type Options struct { // If not specified, the current operating system's default implementation // will be used (i.e. 'mount.New("")') Mounter mount.Interface + // ContinueOnNotReady will cause the driver's nodeserver to continue + // mounting the volume even if the driver is not ready to create a request yet. + // This is useful if you need to defer requesting a certificate until after + // initialization of the Pod (e.g. IPAM so a pod IP is allocated). + // Enabling this option WILL cause a period of time during pod startup whereby + // certificate data is not available in the volume whilst the process is running. + // An `initContainer` or other special logic in the user application must be + // added to avoid running into CrashLoopBackOff situations which can delay pod + // start time. + ContinueOnNotReady bool } func New(endpoint string, log logr.Logger, opts Options) (*Driver, error) { @@ -75,11 +85,12 @@ func buildServers(opts Options, log logr.Logger) (*identityServer, *controllerSe opts.Mounter = mount.New("") } return NewIdentityServer(opts.DriverName, opts.DriverVersion), &controllerServer{}, &nodeServer{ - log: log, - nodeID: opts.NodeID, - manager: opts.Manager, - store: opts.Store, - mounter: opts.Mounter, + log: log, + nodeID: opts.NodeID, + manager: opts.Manager, + store: opts.Store, + mounter: opts.Mounter, + continueOnNotReady: opts.ContinueOnNotReady, } } diff --git a/driver/nodeserver.go b/driver/nodeserver.go index d6751b6..114ce78 100644 --- a/driver/nodeserver.go +++ b/driver/nodeserver.go @@ -41,6 +41,8 @@ type nodeServer struct { mounter mount.Interface log logr.Logger + + continueOnNotReady bool } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { @@ -83,10 +85,23 @@ func (ns *nodeServer) NodePublishVolume(ctx 
context.Context, req *csi.NodePublis log.Info("Volume registered for management") - if err := wait.PollUntil(time.Second, func() (done bool, err error) { - return ns.manager.IsVolumeReady(req.GetVolumeId()), nil - }, ctx.Done()); err != nil { - return nil, err + // Only wait for the volume to be ready if it is in a state of 'ready to request' + // already. This allows implementors to defer actually requesting certificates + // until later in the pod lifecycle (e.g. after CNI has run & an IP address has been + // allocated, if a user wants to embed pod IPs into their requests). + isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId()) + if !isReadyToRequest { + log.Info("Unable to request a certificate right now, will be retried", "reason", reason) + } + if isReadyToRequest || !ns.continueOnNotReady { + log.Info("Waiting for certificate to be issued...") + if err := wait.PollUntil(time.Second, func() (done bool, err error) { + return ns.manager.IsVolumeReady(req.GetVolumeId()), nil + }, ctx.Done()); err != nil { + return nil, err + } + } else { + log.Info("Skipping waiting for certificate to be issued") } log.Info("Volume ready for mounting") diff --git a/manager/interfaces.go b/manager/interfaces.go index b96ceba..5ae41b5 100644 --- a/manager/interfaces.go +++ b/manager/interfaces.go @@ -89,3 +89,14 @@ type WriteKeypairFunc func(meta metadata.Metadata, key crypto.PrivateKey, chain // volume being published. Useful for modifying clients to make use of CSI // token requests. type ClientForMetadataFunc func(meta metadata.Metadata) (cmclient.Interface, error) + +// ReadyToRequestFunc can be optionally implemented by drivers to indicate whether +// the driver is ready to request a certificate for the given volume/metadata. +// This can be used to 'defer' fetching until later pod initialization events have +// happened (e.g. CNI has allocated an IP if you want to embed a pod IP into the certificate +// request resources). 
+type ReadyToRequestFunc func(meta metadata.Metadata) (bool, string) + +func AlwaysReadyToRequest(_ metadata.Metadata) (bool, string) { + return true, "" +} diff --git a/manager/manager.go b/manager/manager.go index 1e6d058..5b307d7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -81,6 +81,7 @@ type Options struct { GenerateRequest GenerateRequestFunc SignRequest SignRequestFunc WriteKeypair WriteKeypairFunc + ReadyToRequest ReadyToRequestFunc } // NewManager constructs a new manager used to manage volumes containing @@ -117,6 +118,9 @@ func NewManager(opts Options) (*Manager, error) { if opts.WriteKeypair == nil { return nil, errors.New("WriteKeypair must be set") } + if opts.ReadyToRequest == nil { + opts.ReadyToRequest = AlwaysReadyToRequest + } if opts.MaxRequestsPerVolume == 0 { opts.MaxRequestsPerVolume = 1 } @@ -156,6 +160,7 @@ func NewManager(opts Options) (*Manager, error) { generateRequest: opts.GenerateRequest, signRequest: opts.SignRequest, writeKeypair: opts.WriteKeypair, + readyToRequest: opts.ReadyToRequest, managedVolumes: map[string]chan struct{}{}, stopInformer: stopCh, @@ -218,6 +223,7 @@ type Manager struct { generateRequest GenerateRequestFunc signRequest SignRequestFunc writeKeypair WriteKeypairFunc + readyToRequest ReadyToRequestFunc lock sync.Mutex // global view of all volumes managed by this manager @@ -252,6 +258,10 @@ func (m *Manager) issue(volumeID string) error { } log.V(2).Info("Read metadata", "metadata", meta) + if ready, reason := m.readyToRequest(meta); !ready { + return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason) + } + key, err := m.generatePrivateKey(meta) if err != nil { return fmt.Errorf("generating private key: %w", err) @@ -526,6 +536,16 @@ func (m *Manager) UnmanageVolume(volumeID string) { } } +func (m *Manager) IsVolumeReadyToRequest(volumeID string) (bool, string) { + meta, err := m.metadataReader.ReadMetadata(volumeID) + if err != nil { + m.log.Error(err, "failed 
to read metadata", "volume_id", volumeID) + return false, "failed to read metadata: " + err.Error() + } + + return m.readyToRequest(meta) +} + func (m *Manager) IsVolumeReady(volumeID string) bool { meta, err := m.metadataReader.ReadMetadata(volumeID) if err != nil { diff --git a/storage/filesystem.go b/storage/filesystem.go index 1b1e2a6..9fb13d1 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -124,7 +124,7 @@ func (f *Filesystem) ListVolumes() ([]string, error) { return vols, nil } -// MetadataForVolume will return the metadata for the volume with the given ID. +// ReadMetadata will return the metadata for the volume with the given ID. // Errors wrapping ErrNotFound will be returned if metadata for the ID cannot // be found. func (f *Filesystem) ReadMetadata(volumeID string) (metadata.Metadata, error) { diff --git a/storage/memory.go b/storage/memory.go index c8aa410..9db3645 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -146,5 +146,10 @@ func (m *MemoryFS) ReadFiles(volumeID string) (map[string][]byte, error) { if !ok { return nil, ErrNotFound } - return vol, nil + // make a copy of the map to ensure no races can occur + cpy := make(map[string][]byte, len(vol)) + for k, v := range vol { + cpy[k] = v + } + return cpy, nil } diff --git a/test/integration/ready_to_request_test.go b/test/integration/ready_to_request_test.go new file mode 100644 index 0000000..f41814f --- /dev/null +++ b/test/integration/ready_to_request_test.go @@ -0,0 +1,201 @@ +/* +Copyright 2022 The cert-manager Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + "crypto" + "crypto/x509" + "errors" + "fmt" + "os" + "reflect" + "testing" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/util/wait" + fakeclock "k8s.io/utils/clock/testing" + + "github.com/cert-manager/csi-lib/manager" + "github.com/cert-manager/csi-lib/metadata" + "github.com/cert-manager/csi-lib/storage" + testutil "github.com/cert-manager/csi-lib/test/util" +) + +func Test_CompletesIfNotReadyToRequest_ContinueOnNotReadyEnabled(t *testing.T) { + store := storage.NewMemoryFS() + clock := fakeclock.NewFakeClock(time.Now()) + + calls := 0 + opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{ + Store: store, + Clock: clock, + ContinueOnNotReady: true, + ReadyToRequest: func(meta metadata.Metadata) (bool, string) { + if calls < 1 { + calls++ + return false, "calls < 1" + } + // only indicate we are ready after issuance has been attempted 1 time + return calls == 1, "calls == 1" + }, + GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) { + return nil, nil + }, + GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) { + return &manager.CertificateRequestBundle{ + Namespace: "certificaterequest-namespace", + }, nil + }, + SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) { + return []byte{}, nil + }, + WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error { + store.WriteFiles(meta, map[string][]byte{ + "ca": ca, + "cert": chain, + }) + nextIssuanceTime := clock.Now().Add(time.Hour) + meta.NextIssuanceTime = &nextIssuanceTime + return store.WriteMetadata(meta.VolumeID, meta) + }, + }) + defer stop() + + // Setup a 
routine to issue/sign the request IF it is created + stopCh := make(chan struct{}) + go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes")) + defer close(stopCh) + + tmpDir, err := os.MkdirTemp("", "*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + _, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ + VolumeId: "test-vol", + VolumeContext: map[string]string{ + "csi.storage.k8s.io/ephemeral": "true", + "csi.storage.k8s.io/pod.name": "the-pod-name", + "csi.storage.k8s.io/pod.namespace": "the-pod-namespace", + }, + TargetPath: tmpDir, + Readonly: true, + }) + if err != nil { + t.Fatal(err) + } + + if err := wait.PollUntil(time.Second, func() (done bool, err error) { + files, err := store.ReadFiles("test-vol") + if errors.Is(err, storage.ErrNotFound) || len(files) <= 1 { + return false, nil + } + if err != nil { + return false, err + } + if !reflect.DeepEqual(files["ca"], []byte("ca bytes")) { + return false, fmt.Errorf("unexpected CA data: %v", files["ca"]) + } + if !reflect.DeepEqual(files["cert"], []byte("certificate bytes")) { + return false, fmt.Errorf("unexpected certificate data: %v", files["cert"]) + } + return true, nil + }, ctx.Done()); err != nil { + t.Error(err) + } +} + +func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) { + store := storage.NewMemoryFS() + clock := fakeclock.NewFakeClock(time.Now()) + + opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{ + Store: store, + Clock: clock, + ContinueOnNotReady: false, + ReadyToRequest: func(meta metadata.Metadata) (bool, string) { + return false, "never ready" + }, + GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) { + return nil, nil + }, + GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) { + return 
&manager.CertificateRequestBundle{ + Namespace: "certificaterequest-namespace", + }, nil + }, + SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) { + return []byte{}, nil + }, + WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error { + store.WriteFiles(meta, map[string][]byte{ + "ca": ca, + "cert": chain, + }) + nextIssuanceTime := clock.Now().Add(time.Hour) + meta.NextIssuanceTime = &nextIssuanceTime + return store.WriteMetadata(meta.VolumeID, meta) + }, + }) + defer stop() + + // Setup a routine to issue/sign the request IF it is created + stopCh := make(chan struct{}) + go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes")) + defer close(stopCh) + tmpDir, err := os.MkdirTemp("", "*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ + VolumeId: "test-vol", + VolumeContext: map[string]string{ + "csi.storage.k8s.io/ephemeral": "true", + "csi.storage.k8s.io/pod.name": "the-pod-name", + "csi.storage.k8s.io/pod.namespace": "the-pod-namespace", + }, + TargetPath: tmpDir, + Readonly: true, + }) + if status.Code(err) != codes.DeadlineExceeded { + t.Errorf("Expected timeout to be returned from NodePublishVolume but got: %v", err) + } + + // allow 1s for the cleanup functions in NodePublishVolume to be run + // without this pause, the test can flake due to the storage backend not + // being cleaned up of the persisted metadata file. 
+ ctx, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + if err := wait.PollUntil(time.Millisecond*100, func() (bool, error) { + _, err := store.ReadFiles("test-vol") + if !errors.Is(err, storage.ErrNotFound) { + return false, nil + } + return true, nil + }, ctx.Done()); err != nil { + t.Errorf("failed to wait for storage backend to return NotFound: %v", err) + } +} diff --git a/test/util/testutil.go b/test/util/testutil.go index 4f5afe9..33204a0 100644 --- a/test/util/testutil.go +++ b/test/util/testutil.go @@ -53,11 +53,13 @@ type DriverOptions struct { NodeID string MaxRequestsPerVolume int + ContinueOnNotReady bool GeneratePrivateKey manager.GeneratePrivateKeyFunc GenerateRequest manager.GenerateRequestFunc SignRequest manager.SignRequestFunc WriteKeypair manager.WriteKeypairFunc + ReadyToRequest manager.ReadyToRequestFunc } func RunTestDriver(t *testing.T, opts DriverOptions) (DriverOptions, csi.NodeClient, func()) { @@ -117,15 +119,17 @@ func RunTestDriver(t *testing.T, opts DriverOptions) (DriverOptions, csi.NodeCli GenerateRequest: opts.GenerateRequest, SignRequest: opts.SignRequest, WriteKeypair: opts.WriteKeypair, + ReadyToRequest: opts.ReadyToRequest, }) d := driver.NewWithListener(lis, *opts.Log, driver.Options{ - DriverName: "driver-name", - DriverVersion: "v0.0.1", - NodeID: opts.NodeID, - Store: opts.Store, - Mounter: opts.Mounter, - Manager: m, + DriverName: "driver-name", + DriverVersion: "v0.0.1", + NodeID: opts.NodeID, + Store: opts.Store, + Mounter: opts.Mounter, + Manager: m, + ContinueOnNotReady: opts.ContinueOnNotReady, }) // start the driver