from functools import wraps
from collections import defaultdict
from tqdm import tqdm
from gurobipy import *
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR

import os
import time
import math
import random
import copy
import torch
import torch.nn
import argparse
import joblib
import pickle
import warnings
import heapq
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import traceback

matplotlib.use('TkAgg')

# Machine parameters
max_head_index, max_slot_index = 6, 120
interval_ratio = 2
slot_interval = 15
head_interval = slot_interval * interval_ratio
head_nozzle = ['' for _ in range(max_head_index)]  # nozzle currently assigned to each head

# Position information
# F1 (leftmost slot of the front base) and R1 (rightmost slot of the rear base)
slotf1_pos, slotr1_pos = [-31.267, 44.], [807., 810.545]
fix_camera_pos = [269.531, 694.823]  # fixed camera position
anc_marker_pos = [336.457, 626.230]  # ANC marker position
stopper_pos = [665.150, 124.738]  # stopper position

# Algorithm weight parameters
e_nz_change, e_gang_pick = 4, 0.6

# Motor parameters
head_rotary_velocity = 8e-5  # rotation time of the head R axis (multiplied by an angle in head_rotary_time)
x_max_velocity, y_max_velocity = 1.4, 1.2
x_max_acceleration, y_max_acceleration = x_max_velocity / 0.079, y_max_velocity / 0.079

# TODO: widths of the different feeder types
feeder_width = {'SM8': (7.25, 7.25), 'SM12': (7.25, 7.25), 'SM16': (7.25, 7.25),
                'SM24': (7.25, 7.25), 'SM32': (7.25, 7.25)}

# feeder_width = {'SM8': (7.25, 7.25), 'SM12': (7.00, 20.00), 'SM16': (7.00, 22.00),
#                 'SM24': (7.00, 29.00), 'SM32': (7.00, 44.00)}

# Limit on the number of available nozzles per nozzle type
nozzle_limit = {'CN065': 6, 'CN040': 6, 'CN020': 6, 'CN400': 6, 'CN140': 6}

# Time parameters
t_cycle = 0.3
t_anc = 0.6
t_pick, t_place = .078, .051  # pick / place time
t_nozzle_put, t_nozzle_pick = 0.9, 0.75  # time to put down / pick up a nozzle
t_nozzle_change = t_nozzle_put + t_nozzle_pick
t_fix_camera_check = 0.12  # fixed camera inspection time

# Time parameters (whole-line related)
T_pp, T_tr, T_nc, T_pl = 2, 5, 25, 0

# Time parameters (obtained by data fitting)
Fit_cy, Fit_nz, Fit_pu, Fit_pl, Fit_mv = 0.326, 0.8694, 0.159, 0.041, 0.001


class OptResult:
    """Container for the five result lists of one optimisation run.

    Every constructor argument defaults to ``None`` and is replaced by a fresh
    empty list, so instances never share a mutable default.
    """

    def __init__(self, cp_assign=None, cycle_assign=None, slot_assign=None, place_assign=None, sequence_assign=None):
        self.component_assign = [] if cp_assign is None else cp_assign
        self.cycle_assign = [] if cycle_assign is None else cycle_assign
        self.feeder_slot_assign = [] if slot_assign is None else slot_assign
        self.placement_assign = [] if place_assign is None else place_assign
        self.head_sequence = [] if sequence_assign is None else sequence_assign


