GraphX 圖計算實踐之模式匹配抽取特定子圖

語言: CN / TW / HK

本文首發於 Nebula Graph Community 公眾號

GraphX 圖計算實踐之模式匹配抽取特定子圖

前言

Nebula Graph 本身提供了高效能的 OLTP 查詢可以較好地實現各種實時的查詢場景,同時它也提供了基於 Spark GraphX 的 nebula-algorithm 庫以便支援實時的圖演算法,這裡給 Nebula 點個贊,很不錯!

但實踐過程中,我發現部分 OLAP 場景中,想實現模式匹配分析,Nebula 的支撐就顯得不那麼完善了。

這裡我對模式匹配的解釋是:在一張大圖中,根據特定的規則抽取出對應的子圖。

舉一個簡單的例子,比如想要對每個點都進行二度擴散,並按照一定邏輯過濾,最終保留符合要求的二度擴散的子圖,這樣的任務用 nebula-algorithm 就不太好實現了。

當然,上面這個例子我們可以通過編寫 nGQL 語句——查詢出對應的資料,但 Nebula 的優勢在 OLTP 場景,針對特定點進行查詢。對於全圖資料的計算,無論是計算架構還是記憶體大小都不是特別適合的。所以,為了補充該部分(模式匹配)的功能,這裡使用 Spark GraphX 來滿足 OLAP 的計算需求。

GraphX 介紹

GraphX 是 Spark 生態的一個分散式圖計算引擎,提供了許多的圖計算介面,方便進行圖的各項操作。關於 GraphX 的基礎知識我這裡不進行過多的介紹了,主要是介紹一下實現模式匹配的思路。

實現模式匹配主要是依賴於一個重要的 API:PregelAPI,它是一種 BSP(BSP:Bulk Synchronous Parallel,即整體同步並行)計算模型,一次計算是由一系列超步實現的。

只看定義不是特別好理解,所以直接介紹它在 GraphX 中的實現,瞭解它是如何使用的。

Pregel 執行原理

原始碼定義如下:

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }
 

相關引數含義如下:

  • initialMsg: 節點的初始化資訊,呼叫 vprog 函式處理 initialMsg;
  • maxIrerations:最大迭代次數;
  • activeDiraction:控制 sendMsg 傳送的方向,只有滿足方向要求的三元組才會進入下一次迭代;
  • vprog:更新節點資訊的函式。節點收到訊息後,執行相關邏輯更新節點資訊;
  • sendMsg:節點和節點之間傳送訊息,引數為一個三元組,並且滿足 activeDiraction 的方向條件,把訊息 Msg 傳送給 VertexID,VertexID 可以是 src 點也可以是 dst 點;
  • mergeMsg:當同一個 VertexID 接收到多條訊息時,合併多條訊息為一條,便於 vprog 處理。

只看定義和邏輯同樣不太清楚,所以下邊再介紹一下 Pregel 的迭代流程:

  1. 對於一個 graph 物件,只有啟用態的點才會參與下一次迭代,啟用態的條件是完成了一次傳送/收到訊息 A 的動作;
  2. 首先初始化所有節點,也就是每個點都呼叫一次 vprog 方法,引數為 initialMsg,這樣使所有節點都在啟用態;
  3. 然後是將圖劃分為若干三元組 Triplet,三元組的組成是:src點,edge,dst 點,只保留啟用點 activeDirection 方向的三元組;
  4. 執行 sendMsg 方法,將訊息 A 傳送給一個 VertexID 的點,由於返回值是一個 Iterator,也就是可以同時給 src、dst 傳送訊息,若傳送 Iterator.empty 則認為沒有傳送訊息;
  5. 由於一個 VertexID 的點會收到多條訊息,所以呼叫 mergeMsg 方法合併訊息,合併為一個 A;
  6. 合併之後呼叫 vprog 更新節點的訊息,這樣就完成了一次迭代;
  7. 重複 3-6 的步驟,執行 maxIterations 次迭代或者所有的點都不是啟用態則退出,完成 Pregel 的所有計算。

模式匹配的思路

知道 Pregel 的計算原理之後,那麼怎麼實現模式匹配呢,主要就是根據迭代的思想,不停地將邊資訊聚合到點上,在迭代的過程中控制傳送訊息的邏輯來實現特定模式的路徑

我們可以定義訊息為多條路徑的集合,傳送訊息時就是對傳送點的路徑集合中,每條路徑都增加一個邊 e,這樣就實現了路徑的遍歷,其實對於一個點來說,本質就是一個廣度優先遍歷的過程。

還是以二度查詢為例,看如下例子:

GraphX 圖計算實踐之模式匹配抽取特定子圖

首先,對每個點都執行一次初始化,每個點的屬性為一個空的路徑集合,路徑集合使用二維陣列表示,使所有點成為啟用態。

然後,進行第一次迭代,可以看到會有兩個三元組 A-E1->BB-E2->C,那麼很容易可以得到這次迭代的結果:A:[]B:[[E1]]C:[[E2]]

