一文了解 NebulaGraph 上的 Spark 項目

語言: CN / TW / HK

本文首發於 NebulaGraph Community 公眾號

最近我試着搭建了方便大家一鍵試玩的 Nebula Graph 中的Spark 相關的項目,今天就把它們整理成文分享給大家。而且,我趟出來了 PySpark 下的 Nebula Spark Connector 的使用方式,後邊也會一併貢獻到文檔裏。

Nebula Graph 的三個 Spark 子項目

我曾經圍繞 NebulaGraph 的所有數據導入方法畫過一個草圖,其中已經包含了 Spark Connector,Nebula Exchange 的簡單介紹。在這篇文章中我將它們和另外的 Nebula Algorithm 進行稍微深入的探討。

注:這篇文檔 也很清楚為我們列舉了不同導入工具的選擇。

TL;DR

  • Nebula Spark Connector 是一個 Spark Lib,它能讓 Spark 應用程序能夠以 dataframe 的形式從 Nebula Graph 中讀取和寫入圖數據。
  • Nebula Exchange 建立在 Nebula Spark Connector 之上,作為一個 Spark Lib 同時可以直接被 Spark 提交 JAR 包執行的應用程序,它的設計目標是和 NebulaGraph 交換不同的數據源(對於開源版本,它是單向的:寫入,而對於企業版本,它是雙向的)。Nebula Exchange 支持的很多不同類型的數據源如:MySQL、Neo4j 、PostgreSQL 、ClickHouse 、Hive 等。除了直接寫入 NebulaGraph,它還可以選擇生成 SST 文件,並將其注入 Nebula Graph,以便使用 Nebula Graph 集羣之外算力幫助排序底層。
  • Nebula Algorithm,建立在 Nebula Spark Connector 和 GraphX 之上,也是一個Spark Lib 和 Spark 上的應用程序,它用來在 NebulaGraph 的圖上運行常用的圖算法(pagerank,LPA等)。

Nebula Spark Connector

NebulaGraph Spark Reader

為了從 Nebula Graph 中讀取數據,比如讀 vertex,Nebula Spark Connector 將掃描所有帶有給定 TAG 的 Nebula StorageD,比如這樣表示掃描 player 這個 TAG : withLabel("player") ,我們還可以指定 vertex 的屬性: withReturnCols(List("name", "age"))

指定好所有的讀 TAG 相關的配置之後,調用 spark.read.nebula.loadVerticesToDF 返回得到的就是掃描 NebulaGraph 之後轉換為 Dataframe 的圖數據,像這樣:

