diff --git a/apis/addtoscheme_connection_v1alpha1.go b/apis/addtoscheme_connection_v1alpha1.go new file mode 100644 index 00000000000..4e4cd66fccb --- /dev/null +++ b/apis/addtoscheme_connection_v1alpha1.go @@ -0,0 +1,25 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" +) + +func init() { + // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back + AddToSchemes = append(AddToSchemes, v1alpha1.AddToScheme) +} diff --git a/apis/connection/v1alpha1/connection_types.go b/apis/connection/v1alpha1/connection_types.go new file mode 100644 index 00000000000..8ef9df0f59a --- /dev/null +++ b/apis/connection/v1alpha1/connection_types.go @@ -0,0 +1,67 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ConnectionSpec is the configuration for the pubsub connection. +type ConnectionSpec struct { + // Provider is the name of the pubsub provider + // dapr/rabbitmq/redis/nats/ + Driver string `json:"driver,omitempty"` + Config map[string]string `json:"config,omitempty"` +} + +// ConnectionStatus defines the observed state of Config. +type ConnectionStatus struct { + ByPod []ByPodStatus `json:"byPod,omitempty"` +} +type ByPodStatus struct { + // a unique identifier for the pod that wrote the status + ID string `json:"id,omitempty"` + Errors []ConnectionError `json:"errors,omitempty"` +} + +type ConnectionError struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// +kubebuilder:resource:scope=Namespaced +// +kubebuilder:object:root=true + +// Connection is the Schema for the configs API. +type Connection struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec ConnectionSpec `json:"spec,omitempty"` + Status ConnectionStatus `json:"status,omitempty"` +} +// +kubebuilder:object:root=true + +// ConnectionList contains a list of Config. +type ConnectionList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Connection `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Connection{}, &ConnectionList{}) +} diff --git a/apis/connection/v1alpha1/groupversion_info.go b/apis/connection/v1alpha1/groupversion_info.go new file mode 100644 index 00000000000..f242484176b --- /dev/null +++ b/apis/connection/v1alpha1/groupversion_info.go @@ -0,0 +1,35 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha1 contains API Schema definitions for the config v1alpha1 API group +// +kubebuilder:object:generate=true +// +groupName=connection.gatekeeper.sh +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // GroupVersion is group version used to register these objects. + GroupVersion = schema.GroupVersion{Group: "connection.gatekeeper.sh", Version: "v1alpha1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme. + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/apis/connection/v1alpha1/zz_generated.deepcopy.go b/apis/connection/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..846e2b8239f --- /dev/null +++ b/apis/connection/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,162 @@ +//go:build !ignore_autogenerated + +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ByPodStatus) DeepCopyInto(out *ByPodStatus) { + *out = *in + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]ConnectionError, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ByPodStatus. +func (in *ByPodStatus) DeepCopy() *ByPodStatus { + if in == nil { + return nil + } + out := new(ByPodStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopyInto(out *Connection) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Connection. +func (in *Connection) DeepCopy() *Connection { + if in == nil { + return nil + } + out := new(Connection) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Connection) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionError) DeepCopyInto(out *ConnectionError) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionError. +func (in *ConnectionError) DeepCopy() *ConnectionError { + if in == nil { + return nil + } + out := new(ConnectionError) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionList) DeepCopyInto(out *ConnectionList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Connection, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionList. +func (in *ConnectionList) DeepCopy() *ConnectionList { + if in == nil { + return nil + } + out := new(ConnectionList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ConnectionList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionSpec) DeepCopyInto(out *ConnectionSpec) { + *out = *in + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionSpec. +func (in *ConnectionSpec) DeepCopy() *ConnectionSpec { + if in == nil { + return nil + } + out := new(ConnectionSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionStatus) DeepCopyInto(out *ConnectionStatus) { + *out = *in + if in.ByPod != nil { + in, out := &in.ByPod, &out.ByPod + *out = make([]ByPodStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionStatus. +func (in *ConnectionStatus) DeepCopy() *ConnectionStatus { + if in == nil { + return nil + } + out := new(ConnectionStatus) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/bases/connection.gatekeeper.sh_connections.yaml b/config/crd/bases/connection.gatekeeper.sh_connections.yaml new file mode 100644 index 00000000000..223bf4f0796 --- /dev/null +++ b/config/crd/bases/connection.gatekeeper.sh_connections.yaml @@ -0,0 +1,79 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the configs API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec is the configuration for the pubsub connection. + properties: + config: + additionalProperties: + type: string + type: object + driver: + description: |- + Provider is the name of the pubsub provider + dapr/rabbitmq/redis/nats/ + type: string + type: object + status: + description: ConnectionStatus defines the observed state of Config. + properties: + byPod: + items: + properties: + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: a unique identifier for the pod that wrote the + status + type: string + type: object + type: array + type: object + type: object + served: true + storage: true diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 4eb253c7672..44e40fe037e 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -3,6 +3,7 @@ # It should be run by config/default resources: - bases/config.gatekeeper.sh_configs.yaml +- bases/connection.gatekeeper.sh_connections.yaml - bases/syncset.gatekeeper.sh_syncsets.yaml - bases/status.gatekeeper.sh_constraintpodstatuses.yaml - bases/status.gatekeeper.sh_constrainttemplatepodstatuses.yaml diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index f3416ee2060..366b2971cf9 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -76,6 +76,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/deploy/gatekeeper.yaml b/deploy/gatekeeper.yaml index b9bf0ed2303..b4eb4b11b35 100644 --- a/deploy/gatekeeper.yaml +++ b/deploy/gatekeeper.yaml @@ -2505,6 +2505,87 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the configs API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec is the configuration for the pubsub connection. + properties: + config: + additionalProperties: + type: string + type: object + driver: + description: |- + Provider is the name of the pubsub provider + dapr/rabbitmq/redis/nats/ + type: string + type: object + status: + description: ConnectionStatus defines the observed state of Config. + properties: + byPod: + items: + properties: + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: a unique identifier for the pod that wrote the status + type: string + type: object + type: array + type: object + type: object + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.14.0 @@ -4740,6 +4821,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: @@ -4958,7 +5051,7 @@ spec: - --logtostderr - --disable-opa-builtin={http.send} - --disable-cert-rotation - - --enable-pub-sub=false + - --enable-pub-sub=true - --constraint-violations-limit=0 - --audit-connection=audit - --audit-channel=audit diff --git a/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml b/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml new file mode 100644 index 00000000000..d814e87a4ec --- /dev/null +++ b/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml @@ -0,0 +1,85 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + app: '{{ template "gatekeeper.name" . }}' + chart: '{{ template "gatekeeper.name" . }}' + gatekeeper.sh/system: "yes" + heritage: '{{ .Release.Service }}' + release: '{{ .Release.Name }}' + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the configs API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec is the configuration for the pubsub connection. + properties: + config: + additionalProperties: + type: string + type: object + driver: + description: |- + Provider is the name of the pubsub provider + dapr/rabbitmq/redis/nats/ + type: string + type: object + status: + description: ConnectionStatus defines the observed state of Config. + properties: + byPod: + items: + properties: + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: a unique identifier for the pod that wrote the status + type: string + type: object + type: array + type: object + type: object + served: true + storage: true diff --git a/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml b/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml index a6306b3a285..3da336d5b56 100644 --- a/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml +++ b/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml @@ -83,6 +83,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/manifest_staging/deploy/gatekeeper.yaml b/manifest_staging/deploy/gatekeeper.yaml index fe375e4eb43..8db881a06a7 100644 --- a/manifest_staging/deploy/gatekeeper.yaml +++ b/manifest_staging/deploy/gatekeeper.yaml @@ -2505,6 +2505,87 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the configs API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec is the configuration for the pubsub connection. + properties: + config: + additionalProperties: + type: string + type: object + driver: + description: |- + Provider is the name of the pubsub provider + dapr/rabbitmq/redis/nats/ + type: string + type: object + status: + description: ConnectionStatus defines the observed state of Config. + properties: + byPod: + items: + properties: + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: a unique identifier for the pod that wrote the status + type: string + type: object + type: array + type: object + type: object + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.14.0 @@ -4740,6 +4821,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/pkg/controller/pubsub/pubsub_config_controller.go b/pkg/controller/pubsub/pubsub_config_controller.go index 7b93376bcd0..20418391b97 100644 --- a/pkg/controller/pubsub/pubsub_config_controller.go +++ b/pkg/controller/pubsub/pubsub_config_controller.go @@ -2,25 +2,21 @@ package pubsub import ( "context" - "encoding/json" "flag" "fmt" + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" - "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -71,30 +67,17 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { } return c.Watch( - source.Kind(mgr.GetCache(), &corev1.ConfigMap{}, - &handler.TypedEnqueueRequestForObject[*corev1.ConfigMap]{}, - predicate.TypedFuncs[*corev1.ConfigMap]{ - CreateFunc: func(e event.TypedCreateEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - UpdateFunc: func(e event.TypedUpdateEvent[*corev1.ConfigMap]) bool { - return e.ObjectNew.GetNamespace() == util.GetNamespace() - }, - DeleteFunc: func(e event.TypedDeleteEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - GenericFunc: func(e event.TypedGenericEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - }, - )) + source.Kind(mgr.GetCache(), &connectionv1alpha1.Connection{}, + &handler.TypedEnqueueRequestForObject[*connectionv1alpha1.Connection]{})) } +// +kubebuilder:rbac:groups=connection.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete + func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { log.Info("Reconcile", "request", request, "namespace", request.Namespace, "name", request.Name) deleted := false - cfg := &corev1.ConfigMap{} + cfg := &connectionv1alpha1.Connection{} err := r.Get(ctx, request.NamespacedName, cfg) if err != nil { if !errors.IsNotFound(err) { @@ -112,23 +95,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, nil } - if len(cfg.Data) == 0 { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName)) - } - if _, ok := cfg.Data["provider"]; !ok { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName)) + if len(cfg.Spec.Config) == 0 { + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("config missing in connection %s, unable to configure respective pubsub", request.NamespacedName)) } - var config interface{} - err = json.Unmarshal([]byte(cfg.Data["config"]), &config) - if err != nil { - return reconcile.Result{}, err + if cfg.Spec.Driver == "" { + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in connection %s, unable to configure respective pubsub", request.NamespacedName)) } - err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"]) + err = r.system.UpsertConnection(ctx, cfg.Spec.Config, request.Name, cfg.Spec.Driver) if err != nil { return reconcile.Result{}, err } - log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"]) + log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Spec.Driver) return reconcile.Result{}, nil } diff --git a/pkg/pubsub/diskwriter/diskwriter.go b/pkg/pubsub/diskwriter/diskwriter.go index 4d668b70236..94e5429790c 100644 --- a/pkg/pubsub/diskwriter/diskwriter.go +++ b/pkg/pubsub/diskwriter/diskwriter.go @@ -13,8 +13,8 @@ import ( ) type DiskWriter struct { - Path string `json:"path,omitempty"` - file *os.File + Path string `json:"path,omitempty"` + file *os.File } const ( @@ -22,7 +22,7 @@ const ( ) func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) error { - if msg, ok := data.(string); ok && msg == "audit is completed" { + if msg, ok := data.(string); ok && msg == "audit is completed" { // release lock err := syscall.Flock(int(r.file.Fd()), syscall.LOCK_UN) r.file.Close() @@ -36,29 +36,29 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro } if r.file == nil { - // Open a new file and acquire a lock - filePath := path.Join(r.Path, "violations.txt") - file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } + // Open a new file and acquire a lock + filePath := path.Join(r.Path, "violations.txt") + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + + for { + err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) // Sleep for a short duration before retrying + } - for { - err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX) - if err == nil { - break - } - time.Sleep(100 * time.Millisecond) // Sleep for a short duration before retrying - } - r.file = file err = r.file.Truncate(0) if err != nil { r.file = nil - return fmt.Errorf("failed to truncate file: %w", err) - } + return fmt.Errorf("failed to truncate file: %w", err) + } } - + _, err = r.file.WriteString(string(jsonData) + "\n") if err != nil { return fmt.Errorf("error publishing message to dapr: %w", err) @@ -87,11 +87,11 @@ func (r *DiskWriter) UpdateConnection(_ context.Context, _ interface{}) error { // Returns a new client for dapr. func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) { var diskWriter DiskWriter - m, ok := config.(map[string]interface{}) + m, ok := config.(map[string]string) if !ok { return nil, fmt.Errorf("invalid type assertion, config is not in expected format") } - diskWriter.Path, ok = m["path"].(string) + diskWriter.Path, ok = m["path"] if !ok { return nil, fmt.Errorf("failed to get value of path") } diff --git a/test/pubsub/fake-reader/main.go b/test/pubsub/fake-reader/main.go index 6b2650dc4bf..8f1ac996658 100644 --- a/test/pubsub/fake-reader/main.go +++ b/test/pubsub/fake-reader/main.go @@ -2,11 +2,11 @@ package main import ( "bufio" - // "fmt" - "time" - "log" - "os" + "log" + "os" "syscall" + // "fmt". + "time" ) type PubsubMsg struct { @@ -35,53 +35,53 @@ type PubsubMsg struct { // hold tmp file for previous violations // 2 files // 1 - GK publish violations -// 1 - policy read violations +// 1 - policy read violations. func main() { filePath := "/tmp/violations/violations.txt" for { // Open the file in read-write mode - file, err := os.OpenFile(filePath, os.O_RDWR, 0644) - if err != nil { - log.Printf("Error opening file: %v\n", err) + file, err := os.OpenFile(filePath, os.O_RDWR, 0o644) + if err != nil { + log.Printf("Error opening file: %v\n", err) time.Sleep(5 * time.Second) continue - } + } - // Acquire an exclusive lock on the file - if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil { - log.Fatalf("Error locking file: %v\n", err) - } + // Acquire an exclusive lock on the file + if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil { + log.Fatalf("Error locking file: %v\n", err) + } - // Read the file content - scanner := bufio.NewScanner(file) - var lines []string - for scanner.Scan() { - lines = append(lines, scanner.Text()) - } + // Read the file content + scanner := bufio.NewScanner(file) + var lines []string + for scanner.Scan() { + lines = append(lines, scanner.Text()) + } - if err := scanner.Err(); err != nil { - log.Fatalf("Error reading file: %v\n", err) - } + if err := scanner.Err(); err != nil { + log.Fatalf("Error reading file: %v\n", err) + } - // Process the read content - for _, line := range lines { - log.Printf("Processed line: %s\n", line) - } + // Process the read content + for _, line := range lines { + log.Printf("Processed line: %s\n", line) + } - // Truncate the file to remove the processed content - if err := file.Truncate(0); err != nil { - log.Fatalf("Error truncating file: %v\n", err) - } + // Truncate the file to remove the processed content + if err := file.Truncate(0); err != nil { + log.Fatalf("Error truncating file: %v\n", err) + } - // Release the lock - if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { - log.Fatalf("Error unlocking file: %v\n", err) - } + // Release the lock + if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { + log.Fatalf("Error unlocking file: %v\n", err) + } - // Close the file - if err := file.Close(); err != nil { - log.Fatalf("Error closing file: %v\n", err) - } + // Close the file + if err := file.Close(); err != nil { + log.Fatalf("Error closing file: %v\n", err) + } } -} \ No newline at end of file +} diff --git a/test/pubsub/fake-writer/main.go b/test/pubsub/fake-writer/main.go index 197a64ac1b4..4854d9b59e1 100644 --- a/test/pubsub/fake-writer/main.go +++ b/test/pubsub/fake-writer/main.go @@ -1,9 +1,9 @@ package main import ( - "fmt" - "os" - // "time" + "fmt" + "os" + // "time". "syscall" ) @@ -32,7 +32,7 @@ func main() { msgId := 1 for { - file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) if err != nil { fmt.Println("failed to open file: %w", err) } @@ -41,22 +41,22 @@ func main() { if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil { fmt.Println("failed to lock file: %w", err) } - + _, err = file.WriteString(fmt.Sprintf("violation_msg_", msgId) + "\n") if err != nil { fmt.Println("error publishing message to dapr: %w", err) } - + // Release the lock - if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { - fmt.Println("Error unlocking file: %v\n", err) - } - - // Close the file - if err := file.Close(); err != nil { - fmt.Println("Error closing file: %v\n", err) - } + if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { + fmt.Println("Error unlocking file: %v\n", err) + } + + // Close the file + if err := file.Close(); err != nil { + fmt.Println("Error closing file: %v\n", err) + } fmt.Println("Published message: violation_msg_", msgId) msgId++ } -} \ No newline at end of file +} diff --git a/test/pubsub/publish-components.yaml b/test/pubsub/publish-components.yaml index ea2aa4f4ddc..ed39e7f8e9e 100644 --- a/test/pubsub/publish-components.yaml +++ b/test/pubsub/publish-components.yaml @@ -15,14 +15,13 @@ # name: redis # key: redis-password # --- -apiVersion: v1 -kind: ConfigMap +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection metadata: name: audit namespace: gatekeeper-system -data: - provider: "diskwriter" - config: | - { - "path": "/tmp/violations" - } +spec: + driver: "diskwriter" + config: + path: "/tmp/violations" +