Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new resource hint to all sdks for number of cpus per worker machine #28848

Merged
merged 10 commits into from
Oct 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -1982,5 +1982,9 @@ message StandardResourceHints {
// SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB).
// Payload: ASCII encoded string of the base 10 representation of an integer number of bytes.
MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"];
// Describes desired number of CPUs available in transform's execution environment.
Abacn marked this conversation as resolved.
Show resolved Hide resolved
// SDKs should accept and validate a positive integer count.
// Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs.
CPU_COUNT = 2 [(beam_urn) = "beam:resources:cpu_count:v1"];
}
}
67 changes: 47 additions & 20 deletions sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 45 additions & 36 deletions sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions sdks/go/pkg/beam/options/resource/hint.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,40 @@ func (h acceleratorHint) MergeWithOuter(outer Hint) Hint {
func (h acceleratorHint) String() string {
return fmt.Sprintf("accelerator=%v", h.value)
}

// CPUCount hints that this scope should be put in a machine with at least this many CPUs or vCPUs.
//
// Hints are advisory only and runners may not respect them.
//
// See https://beam.apache.org/documentation/runtime/resource-hints/ for more information about
// resource hints.
func CPUCount(v uint64) Hint {
return CPUCountHint{value: uint64(v)}
}

type CPUCountHint struct {
value uint64
}

func (CPUCountHint) URN() string {
return "beam:resources:cpu_count:v1"
}

func (h CPUCountHint) Payload() []byte {
// Go strings are utf8, and if the string is ascii,
// byte conversion handles that directly.
return []byte(strconv.FormatUint(h.value, 10))
}

// MergeWithOuter by keeping the maximum of the two cpu counts.
func (h CPUCountHint) MergeWithOuter(outer Hint) Hint {
// Intentional runtime panic from type assertion to catch hint merge errors.
if outer.(CPUCountHint).value > h.value {
return outer
}
return h
}

func (h CPUCountHint) String() string {
return fmt.Sprintf("cpu_count=%v", humanize.Bytes(uint64(h.value)))
}
Loading