diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 2483103b5794..db958f183c45 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1982,5 +1982,9 @@ message StandardResourceHints { // SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB). // Payload: ASCII encoded string of the base 10 representation of an integer number of bytes. MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"]; + // Describes desired number of CPUs available in transform's execution environment. + // SDKs should accept and validate a positive integer count. + // Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs. + CPU_COUNT = 2 [(beam_urn) = "beam:resources:cpu_count:v1"]; } } diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go index 1d547470ea1a..9d14cff3c7d6 100644 --- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go +++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go @@ -27,7 +27,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/fn_execution/v1/beam_fn_api.proto // TODO: Consider consolidating common components in another package @@ -1883,30 +1883,57 @@ func (x *ProcessBundleSplitRequest) GetDesiredSplits() map[string]*ProcessBundle // first_residual_element. // - The current bundle, if no further splits happen, will have done exactly // the work under primary_roots and all elements up to and including the -// channel splits last_primary_element. +// channel split's last_primary_element. // // This allows the SDK to relinquish ownership of and commit to not process some // of the elements that it may have been sent (the residual) while retaining // ownership and commitment to finish the other portion (the primary). // -// For example, lets say the SDK is processing elements A B C D E and a split -// request comes in. The SDK could return a response with a channel split -// representing a last_primary_element of 3 (D) and first_residual_element of 4 -// (E). The SDK is now responsible for processing A B C D and the runner must -// process E in the future. A future split request could have the SDK split the -// elements B into B1 and B2 and C into C1 and C2 representing their primary and -// residual roots. The SDK would return a response with a channel split -// representing a last_primary_element of 0 (A) and first_residual_element of 3 -// (D) with primary_roots (B1, C1) and residual_roots (B2, C2). The SDK is now -// responsible for processing A B1 C1 and the runner must process C2 D2 (and E -// from the prior split) in the future. Yet another future split request could -// have the SDK could split B1 further into B1a and B1b primary and residuals -// and return C2 as a residual (assuming C2 was left unprocessed). The SDK would -// return a response with a channel split representing a last_primary_element of -// 0 (A) and first_residual_element of 4 (E) with primary_roots (B1a) and -// residual_roots (B1b, C1). The SDK is now responsible for processing A B1a the -// runner must process B1b C1 (in addition to C2, D, E from prior splits) in the -// future. +// Example with three splits of a single bundle: +// Let's say the SDK is processing elements [A B C D E]. These elements make +// up the 0-indexed channel. +// +// ** First Split ** +// Channel Split = [ A B C D <> E ] +// Primary Roots = [] (No elements were split) +// Residual Roots = [] +// +// Say a split request comes in. The SDK could return a response with a channel +// split representing a last_primary_element of 3 (D) and +// first_residual_element of 4 (E). The SDK is now responsible for processing A +// B C D and the runner must process E in the future. +// +// (A B C D) | (E) +// +// ** Second Split ** +// Channel Split = [ A < B C > D E ] +// Primary Roots = [B1 C1] +// Residual Roots = [B2 C2] +// +// A future split request could have the SDK split the elements B into B1 and +// B2 and C into C1 and C2 representing their primary and residual roots. The +// +// (A B1 C1) | (B2 C2 D) +// +// SDK would return a response with a channel split representing a +// last_primary_element of 0 (A) and first_residual_element of 3 (D) with +// primary_roots (B1, C1) and residual_roots (B2, C2). The SDK is now +// responsible for processing A B1 C1 and the runner must process B2 C2 D (and +// E from the prior split) in the future. +// +// ** Third Split ** +// Channel Split = [ A < B C > D E ] +// Primary Roots = [B1a] +// Residual Roots [B1b C1] +// Yet another future split request could have the SDK could split B1 further +// into B1a and B1b primary and residuals and return C1 as a residual (assuming +// C1 was left unprocessed). The SDK would return a response with a channel +// split representing a last_primary_element of 0 (A) and +// first_residual_element of 3 (E) with primary_roots (B1a) and residual_roots +// (B1b, C1). The SDK is now responsible for processing A B1a the runner must +// process B1b C1 (in addition to C2, D, E from prior splits) in the future. +// +// (A B1a) | (B1b C1) // // For more rigorous definitions see https://s.apache.org/beam-breaking-fusion type ProcessBundleSplitResponse struct { diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api_grpc.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api_grpc.pb.go index ac9e402750c4..cd53ea805705 100644 --- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/fn_execution/v1/beam_fn_api.proto package fnexecution_v1 diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api.pb.go index a24609b2fd05..26cf245f7206 100644 --- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api.pb.go +++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/fn_execution/v1/beam_provision_api.proto package fnexecution_v1 diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api_grpc.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api_grpc.pb.go index f9c6f5681399..9064b348b4c0 100644 --- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_provision_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/fn_execution/v1/beam_provision_api.proto package fnexecution_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api.pb.go index 6a7663d77e9c..85bb2e368970 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_artifact_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api_grpc.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api_grpc.pb.go index 6b381b96f3d1..28e43e21fbbd 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_artifact_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_artifact_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api.pb.go index 0f33c7ab9e3c..8f7ca43ec0f5 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_expansion_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api_grpc.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api_grpc.pb.go index e2cc3c4f77ec..f1c3782f5fb8 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_expansion_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_expansion_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go index d93130d26d9f..62e0b313ec2d 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_job_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api_grpc.pb.go b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api_grpc.pb.go index 08da7e4643c3..38f2c85a1c1c 100644 --- a/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/job_management/v1/beam_job_api.proto package jobmanagement_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go index b20a5dccbe05..49df2b5c2e59 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/beam_runner_api.proto package pipeline_v1 @@ -1857,6 +1857,10 @@ const ( // SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB). // Payload: ASCII encoded string of the base 10 representation of an integer number of bytes. StandardResourceHints_MIN_RAM_BYTES StandardResourceHints_Enum = 1 + // Describes desired number of CPUs available in transform's execution environment. + // SDKs should accept and validate a positive integer count. + // Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs. + StandardResourceHints_CPU_COUNT StandardResourceHints_Enum = 2 ) // Enum value maps for StandardResourceHints_Enum. @@ -1864,10 +1868,12 @@ var ( StandardResourceHints_Enum_name = map[int32]string{ 0: "ACCELERATOR", 1: "MIN_RAM_BYTES", + 2: "CPU_COUNT", } StandardResourceHints_Enum_value = map[string]int32{ "ACCELERATOR": 0, "MIN_RAM_BYTES": 1, + "CPU_COUNT": 2, } ) @@ -9223,42 +9229,45 @@ var file_org_apache_beam_model_pipeline_v1_beam_runner_api_proto_rawDesc = []byt 0x65, 0x63, 0x75, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x61, 0x67, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x48, 0x00, 0x52, 0x05, 0x74, 0x69, 0x6d, 0x65, 0x72, 0x42, 0x08, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x22, - 0x8f, 0x01, 0x0a, 0x15, 0x53, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x72, 0x64, 0x52, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x48, 0x69, 0x6e, 0x74, 0x73, 0x22, 0x76, 0x0a, 0x04, 0x45, 0x6e, 0x75, - 0x6d, 0x12, 0x34, 0x0a, 0x0b, 0x41, 0x43, 0x43, 0x45, 0x4c, 0x45, 0x52, 0x41, 0x54, 0x4f, 0x52, - 0x10, 0x00, 0x1a, 0x23, 0xa2, 0xb4, 0xfa, 0xc2, 0x05, 0x1d, 0x62, 0x65, 0x61, 0x6d, 0x3a, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x3a, 0x61, 0x63, 0x63, 0x65, 0x6c, 0x65, 0x72, - 0x61, 0x74, 0x6f, 0x72, 0x3a, 0x76, 0x31, 0x12, 0x38, 0x0a, 0x0d, 0x4d, 0x49, 0x4e, 0x5f, 0x52, - 0x41, 0x4d, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x01, 0x1a, 0x25, 0xa2, 0xb4, 0xfa, 0xc2, - 0x05, 0x1f, 0x62, 0x65, 0x61, 0x6d, 0x3a, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, - 0x3a, 0x6d, 0x69, 0x6e, 0x5f, 0x72, 0x61, 0x6d, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x3a, 0x76, - 0x31, 0x32, 0x8f, 0x01, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7a, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x73, 0x12, 0x30, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, - 0x65, 0x61, 0x6d, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, + 0xc2, 0x01, 0x0a, 0x15, 0x53, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x72, 0x64, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x48, 0x69, 0x6e, 0x74, 0x73, 0x22, 0xa8, 0x01, 0x0a, 0x04, 0x45, 0x6e, + 0x75, 0x6d, 0x12, 0x34, 0x0a, 0x0b, 0x41, 0x43, 0x43, 0x45, 0x4c, 0x45, 0x52, 0x41, 0x54, 0x4f, + 0x52, 0x10, 0x00, 0x1a, 0x23, 0xa2, 0xb4, 0xfa, 0xc2, 0x05, 0x1d, 0x62, 0x65, 0x61, 0x6d, 0x3a, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x3a, 0x61, 0x63, 0x63, 0x65, 0x6c, 0x65, + 0x72, 0x61, 0x74, 0x6f, 0x72, 0x3a, 0x76, 0x31, 0x12, 0x38, 0x0a, 0x0d, 0x4d, 0x49, 0x4e, 0x5f, + 0x52, 0x41, 0x4d, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x01, 0x1a, 0x25, 0xa2, 0xb4, 0xfa, + 0xc2, 0x05, 0x1f, 0x62, 0x65, 0x61, 0x6d, 0x3a, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x73, 0x3a, 0x6d, 0x69, 0x6e, 0x5f, 0x72, 0x61, 0x6d, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x3a, + 0x76, 0x31, 0x12, 0x30, 0x0a, 0x09, 0x43, 0x50, 0x55, 0x5f, 0x43, 0x4f, 0x55, 0x4e, 0x54, 0x10, + 0x02, 0x1a, 0x21, 0xa2, 0xb4, 0xfa, 0xc2, 0x05, 0x1b, 0x62, 0x65, 0x61, 0x6d, 0x3a, 0x72, 0x65, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x3a, 0x63, 0x70, 0x75, 0x5f, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x3a, 0x76, 0x31, 0x32, 0x8f, 0x01, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7a, 0x0a, 0x06, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x12, 0x30, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x22, 0x00, 0x30, 0x01, 0x3a, 0x3f, 0x0a, 0x08, 0x62, 0x65, 0x61, 0x6d, 0x5f, 0x75, + 0x72, 0x6e, 0x12, 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xc4, 0xa6, 0xaf, 0x58, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x62, 0x65, 0x61, 0x6d, 0x55, 0x72, 0x6e, 0x3a, 0x49, 0x0a, 0x0d, 0x62, 0x65, 0x61, 0x6d, 0x5f, + 0x63, 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x12, 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xc5, 0xa6, 0xaf, 0x58, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x62, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x73, 0x74, 0x61, + 0x6e, 0x74, 0x42, 0x78, 0x0a, 0x21, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x69, 0x70, 0x65, - 0x6c, 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, - 0x00, 0x30, 0x01, 0x3a, 0x3f, 0x0a, 0x08, 0x62, 0x65, 0x61, 0x6d, 0x5f, 0x75, 0x72, 0x6e, 0x12, - 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x18, 0xc4, 0xa6, 0xaf, 0x58, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x65, 0x61, - 0x6d, 0x55, 0x72, 0x6e, 0x3a, 0x49, 0x0a, 0x0d, 0x62, 0x65, 0x61, 0x6d, 0x5f, 0x63, 0x6f, 0x6e, - 0x73, 0x74, 0x61, 0x6e, 0x74, 0x12, 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xc5, 0xa6, 0xaf, 0x58, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x62, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x42, - 0x78, 0x0a, 0x21, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, - 0x61, 0x6d, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x2e, 0x76, 0x31, 0x42, 0x09, 0x52, 0x75, 0x6e, 0x6e, 0x65, 0x72, 0x41, 0x70, 0x69, 0x5a, - 0x48, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, - 0x68, 0x65, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, - 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x6d, 0x6f, 0x64, 0x65, - 0x6c, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x5f, 0x76, 0x31, 0x3b, 0x70, 0x69, - 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x5f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x6c, 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x09, 0x52, 0x75, 0x6e, 0x6e, 0x65, 0x72, 0x41, + 0x70, 0x69, 0x5a, 0x48, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, + 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, + 0x76, 0x32, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x6d, + 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x5f, 0x76, 0x31, + 0x3b, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x5f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api_grpc.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api_grpc.pb.go index d5e65f7b768d..20a30cf4dd01 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api_grpc.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api_grpc.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.1.0 -// - protoc v4.24.0--rc1 +// - protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/beam_runner_api.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go index 74348ddc3b33..2dfaffa2bff0 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go @@ -21,7 +21,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/endpoints.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go index 0bc21a56685e..edbe82264f5e 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go @@ -21,7 +21,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/external_transforms.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go index ec3e0d704a80..60edad2363be 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go @@ -21,7 +21,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/metrics.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go index 717fbbfb7a69..4bc6a57044cd 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go @@ -24,7 +24,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/schema.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/model/pipeline_v1/standard_window_fns.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/standard_window_fns.pb.go index dccd7d427503..e0522806df73 100644 --- a/sdks/go/pkg/beam/model/pipeline_v1/standard_window_fns.pb.go +++ b/sdks/go/pkg/beam/model/pipeline_v1/standard_window_fns.pb.go @@ -22,7 +22,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v4.24.0--rc1 +// protoc v4.24.4 // source: org/apache/beam/model/pipeline/v1/standard_window_fns.proto package pipeline_v1 diff --git a/sdks/go/pkg/beam/options/resource/hint.go b/sdks/go/pkg/beam/options/resource/hint.go index 1538fe65def2..d823f4feafa9 100644 --- a/sdks/go/pkg/beam/options/resource/hint.go +++ b/sdks/go/pkg/beam/options/resource/hint.go @@ -196,3 +196,40 @@ func (h acceleratorHint) MergeWithOuter(outer Hint) Hint { func (h acceleratorHint) String() string { return fmt.Sprintf("accelerator=%v", h.value) } + +// CPUCount hints that this scope should be put in a machine with at least this many CPUs or vCPUs. +// +// Hints are advisory only and runners may not respect them. +// +// See https://beam.apache.org/documentation/runtime/resource-hints/ for more information about +// resource hints. +func CPUCount(v uint64) Hint { + return CPUCountHint{value: uint64(v)} +} + +type CPUCountHint struct { + value uint64 +} + +func (CPUCountHint) URN() string { + return "beam:resources:cpu_count:v1" +} + +func (h CPUCountHint) Payload() []byte { + // Go strings are utf8, and if the string is ascii, + // byte conversion handles that directly. + return []byte(strconv.FormatUint(h.value, 10)) +} + +// MergeWithOuter by keeping the maximum of the two cpu counts. +func (h CPUCountHint) MergeWithOuter(outer Hint) Hint { + // Intentional runtime panic from type assertion to catch hint merge errors. + if outer.(CPUCountHint).value > h.value { + return outer + } + return h +} + +func (h CPUCountHint) String() string { + return fmt.Sprintf("cpu_count=%v", humanize.Bytes(uint64(h.value))) +} diff --git a/sdks/go/pkg/beam/options/resource/hint_test.go b/sdks/go/pkg/beam/options/resource/hint_test.go index cf24b47b6c91..7c2a1df79294 100644 --- a/sdks/go/pkg/beam/options/resource/hint_test.go +++ b/sdks/go/pkg/beam/options/resource/hint_test.go @@ -111,6 +111,38 @@ func TestParseMinRAMHint_panic(t *testing.T) { ParseMinRAM("a bad byte string") } +func TestCPUCountHint_MergeWith(t *testing.T) { + low := CPUCountHint{value: 2} + high := CPUCountHint{value: 128} + + if got, want := low.MergeWithOuter(high), high; got != want { + t.Errorf("%v.MergeWith(%v) = %v, want %v", low, high, got, want) + } + if got, want := high.MergeWithOuter(low), high; got != want { + t.Errorf("%v.MergeWith(%v) = %v, want %v", high, low, got, want) + } +} + +func TestCPUCountHint_Payload(t *testing.T) { + tests := []struct { + value uint64 + payload string + }{ + {0, "0"}, + {2, "2"}, + {11, "11"}, + {2003, "2003"}, + {1.2e7, "12000000"}, + } + + for _, test := range tests { + h := CPUCountHint{value: test.value} + if got, want := h.Payload(), []byte(test.payload); !bytes.Equal(got, want) { + t.Errorf("%v.Payload() = %v, want %v", h, got, want) + } + } +} + // We copy the URN from the proto for use as a constant rather than perform a direct look up // each time, or increase initialization time. However we do need to validate that they are // correct, and match the standard hint urns, so that's done here. @@ -130,7 +162,11 @@ func TestStandardHintUrns(t *testing.T) { }, { h: MinRAMBytes(2e9), urn: getStandardURN(pipepb.StandardResourceHints_MIN_RAM_BYTES), + }, { + h: CPUCount(4), + urn: getStandardURN(pipepb.StandardResourceHints_CPU_COUNT), }} + for _, test := range tests { if got, want := test.h.URN(), test.urn; got != want { t.Errorf("Checked urn for %T, got %q, want %q", test.h, got, want) @@ -154,12 +190,12 @@ func (h customHint) MergeWithOuter(outer Hint) Hint { } func TestHints_Equal(t *testing.T) { - hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas")) + hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4)) if got, want := hs.Equal(hs), true; got != want { t.Errorf("Self equal test: hs.Equal(hs) = %v, want %v", got, want) } - eq := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas")) + eq := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4)) if got, want := hs.Equal(eq), true; got != want { t.Errorf("identical equal test: hs.Equal(eq) = %v, want %v", got, want) } @@ -223,12 +259,13 @@ func TestHints_MergeWithOuter(t *testing.T) { func TestHints_Payloads(t *testing.T) { { - hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;")) + hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;"), CPUCount(4)) got := hs.Payloads() want := map[string][]byte{ "beam:resources:min_ram_bytes:v1": []byte("2000000000"), "beam:resources:accelerator:v1": []byte("type:jeans;count1;"), + "beam:resources:cpu_count:v1": []byte("4"), } if !reflect.DeepEqual(got, want) { t.Errorf("hs.Payloads() = %v, want %v", got, want) @@ -248,7 +285,7 @@ func TestHints_Payloads(t *testing.T) { func TestHints_NilHints(t *testing.T) { var hs1, hs2 Hints - hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas")) + hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4)) if got, want := hs1.Equal(hs2), true; got != want { t.Errorf("nils equal test: (nil).Equal(nil) = %v, want %v", got, want) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java index afd6a6ccb151..85cb2df9deab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java @@ -49,6 +49,8 @@ public class ResourceHints { private static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1"; private static final String ACCELERATOR_URN = "beam:resources:accelerator:v1"; + private static final String CPU_COUNT_URN = "beam:resources:cpu_count:v1"; + // TODO: reference this from a common location in all packages that use this. private static String getUrn(ProtocolMessageEnum value) { return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn); @@ -57,6 +59,7 @@ private static String getUrn(ProtocolMessageEnum value) { static { checkState(MIN_RAM_URN.equals(getUrn(StandardResourceHints.Enum.MIN_RAM_BYTES))); checkState(ACCELERATOR_URN.equals(getUrn(StandardResourceHints.Enum.ACCELERATOR))); + checkState(CPU_COUNT_URN.equals(getUrn(StandardResourceHints.Enum.CPU_COUNT))); } private static ImmutableMap hintNameToUrn = @@ -64,12 +67,15 @@ private static String getUrn(ProtocolMessageEnum value) { .put("minRam", MIN_RAM_URN) .put("min_ram", MIN_RAM_URN) // Courtesy alias. .put("accelerator", ACCELERATOR_URN) + .put("cpuCount", CPU_COUNT_URN) + .put("cpu_count", CPU_COUNT_URN) // Courtesy alias. .build(); private static ImmutableMap> parsers = ImmutableMap.>builder() .put(MIN_RAM_URN, s -> new BytesHint(BytesHint.parse(s))) .put(ACCELERATOR_URN, s -> new StringHint(s)) + .put(CPU_COUNT_URN, s -> new IntHint(IntHint.parse(s))) .build(); private static final ResourceHints EMPTY = new ResourceHints(ImmutableMap.of()); @@ -212,6 +218,46 @@ public int hashCode() { } } + /*package*/ static class IntHint extends ResourceHint { + private final int value; + + @Override + public boolean equals(@Nullable Object other) { + if (other == null) { + return false; + } else if (this == other) { + return true; + } else if (other instanceof IntHint) { + return ((IntHint) other).value == value; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + + public IntHint(int value) { + this.value = value; + } + + public static int parse(String s) { + return Integer.parseInt(s, 10); + } + + @Override + public ResourceHint mergeWithOuter(ResourceHint outer) { + return new IntHint(Math.max(value, ((IntHint) outer).value)); + } + + @Override + public byte[] toBytes() { + return String.valueOf(value).getBytes(Charsets.US_ASCII); + } + } + /** * Sets desired minimal available RAM size to have in transform's execution environment. * @@ -264,6 +310,23 @@ public ResourceHints withHint(String urn, ResourceHint hint) { return new ResourceHints(newHints.build()); } + /** + * Sets desired minimal CPU or vCPU count to have in transform's execution environment. + * + * @param cpuCount specifies a positive CPU count. + */ + public ResourceHints withCPUCount(int cpuCount) { + if (cpuCount <= 0) { + LOG.error( + "Encountered invalid non-positive cpu count hint value {}.\n" + + "The value is ignored. In the future, The method will require an object Long type " + + "and throw an IllegalArgumentException for invalid values.", + cpuCount); + return this; + } + return withHint(CPU_COUNT_URN, new IntHint(cpuCount)); + } + public Map hints() { return hints; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsTest.java index 3cc522176374..c7643f718aa5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsTest.java @@ -92,10 +92,13 @@ public void testFromOptions() { .withHint("beam:resources:bar", new ResourceHints.StringHint("foo"))); options = PipelineOptionsFactory.fromArgs( - "--resourceHints=min_ram=1KB", "--resourceHints=accelerator=foo") + "--resourceHints=min_ram=1KB", + "--resourceHints=accelerator=foo", + "--resourceHints=cpu_count=4") .as(ResourceHintsOptions.class); - assertEquals( - ResourceHints.fromOptions(options), - ResourceHints.create().withMinRam(1000).withAccelerator("foo")); + ResourceHints fromOptions = ResourceHints.fromOptions(options); + ResourceHints expect = + ResourceHints.create().withMinRam(1000).withAccelerator("foo").withCPUCount(4); + assertEquals(fromOptions, expect); } } diff --git a/sdks/python/apache_beam/transforms/resources.py b/sdks/python/apache_beam/transforms/resources.py index 7bb202ab5660..7c4160df8edd 100644 --- a/sdks/python/apache_beam/transforms/resources.py +++ b/sdks/python/apache_beam/transforms/resources.py @@ -42,6 +42,7 @@ 'ResourceHint', 'AcceleratorHint', 'MinRamHint', + 'CpuCountHint', 'merge_resource_hints', 'parse_resource_hints', 'resource_hints_from_options', @@ -177,6 +178,21 @@ def get_merged_value( ResourceHint.register_resource_hint('minRam', MinRamHint) +class CpuCountHint(ResourceHint): + """Describes number of CPUs available in transform's execution environment.""" + urn = resource_hints.CPU_COUNT.urn + + @classmethod + def get_merged_value( + cls, outer_value, inner_value): # type: (bytes, bytes) -> bytes + return ResourceHint._use_max(outer_value, inner_value) + + +ResourceHint.register_resource_hint('cpu_count', CpuCountHint) +# Alias for interoperability with SDKs preferring camelCase. +ResourceHint.register_resource_hint('cpuCount', CpuCountHint) + + def parse_resource_hints(hints): # type: (Dict[Any, Any]) -> Dict[str, bytes] parsed_hints = {} for hint, value in hints.items(): diff --git a/sdks/python/apache_beam/transforms/resources_test.py b/sdks/python/apache_beam/transforms/resources_test.py index 939391b7adcb..939bdcd62651 100644 --- a/sdks/python/apache_beam/transforms/resources_test.py +++ b/sdks/python/apache_beam/transforms/resources_test.py @@ -46,6 +46,11 @@ class ResourcesTest(unittest.TestCase): val='gpu', urn='beam:resources:accelerator:v1', bytestr=b'gpu'), + param( + name='cpu_count', + val='4', + urn='beam:resources:cpu_count:v1', + bytestr=b'4'), ]) def test_known_resource_hints(self, name, val, urn, bytestr): t = PTransform() @@ -56,6 +61,7 @@ def test_known_resource_hints(self, name, val, urn, bytestr): @parameterized.expand([ param(name='min_ram', val='3,500G'), param(name='accelerator', val=1), + param(name='cpu_count', val=1), param(name='unknown_hint', val=1) ]) def test_resource_hint_parsing_fails_early(self, name, val):