class OptInfo:
    """Accumulated time, distance and counter statistics of an optimisation result."""

    def __init__(self):
        self.total_time = .0  # total assembly time
        self.total_points = 0  # total number of placement points
        self.total_components = 0  # total number of components

        self.pickup_time = .0  # motion time of the pick-up phase
        self.round_time = .0  # motion time of the round trip between base and board
        self.place_time = .0  # motion time of the placement phase
        self.operation_time = .0  # time of mechanical actions (pick / place / nozzle change ...)

        self.cycle_counter = 0  # number of cycles
        self.nozzle_change_counter = 0  # number of nozzle changes
        self.anc_round_counter = 0  # number of trips to the ANC
        self.pickup_counter = 0  # number of pick-ups

        self.total_distance = .0  # total travel distance
        self.place_distance = .0  # travel distance while placing
        self.pickup_distance = .0  # travel distance while picking

    def print(self):
        """Print a human-readable summary of the statistics to stdout."""
        print('-Cycle counter: {}'.format(self.cycle_counter))
        print(f'-Nozzle change counter: {self.nozzle_change_counter: d}')
        print(f'-ANC round: {self.anc_round_counter: d}')

        print(f'-Pick operation counter: {self.pickup_counter: d}')
        print(f'-Pick time: {self.pickup_time: .3f}, Pick distance: {self.pickup_distance: .3f}')
        print(f'-Place time: {self.place_time: .3f}, Place distance: {self.place_distance: .3f}')

        # The round figures are "everything that is neither pick nor place".
        # The time used to subtract place_time twice and the distance was
        # labelled "Place distance"; both now mirror each other.
        print(
            f'-Round time: {self.total_time - self.pickup_time - self.place_time: .3f}, Round distance: '
            f'{self.total_distance - self.pickup_distance - self.place_distance: .3f}')

        minutes, seconds = int(self.total_time // 60), int(self.total_time) % 60
        # Fractional part of the last second, expressed in milliseconds
        # (was multiplied by 60, which is not a millisecond conversion).
        millisecond = int((self.total_time - minutes * 60 - seconds) * 1000)

        print(f'-Operation time: {self.operation_time: .3f}, ', end='')
        if minutes > 0:
            print(f'Total time: {minutes: d} min {seconds} s {millisecond: 2d} ms ({self.total_time: .3f}s)')
        else:
            print(f'Total time: {seconds} s {millisecond :2d} ms ({self.total_time :.3f}s)')

    def metric(self):
        """Return the fitted cost: a weighted sum of counters and the pick-up distance."""
        return Fit_cy * self.cycle_counter + Fit_nz * self.nozzle_change_counter + Fit_pu * self.pickup_counter + \
            Fit_pl * self.total_points + Fit_mv * self.pickup_distance
def axis_moving_time(distance, axis=0):
    """Return the travel time of one gantry axis for a move of ``distance``.

    The sign of ``distance`` is ignored and the value is scaled by 1e-3 before
    use (presumably mm -> m; TODO confirm).  ``axis == 0`` selects the X-axis
    velocity/acceleration limits, any other value the Y-axis limits.  Short
    moves never reach the maximum velocity; longer ones add a constant-speed
    segment.
    """
    distance = abs(distance) * 1e-3
    if axis == 0:
        v_max, a_max = x_max_velocity, x_max_acceleration
    else:
        v_max, a_max = y_max_velocity, y_max_acceleration

    ramp_distance = v_max ** 2 / a_max  # distance covered by accelerating plus decelerating
    ramp_time = v_max / a_max
    if distance < ramp_distance:
        return 2 * math.sqrt(distance / a_max)
    return 2 * ramp_time + (distance - ramp_distance) / v_max


def head_rotary_time(angle):
    """Return the rotation time for ``angle`` after wrapping it into [-180, 180].

    The original guard ``while -180 > angle > 180`` could never be true, so
    angles were not normalised at all.
    """
    while angle > 180:
        angle -= 360
    while angle < -180:
        angle += 360
    return abs(angle) * head_rotary_velocity


def find_commonpart(head_group, feeder_group):
    """Return the best head-aligned overlap of ``head_group`` with ``feeder_group``.

    Every tested offset shifts ``head_group`` against ``feeder_group``; the
    result has ``max_head_index`` entries holding the matching component where
    both agree (and are not -1) and -1 elsewhere.  The first offset with the
    largest number of matches wins.
    """
    feeder_group_len = len(feeder_group)

    max_length, max_common_part = -1, []
    # offset: shift of head_group relative to feeder_group
    # NOTE(review): the upper bound skips offset == feeder_group_len - 1 (head 0
    # on the last feeder entry) - confirm whether that is intended.
    for offset in range(-max_head_index + 1, feeder_group_len - 1):
        length, common_part = 0, []
        for hd_index in range(max_head_index):
            fd_index = hd_index + offset
            if fd_index < 0 or fd_index >= feeder_group_len:
                common_part.append(-1)
                continue

            if head_group[hd_index] == feeder_group[fd_index] and head_group[hd_index] != -1:
                length += 1
                common_part.append(head_group[hd_index])
            else:
                common_part.append(-1)
        if length > max_length:
            max_length = length
            max_common_part = common_part

    return max_common_part


def timer_wrapper(func):
    """Decorator printing the wall-clock run time of ``func``.

    The message is suppressed when the call passes the keyword ``hinter`` with
    a falsy value; the keyword is still forwarded to ``func``.
    """

    @wraps(func)
    def measure_time(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)

        if kwargs.get('hinter', True):
            print(f"function {func.__name__}  running time :  {time.time() - start_time:.3f} s")
        return result

    return measure_time


def feeder_assignment(component_data, pcb_data, component_result, cycle_result):
    """Group feeders, place the groups on slots and derive per-head pick slots.

    :param component_data: table with a ``part`` column and an ``fdn`` field
        per row (read as the number of feeders available for that component).
    :param pcb_data: table with ``part`` and ``x`` columns, indexed 0..n-1.
    :param component_result: one list per cycle set giving the component index
        of every head (-1 = unused head).
    :param cycle_result: weights used for the group priority order.
    :return: one list per cycle set with the pick-up slot of every head
        (-1 = none).
    :raises Exception: when a feeder group cannot be placed on any slot or a
        head's component is not covered by any feeder group.
    """
    # Section: feeder assignment result
    feeder_slot_result, feeder_group_result = [], []
    feeder_limit = defaultdict(int)
    for component in range(len(component_data)):
        feeder_limit[component] = component_data.loc[component].fdn

    for component_cycle in component_result:
        new_feeder_group = []
        for component in component_cycle:
            if component == -1 or feeder_limit[component] == 0 or new_feeder_group.count(component) >= feeder_limit[
                    component]:
                new_feeder_group.append(-1)
            else:
                new_feeder_group.append(component)

        if len(new_feeder_group) == 0:
            continue

        while sum(i >= 0 for i in new_feeder_group) != 0:
            max_common_part, index = [], -1
            max_common_length = -1
            for feeder_index in range(len(feeder_group_result)):
                common_part = find_commonpart(new_feeder_group, feeder_group_result[feeder_index])
                # -1 is the "empty" sentinel, so component index 0 must count
                # as a match as well (the original tested ``i > 0``).
                common_length = sum(i >= 0 for i in common_part)
                if common_length > max_common_length:
                    max_common_length = common_length
                    max_common_part, index = common_part, feeder_index

            new_feeder_length = 0
            for feeder in new_feeder_group:
                if feeder != -1 and feeder_limit[feeder] > 0:
                    new_feeder_length += 1

            if new_feeder_length > max_common_length:
                # allocate a new feeder group
                feeder_group_result.append([])
                for feeder_index in range(len(new_feeder_group)):
                    feeder = new_feeder_group[feeder_index]
                    if feeder != -1 and feeder_limit[feeder] > 0:
                        feeder_group_result[-1].append(feeder)
                        new_feeder_group[feeder_index] = -1
                        feeder_limit[feeder] -= 1
                    else:
                        feeder_group_result[-1].append(-1)
            else:
                # reuse an existing feeder group
                for feeder_index, feeder_part in enumerate(max_common_part):
                    if feeder_part != -1:
                        new_feeder_group[feeder_index] = -1

    # strip leading / trailing empty entries
    for feeder_group in feeder_group_result:
        while len(feeder_group) > 0 and feeder_group[0] == -1:
            feeder_group.pop(0)

        while len(feeder_group) > 0 and feeder_group[-1] == -1:
            feeder_group.pop(-1)

    # determine where the feeder groups are mounted
    point_num = len(pcb_data)
    component_pos = [[] for _ in range(len(component_data))]
    for point_cnt in range(point_num):
        part = pcb_data.loc[point_cnt, 'part']
        index = np.where(component_data['part'].values == part)[0]
        component_pos[index[0]].append(pcb_data.loc[point_cnt, 'x'] + stopper_pos[0])

    # lowest and highest head index using each component
    CT_Head = defaultdict(list)
    for component_cycle in component_result:
        for head, component in enumerate(component_cycle):
            if component == -1:
                continue
            if component not in CT_Head:
                CT_Head[component] = [head, head]
            CT_Head[component][0] = min(CT_Head[component][0], head)
            CT_Head[component][1] = max(CT_Head[component][1], head)

    # priority order in which the feeder groups are assigned
    # NOTE(review): both sides of the comparison use feeder_group_result[seq]
    # and cycle_result is indexed by feeder-group index - looks unintended,
    # left unchanged pending confirmation.
    feeder_assign_sequence = []
    for i in range(len(feeder_group_result)):
        for j in range(len(feeder_group_result)):
            if j in feeder_assign_sequence:
                continue

            if len(feeder_assign_sequence) == i:
                feeder_assign_sequence.append(j)
            else:
                seq = feeder_assign_sequence[-1]
                if cycle_result[seq] * len([k for k in feeder_group_result[seq] if k >= 0]) < cycle_result[j] * len(
                        [k for k in feeder_group_result[seq] if k >= 0]):
                    feeder_assign_sequence.pop(-1)
                    feeder_assign_sequence.append(j)

    # TODO: mechanical limits are not considered yet
    feeder_group_slot = [-1] * len(feeder_group_result)
    feeder_lane_state = [0] * max_slot_index  # 0 = free, 1 = occupied
    for index in feeder_assign_sequence:
        feeder_group = feeder_group_result[index]
        best_slot = []
        for cp_index, component in enumerate(feeder_group):
            if component == -1:
                continue
            best_slot.append(round((sum(component_pos[component]) / len(component_pos[component]) - slotf1_pos[
                0]) / slot_interval) + 1 - cp_index * interval_ratio)
        best_slot = round(sum(best_slot) / len(best_slot))

        search_dir, step = 0, 0  # dir: 1 - to the right, 0 - to the left
        left_out_range, right_out_range = False, False
        while True:
            assign_slot = best_slot + step if search_dir else best_slot - step
            # out of range: search in the opposite direction
            if assign_slot + (len(feeder_group) - 1) * interval_ratio >= max_slot_index / 2:
                right_out_range = True
                search_dir = 0
                step += 1
            elif assign_slot < 0:
                left_out_range = True
                search_dir = 1
                step += 1
            else:
                if left_out_range or right_out_range:
                    step += 1  # one-directional search
                else:
                    search_dir = 1 - search_dir  # two-directional search
                    if search_dir == 0:
                        step += 1

                assign_available = True

                # === try the slots of this candidate position ===
                for slot in range(assign_slot, assign_slot + interval_ratio * len(feeder_group), interval_ratio):
                    feeder_index = int((slot - assign_slot) / interval_ratio)
                    pick_part = feeder_group[feeder_index]
                    if feeder_lane_state[slot] == 1 and pick_part != -1:
                        assign_available = False
                        break

                    if pick_part != -1 and (slot - CT_Head[pick_part][0] * interval_ratio <= 0 or
                                            slot + (max_head_index - CT_Head[pick_part][1] - 1) * interval_ratio >
                                            max_slot_index // 2):
                        assign_available = False
                        break

                if assign_available:
                    for idx, part in enumerate(feeder_group):
                        if part != -1:
                            feeder_lane_state[assign_slot + idx * interval_ratio] = 1
                    feeder_group_slot[index] = assign_slot
                    break

            # Both directions ran off the base: nothing is left to try.  Without
            # this exit the loop never ends and the error below is unreachable.
            if left_out_range and right_out_range:
                break

        if feeder_group_slot[index] == -1:
            raise Exception('feeder assign error!')

    # pick slots of every cycle set, chosen by maximum overlap with a feeder group
    for component_cycle in component_result:
        feeder_slot_result.append([-1] * max_head_index)
        head_index = [head for head, component in enumerate(component_cycle) if component >= 0]
        while head_index:
            max_overlap_counter = 0
            overlap_feeder_group_index, overlap_feeder_group_offset = -1, -1
            for feeder_group_idx, feeder_group in enumerate(feeder_group_result):
                # offset of head 1 relative to the first element of the feeder group
                for offset in range(-max_head_index + 1, max_head_index + len(feeder_group)):
                    overlap_counter = 0
                    for head in head_index:
                        if 0 <= head + offset < len(feeder_group) and component_cycle[head] == \
                                feeder_group[head + offset]:
                            overlap_counter += 1

                    if overlap_counter > max_overlap_counter:
                        max_overlap_counter = overlap_counter
                        overlap_feeder_group_index, overlap_feeder_group_offset = feeder_group_idx, offset

            if max_overlap_counter == 0:
                # No group matches any remaining head; the original kept
                # looping forever in this situation.
                raise Exception('feeder assign error: no feeder group covers the remaining heads!')

            feeder_group = feeder_group_result[overlap_feeder_group_index]
            for head in list(head_index):
                if 0 <= head + overlap_feeder_group_offset < len(feeder_group) and component_cycle[head] == \
                        feeder_group[head + overlap_feeder_group_offset]:
                    feeder_slot_result[-1][head] = feeder_group_slot[overlap_feeder_group_index] + interval_ratio * (
                            head + overlap_feeder_group_offset)
                    head_index.remove(head)

    return feeder_slot_result


def dynamic_programming_cycle_path(pcb_data, cycle_placement, assigned_feeder):
    """Return the head visiting order of one cycle via bitmask dynamic programming.

    Node 0 is a point derived from the assigned feeder slots; every head with a
    feeder contributes one node at its head-corrected placement position.  The
    closed tour of minimum Euclidean length is searched and returned as a list
    of head indices, reversed if needed so that the head-corrected x of the
    first placement does not exceed that of the last.  Returns an empty list
    when no head has a feeder assigned.
    """
    head_sequence = []
    num_pos = sum([placement != -1 for placement in cycle_placement]) + 1

    pos, head_set = [], []
    feeder_set = set()
    for head, feeder in enumerate(assigned_feeder):
        if feeder == -1:
            continue

        head_set.append(head)
        placement = cycle_placement[head]
        if feeder != -1 and placement == -1:
            # diagnostic output for an inconsistent feeder / placement pair
            print(assigned_feeder)
            print(cycle_placement)

        pos.append([pcb_data.iloc[placement]['x'] - head * head_interval + stopper_pos[0],
                    pcb_data.iloc[placement]['y'] + stopper_pos[1]])

        feeder_set.add(feeder - head * interval_ratio)

    if not head_set:
        # nothing to visit (min()/max() on the empty feeder set would raise)
        return head_sequence

    pos.insert(0, [slotf1_pos[0] + ((min(list(feeder_set)) + max(list(feeder_set))) / 2 - 1) * slot_interval,
                   slotf1_pos[1]])

    def get_distance(pos_1, pos_2):
        return math.sqrt((pos_1[0] - pos_2[0]) ** 2 + (pos_1[1] - pos_2[1]) ** 2)

    # pairwise distances between all nodes
    dist = [[get_distance(pos_1, pos_2) for pos_2 in pos] for pos_1 in pos]

    full_mask = (1 << num_pos) - 1
    min_dist = [[np.inf for _ in range(num_pos)] for _ in range(1 << num_pos)]
    min_path = [[[] for _ in range(num_pos)] for _ in range(1 << num_pos)]

    # state-compression DP; only odd masks are visited, i.e. sets containing node 0
    for s in range(1, 1 << num_pos, 2):
        for j in range(1, num_pos):
            # the end node j must be part of the current set s
            if not (s & (1 << j)):
                continue
            if s == int((1 << j) | 1):
                # s holds only node 0 and node j: DP boundary, set the initial value
                min_path[s][j] = [j]
                min_dist[s][j] = dist[0][j]

            # enumerate the next node i and relax
            for i in range(1, num_pos):
                # the next node i must lie outside the current set s
                if s & (1 << i):
                    continue
                if min_dist[s][j] + dist[j][i] < min_dist[s | (1 << i)][i]:
                    min_path[s | (1 << i)][i] = min_path[s][j] + [i]
                    min_dist[s | (1 << i)][i] = min_dist[s][j] + dist[j][i]

    ans_dist = float('inf')
    ans_path = []
    # close the tour: pick the best end node of the full set
    for i in range(1, num_pos):
        if min_dist[full_mask][i] + dist[i][0] < ans_dist:
            # the original read min_path[s][i] through the leaked loop variable
            ans_path = min_path[full_mask][i]
            ans_dist = min_dist[full_mask][i] + dist[i][0]

    for parent in ans_path:
        head_sequence.append(head_set[parent - 1])

    start_head, end_head = head_sequence[0], head_sequence[-1]
    if pcb_data.iloc[cycle_placement[start_head]]['x'] - start_head * head_interval > \
            pcb_data.iloc[cycle_placement[end_head]]['x'] - end_head * head_interval:
        head_sequence = list(reversed(head_sequence))
    return head_sequence


@timer_wrapper
def greedy_placement_route_generation(component_data, pcb_data, component_result, cycle_result, feeder_slot_result,
                                      hinter=True):
    """Greedily assign placement points to heads, cycle by cycle.

    :return: ``(placement_result, head_sequence_result)`` - for every cycle the
        placement index of each head (-1 = none) and the head visiting order
        from ``dynamic_programming_cycle_path``.  Both are empty when
        ``pcb_data`` is empty.
    ``hinter`` is only consumed by ``timer_wrapper`` (pass it as a keyword).
    """
    placement_result, head_sequence_result = [], []
    if len(pcb_data) == 0:
        return placement_result, head_sequence_result
    mount_point_index = [[] for _ in range(len(component_data))]
    mount_point_pos = [[] for _ in range(len(component_data))]

    for i in range(len(pcb_data)):
        part = pcb_data.iloc[i]['part']
        component_index = component_data[component_data['part'] == part].index.tolist()[0]
        # record the placement index and its coordinates
        mount_point_index[component_index].append(i)
        mount_point_pos[component_index].append([pcb_data.iloc[i]['x'], pcb_data.iloc[i]['y']])

    search_dir = 1  # 0: search left to right, 1: search right to left
    for cycle_set in range(len(component_result)):
        floor_cycle, ceil_cycle = sum(cycle_result[:cycle_set]), sum(cycle_result[:(cycle_set + 1)])
        for cycle in range(floor_cycle, ceil_cycle):
            if sum(component_result[cycle_set]) == -max_head_index:
                continue

            # search_dir = 1 - search_dir
            assigned_placement = [-1] * max_head_index
            # NOTE(review): only the first component that still has points
            # contributes to max_pos / min_pos - confirm this is intended.
            max_pos = [max(mount_point_pos[component_index], key=lambda x: x[0]) for component_index in
                       range(len(mount_point_pos)) if len(mount_point_pos[component_index]) > 0][0][0]
            min_pos = [min(mount_point_pos[component_index], key=lambda x: x[0]) for component_index in
                       range(len(mount_point_pos)) if len(mount_point_pos[component_index]) > 0][0][0]
            point2head_range = min(math.floor((max_pos - min_pos) / head_interval) + 1, max_head_index)

            # nearest-neighbour selection
            way_point = None
            head_range = range(max_head_index - 1, -1, -1) if search_dir else range(max_head_index)
            for head_counter, head in enumerate(head_range):
                if component_result[cycle_set][head] == -1:
                    continue

                component_index = component_result[cycle_set][head]
                if way_point is None or head_counter % point2head_range == 0:
                    index = 0
                    if way_point is None:
                        if search_dir:
                            index = np.argmax(mount_point_pos[component_index], axis=0)[0]
                        else:
                            index = np.argmin(mount_point_pos[component_index], axis=0)[0]
                    else:
                        for next_head in head_range:
                            component_index = component_result[cycle_set][next_head]
                            if assigned_placement[next_head] == -1 and component_index != -1:
                                num_points = len(mount_point_pos[component_index])
                                index = np.argmin(
                                    [abs(mount_point_pos[component_index][i][0] - way_point[0]) * .1 + abs(
                                        mount_point_pos[component_index][i][1] - way_point[1]) for i in
                                     range(num_points)])
                                head = next_head
                                break
                    # index = np.argmax(mount_point_pos[component_index], axis=0)[0]
                    assigned_placement[head] = mount_point_index[component_index][index]

                    # record the way point (shifted by the head offset)
                    way_point = mount_point_pos[component_index][index]
                    way_point[0] += (max_head_index - head - 1) * head_interval if search_dir else -head * head_interval

                    mount_point_index[component_index].pop(index)
                    mount_point_pos[component_index].pop(index)
                else:
                    head_index, point_index = -1, -1
                    min_cheby_distance, min_euler_distance = float('inf'), float('inf')
                    for next_head in range(max_head_index):
                        if assigned_placement[next_head] != -1 or component_result[cycle_set][next_head] == -1:
                            continue
                        next_comp_index = component_result[cycle_set][next_head]
                        for counter in range(len(mount_point_pos[next_comp_index])):
                            if search_dir:
                                delta_x = abs(mount_point_pos[next_comp_index][counter][0] - way_point[0]
                                              + (max_head_index - next_head - 1) * head_interval)
                            else:
                                delta_x = abs(mount_point_pos[next_comp_index][counter][0] - way_point[0]
                                              - next_head * head_interval)

                            delta_y = abs(mount_point_pos[next_comp_index][counter][1] - way_point[1])

                            euler_distance = pow(axis_moving_time(delta_x, 0), 2) + pow(axis_moving_time(delta_y, 1), 2)
                            cheby_distance = max(axis_moving_time(delta_x, 0),
                                                 axis_moving_time(delta_y, 1)) + 5e-2 * euler_distance
                            if cheby_distance < min_cheby_distance or (abs(cheby_distance - min_cheby_distance) < 1e-9
                                                                       and euler_distance < min_euler_distance):
                                # if euler_distance < min_euler_distance:
                                min_cheby_distance, min_euler_distance = cheby_distance, euler_distance
                                head_index, point_index = next_head, counter

                    component_index = component_result[cycle_set][head_index]
                    assert (0 <= head_index < max_head_index)

                    assigned_placement[head_index] = mount_point_index[component_index][point_index]
                    way_point = mount_point_pos[component_index][point_index]
                    way_point[0] += (max_head_index - head_index - 1) * head_interval if search_dir \
                        else -head_index * head_interval

                    mount_point_index[component_index].pop(point_index)
                    mount_point_pos[component_index].pop(point_index)

            placement_result.append(assigned_placement)  # placement index handled by every head
            head_sequence_result.append(
                dynamic_programming_cycle_path(pcb_data, assigned_placement, feeder_slot_result[cycle_set]))

    return placement_result, head_sequence_result


@timer_wrapper
def beam_search_for_route_generation(component_data, pcb_data, component_result, cycle_result, feeder_slot_result):
    """Assign placement points to heads with a beam search of fixed width 4.

    Every beam keeps its own copy of the remaining placement points; after each
    head the ``beam_width`` cheapest extensions over all beams survive.

    :return: ``(placement_sequence, head_sequence)`` of the beam with the
        smallest accumulated cost.
    """
    beam_width = 4  # beam width

    mount_point_index = [[] for _ in range(len(component_data))]
    mount_point_pos = [[] for _ in range(len(component_data))]

    for i in range(len(pcb_data)):
        part = pcb_data.loc[i]['part']
        component_index = component_data[component_data['part'] == part].index.tolist()[0]

        # record the placement index and its coordinates
        mount_point_index[component_index].append(i)
        mount_point_pos[component_index].append([pcb_data.loc[i]['x'], pcb_data.loc[i]['y']])

    beam_placement_sequence, beam_head_sequence = [], []
    beam_mount_point_index, beam_mount_point_pos = [], []

    for beam_counter in range(beam_width):
        beam_mount_point_index.append(copy.deepcopy(mount_point_index))
        beam_mount_point_pos.append(copy.deepcopy(mount_point_pos))

        beam_placement_sequence.append([])
        beam_head_sequence.append([])

    beam_distance = [0 for _ in range(beam_width)]  # accumulated cost of every beam

    def argpartition(values, kth):
        # With fewer than kth + 1 candidates, cycle through the available
        # indices so that kth entries are still returned.
        if kth < len(values):
            return np.argpartition(values, kth)
        else:
            index, indexes = 0, []
            while len(indexes) < kth:
                indexes.append(index)
                index += 1
                if index >= len(values):
                    index = 0
            return np.array(indexes)

    with tqdm(total=100) as pbar:
        search_dir = 0
        pbar.set_description('route schedule')
        for cycle_set in range(len(component_result)):
            floor_cycle, ceil_cycle = sum(cycle_result[:cycle_set]), sum(cycle_result[:(cycle_set + 1)])
            for cycle in range(floor_cycle, ceil_cycle):
                search_dir = 1 - search_dir
                beam_way_point = None
                for beam_counter in range(beam_width):
                    beam_placement_sequence[beam_counter].append([-1 for _ in range(max_head_index)])

                head_range = range(max_head_index - 1, -1, -1) if search_dir else range(max_head_index)
                for head in head_range:
                    component_index = component_result[cycle_set][head]
                    if component_index == -1:
                        continue

                    if beam_way_point is None:
                        # first placement of the cycle: the extreme point in x for every beam
                        beam_way_point = [[0, 0]] * beam_width

                        for beam_counter in range(beam_width):
                            if search_dir:
                                index = np.argmax(beam_mount_point_pos[beam_counter][component_index], axis=0)[0]
                            else:
                                index = np.argmin(beam_mount_point_pos[beam_counter][component_index], axis=0)[0]

                            beam_placement_sequence[beam_counter][-1][head] = \
                                beam_mount_point_index[beam_counter][component_index][index]

                            beam_way_point[beam_counter] = beam_mount_point_pos[beam_counter][component_index][index]
                            beam_way_point[beam_counter][0] += (max_head_index - head - 1) * head_interval if \
                                search_dir else -head * head_interval

                            beam_mount_point_index[beam_counter][component_index].pop(index)
                            beam_mount_point_pos[beam_counter][component_index].pop(index)
                    else:
                        # subsequent placements
                        search_beam_distance = []
                        search_beam_index = [0] * (beam_width ** 2)
                        for beam_counter in range(beam_width ** 2):
                            search_beam_distance.append(beam_distance[beam_counter // beam_width])

                        for beam_counter in range(beam_width):
                            # the beam_width nearest points of this beam
                            num_points = len(beam_mount_point_pos[beam_counter][component_index])

                            dist = []
                            for i in range(num_points):
                                if search_dir:
                                    delta_x = axis_moving_time(
                                        beam_mount_point_pos[beam_counter][component_index][i][0] -
                                        beam_way_point[beam_counter][0] + (max_head_index - head - 1) * head_interval,
                                        0)
                                else:
                                    delta_x = axis_moving_time(
                                        beam_mount_point_pos[beam_counter][component_index][i][0] -
                                        beam_way_point[beam_counter][0] - head * head_interval, 0)

                                delta_y = axis_moving_time(beam_mount_point_pos[beam_counter][component_index][i][1] -
                                                           beam_way_point[beam_counter][1], 1)
                                dist.append(max(delta_x, delta_y))

                            indexes = argpartition(dist, kth=beam_width)[:beam_width]

                            # record the candidate costs and point indices
                            for i, index in enumerate(indexes):
                                search_beam_distance[i + beam_counter * beam_width] += dist[index]
                                search_beam_index[i + beam_counter * beam_width] = index

                        indexes = np.argsort(search_beam_distance)

                        beam_mount_point_pos_cpy = copy.deepcopy(beam_mount_point_pos)
                        beam_mount_point_index_cpy = copy.deepcopy(beam_mount_point_index)

                        beam_placement_sequence_cpy = copy.deepcopy(beam_placement_sequence)
                        beam_head_sequence_cpy = copy.deepcopy(beam_head_sequence)
                        beam_counter = 0
                        assigned_placement = []

                        for i, index in enumerate(indexes):
                            # copy the state of the beam this candidate extends
                            beam_mount_point_pos[beam_counter] = copy.deepcopy(
                                beam_mount_point_pos_cpy[index // beam_width])
                            beam_mount_point_index[beam_counter] = copy.deepcopy(
                                beam_mount_point_index_cpy[index // beam_width])
                            beam_placement_sequence[beam_counter] = copy.deepcopy(
                                beam_placement_sequence_cpy[index // beam_width])
                            beam_head_sequence[beam_counter] = copy.deepcopy(
                                beam_head_sequence_cpy[index // beam_width])

                            # set the placement just chosen for this beam
                            component_index = component_result[cycle_set][head]

                            beam_placement_sequence[beam_counter][-1][head] = \
                                beam_mount_point_index[beam_counter][component_index][search_beam_index[index]]

                            # skip duplicates while enough candidates remain to fill the beams
                            if beam_placement_sequence[beam_counter][
                                    -1] in assigned_placement and beam_width - beam_counter < len(indexes) - i:
                                continue

                            assigned_placement.append(beam_placement_sequence[beam_counter][-1])

                            # update the reference way point
                            beam_way_point[beam_counter] = \
                                beam_mount_point_pos[beam_counter][component_index][search_beam_index[index]]
                            beam_way_point[beam_counter][0] += (max_head_index - head - 1) * head_interval if \
                                search_dir else -head * head_interval

                            # update the beam cost and remove the point just assigned
                            beam_distance[beam_counter] = search_beam_distance[index]

                            beam_mount_point_pos[beam_counter][component_index].pop(search_beam_index[index])
                            beam_mount_point_index[beam_counter][component_index].pop(search_beam_index[index])

                            beam_counter += 1

                            if beam_counter >= beam_width:
                                break
                        assert (beam_counter >= beam_width)

                # update the head placement order of every beam
                for beam_counter in range(beam_width):
                    beam_head_sequence[beam_counter].append(
                        dynamic_programming_cycle_path(pcb_data, beam_placement_sequence[beam_counter][-1],
                                                       feeder_slot_result[cycle_set]))

                pbar.update(1 / sum(cycle_result) * 100)

    index = np.argmin(beam_distance)
    return beam_placement_sequence[index], beam_head_sequence[index]


def optimal_nozzle_assignment(component_data, pcb_data):
    """Return how many heads each nozzle type should occupy.

    :param component_data: table with ``part`` and ``nz`` columns.
    :param pcb_data: table with a ``part`` column, one row per placement.
    :return: ``defaultdict(int)`` mapping nozzle type to head count (empty when
        ``pcb_data`` is empty).
    """
    # === Nozzle Assignment ===
    # number of points for nozzle & number of heads for nozzle
    nozzle_points, nozzle_assigned_counter = defaultdict(int), defaultdict(int)
    if len(pcb_data) == 0:
        return nozzle_assigned_counter
    for _, step in pcb_data.iterrows():
        part = step['part']
        idx = component_data[component_data['part'] == part].index.tolist()[0]
        nozzle = component_data.loc[idx]['nz']

        nozzle_assigned_counter[nozzle] = 0
        nozzle_points[nozzle] += 1

    assert len(nozzle_points.keys()) <= max_head_index
    total_points, available_head = len(pcb_data), max_head_index
    # S1: set of nozzle types which are sufficient to assign one nozzle to the heads
    # S2: temporary nozzle set
    # S3: set of nozzle types which already have the maximum reasonable nozzle amounts.
    S1, S2, S3 = [], [], []

    for nozzle in nozzle_points.keys():  # Phase 1
        if nozzle_points[nozzle] * max_head_index < total_points:
            nozzle_assigned_counter[nozzle] = 1
            available_head -= 1
            total_points -= nozzle_points[nozzle]

            S1.append(nozzle)
        else:
            S2.append(nozzle)

    available_head_ = available_head  # Phase 2
    for nozzle in S2:
        nozzle_assigned_counter[nozzle] = math.floor(available_head * nozzle_points[nozzle] / total_points)
        available_head_ = available_head_ - nozzle_assigned_counter[nozzle]

    # the small epsilon keeps nozzle types with zero heads sortable (and first)
    S2.sort(key=lambda x: nozzle_points[x] / (nozzle_assigned_counter[x] + 1e-10), reverse=True)
    while available_head_ > 0:
        nozzle = S2[0]
        nozzle_assigned_counter[nozzle] += 1

        S2.remove(nozzle)
        S3.append(nozzle)
        available_head_ -= 1

    phase_iteration = len(S2) - 1
    while phase_iteration > 0:  # Phase 3; terminates through the break / the shrinking S2
        nozzle_i_val, nozzle_j_val = 0, 0
        nozzle_i, nozzle_j = None, None
        for nozzle in S2:
            if nozzle_i is None or nozzle_points[nozzle] / nozzle_assigned_counter[nozzle] > nozzle_i_val:
                nozzle_i_val = nozzle_points[nozzle] / nozzle_assigned_counter[nozzle]
                nozzle_i = nozzle

            if nozzle_assigned_counter[nozzle] > 1:
                if nozzle_j is None or nozzle_points[nozzle] / (nozzle_assigned_counter[nozzle] - 1) < nozzle_j_val:
                    nozzle_j_val = nozzle_points[nozzle] / (nozzle_assigned_counter[nozzle] - 1)
                    nozzle_j = nozzle

        if nozzle_i and nozzle_j and nozzle_points[nozzle_j] / (nozzle_assigned_counter[nozzle_j] - 1) < \
                nozzle_points[nozzle_i] / nozzle_assigned_counter[nozzle_i]:
            nozzle_assigned_counter[nozzle_j] -= 1
            nozzle_assigned_counter[nozzle_i] += 1
            S2.remove(nozzle_i)
            S3.append(nozzle_i)
        else:
            break

    return nozzle_assigned_counter


# === shared helpers for the genetic algorithms ===
def sigma_scaling(pop_val, c: float):
    """Apply sigma scaling in place: f' = max(f - (avg(f) - c * sigma(f)), 0).

    ``sigma`` is the population standard deviation.  The original took the
    square root of the mean *absolute* deviation, which is not a standard
    deviation.  ``pop_val`` is modified and also returned.
    """
    avg_val = sum(pop_val) / len(pop_val)
    sigma_val = math.sqrt(sum((v - avg_val) ** 2 for v in pop_val) / len(pop_val))

    for idx, val in enumerate(pop_val):
        pop_val[idx] = max(val - (avg_val - c * sigma_val), 0)
    return pop_val


def directed_edge_recombination_crossover(c, individual1, individual2):
    """Build one offspring from the directed edges of two equally long parents.

    Starting from a random element that is not yet in the offspring, the chain
    is grown to the right and then to the left, each time following the
    neighbour that itself has the most remaining edges.  ``c`` is unused.
    """
    assert len(individual1) == len(individual2)
    left_edge_list, right_edge_list = defaultdict(list), defaultdict(list)

    for individual in (individual1, individual2):
        for index in range(len(individual) - 1):
            elem1, elem2 = individual[index], individual[index + 1]
            right_edge_list[elem1].append(elem2)
            left_edge_list[elem2].append(elem1)

    offspring = []
    while len(offspring) != len(individual1):
        while True:
            center_element = np.random.choice(individual1)
            if center_element not in offspring:  # avoid picking an element twice
                break
        direction, candidate = 1, [center_element]
        parent = center_element
        for edge_list in left_edge_list.values():
            while parent in edge_list:
                edge_list.remove(parent)

        for edge_list in right_edge_list.values():
            while parent in edge_list:
                edge_list.remove(parent)

        while True:
            max_len, max_len_neighbor = -1, 0
            if direction == 1:
                if len(right_edge_list[parent]) == 0:
                    direction, parent = -1, center_element
                    continue
                for neighbor in right_edge_list[parent]:
                    if max_len < len(right_edge_list[neighbor]):
                        max_len_neighbor = neighbor
                        max_len = len(right_edge_list[neighbor])
                candidate.append(max_len_neighbor)
                parent = max_len_neighbor
            elif direction == -1:
                if len(left_edge_list[parent]) == 0:
                    direction, parent = 0, center_element
                    continue
                for neighbor in left_edge_list[parent]:
                    if max_len < len(left_edge_list[neighbor]):
                        max_len_neighbor = neighbor
                        max_len = len(left_edge_list[neighbor])
                candidate.insert(0, max_len_neighbor)
                parent = max_len_neighbor
            else:
                break

            # remove the element just used from every edge list
            for edge_list in left_edge_list.values():
                while max_len_neighbor in edge_list:
                    edge_list.remove(max_len_neighbor)

            for edge_list in right_edge_list.values():
                while max_len_neighbor in edge_list:
                    edge_list.remove(max_len_neighbor)

        offspring += candidate

    return offspring


def _pmx_child(primary, secondary, lower, upper):
    """Build one crossover child: ``secondary`` inside [lower, upper], repaired ``primary`` outside."""
    child = [-1 for _ in range(len(primary))]
    child[lower: upper + 1] = copy.deepcopy(secondary[lower: upper + 1])

    for index in range(len(primary)):
        if lower <= index <= upper:
            continue

        cur_ptr, cur_elem = 0, primary[index]
        while True:
            child[index] = cur_elem
            if child.count(cur_elem) == 1:
                break
            child[index] = -1  # conflict: undo and try the mapped element

            if cur_ptr == 0:
                cur_ptr, cur_elem = 1, secondary[index]
            else:
                index_ = child.index(cur_elem)
                cur_elem = secondary[index_]
    return child


def partially_mapped_crossover(parent1, parent2):
    """Return two children of a partially mapped crossover on a random segment.

    The two cut points are drawn with ``np.random.randint`` and the segment
    between them (both ends included) is swapped between the parents.
    """
    range_ = sorted(np.random.randint(0, len(parent1), 2))
    lower, upper = range_[0], range_[1]

    return _pmx_child(parent1, parent2, lower, upper), _pmx_child(parent2, parent1, lower, upper)


def cycle_crossover(parent1, parent2):
    """Return two children of a cycle crossover.

    Each child keeps the first cycle (starting at position 0) from its own
    parent and takes every other position from the opposite parent.
    """
    offspring1, offspring2 = [-1 for _ in range(len(parent1))], [-1 for _ in range(len(parent2))]

    idx = 0
    while True:
        if offspring1[idx] != -1:
            break
        offspring1[idx] = parent1[idx]
        idx = parent1.index(parent2[idx])

    for idx, gene in enumerate(offspring1):
        if gene == -1:
            offspring1[idx] = parent2[idx]

    idx = 0
    while True:
        if offspring2[idx] != -1:
            break
        offspring2[idx] = parent2[idx]
        idx = parent2.index(parent1[idx])

    for idx, gene in enumerate(offspring2):
        if gene == -1:
            offspring2[idx] = parent1[idx]

    return offspring1, offspring2


def swap_mutation(parent):
    """Swap two randomly chosen positions of ``parent`` in place and return it."""
    range_ = np.random.randint(0, len(parent), 2)
    parent[range_[0]], parent[range_[1]] = parent[range_[1]], parent[range_[0]]
    return parent


def constraint_swap_mutation(component_points, individual, machine_number):
    """Return a copy of ``individual`` with two genes of one component segment swapped.

    Every component owns a segment of ``points + machine_number - 1`` genes.
    The swap is only kept when it leaves the number of zero-runs that start in
    the segment unchanged; otherwise the unmodified copy is returned.
    """
    offspring = individual.copy()

    idx, component_index = 0, random.randint(0, len(component_points) - 1)
    for points in component_points.values():
        if component_index == 0:
            while True:
                index1, index2 = random.sample(range(points + machine_number - 1), 2)
                if offspring[idx + index1] != offspring[idx + index2]:
                    break

            clip = offspring[idx: idx + points + machine_number - 1].copy()
            avl_machine = 0
            for idx_, gene in enumerate(clip):
                if gene == 0 and (idx_ == 0 or clip[idx_ - 1] != 0):
                    avl_machine += 1

            clip[index1], clip[index2] = clip[index2], clip[index1]
            for idx_, gene in enumerate(clip):
                if gene == 0 and (idx_ == 0 or clip[idx_ - 1] != 0):
                    avl_machine -= 1

            if avl_machine != 0:
                return offspring

            offspring[idx + index1], offspring[idx + index2] = offspring[idx + index2], offspring[idx + index1]
            break

        component_index -= 1
        idx += (points + machine_number - 1)

    return offspring


def random_selective(data, possibility):
    """Pick one element of ``data`` at random, weighted by ``possibility``."""
    assert len(data) == len(possibility) and len(data) > 0

    sum_val = sum(possibility)
    possibility = [p / sum_val for p in possibility]

    random_val = random.random()
    idx = 0
    for idx, val in enumerate(possibility):
        random_val -= val
        if random_val <= 0:
            break
    return data[idx]


def insert_mutation(parent):
    """Move the last element of ``parent`` to a random position, in place.

    The elements from that position on are shifted one step to the right, so
    the result is still a permutation of the input.  (The original shifted in
    the wrong direction, overwriting elements, and used a 1-element array as
    an index.)
    """
    pos = int(np.random.randint(0, len(parent)))
    val = parent[-1]
    for idx in range(len(parent) - 1, pos, -1):
        parent[idx] = parent[idx - 1]
    parent[pos] = val
    return parent


def roulette_wheel_selection(pop_eval):
    """Return an index drawn with probability proportional to ``pop_eval``."""
    random_val = np.random.random() * sum(pop_eval)
    for idx, val in enumerate(pop_eval):
        random_val -= val
        if random_val <= 0:
            return idx
    return len(pop_eval) - 1


def get_top_k_value(pop_val, k: int, reverse=True):
    """Return the indices of the ``k`` largest (``reverse=True``) or smallest values.

    Values are matched with a tolerance of 1e-9 and every index is reported at
    most once, so ties yield distinct indices.
    """
    res = []
    pop_val_cpy = copy.deepcopy(pop_val)
    pop_val_cpy.sort(reverse=reverse)

    for i in range(min(len(pop_val_cpy), k)):
        for j in range(len(pop_val)):
            if abs(pop_val_cpy[i] - pop_val[j]) < 1e-9 and j not in res:
                res.append(j)
                break
    return res


def get_line_config_number(machine_number, component_number):
    """Count the ways to split ``component_number`` components into three groups.

    The enumeration of group sizes is hard-coded for three non-empty groups;
    ``machine_number`` only controls how many groups enter the multinomial
    factor (presumably always called with 3 - TODO confirm).  Splits that
    differ only by exchanging equally sized groups are counted once.
    """
    div_counter = 0
    div_set = set()
    for div1 in range(component_number - 2):
        for div2 in range(div1 + 1, component_number - 1):
            machine_div = [div1 + 1, div2 - div1, component_number - div2 - 1]
            machine_div.sort()
            div_str = "".join(str(s) + '|' for s in machine_div)
            if div_str in div_set:
                continue

            div_set.add(div_str)
            # how many groups share each size (was defaultdict(list) with
            # ``+= 1``, which raises TypeError)
            assign_component_counter = defaultdict(int)
            for div in machine_div:
                assign_component_counter[div] += 1

            case_div_counter, case_comp_number = 1, component_number
            for idx in range(machine_number - 1):
                div = 1
                while machine_div[idx]:
                    div *= (case_comp_number / machine_div[idx])
                    case_comp_number -= 1
                    machine_div[idx] -= 1

                case_div_counter *= div

            # divide by the factorial of every multiplicity (the original
            # unpacked ``.values()`` into two names, which raises TypeError)
            for val in assign_component_counter.values():
                div = 1
                while val:
                    div *= val
                    val -= 1
                case_div_counter /= div
            div_counter += case_div_counter
    return div_counter
feeder_points[machine_index] == 0: continue if assign_machine is None or feeder_points[machine_index] / \ partial_component_data[machine_index].loc[part_index].fdn > feeder_points[ assign_machine] / partial_component_data[assign_machine].loc[part_index].fdn: assign_machine = machine_index assert assign_machine is not None partial_component_data[assign_machine].loc[part_index, 'fdn'] += 1 feeder_limit -= 1 for machine_index in range(machine_number): if feeder_points[machine_index] > 0: assert partial_component_data[machine_index].loc[part_index].fdn > 0 # === assign placements === if pcb_data is not None: part2idx = defaultdict(int) for idx, data in component_data.iterrows(): part2idx[data.part] = idx machine_average_pos = [[0, 0] for _ in range(machine_number)] machine_step_counter = [0 for _ in range(machine_number)] part_pcb_data = defaultdict(list) for _, data in pcb_data.iterrows(): part_pcb_data[part2idx[data.part]].append(data) multiple_component_index = [] for part_index in range(len(component_data)): machine_assign_set = [] for machine_index in range(machine_number): if assignment_result[machine_index][part_index]: machine_assign_set.append(machine_index) if len(machine_assign_set) == 1: for data in part_pcb_data[part_index]: machine_index = machine_assign_set[0] machine_average_pos[machine_index][0] += data.x machine_average_pos[machine_index][1] += data.y machine_step_counter[machine_index] += 1 partial_component_data[machine_index].loc[part_index, 'points'] += 1 partial_pcb_data[machine_index] = pd.concat([partial_pcb_data[machine_index], pd.DataFrame(data).T]) elif len(machine_assign_set) > 1: multiple_component_index.append(part_index) for machine_index in range(machine_number): if machine_step_counter[machine_index] == 0: continue machine_average_pos[machine_index][0] /= machine_step_counter[machine_index] machine_average_pos[machine_index][1] /= machine_step_counter[machine_index] for part_index in multiple_component_index: for data in 
part_pcb_data[part_index]: idx = -1 min_dist = None for machine_index in range(machine_number): if partial_component_data[machine_index].loc[part_index, 'points'] >= \ assignment_result[machine_index][part_index]: continue dist = (data.x - machine_average_pos[machine_index][0]) ** 2 + ( data.y - machine_average_pos[machine_index][1]) ** 2 if min_dist is None or dist < min_dist: min_dist, idx = dist, machine_index assert idx >= 0 machine_step_counter[idx] += 1 machine_average_pos[idx][0] += (1 - 1 / machine_step_counter[idx]) * machine_average_pos[idx][0] \ + data.x / machine_step_counter[idx] machine_average_pos[idx][1] += (1 - 1 / machine_step_counter[idx]) * machine_average_pos[idx][1] \ + data.y / machine_step_counter[idx] partial_component_data[idx].loc[part_index, 'points'] += 1 partial_pcb_data[idx] = pd.concat([partial_pcb_data[idx], pd.DataFrame(data).T]) for machine_index in range(machine_number): partial_component_data[machine_index] = partial_component_data[machine_index][ partial_component_data[machine_index]['points'] != 0].reset_index(drop=True) return partial_pcb_data, partial_component_data def random_division(num, div): assert num >= div res = [1 for _ in range(num)] while sum(res) < num: pos = random.randint(0, num - 1) val = random.randint(1, num - sum(res)) res[pos] = val return res def list_range(start, end=None): return list(range(start)) if end is None else list(range(start, end))