import numpy as np
from .decision_tree import DecisionTree
class RandomForest:
"""
Random Forest for classification and regression.
    An ensemble method that combines multiple decision trees using:
    - Bootstrap aggregating (bagging): each tree is trained on a bootstrap
      sample drawn with replacement from the training data.
    - Random feature selection: each split considers only a random subset
      of the available features.
Parameters
----------
n_estimators : int, default=100
Number of trees in the forest.
max_depth : int, default=None
Maximum depth of each tree. If None, trees grow until all leaves are pure.
min_samples_split : int, default=2
Minimum number of samples required to split a node in each tree.
min_samples_leaf : int, default=1
Minimum number of samples required in a leaf node in each tree.
max_features : int, float, str, or None, default='sqrt'
Number of features to consider when looking for the best split:
- If int, then consider `max_features` features at each split.
- If float, then `max_features` is a fraction and `int(max_features * n_features)` features are considered.
- If 'sqrt', then `max_features = sqrt(n_features)`.
- If 'log2', then `max_features = log2(n_features)`.
- If None, then `max_features = n_features`.
criterion : str, default='gini'
Splitting criterion. 'gini' for classification, 'mse' for regression.
bootstrap : bool, default=True
Whether to use bootstrap sampling when building trees.
random_state : int or None, default=None
Random seed for reproducibility.
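    Examples
    --------
    A minimal usage sketch on made-up toy data (values chosen only for
    illustration):
    >>> import numpy as np
    >>> X = np.array([[0.0, 0.0], [1.0, 1.0], [0.0, 1.0],
    ...               [1.0, 0.0], [0.2, 0.1], [0.9, 0.8]])
    >>> y = np.array([0, 1, 0, 1, 0, 1])
    >>> forest = RandomForest(n_estimators=10, max_depth=3, random_state=0)
    >>> forest.fit(X, y)
    >>> forest.predict(X).shape
    (6,)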
"""
def __init__(
self,
n_estimators=100,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
max_features='sqrt',
criterion='gini',
bootstrap=True,
random_state=None
):
self.n_estimators = n_estimators
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_features = max_features
self.criterion = criterion
self.bootstrap = bootstrap
self.random_state = random_state
# Will be set during fit
self.trees = []
self.n_features_ = None
        self.n_classes_ = None
        self.classes_ = None
self.is_classification = None
        # Set random seed if provided (seeds NumPy's global RNG, which drives
        # both bootstrap sampling and per-split feature selection)
if self.random_state is not None:
np.random.seed(self.random_state)
def fit(self, X, y):
"""
Build a forest of trees from training data.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data
y : array-like of shape (n_samples,)
Target values
"""
X = np.asarray(X)
y = np.asarray(y)
# Validate inputs
if X.shape[0] != y.shape[0]:
raise ValueError("X and y must have the same number of samples")
if X.shape[0] == 0:
raise ValueError("X cannot be empty")
if self.criterion not in ['gini', 'mse']:
raise ValueError(f"criterion must be 'gini' or 'mse', got '{self.criterion}'")
# Store metadata
self.n_features_ = X.shape[1]
self.is_classification = (self.criterion == 'gini')
        if self.is_classification:
            # Store the sorted class labels so later vote counting does not
            # assume labels are the integers 0..n_classes-1
            self.classes_ = np.unique(y)
            self.n_classes_ = len(self.classes_)
        else:
            self.classes_ = None
            self.n_classes_ = None
# Calculate max_features value
max_features = self._get_max_features(self.n_features_)
# Initialize list to store trees
self.trees = []
# Build n_estimators trees
for i in range(self.n_estimators):
# Create bootstrap sample (with replacement)
if self.bootstrap:
n_samples = X.shape[0]
# Random indices with replacement
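                # (on average such a sample contains about 63.2% of the
                # distinct training rows; the rest appear as duplicates)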
indices = np.random.choice(n_samples, size=n_samples, replace=True)
X_bootstrap = X[indices]
y_bootstrap = y[indices]
else:
# Use all data
X_bootstrap = X
y_bootstrap = y
# Create and train a decision tree
tree = DecisionTree(
max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
min_samples_leaf=self.min_samples_leaf,
criterion=self.criterion
)
            # Build a replacement split-search routine that restricts each
            # split to a random subset of features (the "random" in random forest)
def create_random_feature_splitter(tree_instance, n_features_to_use):
"""Create a function that restricts feature search to random subset."""
def random_feature_find_best_split(X_subset, y_subset):
"""Wrapper that restricts feature search to random subset."""
# Select random subset of features for this split
n_available_features = X_subset.shape[1]
n_to_select = min(n_features_to_use, n_available_features)
feature_indices = np.random.choice(
n_available_features,
size=n_to_select,
replace=False
)
best_feature = None
best_threshold = None
best_gain = -np.inf
# Only search through selected features
for feature_idx in feature_indices:
feature_values = np.unique(X_subset[:, feature_idx])
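                        # Every unique value of this feature is tried as a
                        # candidate threshold; rows with value <= threshold go left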
for threshold in feature_values:
left_mask = X_subset[:, feature_idx] <= threshold
right_mask = ~left_mask
n_left = np.sum(left_mask)
n_right = np.sum(right_mask)
if n_left >= tree_instance.min_samples_leaf and n_right >= tree_instance.min_samples_leaf:
left_y = y_subset[left_mask]
right_y = y_subset[right_mask]
gain = tree_instance._calculate_information_gain(y_subset, left_y, right_y)
if gain > best_gain:
best_feature = feature_idx
best_threshold = threshold
best_gain = gain
return best_feature, best_threshold, best_gain
return random_feature_find_best_split
# Replace the method with random feature version
tree._find_best_split = create_random_feature_splitter(tree, max_features)
# Train the tree
tree.fit(X_bootstrap, y_bootstrap)
# Store the tree
self.trees.append(tree)
def _get_max_features(self, n_features):
"""
Calculate the number of features to consider at each split.
Parameters
----------
n_features : int
Total number of features
Returns
-------
int
Number of features to use
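        Examples
        --------
        With 100 total features: ``'sqrt'`` gives 10, ``'log2'`` gives 6,
        ``0.25`` gives 25, and ``None`` gives all 100.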
"""
        if self.max_features is None:
            return n_features
        elif isinstance(self.max_features, int):
            return max(1, min(self.max_features, n_features))
        elif isinstance(self.max_features, float):
            return max(1, min(n_features, int(self.max_features * n_features)))
        elif self.max_features == 'sqrt':
            return max(1, int(np.sqrt(n_features)))
        elif self.max_features == 'log2':
            return max(1, int(np.log2(n_features)))
else:
raise ValueError(
f"max_features must be int, float, 'sqrt', 'log2', or None, "
f"got '{self.max_features}'"
)
def predict(self, X):
"""
Predict target values for samples in X.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples
Returns
-------
ndarray of shape (n_samples,)
Predicted values
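        Notes
        -----
        For classification the forest returns the majority vote across trees;
        for example, if 60 of 100 trees predict class 1 for a sample, the
        prediction is 1. For regression it returns the mean of the per-tree
        predictions.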
"""
X = np.asarray(X)
if len(self.trees) == 0:
raise ValueError("Forest has not been fitted. Call 'fit' first.")
if X.shape[0] == 0:
return np.array([])
# Validate number of features
if X.shape[1] != self.n_features_:
raise ValueError(
f"X has {X.shape[1]} features, but forest expects {self.n_features_} features"
)
# Get predictions from all trees
all_predictions = []
for tree in self.trees:
predictions = tree.predict(X)
all_predictions.append(predictions)
# Stack predictions: shape (n_estimators, n_samples)
all_predictions = np.array(all_predictions)
# Aggregate predictions
if self.is_classification:
# Classification: Majority voting
# For each sample, find the most common prediction across all trees
final_predictions = []
for sample_idx in range(X.shape[0]):
# Get predictions from all trees for this sample
sample_predictions = all_predictions[:, sample_idx]
# Find most common class (mode)
unique, counts = np.unique(sample_predictions, return_counts=True)
most_common = unique[np.argmax(counts)]
final_predictions.append(most_common)
return np.array(final_predictions)
else:
# Regression: Average predictions
return np.mean(all_predictions, axis=0)
def predict_proba(self, X):
"""
Predict class probabilities for classification.
Only available for classification (criterion='gini').
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples
Returns
-------
ndarray of shape (n_samples, n_classes)
            Class probabilities. Column ``j`` corresponds to ``classes_[j]``,
            i.e. the class labels seen in ``fit``, in sorted order.
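        Notes
        -----
        Probabilities are vote fractions: with 100 trees, if 70 predict the
        second class for a sample, that row of the output is ``[0.3, 0.7]``.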
"""
if not self.is_classification:
raise ValueError("predict_proba is only available for classification")
X = np.asarray(X)
if len(self.trees) == 0:
raise ValueError("Forest has not been fitted. Call 'fit' first.")
if X.shape[1] != self.n_features_:
raise ValueError(
f"X has {X.shape[1]} features, but forest expects {self.n_features_} features"
)
# Get predictions from all trees
all_predictions = []
for tree in self.trees:
predictions = tree.predict(X)
all_predictions.append(predictions)
# Stack predictions: shape (n_estimators, n_samples)
all_predictions = np.array(all_predictions)
# Calculate probabilities for each sample
n_samples = X.shape[0]
probabilities = np.zeros((n_samples, self.n_classes_))
for sample_idx in range(n_samples):
# Get predictions from all trees for this sample
sample_predictions = all_predictions[:, sample_idx]
# Count votes for each class
            for class_idx in range(self.n_classes_):
                # Fraction of trees whose prediction matches this class label
                votes = np.sum(sample_predictions == self.classes_[class_idx])
                probabilities[sample_idx, class_idx] = votes / self.n_estimators
return probabilities