集成学习是一种机器学习技术,它将两个或多个学习器(例如回归模型、神经网络)来生成更好的预测。换句话说,集成模型将多个单独的模型组合在一起,以产生比单独使用单个模型更准确的预测。集成学习主要包括三种常见策略:BaggingBoosting 和 Stacking

为什么使用集成学习
使用集成学习的根本原因在于单一模型往往存在局限性,其预测能力可能受制于特定数据分布、模型假设或训练随机性,导致泛化能力不足。集成学习通过构建多个基础模型并汇总结果,利用集体智慧降低整体预测方差、偏差或同时减少两者,从而提升模型耐操性与准确性。不同基础模型可能捕捉数据中互补的模式或特征,相互纠正错误,使最终决策更稳定可靠。

Bagging

Bagging是自助聚合(Bootstrap Aggregating)的缩写,是一种集成学习技术。Bagging算法通过基于自助法Bootstrap Method)的重采样技术,从原始训练数据集中构建多个互不相同的子训练集。具体而言,该算法采用有放回抽样策略,从原始数据集中随机抽取与原数据集规模相等的样本,形成自助样本集Bootstrap Sample);此过程重复多次以生成多个独立的自助样本集。随后,在每个自助样本集上独立训练一个基学习器,所有基学习器均采用相同的算法类型。最终,针对回归任务,通过算术平均对多个基学习器的预测结果进行聚合;而对于分类任务,则采用硬投票机制,将得票数最多的类别作为最终预测输出。
随机森林Random Forest)是 Bagging的一个经典应用。

随机森林

随机森林的核心思想是构建大量决策树,每个树独立训练并投票决定最终结果。训练过程中,每个决策树基于原始数据集的随机子样本生成,同时每个树在节点分裂时仅考虑随机选取的特征子集。这种双重随机性设计减少了模型过拟合风险,并增强了泛化能力。预测阶段,对于分类任务,随机森林采用多数投票机制综合所有树的输出类别;对于回归任务,则采用平均值作为最终预测。
随机森林

Boosting

提升方法Boosting)是一种可以用来减小监督学习中偏差的机器学习算法。主要也是学习一系列弱分类器,并将其组合为一个强分类器。

AdaBoost

AdaBoost通过调整样本权重和分类器权重,使模型逐步聚焦于难以分类的样本。具体而言,初始时所有样本的权重相等,即D1(i)=1nD_1(i) = \frac{1}{n},其中nn为样本总数。在每一轮迭代中,首先根据当前样本权重训练一个弱分类器ht(x)h_t(x),然后计算其加权误差率

ϵt=i=1nDt1(i)I(yiht(xi))i=1nDt1(i)\epsilon_t = \frac{\sum_{i=1}^n D_{t-1}(i) \cdot I(y_i \neq h_t(x_i))}{\sum_{i=1}^n D_{t-1}(i)}

其中I()I(\cdot)为指示函数,yiy_i为真实标签,ht(xi)h_t(x_i)为弱分类器的预测结果。
接着根据误差率计算该分类器的权重

αt=12log(1ϵtϵt)\alpha_t = \frac{1}{2} \log\left(\frac{1 - \epsilon_t}{\epsilon_t}\right)

误差率越小,αt\alpha_t越大,表明该分类器在最终决策中起更重要的作用。
随后更新样本权重,使得被错误分类的样本权重增加,正确分类的样本权重减少

Dt(i)=Dt1(i)eαtyiht(xi)D_t(i) = D_{t-1}(i) \cdot e^{-\alpha_t y_i h_t(x_i)}

其中yiht(xi)y_i h_t(x_i)为分类是否正确的判断(若正确则为+1,否则为-1)。最后,所有弱分类器的预测结果按权重αt\alpha_t进行加权投票,最终的强分类器为

H(x)=sign(t=1Tαtht(x))H(x) = \text{sign}\left(\sum_{t=1}^T \alpha_t h_t(x)\right)

其中TT为迭代次数。通过这种动态调整机制,AdaBoost能够逐步优化模型,减少偏差并提升泛化能力。

Gradient Boosting

Gradient Boosting是一种基于梯度下降的集成学习方法,通过迭代地训练多个弱学习器,并逐步修正前序模型的残差,从而构建一个强预测模型。与AdaBoost不同,Gradient Boosting并非直接调整样本权重,而是通过最小化损失函数的梯度来优化模型。
它从初始化一个简单的基础模型开始,然后进行多轮迭代。在每一轮迭代mm中,算法首先计算当前模型的伪残差(即损失函数关于当前预测的负梯度),定义为

rim=L(yi,F(xi))F(xi)F=Fm1r_{im} = -\frac{\partial L(y_i, F(x_i))}{\partial F(x_i)} \bigg|_{F=F_{m-1}}

这些伪残差代表了前一轮模型的预测错误方向。接着,训练一个新的弱学习器hm(x)h_m(x)来拟合这些伪残差,使其尽可能准确地预测误差。然后,通过线搜索找到最优步长γm\gamma_m以最小化损失函数

