diff --git a/opt/smm/solver.py b/opt/smm/solver.py new file mode 100644 index 0000000..5635985 --- /dev/null +++ b/opt/smm/solver.py @@ -0,0 +1,518 @@ +# -*- coding: gb2312 -*- +import copy + +import numpy as np +import time +from collections import defaultdict, deque + + +class GRB: + CONTINUOUS = 'C' # 连续变量 + INTEGER = 'I' # 整数变量 + BINARY = 'B' # 二元变量 (0-1) + + OPTIMAL = 0 + UNBOUNDED = 1 + INFEASIBLE = 2 + TIME_LIMIT = 3 + INTERRUPTED = 4 + + MINIMIZE = 0 + MAXIMIZE = 1 + + +class Simplex: + def __init__(self, obj, max_mode=False): # default is solve min LP, if want to solve max lp,should * -1 + self.mat, self.max_mode = np.array([[0] + obj]) * (-1 if max_mode else 1), max_mode + self.res = None + self.objVal = 0 + self.varVal = None + + def add_constraint(self, a, b): + self.mat = np.vstack([self.mat, [b] + a]) + + def _simplex(self, mat, B, m, n): + while mat[0, 1:].min() < -1e-10: + col = np.where(mat[0, 1:] < 0)[0][0] + 1 # use Bland's method to avoid degeneracy. + row = np.array([mat[i][0] / mat[i][col] if mat[i][col] > 0 else 0x7fffffff for i in + range(1, mat.shape[0])]).argmin() + 1 # find the theta index + if mat[row][col] <= -1e-10: + self.res = GRB.UNBOUNDED + return None # the theta is ∞, the problem is unbounded + self._pivot(mat, B, row, col) + self.res = GRB.OPTIMAL + return mat[0][0] * (1 if self.max_mode else -1), {B[i]: mat[i, 0] for i in range(1, m) if B[i] < n} + + def _pivot(self, mat, B, row, col): + mat[row] /= mat[row][col] + ids = np.arange(mat.shape[0]) != row + mat[ids] -= mat[row] * mat[ids, col:col + 1] # for each i!= row do: mat[i]= mat[i] - mat[row] * mat[i][col] + B[row] = col + + def solve(self): + m, n = self.mat.shape # m - 1 is the number slack variables we should add + temp, B = np.vstack([np.zeros((1, m - 1)), np.eye(m - 1)]), list(range(n - 1, n + m - 1)) # add diagonal array + mat = np.hstack([self.mat, temp]) # combine them! + if mat[1:, 0].min() < -1e-10: # is the initial basic solution feasible? + row = mat[1:, 0].argmin() + 1 # find the index of min b + temp, mat[0] = np.copy(mat[0]), 0 # set first row value to zero, and store the previous value + mat = np.hstack([mat, np.array([1] + [-1] * (m - 1)).reshape((-1, 1))]) + self._pivot(mat, B, row, mat.shape[1] - 1) + try: + if self._simplex(mat, B, m, n)[0] != 0: + self.res = GRB.INFEASIBLE + return None # the problem has no answer + except: + return None # the problem is unbounded + + if mat.shape[1] - 1 in B: # if the x0 in B, we should pivot it. + self._pivot(mat, B, B.index(mat.shape[1] - 1), np.where(mat[0, 1:] != 0)[0][0] + 1) + mat = np.vstack([temp, mat[1:, :-1]]) # recover the first line + for i, x in enumerate(B[1:]): + mat[0] -= mat[0, x] * mat[i + 1] + self._simplex(mat, B, m, n) + + if self.res == GRB.OPTIMAL: + self.objVal = mat[0][0] * (1 if self.max_mode else -1) + self.varVal = {B[i]: mat[i, 0] for i in range(1, m) if B[i] < n} + + +class LinearExpr: + def __init__(self, constant=0, coefficients=None): + self.constant = constant # 常数项 + self.coefficients = coefficients or {} # 变量名到系数的映射 + + @classmethod + def from_var(cls, var, coeff=1): + return cls(0, {var.name: coeff}) + + @classmethod + def from_constant(cls, value): + """从常数创建线性表达式""" + return cls(value, {}) + + def copy(self): + """创建副本""" + return LinearExpr(self.constant, self.coefficients.copy()) + + def get_coeff(self, var_name): + """直接获取变量的系数,无需递归""" + return self.coefficients.get(var_name, 0) + + def set_coeff(self, var_name, coeff): + """直接设置变量的系数""" + if coeff == 0 and var_name in self.coefficients: + del self.coefficients[var_name] + elif coeff != 0: + self.coefficients[var_name] = coeff + + def add_coeff(self, var_name, coeff): + """增加变量的系数""" + current = self.coefficients.get(var_name, 0) + new_coeff = current + coeff + self.set_coeff(var_name, new_coeff) + + def __add__(self, other): + result = self.copy() + + if isinstance(other, LinearExpr): + result.constant += other.constant + for var_name, coeff in other.coefficients.items(): + result.add_coeff(var_name, coeff) + elif isinstance(other, Var): + result.add_coeff(other.name, 1) + else: # 常数 + result.constant += other + + return result + + def __radd__(self, other): + return self.__add__(other) + + def __sub__(self, other): + return self + (-1 * other) + + def __rsub__(self, other): + return (-1 * self) + other + + def __mul__(self, other): + if not isinstance(other, (int, float)): + raise ValueError("linear expressions can only be multiplied by scalars.") + + result = LinearExpr(self.constant * other) + for var_name, coeff in self.coefficients.items(): + result.coefficients[var_name] = coeff * other + + return result + + def __rmul__(self, other): + return self.__mul__(other) + + def __neg__(self): + return -1 * self + + def __le__(self, other): + return Constraint(self, other, '<=') + + def __ge__(self, other): + return Constraint(self, other, '>=') + + def __eq__(self, other): + return Constraint(self, other, '==') + + +class Var: + def __init__(self, name, vtype, lb=None, ub=None): + self.name = name + self.lb = lb + self.ub = ub + self.idx = 0 + self.x = 0 + self.vtype = vtype + self.Start = None # todo: start solution + + def to_expr(self): + return LinearExpr.from_var(self) + + def __str__(self): + return self.name + + # 委托给线性表达式的方法 + def __add__(self, other): + return self.to_expr() + other + + def __radd__(self, other): + return other + self.to_expr() + + def __sub__(self, other): + return self.to_expr() - other + + def __rsub__(self, other): + return other - self.to_expr() + + def __mul__(self, other): + return self.to_expr() * other + + def __rmul__(self, other): + return other * self.to_expr() + + def __le__(self, other): + return self.to_expr() <= other + + def __ge__(self, other): + return self.to_expr() >= other + + def __eq__(self, other): + return self.to_expr() == other + + def __neg__(self): + return LinearExpr.from_var(self, -1) + + +class Constraint: + def __init__(self, lhs, rhs, sense): + self.lhs = self._to_linear_expr(lhs) + self.rhs = self._to_linear_expr(rhs) + self.sense = sense # '<=', '>=', '==' + + @staticmethod + def _to_linear_expr(expr): + if isinstance(expr, LinearExpr): + return expr + elif isinstance(expr, Var): + return expr.to_expr() + elif isinstance(expr, (int, float)): + return LinearExpr.from_constant(expr) + else: + raise TypeError(f"unsupported expression type: {type(expr)}") + + +# @Date : 2025/11/10 +# @Author : guangyu-lu +class Model: + def __init__(self, name=""): + self.name = name + self.variables = {} + self.constraints = [] + self.NameNumVars = defaultdict(int) + self.NumVars = 0 + + self.Status = None + self.objval = 0 + self.TimeLimit = np.inf + self.OutputFlag = True + self.modelSense = None + + def terminate(self): + pass # todo: terminate function + + def addVar(self, vtype=GRB.CONTINUOUS, lb=0, ub=np.inf, name=None): + if name: + if cnt := self.NameNumVars[name]: + var_name = f"{name}_{cnt}" + else: + self.NameNumVars.pop(name) + self.NameNumVars[name] += 1 + var_name = name + else: + cnt = self.NameNumVars["usr_name"] + var_name = f"usr_name_{cnt}" + self.NameNumVars["usr_name"] += 1 + + variable = Var(var_name, vtype) + variable.idx = self.NumVars + variable.lb = lb + variable.ub = ub + self.variables[var_name] = variable + self.NumVars += 1 + return variable + + def addVars(self, *indices, vtype, lb=0, ub=np.inf, name=None): + # todo: negative variable(s) solving + result = defaultdict(Var) + usr_name = "usr_name" if name is None else name + + from itertools import product + index_ranges = [range(dim) if isinstance(dim, int) else dim for dim in indices] + for index_tuple in product(*index_ranges): + var_name = f"{name}_{self.NameNumVars[usr_name]}" + self.NameNumVars[usr_name] += 1 + + var = self.addVar(name=var_name, lb=lb, ub=ub, vtype=vtype) + result[index_tuple if len(index_tuple) > 1 else index_tuple[0]] = var + + return result + + def addConstr(self, constraint): + """添加约束""" + self.constraints.append(constraint) + return constraint + + def addConstrs(self, constraints): + for constraint in constraints: + self.addConstr(constraint) + + def setObjective(self, expr, sense=GRB.MINIMIZE): + """设置目标函数""" + self.objective = Constraint(expr, 0, '==').lhs + self.modelSense = sense + + def optimize(self): + # todo: set start solution + obj = [0] * len(self.variables) + for var_, coeff in self.objective.coefficients.items(): + obj[self.variables[var_].idx] = coeff + + sv = Simplex(obj, max_mode=self.modelSense == GRB.MAXIMIZE) + for constr in self.constraints: + constr_expr = [0] * len(self.variables) + for var_, coeff in constr.lhs.coefficients.items(): + constr_expr[self.variables[var_].idx] += coeff + + for var_, coeff in constr.rhs.coefficients.items(): + constr_expr[self.variables[var_].idx] -= coeff + + if constr.sense == '<=': + sv.add_constraint(constr_expr, constr.rhs.constant - constr.lhs.constant + 1e-10) + elif constr.sense == ">=": + sv.add_constraint([-c for c in constr_expr], constr.lhs.constant - constr.rhs.constant + 1e-10) + else: # '==' + sv.add_constraint(constr_expr, constr.rhs.constant - constr.lhs.constant + 1e-10) + sv.add_constraint([-c for c in constr_expr], constr.lhs.constant - constr.rhs.constant + 1e-10) + + # for variable bound + for var in self.variables.values(): + if var.vtype == GRB.BINARY: + constr_expr = [0] * len(self.variables) + constr_expr[var.idx] = 1 + sv.add_constraint(constr_expr, 1) + else: + if var.ub != np.inf: + constr_expr = [0] * len(self.variables) + constr_expr[var.idx] = 1 + sv.add_constraint(constr_expr, var.ub + 1e-10) + if var.lb != 0: + constr_expr = [0] * len(self.variables) + constr_expr[var.idx] = -1 + sv.add_constraint(constr_expr, -var.lb + 1e-10) + + class Node: + def __init__(self, solver, lb=0, ub=np.inf): + self.sv = solver + self.lb = lb + self.ub = ub + + self.idx_col = {i: i + 1 for i in range(self.sv.mat.shape[1] - 1)} + self.fixed_var = dict() + + start_time = time.time() + sv.solve() + if sv.res != GRB.OPTIMAL: + self.Status = sv.res + return + + sense = 1 if sv.max_mode else -1 + global_ub, global_lower = sv.objVal * sense, -np.inf + queue = deque() + root_node = Node(sv, 0, global_ub) + incub_node, cur_node = None, root_node + + # todo: tightening constraint(s) + # todo: pre-solve remove row(s) and column(s) + while True: + if self.OutputFlag: + pass # todo: show the solving process + if time.time() - start_time > self.TimeLimit: + self.Status = GRB.TIME_LIMIT + break + if cur_node.sv.res == GRB.OPTIMAL: + if cur_node.sv.objVal * sense > global_ub + 1e-10: + continue # node prune + branch_idx = -1 + + for var in self.variables.values(): + if var.idx not in cur_node.idx_col.keys(): + continue + + if (col := cur_node.idx_col[var.idx]) not in cur_node.sv.varVal.keys(): + continue + + val = cur_node.sv.varVal[col] + if var.vtype == GRB.INTEGER and abs(round(val) - val) > 1e-8: + branch_idx = var.idx + + if cur_node.ub >= global_ub: + left_constr_expr = [0] * (cur_node.sv.mat.shape[1] - 1) + left_constr_expr[col - 1] = 1 + + left_node = copy.deepcopy(cur_node) + left_node.sv.add_constraint(left_constr_expr, int(val) + 1e-10) + + queue.append(left_node) + + right_constr_expr = [0] * (cur_node.sv.mat.shape[1] - 1) + right_constr_expr[col - 1] = -1 + + right_node = copy.deepcopy(cur_node) + right_node.sv.add_constraint(right_constr_expr, -int(val) - 1 + 1e-10) + + queue.append(right_node) + break + + elif var.vtype == GRB.BINARY and abs(val) > 1e-8 and abs(val - 1) > 1e-8: + branch_idx = var.idx + # todo: remove columns corresponding to the sum of binary variables + # remove the variable corresponding columns + for idx in cur_node.idx_col.keys(): + if idx > var.idx: + cur_node.idx_col[idx] -= 1 + cur_node.idx_col.pop(var.idx) + + left_node = copy.deepcopy(cur_node) + left_node.sv.mat = np.delete(left_node.sv.mat, col, axis=1) + left_node.fixed_var[var.idx] = 0 + queue.append(left_node) + + right_node = copy.deepcopy(cur_node) + for row in range(right_node.sv.mat.shape[0]): + right_node.sv.mat[row][0] -= right_node.sv.mat[row][col] + right_node.sv.mat = np.delete(right_node.sv.mat, col, axis=1) + right_node.fixed_var[var.idx] = 1 + queue.append(right_node) + break + + if branch_idx == -1: + cur_node.lb = cur_node.ub = cur_node.sv.objVal * sense + if cur_node.lb > global_lower: + # todo: solution pool + global_lower = cur_node.lb + incub_node = cur_node + + # todo: stop optimizer and output the incumbent result + if len(queue) == 0 or global_ub - global_lower <= 1e-8: + break + + cur_node = queue.popleft() + cur_node.sv.solve() # todo: dual simplex / interior algorithm + # todo: primal heuristic - esp. rounding and pumping + # todo: cutting plane : + # Gomory*, Cover, Implied bound, Projected implied bound, MIR, Flow cover, Zero half, RLT, Relax-and-lift BQP + + if incub_node and incub_node.sv.res == GRB.OPTIMAL: + self.objval = round(incub_node.sv.objVal * 1e8) / 1e8 + self.objective.constant + + # convert the variable result + idx2var = {var.idx: var.name for var in self.variables.values()} + for idx, val in incub_node.fixed_var.items(): + self.variables[idx2var[idx]].x = val + + col2idx = {col: idx for idx, col in incub_node.idx_col.items()} + for col, val in incub_node.sv.varVal.items(): + idx = col2idx[int(col)] + if self.variables[idx2var[idx]].vtype == GRB.INTEGER or self.variables[idx2var[idx]].vtype == GRB.BINARY: + self.variables[idx2var[idx]].x = round(val) + else: + self.variables[idx2var[idx]].x = round(val * 1e8) / 1e8 + + if incub_node and self.Status != GRB.TIMEOUT: + self.Status = GRB.OPTIMAL + else: + self.Status = GRB.INFEASIBLE + + +def quicksum(expr_list): + if not expr_list: + return LinearExpr.from_constant(0) + + constant = 0 + coefficients = {} + + for expr in expr_list: + if isinstance(expr, LinearExpr): + linear_expr = expr + elif isinstance(expr, Var): + linear_expr = expr.to_expr() + elif isinstance(expr, (int, float)): + linear_expr = LinearExpr.from_constant(expr) + else: + raise TypeError(f"Unsupported Expression Type: {type(expr)}") + + # 累加常数项 + constant += linear_expr.constant + + # 累加系数 + for var_name, coeff in linear_expr.coefficients.items(): + if var_name in coefficients: + coefficients[var_name] += coeff + else: + coefficients[var_name] = coeff + + return LinearExpr(constant, coefficients) + + +# 测试代码 +if __name__ == "__main__": + # todo: test instance + mdl = Model() + + x = mdl.addVars(4, ub=10, vtype=GRB.CONTINUOUS, name='x') + + # == Objective function === + mdl.setObjective(3 * x[0] + 9 * x[1] + 20 * x[2] + 19 * x[3], GRB.MINIMIZE) + + # === Constraint === + mdl.addConstr(110 * x[0] + 160 * x[1] + 420 * x[2] + 260 * x[3] >= 2000) + mdl.addConstr(4 * x[0] + 8 * x[1] + 4 * x[2] + 14 * x[3] >= 55) + mdl.addConstr(2 * x[0] + 285 * x[1] + 22 * x[2] + 80 * x[3] >= 800) + + mdl.optimize() # todo: callback function + if mdl.Status == GRB.OPTIMAL: + print(f"objective = {mdl.objval}") + for name, var in mdl.variables.items(): + print(f"{name} = {var.x}") + elif mdl.Status == GRB.INFEASIBLE: + print('infeasible model') + elif mdl.Status == GRB.UNBOUNDED: + print('unbounded model') + +