再進行第二次迭代,這裡要做限制,已經發送過的路徑不再發送,也就是判斷 E 是否已被接收了,防止重複傳送的情況。所以第二次迭代的結果就只有 B-E2->C 這個三元組有效,也就是把 B 的集合中的每條路徑分別增加一個 E2,併發給 C,C 將路徑合併即可,那麼結果就是:A:[]B:[[E1]]C:[[E2][E1,E2]]

此時 C 節點上的集合中就有了 E1,E2 兩條邊,剛好是 A 節點 2 度遍歷的結果。

這裡舉的是簡單例子,只是說明這樣的一個思路,核心邏輯就是傳遞邊來實現路徑遍歷,實際上每個節點會收到許多點的資訊,那麼可以將點的結果進行過濾,按照頭結點分組即可。實現看如下例子:

GraphX 圖計算實踐之模式匹配抽取特定子圖

在這個例子中根據要求,能得到的結果就是 A 和 G 的2度路徑子圖,迭代的結果我不再贅述,直接列出C,F節點的屬性:C:[[E2],[E6],[E1,E2],[E5,E6]]F:[[E4],[E3,E4]],當然點 H,B,D 也有路徑,但其實可以清楚的看到想要的結果是在 C,F 節點上的。

那麼,結果有了但它是分散的,怎麼合併起來呢?我們可以將每個點路徑的第一個邊的起始點拿出來作為 key,因為迭代時每條路徑是有序的,其實這個 key 就是目標點,比如 E1,E3 的起始點都是 A,E5 的起始點是 G,我們將每條路徑都增加一個key,變更為key:path,過濾掉小於 2 條邊的路徑,再按照key分組,就得到了目標點對應的子圖路徑了,這樣是不是就拿到了 A 和 G 各自的2度點邊了呢!

思路延伸

2 度擴散這個例子還是比較簡單的,實際業務中,會有很多的情況,當然圖的結構也會比較複雜,比如:

  1. 不同標籤的點如何遍歷
  2. 不同型別的邊如何遍歷
  3. 出現環路如何解決
  4. 邊的方向是有向還是無向
  5. 多條邊如何處理
  6. ...

等等的這些問題,但是核心點不變,就是基於 Pregel 實現廣度優先遍歷,累積邊形成路徑資訊,主要的邏輯基本都在於 sendMsg 這個方法,來控制發或者不發,來決定路徑的走向,以滿足模式匹配的業務要求。一次迭代就是積累一層的路徑資訊,所以迭代次數與圖的深度一致。在迭代完成後,每個點上都有一些結果,他們可能是中間結果,也可能是最終結果,一般按照指定 key(一般是頭結點)分組再進行一些業務邏輯的過濾(比如路徑長度),即可得到指定結構的子圖,接下來就可以用於業務的分析操作了。

此外,還可以藉助 GraphFrames 來實現諸如:二度擴散,這種簡單的模式匹配。通過使用類似 Spark SQL 的運算元,十分容易的得到計算結果,大大減少程式碼的難度。但是由於文件較少,又不如 GraphX 多種運算元的靈活,對於複雜的模式還是不太推薦的,感興趣的可以去了解一下。

總結

利用 GraphX 的 Pregel API 進行廣度優先遍歷來實現模式匹配的好處:

  1. GraphX 有多種圖運算元可以靈活處理圖資料;
  2. 基於 Pregel,使用路徑當做訊息可以靈活控制模式子圖的結構,理論上可以實現任何結構的模式提取;
  3. 能夠支援較大資料量的全圖模式匹配,彌補 Nebula 相簿 OLAP 的不足;
  4. 無縫整合到大資料生態圈,方便結果的分析使用。

使用這種方式雖然能夠實現模式匹配,但是也有很多缺點,比如說:

  1. 每次迭代的訊息都是路徑集合,越往後訊息會越大,導致 JOIN 的資料量很大,記憶體佔用較高。可以通過優化過濾掉不必要傳送的資訊來解決;
  2. 迭代的次數有限,太多了則會出現記憶體爆炸,不過一般業務中超過 10 層以上的情況也很少;
  3. 由於節點 ID 通常是 String,需要提前做對映表,計算完又要轉換回來,導致計算過程中 shuffle 的次數很多。

針對上面問題,如果你有更好的實現方案,或者通過其他計算引擎能夠更好的實現,請務必與我交流指導!

最後,雖然 GraphX 使用起來上手有一定難度,計算也高度依賴記憶體,但瑕不掩瑜它仍然是一款優秀的圖計算框架,尤其是分散式的特效能夠進行大量資料的計算,同時 Spark 又能較好地與大資料生態整合,又有官方提供的 nebula-spark-connector 方便讀寫 Nebula 資料,使用起來還是非常不錯的。

我的分享就到這裡了,歡迎大家交流更好想法!

我是繁凡,一名大資料開發工程師,目前從事圖譜產品開發,致力於大規模圖資料在業務中的使用。最近使用 GraphX 實踐了一些業務要求的模式匹配開發,在這裡分享一些使用的思路。


交流圖資料庫技術?加入 Nebula 交流群請先填寫下你的 Nebula 名片,Nebula 小助手會拉你進群~~