Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iam): add iam bindings to crds #191

Merged
merged 12 commits into from
Mar 26, 2024
29 changes: 29 additions & 0 deletions api/v1/iam_binding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

// IamBinding provides authorative binding of permissions to for the resource.
type IamBinding struct {
// Role granted for the service accounts on topic / subscription level.
//+kubebuilder:validation:Required
//+kubebuilder:example="roles/pubsub.publisher"
Role string `json:"role"`

// Service accounts assigned with the role
//+kubebuilder:validation:Required
ServiceAccounts []string `json:"serviceAccounts"`
}
4 changes: 4 additions & 0 deletions api/v1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type SubscriptionSpec struct {
// project ID of topic
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
TopicProjectID string `json:"topicProjectID,omitempty"`

// Authorative IAM Binding for subscription
//+kubebuilder:validation:Optional
Bindings []IamBinding `json:"bindings,omitempty"`
}

// SubscriptionStatus defines the observed state of Subscription
Expand Down
4 changes: 4 additions & 0 deletions api/v1/topic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type TopicSpec struct {
// ID of topic
//+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf"
TopicID string `json:"topicID,omitempty"`

// Authorative IAM Binding for topic
//+kubebuilder:validation:Optional
Bindings []IamBinding `json:"bindings,omitempty"`
}

// TopicStatus defines the observed state of Topic
Expand Down
38 changes: 36 additions & 2 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@ spec:
spec:
description: SubscriptionSpec defines the desired state of Subscription
properties:
bindings:
description: Authorative IAM Binding for subscription
items:
description: IamBinding provides authorative binding of permissions
to for the resource.
properties:
role:
description: Role granted for the service accounts on topic
/ subscription level.
example: roles/pubsub.publisher
type: string
serviceAccounts:
description: Service accounts assigned with the role
items:
type: string
type: array
required:
- role
- serviceAccounts
type: object
type: array
subscriptionID:
description: subscription ID
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@ spec:
spec:
description: TopicSpec defines the desired state of Topic
properties:
bindings:
description: Authorative IAM Binding for topic
items:
description: IamBinding provides authorative binding of permissions
to for the resource.
properties:
role:
description: Role granted for the service accounts on topic
/ subscription level.
example: roles/pubsub.publisher
type: string
serviceAccounts:
description: Service accounts assigned with the role
items:
type: string
type: array
required:
- role
- serviceAccounts
type: object
type: array
projectID:
description: ID of project
type: string
Expand Down
37 changes: 37 additions & 0 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/pubsub"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -102,6 +103,14 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("PubSub subscription already exists")

// update iam policy
if err = r.updateIamPolicy(ctx, subscription.Spec.SubscriptionProjectID, subscription.Spec.SubscriptionID, subscription.Spec.Bindings); err != nil {
logger.Error(err, "Unable to update iam policy for topic")
return ctrl.Result{}, err
}
logger.Info("Updated IAM policy for subscription", "id", subscription.Spec.SubscriptionID)

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
Expand Down Expand Up @@ -130,6 +139,13 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
logger.Info(fmt.Sprintf("Subscription created: %v", s.ID()), "subscription", subscription)

// update iam policy
if err = r.updateIamPolicy(ctx, subscription.Spec.SubscriptionProjectID, subscription.Spec.SubscriptionID, subscription.Spec.Bindings); err != nil {
logger.Error(err, "Unable to update iam policy for topic")
return ctrl.Result{}, err
}
logger.Info("Updated IAM policy for subscription", "id", s.ID())

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
Expand Down Expand Up @@ -184,3 +200,24 @@ func (r *SubscriptionReconciler) deleteSubscription(ctx context.Context, subscri
}
return nil
}

