from base_optimizer.optimizer_common import *
from estimator import *


def _evaluate_assignment(pcb_data, component_data, assignment, machine_number, estimator):
    """Return the largest per-machine estimate produced for ``assignment``.

    ``converter`` is called once; its result is indexed by machine and each entry is unpacked into
    ``(cp_points, cp_nozzle, board_width, board_height)``, which is passed to ``estimator.predict``.
    The maximum of those predictions (never less than 0) is returned.
    """
    cp_items = converter(pcb_data, component_data, assignment)
    worst_val = 0
    for machine_index in range(machine_number):
        cp_points, cp_nozzle, board_width, board_height = cp_items[machine_index]
        worst_val = max(worst_val, estimator.predict(cp_points, cp_nozzle, board_width, board_height))
    return worst_val


def random_component_assignment(pcb_data, component_data, machine_number, estimator=None):
    """Randomly distribute the placement points of every component type over the machines.

    :param pcb_data: forwarded unchanged to ``converter``.
    :param component_data: table iterated with ``iterrows``; the columns ``points``, ``nz`` and ``fdn`` are read.
        NOTE(review): the row labels are used as list positions, so a 0..n-1 index is assumed - confirm.
    :param machine_number: number of machines (rows of the returned matrix).
    :param estimator: object exposing ``predict``; required although the signature defaults to ``None``.
    :return: ``(objective_value, assignment_result)`` where ``assignment_result[machine][component]`` is the
        number of points of that component given to that machine.
    :raises ValueError: if ``estimator`` is ``None``.
    """
    if estimator is None:
        # fail before doing any work instead of crashing on ``None.predict`` at the very end
        raise ValueError('an estimator is required to evaluate the random assignment')

    # == the set of feasible component type for each nozzle type
    nozzle_part_list = defaultdict(list)
    component_points = []
    for idx, data in component_data.iterrows():
        component_points.append(data.points)
        nozzle_part_list[data.nz].append(idx)

    component_number = len(component_data)
    assignment_result = [[0] * component_number for _ in range(machine_number)]

    # === ensure every nozzle type is used: one point of one random part per nozzle type ===
    selected_part = set()
    for part_list in nozzle_part_list.values():
        part = random.sample(part_list, 1)[0]
        machine_index = random.randint(0, machine_number - 1)

        assignment_result[machine_index][part] += 1
        component_points[part] -= 1
        selected_part.add(part)

    # === assign one placement of every part which has not been selected yet ===
    for part in range(component_number):
        if part in selected_part:
            continue

        assignment_result[random.randint(0, machine_number - 1)][part] += 1
        component_points[part] -= 1

    machine_assign = list(range(machine_number))
    random.shuffle(machine_assign)

    # === hand out the remaining points in random chunks ===
    finished_assign_counter = component_points.count(0)
    while finished_assign_counter < component_number:
        for machine_index in machine_assign:
            part = random.randint(0, component_number - 1)

            # machines that would hold this part if ``machine_index`` received some of it
            feeder_counter = sum(
                1 for idx in range(machine_number) if assignment_result[idx][part] > 0 or idx == machine_index)

            # feeder limit restriction
            if component_points[part] == 0 or feeder_counter > component_data.iloc[part].fdn:
                continue

            points = random.randint(1, component_points[part])
            assignment_result[machine_index][part] += points
            component_points[part] -= points
            if component_points[part] == 0:
                finished_assign_counter += 1

    assert sum(component_points) == 0

    objective_value = _evaluate_assignment(pcb_data, component_data, assignment_result, machine_number, estimator)
    return objective_value, assignment_result


def greedy_component_assignment(component_points, component_nozzle, component_feeders, task_block_weight):
    """Placeholder: the greedy assignment is not implemented and always returns ``None``."""
    pass  # not clear what the original text intended


