異常檢測-anomaly-detection

語言: CN / TW / HK

總結

異常檢測方法

方法總結

一些常見的異常檢測方法

一、基於分佈的方法

1. 3sigma

基於 正態分佈 ,3sigma準則認為超過3sigma的數據為異常點。

  • 圖1: 3sigma
def three_sigma(s):
  mu, std = np.mean(s), np.std(s)
  lower, upper = mu-3*std, mu+3*std
  return lower, upper

2. Z-score

Z-score為 標準分數 ,測量數據點和平均值的距離

  • 若A與平均值相差2個標準差,Z-score為2。
  • 當把Z-score=3作為閾值去剔除異常點時,便相當於3sigma。
def z_score(s):
	z_score = (s - np.mean(s)) / np.std(s)
	return z_score

3. boxplot

箱線圖時基於四分位距(IQR)找異常點的。

  • 圖2: boxplot
def boxplot(s):
  q1, q3 = s.quantile(.25), s.quantile(.75)
  iqr = q3 - q1
  lower, upper = q1 - 1.5*iqr, q3 + 1.5*iqr
  return lower, upper

4. Grubbs假設檢驗

資料來源:

Grubbs’Test 為一種 假設檢驗 的方法,常被用來檢驗服從 正態分佈單變量 數據集(univariate data set)Y中的單個異常值。若有異常值,則其必為數據集中的最大值或最小值。原假設與備擇假設如下:

  • ● H0: 數據集中沒有異常值
  • ● H1: 數據集中有一個異常值

使用Grubbs測試需要總體是正態分佈的。

算法流程:

  1. 樣本從小到大排序
  2. 求樣本的mean和dev
  3. 計算min/max與mean的差距,更大的那個為可疑值
  4. 求可疑值的z-score (standard score),如果大於Grubbs臨界值,那麼就是outlier

Grubbs臨界值可以查表得到,它由兩個值決定:檢出水平α(越嚴格越小),樣本數量n,排除outlier,對剩餘序列循環做 1-4 步驟 [1]。詳細計算樣例可以參考。

