基於圖演算法、圖資料庫、機器學習、GNN 的欺詐檢測方法與程式碼示例

語言: CN / TW / HK

 /fraud-detection-with-nebulagraph/featured-image.webp

本文是一個基於 NebulaGraph 圖演算法、圖資料庫、機器學習、GNN 的 Fraud Detection 方法綜述。

除了基本方法思想的介紹之外,我還給大家弄了可以跑的 Playground。值得一提的是,這是第一次給大家介紹 Nebula-DGL 這個專案。

 

基於圖資料庫的欺詐檢測方法

1.1 建立圖譜

首先,對現有的歷史資料、標註資訊面向關聯關係進行屬性圖建模。這種原始資料是多個表結構中的銀行、電子商務或者保險行業裡的交易事件記錄、使用者資料和風控標註,而建模過程就是抽象出我們關心的實體、實體間的關聯關係、和其中有意義的屬性。

一般來說,自然人、公司實體、電話號碼、地址、裝置(比如終端裝置、網路地址、終端裝置所連線的 WiFi SSID 等)、訂單都是實體本身,其他資訊比如風險標註(是否高風險、風險描述等)、自然人和公司實體的資訊(職業、收入、學歷等)都作為實體的屬性來建模。

下圖是一個可以參考的貸款反欺詐的示例建模,它來自一份作者開源的圖結構資料生成專案。

注,你可以訪問 https://github.com/wey-gu/fraud-detection-datagen 獲取這個開源的資料生成器程式碼和一份示例的資料。

 

 

1.2 圖資料庫查詢識別風險

有了一張囊括了人、公司、歷史貸款申請記錄、電話、線上申請網路裝置的圖譜,我們可以挖掘一些有意思的資訊。

事實上,很多被發現、並有效被阻止從而止損的騙保行為是具有群體聚集性的。比如欺詐團伙可能是一小批人(比如 3 到 5 人)有組織地收集更大規模的身份證資訊(比如 30 張),同時發起多個金融機構大量貸款,然後在放款後選擇丟棄這批留下了違約記錄的身份證,再進一步選擇下一批身份證資訊如法炮製。

這種團伙作案的方式因為利用了大量新的身份資訊,完全利用歷史記錄去黑名單規避風險的方式是無效的。不過,藉助於關聯關係的視角,這些模式是一定程度上可以被及時識別出來的。

這些可以被識別出的規律我把它分成兩種:

一種是風控專家可以直接用某種模式來描述的,例如:和已經被標註為高風險的實體有直接或者間接的關聯關係(新訂單申請人使用了和過往高風險記錄相同的網路裝置),這種模式對應到圖譜中,通過一個圖查詢就可以實時給出結果。

另一種是隱含在資料的關聯關係背後,需要通過圖演算法挖掘得出的一些風險提示,例如:儘管給定的實體與有限的標註高風險實體沒有匹配的關聯,但是它在圖中形成了聚集性可能提示我們這可能是一個尚未得手的進行中的團伙貸款詐騙的其中一次申請,這種情況可以通過定期在歷史資料中批量執行社群發現演算法得出,並在高聚集社群中利用中心性演算法給出核心實體,一併提示給風險專家進行後續評估和風險標註。

1.2.1 基於圖譜與專家圖模式匹配的欺詐檢測示例

在開始之前,我們利用 Nebula-UP 來一鍵部署一套 NebulaGraph 圖資料庫:

更多請參考 https://github.com/wey-gu/nebula-up/

curl -fsSL nebula-up.siwei.io/install.sh | bash

首先,我們把前邊建模的圖譜載入到 NebulaGraph 裡:

