Skip to content

Commit

Permalink
feat(api): specify machine type and concurrency info of worker throug…
Browse files Browse the repository at this point in the history
…h env vars (#884)

* feat(api): specify machine type and concurrency info of worker through env vars

* support memory and memory type for worker

* update go.work.sum

* lint
  • Loading branch information
pyshx authored Feb 20, 2025
1 parent e96aa0d commit 9c2660d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 14 deletions.
11 changes: 8 additions & 3 deletions server/api/internal/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ type (
Auth_JWKSURI *string `pp:",omitempty"`

// worker
Worker_BatchSAEmail string `pp:",omitempty"`
Worker_BinaryPath string `pp:",omitempty"`
Worker_ImageURL string `pp:",omitempty"`
Worker_BatchSAEmail string `envconfig:"WORKER_BATCH_SA_EMAIL" pp:",omitempty"`
Worker_BinaryPath string `envconfig:"WORKER_BINARY_PATH" default:"reearth-flow-worker" pp:",omitempty"`
Worker_BootDiskSizeGB string `envconfig:"WORKER_BOOT_DISK_SIZE_GB" default:"50" pp:",omitempty"`
Worker_BootDiskType string `envconfig:"WORKER_BOOT_DISK_TYPE" default:"pd-balanced" pp:",omitempty"`
Worker_ImageURL string `envconfig:"WORKER_IMAGE_URL" pp:",omitempty"`
Worker_MachineType string `envconfig:"WORKER_MACHINE_TYPE" default:"e2-standard-4" pp:",omitempty"`
Worker_MaxConcurrency string `envconfig:"WORKER_MAX_CONCURRENCY" default:"4" pp:",omitempty"`
Worker_TaskCount string `envconfig:"WORKER_TASK_COUNT" default:"1" pp:",omitempty"`
}
)

Expand Down
25 changes: 21 additions & 4 deletions server/api/internal/app/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"strconv"

"github.com/reearth/reearth-flow/api/internal/app/config"
"github.com/reearth/reearth-flow/api/internal/infrastructure/auth0"
Expand Down Expand Up @@ -107,10 +108,26 @@ func initBatch(ctx context.Context, conf *config.Config) (batchRepo gateway.Batc
if conf.Worker_ImageURL != "" {
config := gcpbatch.BatchConfig{
BinaryPath: conf.Worker_BinaryPath,
ImageURI: conf.Worker_ImageURL,
ProjectID: conf.GCPProject,
Region: conf.GCPRegion,
SAEmail: conf.Worker_BatchSAEmail,
BootDiskSizeGB: func() int {
tc, err := strconv.Atoi(conf.Worker_BootDiskSizeGB)
if err != nil {
log.Fatalf("Failed to convert BootDiskSizeDB: %v", err)
}
return tc
}(),
BootDiskType: conf.Worker_BootDiskType,
ImageURI: conf.Worker_ImageURL,
MachineType: conf.Worker_MachineType,
ProjectID: conf.GCPProject,
Region: conf.GCPRegion,
SAEmail: conf.Worker_BatchSAEmail,
TaskCount: func() int {
tc, err := strconv.Atoi(conf.Worker_TaskCount)
if err != nil {
log.Fatalf("Failed to convert TaskCount: %v", err)
}
return tc
}(),
}

batchRepo, err = gcpbatch.NewBatch(ctx, config)
Expand Down
24 changes: 17 additions & 7 deletions server/api/internal/infrastructure/gcpbatch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ import (
)

type BatchConfig struct {
BinaryPath string
ImageURI string
ProjectID string
Region string
SAEmail string
BinaryPath string
BootDiskSizeGB int
BootDiskType string
ImageURI string
MachineType string
ProjectID string
Region string
SAEmail string
TaskCount int
}

type BatchClient interface {
Expand Down Expand Up @@ -125,14 +129,20 @@ func (b *BatchRepo) SubmitJob(ctx context.Context, jobID id.JobID, workflowsURL,
}

taskGroup := &batchpb.TaskGroup{
TaskCount: 1,
TaskCount: int64(b.config.TaskCount),
TaskSpec: taskSpec,
}
log.Debugfc(ctx, "gcpbatch: configured task group with count=%d", taskGroup.TaskCount)

bootDisk := &batchpb.AllocationPolicy_Disk{
Type: b.config.BootDiskType,
SizeGb: int64(b.config.BootDiskSizeGB),
}

instancePolicy := &batchpb.AllocationPolicy_InstancePolicy{
ProvisioningModel: batchpb.AllocationPolicy_STANDARD,
MachineType: "e2-standard-4",
MachineType: b.config.MachineType,
BootDisk: bootDisk,
}
log.Debugfc(ctx, "gcpbatch: configured instance policy with machine=%s", instancePolicy.MachineType)

Expand Down

0 comments on commit 9c2660d

Please sign in to comment.