# -*- coding: gb2312 -*- import copy import numpy as np import time from collections import defaultdict, deque class GRB: CONTINUOUS = 'C' # 连续变量 INTEGER = 'I' # 整数变量 BINARY = 'B' # 二元变量 (0-1) OPTIMAL = 0 UNBOUNDED = 1 INFEASIBLE = 2 TIME_LIMIT = 3 INTERRUPTED = 4 MINIMIZE = 0 MAXIMIZE = 1 class Simplex: def __init__(self, obj, max_mode=False): # default is solve min LP, if want to solve max lp,should * -1 self.mat, self.max_mode = np.array([[0] + obj]) * (-1 if max_mode else 1), max_mode self.res = None self.objVal = 0 self.varVal = None def add_constraint(self, a, b): self.mat = np.vstack([self.mat, [b] + a]) def _simplex(self, mat, B, m, n): while mat[0, 1:].min() < -1e-10: col = np.where(mat[0, 1:] < 0)[0][0] + 1 # use Bland's method to avoid degeneracy. row = np.array([mat[i][0] / mat[i][col] if mat[i][col] > 0 else 0x7fffffff for i in range(1, mat.shape[0])]).argmin() + 1 # find the theta index if mat[row][col] <= -1e-10: self.res = GRB.UNBOUNDED return None # the theta is ∞, the problem is unbounded self._pivot(mat, B, row, col) self.res = GRB.OPTIMAL return mat[0][0] * (1 if self.max_mode else -1), {B[i]: mat[i, 0] for i in range(1, m) if B[i] < n} def _pivot(self, mat, B, row, col): mat[row] /= mat[row][col] ids = np.arange(mat.shape[0]) != row mat[ids] -= mat[row] * mat[ids, col:col + 1] # for each i!= row do: mat[i]= mat[i] - mat[row] * mat[i][col] B[row] = col def solve(self): m, n = self.mat.shape # m - 1 is the number slack variables we should add temp, B = np.vstack([np.zeros((1, m - 1)), np.eye(m - 1)]), list(range(n - 1, n + m - 1)) # add diagonal array mat = np.hstack([self.mat, temp]) # combine them! if mat[1:, 0].min() < -1e-10: # is the initial basic solution feasible? row = mat[1:, 0].argmin() + 1 # find the index of min b temp, mat[0] = np.copy(mat[0]), 0 # set first row value to zero, and store the previous value mat = np.hstack([mat, np.array([1] + [-1] * (m - 1)).reshape((-1, 1))]) self._pivot(mat, B, row, mat.shape[1] - 1) try: if self._simplex(mat, B, m, n)[0] != 0: self.res = GRB.INFEASIBLE return None # the problem has no answer except: return None # the problem is unbounded if mat.shape[1] - 1 in B: # if the x0 in B, we should pivot it. self._pivot(mat, B, B.index(mat.shape[1] - 1), np.where(mat[0, 1:] != 0)[0][0] + 1) mat = np.vstack([temp, mat[1:, :-1]]) # recover the first line for i, x in enumerate(B[1:]): mat[0] -= mat[0, x] * mat[i + 1] self._simplex(mat, B, m, n) if self.res == GRB.OPTIMAL: self.objVal = mat[0][0] * (1 if self.max_mode else -1) self.varVal = {B[i]: mat[i, 0] for i in range(1, m) if B[i] < n} class LinearExpr: def __init__(self, constant=0, coefficients=None): self.constant = constant # 常数项 self.coefficients = coefficients or {} # 变量名到系数的映射 @classmethod def from_var(cls, var, coeff=1): return cls(0, {var.name: coeff}) @classmethod def from_constant(cls, value): """从常数创建线性表达式""" return cls(value, {}) def copy(self): """创建副本""" return LinearExpr(self.constant, self.coefficients.copy()) def get_coeff(self, var_name): """直接获取变量的系数,无需递归""" return self.coefficients.get(var_name, 0) def set_coeff(self, var_name, coeff): """直接设置变量的系数""" if coeff == 0 and var_name in self.coefficients: del self.coefficients[var_name] elif coeff != 0: self.coefficients[var_name] = coeff def add_coeff(self, var_name, coeff): """增加变量的系数""" current = self.coefficients.get(var_name, 0) new_coeff = current + coeff self.set_coeff(var_name, new_coeff) def __add__(self, other): result = self.copy() if isinstance(other, LinearExpr): result.constant += other.constant for var_name, coeff in other.coefficients.items(): result.add_coeff(var_name, coeff) elif isinstance(other, Var): result.add_coeff(other.name, 1) else: # 常数 result.constant += other return result def __radd__(self, other): return self.__add__(other) def __sub__(self, other): return self + (-1 * other) def __rsub__(self, other): return (-1 * self) + other def __mul__(self, other): if not isinstance(other, (int, float)): raise ValueError("linear expressions can only be multiplied by scalars.") result = LinearExpr(self.constant * other) for var_name, coeff in self.coefficients.items(): result.coefficients[var_name] = coeff * other return result def __rmul__(self, other): return self.__mul__(other) def __neg__(self): return -1 * self def __le__(self, other): return Constraint(self, other, '<=') def __ge__(self, other): return Constraint(self, other, '>=') def __eq__(self, other): return Constraint(self, other, '==') class Var: def __init__(self, name, vtype, lb=None, ub=None): self.name = name self.lb = lb self.ub = ub self.idx = 0 self.x = 0 self.vtype = vtype self.Start = None # todo: start solution def to_expr(self): return LinearExpr.from_var(self) def __str__(self): return self.name # 委托给线性表达式的方法 def __add__(self, other): return self.to_expr() + other def __radd__(self, other): return other + self.to_expr() def __sub__(self, other): return self.to_expr() - other def __rsub__(self, other): return other - self.to_expr() def __mul__(self, other): return self.to_expr() * other def __rmul__(self, other): return other * self.to_expr() def __le__(self, other): return self.to_expr() <= other def __ge__(self, other): return self.to_expr() >= other def __eq__(self, other): return self.to_expr() == other def __neg__(self): return LinearExpr.from_var(self, -1) class Constraint: def __init__(self, lhs, rhs, sense): self.lhs = self._to_linear_expr(lhs) self.rhs = self._to_linear_expr(rhs) self.sense = sense # '<=', '>=', '==' @staticmethod def _to_linear_expr(expr): if isinstance(expr, LinearExpr): return expr elif isinstance(expr, Var): return expr.to_expr() elif isinstance(expr, (int, float)): return LinearExpr.from_constant(expr) else: raise TypeError(f"unsupported expression type: {type(expr)}") # @Date : 2025/11/10 # @Author : guangyu-lu class Model: def __init__(self, name=""): self.name = name self.variables = {} self.constraints = [] self.NameNumVars = defaultdict(int) self.NumVars = 0 self.Status = None self.objval = 0 self.TimeLimit = np.inf self.OutputFlag = True self.modelSense = None def terminate(self): pass # todo: terminate function def addVar(self, vtype=GRB.CONTINUOUS, lb=0, ub=np.inf, name=None): if name: if cnt := self.NameNumVars[name]: var_name = f"{name}_{cnt}" else: self.NameNumVars.pop(name) self.NameNumVars[name] += 1 var_name = name else: cnt = self.NameNumVars["usr_name"] var_name = f"usr_name_{cnt}" self.NameNumVars["usr_name"] += 1 variable = Var(var_name, vtype) variable.idx = self.NumVars variable.lb = lb variable.ub = ub self.variables[var_name] = variable self.NumVars += 1 return variable def addVars(self, *indices, vtype, lb=0, ub=np.inf, name=None): # todo: negative variable(s) solving result = defaultdict(Var) usr_name = "usr_name" if name is None else name from itertools import product index_ranges = [range(dim) if isinstance(dim, int) else dim for dim in indices] for index_tuple in product(*index_ranges): var_name = f"{name}_{self.NameNumVars[usr_name]}" self.NameNumVars[usr_name] += 1 var = self.addVar(name=var_name, lb=lb, ub=ub, vtype=vtype) result[index_tuple if len(index_tuple) > 1 else index_tuple[0]] = var return result def addConstr(self, constraint): """添加约束""" self.constraints.append(constraint) return constraint def addConstrs(self, constraints): for constraint in constraints: self.addConstr(constraint) def setObjective(self, expr, sense=GRB.MINIMIZE): """设置目标函数""" self.objective = Constraint(expr, 0, '==').lhs self.modelSense = sense def optimize(self): # todo: set start solution obj = [0] * len(self.variables) for var_, coeff in self.objective.coefficients.items(): obj[self.variables[var_].idx] = coeff sv = Simplex(obj, max_mode=self.modelSense == GRB.MAXIMIZE) for constr in self.constraints: constr_expr = [0] * len(self.variables) for var_, coeff in constr.lhs.coefficients.items(): constr_expr[self.variables[var_].idx] += coeff for var_, coeff in constr.rhs.coefficients.items(): constr_expr[self.variables[var_].idx] -= coeff if constr.sense == '<=': sv.add_constraint(constr_expr, constr.rhs.constant - constr.lhs.constant + 1e-10) elif constr.sense == ">=": sv.add_constraint([-c for c in constr_expr], constr.lhs.constant - constr.rhs.constant + 1e-10) else: # '==' sv.add_constraint(constr_expr, constr.rhs.constant - constr.lhs.constant + 1e-10) sv.add_constraint([-c for c in constr_expr], constr.lhs.constant - constr.rhs.constant + 1e-10) # for variable bound for var in self.variables.values(): if var.vtype == GRB.BINARY: constr_expr = [0] * len(self.variables) constr_expr[var.idx] = 1 sv.add_constraint(constr_expr, 1) else: if var.ub != np.inf: constr_expr = [0] * len(self.variables) constr_expr[var.idx] = 1 sv.add_constraint(constr_expr, var.ub + 1e-10) if var.lb != 0: constr_expr = [0] * len(self.variables) constr_expr[var.idx] = -1 sv.add_constraint(constr_expr, -var.lb + 1e-10) class Node: def __init__(self, solver, lb=0, ub=np.inf): self.sv = solver self.lb = lb self.ub = ub self.idx_col = {i: i + 1 for i in range(self.sv.mat.shape[1] - 1)} self.fixed_var = dict() start_time = time.time() sv.solve() if sv.res != GRB.OPTIMAL: self.Status = sv.res return sense = 1 if sv.max_mode else -1 global_ub, global_lower = sv.objVal * sense, -np.inf queue = deque() root_node = Node(sv, 0, global_ub) incub_node, cur_node = None, root_node # todo: tightening constraint(s) # todo: pre-solve remove row(s) and column(s) while True: if self.OutputFlag: pass # todo: show the solving process if time.time() - start_time > self.TimeLimit: self.Status = GRB.TIME_LIMIT break if cur_node.sv.res == GRB.OPTIMAL: if cur_node.sv.objVal * sense > global_ub + 1e-10: continue # node prune branch_idx = -1 for var in self.variables.values(): if var.idx not in cur_node.idx_col.keys(): continue if (col := cur_node.idx_col[var.idx]) not in cur_node.sv.varVal.keys(): continue val = cur_node.sv.varVal[col] if var.vtype == GRB.INTEGER and abs(round(val) - val) > 1e-8: branch_idx = var.idx if cur_node.ub >= global_ub: left_constr_expr = [0] * (cur_node.sv.mat.shape[1] - 1) left_constr_expr[col - 1] = 1 left_node = copy.deepcopy(cur_node) left_node.sv.add_constraint(left_constr_expr, int(val) + 1e-10) queue.append(left_node) right_constr_expr = [0] * (cur_node.sv.mat.shape[1] - 1) right_constr_expr[col - 1] = -1 right_node = copy.deepcopy(cur_node) right_node.sv.add_constraint(right_constr_expr, -int(val) - 1 + 1e-10) queue.append(right_node) break elif var.vtype == GRB.BINARY and abs(val) > 1e-8 and abs(val - 1) > 1e-8: branch_idx = var.idx # todo: remove columns corresponding to the sum of binary variables # remove the variable corresponding columns for idx in cur_node.idx_col.keys(): if idx > var.idx: cur_node.idx_col[idx] -= 1 cur_node.idx_col.pop(var.idx) left_node = copy.deepcopy(cur_node) left_node.sv.mat = np.delete(left_node.sv.mat, col, axis=1) left_node.fixed_var[var.idx] = 0 queue.append(left_node) right_node = copy.deepcopy(cur_node) for row in range(right_node.sv.mat.shape[0]): right_node.sv.mat[row][0] -= right_node.sv.mat[row][col] right_node.sv.mat = np.delete(right_node.sv.mat, col, axis=1) right_node.fixed_var[var.idx] = 1 queue.append(right_node) break if branch_idx == -1: cur_node.lb = cur_node.ub = cur_node.sv.objVal * sense if cur_node.lb > global_lower: # todo: solution pool global_lower = cur_node.lb incub_node = cur_node # todo: stop optimizer and output the incumbent result if len(queue) == 0 or global_ub - global_lower <= 1e-8: break cur_node = queue.popleft() cur_node.sv.solve() # todo: dual simplex / interior algorithm # todo: primal heuristic - esp. rounding and pumping # todo: cutting plane : # Gomory*, Cover, Implied bound, Projected implied bound, MIR, Flow cover, Zero half, RLT, Relax-and-lift BQP if incub_node and incub_node.sv.res == GRB.OPTIMAL: self.objval = round(incub_node.sv.objVal * 1e8) / 1e8 + self.objective.constant # convert the variable result idx2var = {var.idx: var.name for var in self.variables.values()} for idx, val in incub_node.fixed_var.items(): self.variables[idx2var[idx]].x = val col2idx = {col: idx for idx, col in incub_node.idx_col.items()} for col, val in incub_node.sv.varVal.items(): idx = col2idx[int(col)] if self.variables[idx2var[idx]].vtype == GRB.INTEGER or self.variables[idx2var[idx]].vtype == GRB.BINARY: self.variables[idx2var[idx]].x = round(val) else: self.variables[idx2var[idx]].x = round(val * 1e8) / 1e8 if incub_node and self.Status != GRB.TIMEOUT: self.Status = GRB.OPTIMAL else: self.Status = GRB.INFEASIBLE def quicksum(expr_list): if not expr_list: return LinearExpr.from_constant(0) constant = 0 coefficients = {} for expr in expr_list: if isinstance(expr, LinearExpr): linear_expr = expr elif isinstance(expr, Var): linear_expr = expr.to_expr() elif isinstance(expr, (int, float)): linear_expr = LinearExpr.from_constant(expr) else: raise TypeError(f"Unsupported Expression Type: {type(expr)}") # 累加常数项 constant += linear_expr.constant # 累加系数 for var_name, coeff in linear_expr.coefficients.items(): if var_name in coefficients: coefficients[var_name] += coeff else: coefficients[var_name] = coeff return LinearExpr(constant, coefficients) # 测试代码 if __name__ == "__main__": # todo: test instance mdl = Model() x = mdl.addVars(4, ub=10, vtype=GRB.CONTINUOUS, name='x') # == Objective function === mdl.setObjective(3 * x[0] + 9 * x[1] + 20 * x[2] + 19 * x[3], GRB.MINIMIZE) # === Constraint === mdl.addConstr(110 * x[0] + 160 * x[1] + 420 * x[2] + 260 * x[3] >= 2000) mdl.addConstr(4 * x[0] + 8 * x[1] + 4 * x[2] + 14 * x[3] >= 55) mdl.addConstr(2 * x[0] + 285 * x[1] + 22 * x[2] + 80 * x[3] >= 800) mdl.optimize() # todo: callback function if mdl.Status == GRB.OPTIMAL: print(f"objective = {mdl.objval}") for name, var in mdl.variables.items(): print(f"{name} = {var.x}") elif mdl.Status == GRB.INFEASIBLE: print('infeasible model') elif mdl.Status == GRB.UNBOUNDED: print('unbounded model')