pyimport numpy as np
import random
class KMeans:
def __init__(self, n_clusters=3, max_iter=300, tol=1e-4):
self.n_clusters = n_clusters # 聚类数量
self.max_iter = max_iter # 最大迭代次数
self.tol = tol # 收敛阈值
self.centroids = None # 聚类中心
self.labels = None # 样本标签
def fit(self, X):
# 1. 随机初始化聚类中心
n_samples = X.shape[0]
random_indices = random.sample(range(n_samples), self.n_clusters)
self.centroids = X[random_indices]
for _ in range(self.max_iter):
# 2. 分配样本到最近的聚类中心
distances = self._compute_distances(X)
self.labels = np.argmin(distances, axis=1)
# 3. 保存旧中心用于收敛判断
old_centroids = self.centroids.copy()
# 4. 更新聚类中心
for i in range(self.n_clusters):
# 获取属于当前聚类的所有样本
cluster_samples = X[self.labels == i]
if len(cluster_samples) > 0:
self.centroids[i] = np.mean(cluster_samples, axis=0)
# 5. 检查是否收敛
centroid_shift = np.linalg.norm(old_centroids - self.centroids)
if centroid_shift < self.tol:
break
def predict(self, X):
distances = self._compute_distances(X)
return np.argmin(distances, axis=1)
def _compute_distances(self, X):
# 计算每个样本到所有聚类中心的距离
distances = np.zeros((X.shape[0], self.n_clusters))
for i, centroid in enumerate(self.centroids):
distances[:, i] = np.linalg.norm(X - centroid, axis=1)
return distances
# 生成测试数据
np.random.seed(42)
X = np.vstack([
np.random.normal(loc=[0, 0], scale=1, size=(100, 2)),
np.random.normal(loc=[5, 5], scale=1, size=(100, 2)),
np.random.normal(loc=[-5, 5], scale=1, size=(100, 2))
])
# 训练K-Means
kmeans = KMeans(n_clusters=3)
kmeans.fit(X)
# 预测
labels = kmeans.predict(X)
print("聚类中心:\n", kmeans.centroids)
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!