Skip to content

Commit

Permalink
feat: resource delete command (#212)
Browse files Browse the repository at this point in the history
* feat: update protos optimus

* feat: add cli command resource delete

* feat: implement delete resource

* feat: register delete command

* fix: handle panic on deleted resource

* feat: support delete resource on existing api (#211)

* feat: implement delete resource on upload all

* feat: upload-all API support recreation on deleted resource

* feat: add validation on resource apply

* feat: add support create on deleted resource

* feat: add validation update feature on deleted resource

* feat: add validation on resolve internal upstream

* feat: register delete command

* fix: unit test intermittent failed due to upstream order

* feat: change initialize resource resolver

* feat: add log and update status check on delete resource

* feat: rename error name on check is deleted resource

---------

Co-authored-by: oky.setiawan <[email protected]>

* fix: message mark deleted error

* feat: update latest proton

* feat: add checking is deleted on get all resources

* feat: update review and remove job check deleted resource

* feat: update query GetResources to return only active

* fix: bench test on ReadAll and ReadByFullName resource

* fix: resource integration test

* fix: MarkToDelete state message

* feat: handle deleted resource in change namespace

* feat: remove unnecessary changes

* fix: integration test stuck due to defer rollback

* feat: update from review

---------

Co-authored-by: oky.setiawan <[email protected]>
  • Loading branch information
okysetiawan and oky.setiawan authored Apr 23, 2024
1 parent bb951d1 commit af3fb20
Show file tree
Hide file tree
Showing 27 changed files with 1,700 additions and 243 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "b49aa053a3cae8cb3b712c099172ba1ae7a99c8c"
PROTON_COMMIT := "6a8c5c5bac05a3491ca55046d4215c34b3953cb4"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
141 changes: 141 additions & 0 deletions client/cmd/resource/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package resource

import (
"context"
"errors"
"fmt"

"github.com/MakeNowJust/heredoc"
"github.com/goto/salt/log"
"github.com/spf13/cobra"

"github.com/goto/optimus/client/cmd/internal/connection"
"github.com/goto/optimus/client/cmd/internal/logger"
"github.com/goto/optimus/client/cmd/internal/progressbar"
"github.com/goto/optimus/client/cmd/internal/survey"
"github.com/goto/optimus/config"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

type deleteCommand struct {
logger log.Logger
connection connection.Connection

configFilePath string
clientConfig *config.ClientConfig

namespaceSurvey *survey.NamespaceSurvey
namespaceName string
projectName string
storeName string
resourceName string
verbose, force bool
}

// NewDeleteCommand initializes command for delete resource from optimus
func NewDeleteCommand() *cobra.Command {
l := logger.NewClientLogger()
apply := &deleteCommand{
logger: l,
namespaceSurvey: survey.NewNamespaceSurvey(l),
}

cmd := &cobra.Command{
Use: "delete",
Short: "Delete resource from optimus",
Long: heredoc.Doc(`Delete resource from Optimus`),
Example: "optimus resource delete <resource-name> -c=<config-file-path>",
Annotations: map[string]string{
"group:core": "true",
},
RunE: apply.RunE,
PreRunE: apply.PreRunE,
}
cmd.Flags().StringVarP(&apply.configFilePath, "config", "c", apply.configFilePath, "File path for client configuration")
cmd.Flags().BoolVarP(&apply.verbose, "verbose", "v", false, "Print details related to delete stages")
cmd.Flags().StringVarP(&apply.namespaceName, "namespace", "n", "", "Namespace name within project")
cmd.Flags().StringVarP(&apply.storeName, "datastore", "s", "bigquery", "Datastore type where the resource belongs")
cmd.Flags().BoolVarP(&apply.force, "force", "f", false, "Force delete, ignoring job downstream")
return cmd
}

func (a *deleteCommand) PreRunE(_ *cobra.Command, _ []string) error {
var err error
a.clientConfig, err = config.LoadClientConfig(a.configFilePath)
if err != nil {
return err
}

a.connection = connection.New(a.logger, a.clientConfig)

return nil
}

func (a *deleteCommand) RunE(_ *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("one argument for resource name is required")
}
a.resourceName = args[0]
a.logger.Info("> Validating resource name")
if len(a.resourceName) == 0 {
return errors.New("empty resource name")
}

if a.projectName == "" {
a.projectName = a.clientConfig.Project.Name
}

var namespace *config.Namespace
// use flag or ask namespace name
if a.namespaceName == "" {
var err error
namespace, err = a.namespaceSurvey.AskToSelectNamespace(a.clientConfig)
if err != nil {
return err
}
a.namespaceName = namespace.Name
}

return a.delete()
}

func (a *deleteCommand) delete() error {
conn, err := a.connection.Create(a.clientConfig.Host)
if err != nil {
return err
}
defer conn.Close()

apply := pb.NewResourceServiceClient(conn)

spinner := progressbar.NewProgressBar()
spinner.Start("please wait...")

deleteResourceRequest := pb.DeleteResourceRequest{
ProjectName: a.projectName,
NamespaceName: a.namespaceName,
DatastoreName: a.storeName,
ResourceName: a.resourceName,
Force: a.force,
}

ctx, cancelFunc := context.WithTimeout(context.Background(), applyTimeout)
defer cancelFunc()

responses, err := apply.DeleteResource(ctx, &deleteResourceRequest)
spinner.Stop()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
a.logger.Error("delete took too long, timing out")
}
return fmt.Errorf("failed to delete resource: %w", err)
}

if len(responses.GetDownstreamJobs()) > 0 {
a.logger.Info(fmt.Sprintf("success delete resource %s with downstreamJobs: [%s]", deleteResourceRequest.ResourceName, responses.DownstreamJobs))
} else {
a.logger.Info(fmt.Sprintf("success delete resource %s", deleteResourceRequest.ResourceName))
}

return nil
}
1 change: 1 addition & 0 deletions client/cmd/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ func NewResourceCommand() *cobra.Command {
cmd.AddCommand(NewExportCommand())
cmd.AddCommand(NewChangeNamespaceCommand())
cmd.AddCommand(NewApplyCommand())
cmd.AddCommand(NewDeleteCommand())
return cmd
}
21 changes: 21 additions & 0 deletions core/event/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ func (r ResourceUpdated) Bytes() ([]byte, error) {
return resourceEventToBytes(r.Event, r.Resource, pbInt.OptimusChangeEvent_EVENT_TYPE_RESOURCE_UPDATE)
}

