我来帮你写一段Python代码来分析温度数据中的异常值。这里使用了多种常见的异常检测方法:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import warnings
# Suppress every warning category for the rest of the script.
warnings.filterwarnings(action='ignore')

# Font setup for Chinese plot labels: try SimHei first, then Arial, and
# disable the Unicode minus glyph for axis tick labels.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'Arial'],
    'axes.unicode_minus': False,
})

# 1. Generate simulated temperature data (including some outliers)
# Fixed seed so the simulated data set is reproducible.
np.random.seed(42)
n_samples = 1000

# Normal readings (assumed winter temperatures, mostly between -5°C and 10°C).
# Only 95% of n_samples are drawn here; the injected outliers are added below.
normal_temps = np.random.normal(loc=2, scale=4, size=int(n_samples * 0.95))

# Hand-picked values that the detectors are expected to flag.
outliers = np.array([
    25,          # abnormally high
    30,          # abnormally high
    -20,         # abnormally low
    -25,         # abnormally low
    15, 15, 15,  # repeated, moderately high
    -15, -15,    # repeated, moderately low
])

# Merge the data: the injected values occupy the last len(outliers) positions.
all_temps = np.concatenate([normal_temps, outliers])

# Hourly timestamps. Lowercase 'h' is used because the uppercase 'H' alias
# is deprecated in recent pandas releases.
dates = pd.date_range(start='2024-01-01', periods=len(all_temps), freq='h')

# Build the DataFrame used by every later step.
df = pd.DataFrame({
    'timestamp': dates,
    'temperature': all_temps,
})

print("数据概览:")
print(f"总数据点: {len(df)}")
print(f"温度统计:")
print(df['temperature'].describe())
print("\n" + "="*50)

# 2. Detect outliers with the Z-score method
def detect_outliers_zscore(data, threshold=3):
    """Flag points whose absolute Z-score exceeds ``threshold``.

    Args:
        data: One-dimensional array-like of numeric values (list, ndarray,
            pandas Series, ...).
        threshold: Cut-off applied to the absolute Z-score; values strictly
            greater than it are reported.

    Returns:
        A tuple ``(outlier_idx, z_scores)`` where ``outlier_idx`` is an
        ndarray of positional indices into ``data`` and ``z_scores`` is an
        ndarray with the absolute Z-score of every point.
    """
    # Normalise the input once so the return types do not depend on whether
    # the caller passed a list, an ndarray or a Series.
    values = np.asarray(data, dtype=float)
    z_scores = np.abs(stats.zscore(values))
    outlier_idx = np.where(z_scores > threshold)[0]
    return outlier_idx, z_scores


# 3. Detect outliers with the IQR method
def detect_outliers_iqr(data, k=1.5):
    """Flag points lying outside the IQR fences.

    Args:
        data: One-dimensional array-like of numeric values (list, ndarray,
            pandas Series, ...).
        k: Multiplier applied to the interquartile range when building the
            lower and upper fences.

    Returns:
        A tuple ``(outlier_idx, (lower_bound, upper_bound))`` where
        ``outlier_idx`` is an ndarray of positional indices of the values
        strictly below ``lower_bound`` or strictly above ``upper_bound``.
    """
    # Convert up front: comparing a plain list against a float raises
    # TypeError, so lists would otherwise be rejected.
    values = np.asarray(data, dtype=float)
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    lower_bound = q1 - k * iqr
    upper_bound = q3 + k * iqr
    outlier_idx = np.where((values < lower_bound) | (values > upper_bound))[0]
    return outlier_idx, (lower_bound, upper_bound)


# 4. Detect outliers with Isolation Forest
def detect_outliers_isolation_forest(data, contamination=0.05,
                                     n_estimators=100, random_state=42):
    """Flag outliers with scikit-learn's ``IsolationForest``.

    Args:
        data: One-dimensional array-like of numeric values (list, ndarray,
            pandas Series, ...).
        contamination: Passed to ``IsolationForest(contamination=...)``.
        n_estimators: Passed to ``IsolationForest(n_estimators=...)``.
        random_state: Passed to ``IsolationForest(random_state=...)``.

    Returns:
        A tuple ``(outlier_idx, scores)`` where ``outlier_idx`` is an ndarray
        of positional indices predicted as ``-1`` and ``scores`` is the
        output of ``decision_function`` on the same samples.
    """
    iso_forest = IsolationForest(
        contamination=contamination,
        random_state=random_state,
        n_estimators=n_estimators,
    )
    # The estimator expects a 2-D array of shape (n_samples, n_features);
    # np.asarray also lets callers pass something other than a Series.
    features = np.asarray(data, dtype=float).reshape(-1, 1)
    predictions = iso_forest.fit_predict(features)
    outlier_idx = np.where(predictions == -1)[0]
    return outlier_idx, iso_forest.decision_function(features)


