SQL Server的數據和SSIS包及視圖、用户定義函數和存儲過程遷移到AWS的Hive數據倉庫基本流程

語言: CN / TW / HK

首先,我們需要註冊AWS帳號,並啟用EC2(Elastic Cloud Computing,高性能雲計算)、EMR(Elastic Map-Reduce,高性能大數據計算)、Redshift/Snowflake數據倉庫和S3(Simple Storage Service,簡單存儲服務)、DevOps(用於Github持續集成)和Github Enterprise這些服務。
其次,我們需要梳理清楚SQL Server數據庫中SSIS包及表、視圖、用户定義函數和存儲過程之間的依賴關係。操作上可以使用SQL Server的系統存儲過程sp_depends、sysobjects和INFORMATION_SCHEMA架構下面的系統視圖來查看數據庫對象之間的關係。
這個存儲過程專門用來查看一個數據庫對象引用了哪些其它的數據庫對象
如果要用sp_depends看一個存儲過程、視圖或用户定義函數用到了哪些表及字段,就可以寫:
exec sp_depends <存儲過程名/視圖名/用户定義函數>
如果是表名要加引號
exec sp_depends ‘<表名>’
SQL Server數據庫中 INFORMATION_SCHEMA的用法如下所示。
1.查詢數據庫的所有表和視圖:
select * from INFORMATION_SCHEMA.TABLES
2.查詢表名為xxx的所有列的信息
select * from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME=’<表名/視圖名>’
在數據庫內創建的每個對象(約束、默認值、日誌、規則、存儲過程等)都在sysobjects表中佔一行。
對於SSIS包查看引用的數據庫對象,可以將發佈在SQL Server服務器上的SSIS包用系統存儲過程導出為二進制數據流,然後解析出其中用到數據庫對象和dtsx文件,然後將得到的依賴性關係的信息存儲到數據庫或文件中。遷移ETL程序的時候,可以根據依賴性關係,確定遷移項目的順序和遷移SSIS包或數據庫對象的先後順序。
然而SQL Server的T-SQL語言不擅長處理二進制數據,故我們可以用SQL CLR存儲過程,用C#編程的方法來處理二進制數據。其實,從本質上來説,SSIS包的ispac文件或SQL Server數據庫裏存儲的已經發布的包的格式是Zip格式的壓縮包,壓縮包裏面包含多個Xml格式並以dtsx為擴展名的文件,這些文件裏面就包含SQL語句和源表或視圖以及目標表的信息,SQL語句又可以通過程序代碼的語法分析得到其中的數據庫對象名稱。
對於數據遷移,如果EMR集羣可以連接到SQL Server服務,我們可以用sqoop來同時連接數據庫和Hive數倉導入數據或用PySpark程序去連接數據庫,然後讀取表數據到DataFrame對象,再存儲為Parquet文件,連接到Hive外部表。否則,我們只能用bcp、sqlcmd或osql命令行工具將表導出為數據文件,比如Csv文件,導出的時候需要將數據字段全部導出為字符串以保留原始數據的精度。用bcp命令導出字段帶引號表數據的Csv文件的命令行如下所示。
bcp <表名> out <Csv文件路徑> -S<服務器> -U<用户名> -P<密碼> -t “,” -w -q

bcp <表名> out <Csv文件路徑> -S<服務器> -E -t “,” -w -q
然後將Csv文件壓縮為Zip或Gzip格式文件之後上傳到S3,在EMR裏跑PySpark程序,將壓縮文件解壓並將Csv讀入Dataframe對象然後轉換為Parquet格式。另外,我們還可以在公司內網的電腦上安裝Spark,接着用PySpark程序直接連到SQL Server數據庫,將表的數據保存為Parquet格式數據文件,然後直接上傳到S3。
然後建立Hive外部表連接到S3上的Parquet格式數據文件,並根據SQL Server數據庫裏已經存在的視圖定義,在Hive裏面創建對應的視圖。而存儲過程和用户定義函數的邏輯可以在公共庫裏創建PySpark函數來實現。但是由於SQL Server數據庫的T-SQL編程語言的內置函數和PySpark的內置函數有所不同,所以需要進行轉換。還有Parquet格文件不能修改或追加記錄,只能刪除後重新創建,所以需要對原來的T-SQL裏面代碼進行等效代碼替換。比如,對於更新SQL語句,可以在表創建的時候加入更新SQL語句中的字段,並以更新的條件作為CASE WHEN篩選的條件。而對於刪除SQL語句,可以在表創建的時候排除刪除的條件。對於插入SQL語句,可以在表創建的時候UNION ALL需要插入的數據集或對Hive表進行分區,新建分區插入數據。對於Merge合併語句,可以使用將需要合併數據的表和用來合併的表、視圖或查詢FULL JOIN一下,然後取後者的值返回,再通過判斷合併目標表和源數據的字段值是否為NULL來確定要不要在合併數據的同時插入或刪除數據。當然,SQL Server數據庫裏的表分區設置還是可以繼續應用在Hive表分區中。
另外,在編寫PySpark代碼的過程中,還有一些內存和磁盤緩存以及AQE等其它的技巧來優化運行性能。
等數倉數據和源數據遷移完畢,並且PySpark程序也開發好之後。接下來我們需要一個調度平台來管理PySpark程序的運行和監控,比如Airflow,用來連接到PySpark程序並按項目將PySpark程序放在不同的目錄下分類存儲在S3中。Airflow是一個編排、調度和監控workflow的web平台,由Airbnb開源,跑在EC2上面。
數據倉庫的數據集市層使用Snowflake或Redshift雲數據庫存儲PySpark程序跑出來的報表數據,以供Tableau報表抽取展現,同時也存儲企業的基礎數據以供用户自行編寫SQL查詢數據。Redshift雲數據庫的運行性能較好,而Snowflake雲數據庫的易用性和易管理性較好,管理成本也較低。
另外考慮到團隊開發的需要,我們這裏使用Github Enterprise作為版本控制平台,並且我們在Github的DevOps裏配置Jenkins來做持續集成,來實現提交代碼到Git代碼倉庫時,同步保存代碼到AWS S3中,直接就能在Airflow裏看到結果,對於Hive和Snowflake/Redshift的DDL和DCL的SQL語句提交後直接在雲數據庫後直接運行的效果。Git代碼倉庫分成開發、測試和生產三個分支。測試和生產分支的代碼保持一致。開發人員在開發分支提交代碼後,會直接將代碼應用到開發環境。要發佈時,先創建一個以ticket ID和ticket描述為名稱的代碼分友,將修改過的代碼放到該分支裏,並提交到測試分支,記錄下提交的commit id,接着提交代碼修改的審批申請,審批申請通過後在測試環境下測試。測試通過之後,使用git cherry-pick,用剛才記錄的commit id提交一個補丁到生產分支做代碼發佈。代碼發佈時一般Sprint結束的那一天,這個代碼發佈的分支會和這個Sprint發佈的其它代碼分支合併,然後使用Github Enterprise的Group Review功能,讓所有參與的開發者Review合併後的代碼修改,等所有開發者都Review完了,就提交到生產代碼倉庫,最近由持續集成功能發佈修改代碼到生產環境並應用相關的DML和DCL SQL語句和Airflow代碼。