Files
assembly-line-optimizer/lineopt_reconfiguration.py

651 lines
26 KiB
Python

from base_optimizer.optimizer_common import *
from estimator import *
def random_component_assignment(pcb_data, component_data, machine_number, estimator=None):
    """Build a random component-to-machine assignment and score it.

    The assignment is built in three stages: one placement of a random
    component per nozzle type, one placement of every remaining component,
    and finally random chunks of the remaining placements subject to the
    per-component feeder limit (``fdn``).

    Args:
        pcb_data: forwarded unchanged to ``converter``.
        component_data: table iterated with ``iterrows``; each row exposes
            ``points``, ``nz`` and ``fdn``.
            NOTE(review): row labels are used as positional indices, so the
            index is presumably 0..n-1 -- confirm against the caller.
        machine_number: number of machines to distribute placements over.
        estimator: object exposing
            ``predict(cp_points, cp_nozzle, board_width, board_height)``.
            Required despite the ``None`` default kept for compatibility.

    Returns:
        ``(objective_value, assignment_result)`` where
        ``assignment_result[machine][component]`` is the number of placements
        and ``objective_value`` is the largest predicted value over machines.

    Raises:
        ValueError: if ``estimator`` is ``None``.
    """
    if estimator is None:
        # fail before doing any work instead of crashing at the scoring step
        raise ValueError('an estimator is required to evaluate the assignment')

    # == the set of feasible component type for each nozzle type
    nozzle_part_list = defaultdict(list)
    component_points = []
    for idx, data in component_data.iterrows():
        component_points.append(data.points)
        nozzle_part_list[data.nz].append(idx)

    component_number = len(component_data)
    assignment_result = [[0 for _ in range(component_number)] for _ in range(machine_number)]

    def place(machine_index, part, points):
        # move `points` placements of `part` from the open pool to a machine
        assignment_result[machine_index][part] += points
        component_points[part] -= points

    # === ensure every nozzle types ===
    selected_part = set()
    for part_list in nozzle_part_list.values():
        # components without placements must not be seeded, otherwise their
        # remaining count would go negative and never reach zero again
        candidates = [part for part in part_list if component_points[part] > 0]
        if not candidates:
            continue
        part = random.sample(candidates, 1)[0]
        place(random.randint(0, machine_number - 1), part, 1)
        selected_part.add(part)

    # === assign one placement which has not been selected ===
    for part in range(component_number):
        if part in selected_part or component_points[part] <= 0:
            continue
        place(random.randint(0, machine_number - 1), part, 1)

    machine_assign = list(range(machine_number))
    random.shuffle(machine_assign)

    finished_assign_counter = component_points.count(0)
    while finished_assign_counter < component_number:
        for machine_index in machine_assign:
            part = random.randint(0, component_number - 1)
            if component_points[part] == 0:
                continue

            # feeder limit restriction: machines already holding the part,
            # plus the candidate machine, must not exceed the feeder count
            feeder_counter = sum(
                1 for idx in range(machine_number)
                if assignment_result[idx][part] > 0 or idx == machine_index
            )
            if feeder_counter > component_data.iloc[part].fdn:
                continue

            place(machine_index, part, random.randint(1, component_points[part]))
            if component_points[part] == 0:
                finished_assign_counter += 1

    assert sum(component_points) == 0

    # objective: the slowest machine according to the estimator
    objective_value = 0
    cp_items = converter(pcb_data, component_data, assignment_result)
    for machine_index in range(machine_number):
        cp_points, cp_nozzle, board_width, board_height = cp_items[machine_index]
        objective_value = max(objective_value, estimator.predict(cp_points, cp_nozzle, board_width, board_height))

    return objective_value, assignment_result
def greedy_component_assignment(component_points, component_nozzle, component_feeders, task_block_weight):
    """Placeholder for a greedy component assignment; intentionally a no-op.

    All four arguments are accepted and ignored.

    Returns:
        None.
    """
    # Original author's note (translated): it is unclear what the source
    # material intended here, so no algorithm was implemented.
    return None