def local_search_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Improve a random assignment by repeatedly moving points of one part between two machines.

    At most 5000 iterations are performed and the search stops after 50 consecutive iterations without
    improvement. Iterations that pick a machine with nothing assigned are skipped without being counted.

    :return: ``(optimal_val, optimal_assignment)`` of the best assignment found.
    """
    max_iterations, max_unsuccessful = 5000, 50

    component_number = len(component_data)
    optimal_val, optimal_assignment = random_component_assignment(pcb_data, component_data, machine_number,
                                                                  estimator)
    unsuccessful_iteration_counter = max_unsuccessful
    for _ in range(max_iterations):
        machine_idx = random.randint(0, machine_number - 1)
        if sum(optimal_assignment[machine_idx]) == 0:
            continue

        part_set = [part for part in range(component_number) if optimal_assignment[machine_idx][part] != 0]
        part_idx = random.sample(part_set, 1)[0]
        r = random.randint(1, optimal_assignment[machine_idx][part_idx])

        # look for a target machine; the feeder check is conservative because the source machine is
        # counted even when all of its points are moved away
        swap_machine_idx, swap_available = None, False
        for _ in range(2 * machine_number + 1):
            swap_machine_idx = random.randint(0, machine_number - 1)
            feeder_available = sum(
                1 for machine in range(machine_number)
                if optimal_assignment[machine][part_idx] or machine == swap_machine_idx)

            if feeder_available <= component_data.iloc[part_idx].fdn and swap_machine_idx != machine_idx:
                swap_available = True
                break

        if swap_available:
            assignment = copy.deepcopy(optimal_assignment)
            assignment[machine_idx][part_idx] -= r
            assignment[swap_machine_idx][part_idx] += r
            val = _evaluate_assignment(pcb_data, component_data, assignment, machine_number, estimator)
            improved = val < optimal_val
        else:
            # nothing was moved, so re-evaluating the identical assignment cannot improve the result
            improved = False

        if improved:
            optimal_assignment, optimal_val = assignment, val
            unsuccessful_iteration_counter = max_unsuccessful
        else:
            unsuccessful_iteration_counter -= 1
            if unsuccessful_iteration_counter <= 0:
                break

    return optimal_val, optimal_assignment


def _rebalance_component(offspring, part_index, required_points, machine_number):
    """Adjust column ``part_index`` of ``offspring`` in place so that it sums to ``required_points``."""
    assigned = [offspring[machine_index][part_index] for machine_index in range(machine_number)]
    total_assigned = sum(assigned)
    additional_points = total_assigned - required_points

    if additional_points > 0:
        # too many placements: remove them proportionally to the current shares. Dividing by the
        # assigned total keeps every removal below the machine's own count and their sum within the surplus.
        for machine_index in range(machine_number):
            points = additional_points * assigned[machine_index] // total_assigned
            offspring[machine_index][part_index] -= points
        additional_points = sum(offspring[mt][part_index] for mt in range(machine_number)) - required_points

        # remove what the rounding left over, one point per machine that still holds the part
        for machine_index in range(machine_number):
            if additional_points == 0:
                break
            if offspring[machine_index][part_index] == 0:
                continue
            offspring[machine_index][part_index] -= 1
            additional_points -= 1
    elif additional_points < 0:
        # too few placements: raise the nonzero entries equally
        machine_set = [machine_index for machine_index in range(machine_number) if assigned[machine_index] != 0]
        if not machine_set:
            # no machine holds the part after the row exchange; give the whole demand to one machine
            machine_set = [random.randint(0, machine_number - 1)]

        points, remainder = divmod(-additional_points, len(machine_set))
        for rank, machine_index in enumerate(machine_set):
            offspring[machine_index][part_index] += points + (1 if rank < remainder else 0)


def reconfig_crossover_operation(component_data, parent1, parent2, machine_number):
    """Uniform crossover over machine rows followed by a repair of the per-component totals.

    A random 0/1 mask is drawn per machine; rows whose mask bit is 0 are exchanged between the two
    offspring. If the mask is all zeros or all ones, plain copies of the parents are returned. Afterwards
    every component column of both offspring is rebalanced so that it sums to ``component_data``'s
    ``points`` value again. Feeder limits (``fdn``) are not checked here.

    :return: ``(offspring1, offspring2)``; the parents are not modified.
    """
    offspring1, offspring2 = copy.deepcopy(parent1), copy.deepcopy(parent2)
    component_number = len(component_data)

    # === crossover ===
    mask_bit = [random.randint(0, 1) for _ in range(machine_number)]
    if sum(mask_bit) == 0 or sum(mask_bit) == machine_number:
        return offspring1, offspring2

    for machine_index in range(machine_number):
        if mask_bit[machine_index]:
            continue  # the offspring already hold copies of their own parent's row
        offspring1[machine_index] = copy.deepcopy(parent2[machine_index])
        offspring2[machine_index] = copy.deepcopy(parent1[machine_index])

    # === balancing ===
    for part_index in range(component_number):
        required_points = int(component_data.iloc[part_index]['points'])
        for offspring in (offspring1, offspring2):
            _rebalance_component(offspring, part_index, required_points, machine_number)

    return offspring1, offspring2


def reconfig_mutation_operation(component_data, parent, machine_number):
    """Move a random number of points of one component from one machine to another.

    :return: the mutated copy, an unchanged copy when there is nothing to move (fewer than two machines
        or an empty source machine), or ``parent`` itself when the move would exceed the component's
        feeder limit ``fdn``.
    """
    offspring = copy.deepcopy(parent)
    if machine_number < 2:
        return offspring  # two distinct machines are needed for a move

    swap_direction = random.randint(0, 1)
    if swap_direction:
        swap_machine1, swap_machine2 = random.sample(list(range(machine_number)), 2)
    else:
        swap_machine2, swap_machine1 = random.sample(list(range(machine_number)), 2)

    component_list = [component_index for component_index, points in enumerate(offspring[swap_machine1]) if points]
    if not component_list:
        return offspring

    swap_component_index = random.sample(component_list, 1)[0]
    swap_points = random.randint(1, offspring[swap_machine1][swap_component_index])

    offspring[swap_machine1][swap_component_index] -= swap_points
    offspring[swap_machine2][swap_component_index] += swap_points

    # number of machines holding the component after the move
    feeder_counter = sum(
        1 for machine_index in range(machine_number) if offspring[machine_index][swap_component_index])
    if feeder_counter > component_data.iloc[swap_component_index].fdn:
        return parent

    return offspring


def evolutionary_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Genetic search over assignments using the reconfiguration crossover and mutation operators.

    Settings: population size 10, 100 generations, crossover probability 0.8, mutation probability 0.1.
    Offspring created in the last generation are not evaluated (they would only be merged in a
    following generation).

    :return: ``(best_value, best_assignment)`` of the final population.
    """
    population_size = 10
    generation_number = 100
    mutation_rate, crossover_rate = 0.1, 0.8

    # the random constructor already returns the evaluation, so it is not recomputed
    population, pop_val = [], []
    for _ in range(population_size):
        val, individual = random_component_assignment(pcb_data, component_data, machine_number, estimator)
        population.append(individual)
        pop_val.append(val)

    with tqdm(total=generation_number) as pbar:
        pbar.set_description('evolutionary algorithm process for PCB assembly line balance')

        new_population = []
        for _ in range(generation_number):
            # merge and evaluate the offspring of the previous generation
            population += new_population
            for individual in new_population:
                pop_val.append(
                    _evaluate_assignment(pcb_data, component_data, individual, machine_number, estimator))

            select_index = get_top_k_value(pop_val, population_size, reverse=False)
            population = [population[idx] for idx in select_index]
            pop_val = [pop_val[idx] for idx in select_index]

            # min-max convert: smaller objective -> larger selection weight
            max_val = max(pop_val)
            pop_val_sel = [max_val - v for v in pop_val]
            sum_pop_val = sum(pop_val_sel) + 1e-10
            pop_val_sel = [v / sum_pop_val + 1e-3 for v in pop_val_sel]

            # crossover and mutation: one attempt for every even population slot
            new_population = []
            for _ in range(0, population_size, 2):
                if np.random.random() >= crossover_rate:
                    continue

                index1 = roulette_wheel_selection(pop_val_sel)
                while True:
                    index2 = roulette_wheel_selection(pop_val_sel)
                    if index1 != index2:
                        break

                offspring1, offspring2 = reconfig_crossover_operation(component_data, population[index1],
                                                                      population[index2], machine_number)

                if np.random.random() < mutation_rate:
                    offspring1 = reconfig_mutation_operation(component_data, offspring1, machine_number)

                if np.random.random() < mutation_rate:
                    offspring2 = reconfig_mutation_operation(component_data, offspring2, machine_number)

                new_population.append(offspring1)
                new_population.append(offspring2)

            pbar.update(1)

    return min(pop_val), population[np.argmin(pop_val)]


