Skip to content

Commit

Permalink
support memory and memory type for worker
Browse files Browse the repository at this point in the history
  • Loading branch information
pyshx committed Feb 20, 2025
1 parent 403661a commit 1cee710
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
2 changes: 2 additions & 0 deletions server/api/internal/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type (
// worker
Worker_BatchSAEmail string `envconfig:"WORKER_BATCH_SA_EMAIL" pp:",omitempty"`
Worker_BinaryPath string `envconfig:"WORKER_BINARY_PATH" default:"reearth-flow-worker" pp:",omitempty"`
Worker_BootDiskSizeGB string `envconfig:"WORKER_BOOT_DISK_SIZE_GB" default:"50" pp:",omitempty"`
Worker_BootDiskType string `envconfig:"WORKER_BOOT_DISK_TYPE" default:"pd-balanced" pp:",omitempty"`
Worker_ImageURL string `envconfig:"WORKER_IMAGE_URL" pp:",omitempty"`
Worker_MachineType string `envconfig:"WORKER_MACHINE_TYPE" default:"e2-standard-4" pp:",omitempty"`
Worker_MaxConcurrency string `envconfig:"WORKER_MAX_CONCURRENCY" default:"4" pp:",omitempty"`
Expand Down
13 changes: 7 additions & 6 deletions server/api/internal/app/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ func initBatch(ctx context.Context, conf *config.Config) (batchRepo gateway.Batc
if conf.Worker_ImageURL != "" {
config := gcpbatch.BatchConfig{
BinaryPath: conf.Worker_BinaryPath,
ImageURI: conf.Worker_ImageURL,
MachineType: conf.Worker_MachineType,
MaxConcurrency: func() int {
mc, err := strconv.Atoi(conf.Worker_MaxConcurrency)
BootDiskSizeGB: func() int {
tc, err := strconv.Atoi(conf.Worker_BootDiskSizeGB)
if err != nil {
log.Fatalf("Failed to convert MaxConcurrency: %v", err)
log.Fatalf("Failed to convert BootDiskSizeDB: %v", err)
}
return mc
return tc
}(),
BootDiskType: conf.Worker_BootDiskType,
ImageURI: conf.Worker_ImageURL,
MachineType: conf.Worker_MachineType,
ProjectID: conf.GCPProject,
Region: conf.GCPRegion,
SAEmail: conf.Worker_BatchSAEmail,
Expand Down
13 changes: 9 additions & 4 deletions server/api/internal/infrastructure/gcpbatch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"

batch "cloud.google.com/go/batch/apiv1"
Expand All @@ -21,9 +20,10 @@ import (

type BatchConfig struct {
BinaryPath string
BootDiskSizeGB int
BootDiskType string
ImageURI string
MachineType string
MaxConcurrency int
ProjectID string
Region string
SAEmail string
Expand Down Expand Up @@ -124,7 +124,6 @@ func (b *BatchRepo) SubmitJob(ctx context.Context, jobID id.JobID, workflowsURL,
Variables: map[string]string{
"FLOW_RUNTIME_FEATURE_WRITER_DISABLE": "true",
"FLOW_WORKER_ENABLE_JSON_LOG": "true",
"FLOW_WORKER_MAXPROCS": strconv.Itoa(b.config.MaxConcurrency),
},
},
}
Expand All @@ -135,9 +134,15 @@ func (b *BatchRepo) SubmitJob(ctx context.Context, jobID id.JobID, workflowsURL,
}
log.Debugfc(ctx, "gcpbatch: configured task group with count=%d", taskGroup.TaskCount)

bootDisk := &batchpb.AllocationPolicy_Disk{
Type: b.config.BootDiskType,
SizeGb: int64(b.config.BootDiskSizeGB),
}

instancePolicy := &batchpb.AllocationPolicy_InstancePolicy{
ProvisioningModel: batchpb.AllocationPolicy_STANDARD,
MachineType: b.config.MachineType,
MachineType: b.config.MachineType,
BootDisk: bootDisk,
}
log.Debugfc(ctx, "gcpbatch: configured instance policy with machine=%s", instancePolicy.MachineType)

Expand Down

0 comments on commit 1cee710

Please sign in to comment.