def local_search_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Improve a random assignment by moving placements between machines.

    Starting from ``random_component_assignment``, each iteration picks a
    random machine and one of its components, moves a random number of that
    component's placements to another machine that respects the feeder limit
    (``fdn``), and keeps the move only if the estimated objective decreases.

    Args:
        pcb_data: forwarded unchanged to ``converter``.
        component_data: table whose rows expose ``fdn`` via ``iloc``.
        machine_number: number of machines.
        estimator: object exposing
            ``predict(cp_points, cp_nozzle, board_width, board_height)``.

    Returns:
        ``(optimal_val, optimal_assignment)`` -- the best objective found and
        the matching ``assignment[machine][component]`` matrix.
    """
    # maximum number of iterations : 5000
    # maximum number of unsuccessful iterations: 50
    max_iteration, max_unsuccessful_iteration = 5000, 50
    component_number = len(component_data)

    def evaluate(assignment):
        # objective: largest predicted value over all machines
        val = 0
        cp_items = converter(pcb_data, component_data, assignment)
        for machine_index in range(machine_number):
            cp_points, cp_nozzle, board_width, board_height = cp_items[machine_index]
            val = max(val, estimator.predict(cp_points, cp_nozzle, board_width, board_height))
        return val

    optimal_val, optimal_assignment = random_component_assignment(pcb_data, component_data, machine_number, estimator)
    unsuccessful_iteration_counter = max_unsuccessful_iteration

    for _ in range(max_iteration):
        machine_idx = random.randint(0, machine_number - 1)
        part_set = [part for part in range(component_number) if optimal_assignment[machine_idx][part] != 0]
        if not part_set:
            # an empty machine has nothing to give away; not counted as a failure
            continue

        part_idx = random.sample(part_set, 1)[0]
        r = random.randint(1, optimal_assignment[machine_idx][part_idx])
        feeder_limit = component_data.iloc[part_idx].fdn

        # look for a receiving machine, giving up after 2 * machine_number + 1 draws
        swap_machine_idx = None
        for _ in range(2 * machine_number + 1):
            candidate = random.randint(0, machine_number - 1)
            feeder_available = sum(
                1 for machine in range(machine_number)
                if optimal_assignment[machine][part_idx] or machine == candidate
            )
            if candidate != machine_idx and feeder_available <= feeder_limit:
                swap_machine_idx = candidate
                break

        improved = False
        if swap_machine_idx is not None:
            assignment = copy.deepcopy(optimal_assignment)
            assignment[machine_idx][part_idx] -= r
            assignment[swap_machine_idx][part_idx] += r
            val = evaluate(assignment)
            if val < optimal_val:
                optimal_assignment, optimal_val = assignment, val
                improved = True
        # without a feasible target the assignment is unchanged, so it cannot
        # improve; it is counted as unsuccessful without paying for a
        # deep copy and a redundant estimator evaluation

        if improved:
            unsuccessful_iteration_counter = max_unsuccessful_iteration
        else:
            unsuccessful_iteration_counter -= 1
            if unsuccessful_iteration_counter <= 0:
                break

    return optimal_val, optimal_assignment
def reconfig_crossover_operation(component_data, parent1, parent2, machine_number):
    """Uniform row-wise crossover of two assignments followed by rebalancing.

    A random bit per machine decides whether each offspring keeps its own
    parent's row (bit 1) or takes the other parent's row (bit 0). Afterwards
    every component column is repaired so that it sums to the component's
    ``points`` again.

    Args:
        component_data: table whose rows expose ``points`` via ``iloc``.
        parent1, parent2: ``assignment[machine][component]`` matrices; they
            are not modified.
        machine_number: number of machines (rows).

    Returns:
        ``(offspring1, offspring2)`` as new matrices. When the mask is all
        zeros or all ones, plain deep copies of the parents are returned.

    NOTE(review): the repair step does not re-check the feeder limit
    (``fdn``) of the recombined columns.
    """
    offspring1, offspring2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
    component_number = len(component_data)

    # === crossover ===
    mask_bit = [random.randint(0, 1) for _ in range(machine_number)]
    if sum(mask_bit) == 0 or sum(mask_bit) == machine_number:
        return offspring1, offspring2

    for machine_index in range(machine_number):
        # test the bit of THIS machine; testing the whole list is always true
        # and would make the crossover a no-op
        if not mask_bit[machine_index]:
            offspring1[machine_index] = copy.deepcopy(parent2[machine_index])
            offspring2[machine_index] = copy.deepcopy(parent1[machine_index])

    # === balancing ===
    # equally to reach the correct number
    for part_index in range(component_number):
        target_points = int(component_data.iloc[part_index]['points'])
        for offspring in (offspring1, offspring2):
            assigned_points = sum(offspring[mt][part_index] for mt in range(machine_number))
            additional_points = assigned_points - target_points

            if additional_points > 0:
                # if a component type has more placements, decrease the assigned values on every head equally keeping
                # the proportion of the number of placement among the heads
                removed_points = 0
                for machine_index in range(machine_number):
                    # the share is taken relative to the CURRENT column total, so
                    # the floors can never remove more than the surplus
                    points = additional_points * offspring[machine_index][part_index] // assigned_points
                    offspring[machine_index][part_index] -= points
                    removed_points += points

                # rounding leftovers: take one more placement from non-empty machines
                remainder = additional_points - removed_points
                for machine_index in range(machine_number):
                    if remainder == 0:
                        break
                    if offspring[machine_index][part_index] == 0:
                        continue
                    offspring[machine_index][part_index] -= 1
                    remainder -= 1

            elif additional_points < 0:
                # otherwise, increase the assigned nonzero values equally
                machine_set = [
                    machine_index for machine_index in range(machine_number)
                    if offspring[machine_index][part_index] != 0
                ]
                if not machine_set:
                    # the recombination dropped the component from every machine:
                    # give it back to one random machine
                    machine_set = [random.randint(0, machine_number - 1)]

                share, remainder = divmod(-additional_points, len(machine_set))
                for position, machine_index in enumerate(machine_set):
                    # the first `remainder` machines absorb the rounding leftovers
                    offspring[machine_index][part_index] += share + (1 if position < remainder else 0)

    return offspring1, offspring2
def reconfig_mutation_operation(component_data, parent, machine_number):
    """Move a random number of placements of one component between two machines.

    Args:
        component_data: table whose rows expose ``fdn`` via ``iloc``.
        parent: ``assignment[machine][component]`` matrix; never modified.
        machine_number: number of machines; two distinct ones are sampled.

    Returns:
        A mutated deep copy of ``parent``; an unmodified deep copy when the
        donor machine holds no components; or ``parent`` itself when the
        move would use more machines than the component's feeder limit.
    """
    child = copy.deepcopy(parent)

    # draw the direction first, then the machine pair, to decide donor/receiver
    keep_order = random.randint(0, 1)
    first, second = random.sample(list(range(machine_number)), 2)
    donor, receiver = (first, second) if keep_order else (second, first)

    movable = [index for index, count in enumerate(child[donor]) if count]
    if not movable:
        return child

    moved_part = random.sample(movable, 1)[0]
    amount = random.randint(1, child[donor][moved_part])
    child[donor][moved_part] -= amount
    child[receiver][moved_part] += amount

    # reject the move if the component now sits on too many machines
    used_feeders = sum(1 for machine_index in range(machine_number) if child[machine_index][moved_part])
    if used_feeders > component_data.iloc[moved_part].fdn:
        return parent
    return child
def evolutionary_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Search for a component assignment with a small genetic algorithm.

    The population is seeded with ``random_component_assignment``. Each
    generation merges the previous offspring into the population, keeps the
    ``population_size`` individuals selected by ``get_top_k_value``, and
    breeds new offspring with roulette-wheel selection, crossover and
    mutation.

    Args:
        pcb_data: forwarded unchanged to ``converter``.
        component_data: component table passed on to the genetic operators.
        machine_number: number of machines.
        estimator: object exposing
            ``predict(cp_points, cp_nozzle, board_width, board_height)``.

    Returns:
        ``(best_value, best_assignment)`` from the final selected population.

    NOTE(review): offspring bred in the last generation are never evaluated
    or returned.
    """
    # population size: 10
    # probability of the mutation: 0.1
    # probability of the crossover: 0.8
    # number of generation: 100
    population_size = 10
    generation_number = 100
    mutation_rate, crossover_rate = 0.1, 0.8

    def evaluate(individual):
        # objective: largest predicted value over all machines
        val = 0
        cp_items = converter(pcb_data, component_data, individual)
        for machine_index in range(machine_number):
            cp_points, cp_nozzle, board_width, board_height = cp_items[machine_index]
            val = max(val, estimator.predict(cp_points, cp_nozzle, board_width, board_height))
        return val

    population, pop_val = [], []
    for _ in range(population_size):
        # the generator already returns the objective computed the same way,
        # so it is reused instead of evaluating every individual twice
        val, individual = random_component_assignment(pcb_data, component_data, machine_number, estimator)
        population.append(individual)
        pop_val.append(val)

    with tqdm(total=generation_number) as pbar:
        pbar.set_description('evolutionary algorithm process for PCB assembly line balance')

        new_population = []
        for _ in range(generation_number):
            population += new_population
            # calculate fitness value
            pop_val.extend(evaluate(individual) for individual in new_population)

            select_index = get_top_k_value(pop_val, population_size, reverse=False)
            population = [population[idx] for idx in select_index]
            pop_val = [pop_val[idx] for idx in select_index]

            # min-max convert: lower objective -> larger selection weight;
            # the 1e-3 offset keeps the worst individual selectable
            max_val = max(pop_val)
            pop_val_sel = [max_val - v for v in pop_val]
            sum_pop_val = sum(pop_val_sel) + 1e-10
            pop_val_sel = [v / sum_pop_val + 1e-3 for v in pop_val_sel]

            # crossover and mutation: one breeding attempt per pair of slots
            new_population = []
            for _ in range(0, population_size, 2):
                if np.random.random() >= crossover_rate:
                    continue

                index1 = roulette_wheel_selection(pop_val_sel)
                while True:
                    index2 = roulette_wheel_selection(pop_val_sel)
                    if index1 != index2:
                        break

                offspring1, offspring2 = reconfig_crossover_operation(component_data, population[index1],
                                                                      population[index2], machine_number)

                if np.random.random() < mutation_rate:
                    offspring1 = reconfig_mutation_operation(component_data, offspring1, machine_number)

                if np.random.random() < mutation_rate:
                    offspring2 = reconfig_mutation_operation(component_data, offspring2, machine_number)

                new_population.append(offspring1)
                new_population.append(offspring2)

            pbar.update(1)

    return min(pop_val), population[np.argmin(pop_val)]
