
Add autoscaling pipeline options
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=124417190
robertwb authored and aaltay committed Jun 10, 2016
1 parent 9782343 commit 2f9e11c
Showing 2 changed files with 23 additions and 3 deletions.
google/cloud/dataflow/internal/apiclient.py (10 additions, 0 deletions)
@@ -224,9 +224,19 @@ def __init__(self, packages, options, environment_version):
             parallelWorkerSettings=dataflow.WorkerSettings(
                 baseUrl='https://dataflow.googleapis.com',
                 servicePath=self.google_cloud_options.dataflow_endpoint)))
+    pool.autoscalingSettings = dataflow.AutoscalingSettings()
     # Set worker pool options received through command line.
     if self.worker_options.num_workers:
       pool.numWorkers = self.worker_options.num_workers
+    if self.worker_options.max_num_workers:
+      pool.autoscalingSettings.maxNumWorkers = (
+          self.worker_options.max_num_workers)
+    if self.worker_options.autoscaling_algorithm:
+      values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
+      pool.autoscalingSettings.algorithm = {
+          'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
+          'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
+      }.get(self.worker_options.autoscaling_algorithm)
     if self.worker_options.machine_type:
       pool.machineType = self.worker_options.machine_type
     if self.worker_options.disk_size_gb:
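The dictionary lookup above translates the user-facing flag value into the service-side enum: 'THROUGHPUT_BASED' on the command line becomes AUTOSCALING_ALGORITHM_BASIC in the API, and dict.get() returns None for any value outside the mapping, leaving the algorithm field unset rather than raising. A minimal sketch of that pattern, with a hypothetical stand-in class in place of the apitools-generated dataflow.AutoscalingSettings.AlgorithmValueValuesEnum (only the two values used here are stubbed):

class AlgorithmValueValuesEnum(object):
  # Hypothetical stand-in for the apitools-generated enum; the real values
  # come from the Dataflow API definition.
  AUTOSCALING_ALGORITHM_NONE = 1
  AUTOSCALING_ALGORITHM_BASIC = 2

def algorithm_for_flag(flag_value):
  """Maps an --autoscaling_algorithm flag value to the service enum."""
  return {
      'NONE': AlgorithmValueValuesEnum.AUTOSCALING_ALGORITHM_NONE,
      'THROUGHPUT_BASED': AlgorithmValueValuesEnum.AUTOSCALING_ALGORITHM_BASIC,
  }.get(flag_value)

# The user-facing name differs from the enum name, and unknown values map
# to None instead of raising.
assert algorithm_for_flag('THROUGHPUT_BASED') == (
    AlgorithmValueValuesEnum.AUTOSCALING_ALGORITHM_BASIC)
assert algorithm_for_flag('bogus') is None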
google/cloud/dataflow/utils/options.py (13 additions, 3 deletions)
@@ -269,6 +269,19 @@ def _add_argparse_args(cls, parser):
         help=
         ('Number of workers to use when executing the Dataflow job. If not '
          'set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--max_num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Maximum number of workers to use when executing the Dataflow job.'))
+    parser.add_argument(
+        '--autoscaling_algorithm',
+        type=str,
+        choices=['NONE', 'THROUGHPUT_BASED'],
+        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
+        help=
+        ('If and how to autoscale the worker pool.'))
     # TODO(silviuc): Remove --machine_type variant of the flag.
     parser.add_argument(
         '--worker_machine_type', '--machine_type',
@@ -428,9 +441,6 @@ def _add_argparse_args(cls, parser):
         'workers will install them in the same order they were specified on '
         'the command line.'))
 
-    # TODO(silviuc): Add autoscaling related options:
-    # --autoscaling_algorithm, --max_num_workers.
-
     # TODO(silviuc): Add --files_to_stage option.
     # This could potentially replace the --requirements_file and --setup_file.
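Note the three-way distinction the default=None creates for --autoscaling_algorithm: flag omitted (None, the service decides), explicit 'NONE' (the user asks for no autoscaling), and 'THROUGHPUT_BASED'. A standalone argparse sketch mirroring the two add_argument() calls above (not the actual WorkerOptions class) shows the behavior:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--max_num_workers', type=int, default=None)
parser.add_argument(
    '--autoscaling_algorithm',
    type=str,
    choices=['NONE', 'THROUGHPUT_BASED'],
    default=None)

# Flag omitted: None means unset, so the service applies its own default.
assert parser.parse_args([]).autoscaling_algorithm is None
# Explicit 'NONE': a request not to autoscale, distinct from unset.
assert parser.parse_args(
    ['--autoscaling_algorithm=NONE']).autoscaling_algorithm == 'NONE'
# Typical invocation: throughput-based autoscaling capped at 20 workers.
args = parser.parse_args(
    ['--max_num_workers=20', '--autoscaling_algorithm=THROUGHPUT_BASED'])
assert args.max_num_workers == 20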
