Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
COSI-8: CreateBucket using COSI driver
Browse files Browse the repository at this point in the history
This commit enabled the functionality of create
bucket in the COSI driver.
Some helper functions have also been added for
obtaining kubernetes admin defined values.
  • Loading branch information
anurag4DSB committed Oct 29, 2024
1 parent 3a825f5 commit d7ba0ff
Show file tree
Hide file tree
Showing 6 changed files with 569 additions and 39 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
google.golang.org/grpc v1.66.0
k8s.io/client-go v0.31.0
k8s.io/client-go v0.31.2
k8s.io/klog/v2 v2.130.1
sigs.k8s.io/container-object-storage-interface-api v0.1.0
sigs.k8s.io/container-object-storage-interface-provisioner-sidecar v0.1.0
Expand All @@ -29,6 +29,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
)

require (
Expand Down Expand Up @@ -70,8 +71,8 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.0 // indirect
k8s.io/apimachinery v0.31.0 // indirect
k8s.io/api v0.31.2 // indirect
k8s.io/apimachinery v0.31.2 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/controller-runtime v0.12.3 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
Expand All @@ -184,10 +186,16 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.31.0 h1:b9LiSjR2ym/SzTOlfMHm1tr7/21aD7fSkqgD/CVJBCo=
k8s.io/api v0.31.0/go.mod h1:0YiFF+JfFxMM6+1hQei8FY8M7s1Mth+z/q7eF1aJkTE=
k8s.io/api v0.31.2 h1:3wLBbL5Uom/8Zy98GRPXpJ254nEFpl+hwndmk9RwmL0=
k8s.io/api v0.31.2/go.mod h1:bWmGvrGPssSK1ljmLzd3pwCQ9MgoTsRCuK35u6SygUk=
k8s.io/apimachinery v0.31.0 h1:m9jOiSr3FoSSL5WO9bjm1n6B9KROYYgNZOb4tyZ1lBc=
k8s.io/apimachinery v0.31.0/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw=
k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.0 h1:QqEJzNjbN2Yv1H79SsS+SWnXkBgVu4Pj3CJQgbx0gI8=
k8s.io/client-go v0.31.0/go.mod h1:Y9wvC76g4fLjmU0BA+rV+h2cncoadjvjjkkIGoTLcGU=
k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc=
k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
Expand Down
171 changes: 151 additions & 20 deletions pkg/driver/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,63 @@ package driver

import (
"context"
"errors"
"os"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
s3client "github.com/scality/cosi/pkg/util/s3client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cosiclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
"k8s.io/klog/v2"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
cosiapi "sigs.k8s.io/container-object-storage-interface-spec"
)

type provisionerServer struct {
Provisioner string
KubeClientset *kubernetes.Clientset
KubeConfig *rest.Config
CosiClientset cosiclientset.Interface
type ProvisionerServer struct {
Provisioner string
Clientset kubernetes.Interface
KubeConfig *rest.Config
BucketClientset bucketclientset.Interface
}

var _ cosiapi.ProvisionerServer = &provisionerServer{}
var _ cosiapi.ProvisionerServer = &ProvisionerServer{}

// helper methods initialized as variables for testing
var InitializeClient = initializeObjectStorageClient
var FetchSecretInformation = fetchObjectStorageProviderSecretInfo
var FetchParameters = fetchS3Parameters

func InitProvisionerServer(provisioner string) (cosiapi.ProvisionerServer, error) {
klog.V(3).InfoS("Initializing ProvisionerServer", "provisioner", provisioner)

func InitProvisionerServer(driverName string) (cosiapi.ProvisionerServer, error) {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
klog.ErrorS(err, "Failed to get in-cluster config")
return nil, err
}

kubeClientset, err := kubernetes.NewForConfig(kubeConfig)
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
klog.ErrorS(err, "Failed to create Kubernetes clientset")
return nil, err
}

cosiClientset, err := cosiclientset.NewForConfig(kubeConfig)
bucketClientset, err := bucketclientset.NewForConfig(kubeConfig)
if err != nil {
klog.ErrorS(err, "Failed to create BucketClientset")
return nil, err
}

return &provisionerServer{
Provisioner: driverName,
KubeClientset: kubeClientset,
KubeConfig: kubeConfig,
CosiClientset: cosiClientset,
klog.V(3).InfoS("Successfully initialized ProvisionerServer", "provisioner", provisioner)
return &ProvisionerServer{
Provisioner: provisioner,
Clientset: clientset,
KubeConfig: kubeConfig,
BucketClientset: bucketClientset,
}, nil
}

Expand All @@ -70,10 +88,123 @@ func InitProvisionerServer(driverName string) (cosiapi.ProvisionerServer, error)
// nil - Bucket successfully created
// codes.AlreadyExists - Bucket already exists. No more retries
// non-nil err - Internal error [requeue'd with exponential backoff]
func (s *provisionerServer) DriverCreateBucket(ctx context.Context,
func (s *ProvisionerServer) DriverCreateBucket(ctx context.Context,
req *cosiapi.DriverCreateBucketRequest) (*cosiapi.DriverCreateBucketResponse, error) {
bucketName := req.GetName()
parameters := req.GetParameters()

return nil, status.Error(codes.Unimplemented, "DriverCreateBucket: not implemented")
klog.V(3).InfoS("Received DriverCreateBucket request", "bucketName", bucketName)
klog.V(5).InfoS("Request parameters", "parameters", parameters)

s3Client, s3Params, err := InitializeClient(ctx, s.Clientset, parameters)
if err != nil {
klog.ErrorS(err, "Failed to initialize object storage provider clients", "bucketName", bucketName)
return nil, status.Error(codes.Internal, "failed to initialize object storage provider S3 client")
}

err = s3Client.CreateBucket(ctx, bucketName, *s3Params)
if err != nil {
var bucketAlreadyExists *s3types.BucketAlreadyExists
var bucketOwnedByYou *s3types.BucketAlreadyOwnedByYou

if errors.As(err, &bucketAlreadyExists) {
klog.V(3).InfoS("Bucket already exists", "bucketName", bucketName)
return nil, status.Errorf(codes.AlreadyExists, "Bucket already exists: %s", bucketName)
} else if errors.As(err, &bucketOwnedByYou) {
klog.V(3).InfoS("Bucket with the same parameters already exists, no error", "bucketName", bucketName)
return &cosiapi.DriverCreateBucketResponse{
BucketId: bucketName,
}, nil
} else {
var opErr *smithy.OperationError
if errors.As(err, &opErr) {
klog.V(4).InfoS("AWS operation error encountered", "operation", opErr.OperationName, "message", opErr.Err.Error(), "bucketName", bucketName)
}
klog.ErrorS(err, "Failed to create bucket", "bucketName", bucketName)
return nil, status.Error(codes.Internal, "Failed to create bucket")
}
}
klog.V(3).InfoS("Successfully created bucket", "bucketName", bucketName)
return &cosiapi.DriverCreateBucketResponse{
BucketId: bucketName,
}, nil
}

func initializeObjectStorageClient(ctx context.Context, clientset kubernetes.Interface, parameters map[string]string) (*s3client.S3Client, *s3client.S3Params, error) {
klog.V(3).InfoS("Initializing object storage provider clients", "parameters", parameters)

ospSecretName, namespace, err := FetchSecretInformation(parameters)
if err != nil {
klog.ErrorS(err, "Failed to fetch object storage provider secret info")
return nil, nil, err
}

klog.V(4).InfoS("Fetching secret", "secretName", ospSecretName, "namespace", namespace)
ospSecret, err := clientset.CoreV1().Secrets(namespace).Get(ctx, ospSecretName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get object store user secret", "secretName", ospSecretName)
return nil, nil, status.Error(codes.Internal, "failed to get object store user secret")
}

s3Params, err := FetchParameters(ospSecret.Data)
if err != nil {
klog.ErrorS(err, "Failed to fetch S3 parameters from secret", "secretName", ospSecretName)
return nil, nil, err
}

s3Client, err := s3client.InitS3Client(*s3Params)
if err != nil {
klog.ErrorS(err, "Failed to create S3 client", "endpoint", s3Params.Endpoint)
return nil, nil, status.Error(codes.Internal, "failed to create S3 client")
}
klog.V(3).InfoS("Successfully initialized S3 client", "endpoint", s3Params.Endpoint)
return s3Client, s3Params, nil // Returning both the client and the params
}

func fetchObjectStorageProviderSecretInfo(parameters map[string]string) (string, string, error) {
klog.V(4).InfoS("Fetching object storage provider secret info", "parameters", parameters)

secretName := parameters["COSI_OBJECT_STORAGE_PROVIDER_SECRET_NAME"]
namespace := os.Getenv("POD_NAMESPACE")
if parameters["COSI_OBJECT_STORAGE_PROVIDER_SECRET_NAMESPACE"] != "" {
namespace = parameters["COSI_OBJECT_STORAGE_PROVIDER_SECRET_NAMESPACE"]
}
if secretName == "" || namespace == "" {
klog.ErrorS(nil, "Missing object storage provider secret name or namespace", "secretName", secretName, "namespace", namespace)
return "", "", status.Error(codes.InvalidArgument, "Object storage provider secret name and namespace are required")
}

klog.V(4).InfoS("Object storage provider secret info fetched", "secretName", secretName, "namespace", namespace)
return secretName, namespace, nil
}

func fetchS3Parameters(secretData map[string][]byte) (*s3client.S3Params, error) {
klog.V(5).InfoS("Fetching S3 parameters from secret")

accessKey := string(secretData["COSI_S3_ACCESS_KEY_ID"])
secretKey := string(secretData["COSI_S3_ACCESS_SECRET_KEY"])
endpoint := string(secretData["COSI_S3_ENDPOINT"])
region := string(secretData["COSI_S3_REGION"])

if endpoint == "" || accessKey == "" || secretKey == "" || region == "" {
klog.ErrorS(nil, "Missing required S3 parameters", "accessKey", accessKey != "", "secretKey", secretKey != "", "endpoint", endpoint != "", "region", region != "")
return nil, status.Error(codes.InvalidArgument, "endpoint, accessKeyID, secretKey and region are required")
}

var tlsCert []byte
if cert, exists := secretData["COSI_S3_TLS_CERT_SECRET_NAME"]; exists {
tlsCert = cert
} else {
klog.V(5).InfoS("TLS certificate is not provided, proceeding without it")
}

return &s3client.S3Params{
AccessKey: accessKey,
SecretKey: secretKey,
Endpoint: endpoint,
Region: region,
TLSCert: tlsCert,
}, nil
}

// DriverDeleteBucket is an idempotent method for deleting buckets
Expand All @@ -84,7 +215,7 @@ func (s *provisionerServer) DriverCreateBucket(ctx context.Context,
//
// nil - Bucket successfully deleted
// non-nil err - Internal error [requeue'd with exponential backoff]
func (s *provisionerServer) DriverDeleteBucket(ctx context.Context,
func (s *ProvisionerServer) DriverDeleteBucket(ctx context.Context,
req *cosiapi.DriverDeleteBucketRequest) (*cosiapi.DriverDeleteBucketResponse, error) {

return nil, status.Error(codes.Unimplemented, "DriverCreateBucket: not implemented")
Expand All @@ -97,7 +228,7 @@ func (s *provisionerServer) DriverDeleteBucket(ctx context.Context,
//
// nil - Bucket access successfully created
// non-nil err - Internal error [requeue'd with exponential backoff]
func (s *provisionerServer) DriverGrantBucketAccess(ctx context.Context,
func (s *ProvisionerServer) DriverGrantBucketAccess(ctx context.Context,
req *cosiapi.DriverGrantBucketAccessRequest) (*cosiapi.DriverGrantBucketAccessResponse, error) {

return nil, status.Error(codes.Unimplemented, "DriverCreateBucket: not implemented")
Expand All @@ -111,7 +242,7 @@ func (s *provisionerServer) DriverGrantBucketAccess(ctx context.Context,
//
// nil - Bucket access successfully deleted
// non-nil err - Internal error [requeue'd with exponential backoff]
func (s *provisionerServer) DriverRevokeBucketAccess(ctx context.Context,
func (s *ProvisionerServer) DriverRevokeBucketAccess(ctx context.Context,
req *cosiapi.DriverRevokeBucketAccessRequest) (*cosiapi.DriverRevokeBucketAccessResponse, error) {

return nil, status.Error(codes.Unimplemented, "DriverCreateBucket: not implemented")
Expand Down
Loading

0 comments on commit d7ba0ff

Please sign in to comment.