Files
multi-model-optimizer/opt/hyper_heuristic.py
2025-11-14 11:34:48 +08:00

540 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from opt.predictor import NeuralPredictor
from opt.utils import *
from core.interface import *
from core.common import *
# Set at import time, before any predictor is constructed.
# NOTE(review): KMP_DUPLICATE_LIB_OK is the Intel OpenMP switch that tolerates a second copy of
# the OpenMP runtime being loaded into the process; presumably required by the numeric backend
# behind NeuralPredictor - confirm it is still needed, since it can hide real library conflicts.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
class Heuristic:
    """Base class of the low-level machine-selection heuristics.

    Subclasses override :meth:`apply` and return the index of the machine that
    should receive the next component; this base implementation selects none.
    """

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the chosen machine index; the base heuristic always answers ``-1``.

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count.
        """
        no_machine = -1
        return no_machine
class LeastPoints(Heuristic):
    """Pick the candidate machine that currently carries the fewest placement points."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index with the smallest total of assigned points.

        A candidate machine with nothing assigned yet is returned immediately.

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type (unused here).
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count (only keys are used).
        """
        candidates, loads = [], []
        for machine in config:
            assigned = cp_assign[machine]
            if not assigned:
                # An empty machine always wins.
                return machine
            candidates.append(machine)
            loads.append(sum(cp_points[cp] for cp in assigned))
        return candidates[np.argmin(loads)]
class LeastNzTypes(Heuristic):
    """Pick the candidate machine that currently uses the fewest distinct nozzle types."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index with the fewest distinct nozzle types.

        A candidate machine with nothing assigned yet is returned immediately.
        Ties on the nozzle-type count are broken by the machine's total
        assigned points (scaled by 1e-5 so it never outweighs a whole type).

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count (only keys are used).
        """
        candidates, scores = [], []
        for machine in config:
            assigned = cp_assign[machine]
            if not assigned:
                return machine
            nozzle_types = {cp_nozzle[cp] for cp in assigned}
            # The tie-break must read the load of *this* machine. The previous code indexed
            # cp_assign with the position inside the candidate list, which is a different
            # machine whenever `config` is a subset of the machines.
            load = sum(cp_points[cp] for cp in assigned)
            candidates.append(machine)
            scores.append(len(nozzle_types) + 1e-5 * load)
        return candidates[np.argmin(scores)]
class LeastCpTypes(Heuristic):
    """Pick the candidate machine that currently holds the fewest assigned components."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index with the fewest assigned components.

        Unlike the sibling heuristics there is no early exit for an empty
        machine; every candidate is scored. Ties are broken by total assigned
        points (scaled by 1e-5).

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type (unused here).
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count (only keys are used).
        """
        candidates = list(config.keys())
        scores = []
        for machine in candidates:
            assigned = cp_assign[machine]
            scores.append(len(assigned) + 1e-5 * sum(cp_points[cp] for cp in assigned))
        return candidates[np.argmin(scores)]
class LeastCpNzRatio(Heuristic):
    """Pick the candidate machine with the lowest component-count / nozzle-type ratio."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index minimising (#components) / (#nozzle types).

        A candidate machine with nothing assigned yet is returned immediately.
        The numerator carries a 1e-5-scaled total-points tie-break and the
        denominator a 1e-5 offset.

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count (only keys are used).
        """
        candidates, ratios = [], []
        for machine in config:
            assigned = cp_assign[machine]
            if not assigned:
                return machine
            nozzle_types = {cp_nozzle[cp] for cp in assigned}
            numerator = len(assigned) + 1e-5 * sum(cp_points[cp] for cp in assigned)
            denominator = len(nozzle_types) + 1e-5
            candidates.append(machine)
            ratios.append(numerator / denominator)
        return candidates[np.argmin(ratios)]
def nozzle_assignment(cp_points, cp_nozzle, cp_assign, head_num):
    """Distribute ``head_num`` heads over the nozzle types used by ``cp_assign``.

    Steps:
      1. Sum the placement points per nozzle type.
      2. While there are more nozzle types than heads, drop the type with the
         fewest points.
      3. Drop every type whose share of the remaining points is below
         ``0.8 / head_num``.
      4. Give each surviving type one head, then hand out the remaining heads
         one at a time to the type with the highest points-per-head ratio
         (first such type on ties).

    :param cp_points: mapping component index -> placement points.
    :param cp_nozzle: mapping component index -> nozzle type.
    :param cp_assign: iterable of component indices assigned to one machine.
    :param head_num: number of heads available on that machine.
    :return: ``(nozzle_heads, nozzle_points)``: heads per surviving nozzle
        type and points per surviving nozzle type. Both are empty when no
        nozzle type survives (e.g. ``cp_assign`` is empty).
    """
    nozzle_points = defaultdict(int)
    for cp_idx in cp_assign:
        nozzle_points[cp_nozzle[cp_idx]] += cp_points[cp_idx]

    # More nozzle types than heads: discard the least loaded types first.
    while len(nozzle_points) > head_num:
        del nozzle_points[min(nozzle_points.items(), key=lambda item: item[1])[0]]

    sum_points = sum(nozzle_points.values())
    # With zero total points every share is undefined; keep all types instead of dividing by zero.
    if sum_points > 0:
        nozzle_points = defaultdict(
            int, {k: v for k, v in nozzle_points.items() if v / sum_points >= 0.8 / head_num})

    nozzle_heads = defaultdict(int, {k: 1 for k in nozzle_points})
    if not nozzle_heads:
        # Nothing to distribute heads over (previously an AssertionError).
        return nozzle_heads, nozzle_points

    # `<` rather than `!=`: the head total only ever grows, so this can never overshoot and spin.
    while sum(nozzle_heads.values()) < head_num:
        # max() returns the first maximal key, matching the original strict `>` scan.
        max_cycle_nozzle = max(nozzle_heads, key=lambda nz: nozzle_points[nz] / nozzle_heads[nz])
        nozzle_heads[max_cycle_nozzle] += 1

    return nozzle_heads, nozzle_points
class LeastCycle(Heuristic):
    """Pick the candidate machine with the smallest bottleneck points-per-head value."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index whose most loaded nozzle type has the lowest points per head.

        A candidate machine with nothing assigned yet is returned immediately.
        Heads are distributed with :func:`nozzle_assignment`; ties are broken
        by total assigned points (scaled by 1e-5).

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count.
        """
        candidates, cycles = [], []
        for machine, heads_available in config.items():
            assigned = cp_assign[machine]
            if not assigned:
                return machine
            nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assigned, heads_available)
            bottleneck = max(nozzle_points[nozzle] / heads for nozzle, heads in nozzle_heads.items())
            tie_break = 1e-5 * sum(cp_points[cp] for cp in assigned)
            candidates.append(machine)
            cycles.append(bottleneck + tie_break)
        return candidates[np.argmin(cycles)]
class LeastNzChange(Heuristic):
    """Pick the candidate machine whose per-head workload is the most even."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index with the lowest standard deviation of points per head.

        A candidate machine with nothing assigned yet is returned immediately.
        Heads are distributed with :func:`nozzle_assignment`; every head of a
        nozzle type is credited with that type's points divided by its head
        count. Ties are broken by total assigned points (scaled by 1e-5).

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count.
        """
        candidates, spreads = [], []
        for machine, heads_available in config.items():
            assigned = cp_assign[machine]
            if not assigned:
                return machine
            nozzle_heads, nozzle_points = nozzle_assignment(cp_points, cp_nozzle, assigned, heads_available)
            # One entry per physical head, in nozzle-type order.
            per_head = [nozzle_points[nozzle] / heads
                        for nozzle, heads in nozzle_heads.items()
                        for _ in range(heads)]
            tie_break = 1e-5 * sum(cp_points[cp] for cp in assigned)
            candidates.append(machine)
            spreads.append(np.std(per_head) + tie_break)
        return candidates[np.argmin(spreads)]
class LeastPickup(Heuristic):
    """Pick the candidate machine with the smallest summed per-level peak points."""

    @staticmethod
    def apply(cp_points, cp_nozzle, cp_assign, config: defaultdict[int]):
        """Return the machine index minimising the sum of per-level maximum points.

        A candidate machine with nothing assigned yet is returned immediately.
        Components are visited by descending points. For each nozzle type kept
        by :func:`nozzle_assignment`, every group of ``heads`` consecutive
        components forms one level; a level's cost is the largest component in
        it across all nozzle types. Ties are broken by total assigned points
        (scaled by 1e-5).

        :param cp_points: mapping component index -> placement points.
        :param cp_nozzle: mapping component index -> nozzle type.
        :param cp_assign: per-machine lists of already assigned component indices.
        :param config: mapping candidate machine index -> head count.
        """
        candidates, pickup_costs = [], []
        for machine, heads_available in config.items():
            assigned = cp_assign[machine]
            if not assigned:
                return machine
            nozzle_heads, _ = nozzle_assignment(cp_points, cp_nozzle, assigned, heads_available)

            current_level, placed = defaultdict(int), defaultdict(int)
            level_peak = defaultdict(int)
            largest_first = sorted(assigned, key=lambda cp: cp_points[cp], reverse=True)
            for cp in largest_first:
                nozzle, points = cp_nozzle[cp], cp_points[cp]
                if nozzle not in nozzle_heads:
                    # Nozzle type was dropped by nozzle_assignment.
                    continue
                # All heads of this nozzle type are used on the current level: open the next one.
                if placed[nozzle] and placed[nozzle] % nozzle_heads[nozzle] == 0:
                    current_level[nozzle] += 1
                level = current_level[nozzle]
                level_peak[level] = max(level_peak[level], points)
                placed[nozzle] += 1

            tie_break = 1e-5 * sum(cp_points[cp] for cp in assigned)
            candidates.append(machine)
            pickup_costs.append(sum(level_peak.values()) + tie_break)
        return candidates[np.argmin(pickup_costs)]
class HyperHeuristicOpt(BaseOpt):
    """Genetic hyper-heuristic that balances components over the machines of a line.

    A chromosome is a string over the keys of ``heuristic_map``; gene ``i``
    names the low-level heuristic that decides which machine receives the
    ``i``-th component division. Candidate chromosomes are scored with
    ``NeuralPredictor``; the best chromosome of each run is then evaluated
    with the per-machine optimizer ``base_opt``.

    Index conventions used throughout the class:
      * *component index* (``cp_idx``): the index label of a row of ``part_data``.
        The code uses it as a list position, so it assumes a 0..n-1 index.
      * *division index* (``div_cp_idx``): key of ``cp_points`` / ``cp_nozzle`` /
        ``cp_index``; one component is split into one or more divisions.
    """

    def __init__(self, machine_num, part_data, step_data, feeder_data=None):
        """Build the division tables from the part and step data.

        :param machine_num: number of machines in the line.
        :param part_data: DataFrame of components; columns ``part``, ``fdn`` and ``nz`` are read.
        :param step_data: DataFrame of placements; columns ``part``, ``x`` and ``y`` are read.
        :param feeder_data: forwarded unchanged to ``BaseOpt``.
        """
        super().__init__(None, part_data, step_data, feeder_data)

        self.line_config = [MachineConfig() for _ in range(machine_num)]

        # Optimizer class run on each machine by evaluate(); alternative: FeederPriorityOpt.
        self.base_opt = CellDivisionOpt

        # Gene letter -> low-level heuristic.
        self.heuristic_map = {
            'p': LeastPoints,
            'n': LeastNzTypes,
            'c': LeastCpTypes,
            'r': LeastCpNzRatio,
            'k': LeastCycle,
            'g': LeastNzChange,
            'u': LeastPickup,
        }
        self.machine_num = machine_num
        self.predictor = NeuralPredictor()

        self.cp_feeders = defaultdict(int)  # component index -> machines it may be spread over
        self.cp_nozzle = defaultdict(str)   # division index -> nozzle type
        self.cp_points = defaultdict(int)   # division index -> placement points
        self.cp_index = defaultdict(int)    # division index -> component index

        # Number of placements per part name.
        part_points = defaultdict(int)
        for _, data in self.step_data.iterrows():
            part_points[data.part] += 1

        # Average points per feeder over all components.
        division_part = []
        for _, data in self.part_data.iterrows():
            division_part.extend([part_points[data.part] / data.fdn for _ in range(data.fdn)])
        division_points = sum(division_part) / len(division_part)

        idx = 0
        for cp_idx, data in self.part_data.iterrows():
            self.cp_feeders[cp_idx] = 1

            total_points = part_points[data.part]
            feeder_limit = int(data.fdn)
            if feeder_limit != 1:
                if total_points > 0:
                    scaled = total_points // division_points * 1.5
                    feeder_limit = max(1, round(min(max(scaled, feeder_limit), total_points)))
                else:
                    # A part without placements used to yield feeder_limit == 0 and a
                    # modulo-by-zero below; treat it as a single empty division instead.
                    feeder_limit = 1

            # Split total_points into feeder_limit divisions whose sizes differ by at most one.
            base_points, surplus_points = divmod(total_points, feeder_limit)
            for _ in range(feeder_limit):
                points = base_points
                if surplus_points:
                    points += 1
                    surplus_points -= 1
                self.cp_points[idx], self.cp_nozzle[idx] = points, data.nz
                self.cp_index[idx] = cp_idx
                idx += 1

        self.board_width = self.step_data['x'].max() - self.step_data['x'].min()
        self.board_height = self.step_data['y'].max() - self.step_data['y'].min()

    def generate_pattern(self):
        """
        Generates a random pattern.
        :return: The generated pattern string (length 1 .. len(cp_points) - 1, at least 1).
        """
        genes = list(self.heuristic_map.keys())
        # max(2, ...) keeps randrange valid when there is only a single division.
        length = random.randrange(1, max(2, len(self.cp_points)))
        return "".join([random.choice(genes) for _ in range(length)])

    def convertor(self, component_list, individual):
        """Decode a chromosome into a per-machine assignment of divisions.

        :param component_list: division indices in the order they are to be assigned.
        :param individual: chromosome; gene ``i % len(individual)`` handles the i-th division.
        :return: list (one entry per machine) of lists of division indices.
        """
        component_num = len(self.cp_feeders)
        cp_assign = [[] for _ in range(self.machine_num)]
        # component_machine_assign[cp][m] == 1 once component cp has a division on machine m.
        component_machine_assign = [[0 for _ in range(self.machine_num)] for _ in range(component_num)]
        # Number of distinct components placed on each machine.
        machine_assign_counter = [0 for _ in range(self.machine_num)]

        for idx, div_cp_idx in enumerate(component_list):
            h = individual[idx % len(individual)]
            cp_idx = self.cp_index[div_cp_idx]
            # cp_points is keyed by division index; the previous code looked it up with the
            # component index and therefore tested an unrelated division.
            if self.cp_points[div_cp_idx] == 0:
                continue

            assigned = component_machine_assign[cp_idx]
            machine_config = defaultdict(int)  # eligible machine index -> head count
            if sum(assigned) < self.cp_feeders[cp_idx]:
                # The component may still open a new machine: offer the machines that already
                # host it plus every machine below the predictor's component limit.
                for machine_index in range(self.machine_num):
                    if assigned[machine_index] or \
                            machine_assign_counter[machine_index] < self.predictor.max_placement_points:
                        machine_config[machine_index] = self.line_config[machine_index].head_num
            else:
                # Feeder budget exhausted: only the machines that already host it are eligible.
                for machine_index in range(self.machine_num):
                    if assigned[machine_index]:
                        machine_config[machine_index] = self.line_config[machine_index].head_num

            machine_index = self.heuristic_map[h].apply(self.cp_points, self.cp_nozzle, cp_assign, machine_config)

            cp_assign[machine_index].append(div_cp_idx)
            if assigned[machine_index] == 0:
                machine_assign_counter[machine_index] += 1
            assigned[machine_index] = 1

        return cp_assign

    def crossover(self, parent1, parent2):
        """
        Attempt to perform crossover between two chromosomes.
        :param parent1: The first parent.
        :param parent2: The second parent.
        :return: The two individuals after crossover has been performed,
                 each truncated to at most len(cp_points) genes.
        """
        point1, point2 = random.randrange(len(parent1)), random.randrange(len(parent2))
        substr1, substr2 = parent1[point1:], parent2[point2:]
        offspring1, offspring2 = "".join((parent1[:point1], substr2)), "".join((parent2[:point2], substr1))
        max_len = len(self.cp_points)
        return offspring1[:max_len], offspring2[:max_len]

    def mutation(self, individual):
        """
        Attempts to mutate the individual by replacing a random heuristic in the chromosome by a generated pattern.
        :param individual: The individual to mutate.
        :return: The mutated individual, truncated to at most len(cp_points) genes.
        """
        pattern = list(individual)
        mutation_point = random.randrange(len(pattern))
        # One gene is replaced by a whole pattern, so the chromosome may grow.
        pattern[mutation_point] = self.generate_pattern()
        return ''.join(pattern)[:len(self.cp_points)]

    def initialize(self, population_size):
        """Return ``population_size`` randomly generated chromosomes."""
        return [self.generate_pattern() for _ in range(population_size)]

    def cal_ind_val(self, component_list, individual):
        """Score a chromosome with the predictor.

        :param component_list: division indices in assignment order.
        :param individual: chromosome to decode with :meth:`convertor`.
        :return: list with one ``self.predictor.eval(...)`` value per machine.
        """
        machine_cp_assign = self.convertor(component_list, individual)
        component_number = len(self.cp_feeders)

        # Points of every component on every machine.
        machine_cp_points = [[0 for _ in range(component_number)] for _ in range(self.machine_num)]
        for machine_idx in range(self.machine_num):
            for idx in machine_cp_assign[machine_idx]:
                machine_cp_points[machine_idx][self.cp_index[idx]] += self.cp_points[idx]

        # Feeders of every component on every machine.
        machine_cp_feeders = [[0 for _ in range(component_number)] for _ in range(self.machine_num)]
        for cp_idx in range(component_number):
            # Skip components that received no points anywhere. The previous test read
            # self.cp_points[cp_idx], i.e. a division looked up by a component index.
            if not any(machine_cp_points[machine_idx][cp_idx] for machine_idx in range(self.machine_num)):
                continue

            feeder_nums = self.cp_feeders[cp_idx]
            # One feeder for every machine that hosts the component.
            for machine_idx in range(self.machine_num):
                if machine_cp_points[machine_idx][cp_idx]:
                    machine_cp_feeders[machine_idx][cp_idx] = 1
                    feeder_nums -= 1

            # Spare feeders go to the machine with the most points per feeder.
            while feeder_nums > 0:
                assign_machine = None
                for machine_idx in range(self.machine_num):
                    if machine_cp_points[machine_idx][cp_idx] == 0:
                        continue
                    if assign_machine is None:
                        assign_machine = machine_idx
                        continue
                    if machine_cp_points[assign_machine][cp_idx] / machine_cp_feeders[assign_machine][cp_idx] \
                            < machine_cp_points[machine_idx][cp_idx] / machine_cp_feeders[machine_idx][cp_idx]:
                        assign_machine = machine_idx
                machine_cp_feeders[assign_machine][cp_idx] += 1
                feeder_nums -= 1

        # Component index -> nozzle type.
        nozzle_type = defaultdict(str)
        for idx, cp_idx in self.cp_index.items():
            nozzle_type[cp_idx] = self.cp_nozzle[idx]

        obj = []
        for machine_idx in range(self.machine_num):
            div_cp_points, div_cp_nozzle = defaultdict(int), defaultdict(str)
            idx = 0
            for cp_idx in range(component_number):
                total_points = machine_cp_points[machine_idx][cp_idx]
                if total_points == 0:
                    continue

                # Re-split the machine's share of the component evenly over its feeders.
                feeders = machine_cp_feeders[machine_idx][cp_idx]
                div_points = [total_points // feeders for _ in range(feeders)]
                div_index = 0
                while sum(div_points) < total_points:
                    div_points[div_index] += 1
                    div_index += 1

                for points in div_points:
                    div_cp_points[idx] = points
                    div_cp_nozzle[idx] = nozzle_type[cp_idx]
                    idx += 1

            obj.append(self.predictor.eval(div_cp_points, div_cp_nozzle,
                                           self.board_width, self.board_height, self.line_config[machine_idx]))
        return obj

    def evaluate(self, assignment):
        """Run the per-machine optimizer for a concrete assignment.

        :param assignment: ``assignment[machine][part_index]`` is the number of placements of
            that part given to that machine.
        :return: ``(max total_time over machines, list of per-machine opt.result)``.
        """
        partial_step_data, partial_part_data = defaultdict(pd.DataFrame), defaultdict(pd.DataFrame)
        for machine_index in range(self.machine_num):
            partial_step_data[machine_index] = pd.DataFrame(columns=self.step_data.columns)
            partial_part_data[machine_index] = self.part_data.copy(deep=True)
            partial_part_data[machine_index]['points'] = 0

        # averagely assign available feeder
        for part_index, data in self.part_data.iterrows():
            feeder_limit = data.fdn
            feeder_points = [assignment[machine_index][part_index] for machine_index in range(self.machine_num)]
            if sum(feeder_points) == 0:
                continue

            for machine_index in range(self.machine_num):
                partial_part_data[machine_index].loc[part_index, 'points'] = 0

            # Every machine that hosts the part gets one feeder first.
            for machine_index in range(self.machine_num):
                if feeder_points[machine_index] == 0:
                    continue
                partial_part_data[machine_index].loc[part_index, 'fdn'] = 1
                feeder_limit -= 1

            # Remaining feeders go to the machine with the most points per feeder.
            # `> 0` (not plain truthiness) so an over-committed, negative limit simply stops.
            while feeder_limit > 0:
                assign_machine = None
                for machine_index in range(self.machine_num):
                    if feeder_points[machine_index] == 0:
                        continue
                    if assign_machine is None or \
                            feeder_points[machine_index] / partial_part_data[machine_index].loc[part_index, 'fdn'] \
                            > feeder_points[assign_machine] / partial_part_data[assign_machine].loc[part_index, 'fdn']:
                        assign_machine = machine_index

                assert assign_machine is not None
                partial_part_data[assign_machine].loc[part_index, 'fdn'] += 1
                feeder_limit -= 1

            for machine_index in range(self.machine_num):
                if feeder_points[machine_index] > 0:
                    assert partial_part_data[machine_index].loc[part_index, 'fdn'] > 0

        # === assign placements ===
        part2idx = defaultdict(int)
        for idx, data in self.part_data.iterrows():
            part2idx[data.part] = idx

        machine_average_pos = [[0, 0] for _ in range(self.machine_num)]
        machine_step_counter = [0 for _ in range(self.machine_num)]
        part_step_data = defaultdict(list)
        for _, data in self.step_data.iterrows():
            part_step_data[part2idx[data.part]].append(data)

        # Parts hosted by exactly one machine: all their placements go there.
        multiple_component_index = []
        for part_index in range(len(self.part_data)):
            machine_assign_set = []
            for machine_index in range(self.machine_num):
                if assignment[machine_index][part_index]:
                    machine_assign_set.append(machine_index)

            if len(machine_assign_set) == 1:
                machine_index = machine_assign_set[0]
                for data in part_step_data[part_index]:
                    machine_average_pos[machine_index][0] += data.x
                    machine_average_pos[machine_index][1] += data.y
                    machine_step_counter[machine_index] += 1
                    partial_part_data[machine_index].loc[part_index, 'points'] += 1
                    partial_step_data[machine_index] = pd.concat(
                        [partial_step_data[machine_index], pd.DataFrame(data).T])
            elif len(machine_assign_set) > 1:
                multiple_component_index.append(part_index)

        # Turn the coordinate sums into centroids.
        for machine_index in range(self.machine_num):
            if machine_step_counter[machine_index] == 0:
                continue
            machine_average_pos[machine_index][0] /= machine_step_counter[machine_index]
            machine_average_pos[machine_index][1] /= machine_step_counter[machine_index]

        # Parts shared by several machines: each placement goes to the nearest centroid among
        # the machines that still have quota for the part.
        for part_index in multiple_component_index:
            for data in part_step_data[part_index]:
                idx = -1
                min_dist = None
                for machine_index in range(self.machine_num):
                    if partial_part_data[machine_index].loc[part_index, 'points'] >= \
                            assignment[machine_index][part_index]:
                        continue
                    dist = (data.x - machine_average_pos[machine_index][0]) ** 2 + (
                            data.y - machine_average_pos[machine_index][1]) ** 2
                    if min_dist is None or dist < min_dist:
                        min_dist, idx = dist, machine_index

                assert idx >= 0
                machine_step_counter[idx] += 1
                # Incremental mean: new = (1 - 1/n) * old + x / n. The previous code used `+=`,
                # which added the old mean a second time and corrupted the centroid.
                weight = 1 / machine_step_counter[idx]
                machine_average_pos[idx][0] = (1 - weight) * machine_average_pos[idx][0] + data.x * weight
                machine_average_pos[idx][1] = (1 - weight) * machine_average_pos[idx][1] + data.y * weight

                partial_part_data[idx].loc[part_index, 'points'] += 1
                partial_step_data[idx] = pd.concat([partial_step_data[idx], pd.DataFrame(data).T])

        obj, result = [], []
        for machine_index in range(self.machine_num):
            # Keep only the parts this machine actually places.
            rows = partial_part_data[machine_index]['points'] != 0
            partial_part_data[machine_index] = partial_part_data[machine_index][rows]

            opt = self.base_opt(self.line_config[machine_index], partial_part_data[machine_index],
                                partial_step_data[machine_index])
            opt.optimize(hinter=False)

            info = evaluation(self.line_config[machine_index], partial_part_data[machine_index],
                              partial_step_data[machine_index], opt.result)
            obj.append(info.total_time)
            result.append(opt.result)

        return max(obj), result

    def optimize(self):
        """Run the genetic search and store the best per-machine results in ``self.result``.

        ``group_size`` independent runs are made, each on a freshly shuffled division order.
        After every run the first individual of the final population is evaluated with
        :meth:`evaluate`; the result with the smallest value over all runs is kept.
        """
        # genetic-based hyper-heuristic
        crossover_rate, mutation_rate = 0.6, 0.1
        population_size, total_generation = 20, 50
        group_size = 10

        best_val = np.inf
        component_list = list(range(len(self.cp_points)))
        with tqdm(total=total_generation * group_size) as pbar:
            pbar.set_description('hyper-heuristic algorithm process for PCB assembly line balance')
            for _ in range(group_size):
                random.shuffle(component_list)
                new_population = []
                population = self.initialize(population_size)

                # calculate fitness value (lower is better: the slowest machine's prediction)
                pop_val = [max(self.cal_ind_val(component_list, individual)) for individual in population]

                for _ in range(total_generation):
                    # Merge the previous generation's offspring, then keep the best population_size.
                    population += new_population
                    for individual in new_population:
                        pop_val.append(max(self.cal_ind_val(component_list, individual)))

                    select_index = GenOpe.get_top_kth(pop_val, population_size, reverse=False)
                    population = [population[idx] for idx in select_index]
                    pop_val = [pop_val[idx] for idx in select_index]

                    # min-max convert: turn costs into selection weights (larger = better)
                    max_val = max(pop_val)
                    sel_pop_val = list(map(lambda v: max_val - v, pop_val))
                    sum_pop_val = sum(sel_pop_val) + 1e-10
                    # The 1e-3 floor keeps every individual selectable.
                    sel_pop_val = [v / sum_pop_val + 1e-3 for v in sel_pop_val]

                    # crossover and mutation
                    new_population = []
                    for pop in range(population_size):
                        if pop % 2 == 0 and np.random.random() < crossover_rate:
                            index1 = GenOpe.roulette_wheel_selection(sel_pop_val)
                            # Redraw until the second parent differs from the first.
                            while True:
                                index2 = GenOpe.roulette_wheel_selection(sel_pop_val)
                                if index1 != index2:
                                    break

                            offspring1, offspring2 = self.crossover(population[index1], population[index2])

                            if np.random.random() < mutation_rate:
                                offspring1 = self.mutation(offspring1)
                            if np.random.random() < mutation_rate:
                                offspring2 = self.mutation(offspring2)

                            new_population.append(offspring1)
                            new_population.append(offspring2)

                    pbar.update(1)

                # NOTE(review): population[0] is taken as the run's best individual, which relies
                # on GenOpe.get_top_kth returning indices best-first - confirm.
                machine_assign = self.convertor(component_list, population[0])

                assignment_result = [[0 for _ in range(len(self.part_data))] for _ in range(self.machine_num)]
                for machine_idx in range(self.machine_num):
                    for idx in machine_assign[machine_idx]:
                        assignment_result[machine_idx][self.cp_index[idx]] += self.cp_points[idx]

                val, res = self.evaluate(assignment_result)
                if best_val is None or val < best_val:
                    best_val = val
                    self.result = res