class SpiderMonkeyOpt:
    """Spider-monkey-style population search over component assignments.

    Every component is split into ``fdn`` sub-units of (almost) equal size.
    An individual is a list of length ``Dim``: first a permutation of the
    sub-unit ids, then ``MachineNum`` counts telling how many consecutive
    sub-units of the permutation go to each machine. Lower values returned
    by ``CalIndividualVal`` are better.
    """

    def __init__(self, pop_size, pcb_data, component_data, machine_number, estimator):
        """Split the components into sub-units and create a random population.

        Args:
            pop_size: number of individuals; also used as local/global limit.
            pcb_data: forwarded unchanged to ``converter``.
            component_data: table iterated with ``iterrows``; rows expose
                ``fdn``, ``points`` and ``nz``.
            machine_number: number of machines.
            estimator: object exposing
                ``predict(cp_points, cp_nozzle, board_width, board_height)``.
        """
        self.PcbData = pcb_data
        self.ComponentData = component_data
        self.Estimator = estimator

        self.PopSize = pop_size
        self.LocalLimit = pop_size
        self.GlobalLimit = pop_size

        self.MachineNum = machine_number
        self.GroupSize = 0

        # self.Dim = sum(data.fdn for _, data in component_data.iterrows()) + machine_number
        self.CpPoints = defaultdict(int)   # sub-unit id -> placements in the sub-unit
        self.CpIndex = defaultdict(int)    # sub-unit id -> row label of the source component
        self.CpNozzle = defaultdict(str)   # sub-unit id -> nozzle type

        self.Dim = 0
        for cp_idx, data in component_data.iterrows():
            # cp_feeders[cp_idx] = data.fdn
            division_data = copy.deepcopy(data)
            feeder_limit, total_points = division_data.fdn, division_data.points
            # the first `surplus_points` sub-units receive one extra placement
            surplus_points = total_points % feeder_limit
            for _ in range(feeder_limit):
                division_data.fdn, division_data.points = 1, math.floor(total_points / feeder_limit)
                if surplus_points:
                    division_data.points += 1
                    surplus_points -= 1

                self.CpPoints[self.Dim], self.CpNozzle[self.Dim] = division_data.points, division_data.nz
                self.CpIndex[self.Dim] = cp_idx
                self.Dim += 1

        self.Dim += machine_number

        component_list = list(range(len(self.CpPoints)))
        self.GenPosition = []
        self.GenPopVal = []
        for _ in range(self.PopSize):
            random.shuffle(component_list)
            self.GenPosition.append(component_list.copy())
            # random cut points turn the permutation into per-machine block sizes
            idx, prev = 0, 0
            div = random.sample(range(len(component_list)), self.MachineNum - 1)
            div.append(len(component_list))
            div.sort(reverse=False)

            for _ in range(1, self.MachineNum + 1):
                self.GenPosition[-1].append(div[idx] - prev)
                prev = div[idx]
                idx += 1
            self.GenPopVal.append(self.CalIndividualVal(self.GenPosition[-1]))

        self.GroupPoint = [[0 for _ in range(2)] for _ in range(self.PopSize)]
        self.GroupPart = 1

        self.GenProb = None
        self.GlobalMin = self.GenPopVal[0]
        self.GlobalLeaderPosition = self.GenPosition[0].copy()

        self.GlobalLimitCount = 0

        self.LocalMin = np.ones(self.PopSize) * 1e10
        self.LocalLeaderPosition = [[0 for _ in range(self.Dim)] for _ in range(self.PopSize)]
        self.LocalLimitCount = [0 for _ in range(self.PopSize)]

        # No local leaders are seeded here: GroupSize is still 0 at this point,
        # so a per-group loop would never run. They are filled in by
        # LocalLearning() after CreateGroup() has been called.

        self.CrossoverRatio = 0.1

    def GlobalLearning(self):
        """Refresh the global leader and count rounds without improvement."""
        GlobalTrial = self.GlobalMin
        for i in range(self.PopSize):
            if self.GenPopVal[i] < self.GlobalMin:
                self.GlobalMin = self.GenPopVal[i]
                self.GlobalLeaderPosition = self.GenPosition[i].copy()

        if math.fabs(GlobalTrial - self.GlobalMin) < 1e-5:
            self.GlobalLimitCount = self.GlobalLimitCount + 1
        else:
            self.GlobalLimitCount = 0

    def LocalLearning(self):
        """Refresh each group's local leader and its stagnation counter."""
        OldMin = np.zeros(self.PopSize)
        for k in range(self.GroupSize):
            OldMin[k] = self.LocalMin[k]

        for k in range(self.GroupSize):
            i = int(self.GroupPoint[k][0])
            while i <= int(self.GroupPoint[k][1]):
                if self.GenPopVal[i] < self.LocalMin[k]:
                    self.LocalMin[k] = self.GenPopVal[i]
                    self.LocalLeaderPosition[k] = self.GenPosition[i].copy()
                i = i + 1

        for k in range(self.GroupSize):
            if math.fabs(OldMin[k] - self.LocalMin[k]) < 1e-5:
                self.LocalLimitCount[k] = self.LocalLimitCount[k] + 1
            else:
                self.LocalLimitCount[k] = 0

    def CalculateProbabilities(self):
        """Set ``GenProb[i] = 0.9 * value_i / max_value + 0.1`` for every individual.

        When the largest value is 0 every individual gets probability 1.0
        instead of triggering a division by zero.
        NOTE(review): a larger (worse) value yields a larger probability --
        confirm this is intended for a minimisation objective.
        """
        MaxVal = max(self.GenPopVal)
        if MaxVal == 0:
            self.GenProb = [1.0 for _ in range(self.PopSize)]
            return

        self.GenProb = [(0.9 * (self.GenPopVal[i] / MaxVal)) + 0.1 for i in range(self.PopSize)]

    def LocalLeaderPhase(self, k):
        """Cross every member of group ``k`` with its local leader; keep improvements."""
        lo = int(self.GroupPoint[k][0])
        hi = int(self.GroupPoint[k][1])
        i = lo
        while i <= hi:
            NewGene1, NewGene2 = self.CrossoverOperation(self.GenPosition[i], self.LocalLeaderPosition[k])
            NewGeneVal1, NewGeneVal2 = self.CalIndividualVal(NewGene1), self.CalIndividualVal(NewGene2)

            if NewGeneVal1 < self.GenPopVal[i]:
                self.GenPosition[i] = NewGene1
                self.GenPopVal[i] = NewGeneVal1

            if NewGeneVal2 < self.GenPopVal[i]:
                self.GenPosition[i] = NewGene2
                self.GenPopVal[i] = NewGeneVal2
            i += 1

    def GlobalLeaderPhase(self, k):
        """Cross probabilistically chosen members of group ``k`` with the global leader.

        Performs ``hi - lo`` accepted draws while cycling ``i`` over the group.
        NOTE(review): ``i`` wraps back to ``lo`` when it reaches ``hi``, so the
        member at index ``hi`` is never visited -- confirm this is intended.
        """
        lo = int(self.GroupPoint[k][0])
        hi = int(self.GroupPoint[k][1])
        i = lo
        l = lo
        while l < hi:
            if random.random() < self.GenProb[i]:
                l += 1
                NewGene1, NewGene2 = self.CrossoverOperation(self.GenPosition[i], self.GlobalLeaderPosition)
                NewGeneVal1, NewGeneVal2 = self.CalIndividualVal(NewGene1), self.CalIndividualVal(NewGene2)

                if NewGeneVal1 < self.GenPopVal[i]:
                    self.GenPosition[i] = NewGene1
                    self.GenPopVal[i] = NewGeneVal1

                if NewGeneVal2 < self.GenPopVal[i]:
                    self.GenPosition[i] = NewGene2
                    self.GenPopVal[i] = NewGeneVal2
            i += 1
            if i == hi:
                i = lo

    def LocalLeaderDecision(self):
        """Shake up groups whose local leader stagnated for more than ``LocalLimit`` rounds.

        Each member is either compared against a fresh random individual or
        crossed with the global leader; only improvements are kept.
        """
        for k in range(self.GroupSize):
            if self.LocalLimitCount[k] > self.LocalLimit:
                i = self.GroupPoint[k][0]
                while i <= int(self.GroupPoint[k][1]):
                    if random.random() >= self.CrossoverRatio:
                        # fresh random individual: permutation + random block sizes
                        NewGenPosition = list(range(self.Dim - self.MachineNum))
                        random.shuffle(NewGenPosition)
                        idx, prev = 0, 0
                        div = random.sample(range(len(NewGenPosition)), self.MachineNum - 1)
                        div.append(len(NewGenPosition))
                        div.sort(reverse=False)

                        for _ in range(1, self.MachineNum + 1):
                            NewGenPosition.append(div[idx] - prev)
                            prev = div[idx]
                            idx += 1
                        NewGenVal = self.CalIndividualVal(NewGenPosition)

                        if NewGenVal < self.GenPopVal[i]:
                            self.GenPosition[i] = NewGenPosition.copy()
                            self.GenPopVal[i] = NewGenVal
                    else:
                        NewGene1, NewGene2 = self.CrossoverOperation(self.GenPosition[i], self.GlobalLeaderPosition)
                        NewGeneVal1, NewGeneVal2 = self.CalIndividualVal(NewGene1), self.CalIndividualVal(NewGene2)

                        if NewGeneVal1 < self.GenPopVal[i]:
                            self.GenPosition[i] = NewGene1.copy()
                            self.GenPopVal[i] = NewGeneVal1

                        if NewGeneVal2 < self.GenPopVal[i]:
                            self.GenPosition[i] = NewGene2.copy()
                            self.GenPopVal[i] = NewGeneVal2
                    i += 1
                self.LocalLimitCount[k] = 0

    def GlobalLeaderDecision(self):
        """Split the population into one more group when the global leader stagnates."""
        if self.GlobalLimitCount > self.GlobalLimit:
            self.GroupPart += 1
            self.GlobalLimitCount = 0
            self.CreateGroup()
            self.LocalLearning()

    def CreateGroup(self):
        """Partition the population indices into contiguous groups.

        Writes inclusive ``[lo, hi]`` bounds into ``GroupPoint`` and the
        number of groups into ``GroupSize``. A group whose tail would leave
        fewer than ``int(PopSize / GroupPart)`` individuals is extended to
        the end of the population.
        """
        g = 0
        lo = 0

        while lo < self.PopSize:
            hi = lo + int(self.PopSize / self.GroupPart)
            self.GroupPoint[g][0] = lo
            self.GroupPoint[g][1] = hi

            if self.PopSize - hi < int(self.PopSize / self.GroupPart):
                self.GroupPoint[g][1] = (self.PopSize - 1)

            g = g + 1
            lo = hi + 1
        self.GroupSize = g

    @staticmethod
    def _RepairMachineAssign(machine_assign, target):
        """Adjust the per-machine counts in place, one random step at a time, until they sum to ``target``."""
        while sum(machine_assign) != target:
            machine_idx = random.randint(0, len(machine_assign) - 1)
            # Empty machines are left alone, unless every machine is empty:
            # skipping in that case would spin forever, because no entry could
            # ever be incremented.
            if machine_assign[machine_idx] == 0 and any(machine_assign):
                continue
            if sum(machine_assign) > target:
                machine_assign[machine_idx] -= 1
            else:
                machine_assign[machine_idx] += 1

    def CrossoverOperation(self, gene1, gene2):
        """Recombine two individuals into two new ones.

        The permutation parts are combined with ``partially_mapped_crossover``;
        one randomly chosen block size is exchanged between the two count
        parts, which are then repaired so they again sum to the permutation
        length.

        Returns:
            ``(sub1, sub2)``, each a permutation followed by ``MachineNum`` counts.
        """
        len_ = len(gene1)
        sub1, sub2 = partially_mapped_crossover(gene1[0: len_ - self.MachineNum], gene2[0: len_ - self.MachineNum])
        pos1, pos2 = random.randint(0, self.MachineNum - 1), random.randint(0, self.MachineNum - 1)
        machine_assign1, machine_assign2 = gene1[len_ - self.MachineNum:], gene2[len_ - self.MachineNum:]

        machine_assign1[pos1], machine_assign2[pos2] = machine_assign2[pos2], machine_assign1[pos1]

        self._RepairMachineAssign(machine_assign1, len_ - self.MachineNum)
        self._RepairMachineAssign(machine_assign2, len_ - self.MachineNum)

        sub1.extend(machine_assign1)
        sub2.extend(machine_assign2)
        return sub1, sub2

    def CalIndividualVal(self, gene):
        """Decode ``gene`` into an assignment matrix and return its objective.

        The objective is the largest estimator prediction over all machines.
        """
        ComponentNum = len(self.ComponentData)
        assignment_result = [[0 for _ in range(ComponentNum)] for _ in range(self.MachineNum)]
        idx, machine_index = 0, 0
        # the trailing MachineNum entries are block sizes over the permutation
        for num in range(self.Dim - self.MachineNum, self.Dim):
            for _ in range(gene[num]):
                assignment_result[machine_index][self.CpIndex[gene[idx]]] += self.CpPoints[gene[idx]]
                idx += 1
            machine_index += 1

        val = 0
        cp_items = converter(self.PcbData, self.ComponentData, assignment_result)
        for machine_index in range(self.MachineNum):
            cp_points, cp_nozzle, board_width, board_height = cp_items[machine_index]
            val = max(val, self.Estimator.predict(cp_points, cp_nozzle, board_width, board_height))
        return val
