PySpark 連線 MySQL 示例

語言: CN / TW / HK

本文釋出於掘金,作者:嚴北(wx: shin-devops),禁止盜用

配置流程

  1. 安裝 pyspark
  2. 配置 mysql-connector.jar
  3. 建立連線
  4. 讀取資料

安裝 PySpark

本地建立一個新的專案,執行 pip install pyspark==3.0,安裝 PySpark。

MySQL-Connector 配置

下載

進入 https://dev.mysql.com/downloads/connector/j/ 下載對應的版本的 Platform Independent 的壓縮包:

Connector/J version JDBC version MySQL Server version JRE Required JDK Required for Compilation Status
5.1 3.0, 4.0, 4.1, 4.2 5.61, 5.71, 8.01 JRE 5 or higher1 JDK 5.0 AND JDK 8.0 or higher2, 3 General availability
8.0 4.2 5.6, 5.7, 8.0 JRE 8 or higher JDK 8.0 or higher2 General availability. 推薦

點選檢視完整 版本關聯關係

mysql-connector-java-8.0.20.tar.gz 為例,下載完成後,解壓得到 mysql-connector-java-8.0.19.jar

移動到 SPARK_HOME 路徑下

若通過其他方式安裝,可以在本地執行 echo $SPARK_HOME 可以檢視 Spark 安裝路徑。

直接通過 pip install pyspark==3.0 的方式安裝 PySpark,$SPARK_HOME 環境變數為空,這時候網上其他的配置文件提到的 “複製 mysql-connector.jar 到 $SPARK_HOME/jars 資料夾下” 的步驟就無法執行下去,程式碼會報:

java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
複製程式碼

本文釋出於掘金,作者:嚴北(wx: shin-devops),禁止盜用

解決方法,是通過 PySpark 程式碼中的 _find_spark_home 方法找到 $SPARK_HOME:

>>> from pyspark import find_spark_home
>>> print(find_spark_home._find_spark_home())

/home/ityoung/test-spark/venv/lib/python3.6/site-packages/pyspark
複製程式碼

然後將 $SPARK_HOME 設定為該路徑,複製 mysql-connector.jar$SPARK_HOME/jars 中即可:

export SPARK_HOME=/home/ityoung/test-spark/venv/lib/python3.6/site-packages/pyspark
mv mysql-connector-java-8.0.19.jar $SPARK_HOME/jars
複製程式碼

Spark 程式碼示例

參考:zhuanlan.zhihu.com/p/136777424 轉載註明出處

main.py

# 本文釋出於[掘金](https://juejin.im/user/5a9ea689f265da23906b89e1),作者:嚴北(wx: shin-devops),禁止盜用
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
​
if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'xxx',
            'password': 'xxx',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://host:port/database'
    
    # 讀取表
    data = spark.read.jdbc(url=url, table='tb_test', properties=prop)
    # 列印data資料型別
    print(type(data))
    # 展示資料
    data.show()
    # 關閉spark會話
    spark.stop()
複製程式碼

修改程式碼中的配置,執行即可看到資料輸出:

python main.py
複製程式碼

本文釋出於掘金,作者:嚴北(wx: shin-devops),禁止盜用