1725 lines
88 KiB
Python
1725 lines
88 KiB
Python
import copy
|
||
import math
|
||
import random
|
||
import time
|
||
|
||
# import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
import openpyxl
|
||
|
||
from base_optimizer.optimizer_common import *
|
||
|
||
|
||
def dynamic_programming_cycle_path(pcb_data, cycle_placement, assigned_feeder):
|
||
head_sequence = []
|
||
num_pos = sum([placement != -1 for placement in cycle_placement]) + 1
|
||
|
||
pos, head_set = [], []
|
||
feeder_set = set()
|
||
for head, feeder in enumerate(assigned_feeder):
|
||
if feeder == -1:
|
||
continue
|
||
|
||
head_set.append(head)
|
||
placement = cycle_placement[head]
|
||
pos.append([pcb_data.iloc[placement]['x'] - head * head_interval + stopper_pos[0],
|
||
pcb_data.iloc[placement]['y'] + stopper_pos[1], pcb_data.iloc[placement]['r'], head])
|
||
|
||
feeder_set.add(feeder - head * interval_ratio)
|
||
|
||
pos.insert(0, [slotf1_pos[0] + ((min(list(feeder_set)) + max(list(feeder_set))) / 2 - 1) * slot_interval,
|
||
slotf1_pos[1], None, 0])
|
||
|
||
def get_distance(pos_1, pos_2):
|
||
# 拾取起始与终止位置 或 非同轴
|
||
if pos_1[2] is None or pos_2[2] is None or pos_1[3] + (1 if pos_1[3] % 2 == 0 else -1) != pos_2[3]:
|
||
return max(axis_moving_time(pos_1[0] - pos_2[0], 0), axis_moving_time(pos_1[1] - pos_2[1], 1))
|
||
else:
|
||
return max(axis_moving_time(pos_1[0] - pos_2[0], 0), axis_moving_time(pos_1[1] - pos_2[1], 1),
|
||
head_rotary_time(pos_1[2] - pos_2[2]))
|
||
|
||
# 各节点之间的距离
|
||
dist = [[get_distance(pos_1, pos_2) for pos_2 in pos] for pos_1 in pos]
|
||
|
||
min_dist = [[np.inf for _ in range(num_pos)] for s in range(1 << num_pos)]
|
||
min_path = [[[] for _ in range(num_pos)] for s in range(1 << num_pos)]
|
||
|
||
# 状压dp搜索
|
||
for s in range(1, 1 << num_pos, 2):
|
||
# 考虑节点集合s必须包括节点0
|
||
if not (s & 1):
|
||
continue
|
||
for j in range(1, num_pos):
|
||
# 终点j需在当前考虑节点集合s内
|
||
if not (s & (1 << j)):
|
||
continue
|
||
if s == int((1 << j) | 1):
|
||
# 若考虑节点集合s仅含节点0和节点j,dp边界,赋予初值
|
||
# print('j:', j)
|
||
min_path[s][j] = [j]
|
||
min_dist[s][j] = dist[0][j]
|
||
|
||
# 枚举下一个节点i,更新
|
||
for i in range(1, num_pos):
|
||
# 下一个节点i需在考虑节点集合s外
|
||
if s & (1 << i):
|
||
continue
|
||
if min_dist[s][j] + dist[j][i] < min_dist[s | (1 << i)][i]:
|
||
min_path[s | (1 << i)][i] = min_path[s][j] + [i]
|
||
min_dist[s | (1 << i)][i] = min_dist[s][j] + dist[j][i]
|
||
|
||
ans_dist = float('inf')
|
||
ans_path = []
|
||
# 求最终最短哈密顿回路
|
||
for i in range(1, num_pos):
|
||
if min_dist[(1 << num_pos) - 1][i] + dist[i][0] < ans_dist:
|
||
# 更新,回路化
|
||
ans_path = min_path[s][i]
|
||
ans_dist = min_dist[(1 << num_pos) - 1][i] + dist[i][0]
|
||
|
||
for parent in ans_path:
|
||
head_sequence.append(head_set[parent - 1])
|
||
|
||
start_head, end_head = head_sequence[0], head_sequence[-1]
|
||
if pcb_data.iloc[cycle_placement[start_head]]['x'] - start_head * head_interval > \
|
||
pcb_data.iloc[cycle_placement[end_head]]['x'] - end_head * head_interval:
|
||
head_sequence = list(reversed(head_sequence))
|
||
return ans_dist, head_sequence
|
||
|
||
|
||
def dynamic_programming_cycle_route(place_pos: list[Point], pick_pos: Point, rtn_seq=True):
|
||
head_sequence = []
|
||
num_pos = len(place_pos) + 1
|
||
|
||
pos = [pick_pos] + place_pos
|
||
|
||
def get_distance(pos_1: Point, pos_2: Point):
|
||
# 拾取起始与终止位置 或 非同轴
|
||
if pos_1.h is None or pos_2.h is None or pos_1.h + (1 if pos_1.h % 2 == 0 else -1) != pos_2.h:
|
||
return max(axis_moving_time(pos_1.x - pos_2.x, 0), axis_moving_time(pos_1.y - pos_2.y, 1))
|
||
else:
|
||
return max(axis_moving_time(pos_1.x - pos_2.x, 0), axis_moving_time(pos_1.y - pos_2.y, 1),
|
||
head_rotary_time(pos_1.r - pos_2.r))
|
||
|
||
# 各节点之间的距离
|
||
dist = [[get_distance(pos_1, pos_2) for pos_2 in pos] for pos_1 in pos]
|
||
|
||
min_dist = [[np.inf for _ in range(num_pos)] for s in range(1 << num_pos)]
|
||
min_path = [[[] for _ in range(num_pos)] for s in range(1 << num_pos)] if rtn_seq else None
|
||
|
||
# 状压dp搜索
|
||
for s in range(1, 1 << num_pos, 2):
|
||
# 考虑节点集合s必须包括节点0
|
||
if not (s & 1):
|
||
continue
|
||
for j in range(1, num_pos):
|
||
# 终点j需在当前考虑节点集合s内
|
||
if not (s & (1 << j)):
|
||
continue
|
||
if s == int((1 << j) | 1):
|
||
# 若考虑节点集合s仅含节点0和节点j,dp边界,赋予初值
|
||
# print('j:', j)
|
||
if min_path:
|
||
min_path[s][j] = [j]
|
||
min_dist[s][j] = dist[0][j]
|
||
|
||
# 枚举下一个节点i,更新
|
||
for i in range(1, num_pos):
|
||
# 下一个节点i需在考虑节点集合s外
|
||
if s & (1 << i):
|
||
continue
|
||
if min_dist[s][j] + dist[j][i] < min_dist[s | (1 << i)][i]:
|
||
if min_path:
|
||
min_path[s | (1 << i)][i] = min_path[s][j] + [i]
|
||
min_dist[s | (1 << i)][i] = min_dist[s][j] + dist[j][i]
|
||
|
||
ans_dist = float('inf')
|
||
ans_path = []
|
||
# 求最终最短哈密顿回路
|
||
for i in range(1, num_pos):
|
||
if min_dist[(1 << num_pos) - 1][i] + dist[i][0] < ans_dist:
|
||
# 更新,回路化
|
||
if min_path:
|
||
ans_path = min_path[s][i]
|
||
ans_dist = min_dist[(1 << num_pos) - 1][i] + dist[i][0]
|
||
|
||
if len(ans_path):
|
||
for parent in ans_path:
|
||
head_sequence.append(place_pos[parent - 1].h)
|
||
|
||
if place_pos[ans_path[0] - 1].x > place_pos[ans_path[-1] - 1].x:
|
||
head_sequence = list(reversed(head_sequence))
|
||
|
||
return ans_dist, head_sequence
|
||
|
||
|
||
def quick_sort_cycle_route(place_pos: list[Point], pick_pos: Point, rtn_seq=True):
|
||
cycle_place_pos = sorted(place_pos, key=lambda pt: pt.x)
|
||
|
||
move_time = 0
|
||
for mount_seq in range(len(cycle_place_pos) - 1):
|
||
if cycle_place_pos[mount_seq].h + (1 if cycle_place_pos[mount_seq].h % 2 == 0 else -1) \
|
||
!= cycle_place_pos[mount_seq + 1].h:
|
||
move_time += max(axis_moving_time(cycle_place_pos[mount_seq].x - cycle_place_pos[mount_seq + 1].x, 0),
|
||
axis_moving_time(cycle_place_pos[mount_seq].y - cycle_place_pos[mount_seq + 1].y, 1))
|
||
else:
|
||
move_time += max(axis_moving_time(cycle_place_pos[mount_seq].x - cycle_place_pos[mount_seq + 1].x, 0),
|
||
axis_moving_time(cycle_place_pos[mount_seq].y - cycle_place_pos[mount_seq + 1].y, 1),
|
||
head_rotary_time(cycle_place_pos[mount_seq].r - cycle_place_pos[mount_seq + 1].r))
|
||
|
||
delta_x = axis_moving_time(cycle_place_pos[0].x - pick_pos.x, 0)
|
||
delta_y = axis_moving_time(cycle_place_pos[0].y - pick_pos.y, 1)
|
||
move_time += max(delta_x, delta_y) + 0.01 * (delta_x ** 2 + delta_y ** 2)
|
||
|
||
delta_x = axis_moving_time(cycle_place_pos[-1].x - pick_pos.x, 0)
|
||
delta_y = axis_moving_time(cycle_place_pos[-1].y - pick_pos.y, 1)
|
||
move_time += max(delta_x, delta_y) + 0.01 * (delta_x ** 2 + delta_y ** 2)
|
||
|
||
head_seq = [place_pos.h for place_pos in cycle_place_pos]
|
||
return move_time, head_seq
|
||
|
||
|
||
@timer_wrapper
|
||
def place_allocate_sequence_route_generation(component_data, pcb_data, component_result, cycle_result,
|
||
feeder_slot_result, hinter=True):
|
||
placement_result, head_sequence_result = [], []
|
||
if len(pcb_data) == 0:
|
||
return placement_result, head_sequence_result
|
||
mount_point_index = [[] for _ in range(len(component_data))]
|
||
mount_point_pos = [[] for _ in range(len(component_data))]
|
||
|
||
for i in range(len(pcb_data)):
|
||
part = pcb_data.iloc[i]['part']
|
||
component_index = component_data[component_data['part'] == part].index.tolist()[0]
|
||
# 记录贴装点序号索引和对应的位置坐标
|
||
mount_point_index[component_index].append(i)
|
||
mount_point_pos[component_index].append([pcb_data.iloc[i]['x'], pcb_data.iloc[i]['y']])
|
||
|
||
search_dir = 0 # 0:自左向右搜索 1:自右向左搜索
|
||
for cycle_set in range(len(component_result)):
|
||
floor_cycle, ceil_cycle = sum(cycle_result[:cycle_set]), sum(cycle_result[:(cycle_set + 1)])
|
||
for cycle in range(floor_cycle, ceil_cycle):
|
||
# search_dir = 1 - search_dir
|
||
assigned_placement = [-1] * max_head_index
|
||
max_pos = [max(mount_point_pos[component_index], key=lambda x: x[0]) for component_index in
|
||
range(len(mount_point_pos)) if len(mount_point_pos[component_index]) > 0][0][0]
|
||
min_pos = [min(mount_point_pos[component_index], key=lambda x: x[0]) for component_index in
|
||
range(len(mount_point_pos)) if len(mount_point_pos[component_index]) > 0][0][0]
|
||
point2head_range = min(math.floor((max_pos - min_pos) / head_interval) + 1, max_head_index)
|
||
|
||
# 最近邻确定
|
||
way_point = None
|
||
head_range = range(max_head_index - 1, -1, -1) if search_dir else range(max_head_index)
|
||
for head_counter, head in enumerate(head_range):
|
||
if component_result[cycle_set][head] == -1:
|
||
continue
|
||
|
||
component_index = component_result[cycle_set][head]
|
||
if way_point is None:
|
||
index = 0
|
||
if way_point is None:
|
||
if search_dir:
|
||
index = np.argmax(mount_point_pos[component_index], axis=0)[0]
|
||
else:
|
||
index = np.argmin(mount_point_pos[component_index], axis=0)[0]
|
||
else:
|
||
for next_head in head_range:
|
||
component_index = component_result[cycle_set][next_head]
|
||
if assigned_placement[next_head] == -1 and component_index != -1:
|
||
num_points = len(mount_point_pos[component_index])
|
||
index = np.argmin(
|
||
[abs(mount_point_pos[component_index][i][0] - way_point[0]) * .1 + abs(
|
||
mount_point_pos[component_index][i][1] - way_point[1]) for i in
|
||
range(num_points)])
|
||
# index = random.randint(0, num_points - 1)
|
||
head = next_head
|
||
break
|
||
# index = np.argmax(mount_point_pos[component_index], axis=0)[0]
|
||
assigned_placement[head] = mount_point_index[component_index][index]
|
||
|
||
# 记录路标点
|
||
way_point = mount_point_pos[component_index][index]
|
||
way_point[0] += (max_head_index - head - 1) * head_interval if search_dir else -head * head_interval
|
||
|
||
mount_point_index[component_index].pop(index)
|
||
mount_point_pos[component_index].pop(index)
|
||
else:
|
||
head_index, point_index = -1, -1
|
||
min_cheby_distance, min_euler_distance = float('inf'), float('inf')
|
||
for next_head in range(max_head_index):
|
||
if assigned_placement[next_head] != -1 or component_result[cycle_set][next_head] == -1:
|
||
continue
|
||
next_comp_index = component_result[cycle_set][next_head]
|
||
for counter in range(len(mount_point_pos[next_comp_index])):
|
||
if search_dir:
|
||
delta_x = abs(mount_point_pos[next_comp_index][counter][0] - way_point[0]
|
||
+ (max_head_index - next_head - 1) * head_interval)
|
||
else:
|
||
delta_x = abs(mount_point_pos[next_comp_index][counter][0] - way_point[0]
|
||
- next_head * head_interval)
|
||
|
||
delta_y = abs(mount_point_pos[next_comp_index][counter][1] - way_point[1])
|
||
|
||
euler_distance = pow(axis_moving_time(delta_x, 0), 2) + pow(axis_moving_time(delta_y, 1), 2)
|
||
cheby_distance = max(axis_moving_time(delta_x, 0),
|
||
axis_moving_time(delta_y, 1)) + 5e-2 * euler_distance
|
||
if cheby_distance < min_cheby_distance or (abs(cheby_distance - min_cheby_distance) < 1e-9
|
||
and euler_distance < min_euler_distance):
|
||
# if euler_distance < min_euler_distance:
|
||
min_cheby_distance, min_euler_distance = cheby_distance, euler_distance
|
||
head_index, point_index = next_head, counter
|
||
|
||
component_index = component_result[cycle_set][head_index]
|
||
assert 0 <= head_index < max_head_index
|
||
# point_index = random.randint(0, len(mount_point_index[component_index]) - 1) # 贴装点随机分配
|
||
assigned_placement[head_index] = mount_point_index[component_index][point_index]
|
||
way_point = mount_point_pos[component_index][point_index]
|
||
way_point[0] += (max_head_index - head_index - 1) * head_interval if search_dir \
|
||
else -head_index * head_interval
|
||
|
||
mount_point_index[component_index].pop(point_index)
|
||
mount_point_pos[component_index].pop(point_index)
|
||
|
||
placement_result.append(assigned_placement) # 各个头上贴装的元件类型
|
||
head_sequence_result.append(
|
||
dynamic_programming_cycle_path(pcb_data, assigned_placement, feeder_slot_result[cycle_set])[1])
|
||
|
||
return placement_result, head_sequence_result
|
||
|
||
|
||
@timer_wrapper
|
||
def beam_search_route_generation(component_data, pcb_data, component_result, cycle_result, feeder_slot_result):
|
||
beam_width = 4 # 集束宽度
|
||
base_points = [float('inf'), float('inf')]
|
||
|
||
mount_point_index = [[] for _ in range(len(component_data))]
|
||
mount_point_pos = [[] for _ in range(len(component_data))]
|
||
|
||
for idx, data in pcb_data.iterrows():
|
||
component_index = component_data[component_data['part'] == data.part].index.tolist()[0]
|
||
|
||
# 记录贴装点序号索引和对应的位置坐标
|
||
mount_point_index[component_index].append(idx)
|
||
mount_point_pos[component_index].append([data.x, data.y])
|
||
|
||
# 记录最左下角坐标
|
||
base_points[0] = min(base_points[0], mount_point_pos[component_index][-1][0])
|
||
base_points[1] = min(base_points[1], mount_point_pos[component_index][-1][1])
|
||
|
||
beam_placement_sequence, beam_head_sequence = [], []
|
||
beam_mount_point_index, beam_mount_point_pos = [], []
|
||
|
||
for beam_counter in range(beam_width):
|
||
beam_mount_point_index.append(copy.deepcopy(mount_point_index))
|
||
beam_mount_point_pos.append(copy.deepcopy(mount_point_pos))
|
||
|
||
beam_placement_sequence.append([])
|
||
beam_head_sequence.append([])
|
||
|
||
beam_distance = [0 for _ in range(beam_width)] # 记录当前集束搜索点的点数
|
||
|
||
def argpartition(list, kth):
|
||
if kth < len(list):
|
||
return np.argpartition(list, kth)
|
||
else:
|
||
index, indexes = 0, []
|
||
while len(indexes) < kth:
|
||
indexes.append(index)
|
||
index += 1
|
||
if index >= len(list):
|
||
index = 0
|
||
return np.array(indexes)
|
||
|
||
with tqdm(total=100) as pbar:
|
||
search_dir = 0
|
||
pbar.set_description('beam search route schedule')
|
||
for cycle_set in range(len(component_result)):
|
||
floor_cycle, ceil_cycle = sum(cycle_result[:cycle_set]), sum(cycle_result[:(cycle_set + 1)])
|
||
for cycle in range(floor_cycle, ceil_cycle):
|
||
search_dir = 1 - search_dir
|
||
beam_way_point = None
|
||
for beam_counter in range(beam_width):
|
||
beam_placement_sequence[beam_counter].append([-1 for _ in range(max_head_index)])
|
||
|
||
head_range = range(max_head_index - 1, -1, -1) if search_dir else range(max_head_index)
|
||
for head in head_range:
|
||
component_index = component_result[cycle_set][head]
|
||
if component_index == -1:
|
||
continue
|
||
|
||
if beam_way_point is None:
|
||
# 首个贴装点的选取,距离基准点最近的beam_width个点
|
||
beam_way_point = [[0, 0]] * beam_width
|
||
|
||
for beam_counter in range(beam_width):
|
||
if search_dir:
|
||
index = np.argmax(beam_mount_point_pos[beam_counter][component_index], axis=0)[0]
|
||
else:
|
||
index = np.argmin(beam_mount_point_pos[beam_counter][component_index], axis=0)[0]
|
||
|
||
beam_placement_sequence[beam_counter][-1][head] = \
|
||
beam_mount_point_index[beam_counter][component_index][index]
|
||
|
||
beam_way_point[beam_counter] = beam_mount_point_pos[beam_counter][component_index][index]
|
||
beam_way_point[beam_counter][0] += (max_head_index - head - 1) * head_interval if \
|
||
search_dir else -head * head_interval
|
||
|
||
beam_mount_point_index[beam_counter][component_index].pop(index)
|
||
beam_mount_point_pos[beam_counter][component_index].pop(index)
|
||
else:
|
||
# 后续贴装点
|
||
search_beam_distance = []
|
||
search_beam_component_index = [0] * (beam_width ** 2)
|
||
for beam_counter in range(beam_width ** 2):
|
||
search_beam_distance.append(beam_distance[beam_counter // beam_width])
|
||
|
||
for beam_counter in range(beam_width):
|
||
# 对于集束beam_counter + 1最近的beam_width个点
|
||
num_points = len(beam_mount_point_pos[beam_counter][component_index])
|
||
|
||
dist = []
|
||
for i in range(num_points):
|
||
if search_dir:
|
||
delta_x = axis_moving_time(
|
||
beam_mount_point_pos[beam_counter][component_index][i][0] -
|
||
beam_way_point[beam_counter][0] + (max_head_index - head - 1) * head_interval,
|
||
0)
|
||
else:
|
||
delta_x = axis_moving_time(
|
||
beam_mount_point_pos[beam_counter][component_index][i][0] -
|
||
beam_way_point[beam_counter][0] - head * head_interval, 0)
|
||
|
||
delta_y = axis_moving_time(beam_mount_point_pos[beam_counter][component_index][i][1] -
|
||
beam_way_point[beam_counter][1], 1)
|
||
|
||
dist.append(max(delta_x, delta_y))
|
||
|
||
indexes = argpartition(dist, kth=beam_width)[:beam_width]
|
||
|
||
# 记录中间信息
|
||
for i, index in enumerate(indexes):
|
||
search_beam_distance[i + beam_counter * beam_width] += dist[index]
|
||
search_beam_component_index[i + beam_counter * beam_width] = index
|
||
|
||
indexes = np.argsort(search_beam_distance)
|
||
|
||
beam_mount_point_pos_cpy = copy.deepcopy(beam_mount_point_pos)
|
||
beam_mount_point_index_cpy = copy.deepcopy(beam_mount_point_index)
|
||
|
||
beam_placement_sequence_cpy = copy.deepcopy(beam_placement_sequence)
|
||
beam_head_sequence_cpy = copy.deepcopy(beam_head_sequence)
|
||
beam_counter = 0
|
||
assigned_placement = []
|
||
|
||
for i, index in enumerate(indexes):
|
||
# 拷贝原始集束数据
|
||
beam_mount_point_pos[beam_counter] = copy.deepcopy(beam_mount_point_pos_cpy[index // beam_width])
|
||
beam_mount_point_index[beam_counter] = copy.deepcopy(beam_mount_point_index_cpy[index // beam_width])
|
||
beam_placement_sequence[beam_counter] = copy.deepcopy(beam_placement_sequence_cpy[index // beam_width])
|
||
beam_head_sequence[beam_counter] = copy.deepcopy(beam_head_sequence_cpy[index // beam_width])
|
||
|
||
# 更新各集束最新扫描的的贴装点
|
||
component_index = component_result[cycle_set][head]
|
||
|
||
beam_placement_sequence[beam_counter][-1][head] = \
|
||
beam_mount_point_index[beam_counter][component_index][search_beam_component_index[index]]
|
||
|
||
if beam_placement_sequence[beam_counter][-1] in assigned_placement \
|
||
and beam_width - beam_counter < len(indexes) - i:
|
||
continue
|
||
|
||
assigned_placement.append(beam_placement_sequence[beam_counter][-1])
|
||
|
||
# 更新参考基准点
|
||
beam_way_point[beam_counter] = beam_mount_point_pos[beam_counter][component_index][
|
||
search_beam_component_index[index]]
|
||
beam_way_point[beam_counter][0] += (max_head_index - head - 1) * head_interval if \
|
||
search_dir else -head * head_interval
|
||
|
||
# 更新各集束贴装路径长度,移除各集束已分配的贴装点
|
||
beam_distance[beam_counter] = search_beam_distance[index]
|
||
|
||
beam_mount_point_pos[beam_counter][component_index].pop(search_beam_component_index[index])
|
||
beam_mount_point_index[beam_counter][component_index].pop(search_beam_component_index[index])
|
||
|
||
beam_counter += 1
|
||
|
||
if beam_counter >= beam_width:
|
||
break
|
||
assert beam_counter >= beam_width
|
||
|
||
# 更新头贴装顺序
|
||
for beam_counter in range(beam_width):
|
||
beam_head_sequence[beam_counter].append(
|
||
dynamic_programming_cycle_path(pcb_data, beam_placement_sequence[beam_counter][-1],
|
||
feeder_slot_result[cycle_set])[1])
|
||
|
||
pbar.update(1 / sum(cycle_result) * 100)
|
||
|
||
index = np.argmin(beam_distance)
|
||
print('beam distance : ', beam_distance[index])
|
||
return beam_placement_sequence[index], beam_head_sequence[index]
|
||
|
||
|
||
# @timer_wrapper
|
||
def scan_based_placement_route_generation(component_data, pcb_data, component_assign, cycle_assign, feeder_slot_result,
|
||
hinter=True):
|
||
placement_result, head_sequence_result = [], []
|
||
|
||
mount_point_pos, mount_point_index, mount_point_angle, mount_point_part = [], [], [], []
|
||
for _, data in pcb_data.iterrows():
|
||
component_index = component_data[component_data.part == data.part].index.tolist()[0]
|
||
# 记录贴装点序号索引和对应的位置坐标
|
||
mount_point_index.append(len(mount_point_index))
|
||
mount_point_pos.append([data.x + stopper_pos[0], data.y + stopper_pos[1]])
|
||
mount_point_angle.append(data.r)
|
||
|
||
mount_point_part.append(component_index)
|
||
|
||
left_boundary, right_boundary = min(mount_point_pos, key=lambda x: x[0])[0], max(mount_point_pos, key=lambda x: x[0])[0]
|
||
search_step = max((right_boundary - left_boundary) / max_head_index / 2, 0)
|
||
|
||
ref_pos_y = min(mount_point_pos, key=lambda x: x[1])[1]
|
||
for cycle_index, component_cycle in enumerate(component_assign):
|
||
for _ in range(cycle_assign[cycle_index]):
|
||
min_dist = None
|
||
tmp_assigned_placement, tmp_assigned_head_seq = [], []
|
||
tmp_mount_point_pos, tmp_mount_point_index = [], []
|
||
for search_dir in range(3): # 不同的搜索方向,贴装头和起始点的选取方法各不相同
|
||
if search_dir == 0:
|
||
# 从左向右搜索
|
||
searchPoints = np.arange(left_boundary, (left_boundary + right_boundary) / 2, search_step)
|
||
head_range = list(range(max_head_index))
|
||
elif search_dir == 1:
|
||
# 从右向左搜索
|
||
searchPoints = np.arange(right_boundary + 1e-3, (left_boundary + right_boundary) / 2, -search_step)
|
||
head_range = list(range(max_head_index - 1, -1, -1))
|
||
else:
|
||
# 从中间向两边搜索
|
||
searchPoints = np.arange(left_boundary, right_boundary, search_step / 2)
|
||
head_range, head_index = [], (max_head_index - 1) // 2
|
||
while head_index >= 0:
|
||
if 2 * head_index != max_head_index - 1:
|
||
head_range.append(max_head_index - 1 - head_index)
|
||
head_range.append(head_index)
|
||
head_index -= 1
|
||
|
||
for startPoint in searchPoints:
|
||
mount_point_pos_cpy, mount_point_index_cpy = copy.deepcopy(mount_point_pos), copy.deepcopy(
|
||
mount_point_index)
|
||
mount_point_angle_cpy = copy.deepcopy(mount_point_angle)
|
||
|
||
assigned_placement = [-1] * max_head_index
|
||
assigned_mount_point = [[0, 0]] * max_head_index
|
||
assigned_mount_angle = [0] * max_head_index
|
||
head_counter, point_index = 0, -1
|
||
for head_index in head_range:
|
||
if head_counter == 0:
|
||
component_index = component_assign[cycle_index][head_index]
|
||
|
||
if component_index == -1:
|
||
continue
|
||
|
||
min_horizontal_distance = None
|
||
for index, mount_index in enumerate(mount_point_index_cpy):
|
||
if mount_point_part[mount_index] != component_index:
|
||
continue
|
||
horizontal_distance = abs(mount_point_pos_cpy[index][0] - startPoint) + 0 * abs(
|
||
mount_point_pos_cpy[index][1] - ref_pos_y)
|
||
|
||
if min_horizontal_distance is None or horizontal_distance < min_horizontal_distance:
|
||
min_horizontal_distance = horizontal_distance
|
||
point_index = index
|
||
else:
|
||
point_index = -1
|
||
min_cheby_distance = None
|
||
|
||
next_comp_index = component_assign[cycle_index][head_index]
|
||
# if assigned_placement[head_index] != -1 or next_comp_index == -1:
|
||
# continue
|
||
for index, mount_index in enumerate(mount_point_index_cpy):
|
||
if mount_point_part[mount_index] != next_comp_index:
|
||
continue
|
||
|
||
point_pos = [[mount_point_pos_cpy[index][0] - head_index * head_interval,
|
||
mount_point_pos_cpy[index][1]]]
|
||
|
||
cheby_distance, euler_distance = 0, 0
|
||
for next_head in range(max_head_index):
|
||
if assigned_placement[next_head] == -1:
|
||
continue
|
||
point_pos.append(assigned_mount_point[next_head].copy())
|
||
point_pos[-1][0] -= next_head * head_interval
|
||
|
||
point_pos = sorted(point_pos, key=lambda x: x[0])
|
||
for mount_seq in range(len(point_pos) - 1):
|
||
delta_x = axis_moving_time(point_pos[mount_seq][0] - point_pos[mount_seq + 1][0], 0)
|
||
delta_y = axis_moving_time(point_pos[mount_seq][1] - point_pos[mount_seq + 1][1], 1)
|
||
cheby_distance += max(delta_x, delta_y)
|
||
euler_distance += math.sqrt(delta_x ** 2 + delta_y ** 2)
|
||
|
||
# cheby_distance += 0.01 * euler_distance
|
||
if min_cheby_distance is None or cheby_distance < min_cheby_distance:
|
||
min_cheby_distance, min_euler_distance = cheby_distance, euler_distance
|
||
point_index = index
|
||
|
||
if point_index == -1:
|
||
continue
|
||
|
||
head_counter += 1
|
||
|
||
assigned_placement[head_index] = mount_point_index_cpy[point_index]
|
||
assigned_mount_point[head_index] = mount_point_pos_cpy[point_index].copy()
|
||
assigned_mount_angle[head_index] = mount_point_angle_cpy[point_index]
|
||
|
||
mount_point_index_cpy.pop(point_index)
|
||
mount_point_pos_cpy.pop(point_index)
|
||
mount_point_angle_cpy.pop(point_index)
|
||
|
||
dist, head_seq = dynamic_programming_cycle_path(pcb_data, assigned_placement,
|
||
feeder_slot_result[cycle_index])
|
||
|
||
if min_dist is None or dist < min_dist:
|
||
tmp_mount_point_pos, tmp_mount_point_index = mount_point_pos_cpy, mount_point_index_cpy
|
||
tmp_assigned_placement, tmp_assigned_head_seq = assigned_placement, head_seq
|
||
min_dist = dist
|
||
|
||
mount_point_pos, mount_point_index = tmp_mount_point_pos, tmp_mount_point_index
|
||
|
||
placement_result.append(tmp_assigned_placement)
|
||
head_sequence_result.append(tmp_assigned_head_seq)
|
||
|
||
return placement_result, head_sequence_result
|
||
# return placement_route_relink_heuristic(component_data, pcb_data, placement_result, head_sequence_result,
|
||
# feeder_slot_result, cycle_assign)
|
||
|
||
|
||
def placement_route_relink_heuristic(component_data, pcb_data, placement_result, head_sequence_result,
|
||
feeder_slot_result, cycle_result, hinter=True):
|
||
cycle_group_index = defaultdict(int)
|
||
cycle_index = 0
|
||
for cycle_group, group in enumerate(cycle_result):
|
||
for _ in range(group):
|
||
cycle_group_index[cycle_index] = cycle_group
|
||
cycle_index += 1
|
||
|
||
mount_point_pos, mount_point_angle, mount_point_index, mount_point_part = [], [], [], []
|
||
for i, data in pcb_data.iterrows():
|
||
component_index = component_data[component_data.part == data.part].index.tolist()[0]
|
||
# 记录贴装点序号索引和对应的位置坐标
|
||
mount_point_index.append(i)
|
||
mount_point_pos.append([data.x + stopper_pos[0], data.y + stopper_pos[1]])
|
||
mount_point_angle.append(data.r)
|
||
|
||
mount_point_part.append(component_index)
|
||
|
||
cycle_length, cycle_average_pos = [], []
|
||
for cycle, placement in enumerate(placement_result):
|
||
# prev_pos, prev_angle = None, None
|
||
cycle_pos_list = []
|
||
for idx, head in enumerate(head_sequence_result[cycle]):
|
||
if point_index := placement[head] == -1:
|
||
continue
|
||
cycle_pos_list.append(
|
||
[mount_point_pos[point_index][0] - head * head_interval, mount_point_pos[point_index][1]])
|
||
|
||
cycle_average_pos.append([sum(map(lambda pos: pos[0], cycle_pos_list)) / len(cycle_pos_list),
|
||
sum(map(lambda pos: pos[1], cycle_pos_list)) / len(cycle_pos_list)])
|
||
cycle_length.append(
|
||
dynamic_programming_cycle_path(pcb_data, placement, feeder_slot_result[cycle_group_index[cycle]])[0])
|
||
|
||
best_placement_result, best_head_sequence_result = copy.deepcopy(placement_result), copy.deepcopy(
|
||
head_sequence_result)
|
||
|
||
best_cycle_length, best_cycle_average_pos = copy.deepcopy(cycle_length), copy.deepcopy(cycle_average_pos)
|
||
|
||
n_runningtime, n_iteration = 30, 0
|
||
start_time = time.time()
|
||
with tqdm(total=n_runningtime) as pbar:
|
||
pbar.set_description('swap heuristic process')
|
||
prev_time = start_time
|
||
while True:
|
||
n_iteration += 1
|
||
placement_result, head_sequence_result = copy.deepcopy(best_placement_result), copy.deepcopy(
|
||
best_head_sequence_result)
|
||
cycle_length = best_cycle_length.copy()
|
||
cycle_average_pos = copy.deepcopy(best_cycle_average_pos)
|
||
|
||
cycle_index = roulette_wheel_selection(cycle_length) # 根据周期加权移动距离随机选择周期
|
||
|
||
point_dist = [] # 周期内各贴装点距离中心位置的切氏距离
|
||
for head in head_sequence_result[cycle_index]:
|
||
point_index = placement_result[cycle_index][head]
|
||
_delta_x = abs(mount_point_pos[point_index][0] - head * head_interval - cycle_average_pos[cycle_index][0])
|
||
_delta_y = abs(mount_point_pos[point_index][1] - cycle_average_pos[cycle_index][1])
|
||
point_dist.append(max(_delta_x, _delta_y))
|
||
|
||
# 随机选择一个异常点
|
||
head_index = head_sequence_result[cycle_index][roulette_wheel_selection(point_dist)]
|
||
point_index = placement_result[cycle_index][head_index]
|
||
|
||
# 找距离该异常点最近的周期
|
||
min_dist = None
|
||
chg_cycle_index = -1
|
||
for idx in range(len(cycle_average_pos)):
|
||
if idx == cycle_index:
|
||
continue
|
||
dist_ = 0
|
||
component_type_check = False
|
||
for head in head_sequence_result[idx]:
|
||
dist_ += max(abs(mount_point_pos[placement_result[idx][head]][0] - mount_point_pos[point_index][0]),
|
||
abs(mount_point_pos[placement_result[idx][head]][1] - mount_point_pos[point_index][1]))
|
||
if mount_point_part[placement_result[idx][head]] == mount_point_part[point_index]:
|
||
component_type_check = True
|
||
|
||
if (min_dist is None or dist_ < min_dist) and component_type_check:
|
||
min_dist = dist_
|
||
chg_cycle_index = idx
|
||
|
||
if chg_cycle_index == -1:
|
||
continue
|
||
|
||
chg_head, min_chg_dist = None, None
|
||
chg_cycle_point = []
|
||
for head in head_sequence_result[chg_cycle_index]:
|
||
index = placement_result[chg_cycle_index][head]
|
||
chg_cycle_point.append([mount_point_pos[index][0] - head * head_interval, mount_point_pos[index][1]])
|
||
|
||
for idx, head in enumerate(head_sequence_result[chg_cycle_index]):
|
||
chg_cycle_point_cpy = copy.deepcopy(chg_cycle_point)
|
||
index = placement_result[chg_cycle_index][head]
|
||
if mount_point_part[index] != mount_point_part[point_index]:
|
||
continue
|
||
chg_cycle_point_cpy[idx][0] = (mount_point_pos[index][0]) - head * head_interval
|
||
|
||
chg_dist = 0
|
||
aver_chg_pos = [sum(map(lambda x: x[0], chg_cycle_point_cpy)) / len(chg_cycle_point_cpy),
|
||
sum(map(lambda x: x[1], chg_cycle_point_cpy)) / len(chg_cycle_point_cpy)]
|
||
|
||
for pos in chg_cycle_point_cpy:
|
||
chg_dist += max(abs(aver_chg_pos[0] - pos[0]), abs(aver_chg_pos[1] - pos[1]))
|
||
|
||
# 更换后各点距离中心更近
|
||
if min_chg_dist is None or chg_dist < min_chg_dist:
|
||
chg_head = head
|
||
min_chg_dist = chg_dist
|
||
|
||
if chg_head is None:
|
||
continue
|
||
|
||
# === 第一轮,变更周期chg_cycle_index的贴装点重排 ===
|
||
chg_placement_res = placement_result[chg_cycle_index].copy()
|
||
chg_placement_res[chg_head] = point_index
|
||
|
||
cycle_point_list = defaultdict(list)
|
||
for head, point in enumerate(chg_placement_res):
|
||
if point == -1:
|
||
continue
|
||
cycle_point_list[mount_point_part[point]].append(point)
|
||
|
||
for key, point_list in cycle_point_list.items():
|
||
cycle_point_list[key] = sorted(point_list, key=lambda p: mount_point_pos[p][0])
|
||
|
||
chg_placement_res = []
|
||
for head, point_index in enumerate(placement_result[chg_cycle_index]):
|
||
if point_index == -1:
|
||
chg_placement_res.append(-1)
|
||
else:
|
||
part = mount_point_part[point_index]
|
||
chg_placement_res.append(cycle_point_list[part][0])
|
||
cycle_point_list[part].pop(0)
|
||
|
||
chg_place_moving, chg_head_res = dynamic_programming_cycle_path(pcb_data, chg_placement_res,
|
||
feeder_slot_result[
|
||
cycle_group_index[chg_cycle_index]])
|
||
|
||
# === 第二轮,原始周期cycle_index的贴装点重排 ===
|
||
placement_res = placement_result[cycle_index].copy()
|
||
placement_res[head_index] = placement_result[chg_cycle_index][chg_head]
|
||
|
||
for point in placement_res:
|
||
if point == -1:
|
||
continue
|
||
cycle_point_list[mount_point_part[point]].append(point)
|
||
|
||
for key, point_list in cycle_point_list.items():
|
||
cycle_point_list[key] = sorted(point_list, key=lambda p: mount_point_pos[p][0])
|
||
|
||
placement_res = []
|
||
for head, point_index in enumerate(placement_result[cycle_index]):
|
||
if point_index == -1:
|
||
placement_res.append(-1)
|
||
else:
|
||
part = mount_point_part[point_index]
|
||
placement_res.append(cycle_point_list[part][0])
|
||
cycle_point_list[part].pop(0)
|
||
|
||
place_moving, place_head_res = dynamic_programming_cycle_path(pcb_data, placement_res, feeder_slot_result[
|
||
cycle_group_index[cycle_index]])
|
||
|
||
# 更新贴装顺序分配结果
|
||
placement_result[cycle_index], head_sequence_result[cycle_index] = placement_res, place_head_res
|
||
placement_result[chg_cycle_index], head_sequence_result[chg_cycle_index] = chg_placement_res, chg_head_res
|
||
|
||
# 更新移动路径
|
||
cycle_length[cycle_index], cycle_length[chg_cycle_index] = place_moving, chg_place_moving
|
||
|
||
# 更新平均坐标和最大偏离点索引
|
||
point_list, point_index_list = [], []
|
||
for head in head_sequence_result[cycle_index]:
|
||
point_index_list.append(placement_result[cycle_index][head])
|
||
point_pos = mount_point_pos[point_index_list[-1]].copy()
|
||
point_pos[0] -= head * head_interval
|
||
point_list.append(point_pos)
|
||
|
||
cycle_average_pos[cycle_index] = [sum(map(lambda x: x[0], point_list)) / len(point_list),
|
||
sum(map(lambda x: x[1], point_list)) / len(point_list)]
|
||
|
||
point_list, point_index_list = [], []
|
||
for head in head_sequence_result[chg_cycle_index]:
|
||
point_index_list.append(placement_result[chg_cycle_index][head])
|
||
point_pos = mount_point_pos[point_index_list[-1]].copy()
|
||
point_pos[0] -= head * head_interval
|
||
point_list.append(point_pos)
|
||
|
||
cycle_average_pos[chg_cycle_index] = [sum(map(lambda x: x[0], point_list)) / len(point_list),
|
||
sum(map(lambda x: x[1], point_list)) / len(point_list)]
|
||
|
||
if sum(cycle_length) < sum(best_cycle_length):
|
||
best_cycle_length = cycle_length.copy()
|
||
best_cycle_average_pos = copy.deepcopy(cycle_average_pos)
|
||
best_placement_result, best_head_sequence_result = copy.deepcopy(placement_result), copy.deepcopy(
|
||
head_sequence_result)
|
||
|
||
cur_time = time.time()
|
||
if cur_time - start_time > n_runningtime:
|
||
break
|
||
|
||
pbar.update(cur_time - prev_time)
|
||
prev_time = cur_time
|
||
|
||
# print("number of iteration: ", n_iteration)
|
||
return best_placement_result, best_head_sequence_result
|
||
|
||
|
||
class RouteNode:
|
||
def __init__(self, component_data=None, pcb_data=None):
|
||
self.cycle_time = 0
|
||
self.total_time = 0
|
||
self.placement_res = []
|
||
self.headseq_res = []
|
||
|
||
if component_data is not None and pcb_data is not None:
|
||
self.unassigned_comp_point = [[] for _ in range(len(component_data))]
|
||
for idx, data in pcb_data.iterrows():
|
||
component_index = component_data[component_data['part'] == data.part].index.tolist()[0]
|
||
self.unassigned_comp_point[component_index].append(idx)
|
||
else:
|
||
self.unassigned_comp_point = [[] for _ in range(100)]
|
||
|
||
def __lt__(self, other):
|
||
return self.cycle_time < other.cycle_time
|
||
|
||
|
||
@timer_wrapper
|
||
def dynamic_beam_route_generation(component_data, pcb_data, component_assign, cycle_assign, feeder_assign):
|
||
empty_node = RouteNode(component_data, pcb_data)
|
||
max_beam_width = 4
|
||
point_pos = [Point(data.x + stopper_pos[0], data.y + stopper_pos[1], data.r) for _, data in pcb_data.iterrows()]
|
||
|
||
left_boundary, right_boundary = min(point_pos, key=lambda pt: pt.x).x, max(point_pos, key=lambda pt: pt.x).x
|
||
search_step = max((right_boundary - left_boundary) / max_head_index, 0)
|
||
cycle_pick_pos = []
|
||
for feeder_slot in feeder_assign:
|
||
slot_set = set()
|
||
for head, slot in enumerate(feeder_slot):
|
||
if slot == -1:
|
||
continue
|
||
slot_set.add(slot - head * interval_ratio)
|
||
pick_pos = Point(slotf1_pos[0] + ((min(list(slot_set)) + max(list(slot_set)))/ 2 - 1) * slot_interval, slotf1_pos[1], 0)
|
||
cycle_pick_pos.append(pick_pos)
|
||
|
||
ref_node = None
|
||
placement_result, head_sequence_result = [], []
|
||
start_time = time.time()
|
||
for beam_width in range(1, max_beam_width + 1):
|
||
beam_node_list = [copy.deepcopy(empty_node) for _ in range(beam_width)]
|
||
cycle_index = 0
|
||
current_ref_node = RouteNode(component_data, pcb_data)
|
||
for cycle_group_index, cycle_num in enumerate(cycle_assign):
|
||
for _ in range(cycle_num):
|
||
for beam_node in beam_node_list:
|
||
beam_node.placement_res.append([-1 for _ in range(max_head_index)])
|
||
|
||
if ref_node:
|
||
current_ref_node.placement_res.append([-1 for _ in range(max_head_index)])
|
||
|
||
beam_node_list_all = []
|
||
for search_dir in range(3): # 不同的搜索方向,贴装头和起始点的选取方法各不相同
|
||
if search_dir == 0:
|
||
# 从左向右搜索
|
||
ref_pos_x_list = np.arange(left_boundary, (left_boundary + right_boundary) / 2, search_step)
|
||
head_range = list(range(max_head_index))
|
||
elif search_dir == 1:
|
||
# 从右向左搜索
|
||
ref_pos_x_list = np.arange(right_boundary + 1e-3, (left_boundary + right_boundary) / 2, -search_step)
|
||
head_range = list(range(max_head_index - 1, -1, -1))
|
||
else:
|
||
# 从中间向两边搜索
|
||
ref_pos_x_list = np.arange(left_boundary, right_boundary, search_step)
|
||
head_range, head_index = [], (max_head_index - 1) // 2
|
||
while head_index >= 0:
|
||
if 2 * head_index != max_head_index - 1:
|
||
head_range.append(max_head_index - 1 - head_index)
|
||
head_range.append(head_index)
|
||
head_index -= 1
|
||
|
||
for ref_pos_x in ref_pos_x_list:
|
||
beam_node_assigning_list = copy.deepcopy(beam_node_list)
|
||
cur_assigning_ref_node = copy.deepcopy(current_ref_node)
|
||
is_first_head = True
|
||
for head_index in head_range:
|
||
if (part_index := component_assign[cycle_group_index][head_index]) == -1:
|
||
continue
|
||
|
||
if is_first_head:
|
||
is_first_head = False
|
||
for beam_index, beam_node in enumerate(beam_node_assigning_list):
|
||
point_index, min_horizontal_dist = -1, None
|
||
for index in beam_node.unassigned_comp_point[part_index]:
|
||
horizontal_dist = abs(point_pos[index].x - ref_pos_x)
|
||
if min_horizontal_dist is None or horizontal_dist < min_horizontal_dist:
|
||
min_horizontal_dist, point_index = horizontal_dist, index
|
||
beam_node.placement_res[-1][head_index] = point_index
|
||
beam_node.unassigned_comp_point[part_index].remove(point_index)
|
||
else:
|
||
beam_move_time, beam_indices, beam_point_indices = [], [], []
|
||
for beam_index, beam_node in enumerate(beam_node_assigning_list):
|
||
cycle_place_pos = []
|
||
for next_head in range(max_head_index):
|
||
if beam_node.placement_res[-1][next_head] == -1:
|
||
continue
|
||
cycle_place_pos.append(Point(point_pos[beam_node.placement_res[-1][
|
||
next_head]].x - next_head * head_interval, point_pos[
|
||
beam_node.placement_res[-1][next_head]].y,
|
||
point_pos[
|
||
beam_node.placement_res[-1][next_head]].r,
|
||
next_head))
|
||
cycle_place_pos.append([])
|
||
for point_index in beam_node.unassigned_comp_point[part_index]:
|
||
cycle_place_pos[-1] = Point(point_pos[point_index].x - head_index * head_interval,
|
||
point_pos[point_index].y, point_pos[point_index].r,
|
||
head_index)
|
||
|
||
move_time, _ = quick_sort_cycle_route(cycle_place_pos,
|
||
cycle_pick_pos[cycle_group_index])
|
||
|
||
beam_move_time.append(move_time)
|
||
beam_indices.append(beam_index)
|
||
beam_point_indices.append(point_index)
|
||
|
||
next_node_assigning_list, assigned_placement_res = [], []
|
||
for index in np.argsort(beam_move_time):
|
||
tmp_placement_res = beam_node_assigning_list[beam_indices[index]].placement_res[-1].copy()
|
||
tmp_placement_res[head_index] = beam_point_indices[index]
|
||
if tmp_placement_res in assigned_placement_res:
|
||
continue
|
||
|
||
assigned_placement_res.append(tmp_placement_res)
|
||
next_node_assigning_list.append(
|
||
copy.deepcopy(beam_node_assigning_list[beam_indices[index]]))
|
||
next_node_assigning_list[-1].cycle_time = beam_move_time[index]
|
||
next_node_assigning_list[-1].unassigned_comp_point[part_index].remove(
|
||
beam_point_indices[index])
|
||
next_node_assigning_list[-1].placement_res[-1][head_index] = beam_point_indices[index]
|
||
if len(next_node_assigning_list) == beam_width:
|
||
break
|
||
|
||
beam_node_assigning_list = next_node_assigning_list
|
||
|
||
if ref_node:
|
||
point_index = ref_node.placement_res[cycle_index][head_index]
|
||
cur_assigning_ref_node.placement_res[-1][head_index] = point_index
|
||
cur_assigning_ref_node.unassigned_comp_point[part_index].remove(point_index)
|
||
beam_node_assigning_list.append(copy.deepcopy(cur_assigning_ref_node))
|
||
|
||
beam_node_list_all.extend(beam_node_assigning_list)
|
||
|
||
for beam_node in beam_node_list_all:
|
||
beam_node.cycle_time = 0
|
||
cycle_place_pos = []
|
||
for head, index in enumerate(beam_node.placement_res[-1]):
|
||
if index == -1:
|
||
continue
|
||
cycle_place_pos.append(
|
||
Point(point_pos[index].x - head * head_interval, point_pos[index].y, point_pos[index].r,
|
||
head))
|
||
|
||
cycle_time, headseq = dynamic_programming_cycle_route(cycle_place_pos,
|
||
cycle_pick_pos[cycle_group_index])
|
||
beam_node.headseq_res.append(headseq)
|
||
beam_node.total_time += cycle_time
|
||
|
||
beam_node_list, assigned_placement_res = [], []
|
||
for index in np.argsort([beam_node.total_time for beam_node in beam_node_list_all]):
|
||
if beam_node_list_all[index].placement_res in assigned_placement_res:
|
||
continue
|
||
assigned_placement_res.append(beam_node_list_all[index].placement_res)
|
||
beam_node_list.append(beam_node_list_all[index])
|
||
if len(beam_node_list) == beam_width:
|
||
break
|
||
|
||
if ref_node:
|
||
for head_index in range(max_head_index):
|
||
if (part_index := component_assign[cycle_group_index][head_index]) == -1:
|
||
continue
|
||
|
||
point_index = ref_node.placement_res[cycle_index][head_index]
|
||
current_ref_node.placement_res[-1][head_index] = point_index
|
||
current_ref_node.unassigned_comp_point[part_index].remove(point_index)
|
||
|
||
cycle_place_pos = []
|
||
for head, index in enumerate(current_ref_node.placement_res[-1]):
|
||
if index == -1:
|
||
continue
|
||
cycle_place_pos.append(
|
||
Point(point_pos[index].x - head * head_interval, point_pos[index].y, point_pos[index].r,
|
||
head))
|
||
|
||
cycle_time, _ = dynamic_programming_cycle_route(cycle_place_pos, cycle_pick_pos[cycle_group_index])
|
||
current_ref_node.total_time += cycle_time
|
||
current_ref_node.headseq_res.append(ref_node.headseq_res[cycle_index])
|
||
cycle_index += 1
|
||
|
||
min_idx = 0
|
||
for idx in range(1, len(beam_node_list)):
|
||
if beam_node_list[min_idx].total_time > beam_node_list[idx].total_time:
|
||
min_idx = idx
|
||
print(
|
||
f"current beam {beam_width}, assembly time: {beam_node_list[min_idx].total_time:.3f}, running time : "
|
||
f"{time.time() - start_time:.3f} s")
|
||
|
||
ref_node = copy.deepcopy(beam_node_list[min_idx])
|
||
placement_result, head_sequence_result = ref_node.placement_res, ref_node.headseq_res
|
||
return placement_result, head_sequence_result
|
||
|
||
|
||
class PAPSolution:
|
||
def __init__(self, point_pos: list[Point], cycle_res, feeder_slot_res, placement_res, headseq_res):
|
||
self.placement_res = placement_res
|
||
self.headseq_res = headseq_res
|
||
|
||
self.cycle_center_pos = []
|
||
self.cycle_pick_pos = []
|
||
self.cycle_move_time = []
|
||
|
||
self.unassigned_head = []
|
||
self.unassigned_component = []
|
||
|
||
cycle_index = 0
|
||
for cycle_group_index, cycle_group_num in enumerate(cycle_res):
|
||
|
||
slot_set = set()
|
||
for head, slot in enumerate(feeder_slot_res[cycle_group_index]):
|
||
if slot == -1:
|
||
continue
|
||
slot_set.add(slot - head * interval_ratio)
|
||
pick_pos = Point(slotf1_pos[0] + ((min(list(slot_set)) + max(list(slot_set))) / 2 - 1) * slot_interval,
|
||
slotf1_pos[1], 0)
|
||
for _ in range(cycle_group_num):
|
||
self.cycle_pick_pos.append(pick_pos)
|
||
|
||
for _ in range(cycle_group_num):
|
||
center_pos = Point(0, 0)
|
||
cycle_place_pos = []
|
||
for index, head in enumerate(headseq_res[cycle_index]):
|
||
cycle_place_pos.append(Point(point_pos[placement_res[cycle_index][head]].x - head * head_interval,
|
||
point_pos[placement_res[cycle_index][head]].y))
|
||
cycle_place_pos[-1].h = head
|
||
|
||
center_pos.x = (1 - 1 / (index + 1)) * center_pos.x + cycle_place_pos[-1].x / (index + 1)
|
||
center_pos.y = (1 - 1 / (index + 1)) * center_pos.y + cycle_place_pos[-1].y / (index + 1)
|
||
|
||
self.cycle_center_pos.append(center_pos)
|
||
self.cycle_move_time.append(
|
||
dynamic_programming_cycle_route(cycle_place_pos, self.cycle_pick_pos[-1], rtn_seq=False)[0])
|
||
self.unassigned_head.append(None)
|
||
self.unassigned_component.append(None)
|
||
|
||
cycle_index += 1
|
||
|
||
|
||
def all_random_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = random.sample(range(0, num_of_cycle),num_of_selected_cycle)
|
||
|
||
for cycle in selected_cycle:
|
||
selected_head = random.sample(destroy_solution.headseq_res[cycle], 1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def all_worst_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = np.argpartition(-np.array(destroy_solution.cycle_move_time), kth=num_of_selected_cycle)[
|
||
:num_of_selected_cycle]
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = destroy_solution.headseq_res[cycle][np.argmax(head_derivative)]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def all_weighted_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
total_time = sum(solution.cycle_move_time)
|
||
p = [cycle_time / total_time for cycle_time in solution.cycle_move_time]
|
||
selected_cycle = np.random.choice(range(num_of_cycle), size=num_of_selected_cycle, p=p, replace=False)
|
||
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = random.choices(destroy_solution.headseq_res[cycle], weights=head_derivative, k=1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def weighted_head_random_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = random.sample(range(0, num_of_cycle), num_of_selected_cycle)
|
||
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = random.choices(destroy_solution.headseq_res[cycle], weights=head_derivative, k=1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def random_head_worst_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = np.argpartition(-np.array(destroy_solution.cycle_move_time), kth=num_of_selected_cycle)[
|
||
:num_of_selected_cycle]
|
||
|
||
for cycle in selected_cycle:
|
||
selected_head = random.sample(destroy_solution.headseq_res[cycle], 1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def worst_head_weighted_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
total_time = sum(solution.cycle_move_time)
|
||
p = [cycle_time / total_time for cycle_time in solution.cycle_move_time]
|
||
selected_cycle = np.random.choice(range(num_of_cycle), size=num_of_selected_cycle, p=p, replace=False)
|
||
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = destroy_solution.headseq_res[cycle][np.argmax(head_derivative)]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def random_head_weighted_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
total_time = sum(solution.cycle_move_time)
|
||
p = [cycle_time / total_time for cycle_time in solution.cycle_move_time]
|
||
selected_cycle = np.random.choice(range(num_of_cycle), size=num_of_selected_cycle, p=p, replace=False)
|
||
|
||
for cycle in selected_cycle:
|
||
selected_head = random.sample(destroy_solution.headseq_res[cycle], 1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def worst_head_random_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = random.sample(range(0, num_of_cycle), num_of_selected_cycle)
|
||
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = destroy_solution.headseq_res[cycle][np.argmax(head_derivative)]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def weighted_head_worst_cycle_break(point_pos, comp_of_point, solution, ratio):
|
||
destroy_solution = copy.deepcopy(solution)
|
||
unassigned_points = []
|
||
num_of_cycle = len(solution.cycle_move_time)
|
||
num_of_selected_cycle = round(num_of_cycle * ratio)
|
||
selected_cycle = np.argpartition(-np.array(destroy_solution.cycle_move_time), kth=num_of_selected_cycle)[
|
||
:num_of_selected_cycle]
|
||
|
||
for cycle in selected_cycle:
|
||
head_derivative = []
|
||
for head in destroy_solution.headseq_res[cycle]:
|
||
delta_x = abs(point_pos[destroy_solution.placement_res[cycle][
|
||
head]].x - head * head_interval - destroy_solution.cycle_center_pos[cycle].x)
|
||
delta_y = abs(
|
||
point_pos[destroy_solution.placement_res[cycle][head]].y - destroy_solution.cycle_center_pos[cycle].y)
|
||
head_derivative.append(math.sqrt(delta_x ** 2 + delta_y ** 2 + 1e-8))
|
||
|
||
selected_head = random.choices(destroy_solution.headseq_res[cycle], weights=head_derivative, k=1)[0]
|
||
|
||
# update center position
|
||
head_num = len(destroy_solution.headseq_res[cycle]) - 1
|
||
point_index = destroy_solution.placement_res[cycle][selected_head]
|
||
if head_num == 0:
|
||
destroy_solution.cycle_center_pos[cycle] = Point(0, 0)
|
||
else:
|
||
destroy_solution.cycle_center_pos[cycle].x = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].x - (point_pos[point_index].x - selected_head * head_interval) / head_num
|
||
destroy_solution.cycle_center_pos[cycle].y = (1 + 1.0 / head_num) * destroy_solution.cycle_center_pos[
|
||
cycle].y - point_pos[point_index].y / head_num
|
||
|
||
# destroy operation
|
||
selected_point = destroy_solution.placement_res[cycle][selected_head]
|
||
destroy_solution.unassigned_head[cycle] = selected_head
|
||
destroy_solution.unassigned_component[cycle] = comp_of_point[selected_point]
|
||
|
||
destroy_solution.headseq_res[cycle].remove(selected_head)
|
||
destroy_solution.placement_res[cycle][selected_head] = -1
|
||
|
||
unassigned_points.append(selected_point)
|
||
|
||
return destroy_solution, unassigned_points
|
||
|
||
|
||
def random_repair(point_pos, comp_of_point, solution, unassigned_points):
|
||
for point in unassigned_points:
|
||
unassigned_cycle_list = []
|
||
for cycle, head in enumerate(solution.unassigned_head):
|
||
if head is not None and solution.unassigned_component[cycle] == comp_of_point[point]:
|
||
unassigned_cycle_list.append(cycle)
|
||
|
||
selected_cycle = random.sample(unassigned_cycle_list, 1)[0]
|
||
selected_head = solution.unassigned_head[selected_cycle]
|
||
|
||
# update center pos、move time and head sequence
|
||
cycle_place_pos = [
|
||
Point(point_pos[point].x - selected_head * head_interval, point_pos[point].y, point_pos[point].r,
|
||
selected_head)]
|
||
for head in solution.headseq_res[selected_cycle]:
|
||
cycle_place_pos.append(
|
||
Point(point_pos[solution.placement_res[selected_cycle][head]].x - head * head_interval,
|
||
point_pos[solution.placement_res[selected_cycle][head]].y,
|
||
point_pos[solution.placement_res[selected_cycle][head]].r, head))
|
||
|
||
solution.cycle_move_time[selected_cycle], solution.headseq_res[
|
||
selected_cycle] = dynamic_programming_cycle_route(cycle_place_pos, solution.cycle_pick_pos[selected_cycle])
|
||
|
||
num_head = len(solution.headseq_res[selected_cycle])
|
||
solution.cycle_center_pos[selected_cycle].x = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].x + (point_pos[point].x - selected_head * head_interval) / num_head
|
||
solution.cycle_center_pos[selected_cycle].y = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].y + point_pos[point].y / num_head
|
||
|
||
solution.placement_res[selected_cycle][selected_head] = point
|
||
solution.unassigned_head[selected_cycle] = None
|
||
solution.unassigned_component[selected_cycle] = None
|
||
return solution
|
||
|
||
|
||
def greedy_repair(point_pos, comp_of_point, solution, unassigned_points):
|
||
for point in unassigned_points:
|
||
unassigned_cycle_list, unassigned_cycle_dist = [], []
|
||
for cycle, head in enumerate(solution.unassigned_head):
|
||
if head is not None and solution.unassigned_component[cycle] == comp_of_point[point]:
|
||
unassigned_cycle_list.append(cycle)
|
||
|
||
delta_x = abs(solution.cycle_center_pos[cycle].x - point_pos[point].x + head * head_interval)
|
||
delta_y = abs(solution.cycle_center_pos[cycle].y - point_pos[point].y)
|
||
|
||
unassigned_cycle_dist.append(math.sqrt(delta_x ** 2 + delta_y ** 2) + 1e-10)
|
||
|
||
selected_cycle = unassigned_cycle_list[np.argmin(unassigned_cycle_dist)]
|
||
selected_head = solution.unassigned_head[selected_cycle]
|
||
|
||
# update center pos、move time and head sequence
|
||
cycle_place_pos = [
|
||
Point(point_pos[point].x - selected_head * head_interval, point_pos[point].y, point_pos[point].r,
|
||
selected_head)]
|
||
for head in solution.headseq_res[selected_cycle]:
|
||
cycle_place_pos.append(
|
||
Point(point_pos[solution.placement_res[selected_cycle][head]].x - head * head_interval,
|
||
point_pos[solution.placement_res[selected_cycle][head]].y,
|
||
point_pos[solution.placement_res[selected_cycle][head]].r, head))
|
||
|
||
solution.cycle_move_time[selected_cycle], solution.headseq_res[
|
||
selected_cycle] = dynamic_programming_cycle_route(cycle_place_pos, solution.cycle_pick_pos[selected_cycle])
|
||
|
||
num_head = len(solution.headseq_res[selected_cycle])
|
||
solution.cycle_center_pos[selected_cycle].x = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].x + (point_pos[point].x - selected_head * head_interval) / num_head
|
||
solution.cycle_center_pos[selected_cycle].y = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].y + point_pos[point].y / num_head
|
||
|
||
solution.placement_res[selected_cycle][selected_head] = point
|
||
solution.unassigned_head[selected_cycle] = None
|
||
solution.unassigned_component[selected_cycle] = None
|
||
return solution
|
||
|
||
|
||
def weighted_repair(point_pos, comp_of_point, solution, unassigned_points):
|
||
for point in unassigned_points:
|
||
unassigned_cycle_list, unassigned_cycle_dist = [], []
|
||
for cycle, head in enumerate(solution.unassigned_head):
|
||
if head is not None and solution.unassigned_component[cycle] == comp_of_point[point]:
|
||
unassigned_cycle_list.append(cycle)
|
||
|
||
delta_x = abs(solution.cycle_center_pos[cycle].x - point_pos[point].x + head * head_interval)
|
||
delta_y = abs(solution.cycle_center_pos[cycle].y - point_pos[point].y)
|
||
|
||
unassigned_cycle_dist.append(math.sqrt(delta_x ** 2 + delta_y ** 2) + 1e-10)
|
||
|
||
selected_cycle = random.choices(unassigned_cycle_list, weights=unassigned_cycle_dist, k=1)[0]
|
||
selected_head = solution.unassigned_head[selected_cycle]
|
||
|
||
# update center pos、move time and head sequence
|
||
cycle_place_pos = [
|
||
Point(point_pos[point].x - selected_head * head_interval, point_pos[point].y, point_pos[point].r,
|
||
selected_head)]
|
||
for head in solution.headseq_res[selected_cycle]:
|
||
cycle_place_pos.append(
|
||
Point(point_pos[solution.placement_res[selected_cycle][head]].x - head * head_interval,
|
||
point_pos[solution.placement_res[selected_cycle][head]].y,
|
||
point_pos[solution.placement_res[selected_cycle][head]].r, head))
|
||
|
||
solution.cycle_move_time[selected_cycle], solution.headseq_res[
|
||
selected_cycle] = dynamic_programming_cycle_route(cycle_place_pos, solution.cycle_pick_pos[selected_cycle])
|
||
|
||
num_head = len(solution.headseq_res[selected_cycle])
|
||
solution.cycle_center_pos[selected_cycle].x = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].x + (point_pos[point].x - selected_head * head_interval) / num_head
|
||
solution.cycle_center_pos[selected_cycle].y = (1 - 1.0 / num_head) * solution.cycle_center_pos[
|
||
selected_cycle].y + point_pos[point].y / num_head
|
||
|
||
solution.placement_res[selected_cycle][selected_head] = point
|
||
solution.unassigned_head[selected_cycle] = None
|
||
solution.unassigned_component[selected_cycle] = None
|
||
return solution
|
||
|
||
|
||
def select_and_apply_destroy_operator(point_pos, comp_of_point, weight, solution, ratio=0.3):
|
||
destroy_operator_map = {
|
||
0: all_random_break,
|
||
1: all_worst_break,
|
||
2: all_weighted_break,
|
||
3: weighted_head_random_cycle_break,
|
||
4: random_head_worst_cycle_break,
|
||
5: worst_head_weighted_cycle_break,
|
||
6: random_head_weighted_cycle_break,
|
||
7: worst_head_random_cycle_break,
|
||
8: weighted_head_worst_cycle_break,
|
||
}
|
||
assert len(destroy_operator_map) == len(weight)
|
||
destroy_index, destroy_solution, unassigned_points = -1, solution, []
|
||
roulette = np.array(weight).cumsum()
|
||
r = random.uniform(0, max(roulette))
|
||
for index in range(len(destroy_operator_map)):
|
||
if roulette[index] >= r:
|
||
destroy_index = index
|
||
destroy_solution, unassigned_points = destroy_operator_map[index](point_pos, comp_of_point, solution, ratio)
|
||
break
|
||
return destroy_index, destroy_solution, unassigned_points
|
||
|
||
|
||
def select_and_apply_repair_operator(point_pos, comp_of_point, weight, solution, unassigned_points):
|
||
repair_operator_map = {
|
||
0: random_repair,
|
||
1: greedy_repair,
|
||
2: weighted_repair,
|
||
}
|
||
assert len(repair_operator_map) == len(weight)
|
||
repair_index, repair_solution = -1, solution
|
||
roulette = np.array(weight).cumsum()
|
||
r = random.uniform(0, max(roulette))
|
||
for index in range(len(weight)):
|
||
if roulette[index] >= r:
|
||
repair_index = index
|
||
repair_solution = repair_operator_map[index](point_pos, comp_of_point, solution, unassigned_points)
|
||
break
|
||
|
||
return repair_index, repair_solution
|
||
|
||
|
||
@timer_wrapper
|
||
def alns_route_reschedule(pcb_data, placement_res, headseq_res, component_res, feeder_slot_res, cycle_res, hinter=True):
|
||
point_pos = [Point(data.x + stopper_pos[0], data.y + stopper_pos[1], data.r) for _, data in pcb_data.iterrows()]
|
||
comp_of_point = defaultdict(int)
|
||
cycle_index = 0
|
||
for cycle_group_index, cycle_group_num in enumerate(cycle_res):
|
||
for _ in range(cycle_group_num):
|
||
for index, head in enumerate(headseq_res[cycle_index]):
|
||
comp_of_point[placement_res[cycle_index][head]] = component_res[cycle_group_index][head]
|
||
cycle_index += 1
|
||
|
||
init_temperature, final_temperature = 1, 0.2
|
||
alpha, beta = 0.8, 0.5
|
||
solution = PAPSolution(point_pos, cycle_res, feeder_slot_res, placement_res, headseq_res)
|
||
|
||
num_destroy_operators, num_repair_operators = 9, 3
|
||
destroy_weight, repair_weight = [1 for _ in range(num_destroy_operators)], [1 for _ in range(num_repair_operators)]
|
||
destroy_score, repair_score = [1 for _ in range(num_destroy_operators)], [1 for _ in range(num_repair_operators)]
|
||
destroy_times, repair_times = [0 for _ in range(num_destroy_operators)], [0 for _ in range(num_repair_operators)]
|
||
|
||
best_solution = copy.deepcopy(solution)
|
||
max_iteration = 2000
|
||
# === 初始化各周期的移动路径长度,中心位置坐标和未分配的贴装头 ===
|
||
omega_new_global_best, omega_better_than_current, omega_accept, omega_reject = 3, 2, 0.8, 0.2
|
||
best_move_time_list = []
|
||
with tqdm(total=max_iteration) as pbar:
|
||
pbar.set_description('adaptive large neighbor search route reschedule')
|
||
for _ in range(max_iteration):
|
||
temperature = init_temperature
|
||
solution = best_solution
|
||
while temperature > final_temperature:
|
||
destroy_index, destroy_solution, unassigned_points = select_and_apply_destroy_operator(point_pos,
|
||
comp_of_point,
|
||
destroy_weight,
|
||
solution)
|
||
|
||
repair_index, repair_solution = select_and_apply_repair_operator(point_pos, comp_of_point,
|
||
repair_weight, destroy_solution,
|
||
unassigned_points)
|
||
if sum(repair_solution.cycle_move_time) <= sum(solution.cycle_move_time):
|
||
solution = repair_solution
|
||
if sum(repair_solution.cycle_move_time) <= sum(best_solution.cycle_move_time):
|
||
best_solution = repair_solution
|
||
destroy_score[destroy_index] += omega_new_global_best
|
||
repair_score[repair_index] += omega_new_global_best
|
||
else:
|
||
destroy_score[destroy_index] += omega_better_than_current
|
||
repair_score[repair_index] += omega_better_than_current
|
||
else:
|
||
if random.random() < np.exp(
|
||
(sum(solution.cycle_move_time) - sum(repair_solution.cycle_move_time)) / temperature):
|
||
solution = repair_solution
|
||
destroy_score[destroy_index] += omega_accept
|
||
repair_score[repair_index] += omega_accept
|
||
else:
|
||
destroy_score[destroy_index] += omega_reject
|
||
repair_score[repair_index] += omega_reject
|
||
|
||
best_move_time_list.append(sum(best_solution.cycle_move_time))
|
||
destroy_times[destroy_index] += 1
|
||
repair_times[repair_index] += 1
|
||
|
||
destroy_weight[destroy_index] = destroy_weight[destroy_index] * beta + (1 - beta) * destroy_score[
|
||
destroy_index] / destroy_times[destroy_index]
|
||
|
||
repair_weight[repair_index] = repair_weight[repair_index] * beta + (1 - beta) * repair_score[
|
||
repair_index] / repair_times[repair_index]
|
||
|
||
temperature *= alpha
|
||
pbar.update(1)
|
||
|
||
# plt.plot(range(len(best_move_time_list)), best_move_time_list)
|
||
# plt.show()
|
||
|
||
print('best move time', best_move_time_list[-1])
|
||
workbook = openpyxl.load_workbook('result/alns_route.xlsx')
|
||
worksheet = workbook.active
|
||
writing_colum = None
|
||
for col in worksheet.iter_cols(min_col=0, max_col=100):
|
||
for cell in col:
|
||
if cell.value is None:
|
||
writing_colum = cell.column_letter
|
||
break
|
||
if writing_colum:
|
||
break
|
||
|
||
for row_num, value in enumerate(best_move_time_list):
|
||
cell_reference = f"{writing_colum}{row_num + 1}"
|
||
worksheet[cell_reference] = value
|
||
workbook.save('result/alns_route.xlsx')
|
||
|
||
return best_solution.placement_res, best_solution.headseq_res
|
||
|
||
|
||
def place_cluster_greedy_route_generation(component_data, pcb_data, component_result, cycle_result, feeder_slot_result):
|
||
placement_result, head_sequence_result = [], []
|
||
|
||
# === assign CT group to feeder slot ===
|
||
component_point_pos = defaultdict(list)
|
||
|
||
for idx, data in pcb_data.iterrows():
|
||
component_point_pos[data.part].append([data.x + stopper_pos[0], data.y + stopper_pos[1], idx])
|
||
|
||
for pos_list in component_point_pos.values():
|
||
pos_list.sort(key=lambda x: (x[0], x[1]))
|
||
|
||
component_point_index = defaultdict(int)
|
||
for cycle_set in range(len(cycle_result)):
|
||
for cycle in range(cycle_result[cycle_set]):
|
||
placement_result.append([-1 for _ in range(max_head_index)])
|
||
for head in range(max_head_index):
|
||
part_index = component_result[cycle_set][head]
|
||
if part_index == -1:
|
||
continue
|
||
|
||
part = component_data.iloc[part_index]['part']
|
||
point_info = component_point_pos[part][component_point_index[part]]
|
||
|
||
placement_result[-1][head] = point_info[2]
|
||
# mount_point[head] = point_info[0:2]
|
||
|
||
component_point_index[part] += 1
|
||
head_sequence_result.append(
|
||
dynamic_programming_cycle_path(pcb_data, placement_result[-1], feeder_slot_result[cycle_set])[1])
|
||
return placement_result, head_sequence_result
|
||
|
||
|
||
def greedy_level_placing_route_generation(component_data, pcb_data, component_result, cycle_result, feeder_slot_result, hinter=False):
|
||
placement_result, head_sequence = [], []
|
||
part_indices = defaultdict(int)
|
||
for part_idx, data in component_data.iterrows():
|
||
part_indices[data.part] = part_idx
|
||
|
||
mount_point_pos = defaultdict(list)
|
||
for pcb_idx, data in pcb_data.iterrows():
|
||
mount_point_pos[part_indices[data.part]].append([data.x, data.y, pcb_idx])
|
||
|
||
for index_ in mount_point_pos.keys():
|
||
mount_point_pos[index_].sort(key=lambda x: (x[1], x[0]))
|
||
|
||
for cycle_idx, _ in enumerate(cycle_result):
|
||
for _ in range(cycle_result[cycle_idx]):
|
||
placement_result.append([-1 for _ in range(max_head_index)])
|
||
for head in range(max_head_index):
|
||
if component_result[cycle_idx][head] == -1:
|
||
continue
|
||
index_ = component_result[cycle_idx][head]
|
||
placement_result[-1][head] = mount_point_pos[index_][-1][2]
|
||
mount_point_pos[index_].pop()
|
||
head_sequence.append(
|
||
dynamic_programming_cycle_path(pcb_data, placement_result[-1], feeder_slot_result[cycle_idx])[1])
|
||
return placement_result, head_sequence
|
||
|