import copy
import os
import random
from collections import defaultdict

import numpy as np
import pandas as pd

# NOTE: the star import stays last so that any name it defines (including
# `copy`, `os` or `defaultdict`, which this module previously obtained only
# through it) keeps taking precedence exactly as before.
from base_optimizer.optimizer_common import *


class DataMgr:
    """Generate, record, load and encode synthetic PCB placement data.

    A record is one tab-separated text line (written by ``recorder``):

    * items[0:7]   - total time, cycle count, nozzle change count, ANC round
                     count, pickup count, pickup distance, place distance
    * items[7:9]   - board width, board height
    * items[9:12]  - total points, component type count, nozzle type count
    * items[12:]   - one (part name, nozzle type, points) triple per component

    ``max_head_index``, ``t_cycle``, ``t_nozzle_change`` and ``OptInfo`` are
    expected to be provided by ``base_optimizer.optimizer_common``.
    """

    # Column layout of the placement (step) table built by generator()/decode().
    _STEP_COL = ["ref", "x", "y", "z", "r", "part", "desc", "fdr", "nz", "hd", "cs", "cy", "sk", "bl", "ar", "pl",
                 "lv"]

    def __init__(self):
        self.min_placement_points = 10
        self.max_placement_points = 1000

        self.max_component_types = 30
        self.default_feeder_limit = 1
        self.max_nozzle_types = 4

        # candidate board dimensions sampled by generator()
        self.x_range = [50, 100, 150, 200, 300, 400, 500]
        self.y_range = [50, 100, 150, 200, 300, 400, 500]

        self.counter = 0  # number of generator() calls so far
        self.update = 1  # the component list is regenerated every `update` calls
        self.pre_file = None  # path of the file most recently written by saver()
        self.part_col = ["part", "fdr", "nz", 'fdn', 'points']
        self.component_data = pd.DataFrame(columns=self.part_col)  # the component list update for several rounds

    @staticmethod
    def _placement_row(idx, pos_x, pos_y, part, nozzle):
        """Build one placement-table row (ordered as ``_STEP_COL``); x is stored negated."""
        return ['R' + str(idx), -pos_x, pos_y, 0.000, 0.000, part, '', 'A', '1-0 ' + nozzle, 1, 1, 1, 0, 1, 1, 1,
                'L0']

    @staticmethod
    def _record_point_total(items):
        """Sum the point counts of every component triple in a split record line."""
        return sum(int(items[14 + cp_idx * 3]) for cp_idx in range((len(items) - 12) // 3))

    def generator(self, mode='Train'):
        """Create a random PCB placement table and its component table.

        The component list is regenerated when ``counter`` is a multiple of
        ``get_update_round()`` or when ``mode == 'test'``; otherwise the list
        from the previous call is reused and only positions are re-sampled.

        :param mode: ``'test'`` forces a fresh component list on every call.
        :return: ``(pcb_data, component_data)`` as pandas DataFrames.
        """
        boundary = [random.choice(self.x_range), random.choice(self.y_range)]
        if boundary[0] < boundary[-1]:
            # keep the larger dimension first
            boundary[0], boundary[-1] = boundary[-1], boundary[0]

        nozzle_type_list = random.sample(['CN065', 'CN020', 'CN040', 'CN140'], self.max_nozzle_types)
        # determine the nozzle type of component
        if self.counter % self.get_update_round() == 0 or mode == 'test':
            total_points = random.randint(self.min_placement_points, self.max_placement_points)
            total_nozzles = random.randint(1, self.max_nozzle_types)
            selected_nozzle = random.sample(nozzle_type_list, total_nozzles)
            type_count = min(random.randint(1, self.max_component_types), total_points)

            component_rows = [['C' + str(cp_idx), 'SM8', random.choice(selected_nozzle), 1, 0]
                              for cp_idx in range(type_count)]

            # Split total_points over the components with random weights.  The
            # counts are written into the rows *before* the frame is built: the
            # former `frame.iloc[i].points = ...` was chained assignment, which
            # updates a temporary row and is a silent no-op under pandas
            # Copy-on-Write.  Rounding means the counts need not add up to
            # total_points exactly, and a component may end up with 0 points.
            random_fractions = np.random.rand(type_count)
            normalized_fractions = random_fractions / random_fractions.sum()
            for row, fraction in zip(component_rows, normalized_fractions):
                row[-1] = int(round(fraction * total_points))

            # dtype=object mirrors the result of the previous row-by-row concat
            self.component_data = pd.DataFrame(component_rows, columns=self.part_col, dtype=object)

        # Collect all rows first and build the frame once; concatenating one
        # row at a time copies the whole table on every step.
        placement_rows = []
        idx = 1
        for _, data in self.component_data.iterrows():
            part, nozzle = data.part, data.nz
            for _ in range(data.points):
                pos_x, pos_y = np.random.uniform(0, boundary[0]), np.random.uniform(0, boundary[1])
                placement_rows.append(self._placement_row(idx, pos_x, pos_y, part, nozzle))
                idx += 1
        pcb_data = pd.DataFrame(placement_rows, columns=self._STEP_COL, dtype=object)

        self.counter += 1
        return pcb_data, self.component_data

    def recorder(self, file_handle, info: OptInfo, pcb_data, component_data):
        """Append one tab-separated record line to ``file_handle``.

        :param file_handle: writable text file object.
        :param info: optimisation result; the attributes read are total_time,
            cycle_counter, nozzle_change_counter, anc_round_counter,
            pickup_counter, pickup_distance and place_distance.
        :param pcb_data: placement table with ``x`` and ``y`` columns.
        :param component_data: component table with ``part``, ``nz``, ``points``.
        """
        # 7 fields: total time, cycle count, nozzle change count, ANC round-trip count,
        # pickup count, pickup distance, placement distance
        lineinfo = '{:.3f}'.format(info.total_time) + '\t' + str(info.cycle_counter) + '\t' + str(
            info.nozzle_change_counter) + '\t' + str(info.anc_round_counter) + '\t' + str(
            info.pickup_counter) + '\t' + '{:.3f}'.format(info.pickup_distance) + '\t' + '{:.3f}'.format(
            info.place_distance)
        # 2 fields: PCB size (extent of the placement coordinates)
        lineinfo += '\t' + '{:.3f}'.format(pcb_data['x'].max() - pcb_data['x'].min()) + '\t' + '{:.3f}'.format(
            pcb_data['y'].max() - pcb_data['y'].min())

        # part_position = defaultdict(list)
        # for _, data in pcb_data.iterrows():
        #     part_position[data['part']].append([data['x'], data['y']])

        point_counter, component_counter = 0, 0
        nozzle_type = set()
        for _, data in component_data.iterrows():
            if data.points == 0:
                continue
            nozzle_type.add(data.nz)
            point_counter += data.points
            component_counter += 1

        # 3 fields: total points, number of component types, number of nozzle types
        lineinfo += '\t' + str(point_counter) + '\t' + str(component_counter) + '\t' + str(len(nozzle_type))

        # per component type: part name, nozzle type, points
        # (layout width / layout height are disabled below, so 3 fields are written, not 5)
        for _, data in component_data.iterrows():
            if data.points == 0:
                continue
            lineinfo += '\t' + data.part + '\t' + data.nz + '\t' + str(data.points)
            # lineinfo += '\t' + '{:.3f}'.format(np.ptp([pos[0] for pos in part_position[data.part]]))
            # lineinfo += '\t' + '{:.3f}'.format(np.ptp([pos[1] for pos in part_position[data.part]]))

        lineinfo += '\n'
        file_handle.write(lineinfo)

    def saver(self, file_path: str, pcb_data):
        """Write ``pcb_data`` to a text file and remember its path in ``pre_file``.

        The current ``counter`` is inserted before the file extension, e.g.
        ``out.txt`` -> ``out-3.txt``.

        :param file_path: target path; the counter suffix is added to it.
        :param pcb_data: placement table with ``x``, ``y``, ``part``, ``nz``.
        """
        lines = [
            '\t' + '{:.3f}'.format(data.x) + '\t' + '{:.3f}'.format(data.y) + '\t0.000\t0.000\t' + data.part
            + '\t\tA\t' + data.nz + '\t1\t1\t1\t1\t1\t1\t1\tN\tL0\n'
            for _, data in pcb_data.iterrows()
        ]

        # splitext only looks at the final path component, so names without an
        # extension and directories containing '.' (e.g. './data/x.txt') are
        # handled correctly, unlike the former `file_path.find('.')`.
        root, ext = os.path.splitext(file_path)
        file_path = root + '-' + str(self.counter) + ext
        with open(file_path, 'w') as f:
            f.writelines(lines)
        self.pre_file = file_path

    def remover(self):
        """Delete the file last written by ``saver`` (if any) and forget its path."""
        if self.pre_file is None:
            return
        os.remove(self.pre_file)
        self.pre_file = None

    def encode(self, cp_points: defaultdict[str], cp_nozzle: defaultdict[str], board_width, board_height):
        """Encode one sample as a flat feature list of length ``get_feature()``.

        Layout: [total points, component types, nozzle types, board width,
        board height] + 4 heuristic values + points per nozzle type (sorted
        descending, zero padded) + component count per nozzle type + for each
        nozzle slot the component point counts, padded/truncated to
        ``max_component_types``.

        :param cp_points: mapping component name -> number of points.
        :param cp_nozzle: mapping component name -> nozzle type.
        :param board_width: board width.
        :param board_height: board height.
        :return: list of numbers.
        :raises AssertionError: if the two mappings differ in size or more than
            ``max_nozzle_types`` nozzle types are used.
        """
        assert len(cp_points.keys()) == len(cp_nozzle.keys())
        assert len(set(cp_nozzle.values())) <= self.max_nozzle_types

        # === general info ===
        total_points = sum(cp_points.values())
        total_component_types, total_nozzle_types = len(cp_points.keys()), len(set(cp_nozzle.values()))

        data = [total_points, total_component_types, total_nozzle_types]
        data.extend([board_width, board_height])

        # === heuristic info ===
        cycle, nozzle_change, anc_move, pickup = self.heuristic_objective(cp_points, cp_nozzle)
        data.extend([cycle, nozzle_change, anc_move, pickup])

        # === nozzle info ===
        nozzle_points = defaultdict(int)
        for cp_idx, nozzle in cp_nozzle.items():
            nozzle_points[nozzle] += cp_points[cp_idx]  # points for different nozzle type
        nozzle_items = sorted(([nozzle, points] for nozzle, points in nozzle_points.items()),
                              key=lambda x: x[1], reverse=True)

        # slot index of each nozzle type: the busiest nozzle gets slot 0
        nz2idx = defaultdict(int)
        nozzle_slice = [0 for _ in range(self.max_nozzle_types)]
        for idx, [nozzle, points] in enumerate(nozzle_items):
            nz2idx[nozzle] = idx
            nozzle_slice[idx] = points
        data.extend(nozzle_slice)

        # === component info ===
        comp_data_slice = [[] for _ in range(self.max_nozzle_types)]

        cp_items = [[component, points] for component, points in cp_points.items()]
        cp_items = sorted(cp_items, key=lambda x: (x[1], nz2idx[cp_nozzle[x[0]]] * 0.1 + x[1]), reverse=True)
        for component, points in cp_items:
            comp_data_slice[nz2idx[cp_nozzle[component]]].append(points)

        # number of component types per nozzle slot (counted before padding)
        data.extend(len(slot_points) for slot_points in comp_data_slice)

        # fixed-width block per nozzle slot: zero padded or truncated
        for slot_points in comp_data_slice:
            padded = slot_points + [0] * self.max_component_types
            data.extend(padded[:self.max_component_types])

        return data

    def heuristic_objective(self, cp_points, cp_nozzle):
        """Estimate assembly metrics with a greedy nozzle-to-head assignment.

        :param cp_points: mapping component name -> number of points.
        :param cp_nozzle: mapping component name -> nozzle type.
        :return: ``(cycle_counter, nozzle_change_counter, anc_round_counter,
            pickup_counter)``; ``anc_round_counter`` is always 0 here. All
            zeros are returned when there is nothing to place.
        """
        if len(cp_points.keys()) == 0:
            return 0, 0, 0, 0
        nozzle_heads, nozzle_points = defaultdict(int), defaultdict(int)
        for idx, points in cp_points.items():
            if points == 0:
                continue
            nozzle = cp_nozzle[idx]
            nozzle_points[nozzle] += points
            nozzle_heads[nozzle] = 1

        if not nozzle_heads:
            # every component has 0 points: nothing to assemble (this case
            # previously tripped the assertion in the loop below)
            return 0, 0, 0, 0

        anc_round_counter = 0
        # Hand the remaining heads, one at a time, to the nozzle type with the
        # most points per head.  `<` instead of `!=`: with more nozzle types
        # than heads the sum starts above max_head_index and `!=` never ends.
        while sum(nozzle_heads.values()) < max_head_index:
            max_cycle_nozzle = None
            for nozzle, head_num in nozzle_heads.items():
                if max_cycle_nozzle is None or nozzle_points[nozzle] / head_num > nozzle_points[max_cycle_nozzle] / \
                        nozzle_heads[max_cycle_nozzle]:
                    max_cycle_nozzle = nozzle

            assert max_cycle_nozzle is not None
            nozzle_heads[max_cycle_nozzle] += 1

        head_nozzle_assignment, min_cost = None, None

        # generate initial nozzle group
        nozzle_group = []
        # averagely assign for the same type of nozzles, and generate nozzle group
        nozzle_points_cpy = copy.deepcopy(nozzle_points)
        for nozzle, heads in nozzle_heads.items():
            points = nozzle_points_cpy[nozzle] // heads
            for _ in range(heads):
                nozzle_group.append([nozzle, points])
            nozzle_points_cpy[nozzle] -= heads * points  # remainder still to distribute

        # hand out the remainders one point at a time
        for idx, [nozzle, _] in enumerate(nozzle_group):
            if nozzle_points_cpy[nozzle]:
                nozzle_group[idx][1] += 1
                nozzle_points_cpy[nozzle] -= 1

        while True:
            # assign nozzle group to each head
            nozzle_group.sort(key=lambda x: -x[1])

            tmp_head_nozzle_assignment = []
            head_total_points = [0 for _ in range(max_head_index)]
            for idx, nozzle_item in enumerate(nozzle_group):
                if idx < max_head_index:
                    tmp_head_nozzle_assignment.append([nozzle_item.copy()])
                    head_total_points[idx] += nozzle_item[1]
                else:
                    # surplus groups go to the currently least loaded head
                    min_head = np.argmin(head_total_points)
                    tmp_head_nozzle_assignment[min_head].append(nozzle_item.copy())
                    head_total_points[min_head] += nozzle_item[1]

            cost = t_cycle * max(head_total_points)
            for head in range(max_head_index):
                for cycle in range(len(tmp_head_nozzle_assignment[head])):
                    if cycle + 1 == len(tmp_head_nozzle_assignment[head]):
                        # NOTE(review): this compares the last entry with itself, so it never
                        # adds a cost; the final counting loop below compares first vs. last.
                        # Left unchanged because altering it would change the emitted features.
                        if tmp_head_nozzle_assignment[head][cycle][0] != tmp_head_nozzle_assignment[head][-1][0]:
                            cost += t_nozzle_change
                    else:
                        if tmp_head_nozzle_assignment[head][cycle][0] != \
                                tmp_head_nozzle_assignment[head][cycle + 1][0]:
                            cost += t_nozzle_change

            # balance the least and most loaded heads while it pays off
            while True:
                min_head, max_head = np.argmin(head_total_points), np.argmax(head_total_points)
                min_head_nozzle, max_head_nozzle = tmp_head_nozzle_assignment[min_head][-1][0], \
                    tmp_head_nozzle_assignment[max_head][-1][0]
                if min_head_nozzle == max_head_nozzle:
                    break

                min_head_list, max_head_list = [min_head], [max_head]
                minmax_head_points = 0
                for head in range(max_head_index):
                    if head in min_head_list or head in max_head_list:
                        minmax_head_points += head_total_points[head]
                        continue

                    # the max/min heads with the sum nozzle type
                    if tmp_head_nozzle_assignment[head][-1][0] == tmp_head_nozzle_assignment[min_head][-1][0]:
                        min_head_list.append(head)
                        minmax_head_points += head_total_points[head]

                    if tmp_head_nozzle_assignment[head][-1][0] == tmp_head_nozzle_assignment[max_head][-1][0]:
                        max_head_list.append(head)
                        minmax_head_points += head_total_points[head]

                # todo: restriction of available nozzle
                # the reduction of cycles is not offset the cost of nozzle change
                average_points = minmax_head_points // (len(min_head_list) + len(max_head_list))
                reminder_points = minmax_head_points % (len(min_head_list) + len(max_head_list))
                max_cycle = average_points + (1 if reminder_points > 0 else 0)
                for head in range(max_head_index):
                    if head in min_head_list or head in max_head_list:
                        continue
                    max_cycle = max(max_cycle, head_total_points[head])

                nozzle_change_counter = 0
                for head in min_head_list:
                    if tmp_head_nozzle_assignment[head][0] == tmp_head_nozzle_assignment[head][-1]:
                        nozzle_change_counter += 2
                    else:
                        nozzle_change_counter += 1

                if t_cycle * (max(head_total_points) - max_cycle) < t_nozzle_change * nozzle_change_counter:
                    break

                cost -= t_cycle * (max(head_total_points) - max_cycle) - t_nozzle_change * nozzle_change_counter

                required_points = 0  # points of the busier nozzle type that are to be spread evenly
                for head in min_head_list:
                    points = average_points - head_total_points[head]
                    tmp_head_nozzle_assignment[head].append([max_head_nozzle, points])
                    head_total_points[head] = average_points
                    required_points += points

                for head in max_head_list:
                    tmp_head_nozzle_assignment[head][-1][1] -= required_points // len(max_head_list)
                    head_total_points[head] -= required_points // len(max_head_list)

                # take the indivisible rest from the max heads one point at a time
                required_points -= (required_points // len(max_head_list)) * len(max_head_list)
                for head in max_head_list:
                    if required_points <= 0:
                        break
                    tmp_head_nozzle_assignment[head][-1][1] -= 1
                    head_total_points[head] -= 1
                    required_points -= 1

            if min_cost is None or cost < min_cost:
                min_cost = cost
                head_nozzle_assignment = copy.deepcopy(tmp_head_nozzle_assignment)
            else:
                break

            # add one more nozzle to the nozzle group
            idx, nozzle = 0, nozzle_group[0][0]
            for idx, [nozzle_, _] in enumerate(nozzle_group):
                if nozzle_ != nozzle:
                    break

            average_points, remainder_points = nozzle_points[nozzle] // (idx + 1), nozzle_points[nozzle] % (idx + 1)
            nozzle_group.append([nozzle, 0])
            for idx, [nozzle_, _] in enumerate(nozzle_group):
                if nozzle_ == nozzle:
                    nozzle_group[idx][1] = average_points + (1 if remainder_points > 0 else 0)
                    remainder_points -= 1

        cycle_counter, nozzle_change_counter = 0, 0
        for head in range(max_head_index):
            head_cycle_counter = 0
            for cycle in range(len(head_nozzle_assignment[head])):
                if cycle + 1 == len(head_nozzle_assignment[head]):
                    # wrap-around: last nozzle of the head vs. its first one
                    if head_nozzle_assignment[head][0][0] != head_nozzle_assignment[head][-1][0]:
                        nozzle_change_counter += 1
                else:
                    if head_nozzle_assignment[head][cycle][0] != head_nozzle_assignment[head][cycle + 1][0]:
                        nozzle_change_counter += 1
                head_cycle_counter += head_nozzle_assignment[head][cycle][1]
            cycle_counter = max(cycle_counter, head_cycle_counter)

        # === estimate of the number of pickups ===
        cp_info = []
        for idx, points in cp_points.items():
            if points == 0:
                continue
            feeder_limit = 1  # todo: only the single-nozzle case is considered for now
            reminder_points = points % feeder_limit
            for _ in range(feeder_limit):
                cp_info.append([idx, points // feeder_limit + (1 if reminder_points > 0 else 0), cp_nozzle[idx]])
                reminder_points -= 1

        cp_info.sort(key=lambda x: -x[1])

        nozzle_level, nozzle_counter = defaultdict(int), defaultdict(int)
        level_points = defaultdict(int)
        for info in cp_info:
            nozzle = info[2]
            # a new level starts once all heads of this nozzle type are used
            if nozzle_counter[nozzle] and nozzle_counter[nozzle] % nozzle_heads[nozzle] == 0:
                nozzle_level[nozzle] += 1
            level = nozzle_level[nozzle]
            level_points[level] = max(level_points[level], info[1])
            nozzle_counter[nozzle] += 1

        pickup_counter = sum(level_points.values())

        return cycle_counter, nozzle_change_counter, anc_round_counter, pickup_counter

    def decode(self, line_info):
        """Rebuild placement and component tables from one record line.

        Component names, nozzle types and point counts come from the record;
        the placement coordinates are sampled uniformly inside the recorded
        board size, so they differ from the data that produced the record.

        :param line_info: one tab-separated record line.
        :return: ``(pcb_data, component_data)`` as pandas DataFrames.
        """
        items = line_info.split('\t')
        board_width, board_height = float(items[7]), float(items[8])
        total_component_types = int(items[10])

        part_col = ["part", "desc", "fdr", "nz", 'camera', 'group', 'feeder-limit', 'points']

        component_rows, placement_rows = [], []
        idx = 1
        for cp_counter in range(total_component_types):
            part, nozzle = items[12 + cp_counter * 3], items[13 + cp_counter * 3]
            points = int(items[14 + cp_counter * 3])

            component_rows.append(
                [part, '', 'SM8', nozzle, '飞行相机1', 'CHIP-Rect', self.default_feeder_limit, points])
            for _ in range(points):
                pos_x, pos_y = np.random.uniform(0, board_width), np.random.uniform(0, board_height)
                placement_rows.append(self._placement_row(idx, pos_x, pos_y, part, nozzle))
                idx += 1  # was never advanced before, so every reference came out as 'R1'

        # built once from the collected rows; dtype=object mirrors the result
        # of the previous row-by-row concat
        component_data = pd.DataFrame(component_rows, columns=part_col, dtype=object)
        pcb_data = pd.DataFrame(placement_rows, columns=self._STEP_COL, dtype=object)
        return pcb_data, component_data

    def loader(self, file_path, data_filter=True, hinter=False):
        """Load samples and assembly times from a record file.

        :param file_path: path of a file with one record per line.
        :param data_filter: drop samples whose throughput
            (points / time * 3600) lies outside
            ``[Q1 - 0.6 * IQR, Q3 + 0.6 * IQR]``.
        :param hinter: print summary statistics of the kept assembly times.
        :return: ``[samples, times]`` where each sample is
            ``[cp_points, cp_nozzle, board_width, board_height]``.
        """
        cp_data, point_data, time_data = [], [], []
        with open(file_path, 'r') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                items = line.split('\t')
                cp_points, cp_nozzle = defaultdict(int), defaultdict(str)
                for cp_idx in range((len(items) - 12) // 3):
                    points = int(items[14 + cp_idx * 3])
                    if points == 0:
                        continue

                    component_type, nozzle_type = items[12 + cp_idx * 3], items[13 + cp_idx * 3]
                    cp_points[component_type], cp_nozzle[component_type] = points, nozzle_type

                board_width, board_height = float(items[7]), float(items[8])
                point_data.append(self._record_point_total(items))

                # assembly time data
                time_data.append(float(items[0]))
                cp_data.append([cp_points, cp_nozzle, board_width, board_height])

        if not cp_data:
            # nothing to filter or summarise; avoids percentile / division on empty data
            return [[], []]

        if data_filter:
            cph_data = [points / time * 3600 for points, time in zip(point_data, time_data)]

            w_quart = 0.6
            q1, q3 = np.percentile(np.array(cph_data), 25), np.percentile(np.array(cph_data), 75)
            lower, upper = q1 - w_quart * (q3 - q1), q3 + w_quart * (q3 - q1)
            indices = [i for i, cph in enumerate(cph_data) if lower <= cph <= upper]

            filter_cp_data = [cp_data[idx] for idx in indices]
            filter_time_data = [time_data[idx] for idx in indices]
        else:
            filter_cp_data, filter_time_data = cp_data, time_data

        if hinter:
            print(
                f"# of sample: {len(cp_data)}, outlier : {(1 - len(filter_cp_data) / len(cp_data)) * 100: .2f}%, "
                f"mean: {np.average(filter_time_data): .2f}, median: {np.median(filter_time_data): .2f}, "
                f"max: {np.max(filter_time_data): .2f}, min: {np.min(filter_time_data): .2f}, "
                f"std. dev: {np.std(filter_time_data): .2f}")
        return [filter_cp_data, filter_time_data]

    def metric(self, file_path):
        """Read the recorded metrics and assembly times from a record file.

        :param file_path: path of a file with one record per line.
        :return: ``[metric_data, time_data]``; each metric row holds
            items[1..6] of the record followed by the total point count.
        """
        metric_data, time_data = [], []
        with open(file_path, 'r') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                items = line.split('\t')

                # cycle, nozzle change, anc move, pick up, pick distance, place distance, point
                row = [float(items[i]) for i in range(1, 7)]
                row.append(self._record_point_total(items))
                metric_data.append(row)

                # assembly time data
                time_data.append(float(items[0]))
        return [metric_data, time_data]

    def neural_encode(self, input_data):
        """Encode every ``[cp_points, cp_nozzle, board_width, board_height]`` sample with ``encode``."""
        return [self.encode(cp_points, cp_nozzle, board_width, board_height)
                for cp_points, cp_nozzle, board_width, board_height in input_data]

    def get_feature(self):
        """Return the length of the feature list produced by ``encode``.

        5 general/board values + 4 heuristic values, plus per nozzle slot: its
        point total, its component count and ``max_component_types`` point values.
        """
        return (self.max_component_types + 2) * self.max_nozzle_types + 5 + 4

    # Alternative minimal encoding, kept for reference:
    # def neural_encode(self, input_data):
    #     train_data = []
    #     for cp_points, cp_nozzle, board_width, board_height in input_data:
    #         train_data.append(
    #             [len(cp_points.keys()), len(cp_nozzle.keys()), sum(cp_points.values()), board_width, board_height])
    #     return train_data
    # def get_feature(self):
    #     return 5

    def get_update_round(self):
        """Return how many ``generator`` calls share one component list."""
        return self.update