# 5. Detect outliers with LOF (Local Outlier Factor)
def detect_outliers_lof(data, contamination=0.05, n_neighbors=20):
    """Flag outliers with scikit-learn's ``LocalOutlierFactor``.

    Args:
        data: One-dimensional array-like of numeric values (list, ndarray,
            pandas Series, ...).
        contamination: Passed to ``LocalOutlierFactor(contamination=...)``.
        n_neighbors: Passed to ``LocalOutlierFactor(n_neighbors=...)``.

    Returns:
        A tuple ``(outlier_idx, scores)`` where ``outlier_idx`` is an ndarray
        of positional indices predicted as ``-1`` and ``scores`` is the
        fitted ``negative_outlier_factor_`` attribute.
    """
    lof = LocalOutlierFactor(
        contamination=contamination,
        novelty=False,
        n_neighbors=n_neighbors,
    )
    # The estimator expects a 2-D array of shape (n_samples, n_features);
    # np.asarray also lets callers pass something other than a Series.
    features = np.asarray(data, dtype=float).reshape(-1, 1)
    predictions = lof.fit_predict(features)
    outlier_idx = np.where(predictions == -1)[0]
    return outlier_idx, lof.negative_outlier_factor_


# Apply each detection method
def _preview_indices(indices, limit=10):
    """Render the first ``limit`` indices, adding '...' when more exist."""
    suffix = '...' if len(indices) > limit else ''
    return f"{indices[:limit]}{suffix}"


print("异常值检测结果:")
print("-" * 30)

# Z-score method
zscore_outliers, z_scores = detect_outliers_zscore(df['temperature'], threshold=3)
print(f"Z-score方法 (threshold=3):")
print(f" 检测到异常值数量: {len(zscore_outliers)}")
print(f" 异常值索引: {_preview_indices(zscore_outliers)}")

# IQR method
iqr_outliers, iqr_bounds = detect_outliers_iqr(df['temperature'], k=1.5)
lower_bound, upper_bound = iqr_bounds
print(f"\nIQR方法 (k=1.5):")
print(f" 检测到异常值数量: {len(iqr_outliers)}")
print(f" 正常范围: [{lower_bound:.2f}, {upper_bound:.2f}]")
print(f" 异常值索引: {_preview_indices(iqr_outliers)}")

# Isolation Forest
iso_outliers, iso_scores = detect_outliers_isolation_forest(
    df['temperature'], contamination=0.05
)
print(f"\nIsolation Forest (contamination=0.05):")
print(f" 检测到异常值数量: {len(iso_outliers)}")
print(f" 异常值索引: {_preview_indices(iso_outliers)}")

# LOF method
lof_outliers, lof_scores = detect_outliers_lof(df['temperature'], contamination=0.05)
print(f"\nLOF方法 (contamination=0.05):")
print(f" 检测到异常值数量: {len(lof_outliers)}")
print(f" 异常值索引: {_preview_indices(lof_outliers)}")

# 6. Tally the outliers
def get_consensus_outliers(methods_outliers, min_agreement=2):
    """Return the indices flagged by at least ``min_agreement`` methods.

    Args:
        methods_outliers: Iterable with one collection of integer indices
            per detection method.
        min_agreement: Minimum number of methods that must flag an index
            for it to be part of the consensus.

    Returns:
        A tuple ``(consensus, votes)`` where ``consensus`` is a sorted list
        of indices and ``votes`` maps every flagged index to the number of
        methods that flagged it.
    """
    votes = {}
    for method_indices in methods_outliers:
        # De-duplicate within one method so an index repeated by a single
        # detector still counts as one vote; int() keeps the keys plain
        # Python ints regardless of the numpy integer type passed in.
        for idx in {int(i) for i in method_indices}:
            votes[idx] = votes.get(idx, 0) + 1
    consensus = sorted(idx for idx, count in votes.items()
                       if count >= min_agreement)
    return consensus, votes


all_methods = [zscore_outliers, iqr_outliers, iso_outliers, lof_outliers]
consensus_outliers, outlier_counts = get_consensus_outliers(all_methods, min_agreement=2)

print(f"\n共识异常值 (至少2种方法认同):")
print(f" 数量: {len(consensus_outliers)}")
print(f" 索引和温度值:")
for idx in consensus_outliers[:15]:  # show at most the first 15
    print(f" 索引 {idx}: {df['temperature'].iloc[idx]:.1f}°C")