// updateIamPolicy sets the topic iam policy to match the defined spec, if the bindings are non nil.
func (r *SubscriptionReconciler) updateIamPolicy(ctx context.Context, projectID, subscriptionID string, bindings []googlecloudpubsuboperatorv1.IamBinding) error {
if bindings != nil {
c, err := r.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer c.Close()

policy := &iam.Policy{}
for _, bind := range bindings {
for _, member := range bind.ServiceAccounts {
policy.Add(member, iam.RoleName(bind.Role))
}
}

return c.Subscription(subscriptionID).IAM().SetPolicy(ctx, policy)
}
return nil
}
56 changes: 56 additions & 0 deletions internal/controller/subscription_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"cloud.google.com/go/iam"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1"
Expand Down Expand Up @@ -52,6 +53,61 @@ var _ = Describe("Subscription controller", func() {
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})

It("Should create a Pub/Sub Subscription with IAM Binding", func(ctx context.Context) {
const projectID = "subscription-project-1"
const subscriptionID = "my-subscription-with-iam"
psClient, err := pubsubtest.NewClient(ctx, projectID, psServer)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Topic")
topicID := "my-topic-2"
_, err = psClient.CreateTopic(ctx, topicID)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Subscription")
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
SubscriptionID: subscriptionID,
TopicProjectID: projectID,
TopicID: topicID,
Bindings: []googlecloudpubsuboperatorv1.IamBinding{
{
Role: "roles/pubsub.consumer",
ServiceAccounts: []string{
"[email protected]",
},
},
},
},
}
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())

By("Checking if the Subscription exists")
Eventually(func(g Gomega) {
sub := psClient.Subscription(subscriptionID)
subscriptionExists, err := sub.Exists(ctx)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(subscriptionExists).Should(BeTrue())

policy, err := sub.IAM().Policy(ctx)
g.Expect(err).ShouldNot(HaveOccurred())

consumers := policy.Members(iam.RoleName("roles/pubsub.consumer"))
g.Expect(consumers).Should(ContainElement("[email protected]"))
g.Expect(consumers).Should(HaveLen(1))

}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "subscription-project-2"

Expand Down
34 changes: 34 additions & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"context"
"fmt"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"
"unsafe"

"cloud.google.com/go/iam/apiv1/iampb"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/quipper/google-cloud-pubsub-operator/internal/pubsubtest"
"google.golang.org/api/option"
"google.golang.org/grpc"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
clocktesting "k8s.io/utils/clock/testing"
Expand Down Expand Up @@ -57,6 +61,15 @@ func TestControllers(t *testing.T) {
RunSpecs(t, "Controller Suite")
}

func TestGetInternalGrpcServer(t *testing.T) {
psServer := pstest.NewServer()
gSrv := getInternalGrpcServer(psServer)
sInfo := gSrv.GetServiceInfo()
if sInfo == nil {
t.Error("failed to get grpc server info")
}
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

Expand Down Expand Up @@ -104,6 +117,13 @@ var _ = BeforeSuite(func() {
psServer = pstest.NewServer(
pubsubtest.CreateTopicErrorInjectionReactor(),
)

gsrv := getInternalGrpcServer(psServer)

// trying to register fake iam policy server
// this fails randomly because Registering is not possible after `grsv.Serve()` is called.
iampb.RegisterIAMPolicyServer(gsrv, pubsubtest.CreateFakeIamPolicyServer())

DeferCleanup(func() {
Expect(psServer.Close()).Should(Succeed())
})
Expand Down Expand Up @@ -138,3 +158,17 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})

// getInternalGrpcServer uses reflection to receive pointer to the internal grpc.Server located in the psutil.Server.
// This reference can be used to register additional fake servers.
func getInternalGrpcServer(psServer *pstest.Server) *grpc.Server {
// try to get access to `Gsrv` thru `*testutil.Server`, that's on first field of `pstest.Server`
rs := reflect.ValueOf(psServer).Elem()

// *testutil.Server
tsrv := rs.Field(0)
tsrv = reflect.NewAt(tsrv.Type(), unsafe.Pointer(tsrv.UnsafeAddr())).Elem()

// *grpc.Server is found from the exported field `Gsrv`
return reflect.Indirect(tsrv).FieldByName("Gsrv").Interface().(*grpc.Server)
}
Loading