Skip to content

Commit

Permalink
moved cronjob outside of condition, fixed patching errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ca7alindev committed Sep 23, 2024
1 parent c9def51 commit d7a4b9c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 94 deletions.
4 changes: 2 additions & 2 deletions example/composition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
apiVersion: kndp.io/v1alpha1
kind: Poll
providerConfigRef: "kndp-kubernetes-provider-config"
deploymentImage: "ghcr.io/kndpio/function-poll/slack-collector:c9843"
cronJobImage: "ghcr.io/kndpio/function-poll/slack-notify:c9843"
deploymentImage: "ghcr.io/kndpio/function-poll/slack-collector:c9def"
cronJobImage: "ghcr.io/kndpio/function-poll/slack-notify:c9def"
deploymentName: "slack-collector"
serviceAccountName: "slack-collector"
2 changes: 1 addition & 1 deletion example/functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ metadata:
# render.crossplane.io/runtime: Development
spec:
# This is ignored when using the Development runtime.
package: ghcr.io/kndpio/function-poll:c9843
package: ghcr.io/kndpio/function-poll:c9def
runtimeConfigRef:
name: poll-function-runtime
2 changes: 1 addition & 1 deletion example/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ rules:
resources: ["ingresses"]
verbs: ["*"]
- apiGroups: ["kndp.io"]
resources: ["meals"]
resources: ["polls"]
verbs: ["*"]
- apiGroups: ["v1"]
resources: ["services"]
Expand Down
5 changes: 2 additions & 3 deletions example/xr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ metadata:
name: meal
spec:
deliveryTime: 0
dueOrderTime: 100000
dueOrderTime: 10
dueTakeTime: 0
voters: []
title: "meal"
schedule: "*/1 * * * *"
schedule: "0 * * * *"
messages:
question: "how are you?"
response: "thank you for response."
result: "here are the voting results:"

105 changes: 53 additions & 52 deletions fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,54 +144,56 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1beta1.RunFunctionRequ
},
}

