Files
smt-optimizer/base_optimizer/optimizer_common.py

859 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import wraps
from collections import defaultdict
from tqdm import tqdm
from gurobipy import *
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
import os
import time
import math
import random
import copy
import torch
import torch.nn
import argparse
import joblib
import pickle
import warnings
import heapq
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import traceback
matplotlib.use('TkAgg')
# 机器参数
max_head_index, max_slot_index = 6, 120
interval_ratio = 2
slot_interval = 15
head_interval = slot_interval * interval_ratio
head_nozzle = ['' for _ in range(max_head_index)] # 头上已经分配吸嘴
# 位置信息
slotf1_pos, slotr1_pos = [-31.267, 44.], [807., 810.545] # F1(前基座最左侧)、R1(后基座最右侧)位置
fix_camera_pos = [269.531, 694.823] # 固定相机位置
anc_marker_pos = [336.457, 626.230] # ANC基准点位置
stopper_pos = [665.150, 124.738] # 止档块位置
# 算法权重参数
e_nz_change, e_gang_pick = 4, 0.6
# 电机参数
head_rotary_velocity = 8e-5 # 贴装头R轴旋转时间
x_max_velocity, y_max_velocity = 1.4, 1.2
x_max_acceleration, y_max_acceleration = x_max_velocity / 0.079, y_max_velocity / 0.079
# TODO: 不同种类供料器宽度
# feeder_width = {'SM8': (7.25, 7.25), 'SM12': (7.25, 7.25), 'SM16': (7.25, 7.25),
# 'SM24': (7.25, 7.25), 'SM32': (7.25, 7.25)}
feeder_width = {'SM8': (7.25, 7.25), 'SM12': (7.00, 20.00), 'SM16': (7.00, 22.00),
'SM24': (7.00, 29.00), 'SM32': (7.00, 44.00)}
# 可用吸嘴数量限制
nozzle_limit = {'CN065': 6, 'CN040': 6, 'CN020': 6, 'CN400': 6, 'CN140': 6}
# 时间参数
t_cycle = 0.3
t_anc = 0.6
t_pick, t_place = .078, .051 # 贴装/拾取用时
t_nozzle_put, t_nozzle_pick = 0.9, 0.75 # 装卸吸嘴用时
t_nozzle_change = t_nozzle_put + t_nozzle_pick
t_fix_camera_check = 0.12 # 固定相机检测时间
# 时间参数(整线相关)
T_pp, T_tr, T_nc, T_pl = 2, 5, 25, 0
# 时间参数 (数据拟合获得)
Fit_cy, Fit_nz, Fit_pu, Fit_pl, Fit_mv = 0.326, 0.870, 0.159, 0.041, 0.001
class OptResult:
def __init__(self, cp_assign=None, cycle_assign=None, slot_assign=None, place_assign=None, sequence_assign=None):
self.component_assign = [] if cp_assign is None else cp_assign
self.cycle_assign = [] if cycle_assign is None else cycle_assign
self.feeder_slot_assign = [] if slot_assign is None else slot_assign
self.placement_assign = [] if place_assign is None else place_assign
self.head_sequence = [] if sequence_assign is None else sequence_assign
class OptInfo:
def __init__(self):
self.total_time = .0 # 总组装时间
self.total_points = 0 # 总贴装点数
self.total_components = 0 # 总元件数
self.pickup_time = .0 # 拾取过程运动时间
self.round_time = .0 # 往返基座/基板运动时间
self.place_time = .0 # 贴装过程运动时间
self.operation_time = .0 # 拾取/贴装/换吸嘴等机械动作用时
self.cycle_counter = 0 # 周期数
self.nozzle_change_counter = 0 # 吸嘴更换次数
self.anc_round_counter = 0 # 前往ANC次数
self.pickup_counter = 0 # 拾取次数
self.total_distance = .0 # 总移动路径
self.place_distance = .0 # 贴装移动路径
self.pickup_distance = .0 # 拾取移动路径
def print(self):
print('-Cycle counter: {}'.format(self.cycle_counter))
print(f'-Nozzle change counter: {self.nozzle_change_counter: d}')
print(f'-ANC round: {self.anc_round_counter: d}')
print(f'-Pick operation counter: {self.pickup_counter: d}')
print(f'-Pick time: {self.pickup_time: .3f}, Pick distance: {self.pickup_distance: .3f}')
print(f'-Place time: {self.place_time: .3f}, Place distance: {self.place_distance: .3f}')
print(
f'-Round time: {self.total_time - self.place_time - self.place_time: .3f}, Round distance: '
f'{self.total_distance - self.pickup_distance - self.place_distance: .3f}')
minutes, seconds = int(self.total_time // 60), int(self.total_time) % 60
millisecond = int((self.total_time - minutes * 60 - seconds) * 60)
print(f'-Operation time: {self.operation_time: .3f}, ', end='')
if minutes > 0:
print(f'Total time: {minutes: d} min {seconds} s {millisecond: 2d} ms ({self.total_time: .3f}s)')
else:
print(f'Total time: {seconds} s {millisecond :2d} ms ({self.total_time :.3f}s)')
def metric(self):
return Fit_cy * self.cycle_counter + Fit_nz * self.nozzle_change_counter + Fit_pu * self.pickup_counter + \
Fit_pl * self.total_points + Fit_mv * self.pickup_distance
def axis_moving_time(distance, axis=0):
distance = abs(distance) * 1e-3
Lamax = x_max_velocity ** 2 / x_max_acceleration if axis == 0 else y_max_velocity ** 2 / y_max_acceleration
Tmax = x_max_velocity / x_max_acceleration if axis == 0 else y_max_velocity / y_max_acceleration
if axis == 0:
return 2 * math.sqrt(distance / x_max_acceleration) if distance < Lamax else 2 * Tmax + (
distance - Lamax) / x_max_velocity
else:
return 2 * math.sqrt(distance / y_max_acceleration) if distance < Lamax else 2 * Tmax + (
distance - Lamax) / y_max_velocity
def head_rotary_time(angle):
while -180 > angle > 180:
if angle > 180:
angle -= 360
else:
angle += 360
return abs(angle) * head_rotary_velocity
def find_commonpart(head_group, feeder_group):
feeder_group_len = len(feeder_group)
max_length, max_common_part = -1, []
for offset in range(-max_head_index + 1, feeder_group_len - 1):
# offset: head_group相对于feeder_group的偏移量
length, common_part = 0, []
for hd_index in range(max_head_index):
fd_index = hd_index + offset
if fd_index < 0 or fd_index >= feeder_group_len:
common_part.append(-1)
continue
if head_group[hd_index] == feeder_group[fd_index] and head_group[hd_index] != -1:
length += 1
common_part.append(head_group[hd_index])
else:
common_part.append(-1)
if length > max_length:
max_length = length
max_common_part = common_part
return max_common_part
def timer_wrapper(func):
@wraps(func)
def measure_time(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
hinter = True
for key, val in kwargs.items():
if key == 'hinter':
hinter = val
if hinter:
print(f"function {func.__name__} running time : {time.time() - start_time:.3f} s")
return result
return measure_time
def feeder_assignment(component_data, pcb_data, component_result, cycle_result):
# Section: 供料器分配结果
feeder_slot_result, feeder_group_result = [], []
feeder_limit = defaultdict(int)
for component in range(len(component_data)):
feeder_limit[component] = component_data.loc[component].fdn
for component_cycle in component_result:
new_feeder_group = []
for component in component_cycle:
if component == -1 or feeder_limit[component] == 0 or new_feeder_group.count(component) >= feeder_limit[
component]:
new_feeder_group.append(-1)
else:
new_feeder_group.append(component)
if len(new_feeder_group) == 0:
continue
while sum(i >= 0 for i in new_feeder_group) != 0:
max_common_part, index = [], -1
max_common_length = -1
for feeder_index in range(len(feeder_group_result)):
common_part = find_commonpart(new_feeder_group, feeder_group_result[feeder_index])
if sum(i > 0 for i in common_part) > max_common_length:
max_common_length = sum(i > 0 for i in common_part)
max_common_part, index = common_part, feeder_index
new_feeder_length = 0
for feeder in new_feeder_group:
if feeder != -1 and feeder_limit[feeder] > 0:
new_feeder_length += 1
if new_feeder_length > max_common_length:
# 新分配供料器
feeder_group_result.append([])
for feeder_index in range(len(new_feeder_group)):
feeder = new_feeder_group[feeder_index]
if feeder != -1 and feeder_limit[feeder] > 0:
feeder_group_result[-1].append(feeder)
new_feeder_group[feeder_index] = -1
feeder_limit[feeder] -= 1
else:
feeder_group_result[-1].append(-1)
else:
# 使用旧供料器
for feeder_index, feeder_part in enumerate(max_common_part):
if feeder_part != -1:
new_feeder_group[feeder_index] = -1
# 去除多余的元素
for feeder_group in feeder_group_result:
while len(feeder_group) > 0 and feeder_group[0] == -1:
feeder_group.pop(0)
while len(feeder_group) > 0 and feeder_group[-1] == -1:
feeder_group.pop(-1)
# 确定供料器组的安装位置
point_num = len(pcb_data)
component_pos = [[] for _ in range(len(component_data))]
for point_cnt in range(point_num):
part = pcb_data.loc[point_cnt, 'part']
index = np.where(component_data['part'].values == part)[0]
component_pos[index[0]].append(pcb_data.loc[point_cnt, 'x'] + stopper_pos[0])
# 元件使用的头
CT_Head = defaultdict(list)
for component_cycle in component_result:
for head, component in enumerate(component_cycle):
if component == -1:
continue
if component not in CT_Head:
CT_Head[component] = [head, head]
CT_Head[component][0] = min(CT_Head[component][0], head)
CT_Head[component][1] = max(CT_Head[component][1], head)
# 供料器组分配的优先顺序
feeder_assign_sequence = []
for i in range(len(feeder_group_result)):
for j in range(len(feeder_group_result)):
if j in feeder_assign_sequence:
continue
if len(feeder_assign_sequence) == i:
feeder_assign_sequence.append(j)
else:
seq = feeder_assign_sequence[-1]
if cycle_result[seq] * len([k for k in feeder_group_result[seq] if k >= 0]) < cycle_result[j] * len(
[k for k in feeder_group_result[seq] if k >= 0]):
feeder_assign_sequence.pop(-1)
feeder_assign_sequence.append(j)
# TODO: 暂未考虑机械限位
feeder_group_slot = [-1] * len(feeder_group_result)
feeder_lane_state = [0] * max_slot_index # 0表示空1表示已占有
for index in feeder_assign_sequence:
feeder_group = feeder_group_result[index]
best_slot = []
for cp_index, component in enumerate(feeder_group):
if component == -1:
continue
best_slot.append(round((sum(component_pos[component]) / len(component_pos[component]) - slotf1_pos[
0]) / slot_interval) + 1 - cp_index * interval_ratio)
best_slot = round(sum(best_slot) / len(best_slot))
search_dir, step = 0, 0 # dir: 1-向右, 0-向左
left_out_range, right_out_range = False, False
while True:
assign_slot = best_slot + step if search_dir else best_slot - step
# 出现越界,反向搜索
if assign_slot + (len(feeder_group) - 1) * interval_ratio >= max_slot_index / 2:
right_out_range = True
search_dir = 0
step += 1
elif assign_slot < 0:
left_out_range = True
search_dir = 1
step += 1
else:
if left_out_range or right_out_range:
step += 1 # 单向搜索
else:
search_dir = 1 - search_dir # 双向搜索
if search_dir == 0:
step += 1
assign_available = True
# === 分配对应槽位 ===
for slot in range(assign_slot, assign_slot + interval_ratio * len(feeder_group), interval_ratio):
feeder_index = int((slot - assign_slot) / interval_ratio)
pick_part = feeder_group[feeder_index]
if feeder_lane_state[slot] == 1 and pick_part != -1:
assign_available = False
break
if pick_part != -1 and (slot - CT_Head[pick_part][0] * interval_ratio <= 0 or
slot + (max_head_index - CT_Head[pick_part][1] - 1) * interval_ratio > max_slot_index // 2):
assign_available = False
break
if assign_available:
for idx, part in enumerate(feeder_group):
if part != -1:
feeder_lane_state[assign_slot + idx * interval_ratio] = 1
feeder_group_slot[index] = assign_slot
break
if feeder_group_slot[index] == -1:
raise Exception('feeder assign error!')
# 按照最大匹配原则,确定各元件周期拾取槽位
for component_cycle in component_result:
feeder_slot_result.append([-1] * max_head_index)
head_index = [head for head, component in enumerate(component_cycle) if component >= 0]
while head_index:
max_overlap_counter = 0
overlap_feeder_group_index, overlap_feeder_group_offset = -1, -1
for feeder_group_idx, feeder_group in enumerate(feeder_group_result):
# offset 头1 相对于 供料器组第一个元件的偏移量
for offset in range(-max_head_index + 1, max_head_index + len(feeder_group)):
overlap_counter = 0
for head in head_index:
if 0 <= head + offset < len(feeder_group) and component_cycle[head] == \
feeder_group[head + offset]:
overlap_counter += 1
if overlap_counter > max_overlap_counter:
max_overlap_counter = overlap_counter
overlap_feeder_group_index, overlap_feeder_group_offset = feeder_group_idx, offset
feeder_group = feeder_group_result[overlap_feeder_group_index]
head_index_cpy = copy.deepcopy(head_index)
for idx, head in enumerate(head_index_cpy):
if 0 <= head + overlap_feeder_group_offset < len(feeder_group) and component_cycle[head] == \
feeder_group[head + overlap_feeder_group_offset]:
feeder_slot_result[-1][head] = feeder_group_slot[overlap_feeder_group_index] + interval_ratio * (
head + overlap_feeder_group_offset)
head_index.remove(head)
return feeder_slot_result
def optimal_nozzle_assignment(component_data, pcb_data):
# === Nozzle Assignment ===
# number of points for nozzle & number of heads for nozzle
nozzle_points, nozzle_assigned_counter = defaultdict(int), defaultdict(int)
if len(pcb_data) == 0:
return nozzle_assigned_counter
for _, step in pcb_data.iterrows():
part = step['part']
idx = component_data[component_data['part'] == part].index.tolist()[0]
nozzle = component_data.loc[idx]['nz']
nozzle_assigned_counter[nozzle] = 0
nozzle_points[nozzle] += 1
assert len(nozzle_points.keys()) <= max_head_index
total_points, available_head = len(pcb_data), max_head_index
# S1: set of nozzle types which are sufficient to assign one nozzle to the heads
# S2: temporary nozzle set
# S3: set of nozzle types which already have the maximum reasonable nozzle amounts.
S1, S2, S3 = [], [], []
for nozzle in nozzle_points.keys(): # Phase 1
if nozzle_points[nozzle] * max_head_index < total_points:
nozzle_assigned_counter[nozzle] = 1
available_head -= 1
total_points -= nozzle_points[nozzle]
S1.append(nozzle)
else:
S2.append(nozzle)
available_head_ = available_head # Phase 2
for nozzle in S2:
nozzle_assigned_counter[nozzle] = math.floor(available_head * nozzle_points[nozzle] / total_points)
available_head_ = available_head_ - nozzle_assigned_counter[nozzle]
S2.sort(key=lambda x: nozzle_points[x] / (nozzle_assigned_counter[x] + 1e-10), reverse=True)
while available_head_ > 0:
nozzle = S2[0]
nozzle_assigned_counter[nozzle] += 1
S2.remove(nozzle)
S3.append(nozzle)
available_head_ -= 1
phase_iteration = len(S2) - 1
while phase_iteration > 0: # Phase 3
nozzle_i_val, nozzle_j_val = 0, 0
nozzle_i, nozzle_j = None, None
for nozzle in S2:
if nozzle_i is None or nozzle_points[nozzle] / nozzle_assigned_counter[nozzle] > nozzle_i_val:
nozzle_i_val = nozzle_points[nozzle] / nozzle_assigned_counter[nozzle]
nozzle_i = nozzle
if nozzle_assigned_counter[nozzle] > 1:
if nozzle_j is None or nozzle_points[nozzle] / (nozzle_assigned_counter[nozzle] - 1) < nozzle_j_val:
nozzle_j_val = nozzle_points[nozzle] / (nozzle_assigned_counter[nozzle] - 1)
nozzle_j = nozzle
if nozzle_i and nozzle_j and nozzle_points[nozzle_j] / (nozzle_assigned_counter[nozzle_j] - 1) < \
nozzle_points[nozzle_i] / nozzle_assigned_counter[nozzle_i]:
nozzle_assigned_counter[nozzle_j] -= 1
nozzle_assigned_counter[nozzle_i] += 1
S2.remove(nozzle_i)
S3.append(nozzle_i)
else:
break
return nozzle_assigned_counter
# === 遗传算法公用函数 ===
def sigma_scaling(pop_val, c: float):
# function: f' = max(f - (avg(f) - c · sigma(f), 0)
avg_val = sum(pop_val) / len(pop_val)
sigma_val = math.sqrt(sum(abs(v - avg_val) for v in pop_val) / len(pop_val))
for idx, val in enumerate(pop_val):
pop_val[idx] = max(val - (avg_val - c * sigma_val), 0)
return pop_val
def directed_edge_recombination_crossover(c, individual1, individual2):
assert len(individual1) == len(individual2)
left_edge_list, right_edge_list = defaultdict(list), defaultdict(list)
for index in range(len(individual1) - 1):
elem1, elem2 = individual1[index], individual1[index + 1]
right_edge_list[elem1].append(elem2)
left_edge_list[elem2].append(elem1)
for index in range(len(individual2) - 1):
elem1, elem2 = individual2[index], individual2[index + 1]
right_edge_list[elem1].append(elem2)
left_edge_list[elem2].append(elem1)
offspring = []
while len(offspring) != len(individual1):
while True:
center_element = np.random.choice(individual1)
if center_element not in offspring: # 避免重复选取
break
direction, candidate = 1, [center_element]
parent = center_element
for edge_list in left_edge_list.values():
while parent in edge_list:
edge_list.remove(parent)
for edge_list in right_edge_list.values():
while parent in edge_list:
edge_list.remove(parent)
while True:
max_len, max_len_neighbor = -1, 0
if direction == 1:
if len(right_edge_list[parent]) == 0:
direction, parent = -1, center_element
continue
for neighbor in right_edge_list[parent]:
if max_len < len(right_edge_list[neighbor]):
max_len_neighbor = neighbor
max_len = len(right_edge_list[neighbor])
candidate.append(max_len_neighbor)
parent = max_len_neighbor
elif direction == -1:
if len(left_edge_list[parent]) == 0:
direction, parent = 0, center_element
continue
for neighbor in left_edge_list[parent]:
if max_len < len(left_edge_list[neighbor]):
max_len_neighbor = neighbor
max_len = len(left_edge_list[neighbor])
candidate.insert(0, max_len_neighbor)
parent = max_len_neighbor
else:
break
# 移除重复元素
for edge_list in left_edge_list.values():
while max_len_neighbor in edge_list:
edge_list.remove(max_len_neighbor)
for edge_list in right_edge_list.values():
while max_len_neighbor in edge_list:
edge_list.remove(max_len_neighbor)
offspring += candidate
return offspring
def partially_mapped_crossover(parent1, parent2):
range_ = np.random.randint(0, len(parent1), 2) # 前闭后开
range_ = sorted(range_)
parent1_cpy, parent2_cpy = [-1 for _ in range(len(parent1))], [-1 for _ in range(len(parent2))]
parent1_cpy[range_[0]: range_[1] + 1] = copy.deepcopy(parent2[range_[0]: range_[1] + 1])
parent2_cpy[range_[0]: range_[1] + 1] = copy.deepcopy(parent1[range_[0]: range_[1] + 1])
for index in range(len(parent1)):
if range_[0] <= index <= range_[1]:
continue
cur_ptr, cur_elem = 0, parent1[index]
while True:
parent1_cpy[index] = cur_elem
if parent1_cpy.count(cur_elem) == 1:
break
parent1_cpy[index] = -1
if cur_ptr == 0:
cur_ptr, cur_elem = 1, parent2[index]
else:
index_ = parent1_cpy.index(cur_elem)
cur_elem = parent2[index_]
for index in range(len(parent2)):
if range_[0] <= index <= range_[1]:
continue
cur_ptr, cur_elem = 0, parent2[index]
while True:
parent2_cpy[index] = cur_elem
if parent2_cpy.count(cur_elem) == 1:
break
parent2_cpy[index] = -1
if cur_ptr == 0:
cur_ptr, cur_elem = 1, parent1[index]
else:
index_ = parent2_cpy.index(cur_elem)
cur_elem = parent1[index_]
return parent1_cpy, parent2_cpy
def cycle_crossover(parent1, parent2):
offspring1, offspring2 = [-1 for _ in range(len(parent1))], [-1 for _ in range(len(parent2))]
idx = 0
while True:
if offspring1[idx] != -1:
break
offspring1[idx] = parent1[idx]
idx = parent1.index(parent2[idx])
for idx, gene in enumerate(offspring1):
if gene == -1:
offspring1[idx] = parent2[idx]
idx = 0
while True:
if offspring2[idx] != -1:
break
offspring2[idx] = parent2[idx]
idx = parent2.index(parent1[idx])
for idx, gene in enumerate(offspring2):
if gene == -1:
offspring2[idx] = parent1[idx]
return offspring1, offspring2
def swap_mutation(parent):
range_ = np.random.randint(0, len(parent), 2)
parent[range_[0]], parent[range_[1]] = parent[range_[1]], parent[range_[0]]
return parent
def constraint_swap_mutation(component_points, individual, machine_number):
offspring = individual.copy()
idx, component_index = 0, random.randint(0, len(component_points) - 1)
for points in component_points.values():
if component_index == 0:
while True:
index1, index2 = random.sample(range(points + machine_number - 1), 2)
if offspring[idx + index1] != offspring[idx + index2]:
break
clip = offspring[idx: idx + points + machine_number - 1].copy()
avl_machine = 0
for idx_, gene in enumerate(clip):
if gene == 0 and (idx_ == 0 or clip[idx_ - 1] != 0):
avl_machine += 1
clip[index1], clip[index2] = clip[index2], clip[index1]
for idx_, gene in enumerate(clip):
if gene == 0 and (idx_ == 0 or clip[idx_ - 1] != 0):
avl_machine -= 1
if avl_machine != 0:
return offspring
offspring[idx + index1], offspring[idx + index2] = offspring[idx + index2], offspring[idx + index1]
break
component_index -= 1
idx += (points + machine_number - 1)
return offspring
def random_selective(data, possibility): # 依概率选择随机数
assert len(data) == len(possibility) and len(data) > 0
sum_val = sum(possibility)
possibility = [p / sum_val for p in possibility]
random_val = random.random()
idx = 0
for idx, val in enumerate(possibility):
random_val -= val
if random_val <= 0:
break
return data[idx]
def insert_mutation(parent):
pos, val = np.random.randint(0, len(parent), 1), parent[-1]
parent[pos: len(parent) - 1] = parent[pos + 1:]
parent[pos] = val
return parent
def roulette_wheel_selection(pop_eval):
# Roulette wheel
random_val = np.random.random() * sum(pop_eval)
for idx, val in enumerate(pop_eval):
random_val -= val
if random_val <= 0:
return idx
return len(pop_eval) - 1
def get_top_k_value(pop_val, k: int, reverse=True):
res = []
pop_val_cpy = copy.deepcopy(pop_val)
pop_val_cpy.sort(reverse=reverse)
for i in range(min(len(pop_val_cpy), k)):
for j in range(len(pop_val)):
if abs(pop_val_cpy[i] - pop_val[j]) < 1e-9 and j not in res:
res.append(j)
break
return res
def get_line_config_number(machine_number, component_number):
div_counter = 0
div_set = set()
for div1 in range(component_number - 2):
for div2 in range(div1 + 1, component_number - 1):
machine_div = [div1 + 1, div2 - div1, component_number - div2 - 1]
machine_div.sort()
div_str = "".join(str(s) + '|' for s in machine_div)
if div_str in div_set:
continue
div_set.add(div_str)
assign_component_counter = defaultdict(list)
for div in machine_div:
assign_component_counter[div] += 1
case_div_counter, case_comp_number = 1, component_number
for idx in range(machine_number - 1):
div = 1
while machine_div[idx]:
div *= (case_comp_number / machine_div[idx])
case_comp_number -= 1
machine_div[idx] -= 1
case_div_counter *= div
for key, val in assign_component_counter.values():
div = 1
while val:
div *= val
val -= 1
case_div_counter /= div
div_counter += case_div_counter
return div_counter
def convert_line_assigment(pcb_data, component_data, assignment_result):
machine_number = len(assignment_result)
placement_points = []
partial_pcb_data, partial_component_data = defaultdict(pd.DataFrame), defaultdict(pd.DataFrame)
for machine_index in range(machine_number):
if pcb_data is not None:
partial_pcb_data[machine_index] = pd.DataFrame(columns=pcb_data.columns)
partial_component_data[machine_index] = component_data.copy(deep=True)
placement_points.append(sum(assignment_result[machine_index]))
if pcb_data is not None:
assert sum(placement_points) == len(pcb_data)
# === averagely assign available feeder ===
for part_index, data in component_data.iterrows():
feeder_limit = data.fdn
feeder_points = [assignment_result[machine_index][part_index] for machine_index in range(machine_number)]
for machine_index in range(machine_number):
if pcb_data is None:
partial_component_data[machine_index].loc[part_index, 'points'] = assignment_result[machine_index][
part_index]
else:
partial_component_data[machine_index].loc[part_index, 'points'] = 0
for machine_index in range(machine_number):
if feeder_points[machine_index] == 0:
continue
partial_component_data[machine_index].loc[part_index].fdn = 1
feeder_limit -= 1
while feeder_limit:
assign_machine = None
for machine_index in range(machine_number):
if feeder_limit <= 0:
break
if feeder_points[machine_index] == 0:
continue
if assign_machine is None or feeder_points[machine_index] / \
partial_component_data[machine_index].loc[part_index].fdn > feeder_points[
assign_machine] / partial_component_data[assign_machine].loc[part_index].fdn:
assign_machine = machine_index
assert assign_machine is not None
partial_component_data[assign_machine].loc[part_index, 'fdn'] += 1
feeder_limit -= 1
for machine_index in range(machine_number):
if feeder_points[machine_index] > 0:
assert partial_component_data[machine_index].loc[part_index].fdn > 0
# === assign placements ===
if pcb_data is not None:
part2idx = defaultdict(int)
for idx, data in component_data.iterrows():
part2idx[data.part] = idx
machine_average_pos = [[0, 0] for _ in range(machine_number)]
machine_step_counter = [0 for _ in range(machine_number)]
part_pcb_data = defaultdict(list)
for _, data in pcb_data.iterrows():
part_pcb_data[part2idx[data.part]].append(data)
multiple_component_index = []
for part_index in range(len(component_data)):
machine_assign_set = []
for machine_index in range(machine_number):
if assignment_result[machine_index][part_index]:
machine_assign_set.append(machine_index)
if len(machine_assign_set) == 1:
for data in part_pcb_data[part_index]:
machine_index = machine_assign_set[0]
machine_average_pos[machine_index][0] += data.x
machine_average_pos[machine_index][1] += data.y
machine_step_counter[machine_index] += 1
partial_component_data[machine_index].loc[part_index, 'points'] += 1
partial_pcb_data[machine_index] = pd.concat([partial_pcb_data[machine_index], pd.DataFrame(data).T])
elif len(machine_assign_set) > 1:
multiple_component_index.append(part_index)
for machine_index in range(machine_number):
if machine_step_counter[machine_index] == 0:
continue
machine_average_pos[machine_index][0] /= machine_step_counter[machine_index]
machine_average_pos[machine_index][1] /= machine_step_counter[machine_index]
for part_index in multiple_component_index:
for data in part_pcb_data[part_index]:
idx = -1
min_dist = None
for machine_index in range(machine_number):
if partial_component_data[machine_index].loc[part_index, 'points'] >= \
assignment_result[machine_index][part_index]:
continue
dist = (data.x - machine_average_pos[machine_index][0]) ** 2 + (
data.y - machine_average_pos[machine_index][1]) ** 2
if min_dist is None or dist < min_dist:
min_dist, idx = dist, machine_index
assert idx >= 0
machine_step_counter[idx] += 1
machine_average_pos[idx][0] += (1 - 1 / machine_step_counter[idx]) * machine_average_pos[idx][0] \
+ data.x / machine_step_counter[idx]
machine_average_pos[idx][1] += (1 - 1 / machine_step_counter[idx]) * machine_average_pos[idx][1] \
+ data.y / machine_step_counter[idx]
partial_component_data[idx].loc[part_index, 'points'] += 1
partial_pcb_data[idx] = pd.concat([partial_pcb_data[idx], pd.DataFrame(data).T])
for machine_index in range(machine_number):
partial_component_data[machine_index] = partial_component_data[machine_index][
partial_component_data[machine_index]['points'] != 0].reset_index(drop=True)
return partial_pcb_data, partial_component_data
def random_division(num, div):
assert num >= div
res = [1 for _ in range(num)]
while sum(res) < num:
pos = random.randint(0, num - 1)
val = random.randint(1, num - sum(res))
res[pos] = val
return res
def list_range(start, end=None):
return list(range(start)) if end is None else list(range(start, end))