import pandas as pd from base_optimizer.optimizer_common import * class DataMgr: def __init__(self): self.min_placement_points = 100 self.max_placement_points = 800 self.max_component_types = 50 self.default_feeder_limit = 1 self.nozzle_type_list = ['CN065', 'CN140', 'CN220', 'CN040'] self.x_range = [50, 100, 150, 200, 300, 400, 500] self.y_range = [50, 100, 150, 200, 300, 400, 500] self.counter = 0 self.pre_file = None def generator(self): boundary = [random.choice(self.x_range), random.choice(self.y_range)] total_points = random.randint(self.min_placement_points, self.max_placement_points) # determine the nozzle type of component component_list = defaultdict(str) for cp_idx in range(min(random.randint(1, self.max_component_types), total_points)): component_list['C' + str(cp_idx)] = random.choice(self.nozzle_type_list) step_col = ["ref", "x", "y", "z", "r", "part", "desc", "fdr", "nz", "hd", "cs", "cy", "sk", "bl", "ar", "pl", "lv"] pcb_data = pd.DataFrame(columns=step_col) for idx in range(total_points): part = random.choice(list(component_list.keys())) nozzle = component_list[part] pos_x, pos_y = np.random.uniform(0, boundary[0]), np.random.uniform(0, boundary[1]) pcb_data = pd.concat([pcb_data, pd.DataFrame([['R' + str(idx), -pos_x, pos_y, 0.000, 0.000, part, '', 'A', '1-0 ' + nozzle, 1, 1, 1, 0, 1, 1, 1, 'L0']], columns=pcb_data.columns)], ignore_index=True) part_col = ["part", "desc", "fdr", "nz", 'camera', 'group', 'feeder-limit', 'points'] component_data = pd.DataFrame(columns=part_col) for _, data in pcb_data.iterrows(): part, nozzle = data.part, data.nz.split(' ')[1] if part not in component_data['part'].values: component_data = pd.concat([component_data, pd.DataFrame( [part, '', 'SM8', nozzle, '飞行相机1', 'CHIP-Rect', self.default_feeder_limit, 0], index=part_col).T], ignore_index=True) part_index = component_data[component_data['part'] == part].index.tolist()[0] component_data.loc[part_index, 'points'] += 1 self.counter += 1 return pcb_data, component_data def recorder(self, file_path, info: OptInfo, pcb_data, component_data): lineinfo = '{:.6f}'.format(info.placement_time) + '\t' + str(info.cycle_counter) + '\t' + str( info.nozzle_change_counter) + '\t' + str(info.pickup_counter) + '\t' + '{:.3f}'.format( info.pickup_movement) + '\t' + '{:.3f}'.format(info.placement_movement) lineinfo += '\t' + '{:.3f}'.format(pcb_data['x'].max() - pcb_data['x'].min()) + '\t' + '{:.3f}'.format( pcb_data['y'].max() - pcb_data['y'].min()) point_counter, component_counter = 0, 0 nozzle_type = set() for _, data in component_data.iterrows(): if data.points == 0: continue nozzle_type.add(data.nz) point_counter += data.points component_counter += 1 lineinfo += '\t' + str(point_counter) + '\t' + str(component_counter) + '\t' + str(len(nozzle_type)) for _, data in component_data.iterrows(): lineinfo += '\t' + data.part + '\t' + data.nz + '\t' + str(data.points) lineinfo += '\n' with open(file_path, 'a') as f: f.write(lineinfo) f.close() def saver(self, file_path: str, pcb_data): lineinfo = '' for _, data in pcb_data.iterrows(): lineinfo += '\t' + '{:.3f}'.format(data.x) + '\t' + '{:.3f}'.format( data.y) + '\t0.000\t0.000\t' + data.part + '\t\tA\t' + data.nz + '\t1\t1\t1\t1\t1\t1\t1\tN\tL0\n' pos = file_path.find('.') file_path = file_path[:pos] + '-' + str(self.counter) + file_path[pos:] with open(file_path, 'w') as f: f.write(lineinfo) f.close() self.pre_file = file_path def remover(self): if self.pre_file is not None: os.remove(self.pre_file) self.pre_file = None def encode(self, cp_points: defaultdict[str], cp_nozzle: defaultdict[int], width, height): cp2nz = defaultdict(int) for idx, nozzle in enumerate(self.nozzle_type_list): cp2nz[nozzle] = idx total_points = sum(points for points in cp_points.values()) total_component_types, total_nozzle_types = len(cp_points.keys()), len(set(cp_nozzle.values())) data = [total_points, total_component_types, total_nozzle_types] data.extend([width, height]) for component, points in cp_points.items(): nozzle = cp_nozzle[component] data_slice = [0 for _ in range(len(self.nozzle_type_list))] data_slice[cp2nz[nozzle]] = points data.extend(data_slice) for _ in range(self.max_component_types - total_component_types): data.extend([0 for _ in range(len(self.nozzle_type_list))]) return data def loader(self, file_path): train_data, time_data = [], [] cycle_data, nozzle_change_data, pickup_data, movement_data, point_data = [], [], [], [], [] pcb_width, pcb_height = [], [] with open(file_path, 'r') as file: line = file.readline() while line: items = line.split('\t') total_points, total_component_types = int(items[8]), int(items[9]) cycle_data.append(float(items[1])) nozzle_change_data.append(float(items[2])) pickup_data.append(float(items[3])) movement_data.append(float(items[4])) point_data.append(total_points) # assembly time data time_data.append(float(items[0])) cp_points, cp_nozzle = defaultdict(int), defaultdict(str) for cp_counter in range(total_component_types): component_type, nozzle_type = items[11 + cp_counter * 3], items[12 + cp_counter * 3] points = int(items[13 + cp_counter * 3]) cp_points[component_type], cp_nozzle[component_type] = points, nozzle_type train_data.append(self.encode(cp_points, cp_nozzle, float(items[6]), float(items[7]))) line = file.readline() return train_data, time_data, cycle_data, nozzle_change_data, pickup_data, movement_data, point_data def get_feature(self): return self.max_component_types * len(self.nozzle_type_list) + 5