type ResourceDeleted struct {
Event

Resource *resource.Resource
}

func NewResourceDeleteEvent(rsc *resource.Resource) (*ResourceDeleted, error) {
baseEvent, err := NewBaseEvent()
if err != nil {
return nil, err
}
return &ResourceDeleted{
Event: baseEvent,
Resource: rsc,
}, nil
}

func (r ResourceDeleted) Bytes() ([]byte, error) {
return resourceEventToBytes(r.Event, r.Resource, pbInt.OptimusChangeEvent_EVENT_TYPE_RESOURCE_DELETE)
}

func resourceEventToBytes(event Event, rsc *resource.Resource, eventType pbInt.OptimusChangeEvent_EventType) ([]byte, error) {
meta := rsc.Metadata()
if meta == nil {
Expand Down
3 changes: 2 additions & 1 deletion core/job/resolver/internal_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolved
}

distinctUpstreams := job.Upstreams(upstreamResults).Deduplicate()
return job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctUpstreams), me.ToErr()
jobWithMergedUpstream := job.NewWithUpstream(jobWithUnresolvedUpstream.Job(), distinctUpstreams)
return jobWithMergedUpstream, me.ToErr()
}

func (i internalUpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.ProjectName, jobsWithUnresolvedUpstream []*job.WithUpstream) ([]*job.WithUpstream, error) {
Expand Down
19 changes: 19 additions & 0 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,3 +1259,22 @@ func (j *JobService) generateDestinationURN(ctx context.Context, tenantWithDetai

return job.ResourceURN(destinationURN), nil
}

func (j *JobService) GetDownstreamByResourceURN(ctx context.Context, tnnt tenant.Tenant, urn job.ResourceURN) (job.DownstreamList, error) {
var dependentJobs []*job.Downstream

jobs, err := j.downstreamRepo.GetDownstreamBySources(ctx, []job.ResourceURN{urn})
if err != nil {
return nil, err
}

for i := range jobs {
if jobs[i].ProjectName() != tnnt.ProjectName() || jobs[i].NamespaceName() != tnnt.NamespaceName() {
continue
}

dependentJobs = append(dependentJobs, jobs[i])
}

return dependentJobs, nil
}
36 changes: 36 additions & 0 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,42 @@ func TestJobService(t *testing.T) {
assert.Nil(t, err)
})
})