γm=argminγi=1nL(yi,Fm1(xi)+γhm(xi))\gamma_m = \arg\min_{\gamma} \sum_{i=1}^n L(y_i, F_{m-1}(x_i) + \gamma h_m(x_i))

最后,更新整体模型为Fm(x)=Fm1(x)+νγmhm(x)F_m(x) = F_{m-1}(x) + \nu \gamma_m h_m(x),其中ν\nu是学习率。这个过程重复进行MM次,直到达到预定的迭代次数或误差收敛,最终模型是所有弱学习器的加权和FM(x)F_M(x)

集成学习实战

随机森林

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class RandomForest:
def __init__(self, n_estimators=100, max_features='sqrt', random_state=None):
"""
随机森林初始化
:param n_estimators: 树的数量
:param max_features: 每棵树分裂时考虑的最大特征数('sqrt'为特征总数的平方根)
:param random_state: 随机种子
"""
self.n_estimators = n_estimators
self.max_features = max_features
self.random_state = random_state
self.trees = []
self.feature_indices = [] # 存储每棵树使用的特征索引

def fit(self, X, y):
"""
训练随机森林
:param X: 训练数据(n_samples, n_features)
:param y: 标签(n_samples,)
"""
np.random.seed(self.random_state)
n_samples, n_features = X.shape

# 确定每棵树使用的特征数
if self.max_features == 'sqrt':
max_features = int(np.sqrt(n_features))
else:
max_features = n_features # 默认使用所有特征

# 创建并训练每棵树
for _ in range(self.n_estimators):
tree = DecisionTreeClassifier()

# 1. 自助采样(有放回抽样)
sample_indices = np.random.choice(n_samples, n_samples, replace=True)
X_bootstrap = X[sample_indices]
y_bootstrap = y[sample_indices]

# 2. 随机选择特征子集
feature_idx = np.random.choice(n_features, max_features, replace=False)
X_bootstrap_subset = X_bootstrap[:, feature_idx]

# 训练决策树
tree.fit(X_bootstrap_subset, y_bootstrap)
self.trees.append(tree)
self.feature_indices.append(feature_idx)

def predict(self, X):
"""
预测
:param X: 测试数据(n_samples, n_features)
:return: 预测结果(n_samples,)
"""
predictions = np.zeros((X.shape[0], self.n_estimators))

# 收集每棵树的预测
for i, (tree, feature_idx) in enumerate(zip(self.trees, self.feature_indices)):
X_subset = X[:, feature_idx]
predictions[:, i] = tree.predict(X_subset)

# 多数投票
return np.array([Counter(row).most_common(1)[0][0] for row in predictions])

AdaBoost

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
from sklearn.tree import DecisionTreeClassifier

class AdaBoost:
def __init__(self, n_estimators=50):
self.n_estimators = n_estimators # 弱学习器数量
self.alphas = [] # 弱学习器权重
self.models = [] # 弱学习器列表
self.classes = None # 存储类别标签

def fit(self, X, y):
n_samples = X.shape[0]
self.classes = np.unique(y)

# 初始化样本权重
w = np.ones(n_samples) / n_samples

for _ in range(self.n_estimators):
# 训练决策树桩(深度=1)
tree = DecisionTreeClassifier(max_depth=1)
tree.fit(X, y, sample_weight=w)
pred = tree.predict(X)

# 计算加权错误率
err = w.dot(pred != y) / w.sum()

# 计算当前弱学习器权重
alpha = 0.5 * np.log((1 - err) / max(err, 1e-10)) # 避免除零

# 更新样本权重
w = w * np.exp(-alpha * y * pred)
w = w / w.sum() # 归一化权重

# 存储模型和权重
self.alphas.append(alpha)
self.models.append(tree)

def predict(self, X):
# 初始化预测矩阵
preds = np.zeros((X.shape[0], len(self.classes)))

# 加权投票
for alpha, tree in zip(self.alphas, self.models):
class_idx = np.searchsorted(self.classes, tree.predict(X))
preds[np.arange(X.shape[0]), class_idx] += alpha

# 返回概率最高的类别
return self.classes[np.argmax(preds, axis=1)]

Gradient Boosting

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import numpy as np
from sklearn.tree import DecisionTreeRegressor

class GradientBoostingRegressor:
def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
self.n_estimators = n_estimators
self.learning_rate = learning_rate
self.max_depth = max_depth
self.trees = []
self.initial_pred = None

def fit(self, X, y):
# 初始预测:目标变量的均值
self.initial_pred = np.mean(y)
y_pred = np.full_like(y, self.initial_pred, dtype=float)

for _ in range(self.n_estimators):
# 计算负梯度(残差)
residual = y - y_pred

# 用决策树拟合残差
tree = DecisionTreeRegressor(max_depth=self.max_depth)
tree.fit(X, residual)

# 更新预测值
y_pred += self.learning_rate * tree.predict(X)

# 保存树
self.trees.append(tree)

def predict(self, X):
# 初始预测值
y_pred = np.full(X.shape[0], self.initial_pred)

# 累加所有树的预测
for tree in self.trees:
y_pred += self.learning_rate * tree.predict(X)

return y_pred