Source code for humancompatible.detect.methods.msd.one_rule

import logging
from typing import List, Tuple, Dict

import numpy as np
import pyomo.environ as pyo

logger = logging.getLogger(__name__)


[docs] class OneRule: """Implementation of a MIO formulation for finding an optimal conjunction. This class implements a Mixed-Integer Optimization (MIO) formulation to discover an optimal conjunction (a logical AND of features) that maximizes the absolute difference in target outcomes between the subgroup defined by this conjunction and its complement. The formulation is inspired by the 1Rule method from the paper "Learning Optimal and Fair Classifiers" by Malioutov and Varshney (http://proceedings.mlr.press/v28/malioutov13.pdf). """
[docs] def __init__(self) -> None: """ Initializes the OneRule solver. """ # Stores the Pyomo model after solving self.model: pyo.ConcreteModel | None = None
def _make_abs_model( self, X: np.ndarray[bool], y: np.ndarray[bool], weights: np.ndarray[float], n_min: int, feat_init: Dict[int, int] | None = None, ) -> pyo.ConcreteModel: """ Creates the Mixed-Integer Optimization (MIO) formulation to find an optimal conjunction. This private method constructs the Pyomo model for the MIO problem. The objective is to maximize the absolute difference in weighted sums of positive outcomes between the subgroup and its complement, subject to constraints that define the subgroup based on selected features and a minimum support. Args: X (np.ndarray[bool]): The input feature matrix, where each row is an instance and each column is a boolean feature. Shape `(n_instances, n_features)`. y (np.ndarray[bool]): The binary target labels for each instance. `True` for positive outcomes, `False` for negative. Shape `(n_instances,)`. weights (np.ndarray[float]): Weights for each instance, used in the objective function to calculate weighted means. Typically, these are normalized counts. Shape `(n_instances,)`. n_min (int): Minimum subgroup support (number of instances) required for a valid subgroup. feat_init (Dict[int, int] | None, default None): Initialization for the `use_feat` binary variables. A dictionary where keys are feature indices and values are `0` (feature not used) or `1` (feature used) in the conjunction. Defaults to an empty dictionary, allowing the solver to determine initial values. Returns: pyo.ConcreteModel: The constructed Pyomo MIO model instance. Notes: - The model uses `pyomo.Binary` variables for `use_feat` to select features for the conjunction. - `ingroup` variables determine if an instance belongs to the current subgroup. - `force_0` and `force_1` constraints ensure `ingroup[i]` is 1 if and only if all `use_feat[j]` that are 1 also have `Xint[i, j]` as 1. - `minimum` constraint enforces the `n_min` support. - `o` and `b` variables, along with `abs_obj_u1` and `abs_obj_u2` constraints, linearize the absolute value in the objective function. """ if feat_init is None: feat_init = {} n, d = X.shape # Convert boolean X to integer for Pyomo compatibility (0 or 1) Xint = np.zeros_like(X, dtype=int) Xint[X] = 1 model = pyo.ConcreteModel() model.all_i = pyo.Set(initialize=np.arange(n)) model.feat_i = pyo.Set(initialize=np.arange(d)) model.pos_i = pyo.Set(initialize=np.where(y)[0]) # Indices of positive outcomes model.neg_i = pyo.Set(initialize=np.where(~y)[0]) # Indices of negative ones # Decision variables # `use_feat[j]` is 1 if feature `j` is part of the conjunction, 0 otherwise. model.use_feat = pyo.Var(model.feat_i, domain=pyo.Binary, initialize=feat_init) # `ingroup[i]` is 1 if instance `i` is in the subgroup, 0 otherwise. model.ingroup = pyo.Var(model.all_i, domain=pyo.NonNegativeReals, bounds=(0, 1)) # Constraints to define subgroup membership: # If a feature `j` is used (`use_feat[j] == 1`) AND instance `i` does NOT # have that feature (`Xint[i, j] == 0`), then `ingroup[i]` must be 0. model.force_0 = pyo.Constraint( model.all_i, model.feat_i, rule=lambda m, i, j: ( m.ingroup[i] <= 1 - (m.use_feat[j] - Xint[i, j] * m.use_feat[j]) ), ) # If an instance `i` has all features selected in the conjunction, # then `ingroup[i]` must be 1. model.force_1 = pyo.Constraint( model.all_i, rule=lambda m, i: ( m.ingroup[i] >= 1 - sum(m.use_feat[j] - Xint[i, j] * m.use_feat[j] for j in m.feat_i) ), ) # Minimum subgroup support constraint # The sum of `ingroup` variables (total members in subgroup) must be at least `n_min`. model.minimum = pyo.Constraint( expr=sum(model.ingroup[i] for i in model.all_i) >= n_min ) # Variables and constraints for linearizing the absolute value in the objective # `o` represents the absolute difference we want to maximize. model.o = pyo.Var(domain=pyo.NonNegativeReals) # `b` is a binary variable used for linearization. model.b = pyo.Var(domain=pyo.Binary) # Calculate weighted sums for positive and negative outcomes within the subgroup term1 = sum(model.ingroup[i] * weights[i] for i in model.pos_i) term2 = sum(model.ingroup[i] * weights[i] for i in model.neg_i) # Linearization of `abs(term1 - term2)`: # `o <= (term1 - term2) + M * b` # `o <= (term2 - term1) + M * (1 - b)` # (where M is a sufficiently large number, here implicitly 2, because weights sum to 1) model.abs_obj_u1 = pyo.Constraint(expr=model.o <= term1 - term2 + 2 * model.b) model.abs_obj_u2 = pyo.Constraint( expr=model.o <= term2 - term1 + 2 * (1 - model.b) ) # Objective function: maximize the absolute difference `o` model.obj = pyo.Objective( expr=model.o, sense=pyo.maximize, ) return model def _make_solver(self, solver_name: str, verbose: int = 2, time_limit: int = 300): # Solver setup if solver_name == "gurobi": solver = pyo.SolverFactory(solver_name, solver_io="python") else: solver = pyo.SolverFactory(solver_name) opts = {} # Set time limit and log outputs parameter for the solver if "cplex" in solver_name: opts["mip display"] = 4 if verbose == 2 else 0 opts["simplex display"] = 2 if verbose == 2 else 0 opts["bar display"] = 1 if verbose == 2 else 0 opts["timelimit"] = time_limit elif "glpk" in solver_name: opts["msg_lev"] = "GLP_MSG_ALL" if verbose == 2 else "GLP_MSG_OFF" opts["tmlim"] = time_limit elif "xpress" in solver_name: opts["outputlog"] = 1 if verbose == 2 else 0 opts["maxtime"] = time_limit opts["soltimelimit"] = time_limit elif "highs" in solver_name: opts["log_to_console"] = True if verbose == 2 else False opts["output_flag"] = 1 if verbose == 2 else 0 opts["time_limit"] = time_limit elif "gurobi" in solver_name: opts["OutputFlag"] = 1 if verbose == 2 else 0 opts["TimeLimit"] = time_limit else: if verbose >= 1: logger.warning( f'Time limit not set! Not implemented for the selected solver "{solver_name}".' ) solver.options.update({k: v for k, v in opts.items() if v is not None}) return solver
[docs] def find_rule( self, X: np.ndarray[bool], y: np.ndarray[bool], n_min: int = 0, time_limit: int = 300, solver_name: str = "appsi_highs", verbose: int = 1, ) -> Tuple[List[int] | None, bool]: """ Finds a single conjunction (rule) that maximizes the absolute difference in target outcomes between the subgroup it defines and its complement. This method prepares the data (by creating unique rows and assigning weights), builds the MIO model using `_make_abs_model`, and then solves it using whichever solver you specify in `solver_name`. Args: X (np.ndarray[bool]): Input data matrix of boolean features, shape `(n_instances, n_features)`. y (np.ndarray[bool]): Target labels (binary), shape `(n_instances,)`. n_min (int, default 0): Minimum subgroup support (number of rows) required for a valid subgroup. time_limit (int, default 300): Time budget for the solver (in seconds). Note that only some solvers support this option. solver_name (str, default "appsi_highs"): Method for solving the MIO formulation. Can be chosen among: - "appsi_highs" - "gurobi" - "cplex" - "glpk" - "xpress" - Other solvers, see Pyomo documentation (Note that only the 5 solvers above support the graceful `time_limit`) verbose (int, default 1): Verbosity level. 0 = silent, 1 = logger output only, 2 = all detailed logs (including solver output). Returns: Tuple[List[int] | None, bool]: A tuple of a list of integer indices representing the features (literals) that form the optimal conjunction. These indices correspond to the columns in the input `X` that define the subgroup. If the solver fails to find any feasible solution within the time budget, `None` is returned instead. The boolean flag is `True` if the returned solution is globally optimal. Raises: AssertionError: If `y`'s shape is not (X.shape[0],) or if `X` or `y` are not of boolean dtype. ValueError: If the solver terminates with condition other than timeout, optimality or infeasibility. Exception: Any exceptions raised by Pyomo or solver during model creation or solving. Notes: - The input `X` and `y` are first processed to get unique rows and assign weights based on their original counts and class proportions. This helps in handling duplicate rows efficiently. - Requires a compatible MIP solver; e.g. Gurobi, HiGHS solver to be installed and configured for Pyomo. - The `rule` returned contains indices of the *original* features (columns of `X`) that define the conjunction. """ assert y.shape == (X.shape[0],) assert X.dtype == bool and y.dtype == bool assert n_min <= min(sum(y), sum(~y)) # Handle edge cases where target is all positive or all negative size1 = np.sum(y) if size1 == 0: if verbose >= 1: logger.info( "Target 'y' contains no positive outcomes. Returning all features as rule." ) return list(range(X.shape[1])) if size1 == y.shape[0]: if verbose >= 1: logger.info( "Target 'y' contains only positive outcomes. Returning empty rule." ) return [] size0 = y.shape[0] - size1 # Aggregate identical rows and calculate weights # This step is crucial for efficiency if X contains many duplicate rows. # X0: unique rows where y is False, counts0: their frequencies X0, counts0 = np.unique(X[~y], return_counts=True, axis=0) # X1: unique rows where y is True, counts1: their frequencies X1, counts1 = np.unique(X[y], return_counts=True, axis=0) # Concatenate unique rows and calculate normalized weights X_unique = np.concatenate([X0, X1], axis=0) # Weights are normalized by the total count of their respective class weights_unique = np.concatenate([counts0 / size0, counts1 / size1], axis=0) # Recreate 'y' for the unique rows (False for original negatives, True for original positives) y_unique = np.zeros_like(weights_unique, dtype=bool) y_unique[X0.shape[0] :] = True # Mark rows from X1 as True # Create and solve the MIO model int_model = self._make_abs_model( X_unique, y_unique, weights=weights_unique, n_min=n_min ) # Solve the model solver = self._make_solver(solver_name, verbose=verbose, time_limit=time_limit) result = solver.solve(int_model, load_solutions=False, tee=(verbose == 2)) is_optimal = True if result.solver.termination_condition != pyo.TerminationCondition.optimal: is_optimal = False if verbose >= 1: logger.info("Solver did not prove optimality of the solution.") if ( result.solver.termination_condition == pyo.TerminationCondition.maxTimeLimit ): if verbose >= 1: logger.info("Timed out.") elif result.solver.termination_condition in [ pyo.TerminationCondition.infeasible, pyo.TerminationCondition.infeasibleOrUnbounded, # problem is always bounded by 0 ]: if verbose >= 1: logger.info("Infeasible formulation, something went wrong.") else: raise ValueError( f"Unexpected termination condition: {result.solver.termination_condition}." ) try: int_model.solutions.load_from(result) except ValueError: if verbose >= 1: logger.info("No solution found. Try increasing `time_limit`.") return None, False self.model = int_model # Store the solved model instance # Extract the chosen features from the model's solution # `use_feat[i].value` will be approximately 1 for selected features, 0 otherwise. vals = [int_model.use_feat[i].value for i in int_model.feat_i] # Collect indices of features that are selected (value close to 1) rule = [i for i in int_model.feat_i if vals[i] is not None and vals[i] >= 1e-4] return rule, is_optimal