Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batches and multiprocess #6

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/run_mopso.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ def objective_function_2(x):
plt.xlabel("Objective 1")
plt.ylabel("Objective 2")
plt.title("Pareto Front")
plt.savefig("tmp/pareto")
plt.savefig("tmp/pareto")
138 changes: 115 additions & 23 deletions optimizer/mopso.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import random
import numpy as np
from optimizer import Optimizer, FileManager
from concurrent.futures import ProcessPoolExecutor
import warnings
from .termcolors import bcolors


class Particle:
Expand Down Expand Up @@ -195,7 +198,7 @@ class MOPSO(Optimizer):

def __init__(self,
objective,
lower_bounds, upper_bounds, num_particles=50,
lower_bounds, upper_bounds, num_particles=50, num_batch = 1,
inertia_weight=0.5, cognitive_coefficient=1, social_coefficient=1,
incremental_pareto=False, initial_particles_position='spread'):
self.objective = objective
Expand All @@ -206,6 +209,7 @@ def __init__(self,
except FileNotFoundError as e:
print("Checkpoint not found. Fallback to standard construction.")
self.num_particles = num_particles
self.num_batch = num_batch
self.num_params = len(lower_bounds)
self.lower_bounds = lower_bounds
self.upper_bounds = upper_bounds
Expand All @@ -231,6 +235,32 @@ def __init__(self,
else:
raise ValueError(
f"MOPSO: initial_particles_position must be one of {VALID_INITIAL_PARTICLES_POSITIONS}")
if (num_batch == 1):
self.particles_batch.append(self.particles)
self.batch_size = len(self.particles)
else:
# Calculate the approximate batch size
self.batch_size = len(self.particles) // self.num_batch

# Check if the division leaves some elements unallocated
remaining_elements = len(self.particles) % self.batch_size

if remaining_elements > 0:
# Warn the user and suggest adjusting the number of particles or batches
warning_message = (
f"{bcolors.WARNING}The specified number of batches ({self.num_batch}) does not evenly divide the number of particles ({len(self.particles)}).{bcolors.ENDC}"
)
warnings.warn(warning_message)

# Use list comprehension to create batches
self.particles_batch = [self.particles[i:i + self.batch_size]
for i in range(0, len(self.particles), self.batch_size)]

# If the division leaves some elements unallocated, add them to the last batch
if remaining_elements > 0:
last_batch = self.particles_batch.pop()
last_batch.extend(self.particles[len(self.particles_batch) * self.batch_size:])
self.particles_batch.append(last_batch)

self.iteration = 0
self.incremental_pareto = incremental_pareto
Expand All @@ -256,6 +286,7 @@ def save_attributes(self):
'upper_bounds': self.upper_bounds,
'num_particles': self.num_particles,
'num_params': self.num_params,
'num_batch': self.num_batch,
'inertia_weight': self.inertia_weight,
'cognitive_coefficient': self.cognitive_coefficient,
'social_coefficient': self.social_coefficient,
Expand Down Expand Up @@ -283,11 +314,11 @@ def save_state(self):
particle.velocity,
particle.best_position,
np.ravel(particle.best_fitness)])
for particle in self.particles],
for batch in self.particles_batch for particle in batch],
'checkpoint/individual_states.csv')

print(f"Saving Pareto Front {len(self.pareto_front)}")
FileManager.save_csv([np.concatenate([particle.position, np.ravel(particle.fitness)])
for particle in self.pareto_front],
for batch in self.particles_batch for particle in batch],
'checkpoint/pareto_front.csv')

def load_checkpoint(self):
Expand All @@ -310,6 +341,7 @@ def load_checkpoint(self):
self.upper_bounds = pso_attributes['upper_bounds']
self.num_particles = pso_attributes['num_particles']
self.num_params = pso_attributes['num_params']
self.num_batch = pso_attributes['num_batch']
self.inertia_weight = pso_attributes['inertia_weight']
self.cognitive_coefficient = pso_attributes['cognitive_coefficient']
self.social_coefficient = pso_attributes['social_coefficient']
Expand All @@ -336,6 +368,35 @@ def load_checkpoint(self):
individual_states[i][3*self.num_params:], dtype=float)
)
self.particles.append(particle)

#restore batches
self.particles_batch = []
if (self.num_batch == 1):
self.particles_batch.append(self.particles)
self.batch_size = len(self.particles)
else:
# Calculate the approximate batch size
self.batch_size = len(self.particles) // self.num_batch

# Check if the division leaves some elements unallocated
remaining_elements = len(self.particles) % self.batch_size

if remaining_elements > 0:
# Warn the user and suggest adjusting the number of particles or batches
warning_message = (
f"{bcolors.WARNING}The specified number of batches ({self.num_batch}) does not evenly divide the number of particles ({len(self.particles)}).{bcolors.ENDC}"
)
warnings.warn(warning_message)

# Use list comprehension to create batches
self.particles_batch = [self.particles[i:i + self.batch_size]
for i in range(0, len(self.particles), self.batch_size)]

# If the division leaves some elements unallocated, add them to the last batch
if remaining_elements > 0:
last_batch = self.particles_batch.pop()
last_batch.extend(self.particles[len(self.particles_batch) * self.batch_size:])
self.particles_batch.append(last_batch)

# restore pareto front
self.pareto_front = []
Expand All @@ -349,8 +410,19 @@ def load_checkpoint(self):
best_position=None,
best_fitness=None)
self.pareto_front.append(particle)

def optimize(self, num_iterations=100, max_iter_no_improv=None):