def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("basketballplayer")
      .withLabel("player")
      .withNoColumn(false)
      .withReturnCols(List("name", "age"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }

寫入的例子我這裏不列出,不過,前邊給出的代碼示例的鏈接裏是有更詳細的例子,這裏值得一提的是, Spark Connector 讀數據為了滿足圖分析、圖計算的大量數據場景 ,和大部分其他客户端非常不同,它直接繞過了 GraphD,通過掃描 MetaD 和 StorageD 獲得數據,但是寫入的情況則是通過 GraphD 發起 nGQL DML 語句寫入的。

接下來我們來做一個上手練習吧。

上手 Nebula Spark Connector

先決條件:假設下面的程序是在一台有互聯網連接的 Linux 機器上運行的,最好是預裝了 Docker 和 Docker-Compose。

拉起環境

首先,讓我們用 Nebula-Up 部署基於容器的 NebulaGraph Core v3、Nebula Studio、Nebula Console 和 Spark、Hadoop 環境,如果還沒安裝好它也會嘗試為我們安裝 Docker 和 Docker-Compose。

# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

你知道嗎 Nebula-UP 可以一鍵裝更多東西,如果你的環境配置大一點(比如 8 GB RAM) curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash 可以裝更多東西,但是請注意 Nebula-UP 不是為生產環境準備的。

上述邊腳本執行後,讓我們用Nebula-Console(NebulaGraph 的命令行客户端)來連接它。

# Connect to nebula with console
~/.nebula-up/console.sh
# Execute any queryies like
~/.nebula-up/console.sh -e "SHOW HOSTS"

加載一份數據進去,並執行一個圖查詢:

# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# 等一分鐘左右

# Make a Graph Query the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'

進入 Spark 環境

執行下面這一行,我們就可以進入到 Spark 環境:

docker exec -it spark_master_1 bash

如果我們想執行編譯,可以在裏邊安裝 mvn

docker exec -it spark_master_1 bash
# in the container shell

export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH

wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  mv apache-maven-$MAVEN_VERSION /usr/lib/mvn

跑 Spark Connector 的例子

選項 1(推薦):通過 PySpark

  • 進入 PySpark Shell
~/.nebula-up/nebula-pyspark.sh
  • 調用 Nebula Spark Reader
# call Nebula Spark Connector Reader
df = spark.read.format(
  "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "returnCols", "name,age").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()

# show the dataframe with limit of 2
df.show(n=2)
  • 返回結果例子
____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
...   "com.vesoft.nebula.connector.NebulaDataSource").option(
...     "type", "vertex").option(
...     "spaceName", "basketballplayer").option(
...     "label", "player").option(
...     "returnCols", "name,age").option(
...     "metaAddress", "metad0:9559").option(
...     "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId|          name|age|
+---------+--------------+---+
|player105|   Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows

選項 2:編譯、提交示例 JAR 包

  • 先克隆 Spark Connector 和它示例代碼的代碼倉庫,然後編譯:

注意,我們使用了 master 分支,因為當下 master 分支是兼容 3.x 的,一定要保證 spark connector 和數據庫內核版本是匹配的,版本對應關係參考代碼倉庫的 README.md

cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git

docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
  • 替換示例項目的代碼
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala

vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
  • 把如下的代碼粘貼進去,這裏邊我們對前邊加載的圖: basketballplayer 上做了頂點和邊的讀操作:分別調用 readVertexreadEdges
package com.vesoft.nebula.examples.connector

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkReaderExample {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    readVertex(spark)
    readEdges(spark)

    spark.close()
    sys.exit()
  }

  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("basketballplayer")
      .withLabel("player")
      .withNoColumn(false)
      .withReturnCols(List("name", "age"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }

  def readEdges(spark: SparkSession): Unit = {
    LOG.info("start to read nebula edges")

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("basketballplayer")
      .withLabel("follow")
      .withNoColumn(false)
      .withReturnCols(List("degree"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    edge.printSchema()
    edge.show(20)
    println("edge count: " + edge.count())
  }

}
  • 然後打包成 JAR 包
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
  • 最後,把它提交到 Spark 裏執行:
cd example

/spark/bin/spark-submit --master "local" \
    --class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
    --driver-memory 4g target/example-3.0-SNAPSHOT.jar

# 退出 spark 容器
exit
  • 成功之後,我們會得到返回結果:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId|              name|age|
+---------+------------------+---+
|player105|       Danny Green| 31|
|player109|    Tiago Splitter| 34|
|player111|        David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114|     Tracy McGrady| 39|
|player150|       Luka Doncic| 20|
|player103|          Rudy Gay| 32|
|player113|   Dejounte Murray| 29|
|player121|        Chris Paul| 33|
|player128|   Carmelo Anthony| 34|
|player130|       Joel Embiid| 25|
|player136|        Steve Nash| 45|
|player108|        Boris Diaw| 36|
|player122|    DeAndre Jordan| 30|
|player123|       Ricky Rubio| 28|
|player139|        Marc Gasol| 34|
|player142|     Klay Thompson| 29|
|player145|      JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows

22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
|   _srcId|   _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100|    0|    70|
|player105|player104|    0|    83|
|player105|player116|    0|    80|
|player109|player100|    0|    80|
|player109|player125|    0|    90|
|player118|player120|    0|    90|
|player118|player131|    0|    90|
|player143|player150|    0|    90|
|player114|player103|    0|    90|
|player114|player115|    0|    90|
|player114|player140|    0|    90|
|player150|player120|    0|    80|
|player150|player137|    0|    90|
|player150|player143|    0|    90|
|player103|player102|    0|    70|
|player113|player100|    0|    99|
|player113|player101|    0|    99|
|player113|player104|    0|    99|
|player113|player105|    0|    99|
|player113|player106|    0|    99|
+---------+---------+-----+------+
only showing top 20 rows

事實上,在這個代碼倉庫下還有更多的例子,特別是GraphX 的例子,你可以嘗試自己去探索這部分。

請注意,在 GraphX 假定頂點 ID 是數字類型的,因此對於字符串類型的頂點 ID 情況,需要進行實時轉換,請參考 Nebula Algorithom 中的例子 ,瞭解如何繞過這一問題。

Nebula Exchange

Nebula Exchange 是一個 Spark Lib,也是一個可以直接提交執行的 Spark 應用,它被用來從多個數據源讀取數據寫入 NebulaGraph 或者輸出NebulaGraph SST 文件。

通過 spark-submit 的方式使用 Nebula Exchange 的方法很直接:

  • 首先創建配置文件,讓 Exchange 知道應該如何獲取和寫入數據
  • 然後用指定的配置文件調用 Exchange 包

現在,讓我們用上一章中創建的相同環境做一個實際測試。

一鍵試玩 Exchange

先跑起來看看吧

請參考前邊拉起環境這一章節,先一鍵裝好環境。

一鍵執行:

~/.nebula-up/nebula-exchange-example.sh

恭喜你,已經第一次執行成功一個 Exchange 的數據導入任務啦!

再看看一些細節

這個例子裏,我們實際上是用 Exchange 從 CSV 文件這一其中支持的數據源中讀取數據寫入 NebulaGraph 集羣的。這個 CSV 文件中第一列是頂點 ID,第二和第三列是 "姓名 "和 "年齡 "的屬性:

player800,"Foo Bar",23
player801,"Another Name",21
  • 咱們可以進到 Spark 環境裏看看
docker exec -it spark_master_1 bash
cd /root
  • 可以看到我們提交 Exchange 任務時候指定的配置文件 exchange.conf 它是一個 HOCON 格式的文件:
  • .nebula 中描述了 Nebula Graph 集羣的相關信息
  • .tags 中描述瞭如何將必填字段對應到我們的數據源(這裏是 CSV 文件)等有關 Vertecies 的信息。
{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange
    }

    master:local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory: 1G
    }

    cores:{
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["graphd:9669"]
      meta:["metad0:9559", "metad1:9559", "metad2:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    # parameters for SST import, not required
    path:{
        local:"/tmp"
        remote:"/sst"
        hdfs.namenode: "hdfs://localhost:9000"
    }

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    # use google's RateLimiter to limit the requests send to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [

    # HDFS csv
    # Import mode is client, just change type.sink to sst if you want to use client import mode.
    {
      name: player
      type: {
        source: csv
        sink: client
      }
      path: "file:///root/player.csv"
      # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
        field:_c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }

  ]
}
  • 我們應該能看到那個 CSV 數據源和這個配置文件都在同一個目錄下了:
bash-5.0# ls -l
total 24
drwxrwxr-x    2 1000     1000          4096 Jun  1 04:26 download
-rw-rw-r--    1 1000     1000          1908 Jun  1 04:23 exchange.conf
-rw-rw-r--    1 1000     1000          2593 Jun  1 04:23 hadoop.env
drwxrwxr-x    7 1000     1000          4096 Jun  6 03:27 nebula-spark-connector
-rw-rw-r--    1 1000     1000            51 Jun  1 04:23 player.csv
  • 然後,實際上我們可以手動再次提交一下這個 Exchange 任務
/spark/bin/spark-submit --master local \
    --class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
    -c exchange.conf
  • 部分返回結果
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...

更多的數據源,請參考文檔和配置的例子。

關於 Exchange 輸出 SST 文件的實踐,你可以參考文檔和我的舊文 Nebula Exchange SST 2.x實踐指南

Nebula Algorithm

通過 spark-submit 提交任務

我在 這個代碼倉庫 裏給出了例子,今天我們藉助 Nebula-UP 可以更方便體驗它。

參考前邊拉起環境這一章節,先一鍵裝好環境。

在如上通過 Nebula-UP 的 Spark 模式部署了需要的依賴之後

  • 加載LiveJournal 數據集
~/.nebula-up/load-LiveJournal-dataset.sh
  • 在 LiveJournal 數據集上執行一個 PageRank 算法,結果輸出到 CSV 文件中
~/.nebula-up/nebula-algo-pagerank-example.sh
  • 檢查輸出結果:
docker exec -it spark_master_1 bash

head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413

配置文件解讀

完整文件在 這裏 ,這裏,我們介紹一下主要的字段:

  • .data 指定了源是 Nebula,表示從集羣獲取圖數據,輸出 sinkcsv ,表示寫到本地文件裏。
data: {
    # data source. optional of nebula,csv,json
    source: nebula
    # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
    sink: csv
    # if your algorithm needs weight
    hasWeight: false
  }
  • .nebula.read 規定了讀 NebulaGraph 集羣的對應關係,這裏是讀取所有 edge type: follow 的邊數據為一整張圖
nebula: {
    # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
    read: {
        # Nebula metad server address, multiple addresses are split by English comma
        metaAddress: "metad0:9559"
        # Nebula space
        space: livejournal
        # Nebula edge types, multiple labels means that data from multiple edges will union together
        labels: ["follow"]
        # Nebula edge property name for each edge type, this property will be as weight col for algorithm.
        # Make sure the weightCols are corresponding to labels.
        weightCols: []
    }
  • .algorithm 裏配置了我們要調用的算法,和算法的配置
algorithm: {
    executeAlgo: pagerank

    # PageRank parameter
    pagerank: {
        maxIter: 10
        resetProb: 0.15  # default 0.15
    }

作為一個庫在 Spark 中調用 Nebula Algoritm

請注意另一方面,我們可以將 Nebula Algoritm 作為一個庫調用,它的好處在於:

  • 對算法的輸出格式有更多的控制/定製功能
  • 可以對非數字 ID 的情況進行轉換,見 這裏

這裏我先不給出例子了,如果大家感興趣可以給 Nebula-UP 提需求,我也會增加相應的例子。

交流圖數據庫技術?加入 Nebula 交流羣請先 填寫下你的 Nebula 名片 ,Nebula 小助手會拉你進羣~~