Files
multi-model-optimizer/opt/smm/feeder_priority.py
2025-11-14 11:34:48 +08:00

792 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from opt.smm.basis import *
from opt.utils import *
class FeederPriorityOpt(BaseOpt):
    """Feeder-priority optimizer.

    The optimization runs in three stages (see ``feeder_priority_assignment``):

    1. enumerate candidate nozzle patterns (one nozzle type per head),
    2. allocate feeders to slots of the feeder base for each pattern,
    3. scan the feeder base to build pick-up cycles, and keep the best-scoring candidate.

    NOTE(review): ``BaseOpt``, ``OptResult``, ``evaluation``, ``feeder_width``, ``nozzle_limit``,
    ``tqdm``, ``plt`` and the weight attributes (``cycle_weight`` etc.) are not defined in this
    file; they are presumably provided by the star imports / the base class - confirm there.
    """

    def __init__(self, config, part_data, step_data, feeder_data=None):
        """Initialise the optimizer.

        Args:
            config: machine configuration; this class reads ``head_num``, ``slot_num``,
                ``slot_intv``, ``head_intv``, ``stopper_pos`` and ``slotf1_pos`` from it.
            part_data: DataFrame of parts; columns ``part``, ``nz``, ``fdn`` and ``fdr`` are read.
            step_data: DataFrame of placement steps; columns ``part`` and ``x`` are read.
            feeder_data: DataFrame of pre-installed feeders with columns ``slot`` and ``part``.
                ``None`` (the default) means "no feeder installed yet".
        """
        # A DataFrame default would be created once at definition time and shared by every
        # instance; build a fresh empty frame per call instead.
        if feeder_data is None:
            feeder_data = pd.DataFrame(columns=['slot', 'part'])
        super().__init__(config, part_data, step_data, feeder_data)
        self.e_gang_pick = 0.6  # weight of simultaneous (gang) pick-ups in the scoring functions
        self.e_nz_change = 4  # weight (penalty) of nozzle changes in the scoring functions

    def optimize(self, hinter=True):
        """Run the feeder-priority assignment, then plan the placement path.

        Args:
            hinter: when true, a progress bar is shown during the assignment.

        The results are stored on ``self.result`` (``part``, ``cycle``, ``slot``, ``point``,
        ``sequence``); nothing is returned.
        """
        self.feeder_priority_assignment(hinter=hinter)
        self.result.point, self.result.sequence = self.path_planner.scan_based(
            self.result.part, self.result.cycle, self.result.slot)

    def feeder_priority_assignment(self, hinter=True):
        """Try every candidate nozzle pattern and keep the best feeder allocation.

        For each pattern the feeders are allocated on a copy of the current feeder data, the
        feeder base is scanned into pick-up cycles, and the result is scored as a weighted sum of
        cycle count, nozzle changes, pick-ups and pick-up distance (lower is better).

        Args:
            hinter: when true, a tqdm progress bar is shown.

        Returns:
            Tuple ``(part, cycle, slot)`` taken from ``self.result``.
            NOTE(review): if no nozzle pattern is generated (e.g. no placement steps),
            ``self.result`` is not updated here and whatever the base class set is returned.
        """
        feeder_allocate_val = np.inf
        nozzle_pattern_list = self.feeder_nozzle_pattern()
        pbar = tqdm(total=len(nozzle_pattern_list), desc='feeder priority process') if hinter else None
        try:
            # Step 1: the nozzle assignment patterns have been determined above.
            allocated_feeder_data = copy.deepcopy(self.feeder_data)
            for nozzle_pattern in nozzle_pattern_list:
                feeder_data = copy.deepcopy(allocated_feeder_data)
                # Step 2: allocate feeder slots (mutates feeder_data and nozzle_pattern in place).
                self.feeder_allocate(feeder_data, nozzle_pattern, figure=False)
                # Step 3: scan the feeder base to determine the pick-up order of the components.
                result = OptResult()
                result.part, result.cycle, result.slot = self.feeder_base_scan(feeder_data)
                info = evaluation(self.config, self.part_data, self.step_data, result)
                val = self.cycle_weight * info.cycle_counter + self.nozzle_change_weight * info.nozzle_change_counter + \
                    self.pickup_weight * info.pickup_counter + self.move_weight * info.pickup_distance
                if val < feeder_allocate_val:
                    feeder_allocate_val = val
                    self.result, self.feeder_data = result, feeder_data
                if pbar is not None:
                    pbar.update(1)
        finally:
            # Always release the progress bar, even if a candidate raises.
            if pbar is not None:
                pbar.close()
        return self.result.part, self.result.cycle, self.result.slot

    def feeder_nozzle_pattern(self):
        """Enumerate candidate nozzle patterns.

        Placement points are counted per nozzle type; rarely used nozzle types are dropped, heads
        are distributed over the remaining types, and several orderings of the resulting
        head/nozzle assignment are emitted. The least-used nozzle type is then removed and the
        procedure repeats until no nozzle type is left.

        Returns:
            List of patterns; every pattern is a list of ``head_num`` nozzle names.
        """
        nozzle_pattern_list = []
        nozzle_points = defaultdict(int)
        head_num = self.config.head_num
        part_nozzle = defaultdict(str)
        for _, data in self.part_data.iterrows():
            part_nozzle[data.part] = data.nz
        # Count placement points per nozzle type (parts missing from part_data count under '').
        for _, data in self.step_data.iterrows():
            nozzle_points[part_nozzle[data.part]] += 1
        # Never consider more nozzle types than there are heads: drop the least used ones.
        while len(nozzle_points.keys()) > head_num:
            del nozzle_points[min(nozzle_points.items(), key=lambda x: x[1])[0]]
        # Drop nozzle types whose share of the points is below 0.8 / head_num.
        sum_points = sum(nozzle_points.values())
        nozzle_points = defaultdict(int, {k: v for k, v in nozzle_points.items() if v / sum_points >= 0.8 / head_num})
        # Head visiting order; for head_num == 6 this evaluates to [3, 2, 4, 1, 5, 0].
        head_assign_indexes = [int(head_num // 2 + pow(-1, h + 1) * (math.ceil(h / 2) - 1 / 2) +
                                   math.ceil((head_num + 1) % 2) / 2) - 1 for h in range(1, head_num + 1)]
        while len(nozzle_points):
            nozzle_heads, nozzle_indices = defaultdict(int), defaultdict(str)
            min_points_nozzle = None
            # Start with one head per nozzle type and remember the least used nozzle type.
            for idx, (nozzle, points) in enumerate(nozzle_points.items()):
                nozzle_heads[nozzle], nozzle_indices[idx] = 1, nozzle
                if min_points_nozzle is None or points < nozzle_points[min_points_nozzle]:
                    min_points_nozzle = nozzle
            # Hand out the remaining heads one by one to the nozzle type with the highest
            # points-per-head ratio (ties go to the type that already owns more heads).
            while sum(nozzle_heads.values()) != head_num:
                max_cycle = None
                for nozzle, head_cnt in nozzle_heads.items():
                    if max_cycle is None or nozzle_points[nozzle] / head_cnt > nozzle_points[max_cycle] / \
                            nozzle_heads[max_cycle]:
                        max_cycle = nozzle
                    elif nozzle_points[nozzle] / head_cnt == nozzle_points[max_cycle] / nozzle_heads[max_cycle]:
                        if head_cnt > nozzle_heads[max_cycle]:
                            max_cycle = nozzle
                assert max_cycle is not None
                nozzle_heads[max_cycle] += 1
            # Number of permutations to emit: all of them, or only the first half when more than
            # three nozzle types are involved.
            num_permu = math.factorial(len(nozzle_indices.keys()))
            num_permu = num_permu // 2 if len(nozzle_indices.keys()) > 3 else num_permu
            for permu in itertools.permutations(nozzle_indices.keys()):
                if (num_permu := num_permu - 1) < 0:
                    break
                nozzle_pattern_list.append([])
                for idx in permu:
                    for _ in range(nozzle_heads[nozzle_indices[idx]]):
                        nozzle_pattern_list[-1].append(nozzle_indices[idx])
            if len(nozzle_points.keys()) > 1:
                nozzle_average_points = []
                for nozzle, head in nozzle_heads.items():
                    nozzle_average_points.append([nozzle, head, nozzle_points[nozzle] / head])
                # Busiest nozzle type (points per head) first.
                nozzle_average_points = sorted(nozzle_average_points, key=lambda x: -x[2])
                # Extra pattern 1: fill the heads following head_assign_indexes from its front.
                idx = 0
                nozzle_pattern_list.append(['' for _ in range(head_num)])
                for nozzle, head, _ in nozzle_average_points:
                    for _ in range(head):
                        nozzle_pattern_list[-1][head_assign_indexes[idx]] = nozzle
                        idx += 1
                # Extra pattern 2: fill the heads following head_assign_indexes from its back.
                idx = 1
                nozzle_pattern_list.append(['' for _ in range(head_num)])
                for nozzle, head, _ in nozzle_average_points:
                    for _ in range(head):
                        nozzle_pattern_list[-1][head_assign_indexes[-idx]] = nozzle
                        idx += 1
            nozzle_points.pop(min_points_nozzle)
        return nozzle_pattern_list

    def feeder_allocate(self, feeder_data, nozzle_pattern, figure=False):
        """Allocate feeders to feeder-base slots for one nozzle pattern.

        Repeatedly scans all admissible start slots, builds a tentative per-head feeder assignment
        for each, scores it (gang pick-ups rewarded, nozzle changes and distance from the mount
        centre penalised) and commits the best one, until nothing more can be assigned.

        Args:
            feeder_data: DataFrame with columns ``slot`` and ``part`` describing installed
                feeders. It is modified in place: one row is appended per newly allocated feeder.
            nozzle_pattern: list of ``head_num`` nozzle names. It is modified in place to reflect
                the nozzle of the part finally assigned to each head.
            figure: when true, the resulting feeder layout is drawn with matplotlib.

        Raises:
            ValueError: if ``feeder_data`` already holds more feeders of a part than ``fdn`` allows.
        """
        head_num, slot_num = self.config.head_num, self.config.slot_num
        slot_intv, head_intv = self.config.slot_intv, self.config.head_intv
        intv_ratio = round(head_intv / slot_intv)  # head pitch expressed in slots
        feeder_points, feeder_division_points = defaultdict(int), defaultdict(int)  # placement points per feeder
        feeder_center_pos = defaultdict(float)
        feeder_limit, feeder_arrange = defaultdict(int), defaultdict(int)
        part_nozzle = defaultdict(str)
        feeder_base = [-2] * slot_num  # parts installed on the feeder base (-2: unassigned, -1: occupied)
        feeder_base_points = [0] * slot_num  # remaining placement points per feeder-base slot
        part_index = defaultdict(int)
        for idx, data in self.part_data.iterrows():
            part_index[data.part] = idx
            feeder_limit[idx] = data.fdn
            feeder_arrange[idx] = 0
        for _, data in self.step_data.iterrows():
            pos, part = data.x + self.config.stopper_pos.x, data.part
            index = part_index[part]
            feeder_points[index] += 1
            # Running mean of the placement x position of this part.
            feeder_center_pos[index] += ((pos - feeder_center_pos[index]) / feeder_points[index])
            part_nozzle[index] = self.part_data.loc[index].nz
        # Points served by each individual feeder when a part is split over several feeders.
        for index, points in feeder_points.items():
            feeder_division_points[index] = points // feeder_limit[index]
        # Per nozzle type: one entry per still-available feeder of every part using that nozzle.
        nozzle_part, nozzle_part_points = defaultdict(list), defaultdict(list)
        for part, nozzle in part_nozzle.items():
            for _ in range(feeder_limit[part]):
                nozzle_part[nozzle].append(part)
                nozzle_part_points[nozzle].append(feeder_points[part])
        if feeder_data is not None:
            for _, feeder in feeder_data.iterrows():
                slot, part = feeder.slot, feeder.part
                index = part_index[part]
                # Position on the feeder base and the corresponding placement points.
                feeder_base[slot], feeder_base_points[slot] = index, feeder_division_points[index]
                feeder_type = self.part_data.loc[index].fdr
                # Wide feeders also block the neighbouring slots.
                extra_width = feeder_width[feeder_type][0] + feeder_width[feeder_type][1] - slot_intv
                while extra_width > 0:
                    slot += 1
                    feeder_base[slot] = -1
                    extra_width -= slot_intv
                feeder_limit[index] -= 1
                feeder_arrange[index] += 1
                if feeder_limit[index] < 0:
                    info = 'the number of arranged feeder for [' + part + '] exceeds the quantity limit'
                    raise ValueError(info)
                # This feeder is no longer available for allocation.
                for nozzle, part in nozzle_part.items():
                    if index in part:
                        index_ = part.index(index)
                        nozzle_part[nozzle].pop(index_)
                        nozzle_part_points[nozzle].pop(index_)
                        break
        # Head visiting order; for head_num == 6 this evaluates to [3, 2, 4, 1, 5, 0].
        head_assign_indexes = [int(head_num // 2 + pow(-1, h + 1) * (math.ceil(h / 2) - 1 / 2) +
                                   math.ceil((head_num + 1) % 2) / 2) - 1 for h in range(1, head_num + 1)]
        assert len(nozzle_pattern) == head_num
        while True:
            best_assign, best_assign_points = [], []
            best_assign_slot, best_assign_value = -1, -np.inf
            best_nozzle_part, best_nozzle_part_points = None, None
            for slot in range(1, slot_num // 2 - (head_num - 1) * intv_ratio + 1):
                feeder_assign, feeder_assign_points = [], []
                tmp_feeder_limit, tmp_feeder_points = feeder_limit.copy(), feeder_points.copy()
                tmp_nozzle_part, tmp_nozzle_part_points = copy.deepcopy(nozzle_part), copy.deepcopy(
                    nozzle_part_points)
                # Record the parts of already installed feeders seen by the heads at this position.
                for head in range(head_num):
                    feeder_assign.append(feeder_base[slot + head * intv_ratio])
                    if feeder_assign[-1] >= 0:
                        feeder_assign_points.append(feeder_base_points[slot + head * intv_ratio])
                        if feeder_assign_points[-1] <= 0:
                            # Installed feeder without remaining points: treat the head as blocked.
                            feeder_assign[-1], feeder_assign_points[-1] = -1, 0
                    else:
                        feeder_assign_points.append(0)
                if -2 not in feeder_assign:
                    continue  # no free head position here
                assign_part_stack, assign_part_stack_points = [], []
                for idx in head_assign_indexes:
                    if feeder_assign[idx] != -2:
                        continue
                    # Allocate a part according to the nozzle type this head has in the pattern.
                    nozzle_assign = nozzle_pattern[idx]
                    if len(tmp_nozzle_part[nozzle_assign]) == 0:
                        # No part left for this head's nozzle type: push the part that would be
                        # allocated next onto the stack instead.
                        part = max(tmp_feeder_points.keys(),
                                   key=lambda x: tmp_feeder_points[x] / tmp_feeder_limit[x]
                                   if tmp_feeder_limit[x] != 0 else 0)
                        for nozzle, part_list in tmp_nozzle_part.items():
                            if part in part_list:
                                nozzle_assign = nozzle
                                assign_part_stack.append(part)
                                assign_part_stack_points.append(feeder_division_points[part])
                                break
                    else:
                        # A part with the matching nozzle type is available: allocate it directly.
                        index_ = tmp_nozzle_part[nozzle_assign].index(
                            max(tmp_nozzle_part[nozzle_assign],
                                key=lambda x: tmp_feeder_points[x] / tmp_feeder_limit[x]
                                if tmp_feeder_limit[x] != 0 else 0))
                        part = tmp_nozzle_part[nozzle_assign][index_]
                        feeder_type = self.part_data.loc[part].fdr
                        extra_width, extra_slot = feeder_width[feeder_type][0] + feeder_width[feeder_type][1] - slot_intv, 1
                        slot_overlap = False
                        while extra_width > 0:
                            slot_ = slot + idx * intv_ratio + extra_slot
                            if feeder_base[slot_] != -2 or slot_ > slot_num // 2:
                                slot_overlap = True
                                break
                            if idx + extra_slot // 2 < head_num and feeder_assign[idx + extra_slot // 2] >= 0:
                                slot_overlap = True
                                break
                            extra_width -= slot_intv
                            extra_slot += 1
                        # Enough feeders are available and there is no conflict with installed feeders.
                        if tmp_feeder_limit[part] > 0 and not slot_overlap:
                            feeder_assign[idx], feeder_assign_points[idx] = part, feeder_division_points[part]
                            extra_width, extra_head = feeder_width[feeder_type][0] + feeder_width[feeder_type][
                                1] - head_intv, 1
                            # A feeder wider than the head pitch blocks the following heads as well.
                            while extra_width > 0 and idx + extra_head < head_num:
                                feeder_assign[idx + extra_head] = -1
                                extra_head += 1
                                extra_width -= head_intv
                        else:
                            part = -1  # conflicting part: does not consume an available feeder
                    if part >= 0 and tmp_feeder_limit[part] == 0:
                        continue
                    if part in tmp_nozzle_part[nozzle_assign]:
                        index = tmp_nozzle_part[nozzle_assign].index(part)
                        tmp_nozzle_part[nozzle_assign].pop(index)
                        tmp_nozzle_part_points[nozzle_assign].pop(index)
                    tmp_feeder_limit[part] -= 1
                    tmp_feeder_points[part] -= feeder_division_points[part]
                # Pop the part stack: first serve heads whose nozzle type matches the part.
                if nozzle_pattern:
                    for head, feeder in enumerate(feeder_assign):
                        if feeder != -2:
                            continue
                        for idx, part in enumerate(assign_part_stack):
                            feeder_type = self.part_data.loc[part].fdr
                            extra_width, extra_slot = feeder_width[feeder_type][0] + feeder_width[feeder_type][
                                1] - slot_intv, 1
                            slot_overlap = False
                            while extra_width > 0:
                                slot_ = slot + head * intv_ratio + extra_slot
                                if feeder_base[slot_] != -2 or slot_ > slot_num // 2:
                                    slot_overlap = True
                                    break
                                extra_width -= slot_intv
                                extra_slot += 1
                            if self.part_data.loc[part].nz == nozzle_pattern[head] and not slot_overlap:
                                feeder_assign[head], feeder_assign_points[head] = assign_part_stack[idx], \
                                    assign_part_stack_points[idx]
                                assign_part_stack.pop(idx)
                                assign_part_stack_points.pop(idx)
                                break
                # Then allocate the parts that are still on the stack to the remaining free heads.
                for head in head_assign_indexes:
                    if feeder_assign[head] != -2 or len(assign_part_stack) == 0:
                        continue
                    part, points = assign_part_stack[0], assign_part_stack_points[0]
                    feeder_type = self.part_data.loc[part].fdr
                    extra_width = feeder_width[feeder_type][0] + feeder_width[feeder_type][1] - slot_intv
                    extra_slot = 1
                    slot_overlap = False
                    while extra_width > 0:
                        slot_ = slot + head * intv_ratio + extra_slot
                        if feeder_base[slot_] != -2 or slot_ > slot_num // 2:
                            slot_overlap = True
                            break
                        extra_width -= slot_intv
                        extra_slot += 1
                    if not slot_overlap:
                        feeder_assign[head], feeder_assign_points[head] = part, points
                        extra_width = feeder_width[feeder_type][0] + feeder_width[feeder_type][1] - head_intv
                        extra_head = 1
                        while extra_width > 0 and head + extra_head < head_num:
                            feeder_assign[head + extra_head] = -1
                            extra_head += 1
                            extra_width -= head_intv
                    else:
                        # Give back a stacked part that cannot be placed because of the mechanical limit.
                        nozzle = self.part_data.loc[part].nz
                        tmp_nozzle_part[nozzle].insert(0, part)
                        tmp_nozzle_part_points[nozzle].insert(0, points)
                    assign_part_stack.pop(0)
                    assign_part_stack_points.pop(0)
                # Parts still on the stack could not be allocated because of the mechanical limit:
                # give all of them back.
                while assign_part_stack:
                    part, points = assign_part_stack[0], assign_part_stack_points[0]
                    nozzle = self.part_data.loc[part].nz
                    tmp_nozzle_part[nozzle].insert(0, part)
                    tmp_nozzle_part_points[nozzle].insert(0, points)
                    assign_part_stack.pop(0)
                    assign_part_stack_points.pop(0)
                nozzle_change_counter = 0
                average_slot, average_head = [], []
                for head, feeder_ in enumerate(feeder_assign):
                    if feeder_ < 0:
                        continue
                    average_slot.append((feeder_center_pos[feeder_] - self.config.slotf1_pos.x) / slot_intv + 1)
                    average_head.append(head)
                    if nozzle_pattern and self.part_data.loc[feeder_].nz != nozzle_pattern[head]:
                        nozzle_change_counter += 1
                if len(average_slot) == 0:
                    continue
                # Start slot at which the assigned parts would sit closest to their mount centres.
                average_slot = sum(average_slot) / len(average_slot) - sum(average_head) / len(average_head) * intv_ratio
                # Reward simultaneous pick-ups: peel off the smallest positive point count layer by layer.
                assign_value = 0
                feeder_assign_points_cpy = feeder_assign_points.copy()
                while True:
                    points_filter = list(filter(lambda x: x > 0, feeder_assign_points_cpy))
                    if not points_filter:
                        break
                    assign_value += self.e_gang_pick * min(points_filter) * (len(points_filter) - 1)
                    for head, _ in enumerate(feeder_assign_points_cpy):
                        if feeder_assign_points_cpy[head] == 0:
                            continue
                        feeder_assign_points_cpy[head] -= min(points_filter)
                # Penalise nozzle changes heavily and the distance to the ideal slot slightly.
                assign_value -= (1e2 * self.e_nz_change * nozzle_change_counter + 1e-5 * abs(slot - average_slot))
                if assign_value >= best_assign_value and sum(feeder_assign_points) != 0:
                    best_assign_value = assign_value
                    best_assign = feeder_assign.copy()
                    best_assign_points = feeder_assign_points.copy()
                    best_assign_slot = slot
                    best_nozzle_part, best_nozzle_part_points = \
                        tmp_nozzle_part.copy(), tmp_nozzle_part_points.copy()
            if not best_assign_points:
                break  # nothing could be assigned at any position: allocation finished
            for idx, part in enumerate(best_assign):
                if part < 0:
                    continue
                # Newly installed feeder.
                if feeder_base[best_assign_slot + idx * intv_ratio] != part:
                    # Remove the points consumed by the simultaneous pick-up cycles, keep the rest.
                    feeder_base_points[best_assign_slot + idx * intv_ratio] += (
                        feeder_division_points[part] - min(filter(lambda x: x > 0, best_assign_points)))
                    feeder_points[part] -= feeder_division_points[part]
                    feeder_limit[part] -= 1
                    feeder_arrange[part] += 1
                    if feeder_limit[part] == 0:
                        feeder_division_points[part] = 0
                        for nozzle, part_list in nozzle_part.items():
                            if part in part_list:
                                index_ = part_list.index(part)
                                nozzle_part[nozzle].pop(index_)
                                nozzle_part_points[nozzle].pop(index_)
                                break
                        feeder_division_points[part] = 0
                else:
                    # Feeder that was already installed.
                    feeder_base_points[best_assign_slot + idx * intv_ratio] -= min(
                        filter(lambda x: x > 0, best_assign_points))
                # Update the feeder base.
                feeder_base[best_assign_slot + idx * intv_ratio] = part
                feeder_type, extra_slot = self.part_data.loc[part].fdr, 0
                extra_width = feeder_width[feeder_type][0] + feeder_width[feeder_type][1] - slot_intv
                while extra_width > 0:
                    extra_slot += 1
                    if feeder_base[best_assign_slot + idx * intv_ratio + extra_slot] == -2:
                        feeder_base[best_assign_slot + idx * intv_ratio + extra_slot] = -1  # mark slot as occupied
                    else:
                        # NOTE(review): asserting a non-empty string is always true, so this is a
                        # no-op. It must not simply become a hard failure: an already installed
                        # wide feeder legitimately reaches this branch because its extra slots
                        # are already marked -1.
                        assert 'feeder allocation conflict'
                    extra_width -= slot_intv
                # Update the nozzle pattern with the nozzle actually used by this head.
                nozzle_pattern[idx] = self.part_data.loc[part].nz
            # Update the head visiting order (heads with fewer assigned points first).
            head_assign_indexes = np.array(best_assign_points).argsort().tolist()
            nozzle_part, nozzle_part_points = copy.deepcopy(best_nozzle_part), copy.deepcopy(
                best_nozzle_part_points)
        assert not list(filter(lambda x: x < 0, feeder_limit.values()))  # allocated feeders stay within the limit
        # Mask the feeders that were already listed in feeder_data so they are not appended twice.
        for _, data in feeder_data.iterrows():
            feeder_base[data.slot] = -1
        for slot, feeder in enumerate(feeder_base):
            if feeder < 0:
                continue
            part = self.part_data.loc[feeder].part
            feeder_data.loc[len(feeder_data.index)] = [slot, part]
        if figure:
            slotf1_pos = self.config.slotf1_pos
            # Draw the feeder layout.
            for slot in range(slot_num // 2):
                plt.scatter(slotf1_pos.x + slot_intv * slot, slotf1_pos.y, marker='x', s=12, color='black', alpha=0.5)
                plt.text(slotf1_pos.x + slot_intv * slot, slotf1_pos.y - 45, str(slot + 1), ha='center', va='bottom',
                         size=8)
            feeder_assign_range = []
            for _, feeder in feeder_data.iterrows():
                index = self.part_data[self.part_data.part == feeder.part].index.tolist()[0]
                feeder_type = self.part_data.loc[index].fdr
                width = feeder_width[feeder_type][0] + feeder_width[feeder_type][1]
                start = slotf1_pos.x + slot_intv * (feeder.slot - 1) - slot_intv / 2
                end = slotf1_pos.x + slot_intv * (feeder.slot - 1) - slot_intv / 2 + width
                rec_x = [start, end, end, start]
                rec_y = [slotf1_pos.y - 40, slotf1_pos.y - 40, slotf1_pos.y + 10, slotf1_pos.y + 10]
                # Black: already assigned, red: newly assigned. The frames built by this class
                # carry no 'arg' column, so fall back to 0 instead of raising AttributeError.
                c = 'red' if getattr(feeder, 'arg', 0) == 0 else 'black'
                plt.text(slotf1_pos.x + slot_intv * (feeder.slot - 1), slotf1_pos.y + 12,
                         feeder.part + ': ' + str(feeder_points[index]), ha='center', size=7, rotation=90, color=c)
                plt.fill(rec_x, rec_y, facecolor='yellow', alpha=0.4)
                feeder_assign_range.append([start, end])
            # Highlight overlapping feeder ranges.
            feeder_assign_range.sort(key=lambda x: x[0])
            for i in range(1, len(feeder_assign_range)):
                if feeder_assign_range[i][0] < feeder_assign_range[i - 1][1]:
                    start, end = feeder_assign_range[i][0], feeder_assign_range[i - 1][1]
                    rec_x = [start, end, end, start]
                    rec_y = [slotf1_pos.y - 40, slotf1_pos.y - 40, slotf1_pos.y + 10, slotf1_pos.y + 10]
                    plt.fill(rec_x, rec_y, facecolor='red')
            plt.plot([slotf1_pos.x - slot_intv / 2, slotf1_pos.x + slot_intv * (slot_num // 2 - 1 + 0.5)],
                     [slotf1_pos.y + 10, slotf1_pos.y + 10], color='black')
            plt.plot([slotf1_pos.x - slot_intv / 2, slotf1_pos.x + slot_intv * (slot_num // 2 - 1 + 0.5)],
                     [slotf1_pos.y - 40, slotf1_pos.y - 40], color='black')
            for counter in range(slot_num // 2 + 1):
                pos = slotf1_pos.x + (counter - 0.5) * slot_intv
                plt.plot([pos, pos], [slotf1_pos.y + 10, slotf1_pos.y - 40], color='black', linewidth=1)
            plt.ylim(-10, 100)
            plt.show()

    def feeder_base_scan(self, feeder_data):
        """Scan the feeder base and build the pick-up cycles.

        Args:
            feeder_data: DataFrame with columns ``slot`` and ``part`` describing the installed
                feeders. Rows whose part is not registered exactly once in ``part_data`` are
                reported on stdout and skipped.

        Returns:
            Tuple ``(part_result, cycle_result, slot_result)`` of equally long lists. Entry ``i``
            holds, per head, the part index (``-1`` for an idle head), the number of repetitions
            of that assignment, and, per head, the pick-up slot (``-1`` for an idle head).
        """
        feeder_assign_check = set()
        for _, feeder in feeder_data.iterrows():
            feeder_assign_check.add(feeder.part)
        part_index, part_points = defaultdict(int), defaultdict(int)
        for idx, data in self.part_data.iterrows():
            part_index[data.part] = idx
        for _, data in self.step_data.iterrows():
            part_points[part_index[data.part]] += 1
        # assert len(feeder_assign_check) == len(part_points.values()) - list(part_points.values()).count(0)  # every feeder has a slot
        # Mean placement position of every part, expressed as a (fractional) slot number.
        # The sum is accumulated here and divided by the point count below; the previous
        # "+= (x - centre)" merely stored the last x, so the centre was last_x / n.
        mount_center_slot = defaultdict(float)
        for _, data in self.step_data.iterrows():
            idx = part_index[data.part]
            mount_center_slot[idx] += data.x
        for idx, pos in mount_center_slot.items():
            mount_center_slot[idx] = (pos / part_points[idx] + self.config.stopper_pos.x -
                                      self.config.slotf1_pos.x) / self.config.slot_intv + 1
        head_num, slot_num = self.config.head_num, self.config.slot_num
        intv_ratio = round(self.config.head_intv / self.config.slot_intv)
        feeder_part = [-1] * slot_num  # part index installed in every slot (-1: none)
        for _, data in feeder_data.iterrows():
            matched_index = self.part_data[self.part_data.part == data.part].index.tolist()
            if len(matched_index) != 1:
                print('unregistered component: ', data.part, ' in slot', data.slot)
                continue
            feeder_part[data.slot] = matched_index[0]
        part_result, cycle_result, slot_result = [], [], []  # optimized part indices and pick-up slots
        # Initial nozzle pattern: the head position covering the largest number of placement points.
        sum_nozzle_points, nozzle_pattern = -1, None
        for slot in range(slot_num // 2 - (head_num - 1) * intv_ratio):
            cur_nozzle_points, cur_nozzle_pattern = 0, ['' for _ in range(head_num)]
            for head in range(head_num):
                if (part := feeder_part[slot + head * intv_ratio]) == -1:
                    continue
                cur_nozzle_pattern[head] = self.part_data.loc[part].nz
                cur_nozzle_points += part_points[part]
            if cur_nozzle_points > sum_nozzle_points:
                sum_nozzle_points = cur_nozzle_points
                nozzle_pattern = cur_nozzle_pattern
        nozzle_mode, nozzle_mode_cycle = [nozzle_pattern], [0]  # nozzle patterns and cycles spent in each
        value_increment_base = 0
        while True:
            # === loop within one cycle ===
            assigned_part = [-1 for _ in range(head_num)]  # part assigned to each head in this scan
            assigned_cycle = [0 for _ in range(head_num)]  # maximum number of picks of that part
            assigned_slot = [-1 for _ in range(head_num)]  # feeder slot assigned to each head
            best_assigned_eval_func = -float('inf')
            nozzle_insert_cycle = 0
            for cycle_index, nozzle_cycle in enumerate(nozzle_mode):
                scan_eval_func_list = []  # best values obtained by the successive scans
                # Best result found so far under the nozzle pattern nozzle_cycle.
                cur_scan_part = [-1 for _ in range(head_num)]
                cur_scan_cycle = [0 for _ in range(head_num)]
                cur_scan_slot = [-1 for _ in range(head_num)]
                cur_nozzle_limit = copy.deepcopy(nozzle_limit)
                while True:
                    best_scan_part = [-1 for _ in range(head_num)]
                    best_scan_cycle = [0 for _ in range(head_num)]
                    best_scan_slot = [-1 for _ in range(head_num)]
                    best_scan_nozzle_limit = copy.deepcopy(cur_nozzle_limit)
                    scan_eval_func, search_break = -float('inf'), True
                    # Scan the front feeder base.
                    for slot in range(1, slot_num // 2 - (head_num - 1) * intv_ratio + 1):
                        if sum(feeder_part[slot: slot + head_num * intv_ratio: intv_ratio]) == -head_num:
                            continue  # no feeder under any head at this position
                        scan_cycle, scan_part, scan_slot = cur_scan_cycle.copy(), cur_scan_part.copy(), cur_scan_slot.copy()
                        scan_nozzle_limit = copy.deepcopy(cur_nozzle_limit)
                        # Pre-scan (look-ahead): how many heads could pick each part here.
                        preview_scan_part = defaultdict(int)
                        for head in range(head_num):
                            part = feeder_part[slot + head * intv_ratio]
                            # Head and pick-up slot match.
                            if scan_part[head] == -1 and part != -1 and part_points[part] > 0 and scan_part.count(
                                    part) < part_points[part]:
                                preview_scan_part[part] += 1
                        part_counter = 0
                        for head in range(head_num):
                            part = feeder_part[slot + head * intv_ratio]
                            # 1. Matching condition: head and pick-up slot match.
                            if scan_part[head] == -1 and part != -1 and part_points[part] > 0 and scan_part.count(
                                    part) < part_points[part]:
                                # 2. Matching condition: the number of available nozzles is not exceeded.
                                nozzle = self.part_data.loc[part].nz
                                if scan_nozzle_limit[nozzle] <= 0:
                                    continue
                                # 3. Increment condition: introducing a new part type must not lower
                                #    the objective value (look-ahead).
                                if scan_cycle.count(0) == head_num:
                                    gang_pick_change = part_points[part]
                                else:
                                    prev_cycle = min(filter(lambda x: x > 0, scan_cycle))
                                    # Gain in simultaneous pick-ups.
                                    gang_pick_change = min(prev_cycle, part_points[part] // preview_scan_part[part])
                                # 4. Pick-up distance condition: pick neighbouring parts together to
                                #    shorten the path (disabled).
                                # reference_slot = -1
                                # for head_, slot_ in enumerate(scan_slot):
                                #     if slot_ != -1:
                                #         reference_slot = slot_ - head_ * intv_ratio
                                # if reference_slot != -1 and abs(reference_slot - slot) > (head_num - 1) * intv_ratio:
                                #     continue
                                # 5. Compare the gang pick-up gain with the number of nozzle changes.
                                prev_nozzle_change = 0
                                if cycle_index + 1 < len(nozzle_mode):
                                    prev_nozzle_change = 2 * (nozzle_cycle[head] != nozzle_mode[cycle_index + 1][head])
                                # Avoid a low head utilisation in the first cycle.
                                nozzle_change = 2 * (nozzle != nozzle_cycle[head])
                                if cycle_index + 1 < len(nozzle_mode):
                                    nozzle_change += 2 * (nozzle != nozzle_mode[cycle_index + 1][head])
                                nozzle_change -= prev_nozzle_change
                                val = self.e_gang_pick * gang_pick_change - self.e_nz_change * nozzle_change
                                if val < value_increment_base:
                                    continue
                                part_counter += 1
                                scan_part[head] = part
                                scan_cycle[head] = part_points[part] // preview_scan_part[part]
                                scan_slot[head] = slot + head * intv_ratio
                                scan_nozzle_limit[nozzle] -= 1
                        nozzle_counter = 0  # number of nozzle changes
                        # Against the previous cycle.
                        for head, nozzle in enumerate(nozzle_cycle):
                            if scan_part[head] == -1:
                                continue
                            if self.part_data.loc[scan_part[head]].nz != nozzle and nozzle != '':
                                nozzle_counter += 2
                        # Against the next cycle (additional nozzle changes caused by the insertion).
                        if cycle_index + 1 < len(nozzle_mode):
                            for head, nozzle in enumerate(nozzle_mode[cycle_index + 1]):
                                if scan_part[head] == -1:
                                    continue
                                prev_counter, new_counter = 0, 0
                                if nozzle_cycle[head] != nozzle and nozzle_cycle[head] != '' and nozzle != '':
                                    prev_counter += 2
                                if self.part_data.loc[scan_part[head]].nz != nozzle and nozzle != '':
                                    new_counter += 2
                                nozzle_counter += new_counter - prev_counter
                        else:
                            # Last pattern: compare against the first pattern instead.
                            for head, nozzle in enumerate(nozzle_mode[0]):
                                if scan_part[head] == -1:
                                    continue
                                prev_counter, new_counter = 0, 0
                                if nozzle_cycle[head] != nozzle and nozzle_cycle[head] != '' and nozzle != '':
                                    prev_counter += 2
                                if self.part_data.loc[scan_part[head]].nz != nozzle and nozzle != '':
                                    new_counter += 2
                                nozzle_counter += new_counter - prev_counter
                        if part_counter == 0:  # nothing was picked up at this position
                            continue
                        search_break = False
                        # Heads sharing a part split its placement points evenly.
                        scan_part_head = defaultdict(list)
                        for head, part in enumerate(scan_part):
                            if part == -1:
                                continue
                            scan_part_head[part].append(head)
                        for part, heads in scan_part_head.items():
                            part_cycle = part_points[part] // len(heads)
                            for head in heads:
                                scan_cycle[head] = part_cycle
                        # Evaluate the objective after the scan and remember the best scan.
                        # Short-term gain.
                        cycle = min(filter(lambda x: x > 0, scan_cycle))
                        gang_pick_counter, gang_pick_slot_set = 0, set()
                        for head, pick_slot in enumerate(scan_slot):
                            gang_pick_slot_set.add(pick_slot - head * intv_ratio)
                        eval_func_short_term = self.e_gang_pick * (head_num - scan_slot.count(-1) - len(
                            gang_pick_slot_set)) * cycle - self.e_nz_change * nozzle_counter
                        # Long-term gain.
                        gang_pick_slot_dict = defaultdict(list)
                        for head, pick_slot in enumerate(scan_slot):
                            gang_pick_slot_dict[pick_slot - head * intv_ratio].append(scan_cycle[head])
                        eval_func_long_term = 0
                        for pick_cycle in gang_pick_slot_dict.values():
                            while pick_cycle:
                                min_cycle = min(pick_cycle)
                                eval_func_long_term += self.e_gang_pick * (len(pick_cycle) - 1) * min(pick_cycle)
                                pick_cycle = list(map(lambda c: c - min_cycle, pick_cycle))
                                pick_cycle = list(filter(lambda c: c > 0, pick_cycle))
                        eval_func_long_term -= self.e_nz_change * nozzle_counter
                        # Movement during the pick-up.
                        pick_slot_set = set()
                        for head, pick_slot in enumerate(scan_slot):
                            if pick_slot == -1:
                                continue
                            pick_slot_set.add(pick_slot - head * intv_ratio)
                        slot_offset = 0
                        for head, part in enumerate(scan_part):
                            if part == -1:
                                continue
                            slot_offset += abs(scan_slot[head] - mount_center_slot[part])
                        ratio = 0.5  # blend between short-term and long-term gain
                        eval_func = (1 - ratio) * eval_func_short_term + ratio * eval_func_long_term - 1e-5 * (
                            max(pick_slot_set) - min(pick_slot_set)) - 1e-5 * slot_offset
                        if eval_func >= scan_eval_func:
                            scan_eval_func = eval_func
                            best_scan_part, best_scan_cycle = scan_part.copy(), scan_cycle.copy()
                            best_scan_slot = scan_slot.copy()
                            best_scan_nozzle_limit = copy.deepcopy(scan_nozzle_limit)
                    if search_break:
                        break  # no position added anything: this pattern is exhausted
                    scan_eval_func_list.append(scan_eval_func)
                    cur_scan_part = best_scan_part.copy()
                    cur_scan_slot = best_scan_slot.copy()
                    cur_scan_cycle = best_scan_cycle.copy()
                    cur_nozzle_limit = copy.deepcopy(best_scan_nozzle_limit)
                if len(scan_eval_func_list) and sum(scan_eval_func_list) > best_assigned_eval_func:
                    best_assigned_eval_func = sum(scan_eval_func_list)
                    assigned_part = cur_scan_part.copy()
                    assigned_slot = cur_scan_slot.copy()
                    assigned_cycle = cur_scan_cycle.copy()
                    nozzle_insert_cycle = cycle_index
            # Remove the consumed placement points from the feeder base.
            nonzero_cycle = [cycle for cycle in assigned_cycle if cycle > 0]
            if not nonzero_cycle:
                # Nothing was assigned: relax the acceptance threshold and scan again.
                value_increment_base -= head_num
                continue
            for head, slot in enumerate(assigned_slot):
                if assigned_part[head] == -1:
                    continue
                part_points[feeder_part[slot]] -= min(nonzero_cycle)
            insert_cycle = sum([nozzle_mode_cycle[c] for c in range(nozzle_insert_cycle + 1)])
            part_result.insert(insert_cycle, assigned_part)
            cycle_result.insert(insert_cycle, min(nonzero_cycle))
            slot_result.insert(insert_cycle, assigned_slot)
            # Update the nozzle patterns.
            cycle_nozzle = nozzle_mode[nozzle_insert_cycle].copy()
            for head, part in enumerate(assigned_part):
                if part == -1:
                    continue
                cycle_nozzle[head] = self.part_data.loc[part].nz
            if cycle_nozzle == nozzle_mode[nozzle_insert_cycle]:
                nozzle_mode_cycle[nozzle_insert_cycle] += 1
            elif nozzle_insert_cycle + 1 < len(nozzle_mode) and cycle_nozzle == nozzle_mode[nozzle_insert_cycle + 1]:
                nozzle_mode_cycle[nozzle_insert_cycle + 1] += 1
            else:
                nozzle_mode.insert(nozzle_insert_cycle + 1, cycle_nozzle)
                nozzle_mode_cycle.insert(nozzle_insert_cycle + 1, 1)
            if sum(part_points.values()) == 0:
                break
        return part_result, cycle_result, slot_result