一文了解 NebulaGraph 上的 Spark 項目
本文首發於 NebulaGraph Community 公眾號
最近我試着搭建了方便大家一鍵試玩的 Nebula Graph 中的Spark 相關的項目,今天就把它們整理成文分享給大家。而且,我趟出來了 PySpark 下的 Nebula Spark Connector 的使用方式,後邊也會一併貢獻到文檔裏。
Nebula Graph 的三個 Spark 子項目
我曾經圍繞 NebulaGraph 的所有數據導入方法畫過一個草圖,其中已經包含了 Spark Connector,Nebula Exchange 的簡單介紹。在這篇文章中我將它們和另外的 Nebula Algorithm 進行稍微深入的探討。
注:這篇文檔 也很清楚為我們列舉了不同導入工具的選擇。
TL;DR
- Nebula Spark Connector 是一個 Spark Lib,它能讓 Spark 應用程序能夠以
dataframe
的形式從 Nebula Graph 中讀取和寫入圖數據。 - Nebula Exchange 建立在 Nebula Spark Connector 之上,作為一個 Spark Lib 同時可以直接被 Spark 提交 JAR 包執行的應用程序,它的設計目標是和 NebulaGraph 交換不同的數據源(對於開源版本,它是單向的:寫入,而對於企業版本,它是雙向的)。Nebula Exchange 支持的很多不同類型的數據源如:MySQL、Neo4j 、PostgreSQL 、ClickHouse 、Hive 等。除了直接寫入 NebulaGraph,它還可以選擇生成 SST 文件,並將其注入 Nebula Graph,以便使用 Nebula Graph 集羣之外算力幫助排序底層。
- Nebula Algorithm,建立在 Nebula Spark Connector 和 GraphX 之上,也是一個Spark Lib 和 Spark 上的應用程序,它用來在 NebulaGraph 的圖上運行常用的圖算法(pagerank,LPA等)。
Nebula Spark Connector
- 代碼: https:// github.com/vesoft-inc/n ebula-spark-connector
- 文檔: https:// docs.nebula-graph.io/3. 1.0/nebula-spark-connector/
- JAR 包: https:// repo1.maven.org/maven2/ com/vesoft/nebula-spark-connector/
- 代碼例子: example
NebulaGraph Spark Reader
為了從 Nebula Graph 中讀取數據,比如讀 vertex,Nebula Spark Connector 將掃描所有帶有給定 TAG 的 Nebula StorageD,比如這樣表示掃描 player
這個 TAG : withLabel("player")
,我們還可以指定 vertex 的屬性: withReturnCols(List("name", "age"))
。
指定好所有的讀 TAG 相關的配置之後,調用 spark.read.nebula.loadVerticesToDF
返回得到的就是掃描 NebulaGraph 之後轉換為 Dataframe 的圖數據,像這樣:
def readVertex(spark: SparkSession): Unit = { LOG.info("start to read nebula vertices") val config = NebulaConnectionConfig .builder() .withMetaAddress("metad0:9559,metad1:9559,metad2:9559") .withConenctionRetry(2) .build() val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace("basketballplayer") .withLabel("player") .withNoColumn(false) .withReturnCols(List("name", "age")) .withLimit(10) .withPartitionNum(10) .build() val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF() vertex.printSchema() vertex.show(20) println("vertex count: " + vertex.count()) }
寫入的例子我這裏不列出,不過,前邊給出的代碼示例的鏈接裏是有更詳細的例子,這裏值得一提的是, Spark Connector 讀數據為了滿足圖分析、圖計算的大量數據場景 ,和大部分其他客户端非常不同,它直接繞過了 GraphD,通過掃描 MetaD 和 StorageD 獲得數據,但是寫入的情況則是通過 GraphD 發起 nGQL DML 語句寫入的。
接下來我們來做一個上手練習吧。
上手 Nebula Spark Connector
先決條件:假設下面的程序是在一台有互聯網連接的 Linux 機器上運行的,最好是預裝了 Docker 和 Docker-Compose。
拉起環境
首先,讓我們用 Nebula-Up 部署基於容器的 NebulaGraph Core v3、Nebula Studio、Nebula Console 和 Spark、Hadoop 環境,如果還沒安裝好它也會嘗試為我們安裝 Docker 和 Docker-Compose。
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
你知道嗎 Nebula-UP 可以一鍵裝更多東西,如果你的環境配置大一點(比如 8 GB RAM) curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash
可以裝更多東西,但是請注意 Nebula-UP 不是為生產環境準備的。
上述邊腳本執行後,讓我們用Nebula-Console(NebulaGraph 的命令行客户端)來連接它。
# Connect to nebula with console ~/.nebula-up/console.sh # Execute any queryies like ~/.nebula-up/console.sh -e "SHOW HOSTS"
加載一份數據進去,並執行一個圖查詢:
# Load the sample dataset ~/.nebula-up/load-basketballplayer-dataset.sh # 等一分鐘左右 # Make a Graph Query the sample dataset ~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
進入 Spark 環境
執行下面這一行,我們就可以進入到 Spark 環境:
docker exec -it spark_master_1 bash
如果我們想執行編譯,可以在裏邊安裝 mvn
:
docker exec -it spark_master_1 bash # in the container shell export MAVEN_VERSION=3.5.4 export MAVEN_HOME=/usr/lib/mvn export PATH=$MAVEN_HOME/bin:$PATH wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \ tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \ rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \ mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
跑 Spark Connector 的例子
選項 1(推薦):通過 PySpark
- 進入 PySpark Shell
~/.nebula-up/nebula-pyspark.sh
- 調用 Nebula Spark Reader
# call Nebula Spark Connector Reader df = spark.read.format( "com.vesoft.nebula.connector.NebulaDataSource").option( "type", "vertex").option( "spaceName", "basketballplayer").option( "label", "player").option( "returnCols", "name,age").option( "metaAddress", "metad0:9559").option( "partitionNumber", 1).load() # show the dataframe with limit of 2 df.show(n=2)
- 返回結果例子
____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.5 /_/ Using Python version 2.7.16 (default, Jan 14 2020 07:22:06) SparkSession available as 'spark'. >>> df = spark.read.format( ... "com.vesoft.nebula.connector.NebulaDataSource").option( ... "type", "vertex").option( ... "spaceName", "basketballplayer").option( ... "label", "player").option( ... "returnCols", "name,age").option( ... "metaAddress", "metad0:9559").option( ... "partitionNumber", 1).load() >>> df.show(n=2) +---------+--------------+---+ |_vertexId| name|age| +---------+--------------+---+ |player105| Danny Green| 31| |player109|Tiago Splitter| 34| +---------+--------------+---+ only showing top 2 rows
選項 2:編譯、提交示例 JAR 包
- 先克隆 Spark Connector 和它示例代碼的代碼倉庫,然後編譯:
注意,我們使用了 master 分支,因為當下 master 分支是兼容 3.x 的,一定要保證 spark connector 和數據庫內核版本是匹配的,版本對應關係參考代碼倉庫的 README.md
。
cd ~/.nebula-up/nebula-up/spark git clone https://github.com/vesoft-inc/nebula-spark-connector.git docker exec -it spark_master_1 bash cd /root/nebula-spark-connector
- 替換示例項目的代碼
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- 把如下的代碼粘貼進去,這裏邊我們對前邊加載的圖:
basketballplayer
上做了頂點和邊的讀操作:分別調用readVertex
和readEdges
。
package com.vesoft.nebula.examples.connector import com.facebook.thrift.protocol.TCompactProtocol import com.vesoft.nebula.connector.connector.NebulaDataFrameReader import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig} import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory object NebulaSparkReaderExample { private val LOG = LoggerFactory.getLogger(this.getClass) def main(args: Array[String]): Unit = { val sparkConf = new SparkConf sparkConf .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) val spark = SparkSession .builder() .master("local") .config(sparkConf) .getOrCreate() readVertex(spark) readEdges(spark) spark.close() sys.exit() } def readVertex(spark: SparkSession): Unit = { LOG.info("start to read nebula vertices") val config = NebulaConnectionConfig .builder() .withMetaAddress("metad0:9559,metad1:9559,metad2:9559") .withConenctionRetry(2) .build() val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace("basketballplayer") .withLabel("player") .withNoColumn(false) .withReturnCols(List("name", "age")) .withLimit(10) .withPartitionNum(10) .build() val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF() vertex.printSchema() vertex.show(20) println("vertex count: " + vertex.count()) } def readEdges(spark: SparkSession): Unit = { LOG.info("start to read nebula edges") val config = NebulaConnectionConfig .builder() .withMetaAddress("metad0:9559,metad1:9559,metad2:9559") .withTimeout(6000) .withConenctionRetry(2) .build() val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace("basketballplayer") .withLabel("follow") .withNoColumn(false) .withReturnCols(List("degree")) .withLimit(10) .withPartitionNum(10) .build() val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF() edge.printSchema() edge.show(20) println("edge count: " + edge.count()) } }
- 然後打包成 JAR 包
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- 最後,把它提交到 Spark 裏執行:
cd example /spark/bin/spark-submit --master "local" \ --class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \ --driver-memory 4g target/example-3.0-SNAPSHOT.jar # 退出 spark 容器 exit
- 成功之後,我們會得到返回結果:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s +---------+------------------+---+ |_vertexId| name|age| +---------+------------------+---+ |player105| Danny Green| 31| |player109| Tiago Splitter| 34| |player111| David West| 38| |player118| Russell Westbrook| 30| |player143|Kristaps Porzingis| 23| |player114| Tracy McGrady| 39| |player150| Luka Doncic| 20| |player103| Rudy Gay| 32| |player113| Dejounte Murray| 29| |player121| Chris Paul| 33| |player128| Carmelo Anthony| 34| |player130| Joel Embiid| 25| |player136| Steve Nash| 45| |player108| Boris Diaw| 36| |player122| DeAndre Jordan| 30| |player123| Ricky Rubio| 28| |player139| Marc Gasol| 34| |player142| Klay Thompson| 29| |player145| JaVale McGee| 31| |player102| LaMarcus Aldridge| 33| +---------+------------------+---+ only showing top 20 rows 22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s +---------+---------+-----+------+ | _srcId| _dstId|_rank|degree| +---------+---------+-----+------+ |player105|player100| 0| 70| |player105|player104| 0| 83| |player105|player116| 0| 80| |player109|player100| 0| 80| |player109|player125| 0| 90| |player118|player120| 0| 90| |player118|player131| 0| 90| |player143|player150| 0| 90| |player114|player103| 0| 90| |player114|player115| 0| 90| |player114|player140| 0| 90| |player150|player120| 0| 80| |player150|player137| 0| 90| |player150|player143| 0| 90| |player103|player102| 0| 70| |player113|player100| 0| 99| |player113|player101| 0| 99| |player113|player104| 0| 99| |player113|player105| 0| 99| |player113|player106| 0| 99| +---------+---------+-----+------+ only showing top 20 rows
事實上,在這個代碼倉庫下還有更多的例子,特別是GraphX 的例子,你可以嘗試自己去探索這部分。
請注意,在 GraphX 假定頂點 ID 是數字類型的,因此對於字符串類型的頂點 ID 情況,需要進行實時轉換,請參考 Nebula Algorithom 中的例子 ,瞭解如何繞過這一問題。
Nebula Exchange
- 代碼: https:// github.com/vesoft-inc/n ebula-exchange/
- 文檔: https:// docs.nebula-graph.com.cn /3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR 包: https:// github.com/vesoft-inc/n ebula-exchange/releases
- 配置例子: exchange-common/src/test/resources/application.conf
Nebula Exchange 是一個 Spark Lib,也是一個可以直接提交執行的 Spark 應用,它被用來從多個數據源讀取數據寫入 NebulaGraph 或者輸出NebulaGraph SST 文件。

