时间序列数据随处可见:网站每分钟的访问量、传感器读数、股票价格、人流计数、服务器 CPU 使用率,都是典型场景。

多数时候这类数据遵循某种规律。异常检测的目标就是找到规律被打破的那些时刻。

打开网易新闻 查看精彩图片

什么是时间序列数据中的异常?

异常指的是与正常行为产生明显偏离的数据点或数据序列。举几个例子:凌晨 3 点网站流量突然飙升;传感器因设备故障出现读数骤降;已关门的商店内人流量异常激增。

为什么时间序列异常检测很困难

时间序列数据天然包含趋势(缓慢的上升或下降)、季节性(日级或周级的周期模式)以及噪声(随机波动)。这三个成分叠加在一起,让"正常"本身就在不断变化。

一个值的高低本身不构成异常,它是否异常取决于出现的时间点。中午有 1000 个访客是正常的,凌晨 3 点有 1000 个访客就不正常了。

打开网易新闻 查看精彩图片

学习"正常"的样子

在检测异常之前,系统需要先建立对正常行为的认知——预期的数值范围、长期趋势走向以及重复出现的季节性模式。

不同的数据特征对应不同的检测策略。

打开网易新闻 查看精彩图片

方法 1:统计阈值法(Z-Score)

最简单的做法,假设数据服从正态分布。

import numpy as np
def z_score_anomaly(data, threshold=3):
mean = np.mean(data)
std = np.std(data)
z_scores = (data - mean) / std
anomalies = np.abs(z_scores) > threshold
return anomalies

适用场景:没有趋势的平稳数据。

方法 2:移动平均 + 残差

适用于带有平滑趋势的数据。

import pandas as pd
def moving_average_anomaly(series, window=10, threshold=2):
rolling_mean = series.rolling(window).mean()
residual = series - rolling_mean
std = residual.std()
return abs(residual) > threshold * std

它的优势在于,每个数据点比较的是自身的局部上下文而非全局均值。

方法 3:季节性分解

周期性模式明显的数据最适合用这个方法。

from statsmodels.tsa.seasonal import seasonal_decompose
def seasonal_anomaly(series, period=24):
result = seasonal_decompose(series, model='additive', period=period)
residual = result.resid
threshold = 3 * residual.std()
return abs(residual) > threshold

季节性分解把原始序列拆成趋势、季节性和残差三个分量,异常通常藏在残差里。

方法 4:机器学习(Isolation Forest)

不依赖任何分布假设,直接隔离罕见模式。

from sklearn.ensemble import IsolationForest
def isolation_forest_anomaly(data, contamination=0.02):
model = IsolationForest(contamination=contamination)
preds = model.fit_predict(data.reshape(-1, 1))
return preds == -1

适用场景:模式未知、数据不规则,或者多变量时间序列。

方法 5:深度学习(自编码器)

自编码器学习重建正常序列,重建误差高的部分即为异常。

import numpy as np
def reconstruction_error(original, reconstructed):
return np.mean((original - reconstructed) ** 2)

适合处理模式复杂、维度较多、存在长期依赖关系的时间序列。

示例:人流量分析

import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
# 生成商店人流量数据(1 周,每小时)
hours = pd.date_range('2024-01-01', periods=24*7, freq='H')
hour_of_day = hours.hour
# 正常:上午 9 点到晚上 9 点繁忙,夜间安静
base = 100 + 80 * ((hour_of_day >= 9) & (hour_of_day <= 21))
traffic = pd.Series(base + np.random.normal(0, 10, len(hours)), index=hours)
# 注入异常
traffic.iloc[15] = 200 # 凌晨 3 点飙升(摄像头问题)
traffic.iloc[75] = 5 # 营业时间下降(故障)
# 检测
result = seasonal_decompose(traffic, model='additive', period=24)
residual = result.resid
anomalies = abs(residual) > 3 * residual.std()
print(f"Detected {anomalies.sum()} anomalies")

减少误报

误报是异常检测在生产环境中最常见的痛点。三种思路可以缓解。

调整灵敏度:控制标记比例:

model = IsolationForest(contamination=0.02) # 仅标记 2%

要求持续性:只有连续多个点都表现异常时才触发告警:

# 仅当异常持续 3 个及以上连续点时才标记
consecutive_count >= 3

集成投票:多种方法同时判断,取多数一致的结果:

# 投票:如果 2 个及以上方法一致则标记
votes = method1 + method2 + method3
anomalies = votes >= 2

总结

异常检测的核心不在于找出"奇怪的数字",而在于理解每个时间点上什么才算正常。先对数据做可视化探索,从移动平均或季节性分解入手;如果数据模式复杂,引入 Isolation Forest;生产系统中建议组合多种方法以降低误判。

异常检测要做的,是识别那些偏离了时间、趋势和行为规律的数据点。

https://avoid.overfit.cn/post/a6de4ac94dd64768a768593e39b6c7cb

by Bhargavi Guddati