import random import numpy as np import pandas as pd from base_optimizer.optimizer_common import * class DataMgr: def __init__(self): self.min_placement_points = 100 self.max_placement_points = 800 self.max_component_types = 30 self.default_feeder_limit = 1 self.nozzle_type_list = ['CN065', 'CN140', 'CN220', 'CN040'] # self.x_range = [50, 100, 150, 200, 300, 400, 500] # self.y_range = [50, 100, 150, 200, 300, 400, 500] self.x_range = [400] self.y_range = [200] self.counter = 0 self.update = 10 self.pre_file = None self.part_col = ["part", "desc", "fdr", "nz", 'camera', 'group', 'feeder-limit', 'points'] self.component_data = pd.DataFrame(columns=self.part_col) # the component list update for several rounds def generator(self, mode='Train'): boundary = [random.choice(self.x_range), random.choice(self.y_range)] # determine the nozzle type of component if self.counter % 10 == 0 or mode == 'test': self.component_data = self.component_data.loc[[]] total_points = random.randint(self.min_placement_points, self.max_placement_points) total_nozzles = random.randint(1, len(self.nozzle_type_list)) selected_nozzle = random.sample(self.nozzle_type_list, total_nozzles) for cp_idx in range(min(random.randint(1, self.max_component_types), total_points)): part, nozzle = 'C' + str(cp_idx), random.choice(selected_nozzle) self.component_data = pd.concat([self.component_data, pd.DataFrame( [part, '', 'SM8', nozzle, '飞行相机1', 'CHIP-Rect', self.default_feeder_limit, 0], index=self.part_col).T], ignore_index=True) random_fractions = np.random.rand(len(self.component_data)) normalized_fractions = random_fractions / random_fractions.sum() for cp_idx, fraction in enumerate(normalized_fractions): self.component_data.iloc[cp_idx].points = round(fraction * total_points) step_col = ["ref", "x", "y", "z", "r", "part", "desc", "fdr", "nz", "hd", "cs", "cy", "sk", "bl", "ar", "pl", "lv"] pcb_data = pd.DataFrame(columns=step_col) idx = 1 for _, data in self.component_data.iterrows(): for _ in range(data.points): part, nozzle = data.part, data.nz pos_x, pos_y = np.random.uniform(0, boundary[0]), np.random.uniform(0, boundary[1]) pcb_data = pd.concat([pcb_data, pd.DataFrame([['R' + str(idx), -pos_x, pos_y, 0.000, 0.000, part, '', 'A', '1-0 ' + nozzle, 1, 1, 1, 0, 1, 1, 1, 'L0']], columns=pcb_data.columns)], ignore_index=True) idx += 1 self.counter += 1 return pcb_data, self.component_data def recorder(self, file_handle, info: OptInfo, pcb_data, component_data): lineinfo = '{:.6f}'.format(info.placement_time) + '\t' + str(info.cycle_counter) + '\t' + str( info.nozzle_change_counter) + '\t' + str(info.pickup_counter) + '\t' + '{:.3f}'.format( info.pickup_movement) + '\t' + '{:.3f}'.format(info.placement_movement) lineinfo += '\t' + '{:.3f}'.format(pcb_data['x'].max() - pcb_data['x'].min()) + '\t' + '{:.3f}'.format( pcb_data['y'].max() - pcb_data['y'].min()) part_xposition, part_yposition = defaultdict(list), defaultdict(list) for _, data in pcb_data.iterrows(): part_xposition[data['part']].append(data['x']) part_yposition[data['part']].append(data['y']) point_counter, component_counter = 0, 0 nozzle_type = set() for _, data in component_data.iterrows(): if data.points == 0: continue nozzle_type.add(data.nz) point_counter += data.points component_counter += 1 lineinfo += '\t' + str(point_counter) + '\t' + str(component_counter) + '\t' + str(len(nozzle_type)) for _, data in component_data.iterrows(): lineinfo += '\t' + data.part + '\t' + data.nz + '\t' + str(data.points) # lineinfo += '\t' + str( # round((np.average(part_xposition[data.part]) + stopper_pos[0] - slotf1_pos[0]) / slot_interval)) lineinfo += '\n' file_handle.write(lineinfo) def saver(self, file_path: str, pcb_data): lineinfo = '' for _, data in pcb_data.iterrows(): lineinfo += '\t' + '{:.3f}'.format(data.x) + '\t' + '{:.3f}'.format( data.y) + '\t0.000\t0.000\t' + data.part + '\t\tA\t' + data.nz + '\t1\t1\t1\t1\t1\t1\t1\tN\tL0\n' pos = file_path.find('.') file_path = file_path[:pos] + '-' + str(self.counter) + file_path[pos:] with open(file_path, 'w') as f: f.write(lineinfo) f.close() self.pre_file = file_path def remover(self): if self.pre_file is not None: os.remove(self.pre_file) self.pre_file = None def encode(self, cp_points: defaultdict[str], cp_nozzle: defaultdict[int], width, height): cp2nz = defaultdict(int) for idx, nozzle in enumerate(self.nozzle_type_list): cp2nz[nozzle] = idx # === general info === total_points = sum(points for points in cp_points.values()) total_component_types, total_nozzle_types = len(cp_points.keys()), len(set(cp_nozzle.values())) data = [total_points, total_component_types, total_nozzle_types] data.extend([width, height]) # === nozzle info === data_slice = [0 for _ in range(len(self.nozzle_type_list))] for component, points in cp_points.items(): idx = cp2nz[cp_nozzle[component]] data_slice[idx] += points data.extend(data_slice) # === component info === cp_items = [[component, points] for component, points in cp_points.items()] cp_items = sorted(cp_items, key=lambda x: (-x[1], x[0])) for component, points in cp_items: nozzle = cp_nozzle[component] data_slice = [0 for _ in range(len(self.nozzle_type_list))] data_slice[cp2nz[nozzle]] = points data.extend(data_slice) for _ in range(self.max_component_types - total_component_types): data.extend([0 for _ in range(len(self.nozzle_type_list))]) return data def decode(self, line_info): boundary = [random.choice(self.x_range), random.choice(self.y_range)] items = line_info.split('\t') total_points, total_component_types = int(items[8]), int(items[9]) part_col = ["part", "desc", "fdr", "nz", 'camera', 'group', 'feeder-limit', 'points'] step_col = ["ref", "x", "y", "z", "r", "part", "desc", "fdr", "nz", "hd", "cs", "cy", "sk", "bl", "ar", "pl", "lv"] component_data = pd.DataFrame(columns=part_col) pcb_data = pd.DataFrame(columns=step_col) idx = 1 for cp_counter in range(total_component_types): # todo: 这里为了调试暂时未修改 part, nozzle = items[11 + cp_counter * 3], items[12 + cp_counter * 3] points = int(items[13 + cp_counter * 3]) component_data = pd.concat([component_data, pd.DataFrame( [part, '', 'SM8', nozzle, '飞行相机1', 'CHIP-Rect', self.default_feeder_limit, points], index=part_col).T], ignore_index=True) for _ in range(points): pos_x, pos_y = np.random.uniform(0, boundary[0]), np.random.uniform(0, boundary[1]) pcb_data = pd.concat([pcb_data, pd.DataFrame([['R' + str(idx), -pos_x, pos_y, 0.000, 0.000, part, '', 'A', '1-0 ' + nozzle, 1, 1, 1, 0, 1, 1, 1, 'L0']], columns=pcb_data.columns)], ignore_index=True) return pcb_data, component_data def loader(self, file_path): train_data, time_data = [], [] cycle_data, nozzle_change_data, pickup_data, movement_data, point_data = [], [], [], [], [] with open(file_path, 'r') as file: line = file.readline() while line: items = line.split('\t') total_points, total_component_types = float(items[8]), float(items[9]) cycle_data.append(float(items[1])) nozzle_change_data.append(float(items[2])) pickup_data.append(float(items[3])) movement_data.append(float(items[4])) point_data.append(total_points) # assembly time data time_data.append(float(items[0])) cp_points, cp_nozzle = defaultdict(int), defaultdict(str) for cp_counter in range(int(total_component_types)): component_type, nozzle_type = items[11 + cp_counter * 3], items[12 + cp_counter * 3] points = int(items[13 + cp_counter * 3]) cp_points[component_type], cp_nozzle[component_type] = points, nozzle_type train_data.append(self.encode(cp_points, cp_nozzle, float(items[6]), float(items[7]))) line = file.readline() return train_data, time_data, cycle_data, nozzle_change_data, pickup_data, movement_data, point_data def get_feature(self): return (self.max_component_types + 1) * len(self.nozzle_type_list) + 5 def get_update_round(self): return self.update