class SpiderMonkeyOpt:
    """Spider-monkey style population search over component-to-machine assignments.

    Every component type is split into ``fdn`` sub-components (one per allowed feeder). A position is a
    list of length ``Dim``: a permutation of the sub-component indices followed by ``MachineNum`` counts
    telling how many consecutive sub-components of the permutation each machine receives.
    """

    def __init__(self, pop_size, pcb_data, component_data, machine_number, estimator):
        self.PcbData = pcb_data
        self.ComponentData = component_data
        self.Estimator = estimator

        self.PopSize = pop_size
        self.LocalLimit = pop_size
        self.GlobalLimit = pop_size
        self.CrossoverRatio = 0.1

        self.MachineNum = machine_number
        self.GroupSize = 0  # set by CreateGroup()

        # per sub-component: number of points, original component index, nozzle
        self.CpPoints = defaultdict(int)
        self.CpIndex = defaultdict(int)
        self.CpNozzle = defaultdict(str)

        self.Dim = 0
        for cp_idx, data in component_data.iterrows():
            feeder_limit, total_points = int(data.fdn), int(data.points)
            # split the points as evenly as possible; the first ``surplus_points`` parts get one more
            base_points, surplus_points = divmod(total_points, feeder_limit)
            for rank in range(feeder_limit):
                self.CpPoints[self.Dim] = base_points + (1 if rank < surplus_points else 0)
                self.CpNozzle[self.Dim] = data.nz
                self.CpIndex[self.Dim] = cp_idx
                self.Dim += 1

        self.Dim += machine_number

        self.GenPosition = []
        self.GenPopVal = []
        for _ in range(self.PopSize):
            self.GenPosition.append(self._RandomPosition())
            self.GenPopVal.append(self.CalIndividualVal(self.GenPosition[-1]))

        # inclusive [lo, hi] population index range of every group
        self.GroupPoint = [[0 for _ in range(2)] for _ in range(self.PopSize)]
        self.GroupPart = 1

        self.GenProb = None
        self.GlobalMin = self.GenPopVal[0]
        self.GlobalLeaderPosition = self.GenPosition[0].copy()
        self.GlobalLimitCount = 0

        # local leaders start from a sentinel value and are filled by LocalLearning() once
        # CreateGroup() has defined the groups
        self.LocalMin = np.ones(self.PopSize) * 1e10
        self.LocalLeaderPosition = [[0 for _ in range(self.Dim)] for _ in range(self.PopSize)]
        self.LocalLimitCount = [0 for _ in range(self.PopSize)]

    def _RandomPosition(self):
        """Build a random position: shuffled sub-components plus a random split into machine counts."""
        gene_number = self.Dim - self.MachineNum
        position = list(range(gene_number))
        random.shuffle(position)

        # MachineNum - 1 distinct cut points (plus the end) define the per-machine counts
        div = random.sample(range(gene_number), self.MachineNum - 1)
        div.append(gene_number)
        div.sort()

        prev = 0
        for bound in div:
            position.append(bound - prev)
            prev = bound
        return position

    def _LearnFromLeader(self, i, leader):
        """Cross individual ``i`` with ``leader`` and keep any child that is strictly better."""
        for NewGene in self.CrossoverOperation(self.GenPosition[i], leader):
            NewGeneVal = self.CalIndividualVal(NewGene)
            if NewGeneVal < self.GenPopVal[i]:
                self.GenPosition[i] = NewGene
                self.GenPopVal[i] = NewGeneVal

    def GlobalLearning(self):
        """Update the global leader and count consecutive calls without improvement."""
        GlobalTrial = self.GlobalMin
        for i in range(self.PopSize):
            if self.GenPopVal[i] < self.GlobalMin:
                self.GlobalMin = self.GenPopVal[i]
                self.GlobalLeaderPosition = self.GenPosition[i].copy()

        if math.fabs(GlobalTrial - self.GlobalMin) < 1e-5:
            self.GlobalLimitCount += 1
        else:
            self.GlobalLimitCount = 0

    def LocalLearning(self):
        """Update every group's local leader and its counter of calls without improvement."""
        for k in range(self.GroupSize):
            OldMin = self.LocalMin[k]
            lo, hi = int(self.GroupPoint[k][0]), int(self.GroupPoint[k][1])
            for i in range(lo, hi + 1):
                if self.GenPopVal[i] < self.LocalMin[k]:
                    self.LocalMin[k] = self.GenPopVal[i]
                    self.LocalLeaderPosition[k] = self.GenPosition[i].copy()

            if math.fabs(OldMin - self.LocalMin[k]) < 1e-5:
                self.LocalLimitCount[k] += 1
            else:
                self.LocalLimitCount[k] = 0

    def CalculateProbabilities(self):
        """Set ``GenProb[i] = 0.9 * GenPopVal[i] / max(GenPopVal) + 0.1``.

        NOTE(review): larger (worse, when minimising) values receive the larger probability - confirm
        that this is intended.
        """
        MaxVal = max(self.GenPopVal)
        if MaxVal == 0:
            # all values equal the maximum; avoid dividing by zero and use the same result (1.0)
            self.GenProb = [1.0 for _ in range(self.PopSize)]
            return
        self.GenProb = [0.9 * (self.GenPopVal[i] / MaxVal) + 0.1 for i in range(self.PopSize)]

    def LocalLeaderPhase(self, k):
        """Let every member of group ``k`` learn from the group's local leader."""
        lo, hi = int(self.GroupPoint[k][0]), int(self.GroupPoint[k][1])
        for i in range(lo, hi + 1):
            self._LearnFromLeader(i, self.LocalLeaderPosition[k])

    def GlobalLeaderPhase(self, k):
        """Perform ``hi - lo`` probability-gated learning steps from the global leader in group ``k``.

        The member index wraps back to ``lo`` when it reaches ``hi``, so index ``hi`` itself is not visited.
        """
        lo, hi = int(self.GroupPoint[k][0]), int(self.GroupPoint[k][1])
        i, l = lo, lo
        while l < hi:
            if random.random() < self.GenProb[i]:
                l += 1
                self._LearnFromLeader(i, self.GlobalLeaderPosition)
            i += 1
            if i == hi:
                i = lo

    def LocalLeaderDecision(self):
        """Re-seed the groups whose local leader stagnated for more than ``LocalLimit`` calls.

        Each member is either replaced by a better random position (probability ``1 - CrossoverRatio``)
        or crossed with the global leader.
        """
        for k in range(self.GroupSize):
            if self.LocalLimitCount[k] <= self.LocalLimit:
                continue

            lo, hi = int(self.GroupPoint[k][0]), int(self.GroupPoint[k][1])
            for i in range(lo, hi + 1):
                if random.random() >= self.CrossoverRatio:
                    NewGenPosition = self._RandomPosition()
                    NewGenVal = self.CalIndividualVal(NewGenPosition)
                    if NewGenVal < self.GenPopVal[i]:
                        self.GenPosition[i] = NewGenPosition
                        self.GenPopVal[i] = NewGenVal
                else:
                    self._LearnFromLeader(i, self.GlobalLeaderPosition)
            self.LocalLimitCount[k] = 0

    def GlobalLeaderDecision(self):
        """Split the population into one more group when the global leader stagnated too long."""
        if self.GlobalLimitCount > self.GlobalLimit:
            self.GroupPart += 1
            self.GlobalLimitCount = 0
            self.CreateGroup()
            self.LocalLearning()

    def CreateGroup(self):
        """Fill ``GroupPoint`` with inclusive ``[lo, hi]`` ranges for ``GroupPart`` parts; set ``GroupSize``."""
        part_size = int(self.PopSize / self.GroupPart)
        g, lo = 0, 0
        while lo < self.PopSize:
            hi = lo + part_size
            self.GroupPoint[g][0] = lo
            self.GroupPoint[g][1] = hi
            if self.PopSize - hi < part_size:
                # the rest is too small for a group of its own: extend this group to the end
                self.GroupPoint[g][1] = self.PopSize - 1
            g += 1
            lo = hi + 1
        self.GroupSize = g

    def _RepairMachineCounts(self, machine_assign, target):
        """Randomly adjust ``machine_assign`` in place, one unit at a time, until it sums to ``target``."""
        total = sum(machine_assign)
        while total != target:
            machine_idx = random.randint(0, self.MachineNum - 1)
            if total > target:
                if machine_assign[machine_idx] == 0:
                    continue  # nothing to take away from this machine
                machine_assign[machine_idx] -= 1
                total -= 1
            else:
                # empty machines may grow too, otherwise an all-zero split could never be repaired
                machine_assign[machine_idx] += 1
                total += 1

    def CrossoverOperation(self, gene1, gene2):
        """Return two children: PMX on the permutation parts plus an exchange of one machine count.

        The count at a random position of ``gene1`` is exchanged with the count at a random position of
        ``gene2``; both count vectors are then repaired so that they sum to the permutation length.
        """
        len_ = len(gene1)
        gene_number = len_ - self.MachineNum
        sub1, sub2 = partially_mapped_crossover(gene1[0: gene_number], gene2[0: gene_number])

        pos1, pos2 = random.randint(0, self.MachineNum - 1), random.randint(0, self.MachineNum - 1)
        machine_assign1, machine_assign2 = gene1[gene_number:], gene2[gene_number:]
        machine_assign1[pos1], machine_assign2[pos2] = machine_assign2[pos2], machine_assign1[pos1]

        self._RepairMachineCounts(machine_assign1, gene_number)
        self._RepairMachineCounts(machine_assign2, gene_number)

        sub1.extend(machine_assign1)
        sub2.extend(machine_assign2)
        return sub1, sub2

    def DecodeAssignment(self, gene):
        """Translate a position into an ``assignment[machine][component]`` matrix of point counts."""
        ComponentNum = len(self.ComponentData)
        assignment_result = [[0 for _ in range(ComponentNum)] for _ in range(self.MachineNum)]

        idx = 0
        for machine_index, num in enumerate(range(self.Dim - self.MachineNum, self.Dim)):
            # machine ``machine_index`` takes the next ``gene[num]`` sub-components of the permutation
            for _ in range(gene[num]):
                assignment_result[machine_index][self.CpIndex[gene[idx]]] += self.CpPoints[gene[idx]]
                idx += 1
        return assignment_result

    def CalIndividualVal(self, gene):
        """Return the objective value (largest per-machine estimate) of the position ``gene``."""
        return _evaluate_assignment(self.PcbData, self.ComponentData, self.DecodeAssignment(gene),
                                    self.MachineNum, self.Estimator)