def spider_monkey_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Run ``SpiderMonkeyOpt`` and decode its global leader into an assignment.

    Uses a population of 20 individuals and 50 iterations.

    Returns:
        ``(smo.GlobalMin, assignment_result)`` where
        ``assignment_result[machine][component]`` is the number of placements.
    """
    population_size, iteration_number = 20, 50
    smo = SpiderMonkeyOpt(population_size, pcb_data, component_data, machine_number, estimator)

    # initial leaders and grouping
    smo.GlobalLearning()
    smo.CreateGroup()
    smo.LocalLearning()

    # main loop
    with tqdm(total=iteration_number) as pbar:
        pbar.set_description('spider monkey algorithm process for PCB assembly line balance')
        for _ in range(iteration_number):
            for group in range(smo.GroupSize):
                smo.LocalLeaderPhase(group)

            smo.CalculateProbabilities()

            for group in range(smo.GroupSize):
                smo.GlobalLeaderPhase(group)

            # learning first, then the stagnation decisions, in this fixed order
            for step in (smo.GlobalLearning, smo.LocalLearning,
                         smo.LocalLeaderDecision, smo.GlobalLeaderDecision):
                step()

            pbar.update(1)

    # decode the leader: the trailing `machine_number` entries are block sizes
    # over the leading permutation of sub-unit ids
    leader = smo.GlobalLeaderPosition
    first_count = smo.Dim - machine_number
    assignment_result = [[0] * len(component_data) for _ in range(machine_number)]
    cursor = 0
    for machine_index in range(machine_number):
        block_size = leader[first_count + machine_index]
        for offset in range(block_size):
            unit = leader[cursor + offset]
            assignment_result[machine_index][smo.CpIndex[unit]] += smo.CpPoints[unit]
        cursor += block_size

    return smo.GlobalMin, assignment_result
@timer_wrapper
def line_optimizer_reconfiguration(component_data, pcb_data, machine_number):
    """Run every assignment heuristic and return the best assignment found.

    The heuristics are tried in the order random, spider monkey, local
    search, evolutionary; a later result replaces the incumbent only when
    its value is strictly smaller.

    Returns:
        A shallow copy of the best ``assignment[machine][component]`` matrix.

    Raises:
        Exception: if no heuristic produced a value.
    """
    # === assignment of heads to modules is omitted ===
    estimator = ReconfigEstimator()  # element from list [0, 1, 2, 5, 10] task_block ~= cycle

    # === assignment of components to heads
    # (announcement, solver) pairs; solvers with `None` report progress themselves.
    # Brute force is deliberately absent: it proved useless, since it only ran
    # in reasonable time for the smaller test instances.
    strategies = (
        ('random component allocation algorithm process for PCB assembly line balance',
         random_component_assignment),
        (None, spider_monkey_component_assignment),
        ('local search component allocation algorithm process for PCB assembly line balance',
         local_search_component_assignment),
        (None, evolutionary_component_assignment),
    )

    optimal_assignment, optimal_val = [], None
    for announcement, solver in strategies:
        if announcement is not None:
            print(announcement)
        val, assignment = solver(pcb_data, component_data, machine_number, estimator)

        if optimal_val is None or val < optimal_val:
            optimal_val, optimal_assignment = val, assignment.copy()

    if optimal_val is None:
        raise Exception('no feasible solution! ')

    return optimal_assignment