通過 spark-submit 的方式使用 Nebula Exchange 的方法很直接:
- 首先創建配置文件,讓 Exchange 知道應該如何獲取和寫入數據
- 然後用指定的配置文件調用 Exchange 包
現在,讓我們用上一章中創建的相同環境做一個實際測試。
一鍵試玩 Exchange
先跑起來看看吧
請參考前邊拉起環境這一章節,先一鍵裝好環境。
一鍵執行:
~/.nebula-up/nebula-exchange-example.sh
恭喜你,已經第一次執行成功一個 Exchange 的數據導入任務啦!
再看看一些細節
這個例子裏,我們實際上是用 Exchange 從 CSV 文件這一其中支持的數據源中讀取數據寫入 NebulaGraph 集羣的。這個 CSV 文件中第一列是頂點 ID,第二和第三列是 "姓名 "和 "年齡 "的屬性:
player800,"Foo Bar",23 player801,"Another Name",21
- 咱們可以進到 Spark 環境裏看看
docker exec -it spark_master_1 bash cd /root
- 可以看到我們提交 Exchange 任務時候指定的配置文件
exchange.conf
它是一個HOCON
格式的文件: - 在
.nebula
中描述了 Nebula Graph 集羣的相關信息 - 在
.tags
中描述瞭如何將必填字段對應到我們的數據源(這裏是 CSV 文件)等有關 Vertecies 的信息。
{ # Spark relation config spark: { app: { name: Nebula Exchange } master:local driver: { cores: 1 maxResultSize: 1G } executor: { memory: 1G } cores:{ max: 16 } } # Nebula Graph relation config nebula: { address:{ graph:["graphd:9669"] meta:["metad0:9559", "metad1:9559", "metad2:9559"] } user: root pswd: nebula space: basketballplayer # parameters for SST import, not required path:{ local:"/tmp" remote:"/sst" hdfs.namenode: "hdfs://localhost:9000" } # nebula client connection parameters connection { # socket connect & execute timeout, unit: millisecond timeout: 30000 } error: { # max number of failures, if the number of failures is bigger than max, then exit the application. max: 32 # failed import job will be recorded in output path output: /tmp/errors } # use google's RateLimiter to limit the requests send to NebulaGraph rate: { # the stable throughput of RateLimiter limit: 1024 # Acquires a permit from RateLimiter, unit: MILLISECONDS # if it can't be obtained within the specified timeout, then give up the request. timeout: 1000 } } # Processing tags # There are tag config examples for different dataSources. tags: [ # HDFS csv # Import mode is client, just change type.sink to sst if you want to use client import mode. { name: player type: { source: csv sink: client } path: "file:///root/player.csv" # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields fields: [_c1, _c2] nebula.fields: [name, age] vertex: { field:_c0 } separator: "," header: false batch: 256 partition: 32 } ] }
- 我們應該能看到那個 CSV 數據源和這個配置文件都在同一個目錄下了:
bash-5.0# ls -l total 24 drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download -rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf -rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector -rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- 然後,實際上我們可以手動再次提交一下這個 Exchange 任務
/spark/bin/spark-submit --master local \ --class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \ -c exchange.conf
- 部分返回結果
22/06/06 03:56:26 INFO Exchange$: Processing Tag player 22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2 22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age 22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv ... 22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s 22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2 22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0 ...
更多的數據源,請參考文檔和配置的例子。
關於 Exchange 輸出 SST 文件的實踐,你可以參考文檔和我的舊文 Nebula Exchange SST 2.x實踐指南 。
Nebula Algorithm
- 代碼倉庫: https:// github.com/vesoft-inc/n ebula-algorithm
- 文檔: https:// docs.nebula-graph.com.cn /3.1.0/nebula-algorithm/
- JAR 包: https:// repo1.maven.org/maven2/ com/vesoft/nebula-algorithm/
- 示例代碼: example/src/main/scala/com/vesoft/nebula/algorithm
通過 spark-submit 提交任務
我在 這個代碼倉庫 裏給出了例子,今天我們藉助 Nebula-UP 可以更方便體驗它。
參考前邊拉起環境這一章節,先一鍵裝好環境。
在如上通過 Nebula-UP 的 Spark 模式部署了需要的依賴之後
- 加載LiveJournal 數據集
~/.nebula-up/load-LiveJournal-dataset.sh
- 在 LiveJournal 數據集上執行一個 PageRank 算法,結果輸出到 CSV 文件中
~/.nebula-up/nebula-algo-pagerank-example.sh
- 檢查輸出結果:
docker exec -it spark_master_1 bash head /output/part*000.csv _id,pagerank 637100,0.9268620883822242 108150,1.1855749056722755 957460,0.923720299211093 257320,0.9967932799358413
配置文件解讀
完整文件在 這裏 ,這裏,我們介紹一下主要的字段:
-
.data
指定了源是 Nebula,表示從集羣獲取圖數據,輸出sink
是csv
,表示寫到本地文件裏。
data: { # data source. optional of nebula,csv,json source: nebula # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text sink: csv # if your algorithm needs weight hasWeight: false }
-
.nebula.read
規定了讀 NebulaGraph 集羣的對應關係,這裏是讀取所有 edge type:follow
的邊數據為一整張圖
nebula: { # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid. read: { # Nebula metad server address, multiple addresses are split by English comma metaAddress: "metad0:9559" # Nebula space space: livejournal # Nebula edge types, multiple labels means that data from multiple edges will union together labels: ["follow"] # Nebula edge property name for each edge type, this property will be as weight col for algorithm. # Make sure the weightCols are corresponding to labels. weightCols: [] }
-
.algorithm
裏配置了我們要調用的算法,和算法的配置
algorithm: { executeAlgo: pagerank # PageRank parameter pagerank: { maxIter: 10 resetProb: 0.15 # default 0.15 }
作為一個庫在 Spark 中調用 Nebula Algoritm
請注意另一方面,我們可以將 Nebula Algoritm 作為一個庫調用,它的好處在於:
- 對算法的輸出格式有更多的控制/定製功能
- 可以對非數字 ID 的情況進行轉換,見 這裏
這裏我先不給出例子了,如果大家感興趣可以給 Nebula-UP 提需求,我也會增加相應的例子。
交流圖數據庫技術?加入 Nebula 交流羣請先 填寫下你的 Nebula 名片 ,Nebula 小助手會拉你進羣~~
- 一文了解 NebulaGraph 上的 Spark 項目
- 使用 MyBatis 操作 Nebula Graph 的實踐
- GraphX 圖計算實踐之模式匹配抽取特定子圖
- 圖數據庫|如何從零到一構建一個企業股權圖譜系統?
- GitHub 自動合併 pr 的機器人——auto-merge-bot
- 圖數據庫|正反向邊的最終一致性——TOSS 介紹
- Nebula Graph 在網易遊戲業務中的實踐
- Nebula Graph 在企查查的應用
- 基於 BDD 理論的 Nebula 集成測試框架重構(下篇)
- 圖數據庫 Nebula Graph 集羣通信:從心跳説起
- 在 Spark 數據導入中的一些實踐細節
- GraphX 在圖數據庫 Nebula Graph 的圖計算實踐
- 調試 Docker 容器內部的 Nebula Graph 進程