From 9c2660d26d1a1e6ef163a718bf3a9be7d0a74a1d Mon Sep 17 00:00:00 2001 From: Piyush Chauhan <42397980+pyshx@users.noreply.github.com> Date: Thu, 20 Feb 2025 14:58:12 +0530 Subject: [PATCH] feat(api): specify machine type and concurrency info of worker through env vars (#884) * feat(api): specify machine type and concurrency info of worker through env vars * support memory and memory type for worker * update go.work.sum * lint --- server/api/internal/app/config/config.go | 11 +++++--- server/api/internal/app/repo.go | 25 ++++++++++++++++--- .../internal/infrastructure/gcpbatch/batch.go | 24 ++++++++++++------ 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/server/api/internal/app/config/config.go b/server/api/internal/app/config/config.go index 6bec69296..63204c464 100644 --- a/server/api/internal/app/config/config.go +++ b/server/api/internal/app/config/config.go @@ -69,9 +69,14 @@ type ( Auth_JWKSURI *string `pp:",omitempty"` // worker - Worker_BatchSAEmail string `pp:",omitempty"` - Worker_BinaryPath string `pp:",omitempty"` - Worker_ImageURL string `pp:",omitempty"` + Worker_BatchSAEmail string `envconfig:"WORKER_BATCH_SA_EMAIL" pp:",omitempty"` + Worker_BinaryPath string `envconfig:"WORKER_BINARY_PATH" default:"reearth-flow-worker" pp:",omitempty"` + Worker_BootDiskSizeGB string `envconfig:"WORKER_BOOT_DISK_SIZE_GB" default:"50" pp:",omitempty"` + Worker_BootDiskType string `envconfig:"WORKER_BOOT_DISK_TYPE" default:"pd-balanced" pp:",omitempty"` + Worker_ImageURL string `envconfig:"WORKER_IMAGE_URL" pp:",omitempty"` + Worker_MachineType string `envconfig:"WORKER_MACHINE_TYPE" default:"e2-standard-4" pp:",omitempty"` + Worker_MaxConcurrency string `envconfig:"WORKER_MAX_CONCURRENCY" default:"4" pp:",omitempty"` + Worker_TaskCount string `envconfig:"WORKER_TASK_COUNT" default:"1" pp:",omitempty"` } ) diff --git a/server/api/internal/app/repo.go b/server/api/internal/app/repo.go index df32a3b1b..a10fca540 100644 --- a/server/api/internal/app/repo.go +++ b/server/api/internal/app/repo.go @@ -2,6 +2,7 @@ package app import ( "context" + "strconv" "github.com/reearth/reearth-flow/api/internal/app/config" "github.com/reearth/reearth-flow/api/internal/infrastructure/auth0" @@ -107,10 +108,26 @@ func initBatch(ctx context.Context, conf *config.Config) (batchRepo gateway.Batc if conf.Worker_ImageURL != "" { config := gcpbatch.BatchConfig{ BinaryPath: conf.Worker_BinaryPath, - ImageURI: conf.Worker_ImageURL, - ProjectID: conf.GCPProject, - Region: conf.GCPRegion, - SAEmail: conf.Worker_BatchSAEmail, + BootDiskSizeGB: func() int { + tc, err := strconv.Atoi(conf.Worker_BootDiskSizeGB) + if err != nil { + log.Fatalf("Failed to convert BootDiskSizeDB: %v", err) + } + return tc + }(), + BootDiskType: conf.Worker_BootDiskType, + ImageURI: conf.Worker_ImageURL, + MachineType: conf.Worker_MachineType, + ProjectID: conf.GCPProject, + Region: conf.GCPRegion, + SAEmail: conf.Worker_BatchSAEmail, + TaskCount: func() int { + tc, err := strconv.Atoi(conf.Worker_TaskCount) + if err != nil { + log.Fatalf("Failed to convert TaskCount: %v", err) + } + return tc + }(), } batchRepo, err = gcpbatch.NewBatch(ctx, config) diff --git a/server/api/internal/infrastructure/gcpbatch/batch.go b/server/api/internal/infrastructure/gcpbatch/batch.go index 7d70a61ef..86286eb51 100644 --- a/server/api/internal/infrastructure/gcpbatch/batch.go +++ b/server/api/internal/infrastructure/gcpbatch/batch.go @@ -19,11 +19,15 @@ import ( ) type BatchConfig struct { - BinaryPath string - ImageURI string - ProjectID string - Region string - SAEmail string + BinaryPath string + BootDiskSizeGB int + BootDiskType string + ImageURI string + MachineType string + ProjectID string + Region string + SAEmail string + TaskCount int } type BatchClient interface { @@ -125,14 +129,20 @@ func (b *BatchRepo) SubmitJob(ctx context.Context, jobID id.JobID, workflowsURL, } taskGroup := &batchpb.TaskGroup{ - TaskCount: 1, + TaskCount: int64(b.config.TaskCount), TaskSpec: taskSpec, } log.Debugfc(ctx, "gcpbatch: configured task group with count=%d", taskGroup.TaskCount) + bootDisk := &batchpb.AllocationPolicy_Disk{ + Type: b.config.BootDiskType, + SizeGb: int64(b.config.BootDiskSizeGB), + } + instancePolicy := &batchpb.AllocationPolicy_InstancePolicy{ ProvisioningModel: batchpb.AllocationPolicy_STANDARD, - MachineType: "e2-standard-4", + MachineType: b.config.MachineType, + BootDisk: bootDisk, } log.Debugfc(ctx, "gcpbatch: configured instance policy with machine=%s", instancePolicy.MachineType)