from outliers import smirnov_grubbs as grubbs
print(grubbs.test([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.min_test_outliers([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.max_test_outliers([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.max_test_indices([8, 9, 10, 50, 9], alpha=0.05))

侷限:

  • 1、只能檢測 單維度 數據
  • 2、無法精確的輸出正常區間
  • 3、它的判斷機制是“逐一剔除”,所以每個異常值都要單獨計算整個步驟,數據量大吃不消。
  • 4、需假定數據服從正態分佈或 近正態分佈

二、基於距離的方法

1. KNN

資料來源:

依次計算每個樣本點與它最近的K個樣本的平均距離,再利用計算的距離與閾值進行比較,如果大於閾值,則認為是異常點。優點是不需要假設數據的分佈,缺點是僅可以找出全局異常點,無法找到局部異常點。

from pyod.models.knn import KNN

# 初始化檢測器clf
clf = KNN( method='mean', n_neighbors=3, )
clf.fit(X_train)
# 返回訓練數據上的分類標籤 (0: 正常值, 1: 異常值)
y_train_pred = clf.labels_
# 返回訓練數據上的異常值 (分值越大越異常)
y_train_scores = clf.decision_scores_

三、基於密度的方法

1. Local Outlier Factor (LOF)

資料來源:

LOF是基於 密度 的經典算法(Breuning et. al. 2000),通過給每個數據點都分配一個依賴於鄰域密度的 離羣因子 LOF,進而判斷該數據點是否為離羣點。它的好處在於可以量化每個數據點的異常程度(outlierness)。

  • 圖3:LOF異常檢測

數據點P的局部相對密度(局部異常因子)=點P鄰域內點的平均局部可達密度 跟 數據點P的局部可達密度 的比值:

數據點P的局部可達密度=P最近鄰的平均可達距離的倒數。距離越大,密度越小。

點P到點O的第k可達距離=max(點O的k近鄰距離,點P到點O的距離)。

  • 圖4:可達距離

點O的k近鄰距離=第 k個最近的點跟點O之間的距離。

整體來説,LOF算法流程如下:

  • ● 對於每個數據點,計算它與其他所有點的距離,並按從近到遠排序;
  • ● 對於每個數據點,找到它的K-Nearest-Neighbor,計算LOF得分。
from sklearn.neighbors import LocalOutlierFactor as LOF
 
X = [[-1.1], [0.2], [100.1], [0.3]]
clf = LOF(n_neighbors=2)
res = clf.fit_predict(X)
print(res)
print(clf.negative_outlier_factor_)

2. Connectivity-Based Outlier Factor (COF)

資料來源:

  • [5] Nowak-Brzezińska, A., & Horyń, C. (2020). Outliers in rules-the comparision of LOF, COF and KMEANS algorithms. Procedia Computer Science , 176 , 1420-1429.
  • [6] 機器學習_學習筆記系列(98):基於連接異常因子分析(Connectivity-Based Outlier Factor) - 劉智皓 (Chih-Hao Liu)

COF是LOF的變種,相比於LOF,COF可以處理低密度下的異常值,COF的局部密度是基於平均鏈式距離計算得到。在一開始的時候我們一樣會先計算出每個點的k-nearest neighbor。而接下來我們會計算每個點的Set based nearest Path,如下圖:

  • 圖5:Set based nearest Path

假使我們今天我們的k=5,所以F的neighbor為B、C、D、E、G。而對於F離他最近的點為E,所以SBN Path的第一個元素是F、第二個是E。離E最近的點為D所以第三個元素為D,接下來離D最近的點為C和G,所以第四和五個元素為C和G,最後離C最近的點為B,第六個元素為B。所以整個流程下來,F的SBN Path為{F, E, D, C, G, C, B}。而對於SBN Path所對應的距離e={e1, e2, e3,…,ek},依照上面的例子e={3,2,1,1,1}。

所以我們可以説假使我們想計算p點的SBN Path,我們只要直接計算p點和其neighbor所有點所構成的graph的minimum spanning tree,之後我們再以p點為起點執行shortest path算法,就可以得到我們的SBN Path。

而接下來我們有了SBN Path我們就會接着計算,p點的鏈式距離: 有了ac_distance後,我們就可以計算COF:

# https://zhuanlan.zhihu.com/p/362358580
from pyod.models.cof import COF
cof = COF(contamination = 0.06,  ## 異常值所佔的比例
          n_neighbors = 20,      ## 近鄰數量
        )
cof_label = cof.fit_predict(iris.values) # 鳶尾花數據
print("檢測出的異常值數量為:",np.sum(cof_label == 1))

3. Stochastic Outlier Selection (SOS)

資料來源:

將特徵矩陣(feature martrix)或者相異度矩陣(dissimilarity matrix)輸入給SOS算法,會返回一個異常概率值向量(每個點對應一個)。SOS的思想是:當一個點和其它所有點的關聯度(affinity)都很小的時候,它就是一個異常點。

  • 圖6:SOS計算流程

SOS的流程:

  1. 計算相異度矩陣D;
  2. 計算關聯度矩陣A;
  3. 計算關聯概率矩陣B;
  4. 算出異常概率向量。

相異度矩陣D是各樣本兩兩之間的度量距離,比如歐式距離或漢明距離等。關聯度矩陣反映的是度量距離方差,如圖7,點 的密度最大,方差最小; 的密度最小,方差最大。而關聯概率矩陣B(binding probability matrix)就是把關聯矩陣(affinity matrix)按行歸一化得到的,如圖8所示。

  • 圖7:關聯度矩陣中密度可視化

  • 圖8:關聯概率矩陣

得到了binding probability matrix,每個點的異常概率值就用如下的公式計算,當一個點和其它所有點的關聯度(affinity)都很小的時候,它就是一個異常點。

# Ref: https://github.com/jeroenjanssens/scikit-sos
import pandas as pd
from sksos import SOS
iris = pd.read_csv("http://bit.ly/iris-csv")
X = iris.drop("Name", axis=1).values
detector = SOS()
iris["score"] = detector.predict(X)
iris.sort_values("score", ascending=False).head(10)

四、基於聚類的方法

1. DBSCAN

DBSCAN算法(Density-Based Spatial Clustering of Applications with Noise)的輸入和輸出如下,對於無法形成聚類簇的孤立點,即為異常點(噪聲點)。

  • ● 輸入:數據集,鄰域半徑Eps,鄰域中數據對象數目閾值MinPts;
  • ● 輸出:密度聯通簇。
  • 圖9:DBSCAN

處理流程如下

  1. 從數據集中任意選取一個數據對象點p;
  2. 如果對於參數Eps和MinPts,所選取的數據對象點p為核心點,則找出所有從p密度可達的數據對象點,形成一個簇;
  3. 如果選取的數據對象點 p 是邊緣點,選取另一個數據對象點;
  4. 重複以上2、3步,直到所有點被處理。
# Ref: https://zhuanlan.zhihu.com/p/515268801
from sklearn.cluster import DBSCAN
import numpy as np
X = np.array([[1, 2], [2, 2], [2, 3],
              [8, 7], [8, 8], [25, 80]])
clustering = DBSCAN(eps=3, min_samples=2).fit(X)
 
clustering.labels_
array([ 0,  0,  0,  1,  1, -1])
# 0,,0,,0:表示前三個樣本被分為了一個羣
# 1, 1:中間兩個被分為一個羣
# -1:最後一個為異常點,不屬於任何一個羣

五、基於樹的方法

1. Isolation Forest (iForest)

資料來源:

孤立森林中的 “孤立” (isolation) 指的是 “把異常點從所有樣本中孤立出來”,論文中的原文是 “separating an instance from the rest of the instances”。

我們用一個隨機超平面對一個數據空間進行切割,切一次可以生成兩個子空間。接下來,我們再繼續隨機選取超平面,來切割第一步得到的兩個子空間,以此循環下去,直到每子空間裏面只包含一個數據點為止。我們可以發現,那些密度很高的簇要被切很多次才會停止切割,即每個點都單獨存在於一個子空間內,但那些分佈稀疏的點,大都很早就停到一個子空間內了。所以,整個孤立森林的算法思想:異常樣本更容易快速落入葉子結點或者説,異常樣本在決策樹上,距離根節點更近。

隨機選擇m個特徵,通過在所選特徵的最大值和最小值之間隨機選擇一個值來分割數據點。觀察值的劃分遞歸地重複,直到所有的觀察值被孤立。

  • 圖10:孤立森林

獲得 t 個孤立樹後,單棵樹的訓練就結束了。接下來就可以用生成的孤立樹來評估測試數據了,即計算異常分數 s。對於每個樣本 x,需要對其綜合計算每棵樹的結果,通過下面的公式計算異常得分:

  • ● h(x):為樣本在iTree上的PathLength;
  • ● E(h(x)):為樣本在t棵iTree的PathLength的均值;
  • ● c(n):為n個樣本構建一個二叉搜索樹BST中的未成功搜索平均路徑長度(均值h(x)對外部節點終端的估計等同於BST中的未成功搜索)。 是對樣本x的路徑長度h(x)進行標準化處理。H(n-1)是調和數,可使用ln(n-1)+0.5772156649(歐拉常數)估算。

指數部分值域為(−∞,0),因此s值域為(0,1)。當PathLength越小,s越接近1,此時樣本為異常值的概率越大。

# Ref:https://zhuanlan.zhihu.com/p/484495545
from sklearn.datasets import load_iris 
from sklearn.ensemble import IsolationForest
 
data = load_iris(as_frame=True) 
X,y = data.data,data.target 
df = data.frame 
 
# 模型訓練
iforest = IsolationForest(n_estimators=100, max_samples='auto',  
                          contamination=0.05, max_features=4,  
                          bootstrap=False, n_jobs=-1, random_state=1)
 
 
#  fit_predict 函數 訓練和預測一起 可以得到模型是否異常的判斷,-1為異常,1為正常
df['label'] = iforest.fit_predict(X) 
 
# 預測 decision_function 可以得出 異常評分
df['scores'] = iforest.decision_function(X)

六、基於降維的方法

1. Principal Component Analysis (PCA)

資料來源:

PCA在異常檢測方面的做法,大體有兩種思路:

  • (1) 將數據映射到低維特徵空間,然後在特徵空間不同維度上查看每個數據點跟其它數據的偏差;
  • (2) 將數據映射到低維特徵空間,然後由低維特徵空間重新映射回原空間,嘗試用低維特徵重構原始數據,看重構誤差的大小。

PCA在做特徵值分解,會得到:

  • ● 特徵向量:反應了原始數據方差變化程度的不同方向;
  • ● 特徵值:數據在對應方向上的方差大小。

所以,最大特徵值對應的特徵向量為數據方差最大的方向,最小特徵值對應的特徵向量為數據方差最小的方向。原始數據在不同方向上的方差變化反應了其內在特點。如果單個數據樣本跟整體數據樣本表現出的特點不太一致,比如在某些方向上跟其它數據樣本偏離較大,可能就表示該數據樣本是一個異常點。

在前面提到第一種做法中,樣本$x_i$的異常分數為該樣本在所有方向上的偏離程度:

其中, 為樣本在重構空間裏離特徵向量的距離。若存在樣本點偏離各主成分越遠, 會越大,意味偏移程度大,異常分數高。 是特徵值,用於歸一化,使不同方向上的偏離程度具有可比性。

在計算異常分數時,關於特徵向量(即度量異常用的標杆)選擇又有兩種方式:

  • ● 考慮在前k個特徵向量方向上的偏差:前k個特徵向量往往直接對應原始數據裏的某幾個特徵,在前幾個特徵向量方向上偏差比較大的數據樣本,往往就是在原始數據中那幾個特徵上的極值點。
  • ● 考慮後r個特徵向量方向上的偏差:後r個特徵向量通常表示某幾個原始特徵的線性組合,線性組合之後的方差比較小反應了這幾個特徵之間的某種關係。在後幾個特徵方向上偏差比較大的數據樣本,表示它在原始數據裏對應的那幾個特徵上出現了與預計不太一致的情況。

得分大於閾值C則判斷為異常。

第二種做法,PCA提取了數據的主要特徵,如果一個數據樣本不容易被重構出來,表示這個數據樣本的特徵跟整體數據樣本的特徵不一致,那麼它顯然就是一個異常的樣本:

其中, 是基於k維特徵向量重構的樣本。

基於低維特徵進行數據樣本的重構時,捨棄了較小的特徵值對應的特徵向量方向上的信息。換一句話説,重構誤差其實主要來自較小的特徵值對應的特徵向量方向上的信息。基於這個直觀的理解,PCA在異常檢測上的兩種不同思路都會特別關注較小的特徵值對應的特徵向量。所以,我們説PCA在做異常檢測時候的兩種思路本質上是相似的,當然第一種方法還可以關注較大特徵值對應的特徵向量。

# Ref: https://zhuanlan.zhihu.com/p/48110105
from sklearn.decomposition import PCA
pca = PCA()
pca.fit(centered_training_data)
transformed_data = pca.transform(training_data)
y = transformed_data
 
# 計算異常分數
lambdas = pca.singular_values_
M = ((y*y)/lambdas)
 
# 前k個特徵向量和後r個特徵向量
q = 5
print "Explained variance by first q terms: ", sum(pca.explained_variance_ratio_[:q])
q_values = list(pca.singular_values_ < .2)
r = q_values.index(True)
 
# 對每個樣本點進行距離求和的計算
major_components = M[:,range(q)]
minor_components = M[:,range(r, len(features))]
major_components = np.sum(major_components, axis=1)
minor_components = np.sum(minor_components, axis=1)
 
# 人為設定c1、c2閾值
components = pd.DataFrame({'major_components': major_components, 
                               'minor_components': minor_components})
c1 = components.quantile(0.99)['major_components']
c2 = components.quantile(0.99)['minor_components']
 
# 製作分類器
def classifier(major_components, minor_components):  
    major = major_components > c1
    minor = minor_components > c2    
    return np.logical_or(major,minor)
 
results = classifier(major_components=major_components, minor_components=minor_components)

2. AutoEncoder

資料來源:

PCA是線性降維,AutoEncoder是非線性降維。根據正常數據訓練出來的AutoEncoder,能夠將正常樣本重建還原,但是卻無法將異於正常分佈的數據點較好地還原,導致還原誤差較大。因此如果一個新樣本被編碼,解碼之後,它的誤差超出正常數據編碼和解碼後的誤差範圍,則視作為異常數據。需要注意的是,AutoEncoder訓練使用的數據是正常數據(即無異常值),這樣才能得到重構後誤差分佈範圍是多少以內是合理正常的。所以AutoEncoder在這裏做異常檢測時,算是一種有監督學習的方法。

  • 圖11:自編碼器
# Ref: https://zhuanlan.zhihu.com/p/260882741
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
 
# 標準化數據
scaler = preprocessing.MinMaxScaler()
X_train = pd.DataFrame(scaler.fit_transform(dataset_train),
                              columns=dataset_train.columns,
                              index=dataset_train.index)
# Random shuffle training data
X_train.sample(frac=1)
X_test = pd.DataFrame(scaler.transform(dataset_test),
                             columns=dataset_test.columns,
                             index=dataset_test.index)
 
tf.random.set_seed(10)
act_func = 'relu'
# Input layer:
model=Sequential()
# First hidden layer, connected to input vector X.
model.add(Dense(10,activation=act_func,
                kernel_initializer='glorot_uniform',
                kernel_regularizer=regularizers.l2(0.0),
                input_shape=(X_train.shape[1],)
               )
         )
model.add(Dense(2,activation=act_func,
                kernel_initializer='glorot_uniform'))
model.add(Dense(10,activation=act_func,
                kernel_initializer='glorot_uniform'))
model.add(Dense(X_train.shape[1],
                kernel_initializer='glorot_uniform'))
model.compile(loss='mse',optimizer='adam')
print(model.summary())
 
 
# Train model for 100 epochs, batch size of 10:
NUM_EPOCHS=100
BATCH_SIZE=10
history=model.fit(np.array(X_train),np.array(X_train),
                  batch_size=BATCH_SIZE,
                  epochs=NUM_EPOCHS,
                  validation_split=0.05,
                  verbose = 1)
 
plt.plot(history.history['loss'],
         'b',
         label='Training loss')
plt.plot(history.history['val_loss'],
         'r',
         label='Validation loss')
plt.legend(loc='upper right')
plt.xlabel('Epochs')
plt.ylabel('Loss, [mse]')
plt.ylim([0,.1])
plt.show()
 
# 查看訓練集還原的誤差分佈如何,以便制定正常的誤差分佈範圍
X_pred = model.predict(np.array(X_train))
X_pred = pd.DataFrame(X_pred,
                      columns=X_train.columns)
X_pred.index = X_train.index
 
scored = pd.DataFrame(index=X_train.index)
scored['Loss_mae'] = np.mean(np.abs(X_pred-X_train), axis = 1)
plt.figure()
sns.distplot(scored['Loss_mae'],
             bins = 10,
             kde= True,
            color = 'blue')
plt.xlim([0.0,.5])
 
# 誤差閾值比對,找出異常值
X_pred = model.predict(np.array(X_test))
X_pred = pd.DataFrame(X_pred,
                      columns=X_test.columns)
X_pred.index = X_test.index
threshod = 0.3
scored = pd.DataFrame(index=X_test.index)
scored['Loss_mae'] = np.mean(np.abs(X_pred-X_test), axis = 1)
scored['Threshold'] = threshod
scored['Anomaly'] = scored['Loss_mae'] > scored['Threshold']
scored.head()

七、基於分類的方法

1. One-Class SVM

資料來源:

One-Class SVM,這個算法的思路非常簡單,就是尋找一個超平面將樣本中的正例圈出來,預測就是用這個超平面做決策,在圈內的樣本就認為是正樣本,在圈外的樣本是負樣本,用在異常檢測中,負樣本可看作異常樣本。它屬於無監督學習,所以不需要標籤。

  • 圖12:One-Class SVM

One-Class SVM又一種推導方式是SVDD(Support Vector Domain Description,支持向量域描述),對於SVDD來説,我們期望所有不是異常的樣本都是正類別,同時它採用一個超球體,而不是一個超平面來做劃分,該算法在特徵空間中獲得數據周圍的球形邊界,期望最小化這個超球體的體積,從而最小化異常點數據的影響。

假設產生的超球體參數為中心 o 和對應的超球體半徑r>0,超球體體積V(r)被最小化,中心o是支持行了的線性組合;跟傳統SVM方法相似,可以要求所有訓練數據點xi到中心的距離嚴格小於r。但是同時構造一個懲罰係數為C的鬆弛變量 ζi,優化問題入下所示:

C是調節鬆弛變量的影響大小,説的通俗一點就是,給那些需要鬆弛的數據點多少鬆弛空間,如果C比較小,會給離羣點較大的彈性,使得它們可以不被包含進超球體。詳細推導過程參考資料[15] [16]。

from sklearn import svm
# fit the model
clf = svm.OneClassSVM(nu=0.1, kernel='rbf', gamma=0.1)
clf.fit(X)
y_pred = clf.predict(X)
n_error_outlier = y_pred[y_pred == -1].size

八、基於預測的方法

資料來源:

對於單條時序數據,根據其預測出來的時序曲線和真實的數據相比,求出每個點的殘差,並對殘差序列建模,利用KSigma或者分位數等方法便可以進行異常檢測。具體的流程如下:

  • 圖13:基於預測的方法

九、總結

異常檢測方法總結如下:

參考資料

結束