# 7. Visualization
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
fig.suptitle('温度异常值分析', fontsize=16, fontweight='bold')

# Positional indices of the consensus outliers as an integer array, so that
# an empty consensus is handled uniformly by the indexing below.
outlier_positions = np.asarray(consensus_outliers, dtype=int)
has_consensus = outlier_positions.size > 0

# Panel 1: histogram of all temperatures with mean and median markers
axes[0, 0].hist(df['temperature'], bins=50, edgecolor='black', alpha=0.7)
axes[0, 0].axvline(df['temperature'].mean(), color='red', linestyle='--',
                   label=f'均值: {df["temperature"].mean():.2f}°C')
axes[0, 0].axvline(df['temperature'].median(), color='green', linestyle='--',
                   label=f'中位数: {df["temperature"].median():.2f}°C')
axes[0, 0].set_xlabel('温度 (°C)')
axes[0, 0].set_ylabel('频数')
axes[0, 0].set_title('温度分布直方图')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Panel 2: box plot
axes[0, 1].boxplot(df['temperature'], vert=True, patch_artist=True)
axes[0, 1].set_title('温度箱线图')
axes[0, 1].set_ylabel('温度 (°C)')
axes[0, 1].grid(True, alpha=0.3)

# Panel 3: time series with the consensus outliers highlighted
axes[1, 0].plot(df['timestamp'], df['temperature'], 'b-', alpha=0.6, label='正常值')
if has_consensus:
    consensus_temps = df['temperature'].iloc[outlier_positions]
    consensus_timestamps = df['timestamp'].iloc[outlier_positions]
    axes[1, 0].scatter(consensus_timestamps, consensus_temps,
                       color='red', s=50, zorder=5, label='异常值')