def process_batch(self, worker_id, batch):
    """Evaluate the fitness of every particle in one batch.

    Runs the shared objective over the whole batch in a single call and
    writes the resulting fitness back onto each particle.

    Args:
        worker_id (int): Index of the worker handling this batch; forwarded
            to the objective so it can isolate per-worker resources
            (e.g. scratch directories). -- presumably; confirm against the
            Objective implementations.
        batch (list): Particle objects to evaluate.

    Returns:
        list: The same batch object, with each particle's fitness updated.
    """
    # Trace which worker picked up this batch (useful when batches run
    # in a ProcessPoolExecutor).
    print(f"Worker ID {worker_id}")
    positions = [particle.position for particle in batch]
    # evaluate() returns an array shaped (num_objectives, len(batch)):
    # column p_id holds all objective values for particle p_id.
    fitness_matrix = self.objective.evaluate(positions, worker_id)
    for p_id, particle in enumerate(batch):
        particle.set_fitness(fitness_matrix[:, p_id])
        # NOTE: the original also did `batch[p_id] = particle`, a no-op
        # self-assignment (particle IS batch[p_id]); removed.
    return batch

def optimize(self, num_iterations = 100, max_iter_no_improv = None):
"""
Perform the MOPSO optimization process and return the Pareto front of non-dominated
solutions. If `history_dir` is specified, the position and fitness of all particles
Expand All @@ -365,23 +437,36 @@ def optimize(self, num_iterations=100, max_iter_no_improv=None):
list: List of Particle objects representing the Pareto front of non-dominated solutions.
"""
for _ in range(num_iterations):
optimization_output = self.objective.evaluate(
[particle.position for particle in self.particles])
[particle.set_fitness(optimization_output[:, p_id])
for p_id, particle in enumerate(self.particles)]

FileManager.save_csv([np.concatenate([particle.position, np.ravel(
particle.fitness)]) for particle in self.particles],
with ProcessPoolExecutor(max_workers=self.num_batch) as executor:
futures = [executor.submit(self.process_batch, worker_id, batch)
for worker_id, batch in enumerate(self.particles_batch )]

new_batches = []
for future in futures:
batch = future.result()
new_batches.append(batch)
self.particles_batch = new_batches
save_particles = []
updated_particles = []
for batch in self.particles_batch:
for particle in batch:
l = np.concatenate([particle.position, np.ravel(particle.fitness)])
save_particles.append(l)
updated_particles.append(particle)
FileManager.save_csv(save_particles,
'history/iteration' + str(self.iteration) + '.csv')

self.particles = updated_particles
self.update_pareto_front()

for particle in self.particles:
particle.update_velocity(self.pareto_front,
self.inertia_weight,
self.cognitive_coefficient,
self.social_coefficient)
particle.update_position(self.lower_bounds, self.upper_bounds)
for batch in self.particles_batch:
for particle in batch:
particle.update_velocity(self.pareto_front,
self.inertia_weight,
self.cognitive_coefficient,
self.social_coefficient)
particle.update_position(
self.lower_bounds, self.upper_bounds)

self.iteration += 1

Expand All @@ -394,12 +479,17 @@ def update_pareto_front(self):
"""
Update the Pareto front of non-dominated solutions across all iterations.
"""
particles = self.particles
all_particles = [
particle for batch in self.particles_batch for particle in batch]

if self.incremental_pareto:
particles = particles + self.pareto_front
all_particles = [
particle for batch in self.particles_batch for particle in batch]

particles = all_particles + self.pareto_front
self.pareto_front = [copy.deepcopy(particle) for particle in particles
if not particle.is_dominated(particles)]
print(len(self.pareto_front))

crowding_distances = self.calculate_crowding_distance(
self.pareto_front)
Expand All @@ -413,10 +503,12 @@ def get_current_pareto_front(self):
Returns:
list: List of Particle objects representing the Pareto front.
"""
all_particles = [
particle for batch in self.particles_batch for particle in batch]
pareto_front = []
for particle in self.particles:
for particle in all_particles:
dominated = False
for other_particle in self.particles:
for other_particle in all_particles:
if np.all(particle.fitness >= other_particle.fitness) and \
np.any(particle.fitness > other_particle.fitness):
dominated = True
Expand Down
18 changes: 9 additions & 9 deletions optimizer/objective.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ def __init__(self, objective_functions, num_objectives = None) -> None:
else:
self.num_objectives = num_objectives
pass
def evaluate(self, items):
return np.array([objective_function([item for item in items]) for objective_function in self.objective_functions])

def evaluate(self, items, worker_id=0):
return np.array([objective_function([item for item in items], worker_id) for objective_function in self.objective_functions])

def type(self):
return self.__class__.__name__

class ElementWiseObjective(Objective):
def __init__(self, objective_functions, num_objectives=None) -> None:
super().__init__(objective_functions, num_objectives)
def evaluate(self, items):
return np.array([[obj_func(item) for item in items] for obj_func in self.objective_functions])

def evaluate(self, items, worker_id=0):
return np.array([[obj_func(item, worker_id) for item in items] for obj_func in self.objective_functions])

class BatchObjective(Objective):
def __init__(self, objective_functions, num_objectives=None) -> None:
super().__init__(objective_functions, num_objectives)
super().__init__(objective_functions, num_objectives)
11 changes: 11 additions & 0 deletions optimizer/termcolors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class bcolors:
    """ANSI escape sequences for coloring/styling terminal output.

    Intended usage: f"{bcolors.WARNING}message{bcolors.ENDC}" — always
    terminate a colored span with ENDC to restore the terminal defaults.
    """

    # Reset: restores the terminal's default text attributes.
    ENDC = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colors (bright SGR codes 91-96).
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'

3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
version='0.0.1',
packages=find_packages(include=['optimizer', 'optimizer.*']),
install_requires=['scikit-learn','numpy','matplotlib','pandas']
)
)