t.Run("GetDownstreamByResourceURN", func(t *testing.T) {
tnnt, _ := tenant.NewTenant("proj123", "name123")
urn := job.ResourceURN("bigquery://dataset.table_test")
downstreamJobs := []*job.Downstream{
job.NewDownstream("jobA", tnnt.ProjectName(), tnnt.NamespaceName(), "jobA"),
job.NewDownstream("jobA", "proj501", "name4012", "jobA"),
}

t.Run("success found dependent job", func(t *testing.T) {
var (
downstreamRepo = new(DownstreamRepository)
jobService = service.NewJobService(nil, nil, downstreamRepo, nil, nil, nil, nil, nil, nil, nil)
)

downstreamRepo.On("GetDownstreamBySources", ctx, []job.ResourceURN{urn}).Return(downstreamJobs, nil)

actual, err := jobService.GetDownstreamByResourceURN(ctx, tnnt, urn)
assert.NoError(t, err)
assert.NotNil(t, actual)
assert.Len(t, actual.GetDownstreamFullNames(), 1)
})

t.Run("return error when GetDownstreamBySources", func(t *testing.T) {
var (
downstreamRepo = new(DownstreamRepository)
jobService = service.NewJobService(nil, nil, downstreamRepo, nil, nil, nil, nil, nil, nil, nil)
)

downstreamRepo.On("GetDownstreamBySources", ctx, []job.ResourceURN{urn}).Return(nil, context.DeadlineExceeded)

actual, err := jobService.GetDownstreamByResourceURN(ctx, tnnt, urn)
assert.Error(t, err)
assert.Nil(t, actual)
})
})
}

// JobRepository is an autogenerated mock type for the JobRepository type
Expand Down
32 changes: 32 additions & 0 deletions core/resource/handler/v1beta1/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type ResourceService interface {
Create(ctx context.Context, res *resource.Resource) error
Update(ctx context.Context, res *resource.Resource, logWriter writer.LogWriter) error
Delete(ctx context.Context, req *resource.DeleteRequest) (*resource.DeleteResponse, error)
ChangeNamespace(ctx context.Context, datastore resource.Store, resourceFullName string, oldTenant, newTenant tenant.Tenant) error
Get(ctx context.Context, tnnt tenant.Tenant, store resource.Store, resourceName string) (*resource.Resource, error)
GetAll(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error)
Expand Down Expand Up @@ -265,6 +266,37 @@ func (rh ResourceHandler) UpdateResource(ctx context.Context, req *pb.UpdateReso
return &pb.UpdateResourceResponse{}, nil
}

func (rh ResourceHandler) DeleteResource(ctx context.Context, req *pb.DeleteResourceRequest) (*pb.DeleteResourceResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
rh.l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return nil, errors.GRPCErr(err, "failed to update resource")
}

store, err := resource.FromStringToStore(req.GetDatastoreName())
if err != nil {
rh.l.Error("invalid datastore name [%s]: %s", req.GetDatastoreName(), err)
return nil, errors.GRPCErr(err, "invalid update resource request")
}

deleteReq := &resource.DeleteRequest{
Tenant: tnnt,
Datastore: store,
FullName: req.GetResourceName(),
Force: req.GetForce(),
}
var deleteRes *resource.DeleteResponse
deleteRes, err = rh.service.Delete(ctx, deleteReq)
if err != nil {
rh.l.Error("error deleting resource [%s]: %s", req.ResourceName, err)
return nil, errors.GRPCErr(err, "failed to delete resource "+err.Error())
}

raiseResourceDatastoreEventMetric(tnnt, deleteRes.Resource.Store().String(), deleteRes.Resource.Kind(), deleteRes.Resource.Status().String())
res := &pb.DeleteResourceResponse{DownstreamJobs: deleteRes.DownstreamJobs}
return res, nil
}

func (rh ResourceHandler) ChangeResourceNamespace(ctx context.Context, req *pb.ChangeResourceNamespaceRequest) (*pb.ChangeResourceNamespaceResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
Expand Down
Loading

0 comments on commit af3fb20

Please sign in to comment.