axes[1, 0].set_xlabel('时间')
axes[1, 0].set_ylabel('温度 (°C)')
axes[1, 0].set_title('温度时间序列(标出异常值)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].tick_params(axis='x', rotation=45)

# Panel 4: number of outliers reported by each method.
# Stored under its own name so the per-index vote dict returned by
# get_consensus_outliers (outlier_counts) is not overwritten.
methods = ['Z-score', 'IQR', 'Isolation Forest', 'LOF']
method_counts = [len(zscore_outliers), len(iqr_outliers),
                 len(iso_outliers), len(lof_outliers)]
x_pos = np.arange(len(methods))
axes[1, 1].bar(x_pos, method_counts,
               color=['skyblue', 'lightgreen', 'salmon', 'gold'])
axes[1, 1].set_xlabel('检测方法')
axes[1, 1].set_ylabel('检测到的异常值数量')
axes[1, 1].set_title('不同方法检测到的异常值数量')
axes[1, 1].set_xticks(x_pos)
axes[1, 1].set_xticklabels(methods, rotation=45)
for i, v in enumerate(method_counts):
    # Write the count just above each bar.
    axes[1, 1].text(i, v, str(v), ha='center', va='bottom', fontweight='bold')
axes[1, 1].grid(True, alpha=0.3, axis='y')

# Panel 5: Q-Q plot against a normal distribution
stats.probplot(df['temperature'], dist="norm", plot=axes[2, 0])
axes[2, 0].set_title('QQ图(检验正态性)')
axes[2, 0].grid(True, alpha=0.3)

# Panel 6: distribution of non-outliers versus consensus outliers
temperature_values = df['temperature'].values
normal_mask = np.ones(len(temperature_values), dtype=bool)
normal_mask[outlier_positions] = False
axes[2, 1].hist(temperature_values[normal_mask], bins=30, alpha=0.7,
                label='正常值', color='blue', edgecolor='black')
if has_consensus:
    axes[2, 1].hist(temperature_values[outlier_positions], bins=10, alpha=0.7,
                    label='异常值', color='red', edgecolor='black')
axes[2, 1].set_xlabel('温度 (°C)')
axes[2, 1].set_ylabel('频数')
axes[2, 1].set_title('正常值与异常值分布对比')
axes[2, 1].legend()
axes[2, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 8. Outlier handling recommendations
print("\n" + "="*50)
print("异常值处理建议:")
print("="*50)

# Characterise the consensus outliers and print handling suggestions.
if len(consensus_outliers) > 0:
    outlier_temps = df['temperature'].iloc[consensus_outliers]
    mean_temp = df['temperature'].mean()
    std_temp = df['temperature'].std()
    # Three-sigma fences around the mean of the full data set, used only to
    # split the consensus outliers into "high" and "low" for the report.
    high_cut = mean_temp + 3*std_temp
    low_cut = mean_temp - 3*std_temp

    print(f"\n异常值统计信息:")
    print(f" 异常值平均温度: {outlier_temps.mean():.2f}°C")
    print(f" 异常值温度范围: [{outlier_temps.min():.2f}°C, {outlier_temps.max():.2f}°C]")
    print(f" 高温异常值: {(outlier_temps > high_cut).sum()} 个")
    print(f" 低温异常值: {(outlier_temps < low_cut).sum()} 个")

    print(f"\n处理建议:")
    print("1. 数据清洗:")
    print(f" - 考虑移除 {len(consensus_outliers)} 个共识异常值")
    print(f" - 或使用中位数填充: {df['temperature'].median():.2f}°C")

    print("\n2. 可能的原因分析:")
    if (outlier_temps > 20).any():
        print(" - 高温异常可能是传感器故障或夏季数据混入")
    if (outlier_temps < -15).any():
        print(" - 低温异常可能是传感器故障或极端天气")

    print("\n3. 下一步行动:")
    print(" - 验证异常值是否真实(检查传感器日志)")
    print(" - 考虑使用移动平均或指数平滑处理")
    print(" - 建立温度异常预警系统")
else:
    print("未检测到显著的异常值。")

# 9. Save the cleaned data (optional)
def clean_data(df, outlier_indices, method='remove'):
    """Return a copy of ``df`` with the given outlier rows removed or repaired.

    Args:
        df: DataFrame containing a ``'temperature'`` column.
        outlier_indices: Positional (0-based) row indices of the outliers,
            as produced by ``np.where`` in the detection functions.
        method: One of ``'remove'`` (drop the rows and reset the index),
            ``'median_fill'`` / ``'mean_fill'`` (overwrite with the median /
            mean of the whole temperature column) or ``'interpolate'``
            (blank the values and fill them by interpolation).

    Returns:
        A new DataFrame; ``df`` itself is not modified.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    # The indices are positions, not index labels, so all row selection
    # below is positional. This keeps the function correct when ``df`` does
    # not carry a default RangeIndex.
    positions = np.asarray(outlier_indices, dtype=int)

    if method == 'remove':
        keep_mask = np.ones(len(df), dtype=bool)
        keep_mask[positions] = False
        return df[keep_mask].reset_index(drop=True)

    if method not in ('median_fill', 'mean_fill', 'interpolate'):
        raise ValueError("不支持的清理方法")

    cleaned_df = df.copy()
    temp_col = cleaned_df.columns.get_loc('temperature')

    if method == 'median_fill':
        fill_value = df['temperature'].median()
    elif method == 'mean_fill':
        fill_value = df['temperature'].mean()
    else:
        # 'interpolate': mark the outliers as missing first.
        fill_value = np.nan
    cleaned_df.iloc[positions, temp_col] = fill_value

    if method == 'interpolate':
        # limit_direction='both' also fills a missing value at the very
        # start of the series, which the default forward fill leaves as NaN.
        cleaned_df['temperature'] = cleaned_df['temperature'].interpolate(
            limit_direction='both'
        )
    return cleaned_df


# Example: fill the outliers with the median
# Example run: overwrite each consensus outlier with the median temperature.
cleaned_df = clean_data(df, consensus_outliers, method='median_fill')

# Report the shapes before and after cleaning and how many rows were touched.
print(
    f"\n数据清洗结果:",
    f" 原始数据形状: {df.shape}",
    f" 清洗后数据形状: {cleaned_df.shape}",
    f" 移除/修改的异常值数量: {len(consensus_outliers)}",
    sep="\n",
)

# Save the cleaned data to CSV without the index column.
cleaned_df.to_csv('cleaned_temperature_data.csv', index=False)
print("清洗后的数据已保存为 'cleaned_temperature_data.csv'")

这段代码提供了完整的温度异常值分析流程:
主要功能:
1. 数据生成:创建包含正常温度和异常值的模拟数据
2. 多种异常检测方法:
- Z-score方法(基于标准差)
- IQR方法(基于四分位距)
- Isolation Forest(孤立森林)
- LOF(局部离群因子)
3. 可视化分析:
- 温度分布直方图
- 箱线图
- 时间序列图
- 不同方法对比
- QQ图
- 正常值与异常值对比
4. 数据处理建议:
- 移除异常值
- 中位数/均值填充
- 插值填充
5. 结果保存:保存清洗后的数据

使用方法:
1. 安装所需库:
pip install numpy pandas matplotlib seaborn scikit-learn scipy
2. 替换模拟数据为你的实际温度数据
3. 根据需要调整参数(阈值、污染率等)
4. 运行代码获取分析结果

这段代码可以帮你快速识别温度数据中的异常值,并提供多种处理方案。