Files
multi-model-optimizer/opt/smm/hybrid_genetic.py
2025-11-14 11:34:48 +08:00

536 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from opt.smm.basis import *
from opt.utils import *
class HybridGeneticOpt(BaseOpt):
def __init__(self, config, part_data, step_data, feeder_data=None):
    """Initialise the optimizer and create its empty working containers.

    The four arguments are forwarded unchanged to the base-class
    initialiser. Everything else set here is scratch state sized from
    ``self.config`` or left empty.
    """
    super().__init__(config, part_data, step_data, feeder_data)

    # Per-part lookups: nozzle type and the list of placement positions.
    self.part_nozzle = defaultdict(str)
    self.part_point_pos = defaultdict(list)

    # One entry per head / per feeder slot.
    self.designated_nozzle = ['' for _ in range(self.config.head_num)]
    self.designated_slot = [None for _ in range(self.config.slot_num)]

    # Pickups forming the genes of an individual, and the cycle count of each.
    self.pickup_group = []
    self.pickup_group_cycle = []
    # Groups of pickup indices that were split from the same initial pickup.
    self.pair_group = []

    # Part -> feeder slots holding that part.
    self.feeder_part_arrange = defaultdict(list)
def pickup_group_combine(self, supply, supply_cycle, demand, demand_cycle):
    """Merge the parts of ``supply`` into the free heads of ``demand``.

    The supply pickup is slid across the heads; the shift that lets the most
    parts land on a free head whose designated nozzle equals the nozzle of the
    head the part came from is applied, and the search is repeated for
    whatever could not be placed. Parts that fit nowhere are left out.

    Args:
        supply: head-indexed list of parts (``None`` = empty head) to insert.
        supply_cycle: head-indexed cycle counts belonging to ``supply``.
        demand: head-indexed list of parts already occupying the heads.
        demand_cycle: head-indexed cycle counts belonging to ``demand``.

    Returns:
        ``(combination, combination_cycle)``: new lists; the four inputs are
        not modified.
    """
    head_num = self.config.head_num
    nozzles = self.designated_nozzle
    combination, combination_cycle = demand.copy(), demand_cycle.copy()
    supply_cpy = supply.copy()

    while True:
        # Heads of the supply pickup that still carry an unplaced part;
        # restricting the scan to them speeds up the search.
        supply_cpy_index = [idx for idx, part in enumerate(supply_cpy) if part is not None]
        if not supply_cpy_index:
            break

        max_match_offset, max_match_counter = 0, 0
        for offset in range(-supply_cpy_index[-1], head_num - supply_cpy_index[0]):
            match_counter = 0
            for idx in supply_cpy_index:
                target = idx + offset
                if (0 <= target < head_num and combination[target] is None
                        and nozzles[target] == nozzles[idx]):
                    match_counter += 1
            if match_counter > max_match_counter:
                max_match_counter = match_counter
                max_match_offset = offset
                if match_counter == len(supply_cpy_index):
                    break  # every remaining part fits, cannot do better

        if max_match_counter == 0:
            # No remaining part can be placed anywhere; stop instead of
            # touching heads that are already taken.
            break

        for idx in supply_cpy_index:
            target = idx + max_match_offset
            # Test the *current* combination, not the original demand, so a
            # part committed by an earlier pass is never overwritten.
            # NOTE(review): as before, a part is placed on any free head at
            # this offset even if that head's nozzle differs - confirm intent.
            if 0 <= target < head_num and combination[target] is None:
                combination[target] = supply_cpy[idx]
                combination_cycle[target] = supply_cycle[idx]
                supply_cpy[idx] = None

    return combination, combination_cycle
