import argparse
import os
import pickle
import random
import warnings
from collections import defaultdict

import numpy as np
import pandas as pd
import torch.nn
from sklearn.linear_model import LinearRegression
from tqdm import tqdm

from base_optimizer.optimizer_interface import *
from generator import *
from estimator import *

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


class Heuristic:
    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        return -1  # placeholder, overridden by the concrete heuristics below


class LeastPoints(Heuristic):
    """Assign to the machine that currently carries the fewest placement points."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_points = []
        for machine_idx in range(len(cp_assign)):
            if len(cp_assign[machine_idx]) == 0:
                return machine_idx
            machine_points.append(sum(cp_points[cp_idx] for cp_idx in cp_assign[machine_idx]))
        return np.argmin(machine_points)


class LeastNzTypes(Heuristic):
    """Assign to the machine that currently uses the fewest distinct nozzle types."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_nozzle = []
        for machine_idx in range(len(cp_assign)):
            if len(cp_assign[machine_idx]) == 0:
                return machine_idx
            machine_nozzle.append([cp_nozzle[cp_idx] for cp_idx in cp_assign[machine_idx]])
        return np.argmin([len(set(nozzle)) for nozzle in machine_nozzle])


class LeastCpTypes(Heuristic):
    """Assign to the machine that currently holds the fewest component types."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        return np.argmin([len(cp) for cp in cp_assign])


class LeastCpNzRatio(Heuristic):
    """Assign to the machine with the smallest ratio of component types to distinct nozzle types."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_nz_type, machine_cp_type = [], []
        for machine_idx in range(len(cp_assign)):
            if len(cp_assign[machine_idx]) == 0:
                return machine_idx
            # count distinct nozzle types; a plain list would always equal the component count
            machine_nz_type.append(set(cp_nozzle[cp_idx] for cp_idx in cp_assign[machine_idx]))
            machine_cp_type.append(cp_assign[machine_idx])
        return np.argmin([len(machine_cp_type[machine_idx]) / (len(machine_nz_type[machine_idx]) + 1e-5)
                          for machine_idx in range(len(cp_assign))])


def nozzle_assignment(cp_points, cp_nozzle, cp_assign):
    """Greedily distribute the available heads over the nozzle types of one machine,
    proportionally to the placement points each nozzle type has to serve."""
    nozzle_heads, nozzle_points = defaultdict(int), defaultdict(int)

    for cp_idx in cp_assign:
        nozzle_points[cp_nozzle[cp_idx]] += cp_points[cp_idx]
        nozzle_heads[cp_nozzle[cp_idx]] = 1

    while sum(nozzle_heads.values()) != max_head_index:
        max_cycle_nozzle = None
        for nozzle, head_num in nozzle_heads.items():
            if max_cycle_nozzle is None or nozzle_points[nozzle] / head_num > \
                    nozzle_points[max_cycle_nozzle] / nozzle_heads[max_cycle_nozzle]:
                max_cycle_nozzle = nozzle

        assert max_cycle_nozzle is not None
        nozzle_heads[max_cycle_nozzle] += 1

    return nozzle_heads, nozzle_points


class LeastCycle(Heuristic):
    """Assign to the machine with the smallest estimated number of pick-and-place cycles."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_cycle = []
        for machine_idx, assign_component in enumerate(cp_assign):
            if len(assign_component) == 0:
                return machine_idx
            nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component)
            machine_cycle.append(max(nozzle_points[nozzle] / head for nozzle, head in nozzle_heads.items()))
        return np.argmin(machine_cycle)


class LeastNzChange(Heuristic):
    """Assign to the machine whose per-head workload is most balanced, implying the fewest nozzle changes."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_nozzle_change = []
        for machine_idx, assign_component in enumerate(cp_assign):
            if len(assign_component) == 0:
                return machine_idx
            heads_points = []
            nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component)
            for nozzle, head in nozzle_heads.items():
                for _ in range(head):
                    heads_points.append(nozzle_points[nozzle] / nozzle_heads[nozzle])
            machine_nozzle_change.append(np.std(heads_points))
        return np.argmin(machine_nozzle_change)
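
# A small worked illustration (hypothetical data, not part of the original project):
# with cp_points = {0: 100, 1: 60}, cp_nozzle = {0: 'CN040', 1: 'CN065'} and the
# partial assignment cp_assign = [[0], [1]], LeastPoints.apply(cp_points, cp_nozzle,
# cp_assign) returns 1, since machine 1 carries fewer placement points (60 < 100).
# A machine with an empty assignment is always chosen first, before the criterion applies.
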
class LeastPickup(Heuristic):
    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign):
        machine_pick_up = []
        for machine_idx, assign_component in enumerate(cp_assign):
            if len(assign_component) == 0:
                return machine_idx
            nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assign_component)

            nozzle_level, nozzle_counter = defaultdict(int), defaultdict(int)
            level_points = defaultdict(int)

            for cp_idx in sorted(assign_component, key=lambda x: cp_points[x], reverse=True):
                nozzle, points = cp_nozzle[cp_idx], cp_points[cp_idx]
                if nozzle_counter[nozzle] and nozzle_counter[nozzle] % nozzle_heads[nozzle] == 0:
                    nozzle_level[nozzle] += 1

                level = nozzle_level[nozzle]
                level_points[level] = max(level_points[level], points)
                nozzle_counter[nozzle] += 1

            machine_pick_up.append(sum(level_points.values()))
        return np.argmin(machine_pick_up)


def generate_pattern(heuristic_map, cp_points):
    """
    Generates a random pattern.
    :return: The generated pattern string.
    """
    return "".join([random.choice(list(heuristic_map.keys())) for _ in range(random.randrange(1, len(cp_points)))])


def crossover(parent1, parent2):
    """
    Attempts to perform crossover between two chromosomes.
    :param parent1: The first parent.
    :param parent2: The second parent.
    :return: The two individuals after crossover has been performed.
    """
    point1, point2 = random.randrange(len(parent1)), random.randrange(len(parent2))
    substr1, substr2 = parent1[point1:], parent2[point2:]
    offspring1, offspring2 = "".join((parent1[:point1], substr2)), "".join((parent2[:point2], substr1))
    return offspring1, offspring2


def mutation(heuristic_map, cp_points, individual):
    """
    Attempts to mutate the individual by replacing a random heuristic in the chromosome with a generated pattern.
    :param individual: The individual to mutate.
    :return: The mutated individual.
    """
    pattern = list(individual)
    mutation_point = random.randrange(len(pattern))
    pattern[mutation_point] = generate_pattern(heuristic_map, cp_points)
    return ''.join(pattern)


def population_initialization(population_size, heuristic_map, cp_points):
    return [generate_pattern(heuristic_map, cp_points) for _ in range(population_size)]


def convert_assignment_result(heuristic_map, cp_points, cp_nozzle, component_list, individual, machine_number):
    machine_cp_assign = [[] for _ in range(machine_number)]
    for idx, cp_idx in enumerate(component_list):
        h = individual[idx % len(individual)]
        machine_idx = heuristic_map[h].apply(cp_points, cp_nozzle, machine_cp_assign)
        machine_cp_assign[machine_idx].append(cp_idx)
    return machine_cp_assign


def cal_individual_val(heuristic_map, cp_points, cp_nozzle, board_width, board_height, component_list, individual,
                       machine_number, estimator):
    machine_cp_assign = convert_assignment_result(heuristic_map, cp_points, cp_nozzle, component_list, individual,
                                                  machine_number)
    objective_val = []
    for machine_idx in range(machine_number):
        machine_cp_points, machine_cp_nozzle = defaultdict(int), defaultdict(str)
        for cp_idx in machine_cp_assign[machine_idx]:
            machine_cp_points[cp_idx] = cp_points[cp_idx]
            machine_cp_nozzle[cp_idx] = cp_nozzle[cp_idx]
        objective_val.append(estimator.neural_network(machine_cp_points, machine_cp_nozzle, board_width, board_height))
        # objective_val.append(estimator.heuristic_genetic(machine_cp_points, machine_cp_nozzle))
    return objective_val
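
# Chromosome encoding sketch (hypothetical individual, keys per the heuristic_map
# defined below): the gene string 'pku' assigns the 1st shuffled component with
# LeastPoints, the 2nd with LeastCycle, the 3rd with LeastPickup, then wraps around
# for the 4th, because convert_assignment_result reads the gene at idx % len(individual).
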
def line_optimizer_hyperheuristic(component_data, pcb_data, machine_number):
    heuristic_map = {
        'p': LeastPoints,
        'n': LeastNzTypes,
        'c': LeastCpTypes,
        'r': LeastCpNzRatio,
        'k': LeastCycle,
        'g': LeastNzChange,
        'u': LeastPickup,
    }

    # genetic-based hyper-heuristic
    crossover_rate, mutation_rate = 0.8, 0.1
    population_size, n_generations = 20, 100
    n_iterations = 10

    estimator = Estimator()

    best_val, best_component_list = None, None
    best_individual = None

    # split each component type according to its feeder limit, so every division
    # can be assigned to a machine independently
    division_component_data = pd.DataFrame(columns=component_data.columns)
    for _, data in component_data.iterrows():
        feeder_limit = data['feeder-limit']
        data['feeder-limit'], data['points'] = 1, int(data['points'] / data['feeder-limit'])
        for _ in range(feeder_limit):
            division_component_data = pd.concat([division_component_data, pd.DataFrame(data).T])
    division_component_data = division_component_data.reset_index()

    component_list = [idx for idx, data in division_component_data.iterrows() if data['points'] > 0]
    cp_points, cp_nozzle = defaultdict(int), defaultdict(str)
    for idx, data in division_component_data.iterrows():
        cp_points[idx], cp_nozzle[idx] = data['points'], data['nz']

    board_width, board_height = pcb_data['x'].max() - pcb_data['x'].min(), pcb_data['y'].max() - pcb_data['y'].min()

    with tqdm(total=n_generations * n_iterations) as pbar:
        pbar.set_description('hyper-heuristic algorithm process for PCB assembly line balance')

        for _ in range(n_iterations):
            random.shuffle(component_list)
            new_population = []
            population = population_initialization(population_size, heuristic_map, cp_points)

            # calculate fitness value
            pop_val = []
            for individual in population:
                val = cal_individual_val(heuristic_map, cp_points, cp_nozzle, board_width, board_height,
                                         component_list, individual, machine_number, estimator)
                pop_val.append(max(val))

            for _ in range(n_generations):
                select_index = get_top_k_value(pop_val, population_size - len(new_population), reverse=False)
                population = [population[idx] for idx in select_index]
                pop_val = [pop_val[idx] for idx in select_index]

                population += new_population
                for individual in new_population:
                    val = cal_individual_val(heuristic_map, cp_points, cp_nozzle, board_width, board_height,
                                             component_list, individual, machine_number, estimator)
                    pop_val.append(max(val))

                # min-max convert
                max_val = max(pop_val)
                sel_pop_val = list(map(lambda v: max_val - v, pop_val))
                sum_pop_val = sum(sel_pop_val) + 1e-10
                sel_pop_val = [v / sum_pop_val + 1e-3 for v in sel_pop_val]

                # crossover and mutation
                new_population = []
                for pop in range(population_size):
                    if pop % 2 == 0 and np.random.random() < crossover_rate:
                        index1 = roulette_wheel_selection(sel_pop_val)
                        while True:
                            index2 = roulette_wheel_selection(sel_pop_val)
                            if index1 != index2:
                                break

                        offspring1, offspring2 = crossover(population[index1], population[index2])

                        if np.random.random() < mutation_rate:
                            offspring1 = mutation(heuristic_map, cp_points, offspring1)

                        if np.random.random() < mutation_rate:
                            offspring2 = mutation(heuristic_map, cp_points, offspring2)

                        new_population.append(offspring1)
                        new_population.append(offspring2)

                pbar.update(1)

            val = cal_individual_val(heuristic_map, cp_points, cp_nozzle, board_width, board_height, component_list,
                                     population[0], machine_number, estimator)
            val = max(val)

            if best_val is None or val < best_val:
                best_val = val
                best_individual = population[0]
                best_component_list = component_list.copy()

    machine_cp_points = convert_assignment_result(heuristic_map, cp_points, cp_nozzle, best_component_list,
                                                  best_individual, machine_number)
    val = cal_individual_val(heuristic_map, cp_points, cp_nozzle, board_width, board_height, best_component_list,
                             best_individual, machine_number, estimator)
    print(val)

    assignment_result = [[0 for _ in range(len(component_data))] for _ in range(machine_number)]
    for machine_idx in range(machine_number):
        for cp_idx in machine_cp_points[machine_idx]:
            idx = division_component_data.iloc[cp_idx]['index']
            assignment_result[machine_idx][idx] += cp_points[cp_idx]
    print(assignment_result)

    return assignment_result
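
# Minimal usage sketch (data frames are assumed, with the column names read by the
# function above): component_data needs 'points', 'feeder-limit' and 'nz' columns,
# pcb_data needs 'x'/'y' placement coordinates, e.g.
#   assignment = line_optimizer_hyperheuristic(component_data, pcb_data, machine_number=3)
# which returns, per machine, the number of placement points assigned to each component type.
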
if __name__ == '__main__':
    warnings.simplefilter(action='ignore', category=FutureWarning)

    parser = argparse.ArgumentParser(description='network training implementation')
    # caveat: argparse's type=bool treats any non-empty string as True, so '--train False' still yields True
    parser.add_argument('--train', default=True, type=bool, help='determine whether training the network')
    parser.add_argument('--save', default=True, type=bool,
                        help='determine whether saving the parameters of network, linear regression model, etc.')
    parser.add_argument('--overwrite', default=False, type=bool,
                        help='determine whether overwriting the training and testing data')
    parser.add_argument('--train_file', default='train_data.txt', type=str, help='training file path')
    parser.add_argument('--test_file', default='test_data.txt', type=str, help='testing file path')
    parser.add_argument('--num_epochs', default=8000, type=int, help='number of epochs for training process')
    parser.add_argument('--batch_size', default=10000, type=int, help='size of training batch')
    parser.add_argument('--lr', default=1e-5, type=float, help='learning rate for the network')

    params = parser.parse_args()

    data_mgr = DataMgr()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if params.overwrite:
        file = {params.train_file: params.batch_size,
                params.test_file: params.batch_size // data_mgr.get_update_round() // 5}
        for file_name, file_batch_size in file.items():
            with open('opt/' + file_name, 'a') as f:
                for _ in range(int(file_batch_size)):
                    mode = file_name.split('.')[0].split('_')[0]
                    pcb_data, component_data = data_mgr.generator(mode)  # randomly generate PCB data
                    # data_mgr.remover()  # remove the last saved data
                    # data_mgr.saver('data/' + file_name, pcb_data)  # save new data

                    info = base_optimizer(1, pcb_data, component_data,
                                          feeder_data=pd.DataFrame(columns=['slot', 'part', 'arg']),
                                          method='feeder-scan', hinter=True)

                    data_mgr.recorder(f, info, pcb_data, component_data)

    net = Net(input_size=data_mgr.get_feature(), output_size=1).to(device)

    data = data_mgr.loader('opt/' + params.train_file)
    if params.train:
        x_fit, y_fit = np.array(data[2:]).T, np.array([data[1]]).T
        lr = LinearRegression()
        lr.fit(x_fit, y_fit)

        x_train = np.array(data[0][::data_mgr.get_update_round()])
        # y_train = lr.predict(x_fit[::data_mgr.get_update_round()])
        y_train = np.array(data[1][::data_mgr.get_update_round()])

        x_train = torch.from_numpy(x_train.reshape((-1, np.shape(x_train)[1]))).float().to(device)
        y_train = torch.from_numpy(y_train.reshape((-1, 1))).float().to(device)

        optimizer = torch.optim.Adam(net.parameters(), lr=params.lr)
        # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5000, gamma=0.1)
        loss_func = torch.nn.MSELoss()

        for epoch in range(params.num_epochs):
            pred = net(x_train)
            loss = loss_func(pred, y_train)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # scheduler.step()
            if epoch % 100 == 0:
                print('Epoch: ', epoch, ', Loss: ', loss.item())
                if loss.item() < 1e-4:
                    break

        net_predict = net(x_train).view(-1)
        pred_time, real_time = net_predict.cpu().detach().numpy(), y_train.view(-1).cpu().detach().numpy()

        pred_error = np.array([])
        for t1, t2 in np.nditer([pred_time, real_time]):
            pred_error = np.append(pred_error, abs(t1 - t2) / (t2 + 1e-10) * 100)

        print('--------------------------------------')
        print(f'average prediction error for train data : {np.average(pred_error): .2f}% ')
        print(f'maximum prediction error for train data : {np.max(pred_error): .2f}% ')

        mse = np.linalg.norm((net_predict - y_train.view(-1)).cpu().detach().numpy())
        print(f'mean square error for training data result : {mse: .2f} ')
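
        # The save branch below persists the trained artifacts so that a later run with
        # training disabled can reload them (see the load_state_dict call in the else branch).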
        if params.save:
            if not os.path.exists('model'):
                os.mkdir('model')
            torch.save(net.state_dict(), 'model/net_model.pth')
            with open('model/lr_model.pkl', 'wb') as f:
                pickle.dump(lr, f)
            torch.save(optimizer.state_dict(), 'model/optimizer_state.pth')
    else:
        net.load_state_dict(torch.load('model/net_model.pth'))
        # optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
        # optimizer.load_state_dict(torch.load('model/optimizer_state.pth'))

    data = data_mgr.loader('opt/' + params.test_file)
    x_test, y_test = np.array(data[0]), np.array(data[1])
    # x_test, y_test = np.array(data[0]), lr.predict(np.array(data[2:]).T)
    x_test = torch.from_numpy(x_test.reshape((-1, np.shape(x_test)[1]))).float().to(device)

    net.eval()
    with torch.no_grad():
        pred_time = net(x_test).view(-1).cpu().detach().numpy()
        x_test = x_test.cpu().detach().numpy()

        over_set = []
        pred_idx, pred_error = 0, np.array([])
        for t1, t2 in np.nditer([pred_time, y_test.reshape(-1)]):
            pred_error = np.append(pred_error, abs(t1 - t2) / (t2 + 1e-10) * 100)

            if pred_error[-1] > 5:
                over_set.append(pred_idx + 1)
                print(f'\033[0;31;31midx: {pred_idx + 1: d}, net: {t1: .3f}, real: {t2: .3f}, '
                      f'gap: {pred_error[-1]: .3f}\033[0m')
            # else:
            #     print(f'idx: {pred_idx + 1: d}, net: {t1: .3f}, real: {t2: .3f}, gap: {pred_error[-1]: .3f}')

            pred_idx += 1

        print('over:', over_set)
        print('size:', len(over_set))

        print('--------------------------------------')
        print(f'average prediction error for test data : {np.average(pred_error): .3f}% ')
        print(f'maximum prediction error for test data : {np.max(pred_error): .3f}% ')

        mse = np.linalg.norm(pred_time - y_test.reshape(-1))
        print(f'mean square error for test data result : {mse: .2f} ')
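
# Note: the 'mean square error' lines above are computed with np.linalg.norm, i.e. the
# Euclidean norm of the residual vector rather than an averaged squared error.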