Files
multi-model-optimizer/opt/smm/path_plan.py
2025-11-14 11:34:48 +08:00

291 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import numpy as np
from opt.utils import axis_moving_time
from core.common import *
from data.type import Point
class PathPlanOpt:
    """Assign placement points to heads and order the heads within each cycle.

    Attributes read from ``config`` by the methods below: ``head_num``,
    ``head_intv``, ``slot_intv``, ``stopper_pos`` (``.x``/``.y``) and
    ``slotf1_pos`` (``.x``/``.y``).
    ``part_data`` is accessed through its ``part`` column and its index;
    ``step_data`` through its ``x``, ``y``, ``r`` and ``part`` columns and its
    index (both are used with ``iterrows()`` / ``.loc``).
    """

    def __init__(self, config, part_data, step_data):
        """Store the configuration and the part / placement-step tables."""
        self.part_data = part_data
        self.step_data = step_data
        self.config = config

    def dynamic_programming_cycle_path(self, cycle_point, cycle_slot):
        """Find the cheapest closed tour over the placements of one cycle.

        Node 0 is the pick-up position (derived from the feeder slots in use);
        every other node is the placement of one head, shifted by that head's
        offset.  The tour starts at node 0, visits every placement once and
        returns to node 0; it is found with a bitmask dynamic program.

        Args:
            cycle_point: per-head index into ``step_data`` (``-1`` = no placement).
            cycle_slot: per-head feeder slot (``-1`` = head unused).

        Returns:
            ``(cost, head_sequence)`` where ``cost`` is the tour cost measured
            with ``axis_moving_time`` and ``head_sequence`` lists the heads in
            visiting order.  A cycle with no usable head yields ``(0.0, [])``.
            Heads that lack either a slot or a placement are ignored.
        """
        config = self.config
        intv_ratio = config.head_intv // config.slot_intv

        head_set, nodes = [], []
        feeder_set = set()
        for head, slot in enumerate(cycle_slot):
            if slot == -1:
                continue
            placement = cycle_point[head]
            if placement == -1:
                # A slot without an assigned placement gives nothing to visit.
                continue
            row = self.step_data.loc[placement]
            head_set.append(head)
            nodes.append([row['x'] - head * config.head_intv + config.stopper_pos.x,
                          row['y'] + config.stopper_pos.y,
                          row['r'], head])
            feeder_set.add(slot - head * intv_ratio)

        if not nodes:
            # Nothing to place in this cycle: no tour, no travel cost.
            return 0.0, []

        # Pick-up node: centred between the extreme (head-normalised) slots.
        pick_x = config.slotf1_pos.x + \
            ((min(feeder_set) + max(feeder_set)) / 2 - 1) * config.slot_intv
        pos = [[pick_x, config.slotf1_pos.y, None, 0]] + nodes
        num_pos = len(pos)

        def get_distance(pos_1, pos_2):
            # X/Y travel always counts.
            move = max(axis_moving_time(pos_1[0] - pos_2[0], 0),
                       axis_moving_time(pos_1[1] - pos_2[1], 1))
            # No rotation term when either end is the pick-up node ...
            if pos_1[2] is None or pos_2[2] is None:
                return move
            # ... or when the two heads are not the paired heads (0-1, 2-3, ...).
            paired_head = pos_1[3] + (1 if pos_1[3] % 2 == 0 else -1)
            if paired_head != pos_2[3]:
                return move
            return max(move, axis_moving_time(pos_1[2] - pos_2[2], 2))

        # Pairwise cost between all nodes.
        dist = [[get_distance(pos_1, pos_2) for pos_2 in pos] for pos_1 in pos]

        state_num = 1 << num_pos
        full_mask = state_num - 1
        min_dist = [[np.inf for _ in range(num_pos)] for _ in range(state_num)]
        min_path = [[[] for _ in range(num_pos)] for _ in range(state_num)]

        # Bitmask DP; only odd masks are visited so node 0 is always included.
        for s in range(1, state_num, 2):
            for j in range(1, num_pos):
                # End node j must belong to the visited set s.
                if not (s & (1 << j)):
                    continue
                if s == ((1 << j) | 1):
                    # Base case: the set holds only node 0 and node j.
                    min_path[s][j] = [j]
                    min_dist[s][j] = dist[0][j]
                # Extend the partial tour with a node i outside of s.
                for i in range(1, num_pos):
                    if s & (1 << i):
                        continue
                    nxt = s | (1 << i)
                    candidate = min_dist[s][j] + dist[j][i]
                    if candidate < min_dist[nxt][i]:
                        min_path[nxt][i] = min_path[s][j] + [i]
                        min_dist[nxt][i] = candidate

        # Close the tour: pick the end node with the cheapest way back to node 0.
        ans_dist = float('inf')
        ans_path = []
        for i in range(1, num_pos):
            closed = min_dist[full_mask][i] + dist[i][0]
            if closed < ans_dist:
                ans_path = min_path[full_mask][i]
                ans_dist = closed

        head_sequence = [head_set[node - 1] for node in ans_path]

        # Orient the sequence so that the first visited placement is the one
        # with the smaller head-normalised x coordinate.
        start_head, end_head = head_sequence[0], head_sequence[-1]
        if self.step_data.loc[cycle_point[start_head]]['x'] - start_head * config.head_intv > \
                self.step_data.loc[cycle_point[end_head]]['x'] - end_head * config.head_intv:
            head_sequence = list(reversed(head_sequence))
        return ans_dist, head_sequence

    @staticmethod
    def _scan_plan(search_dir, left_boundary, right_boundary, search_step, head_num):
        """Return ``(start positions, head visiting order)`` for one scan direction."""
        middle = (left_boundary + right_boundary) / 2
        if search_dir == 0:
            # Left to right.
            start, stop, step = left_boundary, middle, search_step
            head_range = list(range(head_num))
        elif search_dir == 1:
            # Right to left.
            start, stop, step = right_boundary + 1e-3, middle, -search_step
            head_range = list(range(head_num - 1, -1, -1))
        else:
            # From the middle heads outwards.
            start, stop, step = left_boundary, right_boundary, search_step / 2
            head_range = []
            for head_index in range((head_num - 1) // 2, -1, -1):
                if 2 * head_index != head_num - 1:
                    head_range.append(head_num - 1 - head_index)
                head_range.append(head_index)

        if search_step > 0:
            search_points = np.arange(start, stop, step)
        else:
            # All points share one x coordinate: a zero step cannot be handed
            # to np.arange, so scan from the single start position instead.
            search_points = np.array([start])
        return search_points, head_range

    def scan_based(self, part_result, cycle_result, slot_result):
        """Assign placement points by scanning start positions along the x axis.

        For every cycle, three scan directions and a range of start x positions
        are tried.  For each try the first usable head takes the matching point
        closest (in x) to the start position; each later head takes the matching
        point that minimises the summed axis-move cost over the head-normalised
        positions chosen so far.  The try with the cheapest tour (according to
        ``dynamic_programming_cycle_path``) is kept and its points are removed
        from the pool.

        Args:
            part_result: per cycle group, per head, the ``part_data`` index of
                the part to place (``-1`` = head unused).
            cycle_result: number of repetitions of each cycle group.
            slot_result: per cycle group, per head, the feeder slot.

        Returns:
            ``(point_result, sequence_result)``: per executed cycle the per-head
            ``step_data`` index (``-1`` = none) and the head visiting order.
        """
        point_result, sequence_result = [], []

        class Mount:
            """Parallel lists describing the not-yet-assigned placement points."""

            def __init__(self):
                self.pos = []
                self.angle = []
                self.part = []
                self.step = []

            def pop(self, index):
                self.pos.pop(index)
                self.angle.pop(index)
                self.part.pop(index)
                self.step.pop(index)

        all_points = Mount()
        for step_index, data in self.step_data.iterrows():
            part_index = self.part_data[self.part_data.part == data.part].index.tolist()[0]
            # Record the placement's step index and its stopper-shifted position.
            all_points.pos.append(Point(data.x + self.config.stopper_pos.x,
                                        data.y + self.config.stopper_pos.y))
            all_points.angle.append(data.r)
            all_points.part.append(part_index)
            all_points.step.append(step_index)

        head_num = self.config.head_num
        head_intv = self.config.head_intv
        left_boundary = min(all_points.pos, key=lambda p: p.x).x
        right_boundary = max(all_points.pos, key=lambda p: p.x).x
        search_step = max((right_boundary - left_boundary) / head_num / 2, 0)

        for cycle_index, head_parts in enumerate(part_result):
            for _ in range(cycle_result[cycle_index]):
                min_dist = np.inf
                best_points, best_head_seq = [], []
                # Default to the current pool so it is never wiped if no try wins.
                best_remaining = all_points

                # Each direction has its own start positions and head order.
                for search_dir in range(3):
                    search_points, head_range = self._scan_plan(
                        search_dir, left_boundary, right_boundary, search_step, head_num)

                    for start_x in search_points:
                        cur_all_points = copy.deepcopy(all_points)
                        assigned_point = [-1] * head_num
                        assigned_mount_point = [Point(0, 0)] * head_num
                        head_counter, point_index = 0, -1

                        for head_index in head_range:
                            target_part = head_parts[head_index]
                            if head_counter == 0:
                                # First head: nearest matching point to the start x.
                                # (The original also carried a vertical term with
                                # weight 0; it had no effect and is omitted.)
                                if target_part == -1:
                                    continue
                                nearest = np.inf
                                for index, part in enumerate(cur_all_points.part):
                                    if part != target_part:
                                        continue
                                    gap = abs(cur_all_points.pos[index].x - start_x)
                                    if gap < nearest:
                                        nearest = gap
                                        point_index = index
                            else:
                                # Later heads: cheapest chain through the
                                # head-normalised positions assigned so far.
                                point_index = -1
                                min_cheby_distance = np.inf
                                anchors = [
                                    Point(assigned_mount_point[next_head].x - next_head * head_intv,
                                          assigned_mount_point[next_head].y)
                                    for next_head in range(head_num)
                                    if assigned_point[next_head] != -1
                                ]
                                for index, part in enumerate(cur_all_points.part):
                                    if part != target_part:
                                        continue
                                    candidate = Point(
                                        cur_all_points.pos[index].x - head_index * head_intv,
                                        cur_all_points.pos[index].y)
                                    point_pos = sorted([candidate] + anchors, key=lambda p: p.x)
                                    cheby_distance = 0
                                    for mount_seq in range(len(point_pos) - 1):
                                        delta_x = axis_moving_time(
                                            point_pos[mount_seq].x - point_pos[mount_seq + 1].x, 0)
                                        delta_y = axis_moving_time(
                                            point_pos[mount_seq].y - point_pos[mount_seq + 1].y, 1)
                                        cheby_distance += max(delta_x, delta_y)
                                    if cheby_distance < min_cheby_distance:
                                        min_cheby_distance = cheby_distance
                                        point_index = index

                            if point_index == -1:
                                continue

                            head_counter += 1
                            # point_index refers to the shrinking working copy,
                            # so the data must be read from that same copy.
                            assigned_point[head_index] = cur_all_points.step[point_index]
                            assigned_mount_point[head_index] = cur_all_points.pos[point_index]
                            cur_all_points.pop(point_index)

                        dist, head_seq = self.dynamic_programming_cycle_path(
                            assigned_point, slot_result[cycle_index])
                        if dist < min_dist:
                            best_remaining = cur_all_points
                            best_points, best_head_seq = assigned_point, head_seq
                            min_dist = dist

                all_points = best_remaining
                point_result.append(best_points)
                sequence_result.append(best_head_seq)
        return point_result, sequence_result

    def greedy_cluster(self, part_result, cycle_result, slot_result):
        """Assign points per part in ascending ``(x, y)`` order.

        The placements of each part are sorted by stopper-shifted ``(x, y)``
        and handed out in that order, one per head that needs the part.

        Args:
            part_result: per cycle group, per head, the ``part_data`` index
                (``-1`` = head unused).
            cycle_result: number of repetitions of each cycle group.
            slot_result: per cycle group, per head, the feeder slot.

        Returns:
            ``(point_result, sequence_result)`` with one entry per executed cycle.
        """
        point_result, sequence_result = [], []
        head_num = self.config.head_num
        stopper = self.config.stopper_pos

        # Group the placements by part name, then sort each group by (x, y).
        component_point_pos = defaultdict(list)
        for idx, data in self.step_data.iterrows():
            component_point_pos[data.part].append([data.x + stopper.x, data.y + stopper.y, idx])
        for pos_list in component_point_pos.values():
            pos_list.sort(key=lambda item: (item[0], item[1]))

        # Next unused position in each part's sorted list.
        component_point_index = defaultdict(int)
        for cycle_set, cycle_num in enumerate(cycle_result):
            for _ in range(cycle_num):
                assigned = [-1] * head_num
                for head in range(head_num):
                    part_index = part_result[cycle_set][head]
                    if part_index == -1:
                        continue
                    part = self.part_data.loc[part_index]['part']
                    point_info = component_point_pos[part][component_point_index[part]]
                    assigned[head] = point_info[2]
                    component_point_index[part] += 1
                point_result.append(assigned)
                sequence_result.append(
                    self.dynamic_programming_cycle_path(assigned, slot_result[cycle_set])[1])
        return point_result, sequence_result

    def greedy_level_placing(self, part_result, cycle_result, slot_result):
        """Assign points per part from the largest ``(y, x)`` downwards.

        The placements of each part are sorted by ``(y, x)`` and consumed from
        the end of that list, one per head that needs the part.

        Args:
            part_result: per cycle group, per head, the ``part_data`` index
                (``-1`` = head unused).
            cycle_result: number of repetitions of each cycle group.
            slot_result: per cycle group, per head, the feeder slot.

        Returns:
            ``(point_result, sequence_result)`` with one entry per executed cycle.
        """
        point_result, sequence_result = [], []
        head_num = self.config.head_num

        # Part name -> part_data index (a later row overrides an earlier one;
        # an unknown name falls back to 0 through the defaultdict).
        part_indices = defaultdict(int)
        for part_idx, data in self.part_data.iterrows():
            part_indices[data.part] = part_idx

        mount_point_pos = defaultdict(list)
        for pcb_idx, data in self.step_data.iterrows():
            mount_point_pos[part_indices[data.part]].append([data.x, data.y, pcb_idx])
        for point_list in mount_point_pos.values():
            point_list.sort(key=lambda item: (item[1], item[0]))

        for cycle_idx, cycle_num in enumerate(cycle_result):
            for _ in range(cycle_num):
                assigned = [-1] * head_num
                for head in range(head_num):
                    index_ = part_result[cycle_idx][head]
                    if index_ == -1:
                        continue
                    candidates = mount_point_pos[index_]
                    assigned[head] = candidates[-1][2]
                    candidates.pop()
                point_result.append(assigned)
                sequence_result.append(
                    self.dynamic_programming_cycle_path(assigned, slot_result[cycle_idx])[1])
        return point_result, sequence_result