def cal_ind_val(self, individual):
    """Estimate the assembly time of an individual and split it into pickups.

    ``individual`` is a sequence of indices into ``self.pickup_group``.
    Consecutive genes that belong to the same entry of ``self.pair_group``
    are first merged back into one pickup. The resulting pickup sequence is
    then cut into runs of consecutive pickups that are combined onto the
    heads (via ``self.pickup_group_combine``); a shortest-path style sweep
    over the cut positions keeps the cheapest estimated time.

    Args:
        individual: sequence of pickup indices (genes).

    Returns:
        ``(value, pickup_result, pickup_cycle_result)`` where ``value`` is the
        estimated time of the best split and the two lists are indexed by
        node; nodes that are not an end point of a chosen run hold ``[]``.
    """
    place_time, pick_time = 0.234, 0.4
    x_moving_speed, y_moving_speed = 300, 300  # mm/s
    head_num = self.config.head_num

    prev_pair_index = None
    sequenced_pickup_group, sequenced_pickup_cycle = [], []
    for gene in individual:
        pickup = self.pickup_group[gene]
        pair_index = None
        for idx, pair in enumerate(self.pair_group):
            if gene in pair:
                pair_index = idx
                break
        if pair_index is not None and pair_index == prev_pair_index:
            # Same origin as the previous gene: fold this pickup back into it.
            # Only occupied heads are copied so parts already merged stay put.
            for idx, component in enumerate(pickup):
                if component is not None:
                    sequenced_pickup_group[-1][idx] = component
        else:
            sequenced_pickup_group.append(pickup.copy())
            sequenced_pickup_cycle.append(self.pickup_group_cycle[gene])
        # Remember the origin so the next gene can be recognised as a partner.
        prev_pair_index = pair_index

    V = [float('inf') for _ in range(len(sequenced_pickup_group) + 1)]  # Node Value
    V[0] = 0
    V_SNode = [-1 for _ in range(len(sequenced_pickup_group) + 1)]  # predecessor node

    # Number of heads carrying each nozzle type.
    nozzle_assigned_heads = defaultdict(int)
    for nozzle in self.designated_nozzle:
        nozzle_assigned_heads[nozzle] += 1

    pickup_result, pickup_cycle_result = [[] for _ in range(len(V))], [[] for _ in range(len(V))]
    # NOTE(review): this index is never advanced, so the first stored point
    # of a part is used in every cycle - confirm that is the intended estimate.
    component_point_index = defaultdict(int)
    # Feeder slots spanned by one head pitch (same ratio as used in optimize).
    intv_ratio = int(self.config.head_intv // self.config.slot_intv)

    for i in range(1, len(V)):
        cost, t0 = 0, 0
        load = defaultdict(int)  # parts per nozzle type in the current run
        Pd, Pd_cycle = [None for _ in range(head_num)], [0 for _ in range(head_num)]  # demand pickup
        j = i
        while j < len(V):
            # supply pickup and its cycle
            Ps = sequenced_pickup_group[j - 1]
            Ps_cycle = [sequenced_pickup_cycle[j - 1] for _ in range(head_num)]
            for part in Ps:
                if part:
                    load[self.part_nozzle[part]] += 1

            is_combinable = True
            for nozzle, counter in load.items():
                if counter > nozzle_assigned_heads[nozzle]:
                    is_combinable = False
            if not is_combinable:
                break

            # Drop the travel/placement share of the previous union; it is
            # recomputed below for the enlarged union.
            cost = cost - t0
            # combine the demand pickup and the supply pickup into the union pickup
            Pu, Pu_cycle = self.pickup_group_combine(Ps, Ps_cycle, Pd, Pd_cycle)

            # decide the placement cluster and sequencing of the union pickup
            # NOTE(review): pickup_action_counter stays 0, so pick_time never
            # contributes to the cost - confirm.
            pickup_action_counter, place_action_counter = 0, head_num - Pu.count(None)
            right_most_slot, left_most_slot = 0, self.config.slot_num // 2  # most left and right pickup slot

            # === TODO: mechanical limits and rear slot assignment are not handled ===
            for head in range(head_num):
                if not Pu[head]:
                    continue
                assert Pu[head] in self.feeder_part_arrange.keys()
                for slot in self.feeder_part_arrange[Pu[head]]:
                    left_most_slot = min(slot - head * intv_ratio, left_most_slot)
                    right_most_slot = max(slot - head * intv_ratio, right_most_slot)

            # calculate forward, backward, pick and place traveling time
            t_FW, t_BW, t_PL, t_PU = 0, 0, 0, 0
            cycle = 0
            while cycle < max(Pu_cycle):
                mount_points = []
                for head, part in enumerate(Pu):
                    # A head with k cycles takes part in cycles 0 .. k-1 only.
                    if part is None or cycle >= Pu_cycle[head]:
                        continue
                    idx = component_point_index[part]
                    # NOTE(review): optimize() already adds stopper_pos when
                    # it fills part_point_pos, so the offset is applied twice
                    # here - confirm which side should own it.
                    mount_points.append(Point(self.part_point_pos[part][idx].x - head * self.config.head_intv
                                              + self.config.stopper_pos.x,
                                              self.part_point_pos[part][idx].y + self.config.stopper_pos.y))
                assert len(mount_points) > 0

                # calculate cycle moving distance
                mount_points.sort(key=lambda p: p.x)
                slotf1_pos = self.config.slotf1_pos
                t_FW += max(
                    abs(slotf1_pos.x + (left_most_slot - 1) * self.config.slot_intv - mount_points[0].x) / x_moving_speed,
                    abs(slotf1_pos.y - mount_points[0].y) / y_moving_speed)
                t_BW += max(
                    abs(slotf1_pos.x + (right_most_slot - 1) * self.config.slot_intv - mount_points[-1].x) / x_moving_speed,
                    abs(slotf1_pos.y - mount_points[-1].y) / y_moving_speed)

                # pick up moving time
                t_PU += (right_most_slot - left_most_slot) * self.config.slot_intv / x_moving_speed

                # place moving time
                for idx_points in range(len(mount_points) - 1):
                    t_PL += max(abs(mount_points[idx_points].x - mount_points[idx_points + 1].x) / x_moving_speed,
                                abs(mount_points[idx_points].y - mount_points[idx_points + 1].y) / y_moving_speed)
                cycle += 1

            t0 = t_FW + (t_PL + place_action_counter * place_time) + t_BW
            cost += (t_PU + pickup_action_counter * pick_time) + t0

            if V[i - 1] + cost < V[j]:
                pickup_result[j], pickup_cycle_result[j] = Pu, Pu_cycle
                V_SNode[j] = i - 1
                V[j] = V[i - 1] + cost

            Pd, Pd_cycle = Pu, Pu_cycle
            j += 1

    # Walk the predecessor chain back from the last node and blank every node
    # that lies strictly inside a chosen run.
    node = len(V) - 1
    while True:
        prev_node = V_SNode[node]
        if prev_node == -1:
            break
        for k in range(prev_node + 1, node):
            pickup_result[k], pickup_cycle_result[k] = [], []
        node = prev_node

    return V[-1], pickup_result, pickup_cycle_result
def convertor(self, individual):
    """Expand an individual into per-cycle part, cycle and feeder-slot rows.

    The pickups chosen by ``cal_ind_val`` are unrolled: while a pickup still
    has heads with cycles left, one output row is emitted that lasts for the
    smallest remaining positive cycle count, and that amount is subtracted
    from every head used in the row.

    Args:
        individual: sequence of pickup indices, passed to ``cal_ind_val``.

    Returns:
        ``(part_result, cycle_result, feeder_slot_result)``; the first and
        last are lists of head-indexed rows (``-1`` = head unused), the middle
        one holds the cycle count of each row.
    """
    head_num = self.config.head_num
    part_rows, row_cycles, slot_rows = [], [], []

    _, groups, group_cycles = self.cal_ind_val(individual)
    for group, remaining in zip(groups, group_cycles):
        while group and max(remaining) != 0:
            step = min(left for left in remaining if left > 0)
            # Round-robin cursor over the feeders of each part; restarted
            # for every emitted row.
            feeder_cursor = defaultdict(int)

            part_row = [-1] * head_num
            slot_row = [-1] * head_num
            part_rows.append(part_row)
            slot_rows.append(slot_row)
            row_cycles.append(step)

            for head, part in enumerate(group):
                if part is None or remaining[head] == 0:
                    continue
                matching = self.part_data[self.part_data['part'] == part]
                part_row[head] = matching.index.tolist()[0]

                feeders = self.feeder_part_arrange[part]
                slot_row[head] = feeders[feeder_cursor[part]]
                feeder_cursor[part] = (feeder_cursor[part] + 1) % len(feeders)

                remaining[head] -= step

    return part_rows, row_cycles, slot_rows
def optimal_nozzle_assignment(self):
    """Decide how many heads each nozzle type receives.

    Nozzle types are taken from ``self.part_data`` (column ``nz``) for every
    row of ``self.step_data``. The allocation runs in three phases:

    1. a nozzle type whose point count times the head count is below the
       remaining point total gets exactly one head;
    2. the other types share the remaining heads in proportion to their
       points (rounded down), leftover heads going one each to the types
       with the highest points-per-head ratio;
    3. among the types not yet topped up, a head is moved from one type to
       another while that lowers the worst points-per-head ratio.

    Returns:
        ``defaultdict(int)`` mapping nozzle type -> number of heads, keyed in
        order of first appearance in ``self.step_data``; empty when there are
        no steps.
    """
    if len(self.step_data) == 0:
        return defaultdict(int)

    head_num = self.config.head_num

    # === Nozzle Assignment ===
    # points placed with each nozzle / heads granted to each nozzle
    points_of, heads_of = defaultdict(int), defaultdict(int)
    for _, row in self.step_data.iterrows():
        part_idx = self.part_data[self.part_data['part'] == row['part']].index.tolist()[0]
        nz = self.part_data.loc[part_idx]['nz']
        heads_of[nz] = 0
        points_of[nz] += 1
    assert len(points_of.keys()) <= head_num

    remaining_points, free_heads = len(self.step_data), head_num

    # single_head: types for which one head is sufficient
    # pending:     types still competing for heads
    # topped_up:   types that already hold their maximum reasonable amount
    single_head, pending, topped_up = [], [], []

    # Phase 1
    for nz in points_of.keys():
        if points_of[nz] * head_num >= remaining_points:
            pending.append(nz)
            continue
        heads_of[nz] = 1
        free_heads -= 1
        remaining_points -= points_of[nz]
        single_head.append(nz)

    # Phase 2
    unassigned = free_heads
    for nz in pending:
        heads_of[nz] = math.floor(free_heads * points_of[nz] / remaining_points)
        unassigned -= heads_of[nz]

    # The small epsilon keeps the ratio finite for types rounded down to zero
    # heads, which therefore sort first.
    pending.sort(key=lambda nz: points_of[nz] / (heads_of[nz] + 1e-10), reverse=True)
    while unassigned > 0:
        nz = pending.pop(0)
        heads_of[nz] += 1
        topped_up.append(nz)
        unassigned -= 1

    # Phase 3: only entered when at least two types are still pending; each
    # round either moves one head and retires the receiver, or stops.
    if len(pending) > 1:
        while True:
            receiver, receiver_ratio = None, 0
            donor, donor_ratio = None, 0
            for nz in pending:
                ratio = points_of[nz] / heads_of[nz]
                if receiver is None or ratio > receiver_ratio:
                    receiver, receiver_ratio = nz, ratio
                if heads_of[nz] > 1:
                    ratio_after_loss = points_of[nz] / (heads_of[nz] - 1)
                    if donor is None or ratio_after_loss < donor_ratio:
                        donor, donor_ratio = nz, ratio_after_loss

            if not (receiver and donor and donor_ratio < receiver_ratio):
                break
            heads_of[donor] -= 1
            heads_of[receiver] += 1
            pending.remove(receiver)
            topped_up.append(receiver)

    return heads_of
@timer_wrapper
def optimize(self):
    """Run the hybrid genetic optimisation and store the result on ``self.result``.

    Stages, in order:

    1. assign nozzle types to heads (``optimal_nozzle_assignment``);
    2. split every part's points over its feeders and group parts into
       head-indexed component-type (CT) groups;
    3. give every CT group a feeder slot, searching outwards from the slot
       closest to the mean x position of its points;
    4. derive the initial pickups, optionally partition them into
       single-part pickups, and evolve pickup orderings with a genetic
       algorithm whose fitness is ``cal_ind_val``;
    5. convert the best individual (``convertor``) and hand it to
       ``self.path_planner.greedy_cluster``.

    Per-run state (``pickup_group``, ``pickup_group_cycle``, ``pair_group``,
    ``part_point_pos``) is rebuilt from scratch, so the method may be called
    more than once on the same instance.

    Raises:
        Exception: when a nozzle type has components but no head, or when no
            feasible feeder slot exists for a CT group.
    """
    head_num, slot_num = self.config.head_num, self.config.slot_num

    nozzle_assigned_counter = self.optimal_nozzle_assignment()

    # nozzle assignment result: heads are filled left to right, type by type
    self.designated_nozzle = [''] * head_num
    head_index = 0
    for nozzle, num in nozzle_assigned_counter.items():
        while num > 0:
            self.designated_nozzle[head_index] = nozzle
            head_index += 1
            num -= 1

    # === component assignment ===
    # placement points per part; nozzle -> parts (one entry per allowed feeder)
    part_points, nozzle_components = defaultdict(int), defaultdict(list)
    component_feeder_limit, component_divided_points = defaultdict(int), defaultdict(list)
    for _, data in self.step_data.iterrows():
        part = data['part']
        idx = self.part_data[self.part_data['part'] == part].index.tolist()[0]
        nozzle = self.part_data.loc[idx]['nz']
        component_feeder_limit[part] = self.part_data.loc[idx].fdn
        part_points[part] += 1
        if nozzle_components[nozzle].count(part) < component_feeder_limit[part]:
            nozzle_components[nozzle].append(part)

    # Split each part's points evenly over its feeders ...
    for part, feeder_limit in component_feeder_limit.items():
        for _ in range(feeder_limit):
            component_divided_points[part].append(part_points[part] // feeder_limit)
    # ... and hand the remainder out one point at a time.
    for part, divided_points in component_divided_points.items():
        index = 0
        while sum(divided_points) < part_points[part]:
            divided_points[index] += 1
            index += 1

    CT_Group, CT_Points = [], []  # CT: Component Type
    while sum(len(nozzle_components[nozzle]) for nozzle in nozzle_components.keys()) != 0:
        CT_Group.append([None for _ in range(head_num)])
        CT_Points.append([0 for _ in range(head_num)])
        for head_index in range(head_num):
            nozzle = self.designated_nozzle[head_index]  # nozzle assigned to this head
            if len(nozzle_components[nozzle]) == 0:  # no component available
                continue
            # Pick the part with the most points still to be placed.
            max_points, designated_part = 0, None
            for part in nozzle_components[nozzle]:
                if part_points[part] > max_points:
                    max_points = part_points[part]
                    designated_part = part
            part_points[designated_part] -= component_divided_points[designated_part][-1]
            CT_Group[-1][head_index] = designated_part
            CT_Points[-1][head_index] = component_divided_points[designated_part][-1]
            component_divided_points[designated_part].pop()
            nozzle_components[nozzle].remove(designated_part)
        if CT_Group[-1].count(None) == head_num:
            # Components remain but no head carries their nozzle; without
            # this check the loop would append empty groups forever.
            raise Exception('nozzle assign error!')

    # === assign CT group to feeder slot ===
    self.part_point_pos = defaultdict(list)  # rebuilt on every run
    for _, data in self.step_data.iterrows():
        self.part_point_pos[data.part].append(Point(data.x + self.config.stopper_pos.x,
                                                    data.y + self.config.stopper_pos.y))
    for pos_list in self.part_point_pos.values():
        pos_list.sort(key=lambda p: (p.x, p.y))

    CT_Group_slot = [-1] * len(CT_Group)
    feeder_lane = [None] * slot_num  # component type already assigned to each feeder slot

    # Left-most and right-most head on which each component type appears.
    CT_Head = defaultdict(list)
    for pickup in CT_Group:
        for head, CT in enumerate(pickup):
            if CT is None:
                continue
            if CT not in CT_Head:
                CT_Head[CT] = [head, head]
            CT_Head[CT][0] = min(CT_Head[CT][0], head)
            CT_Head[CT][1] = max(CT_Head[CT][1], head)

    # Feeder slots spanned by one head pitch; forced to int because it is
    # used as a range() step and in list indices below.
    intv_ratio = int(self.config.head_intv // self.config.slot_intv)
    slot_limit = slot_num / 2
    for CTIdx, pickup in enumerate(CT_Group):
        best_slot = []
        for cp_index, part in enumerate(pickup):
            if part is None:
                continue
            best_slot.append(round((sum(p.x for p in self.part_point_pos[part]) / len(
                self.part_point_pos[part]) - self.config.slotf1_pos.x) / self.config.slot_intv) + 1 - cp_index * intv_ratio)
        best_slot = round(sum(best_slot) / len(best_slot))

        span = (len(pickup) - 1) * intv_ratio
        search_dir, step = 0, 0  # dir: 1 - to the right, 0 - to the left
        while True:
            # Candidates only move further out as step grows, so once both
            # sides are off the feeder base nothing is left to try.
            if best_slot - step < 0 and best_slot + step + span >= slot_limit:
                raise Exception('feeder assign error!')

            assign_slot = best_slot + step if search_dir else best_slot - step
            if assign_slot + span >= slot_limit or assign_slot < 0:
                search_dir = 1 - search_dir
                if search_dir == 1:
                    step += 1
                continue

            assign_available = True
            # walk over the slots this group would occupy
            for slot in range(assign_slot, assign_slot + intv_ratio * len(pickup), intv_ratio):
                pickup_index = int((slot - assign_slot) / intv_ratio)
                pick_part = pickup[pickup_index]
                # check whether the slot is already occupied
                if feeder_lane[slot] and pick_part:
                    assign_available = False
                    break
                # check for a mechanical-limit conflict
                if pick_part and (slot - CT_Head[pick_part][0] * intv_ratio <= 0 or slot + (
                        head_num - CT_Head[pick_part][1] - 1) * intv_ratio > slot_num // 2):
                    assign_available = False
                    break

            if assign_available:
                for idx, part in enumerate(pickup):
                    if part:
                        feeder_lane[assign_slot + idx * intv_ratio] = part
                CT_Group_slot[CTIdx] = assign_slot
                break

            search_dir = 1 - search_dir
            if search_dir == 1:
                step += 1

    # === Initial Pickup Group ===
    # Peel each CT group into pickups: every pickup lasts for the smallest
    # remaining point count, after which exhausted heads are cleared.
    initial_pickup, initial_pickup_cycle = [], []
    for index, CT in enumerate(CT_Group):
        while True:
            if CT_Points[index].count(0) == head_num:
                break
            min_element = min([Points for Points in CT_Points[index] if Points > 0])
            initial_pickup.append(copy.deepcopy(CT_Group[index]))
            initial_pickup_cycle.append(min_element)
            for head in range(head_num):
                if CT_Points[index][head] >= min_element:
                    CT_Points[index][head] -= min_element
                if CT_Points[index][head] == 0:
                    CT_Group[index][head] = None

    # pickup partition rule
    partition_probability = 0.1
    # Start from empty containers so a repeated run does not append to the
    # pickups of the previous one.
    self.pickup_group, self.pickup_group_cycle, self.pair_group = [], [], []
    for idx, Pickup in enumerate(initial_pickup):
        pickup_num = len([element for element in Pickup if element is not None])
        if 2 <= pickup_num <= head_num / 3 or (
                head_num / 3 <= pickup_num <= head_num / 2 and np.random.rand() < partition_probability):
            # partitioned into single component pickups
            # or partition the potentially inefficient initial pickups with a small probability
            pair_index = []
            for index, CT in enumerate(Pickup):
                if CT is not None:
                    pair_index.append(len(self.pickup_group))
                    self.pickup_group.append([None for _ in range(head_num)])
                    self.pickup_group[-1][index] = CT
                    self.pickup_group_cycle.append(initial_pickup_cycle[idx])
            self.pair_group.append(pair_index)
        else:
            self.pickup_group.append(Pickup)
            self.pickup_group_cycle.append(initial_pickup_cycle[idx])

    # basic parameter
    # crossover rate & mutation rate: 80% & 10%
    # population size: 200
    # the number of generation: 500
    crossover_rate, mutation_rate = 0.8, 0.1
    population_size, n_generations = 200, 500

    # initial solution: random permutations of the pickup indices
    population = []
    for _ in range(population_size):
        pop_permutation = list(range(len(self.pickup_group)))
        np.random.shuffle(pop_permutation)
        population.append(pop_permutation)

    # === record the feeder slots holding each component ===
    self.feeder_part_arrange = defaultdict(list)
    for slot in range(1, slot_num // 2 + 1):
        if feeder_lane[slot]:
            self.feeder_part_arrange[feeder_lane[slot]].append(slot)

    # === record the registered nozzle type of each component ===
    self.part_nozzle = defaultdict(str)
    for pickup in self.pickup_group:
        for part in pickup:
            if part is None or part in self.part_nozzle.keys():
                continue
            self.part_nozzle[part] = self.part_data[self.part_data['part'] == part]['nz'].tolist()[0]

    with tqdm(total=n_generations) as pbar:
        pbar.set_description('hybrid genetic process')
        # calculate fitness value
        pop_val = [self.cal_ind_val(individual)[0] for individual in population]  # val is related to assembly time
        for _ in range(n_generations):
            # min-max convert: a smaller time becomes a larger selection weight
            max_val = 1.5 * max(pop_val)
            convert_pop_val = list(map(lambda v: max_val - v, pop_val))

            # crossover and mutation
            new_population, new_pop_val = [], []
            for pop in range(population_size):
                if pop % 2 == 0 and np.random.random() < crossover_rate:
                    index1, index2 = GenOpe.roulette_wheel_selection(convert_pop_val), -1
                    while True:
                        index2 = GenOpe.roulette_wheel_selection(convert_pop_val)
                        if index1 != index2:
                            break
                    # directed edge recombination crossover, once in each parent order
                    offspring1 = GenOpe.directed_edge_recombine_crossover(population[index1], population[index2])
                    offspring2 = GenOpe.directed_edge_recombine_crossover(population[index2], population[index1])
                    if np.random.random() < mutation_rate:
                        GenOpe.swap_mutation(offspring1)
                    if np.random.random() < mutation_rate:
                        GenOpe.swap_mutation(offspring2)
                    new_population.append(offspring1)
                    new_population.append(offspring2)
                    new_pop_val.append(self.cal_ind_val(offspring1)[0])
                    new_pop_val.append(self.cal_ind_val(offspring2)[0])

            # generate next generation: fill up with the best of the old one
            top_k_index = GenOpe.get_top_kth(pop_val, population_size - len(new_population), reverse=False)
            for index in top_k_index:
                new_population.append(population[index])
                new_pop_val.append(pop_val[index])
            population = new_population
            pop_val = new_pop_val
            pbar.update(1)

    best_individual = population[np.argmin(pop_val)]
    self.result.part, self.result.cycle, self.result.slot = self.convertor(best_individual)
    self.result.point, self.result.sequence = self.path_planner.greedy_cluster(self.result.part, self.result.cycle,
                                                                               self.result.slot)