cronjob := composed.Unstructured{
Unstructured: unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "kubernetes.crossplane.io/v1alpha2",
"kind": "Object",
"metadata": map[string]interface{}{
"name": "slack-notify-cronjob",
},
"spec": map[string]interface{}{
"forProvider": map[string]interface{}{
"manifest": map[string]interface{}{
"apiVersion": "batch/v1",
"kind": "CronJob",
"metadata": map[string]interface{}{
"name": "slack-notify-cronjob",
"namespace": "default",
},
"spec": map[string]interface{}{
"schedule": schedule,
"jobTemplate": map[string]interface{}{
"spec": map[string]interface{}{
"template": map[string]interface{}{
"spec": map[string]interface{}{
"restartPolicy": "OnFailure",
"serviceAccountName": input.ServiceAccountName,
"containers": []interface{}{
map[string]interface{}{
"name": "poll-container",
"image": input.CronJobImage,
"env": []interface{}{
map[string]interface{}{
"name": "SLACK_NOTIFY_MESSAGE",
"value": question,
},
map[string]interface{}{
"name": "POLL_NAME",
"value": pollName,
},
map[string]interface{}{
"name": "POLL_TITLE",
"value": pollTitle,
},
desired[resource.Name(deployment.GetName())] = &resource.DesiredComposed{Resource: &deployment}
}

cronjob := composed.Unstructured{
Unstructured: unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "kubernetes.crossplane.io/v1alpha2",
"kind": "Object",
"metadata": map[string]interface{}{
"name": "slack-notify-cronjob",
},
"spec": map[string]interface{}{
"forProvider": map[string]interface{}{
"manifest": map[string]interface{}{
"apiVersion": "batch/v1",
"kind": "CronJob",
"metadata": map[string]interface{}{
"name": "slack-notify-cronjob",
"namespace": "default",
},
"spec": map[string]interface{}{
"schedule": schedule,
"jobTemplate": map[string]interface{}{
"spec": map[string]interface{}{
"template": map[string]interface{}{
"spec": map[string]interface{}{
"restartPolicy": "OnFailure",
"serviceAccountName": input.ServiceAccountName,
"containers": []interface{}{
map[string]interface{}{
"name": "poll-container",
"image": input.CronJobImage,
"env": []interface{}{
map[string]interface{}{
"name": "SLACK_NOTIFY_MESSAGE",
"value": question,
},
map[string]interface{}{
"name": "POLL_NAME",
"value": pollName,
},
"envFrom": []interface{}{
map[string]interface{}{
"secretRef": map[string]interface{}{
"name": secretName,
},
map[string]interface{}{
"name": "POLL_TITLE",
"value": pollTitle,
},
},
"envFrom": []interface{}{
map[string]interface{}{
"secretRef": map[string]interface{}{
"name": secretName,
},
},
},
Expand All @@ -203,15 +205,14 @@ func (f *Function) RunFunction(_ context.Context, req *fnv1beta1.RunFunctionRequ
},
},
},
"providerConfigRef": map[string]interface{}{"name": input.ProviderConfigRef},
},
"providerConfigRef": map[string]interface{}{"name": input.ProviderConfigRef},
},
},
}

desired[resource.Name(cronjob.GetName())] = &resource.DesiredComposed{Resource: &cronjob}
desired[resource.Name(deployment.GetName())] = &resource.DesiredComposed{Resource: &deployment}
},
}
desired[resource.Name(cronjob.GetName())] = &resource.DesiredComposed{Resource: &cronjob}

if err := response.SetDesiredComposedResources(rsp, desired); err != nil {
return rsp, err
}
Expand Down
11 changes: 5 additions & 6 deletions internal/slack-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ type Poll struct {
}

var (
api = slack.New(os.Getenv("SLACK_API_TOKEN"))
path = os.Getenv("SLACK_COLLECTOR_PATH")
port = os.Getenv("SLACK_COLLECTOR_PORT")
pollResource *Poll
api = slack.New(os.Getenv("SLACK_API_TOKEN"))
path = os.Getenv("SLACK_COLLECTOR_PATH")
port = os.Getenv("SLACK_COLLECTOR_PORT")
)

// handleEventsEndpoint handles the events endpoint.
Expand Down Expand Up @@ -167,16 +166,16 @@ func getK8sResource(dynamicClient dynamic.Interface, ctx context.Context, pollSl

// respondMsg sends a response message to Slack.
func respondMsg(userID string, userName string, selectedOption string, pollName string) {

attachment := slack.Attachment{
Color: "#f9a41b",
CallbackID: pollName,
Text: pollResource.Spec.Messages.Response + "\n Selected: " + selectedOption,
Text: "\n Selected: " + selectedOption,
Fields: []slack.AttachmentField{},
Actions: []slack.AttachmentAction{},
MarkdownIn: []string{},
Blocks: slack.Blocks{},
}

channelID, _, err := api.PostMessage(
userID,
slack.MsgOptionText("", true),
Expand Down
64 changes: 35 additions & 29 deletions internal/slack-notify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"os"
"time"

"github.com/crossplane/function-sdk-go/logging"
"github.com/nlopes/slack"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -56,29 +56,27 @@ type Poll struct {
} `json:"status"`
}

// ProcessSlackMembers gets and process slack members
func ProcessSlackMembers(api *slack.Client, channelID string, logger logging.Logger) ([]string, error) {
members, _, err := api.GetUsersInConversation(&slack.GetUsersInConversationParameters{ChannelID: channelID})
// getK8sResource gets the Kubernetes resource.
func getK8sResource(dynamicClient dynamic.Interface, ctx context.Context, pollSlackName string, resId schema.GroupVersionResource) (*Poll, error) {

res, err := dynamicClient.Resource(resId).Namespace("").
List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
realUsers := make([]string, 0)
for _, memberID := range members {
userInfo, err := api.GetUserInfo(memberID)
if err != nil {
logger.Info("error getting user info for", memberID, err)
continue
for _, item := range res.Items {
res := &Poll{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, res); err != nil {
return nil, fmt.Errorf("error converting Unstructured to Poll struct: %v", err)
}

if !userInfo.IsBot {
realUsers = append(realUsers, userInfo.Name)
if res.GetObjectMeta().GetName() == pollSlackName {
return res, nil
}
}
return realUsers, nil
return nil, fmt.Errorf("poll resource with name %s not found", pollSlackName)
}

func main() {
fmt.Println("resourceId")
config, err := ctrl.GetConfig()
if err != nil {
fmt.Println("error getting config", err)
Expand All @@ -92,25 +90,33 @@ func main() {
Version: "v1alpha1",
Resource: "polls",
}
poll := Poll{
Status: struct {
Done bool `json:"done"`
LastNotificationTime int64 `json:"lastNotificationTime"`
}{
Done: false,
LastNotificationTime: time.Now().Unix(),
},
}

u, err := json.Marshal(poll)
pollResource, _ := getK8sResource(client, context.Background(), pollName, resourceId)
pollResource.GetObjectMeta().SetManagedFields(nil)
pollBytes, _ := json.Marshal(pollResource)
_, err = client.Resource(resourceId).Namespace("").Patch(context.Background(), pollResource.GetObjectMeta().GetName(), types.MergePatchType, pollBytes, metav1.PatchOptions{FieldManager: "slack-collector"})
if err != nil {
fmt.Println("error marshalling poll object", err)
return
fmt.Println("Error patching poll resource", err)
}

_, err = client.Resource(resourceId).Namespace("").Patch(context.Background(), pollName, types.MergePatchType, u, metav1.PatchOptions{})
statusBytes, _ := json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"done": false,
"lastNotificationTime": time.Now().Unix(),
},
})

// Use the "/status" subresource to update just the status
_, err = client.Resource(resourceId).Namespace("").Patch(
context.Background(),
pollResource.GetObjectMeta().GetName(),
types.MergePatchType,
statusBytes,
metav1.PatchOptions{FieldManager: "slack-collector"},
"/status",
)
if err != nil {
fmt.Println("error applying resource", err)
fmt.Println("Error patching poll status", err)
}

api := slack.New(token)
Expand Down

0 comments on commit d7a4b9c

Please sign in to comment.