def spider_monkey_component_assignment(pcb_data, component_data, machine_number, estimator):
    """Run the spider monkey search (population 20, 50 iterations).

    :return: ``(smo.GlobalMin, assignment_result)`` where the assignment is decoded from the global leader.
    """
    population_size, iteration_number = 20, 50
    smo = SpiderMonkeyOpt(population_size, pcb_data, component_data, machine_number, estimator)

    # initialise the global leader, the groups and the local leaders
    smo.GlobalLearning()
    smo.CreateGroup()
    smo.LocalLearning()

    with tqdm(total=iteration_number) as pbar:
        pbar.set_description('spider monkey algorithm process for PCB assembly line balance')

        for _ in range(iteration_number):
            for k in range(smo.GroupSize):
                smo.LocalLeaderPhase(k)

            smo.CalculateProbabilities()

            for k in range(smo.GroupSize):
                smo.GlobalLeaderPhase(k)

            smo.GlobalLearning()
            smo.LocalLearning()
            smo.LocalLeaderDecision()
            smo.GlobalLeaderDecision()

            pbar.update(1)

    return smo.GlobalMin, smo.DecodeAssignment(smo.GlobalLeaderPosition)


@timer_wrapper
def line_optimizer_reconfiguration(component_data, pcb_data, machine_number):
    """Run every implemented assignment heuristic and return the assignment with the smallest value.

    :raises Exception: if no heuristic produced a result.
    """
    # === assignment of heads to modules is omitted ===
    optimal_assignment, optimal_val = [], None
    estimator = ReconfigEstimator()  # element from list [0, 1, 2, 5, 10] task_block ~= cycle

    # === assignment of components to heads ===
    # (message printed before the run, heuristic); the tqdm based heuristics announce themselves.
    # Brute force is left out: it only ran in reasonable time for the smaller test instances.
    strategies = (
        ('random component allocation algorithm process for PCB assembly line balance',
         random_component_assignment),
        (None, spider_monkey_component_assignment),
        ('local search component allocation algorithm process for PCB assembly line balance',
         local_search_component_assignment),
        (None, evolutionary_component_assignment),
    )
    for message, strategy in strategies:
        if message is not None:
            print(message)
        val, assignment = strategy(pcb_data, component_data, machine_number, estimator)

        if optimal_val is None or val < optimal_val:
            optimal_val, optimal_assignment = val, assignment.copy()

    if optimal_val is None:
        raise Exception('no feasible solution! ')

    return optimal_assignment