Skip to content

Commit

Permalink
Changes to support ramp-up feature (#725)
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Singh <[email protected]>
  • Loading branch information
rishabh6788 authored Jan 15, 2025
1 parent 1417225 commit 4d96c35
Show file tree
Hide file tree
Showing 8 changed files with 828 additions and 365 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ indent-after-paren=4
indent-string=' '

# Maximum number of characters on a single line.
max-line-length=140
max-line-length=180

# Maximum number of lines in a module.
max-module-lines=1000
Expand Down
15 changes: 15 additions & 0 deletions osbenchmark/resources/workload-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down
80 changes: 58 additions & 22 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ def os_clients(all_hosts, all_client_options):
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
schedule = schedule_for(task_allocation, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error), self.cfg)
Expand Down Expand Up @@ -1607,6 +1607,15 @@ async def __call__(self, *args, **kwargs):
# lazily initialize the schedule
self.logger.debug("Initializing schedule for client id [%s].", self.client_id)
schedule = self.schedule_handle()
self.schedule_handle.start()
rampup_wait_time = self.schedule_handle.ramp_up_wait_time
if rampup_wait_time:
self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time)
await asyncio.sleep(rampup_wait_time)

if rampup_wait_time:
console.println(f" Client id {self.client_id} is running now.")

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
try:
Expand Down Expand Up @@ -1806,18 +1815,28 @@ def __repr__(self, *args, **kwargs):


class TaskAllocation:
def __init__(self, task, client_index_in_task):
def __init__(self, task, client_index_in_task, global_client_index, total_clients):
"""
:param task: The current task which is always a leaf task.
:param client_index_in_task: The task-specific index for the allocated client.
:param global_client_index: The globally unique index for the allocated client across
all concurrently executed tasks.
:param total_clients: The total number of clients executing tasks concurrently.
"""
self.task = task
self.client_index_in_task = client_index_in_task
self.global_client_index = global_client_index
self.total_clients = total_clients

def __hash__(self):
return hash(self.task) ^ hash(self.client_index_in_task)
return hash(self.task) ^ hash(self.global_client_index)

def __eq__(self, other):
return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task
return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index

def __repr__(self, *args, **kwargs):
return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task)
return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \
f"and [{self.global_client_index}/{self.total_clients}] in total"


class Allocator:
Expand Down Expand Up @@ -1858,12 +1877,16 @@ def allocations(self):
clients_executing_completing_task = []
for sub_task in task:
for client_index in range(start_client_index, start_client_index + sub_task.clients):
# this is the actual client that will execute the task. It may differ from the logical one in case we over-commit (i.e.
# more tasks than actually available clients)
physical_client_index = client_index % max_clients
if sub_task.completes_parent:
clients_executing_completing_task.append(physical_client_index)
allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index))
ta = TaskAllocation(task = sub_task,
client_index_in_task = client_index - start_client_index,
global_client_index=client_index,
# if task represents a parallel structure this is the total number of clients
# executing sub-tasks concurrently.
total_clients=task.clients)
allocations[physical_client_index].append(ta)
start_client_index += sub_task.clients

# uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them
Expand Down Expand Up @@ -1941,7 +1964,7 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index, parameter_source):
def schedule_for(task_allocation, parameter_source):
"""
Calculates a client's schedule for a given task.
Expand All @@ -1951,15 +1974,17 @@ def schedule_for(task, client_index, parameter_source):
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
task = task_allocation.task
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)

client_index = task_allocation.client_index_in_task
# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
params_for_op = parameter_source.partition(client_index, task.clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
Expand Down Expand Up @@ -1992,7 +2017,7 @@ def schedule_for(task, client_index, parameter_source):
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, task_runner, params):
Expand All @@ -2009,27 +2034,40 @@ def requires_time_period_schedule(task, task_runner, params):


class ScheduleHandle:
def __init__(self, task_name, sched, task_progress_control, runner, params):
def __init__(self, task_allocation, sched, task_progress_control, runner, params):
"""
Creates a generator that will yield individual task invocations for the provided schedule.
:param task_name: The name of the task for which the schedule is generated.
:param task_allocation: The task allocation for which the schedule is generated.
:param sched: The scheduler for this task.
:param task_progress_control: Controls how and how often this generator will loop.
:param runner: The runner for a given operation.
:param params: The parameter source for a given operation.
:return: A generator for the corresponding parameters.
"""
self.task_name = task_name
self.task_allocation = task_allocation
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
self.params = params
# TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight?
#from concurrent.futures import ThreadPoolExecutor
#import asyncio
#self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
#self.loop = asyncio.get_event_loop()
# from concurrent.futures import ThreadPoolExecutor
# import asyncio
# self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
# self.loop = asyncio.get_event_loop()
@property
def ramp_up_wait_time(self):
"""
:return: the number of seconds to wait until this client should start so load can gradually ramp-up.
"""
ramp_up_time_period = self.task_allocation.task.ramp_up_time_period
if ramp_up_time_period:
return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients)
else:
return 0

def start(self):
self.task_progress_control.start()

def before_request(self, now):
self.sched.before_request(now)
Expand All @@ -2041,20 +2079,18 @@ async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
param_source_knows_progress = hasattr(self.params, "percent_completed")
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = self.params.percent_completed if param_source_knows_progress else None
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
# current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
self.params.params())
self.task_progress_control.next()
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
Expand Down
35 changes: 31 additions & 4 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1765,14 +1765,24 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False)
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)

# now descent to each operation
tasks = []
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations,
default_warmup_time_period, default_time_period, completed_by))
default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by))

for task in tasks:
if task.ramp_up_time_period != default_ramp_up_time_period:
if default_ramp_up_time_period is None:
self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies "
f"a ramp-up-time-period but it is only allowed on the 'parallel' element.")
else:
self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing "
f"'parallel' element in test-procedure '{test_procedure_name}'.")
if completed_by:
completion_task = None
for task in tasks:
Expand All @@ -1788,7 +1798,8 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
return workload.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None,
default_warmup_time_period=None, default_time_period=None, completed_by_name=None):
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None,
completed_by_name=None):

op_spec = task_spec["operation"]
if isinstance(op_spec, str) and op_spec in ops:
Expand All @@ -1811,6 +1822,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
default_value=default_warmup_time_period),
time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False,
default_value=default_time_period),
ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name,
mandatory=False, default_value=default_ramp_up_time_period),
clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1),
completes_parent=(task_name == completed_by_name),
schedule=schedule,
Expand All @@ -1819,11 +1832,25 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
if task.warmup_iterations is not None and task.time_period is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
elif task.warmup_time_period is not None and task.iterations is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))

if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and "
f"{task.iterations} iterations but mixing time periods and iterations is not allowed.")

if task.ramp_up_time_period is not None:
if task.warmup_time_period is None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds but no warmup-time-period.")
elif task.warmup_time_period < task.ramp_up_time_period:
self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
f"{task.warmup_time_period} seconds but must be greater than or equal to the "
f"ramp-up-time-period of {task.ramp_up_time_period} seconds.")

return task

Expand Down
Loading

0 comments on commit 4d96c35

Please sign in to comment.