import os import pickle import random import numpy as np import pandas as pd import torch.nn from base_optimizer.optimizer_interface import * from generator import * from estimator import * os.environ['KMP_DUPLICATE_LIB_OK'] = 'True' class Heuristic: @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): return -1 class LeastPoints(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_points = [] for machine_idx in machine_assign: if len(cp_assign[machine_idx]) == 0: return machine_idx machine_points.append(sum([cp_points[cp_idx] for cp_idx in cp_assign[machine_idx]])) return machine_assign[np.argmin(machine_points)] class LeastNzTypes(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_nozzle = [] for machine_idx in machine_assign: if len(cp_assign[machine_idx]) == 0: return machine_idx machine_nozzle.append([cp_nozzle[cp_idx] for cp_idx in cp_assign[machine_idx]]) index = np.argmin( [len(set(nozzle)) + 1e-5 * sum(cp_points[c] for c in cp_assign[machine_idx]) for machine_idx, nozzle in enumerate(machine_nozzle)]) return machine_assign[index] class LeastCpTypes(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_types = [] if len(machine_assign) == 1: return machine_assign[0] for machine_idx in machine_assign: machine_types.append( len(cp_assign[machine_idx]) + 1e-5 * sum(cp_points[cp] for cp in cp_assign[machine_idx])) return machine_assign[np.argmin(machine_types)] class LeastCpNzRatio(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_nz_type, machine_cp_type = [], [] if len(machine_assign) == 1: return machine_assign[0] for machine_idx in machine_assign: if len(cp_assign[machine_idx]) == 0: return machine_idx machine_nz_type.append(set(cp_nozzle[cp_idx] for cp_idx in cp_assign[machine_idx])) machine_cp_type.append(len(cp_assign[machine_idx])) min_idx = np.argmin([(machine_cp_type[idx] + 1e-5 * sum(cp_points[c] for c in cp_assign[idx])) / ( len(machine_nz_type[idx]) + 1e-5) for idx in range(len(machine_assign))]) return machine_assign[min_idx] def nozzle_assignment(cp_points, cp_nozzle, cp_assign): nozzle_heads, nozzle_points = defaultdict(int), defaultdict(int) for cp_idx in cp_assign: nozzle_points[cp_nozzle[cp_idx]] += cp_points[cp_idx] nozzle_heads[cp_nozzle[cp_idx]] = 1 while sum(nozzle_heads.values()) != max_head_index: max_cycle_nozzle = None for nozzle, head_num in nozzle_heads.items(): if max_cycle_nozzle is None or nozzle_points[nozzle] / head_num > nozzle_points[max_cycle_nozzle] / \ nozzle_heads[max_cycle_nozzle]: max_cycle_nozzle = nozzle assert max_cycle_nozzle is not None nozzle_heads[max_cycle_nozzle] += 1 return nozzle_heads, nozzle_points class LeastCycle(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_cycle = [] for machine_idx in machine_assign: assign_component = cp_assign[machine_idx] if len(assign_component) == 0: return machine_idx nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component) machine_cycle.append( max(nozzle_points[nozzle] / head for nozzle, head in nozzle_heads.items()) + 1e-5 * sum( cp_points[c] for c in cp_assign[machine_idx])) return machine_assign[np.argmin(machine_cycle)] class LeastNzChange(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_nozzle_change = [] for machine_idx in machine_assign: assign_component = cp_assign[machine_idx] if len(assign_component) == 0: return machine_idx heads_points = [] nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component) for nozzle, head in nozzle_heads.items(): for _ in range(head): heads_points.append(nozzle_points[nozzle] / nozzle_heads[nozzle]) machine_nozzle_change.append(np.std(heads_points) + 1e-5 * sum(cp_points[c] for c in cp_assign[machine_idx])) return machine_assign[np.argmin(machine_nozzle_change)] class LeastPickup(Heuristic): @staticmethod def apply(cp_points, cp_nozzle, cp_assign, machine_assign): if len(machine_assign) == 1: return machine_assign[0] machine_pick_up = [] for machine_idx in machine_assign: assign_component = cp_assign[machine_idx] if len(assign_component) == 0: return machine_idx nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component) nozzle_level, nozzle_counter = defaultdict(int), defaultdict(int) level_points = defaultdict(int) for cp_idx in sorted(assign_component, key=lambda x: cp_points[x], reverse=True): nozzle, points = cp_nozzle[cp_idx], cp_points[cp_idx] if nozzle_counter[nozzle] and nozzle_counter[nozzle] % nozzle_heads[nozzle] == 0: nozzle_level[nozzle] += 1 level = nozzle_level[nozzle] level_points[level] = max(level_points[level], points) nozzle_counter[nozzle] += 1 machine_pick_up.append(sum(points for points in level_points.values()) + 1e-5 * sum( cp_points[idx] for idx in cp_assign[machine_idx])) return machine_assign[np.argmin(machine_pick_up)] def generate_pattern(heuristic_map, cp_points): """ Generates a random pattern. :return: The generated pattern string. """ return "".join([random.choice(list(heuristic_map.keys())) for _ in range(random.randrange(1, len(cp_points)))]) def crossover(cp_points, parent1, parent2): """ Attempt to perform crossover between two chromosomes. :param parent1: The first parent. :param parent2: The second parent. :return: The two individuals after crossover has been performed. """ point1, point2 = random.randrange(len(parent1)), random.randrange(len(parent2)) substr1, substr2 = parent1[point1:], parent2[point2:] offspring1, offspring2 = "".join((parent1[:point1], substr2)), "".join((parent2[:point2], substr1)) return offspring1[:len(cp_points)], offspring2[:len(cp_points)] def mutation(heuristic_map, cp_points, individual): """ Attempts to mutate the individual by replacing a random heuristic in the chromosome by a generated pattern. :param individual: The individual to mutate. :return: The mutated individual. """ pattern = list(individual) mutation_point = random.randrange(len(pattern)) pattern[mutation_point] = generate_pattern(heuristic_map, cp_points) return ''.join(pattern)[:len(cp_points)] def population_initialization(population_size, heuristic_map, cp_points): return [generate_pattern(heuristic_map, cp_points) for _ in range(population_size)] def convert_assignment_result(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, component_list, individual, machine_number): cp_assign = [[] for _ in range(machine_number)] machine_all, machine_assign = list(range(machine_number)), defaultdict(set) for idx, div_cp_idx in enumerate(component_list): h = individual[idx % len(individual)] cp_idx = cp_index[div_cp_idx] if len(machine_assign[cp_idx]) < cp_feeders[cp_idx]: machine_idx = heuristic_map[h].apply(cp_points, cp_nozzle, cp_assign, machine_all) else: machine_idx = heuristic_map[h].apply(cp_points, cp_nozzle, cp_assign, list(machine_assign[cp_idx])) cp_assign[machine_idx].append(div_cp_idx) machine_assign[cp_idx].add(machine_idx) return cp_assign def cal_individual_val(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, board_width, board_height, component_list, individual, machine_number, estimator): machine_cp_assign = convert_assignment_result(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, component_list, individual, machine_number) component_number = len(cp_feeders) machine_cp_points = [[0 for _ in range(component_number)] for _ in range(machine_number)] for machine_idx in range(machine_number): for idx in machine_cp_assign[machine_idx]: machine_cp_points[machine_idx][cp_index[idx]] += cp_points[idx] machine_cp_feeders = [[0 for _ in range(component_number)] for _ in range(machine_number)] for cp_idx in range(component_number): feeder_nums = cp_feeders[cp_idx] for machine_idx in range(machine_number): if machine_cp_points[machine_idx][cp_idx]: machine_cp_feeders[machine_idx][cp_idx] = 1 feeder_nums -= 1 while feeder_nums > 0: assign_machine = None for machine_idx in range(machine_number): if machine_cp_points[machine_idx][cp_idx] == 0: continue if assign_machine is None: assign_machine = machine_idx continue if machine_cp_points[assign_machine][cp_idx] / machine_cp_feeders[assign_machine][cp_idx] \ < machine_cp_points[machine_idx][cp_idx] / machine_cp_feeders[machine_idx][cp_idx]: assign_machine = machine_idx machine_cp_feeders[assign_machine][cp_idx] += 1 feeder_nums -= 1 nozzle_type = defaultdict(str) for idx, cp_idx in cp_index.items(): nozzle_type[cp_idx] = cp_nozzle[idx] objective_val = [] for machine_idx in range(machine_number): div_cp_points, div_cp_nozzle = defaultdict(int), defaultdict(str) idx = 0 for cp_idx in range(component_number): total_points = machine_cp_points[machine_idx][cp_idx] if total_points == 0: continue div_index = 0 div_points = [total_points // machine_cp_feeders[machine_idx][cp_idx] for _ in range(machine_cp_feeders[machine_idx][cp_idx])] while sum(div_points) < total_points: div_points[div_index] += 1 div_index += 1 for points in div_points: div_cp_points[idx] = points div_cp_nozzle[idx] = nozzle_type[cp_idx] idx += 1 objective_val.append(estimator.predict(div_cp_points, div_cp_nozzle, board_width, board_height)) return objective_val def line_optimizer_hyperheuristic(component_data, pcb_data, machine_number): heuristic_map = { 'p': LeastPoints, 'n': LeastNzChange, 'c': LeastCpTypes, 'r': LeastCpNzRatio, 'k': LeastCycle, 'g': LeastNzChange, 'u': LeastPickup, } # genetic-based hyper-heuristic crossover_rate, mutation_rate = 0.6, 0.1 population_size, n_generations = 20, 50 n_iterations = 10 estimator = NeuralEstimator() best_val = None best_heuristic_list = None best_component_list = None cp_feeders, cp_nozzle = defaultdict(int), defaultdict(str) cp_points, cp_index = defaultdict(int), defaultdict(int) division_component_data = pd.DataFrame(columns=component_data.columns) division_points = min(component_data['points']) idx = 0 for cp_idx, data in component_data.iterrows(): cp_feeders[cp_idx] = data['fdn'] division_data = copy.deepcopy(data) feeder_limit, total_points = division_data.fdn, division_data.points feeder_limit = max(total_points // division_points * 3, feeder_limit) surplus_points = total_points % feeder_limit for _ in range(feeder_limit): division_data.fdn, division_data.points = 1, math.floor(total_points / feeder_limit) if surplus_points: division_data.points += 1 surplus_points -= 1 cp_points[idx], cp_nozzle[idx] = division_data.points, division_data.nz cp_index[idx] = cp_idx idx += 1 division_component_data = pd.concat([division_component_data, pd.DataFrame(division_data).T]) division_component_data = division_component_data.reset_index() component_list = [idx for idx, data in division_component_data.iterrows() if data.points > 0] board_width, board_height = pcb_data['x'].max() - pcb_data['x'].min(), pcb_data['y'].max() - pcb_data['y'].min() with tqdm(total=n_generations * n_iterations) as pbar: pbar.set_description('hyper-heuristic algorithm process for PCB assembly line balance') for _ in range(n_iterations): random.shuffle(component_list) new_population = [] population = population_initialization(population_size, heuristic_map, cp_points) # calculate fitness value pop_val = [] for individual in population: val = cal_individual_val(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, board_width, board_height, component_list, individual, machine_number, estimator) pop_val.append(max(val)) for _ in range(n_generations): select_index = get_top_k_value(pop_val, population_size - len(new_population), reverse=False) population = [population[idx] for idx in select_index] pop_val = [pop_val[idx] for idx in select_index] population += new_population for individual in new_population: val = cal_individual_val(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, board_width, board_height, component_list, individual, machine_number, estimator) pop_val.append(max(val)) # min-max convert max_val = max(pop_val) sel_pop_val = list(map(lambda v: max_val - v, pop_val)) sum_pop_val = sum(sel_pop_val) + 1e-10 sel_pop_val = [v / sum_pop_val + 1e-3 for v in sel_pop_val] # crossover and mutation new_population = [] for pop in range(population_size): if pop % 2 == 0 and np.random.random() < crossover_rate: index1 = roulette_wheel_selection(sel_pop_val) while True: index2 = roulette_wheel_selection(sel_pop_val) if index1 != index2: break offspring1, offspring2 = crossover(cp_points, population[index1], population[index2]) if np.random.random() < mutation_rate: offspring1 = mutation(heuristic_map, cp_points, offspring1) if np.random.random() < mutation_rate: offspring2 = mutation(heuristic_map, cp_points, offspring2) new_population.append(offspring1) new_population.append(offspring2) if len(new_population) >= population_size * crossover_rate: break pbar.update(1) val = cal_individual_val(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, board_width, board_height, component_list, population[0], machine_number, estimator) machine_assign = convert_assignment_result(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, component_list, population[0], machine_number) assignment_result = [[0 for _ in range(len(component_data))] for _ in range(machine_number)] for machine_idx in range(machine_number): for idx in machine_assign[machine_idx]: assignment_result[machine_idx][cp_index[idx]] += cp_points[idx] partial_pcb_data, partial_component_data = convert_line_assigment(pcb_data, component_data, assignment_result) max_machine_idx = np.argmax(val) val = exact_assembly_time(partial_pcb_data[max_machine_idx], partial_component_data[max_machine_idx]) if best_val is None or val < best_val: for machine_idx in range(machine_number): if machine_idx == max_machine_idx: continue val = max(val, exact_assembly_time(partial_pcb_data[machine_idx], partial_component_data[machine_idx])) if best_val is None or val < best_val: best_val = val best_heuristic_list = population[0] best_component_list = component_list.copy() val = cal_individual_val(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, board_width, board_height, best_component_list, best_heuristic_list, machine_number, estimator) print(val) machine_cp_points = convert_assignment_result(heuristic_map, cp_index, cp_points, cp_nozzle, cp_feeders, best_component_list, best_heuristic_list, machine_number) assignment_result = [[0 for _ in range(len(component_data))] for _ in range(machine_number)] for machine_idx in range(machine_number): for idx in machine_cp_points[machine_idx]: assignment_result[machine_idx][cp_index[idx]] += cp_points[idx] return assignment_result