# 克隆資料集程式碼倉庫
git clone https://github.com/wey-gu/fraud-detection-datagen.git
cp -r data_sample_numerical_vertex_id data
# 去掉表頭
sed -i '1d' data/*.csv
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}:/root/ \
    -v ${PWD}/data/:/data \
    vesoft/nebula-importer:v3.1.0 \
    --config /root/nebula_graph_importer.yaml

有了這樣一個圖譜,風控專家可以在視覺化探索工具中按需探索實體之間的關係,繪製相應的風險模式:

./viz_graph_query.webp

在這個探索截圖裡,我們可以明顯看到一個群控裝置的風險模式,這個模式可以被交給圖資料庫開發者,抽象成可以被風控應用定期、實時查詢的語句:

## 針對一筆交易申請關聯查詢
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:used_device]->(d)<-[:used_device]-(:applicant)-[:with_phone_num]->(pn:phone_num)<-[e:with_phone_num]-(:applicant)
RETURN p_shared_d

我們可以很容易在此模型之上,通過修改返回的關聯裝置計數,作為意向指標查詢的判斷 API:

## 群控指標
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:used_device]->(d)<-[:used_device]-(:applicant)-[:with_phone_num]->(pn:phone_num)<-[e:with_phone_num]-(:applicant)
RETURN count(e)

如此,我們可以建立一個相對有效的風控系統,利用有限的標註資料和專家資源,去更高效控制團伙欺詐作案風險,然而,在現實情況下,我們的大多數標註資料的獲取還是過於昂貴,那麼有沒有什麼方法是更有效利用有限的風險標註和圖結構,來預測出風險呢?

 

1.3 利用圖擴充標註

答案是肯定的, Xiaojin Z. 和 Zoubin G. 在論文:Learning from Labeled and Unlabeled Data with Label Propagation http://mlg.eng.cam.ac.uk/zoubin/papers/CMU-CALD-02-107.pdf(CMU-CALD-02-107)中,利用標籤傳播(Label Propagation)演算法來把有限的標註資訊在圖上通過關聯關係傳播到更多實體中。

這樣,在我們建立的圖譜中,我們可以很容易地藉助有限的高風險標註,去“傳播”產生更多的標註資訊。這些擴展出來的標註資訊一方面可以在實時的圖查詢中給出更多的結果,另一方面,它還能作為風控專家重要的輸入資訊,幫助推進反欺詐調查行動的開展。

一般來說,我們可以通過定期離線地全圖掃描資料,通過圖演算法擴充、更新標註,再將有效的更新標註寫回到圖譜之中。

1.3.1 圖演算法擴充欺詐風險標註的示例

下面,我給出一個可以跑通的案例:

這個例子中,我用到了 Yelp 這個欺詐識別的經典資料,這份資料不只會用在這個例子中,後邊 GNN 方法中的案例我也會用到它,所以大家可以耐心把資料匯入 NebulaGraph。

生成匯入的方法在這裡,https://github.com/wey-gu/nebulagraph-yelp-frauddetection

cd ~
git clone https://github.com/wey-gu/nebulagraph-yelp-frauddetection
cd nebulagraph-yelp-frauddetection
python3 -m pip install -r requirements.txt
python3 data_download.py

# 匯入相簿
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}/yelp_nebulagraph_importer.yaml:/root/importer.yaml \
    -v ${PWD}/data:/root \
    vesoft/nebula-importer:v3.1.0 \
    --config /root/importer.yaml

結束之後,我們可以看一下圖上的統計:

~/.nebula-up/console.sh -e "USE yelp; SHOW STATS"

然後,我們可以看到:

([email protected]) [(none)]> USE yelp; SHOW STATS
+---------+---------------------------------------+---------+
| Type    | Name                                  | Count   |
+---------+---------------------------------------+---------+
| "Tag"   | "review"                              | 45954   |
| "Edge"  | "shares_restaurant_in_one_month_with" | 1147232 |
| "Edge"  | "shares_restaurant_rating_with"       | 6805486 |
| "Edge"  | "shares_user_with"                    | 98630   |
| "Space" | "vertices"                            | 45954   |
| "Space" | "edges"                               | 8051348 |
+---------+---------------------------------------+---------+
Got 6 rows (time spent 1911/4488 us)

目前,市面上的 LPA 標籤傳播演算法都是用來做社群檢測的,很少有實現是用來做標籤拓展的(只有 SK-Learn 中有這個實現),這裡,我們參考 Thibaud M https://datascience.stackexchange.com/users/77683/thibaud-m  給出來的實現。

原始的討論參考:https://datascience.stackexchange.com/a/55720/138720

為了讓這個演算法跑的快一點,會從 NebulaGraph 裡取一個點的子圖,在這個小的子圖上做標註的擴充:

首先,我們啟動一個 Jupyter 的 Playground,

參考 https://github.com/wey-gu/nebula-dgl 中的 Playground 過程:

git clone https://github.com/wey-gu/nebula-dgl.git
cd nebula-dgl
# 執行 Jupyter Notebook
docker run -it --name dgl -p 8888:8888 --network nebula-net \
    -v "$PWD":/home/jovyan/work jupyter/datascience-notebook \
    start-notebook.sh --NotebookApp.token='nebulagraph'

訪問:http://localhost:8888/lab/tree/work?token=nebulagraph

安裝依賴(這些依賴在後邊的 GNN 例子中也會被用到)

!python3 -m pip install git+https://github.com/vesoft-inc/[email protected]
!python3 -m pip install .

然後,我們從圖中讀取一個子圖,從 2048 這個點開始探索兩步內的所有邊。

import torch
import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root''nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')
columns = r.get('results', [{}])[0].get('columns')

# create node and nodedata
node_id_map = {} # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])

rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

data_dict = {}
if rur_start:
    data_dict[('review''shares_user_with''review')] = tensor(rur_start), tensor(rur_end)
if rsr_start:
    data_dict[('review''shares_restaurant_rating_with''review')] = tensor(rsr_start), tensor(rsr_end)
if rtr_start:
    data_dict[('review''shares_restaurant_in_one_month_with''review')] = tensor(rtr_start), tensor(rtr_end)
# construct a dgl_graph, ref: https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html

dgl_graph: DGLHeteroGraph = heterograph(data_dict)

# load node features to dgl_graph
dgl_graph.ndata['label'] = tensor(features[32])

# heterogeneous graph to heterogeneous graph, keep ndata and edata
import dgl
hg = dgl.to_homogeneous(dgl_graph, ndata=['label'])

然後,我們用上邊提到的 Torch Label Spreading 實現,應用到我們的圖上。

from abc import abstractmethod
import torch

class BaseLabelPropagation:
    """Base class for label propagation models.
    
    Parameters
    ----------
    adj_matrix: torch.FloatTensor
        Adjacency matrix of the graph.
    """
    def __init__(self, adj_matrix):
        self.norm_adj_matrix = self._normalize(adj_matrix)
        self.n_nodes = adj_matrix.size(0)
        self.one_hot_labels = None 
        self.n_classes = None
        self.labeled_mask = None
        self.predictions = None

    @staticmethod
    @abstractmethod
    def _normalize(adj_matrix):
        raise NotImplementedError("_normalize must be implemented")

    @abstractmethod
    def _propagate(self):
        raise NotImplementedError("_propagate must be implemented")

    def _one_hot_encode(self, labels):
        # Get the number of classes
        classes = torch.unique(labels)
        classes = classes[classes != -1]
        self.n_classes = classes.size(0)

        # One-hot encode labeled data instances and zero rows corresponding to unlabeled instances
        unlabeled_mask = (labels == -1)
        labels = labels.clone()  # defensive copying
        labels[unlabeled_mask] = 0
        self.one_hot_labels = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)
        self.one_hot_labels = self.one_hot_labels.scatter(1, labels.unsqueeze(1), 1)
        self.one_hot_labels[unlabeled_mask, 0] = 0

        self.labeled_mask = ~unlabeled_mask

    def fit(self, labels, max_iter, tol):
        """Fits a semi-supervised learning label propagation model.
        
        labels: torch.LongTensor
            Tensor of size n_nodes indicating the class number of each node.
            Unlabeled nodes are denoted with -1.
        max_iter: int
            Maximum number of iterations allowed.
        tol: float
            Convergence tolerance: threshold to consider the system at steady state.
        """
        self._one_hot_encode(labels)

        self.predictions = self.one_hot_labels.clone()
        prev_predictions = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)

        for i in range(max_iter):
            # Stop iterations if the system is considered at a steady state
            variation = torch.abs(self.predictions - prev_predictions).sum().item()
            
            if variation < tol:
                print(f"The method stopped after {i} iterations, variation={variation:.4f}.")
                break

            prev_predictions = self.predictions
            self._propagate()

    def predict(self):
        return self.predictions

    def predict_classes(self):
        return self.predictions.max(dim=1).indices

class LabelPropagation(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1 * W"""
        degs = adj_matrix.sum(dim=1)
        degs[degs == 0] = 1  # avoid division by 0 error
        return adj_matrix / degs[:, None]

    def _propagate(self):
        self.predictions = torch.matmul(self.norm_adj_matrix, self.predictions)

        # Put back already known labels
        self.predictions[self.labeled_mask] = self.one_hot_labels[self.labeled_mask]

    def fit(self, labels, max_iter=1000, tol=1e-3):
        super().fit(labels, max_iter, tol)

class LabelSpreading(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)
        self.alpha = None

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1/2 * W * D^-1/2"""
        degs = adj_matrix.sum(dim=1)
        norm = torch.pow(degs, -0.5)
        norm[torch.isinf(norm)] = 1
        return adj_matrix * norm[:, None] * norm[None, :]

    def _propagate(self):
        self.predictions = (
            self.alpha * torch.matmul(self.norm_adj_matrix, self.predictions)
            + (1 - self.alpha) * self.one_hot_labels
        )
    
    def fit(self, labels, max_iter=1000, tol=1e-3, alpha=0.5):
        """
        Parameters
        ----------
        alpha: float
            Clamping factor.
        """
        self.alpha = alpha
        super().fit(labels, max_iter, tol)
        
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

nx_hg = hg.to_networkx()
adj_matrix = nx.adjacency_matrix(nx_hg).toarray()
labels = hg.ndata['label']
# Create input tensors
adj_matrix_t = torch.FloatTensor(adj_matrix)
labels_t = torch.LongTensor(labels)

# Learn with Label Propagation
label_propagation = LabelPropagation(adj_matrix_t)
print("Label Propagation: ", end="")
label_propagation.fit(labels_t)
label_propagation_output_labels = label_propagation.predict_classes()

# Learn with Label Spreading
label_spreading = LabelSpreading(adj_matrix_t)
print("Label Spreading: ", end="")
label_spreading.fit(labels_t, alpha=0.8)
label_spreading_output_labels = label_spreading.predict_classes()

現在咱們看看染色的傳播效果:

color_map = {0: "blue", 1: "green"}
input_labels_colors = [color_map[int(l)] for l in labels]
lprop_labels_colors = [color_map[int(l)] for l in label_propagation_output_labels.numpy()]
lspread_labels_colors = [color_map[int(l)] for l in label_spreading_output_labels.numpy()]

plt.figure(figsize=(14, 6))
ax1 = plt.subplot(1, 4, 1)
ax2 = plt.subplot(1, 4, 2)
ax3 = plt.subplot(1, 4, 3)

ax1.title.set_text("Raw data (2 classes)")
ax2.title.set_text("Label Propagation")
ax3.title.set_text("Label Spreading")

pos = nx.spring_layout(nx_hg)
nx.draw(nx_hg, ax=ax1, pos=pos, node_color=input_labels_colors, node_size=50)
nx.draw(nx_hg, ax=ax2, pos=pos, node_color=lprop_labels_colors, node_size=50)
nx.draw(nx_hg, ax=ax3, pos=pos, node_color=lspread_labels_colors, node_size=50)

# Legend
ax4 = plt.subplot(1, 4, 4)
ax4.axis("off")
legend_colors = ["orange""blue""green""red""cyan"]
legend_labels = ["unlabeled""class 0""class 1""class 2""class 3"]
dummy_legend = [ax4.plot([], [], ls='-', c=c)[0] for c in legend_colors]
plt.legend(dummy_legend, legend_labels)

plt.show()

可以看到最後畫出來的結果:

./lpa_spread_notation_matplot.webp

可以看到有一些藍色標籤被 Spread 開了,實際上我的這個例子的效果並不理想(因為這個例子裡,綠色的才是重要的標籤),不過我給的子圖實在是太小了,也本不應該奢求有好的結果,只是為了個大家演示一下這個方法。

 

1.4 帶有圖特徵的機器學習

在風控領域開始利用圖的思想和能力之前,已經有很多利用機器學習的分類演算法基於歷史資料預測高風險行為的方法了,這些方法把記錄中領域專家認為有關的資訊(例如:年齡、學歷、收入)作為特徵,歷史標註資訊作為標籤去訓練風險預測模型。

那麼讀到的這裡,我們是否會想到在這些方法的基礎之上,如果把基於圖結構的屬性也考慮進來,作為特徵去訓練的模型可能更有效呢?答案也是肯定的,已經有很多論文和工程實踐揭示這樣的模型比未考慮圖特徵的演算法更加有效。這些被嘗試有效的圖結構特徵可能是實體的 PageRank 值、Degree 值或者是某一個社群發現演算法得出的社群 id。

在生產上,我們可以定期從圖譜中獲得實時的全圖資訊,在圖計算平臺中分析運算獲得所需特徵,經過預定的資料管道,匯入機器學習模型中週期獲得新的風險提示,並將部分結果寫回圖譜方便其他系統和專家抽取、參考。

1.4.1 帶有圖特徵的機器學習欺詐檢測示例

這裡,機器學習的方法我就不演示了,就是常見的分類方法,在此之上,我們可以在資料中通過圖演算法獲得一些新的屬性,這些屬性再處理一下作為新的特徵。我只演示一個社群發現的方法,我們可以對全圖跑一個 Louvain,得出不同節點的社群歸屬,然後把社群的值當做一個分類處理成為數值的特徵。

這個例子裡我們還用 https://github.com/wey-gu/fraud-detection-datagen 這個資料,在此基礎上,這個例子我用到了 Nebula-Algorithm 這個專案,它是一個 Spark 應用,可以在 NebulaGraph 相簿上執行很多常用的圖演算法。

首先,我們部署 Spark 和 Nebula Algorithm,還是利用 Nebula-UP,一鍵部署:

curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

叢集起來之後,因為需要的配置檔案我已經放在了 Nebula-UP 內部,我們只需要一行就可以執行演算法啦!

cd ~/.nebula-up/nebula-up/spark && ls -l

docker exec -it sparkmaster /spark/bin/spark-submit \
    --master "local" --conf spark.rpc.askTimeout=6000s \
    --class com.vesoft.nebula.algorithm.Main \
    --driver-memory 4g /root/download/nebula-algo.jar \
    -p /root/louvain.conf

而最終的結果就在 sparkmaster 容器內的 /output 裡:

# docker exec -it sparkmaster bash
ls -l /output

之後,我們可以對這個 Louvain 的圖特徵做一些處理,並開始傳統的模型訓練了。

 

1.5 圖神經網路的方法

然而,這些圖特徵的方法的問題在於沒能充分考慮關聯關係,特徵工程昂貴繁瑣

注,這裡,我們使用的的工具為 Deep Graph library(DGL),NebulaGraph 圖資料庫和他們之間的橋樑,Nebula-DGL。

  • DGL: https://www.dgl.ai/

  • Nebula-DGL: https://github.com/wey-gu/nebula-dgl 我也是這個庫的作者 😁

在最近幾年的成果中,基於 GNN 的方法通過將圖結構與屬性資訊進行嵌入表示,使得我們能在不進行圖特徵抽取、特徵工程、專家與工程方法的資料標註的情況下,得到相比於基於傳統圖特徵的機器學習更好的效果。有意思的是,現在正是這些方法快速被發現、演進的時期,基於圖的深度學習是之前幾年最熱門的機器學習研究方向之一。

同時,圖深度學習的一些方法可以做到 Inductive Learning——模型可以在新的點、邊上進行推理,這樣,配合圖資料庫上線上的子圖查詢能力,線上實時的風險預測也變得很簡單可行了。

1.5.1 基於圖表示的圖神經網路欺詐檢測系統示例

利用 GNN 的方法中,圖資料庫並不是必須的,資料的儲存可以在其他幾種常見的介質之中,但是相簿能夠最大化助益模型訓練、模型更新、線上結果的更新。當我們把圖資料庫作為資料的單一資料來源(single source of truth)的時候,所有的基於線上、離線、圖譜的方法可以很容易被整合起來,從而組合所有方法的優勢與結果,做出更有效的欺詐檢測複合系統。

在這個示例中我們一樣分為:資料處理模型訓練構建檢測系統這幾部分。

注,這裡,我們使用的的工具為 Deep Graph library(DGL),NebulaGraph 圖資料庫和他們之間的橋樑,Nebula-DGL。

  • DGL: https://www.dgl.ai/

  • Nebula-DGL: https://github.com/wey-gu/nebula-dgl 我也是這個庫的作者 😁

1.5.1.1 資料集

本例中,我們使用的資料集是 Yelp-Fraud,他直接來自於論文 Enhancing Graph Neural Network-based Fraud Detectors against Camouflaged Fraudsters

這個圖中有一種點,三種關係:

  • 每一個評價中有被標註了的是否是虛假、欺詐評價的標籤

  • 32 個已經被處理過的數字型屬性

  • R-U-R:兩個評價由同一個使用者發出 shares_user_with

  • R-S-R:兩個評價是同餐廳同評分(評分可以是 1 到 5) shares_restaurant_rating_with

  • R-T-R:兩個評價是同餐廳同提交月份 shares_restaurant_in_one_month_with

  • 頂點:來自 Yelp 中的餐廳、酒店的評價,有兩類屬性:

  • 邊:三類評價之間的關聯關係

在開始之前,我們假設這個圖已經在我們的 NebulaGraph 裡邊了。

注,我已經幫大家提前做好了將這張圖匯入 NebulaGraph 的工作,長話短說就是:

# 部署 NebulaGraph
curl -fsSL nebula-up.siwei.io/install.sh | bash

# 拉取這個資料的 Repo
git clone https://github.com/wey-gu/nebulagraph-yelp-frauddetection && cd nebulagraph-yelp-frauddetection

# 安裝依賴,執行資料下載生成
python3 -m pip install -r requirements.txt
python3 data_download.py

# 匯入到 NebulaGraph
docker run --rm -ti \
 --network=nebula-net \
 -v ${PWD}/yelp_nebulagraph_importer.yaml:/root/importer.yaml \
 -v ${PWD}/data:/root \
 vesoft/nebula-importer:v3.1.0 \
 --config /root/importer.yaml

詳情參考:https://github.com/wey-gu/nebulagraph-yelp-frauddetection

1.5.1.2 資料處理

這部分的任務是將圖譜中和風險相關子圖的拓撲結構表示和其中有關的特徵(屬性)進行工程處理,序列化成為 DGL 的圖物件。

DGL 本身支援從點、邊列表(edgelist)形式 CSV 檔案,或者從 NetworkX 和 SciPy 的序列化稀疏的鄰接矩陣(adjacency matrix)的資料來構造它的圖物件,我們可以把原始的圖資料或者相簿中的資料全量匯出為這些形式,不過在真實的例子中相簿中的資料是實時變化的,能夠直接在 NebulaGraph 中的子圖上做 GNN 訓練一般來說是更理想。得益於 Nebula-DGL 這個庫,做這件事兒是很自然的。

注: DGL 外部資料員匯入文件:https://docs.dgl.ai/guide/graph-external.html

現在我們開始這個資料的匯入,在這之前,我先介紹一下 Nebula-DGL。

Nebula-DGL 可以根據給定的對映和轉換規則(YAML 格式),將 NebulaGraph 中的頂點、邊,和它們的屬性按照規則處理成為點、邊、和其中的標註(Label)與特徵(Feature),從而構造為 DGL 的圖物件。這其中,值得一提的是屬性到特徵的轉換。我們知道,特徵可能是某一個屬性的值、一個或者多個屬性的值做一定的數學變換、亦或是字元型的屬性按照列舉規則輸出為數字。相應的,Nebula-DGL 在規則中,我們都可以針對這幾種情況利用 filter 進行表達:

  • 特徵直接選取屬性的值:

這個例子裡,NebulaGraph 圖中 follow 這個邊將被抽取,邊上的屬性 degree 的值將直接被作為名為 degree 的特徵。

edge_types:
  - name: follow
    start_vertex_tag: player
    end_vertex_tag: player
    features:
      - name: degree
        properties:
          - name: degree
            type: int
            nullable: False
        filter:
          type: value
  • 特徵從屬性中經過數學變換

這個例子中,我們把 serve 邊之中的兩個屬性進行 (end_year - start_year) / 30 的處理,變為 service_time 這樣的一個特徵。

edge_types:
  - name: serve
    start_vertex_tag: player
    end_vertex_tag: team
    features:
      - name: service_time
        properties:
          - name: start_year
            type: int
            nullable: False
          - name: end_year
            type: int
            nullable: False
        # The variable was mapped by order of properties
        filter:
          typefunction
          function"lambda start_year, end_year: (end_year - start_year) / 30"
  • 列舉屬性值為數字特徵

這個例子中,我們把 team 頂點中的 name 屬性進行列舉,根據

vertex_tags:
  - name: team
    features:
      - name: coast
        properties:
          - name: name
            type: str
            nullable: False
        filter:
          # 0 stand for the east coast, 1 stand for the west coast
          type: enumeration
          enumeration:
            Celtics: 0
            Nets: 0
            Nuggets: 1
            Timberwolves: 1
            Thunder: 1
# ... not showing all teams here

可以看到這個轉換規則非常簡單直接,大家也可以參考 Nebula-DGL 的完整例子瞭解全部細節 https://github.com/wey-gu/nebula-dgl/tree/main/example。而有上邊資料處理規則的瞭解之後,我們可以開始處理這個 Yelp 圖資料了。

首先,定義如下規則,這裡,我們把頂點 review 和三種邊都對應過來了,同時,review 上的屬性也按照原本的值對應了過來:

nebulagraph_yelp_dgl_mapper.yaml

---
# If vertex id is string-typed, remap_vertex_id must be true.
remap_vertex_id: True
space: yelp
# str or int
vertex_id_type: int
vertex_tags:
  - name: review
    label:
      name: is_fraud
      properties:
        - name: is_fraud
          type: int
          nullable: False
      filter:
        type: value
    features:
      - name: f0
        properties:
          - name: f0
            typefloat
            nullable: False
        filter:
          type: value
      - name: f1
        properties:
          - name: f1
            typefloat
            nullable: False
        filter:
          type: value
# ...
      - name: f31
        properties:
          - name: f31
            typefloat
            nullable: False
        filter:
          type: value
edge_types:
  - name: shares_user_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_rating_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_in_one_month_with
    start_vertex_tag: review
    end_vertex_tag: review

然後,我們在安裝好 Nebula-DGL 之後只需要這幾行程式碼就可以將 NebulaGraph 中的這張圖構造為 DGL 的 DGLHeteroGraph 圖物件:

from nebula_dgl import NebulaLoader


nebula_config = {
    "graph_hosts": [
                ('graphd', 9669),
                ('graphd1', 9669),
                ('graphd2', 9669)
            ],
    "nebula_user""root",
    "nebula_password""nebula",
}

# load feature_mapper from yaml file
with open('nebulagraph_yelp_dgl_mapper.yaml''r') as f:
    feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)
g = nebula_loader.load()

g = g.to('cpu')
device = torch.device('cpu')

1.5.1.3 模型訓練

這裡,我用 GraphSAGE 演算法的點分類(Node Classification)方法來舉例,GraphSAGE 的原始版本是一個歸納學習(Inductive Learning)的演算法,這裡,歸納學習區別於它的反面:Transductive Learning ,可以把新的資料用在完全舊的圖之上習得的模型,這樣訓練出來的模型可以進行線上增量資料的欺詐檢測(而不是需要重新載入為全圖訓練才可以)。

https://user-images.githubusercontent.com/1651790/182301784-21850dac-0d47-4dd5-b66f-a28b87fe9d4d.svg

模型訓練系統(左邊):

  • 輸入:帶有欺詐標註的歷史交易圖譜

  • 輸出:一個 GraphSAGE 的 DGL 模型

線上推理系統(右邊):

  • 模型:基於帶有欺詐標註的歷史交易圖譜基於 GraphSAGE 訓練

  • 輸入:一筆新的交易

  • 輸出:這筆交易是否涉嫌欺詐

分割資料集

機器學習訓練的過程需要在已經有的資料、資訊中分割出用來訓練、驗證和測試的子集。他們可以是不相交的整體資料的真子集也可以彼此有重疊,在實際的情況中,有時候我們對資料的標註常常是不充分的,所以按照標註的比例去分割資料可能更有意義一些。下邊的例子是我按照點上是否標註欺詐為標準去分割資料集。

這裡邊有兩個地方值得注意:

train_test_split 中的 stratify=g.ndata['is_fraud']代表保持 is_fraud 的值的分佈去分割,符合我們前邊提到的思想。

我們分割的是 idx 索引,這樣,可以最終獲得三個集合的索引,供訓練、驗證和測試時候使用。同時我們還把對應集合 mask 放到圖物件 g 裡邊去了。

# Split the graph into train, validation, and test sets

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f' + str(i)])

g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']
# numpy array as an index of range n

idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(
    idx, g.ndata['is_fraud'], test_size=0.2, random_state=42, stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(
    X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42, stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True

val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True

test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask

異構圖轉換為同構圖

GraphSAGE 是針對同構圖,且邊無 feature 的演算法,而我們當下的 Yelp 圖譜是異構的:一類點、三類邊。那麼,如何才能用 GraphSAGE 去建模 Yelp 圖譜呢?

我們除了選擇用針對異構圖的 Inductive Learning 方法之外,還可想辦法把同構圖轉換成異構圖。為了在轉換中不丟失重要的邊型別資訊,我們可以把邊型別變成數值。

這裡我給了一維的 edge feature,當然(3-1)二維也是可以的。

# shares_restaurant_in_one_month_with: 1, b"001"
# shares_restaurant_rating_with: 2, b"010"
# shares_user_with: 4, b"100"

注:其實如果想用 0, 1, 2 這樣的分佈,轉換到同構圖之後的 hg.edata['_TYPE'] 也是可以直接拿來用的,詳見 https://docs.dgl.ai/en/0.9.x/generated/dgl.to_homogeneous.html 中的例子。

程式碼如下:

# three types of edges
In [1]: g.etypes
Out[1]:
['shares_restaurant_in_one_month_with',
 'shares_restaurant_rating_with',
 'shares_user_with']

In [2]:
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.int64)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.int64)
g.edges['shares_user_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_user_with'),), 4, dtype=torch.int64)

In [3]: g.edata['he']
Out[3]:
{('review',
  'shares_restaurant_in_one_month_with',
  'review'): tensor([1, 1, 1,  ..., 1, 1, 1]),
 ('review',
  'shares_restaurant_rating_with',
  'review'): tensor([2, 2, 2,  ..., 2, 2, 2]),
 ('review''shares_user_with''review'): tensor([4, 4, 4,  ..., 4, 4, 4])}

參考:https://discuss.dgl.ai/t/how-to-convert-from-a-heterogeneous-graph-to-a-homogeneous-graph-with-data/2764

然後將它轉換為同構圖,把 he 作為要保留的 edata:

hg = dgl.to_homogeneous(g, edata=['he'], ndata=['feat''label''train_mask''val_mask''test_mask'])

參考:https://docs.dgl.ai/en/latest/guide/graph-heterogeneous.html?highlight=to_homogeneous#converting-heterogeneous-graphs-to-homogeneous-graphs

預設的 GraphSAGE 實現是沒考慮 edge feature 的,我們要修改訊息傳遞的步驟,在後邊會涉及到這部分的實操。

參考:

  • https://discuss.dgl.ai/t/frequently-asked-questions-faq/1681 (問題 13)

  • https://discuss.dgl.ai/t/using-node-and-edge-features-in-message-passing/762

模型訓練程式碼

DGL 官方給出了例子在:https://github.com/dmlc/dgl/tree/master/examples/pytorch/graphsage,我在測試的時候還修復了一個小 bug。

因為我們處理過的同構圖裡是帶有 edge feature 的,不能照搬官方的 GraphSAGE 例子程式碼,我們有兩種方法來處理它:

a. 可以稍微改動一下 SAGEConv 訊息傳遞的部分,以 mean 聚合的方式為例:

  graph.update_all(msg_fn, fn.mean('m''neigh'))
+ graph.update_all(fn.copy_e('he''m'), fn.mean('m''neigh'))
- h_neigh = graph.dstdata['neigh']
+ h_neigh = torch.cat((graph.dstdata['neigh'], graph.dstdata['neigh_e'].reshape(-1, 1)), 1)

這個處理中,除了上邊訊息傳遞部分增加 edge feature 之外,還需要注意 feature 維度的處理。

b. 把邊引數作為邊權重,以 mean 聚合為例:

- graph.update_all(msg_fn, fn.mean('m''neigh'))
+ # consdier datatype with different weight, g.edata['he'] as weight here
+ g.update_all(fn.u_mul_e('h''he''m'), fn.mean('m''h'))

下邊,我們以把邊的型別作為權重的方式,mean 作為聚合的情況為例來實操:

我們來繼承,並覆蓋 SAGEConv

我們其實只是修改了 Message Passing 的部分。

from dgl import function as fn
from dgl.utils import check_eq_shape, expand_as_pair

class SAGEConv(dglnn.SAGEConv):
    def forward(self, graph, feat, edge_weight=None):
        r"""

        Description
        -----------
        Compute GraphSAGE layer.

        Parameters
        ----------
        graph : DGLGraph
            The graph.
        feat : torch.Tensor or pair of torch.Tensor
            If a torch.Tensor is given, it represents the input feature of shape
            :math:`(N, D_{in})`
            where :math:`D_{in}` is size of input feature, :math:`N` is the number of nodes.
            If a pair of torch.Tensor is given, the pair must contain two tensors of shape
            :math:`(N_{in}, D_{in_{src}})` and :math:`(N_{out}, D_{in_{dst}})`.
        edge_weight : torch.Tensor, optional
            Optional tensor on the edge. If given, the convolution will weight
            with regard to the message.

        Returns
        -------
        torch.Tensor
            The output feature of shape :math:`(N_{dst}, D_{out})`
            where :math:`N_{dst}` is the number of destination nodes in the input graph,
            :math:`D_{out}` is the size of the output feature.
        """
        self._compatibility_check()
        with graph.local_scope():
            if isinstance(feat, tuple):
                feat_src = self.feat_drop(feat[0])
                feat_dst = self.feat_drop(feat[1])
            else:
                feat_src = feat_dst = self.feat_drop(feat)
                if graph.is_block:
                    feat_dst = feat_src[:graph.number_of_dst_nodes()]
            msg_fn = fn.copy_src('h''m')
            if edge_weight is not None:
                assert edge_weight.shape[0] == graph.number_of_edges()
                graph.edata['_edge_weight'] = edge_weight
                msg_fn = fn.u_mul_e('h''_edge_weight''m')

            h_self = feat_dst

            # Handle the case of graphs without edges
            if graph.number_of_edges() == 0:
                graph.dstdata['neigh'] = torch.zeros(
                    feat_dst.shape[0], self._in_src_feats).to(feat_dst)

            # Determine whether to apply linear transformation before message passing A(XW)
            lin_before_mp = self._in_src_feats > self._out_feats

            # Message Passing
            if self._aggre_type == 'mean':
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                # graph.update_all(msg_fn, fn.mean('m', 'neigh'))
                #########################################################################
                # consdier datatype with different weight, g.edata['he'] as weight here
                g.update_all(fn.u_mul_e('h''he''m'), fn.mean('m''h'))
                #########################################################################
                h_neigh = graph.dstdata['neigh']
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'gcn':
                check_eq_shape(feat)
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                if isinstance(feat, tuple):  # heterogeneous
                    graph.dstdata['h'] = self.fc_neigh(feat_dst) if lin_before_mp else feat_dst
                else:
                    if graph.is_block:
                        graph.dstdata['h'] = graph.srcdata['h'][:graph.num_dst_nodes()]
                    else:
                        graph.dstdata['h'] = graph.srcdata['h']
                graph.update_all(msg_fn, fn.sum('m''neigh'))
                graph.update_all(fn.copy_e('he''m'), fn.sum('m''neigh'))
                # divide in_degrees
                degs = graph.in_degrees().to(feat_dst)
                h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1)
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'pool':
                graph.srcdata['h'] = F.relu(self.fc_pool(feat_src))
                graph.update_all(msg_fn, fn.max('m''neigh'))
                graph.update_all(fn.copy_e('he''m'), fn.max('m''neigh'))
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            elif self._aggre_type == 'lstm':
                graph.srcdata['h'] = feat_src
                graph.update_all(msg_fn, self._lstm_reducer)
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            else:
                raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type))

            # GraphSAGE GCN does not require fc_self.
            if self._aggre_type == 'gcn':
                rst = h_neigh
            else:
                rst = self.fc_self(h_self) + h_neigh

            # bias term
            if self.bias is not None:
                rst = rst + self.bias

            # activation
            if self.activation is not None:
                rst = self.activation(rst)
            # normalization
            if self.norm is not None:
                rst = self.norm(rst)
            return rst

定義模型

class SAGE(nn.Module):
    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        self.layers = nn.ModuleList()
        # three-layer GraphSAGE-mean
        self.layers.append(dglnn.SAGEConv(in_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, out_size, 'mean'))
        self.dropout = nn.Dropout(0.5)
        self.hid_size = hid_size
        self.out_size = out_size

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = F.relu(h)
                h = self.dropout(h)
        return h

    def inference(self, g, device, batch_size):
        """Conduct layer-wise inference to get all the node embeddings."""
        feat = g.ndata['feat']
        sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=['feat'])
        dataloader = DataLoader(
                g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
                batch_size=batch_size, shuffle=False, drop_last=False,
                num_workers=0)
        buffer_device = torch.device('cpu')
        pin_memory = (buffer_device != device)

        for l, layer in enumerate(self.layers):
            y = torch.empty(
                g.num_nodes(), self.hid_size if l != len(self.layers) - 1 else self.out_size,
                device=buffer_device, pin_memory=pin_memory)
            feat = feat.to(device)
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                x = feat[input_nodes]
                h = layer(blocks[0], x) # len(blocks) = 1
                if l != len(self.layers) - 1:
                    h = F.relu(h)
                    h = self.dropout(h)
                # by design, our output nodes are contiguous
                y[output_nodes[0]:output_nodes[-1]+1] = h.to(buffer_device)
            feat = y
        return y

定義訓練、推理的函式

def evaluate(model, graph, dataloader):
    model.eval()
    ys = []
    y_hats = []
    for it, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
        with torch.no_grad():
            x = blocks[0].srcdata['feat']
            ys.append(blocks[-1].dstdata['label'])
            y_hats.append(model(blocks, x))
    return MF.accuracy(torch.cat(y_hats), torch.cat(ys))

def layerwise_infer(device, graph, nid, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

def train(device, g, model, train_idx, val_idx):
    # create sampler & dataloader
    sampler = NeighborSampler([10, 10, 10],  # fanout for [layer-0, layer-1, layer-2]
                              prefetch_node_feats=['feat'],
                              prefetch_labels=['label'])
    use_uva = False
    train_dataloader = DataLoader(g, train_idx, sampler, device=device,
                                  batch_size=1024, shuffle=True,
                                  drop_last=False, num_workers=0,
                                  use_uva=use_uva)

    val_dataloader = DataLoader(g, val_idx, sampler, device=device,
                                batch_size=1024, shuffle=True,
                                drop_last=False, num_workers=0,
                                use_uva=use_uva)

    opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)
    
    for epoch in range(10):
        model.train()
        total_loss = 0
        for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
            x = blocks[0].srcdata['feat']
            y = blocks[-1].dstdata['label']
            y_hat = model(blocks, x)
            loss = F.cross_entropy(y_hat, y)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()
        acc = evaluate(model, g, val_dataloader)
        print("Epoch {:05d} | Loss {:.4f} | Accuracy {:.4f} "
              .format(epoch, total_loss / (it+1), acc.item()))

從 NebulaGraph 中載入圖到 DGL,得到的是一個異構圖(一類點、三類邊)

from nebula_dgl import NebulaLoader

nebula_config = {
    "graph_hosts": [
                ('graphd', 9669),
                ('graphd1', 9669),
                ('graphd2', 9669)
            ],
    "nebula_user""root",
    "nebula_password""nebula",
}

with open('nebulagraph_yelp_dgl_mapper.yaml''r') as f:
     feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)

g = nebula_loader.load() # This will take you some time

# 作為窮人,我們用 CPU
g = g.to('cpu')
device = torch.device('cpu')

分出訓練、驗證、測試集,然後轉換成同構圖。

# Split the graph into train, validation and test sets

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f'+str(i)])

g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']
# numpy array as index of range n

idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)
# features.append(idx)
# concatenate one dim with index of node
# feature_and_idx = torch.stack(features, dim=1)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(
    idx, g.ndata['is_fraud'], test_size=0.2, random_state=42, stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(
    X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42, stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True

val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True

test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask

# shares_restaurant_in_one_month_with: 1, b"001"
# shares_restaurant_rating_with: 2, b"010"
# shares_user_with: 4, b"100"
# set edata of shares_restaurant_in_one_month_with to n of 1 tensor array
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
g.edges['shares_user_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)

# heterogeneous graph to heterogeneous graph, keep ndata and edata
hg = dgl.to_homogeneous(g, edata=['he'], ndata=['feat''label''train_mask''val_mask''test_mask'])

訓練、測試模型

# create GraphSAGE model
in_size = hg.ndata['feat'].shape[1]
out_size = 2
model = SAGE(in_size, 256, out_size).to(device)

# model training
print('Training...')
train(device, hg, model, X_train_idx, X_val_idx)

# test the model
print('Testing...')

acc = layerwise_infer(device, hg, X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# 執行結果
# Test Accuracy 0.9996

有了模型之後,我們可以把它序列化成檔案,在需要的時候,只需要把模型的形式和這個序列化檔案再載入成一個 pyTorch 就可以用它進行推理了。

# save model
torch.save(model.state_dict(), "fraud_d.model")

# load model
device = torch.device('cpu')
model = SAGE(32, 256, 2).to(device)
model.load_state_dict(torch.load("fraud_d.model"))

1.5.1.4 推理介面

前邊提到過,GraphSAGE 是最簡單的支援 Inductive Learning 的模型,而上邊我們的訓練推理過程實際上還不是這樣的我們的測試和訓練的圖是同一張,雖然標註了訓練的點的索引,但實際上是整張圖作為輸入的。為了做到 Inductive Learning 我們只需要把訓練和測試分成兩個無交集的子圖來做訓練和最終測試:

# Inductive Learning, our test dataset are new nodes and new edges
hg_train = hg.subgraph(torch.cat([X_train_idx, X_val_idx]))

# model training
print('Training...')
train(device, hg_train, model, torch.arange(X_train_idx.shape[0]), torch.arange(X_train_idx.shape[0], hg_train.num_nodes()))

# test the model
print('Testing...')

hg_test = hg.subgraph(torch.cat([X_test_idx]))

sg_X_test_idx = torch.arange(hg_test.num_nodes())

acc = layerwise_infer(device, hg_test, sg_X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# 執行結果
# Test Accuracy 0.9990

可以看到,我們上邊的程式碼裡,測試所用到的圖和訓練的圖是完全不同的兩組資料,這使得我們的線上系統可以是之前完全沒有遇到的資料,我們只要把對新來的一個交易請求資料寫進 NebulaGraph,然後再從這個點獲取一個線上系統可以返回的小子圖,就可以把它作為模型推理的輸入,獲得子圖的標籤了!

新的交易請求

還記得我們前邊畫的線上推理系統的流程圖麼?

      ┌─────────────────────┐                          ┌─────────────────┐      
      │                     │                          │                 │
─────▶│ Transaction Record  ├──────2. Fraud Risk ─────▶│  Inference API  │◀────┐
      │                     │◀────Prediction with ─────┤                 │     │
      │                     │        Sub Graph         │                 │     │
      └─────────────────────┘                          └─────────────────┘     │
           │           ▲                                        │              │
           │           │                                        │              │
       0. Insert   1. Get New                              3.req: Node         │
         Record    Record Sub                            Classification        │
           │         Graph                                      │              │
           ▼           │                                        │              │
┌──────────────────────┴─────────────────┐ ┌────────────────────┘      3.resp: │
│┌──────────────────────────────────────┐│ │                          Predicted│
││   Graph of Historical Transactions   ││ │                             Risk  │
│└──────────────────────────────────────┘│ │                                   │
│                   .─.              .   │ │                                   │
│                  (   )◀───────────( )  │ │                                   │
│                   `─'              '   │ │      ┌──────────────────────┐     │
│  .       .─.       ╲             ◁     │ │      │ GNN Model Λ          │     │
│ ( )◀────(   )       ╲           ╱      │ │  ┌───┴─┐        ╱ ╲      ┌──┴──┐  │
│  '       `─'         ╲       . ╱       │ │  ├─────┤       ╱   ╲     ├─────┤  │
│  ╲       ◀            ╲     ( )        │ └─▶├─────┼─────▶▕     ─────├─────┤──┘
│   ╲  .  ╱              ◁     '         │    ├─────┤       ╲   ╱     ├─────┤   
│    ◀( )╱               .─.         .─. │    └───┬─┘        ╲ ╱      └──┬──┘   
│      '                (   )◀──────(   )│        │           V          │      
│                        `─'         `─' │        └──────────────────────┘      
└────────────────────────────────────────┘        

現在,我們假設這個新的交易請求已經發起了,這條交易記錄已經被更新在圖譜裡了,咱們隨便取一個點作為這樣的請求吧

MATCH (n:review) RETURN n LIMIT 1
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| n                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| (2048 :review{f0: 0.0, f1: 0.08034700155258179, f10: 0.3952670097351074, f11: 0.18671999871730804, f12: 0.2836120128631592, f13: 0.2843089997768402, f14: 0.38148200511932373, f15: 0.3816460072994232, f16: 0.9999740123748779, f17: 0.6430919766426086, f18: 0.9999740123748779, f19: 0.5051100254058838, f2: 0.12382200360298157, f20: 0.4940490126609802, f21: 0.7766339778900146, f22: 0.7705119848251343, f23: 0.9480599761009216, f24: 0.4032529890537262, f25: 0.12437800318002701, f26: 0.3184080123901367, f27: 0.5223879814147949, f28: 0.4278610050678253, f29: 0.343284010887146, f3: 0.42868199944496155, f30: 0.37313398718833923, f31: 0.328357994556427, f4: 0.9999849796295166, f5: 0.9999849796295166, f6: 0.9999849796295166, f7: 0.4850809872150421, f8: 0.454602986574173, f9: 0.8863419890403748, is_fraud: 0}) |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

好,它是 2048 這個點。它的下一步是 1. Get New Record Subgraph 我們來獲取它的子圖:

GET SUBGRAPH WITH PROP FROM 2048 YIELD VERTICES AS nodes, EDGES AS relationships;

可以看到返回的結果其實還是很多的,不過對於 NebulaGraph 來說,這個子圖結果返回是在 10 ms 左右獲取的,這裡我就不貼出來了,如果我們在 NebulaGraph Studio 或者 Explorer 中可以把結果渲染出來(視覺化展示的 Query 可以去掉 WITH PROP ,可以給瀏覽器省點記憶體),結果就更容易讓人腦理解了:

./subgraph_console_view.webp

./subgraph_viz.webp

現在我們就來實現這一步的程式碼吧,它的輸入是點的 id:vertex_id。輸出是一個 dgl_graph,用來傳給推理介面。

# get SUBGRAPH of one node

import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root''nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')

這裡我用到了 Nebula-Python 這個 NebulaGraph 的 Python SDK/Client,通過 execute_json 執行獲得了這個交易的子圖。下一步,咱們需要把它構造成一個 dgl_graph

# create node and nodedata
node_id_map = {} # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])


"""
- R-U-R:兩個評價由同一個使用者發出 shares_user_with
- R-S-R:兩個評價是同餐廳同評分(評分可以是1到5) shares_restaurant_rating_with
- R-T-R:兩個評價是同餐廳同提交月份 shares_restaurant_in_one_month_with
"""
rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

data_dict = {}
if rur_start:
    data_dict[('review''shares_user_with''review')] = tensor(rur_start), tensor(rur_end)
if rsr_start:
    data_dict[('review''shares_restaurant_rating_with''review')] = tensor(rsr_start), tensor(rsr_end)
if rtr_start:
    data_dict[('review''shares_restaurant_in_one_month_with''review')] = tensor(rtr_start), tensor(rtr_end)

# construct a dgl_graph
dgl_graph: DGLHeteroGraph = heterograph(data_dict)

實際上我就是按照 DGL 文件:https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html,中的方式去構造 data_dict,然後用 heterograph() 就把結果轉換為想要的 dgl_graph 了,其中 node_id_map 是 NebulaGraph 之中 Vertex_ID 到這個物件中 node_id 的字典。

最後,我們再把 node feature 也載入進去。

# load node features to dgl_graph
for i in range(32):
    dgl_graph.ndata[f"f{i}"] = tensor(features[i])
dgl_graph.ndata['label'] = tensor(features[32])

在開始推理之前,我們還需要把它轉換成同構圖,和前邊完全一樣:

import torch


# to homogeneous graph
features = []
for i in range(32):
    features.append(dgl_graph.ndata[f"f{i}"])

dgl_graph.ndata['feat'] = torch.stack(features, dim=1)

dgl_graph.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    dgl_graph.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
dgl_graph.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (dgl_graph.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
dgl_graph.edges['shares_user_with'].data['he'] = torch.full(
    (dgl_graph.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)


# heterogeneous graph to heterogeneous graph, keep ndata and edata
import dgl
hg = dgl.to_homogeneous(dgl_graph, edata=['he'], ndata=['feat''label'])

最後,我們的推理介面就是:

def do_inference(device, graph, node_idx, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        return pred[node_idx]

我們可以呼叫一下試試推理我們這個新的點:

node_idx = node_id_map[vertex_id]
batch_size = 4096

result = do_inference(device, hg, node_idx, model, batch_size)

當然,我們也能在這個小子圖上計算他的正確率:

def test_inference(device, graph, nid, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size) # pred in buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

node_idx = torch.tensor(list(node_id_map.values()))
acc = test_inference(device, hg, node_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

輸出結果:

In [307]: def test_inference(device, graph, nid, model, batch_size):
     ...:     model.eval()
     ...:     with torch.no_grad():
     ...:         pred = model.inference(graph, device, batch_size) # pred in buffer
     ...: _device
     ...:         pred = pred[nid]
     ...:         label = graph.ndata['label'][nid].to(pred.device)
     ...:         return MF.accuracy(pred, label)
     ...:
     ...: node_idx = torch.tensor(list(node_id_map.values()))
     ...: acc = test_inference(device, hg, node_idx, model, batch_size=4096)
     ...: print("Test Accuracy {:.4f}".format(acc.item()))
     ...:
100%|████████████████████████████████████████████████| 1/1 [00:00<00:00130.31it/s]
100%|████████████████████████████████████████████████| 1/1 [00:00<00:00152.29it/s]
100%|████████████████████████████████████████████████| 1/1 [00:00<00:00173.55it/s]
Test Accuracy 0.9688

這個示例專案的程式碼在:github.com/wey-gu/NebulaGraph-Fraud-Detection-GNN ,如有問題歡迎留言、ISSUE。

 

總結

總結起來,欺詐檢測的方法有:

  • 在一個交易歷史、風控的圖譜上,通過圖模式查詢直接獲得風險提示

  • 定期利用圖演算法擴充風險標註,寫回相簿

  • 定期計算圖譜中的圖特徵,和其他特徵一起用傳統機器學習方法離線預測風險

  • 將圖譜中的屬性處理成為點、邊特徵,用圖神經網路方法離線預測風險,部分可以 Inductive Learning 的方法結合相簿可以實現線上風險預測

 

交流圖資料庫技術?加入 NebulaGraph 交流群請先 填寫下你的 NebulaGraph 名片 ,Nebula 小助手會拉你進群哦~~
 

官網:https://nebula-graph.com.cn

GitHub:https://github.com/vesoft-inc/nebula

免費開源,可以右上角點 Star(7.9K) 支援/收藏下~