SQL Server的資料和SSIS包及檢視、使用者定義函式和儲存過程遷移到AWS的Hive資料倉庫基本流程

語言: CN / TW / HK

首先,我們需要註冊AWS帳號,並啟用EC2(Elastic Cloud Computing,高效能雲端計算)、EMR(Elastic Map-Reduce,高效能大資料計算)、Redshift/Snowflake資料倉庫和S3(Simple Storage Service,簡單儲存服務)、DevOps(用於Github持續整合)和Github Enterprise這些服務。
其次,我們需要梳理清楚SQL Server資料庫中SSIS包及表、檢視、使用者定義函式和儲存過程之間的依賴關係。操作上可以使用SQL Server的系統儲存過程sp_depends、sysobjects和INFORMATION_SCHEMA架構下面的系統檢視來檢視資料庫物件之間的關係。
這個儲存過程專門用來檢視一個數據庫物件引用了哪些其它的資料庫物件
如果要用sp_depends看一個儲存過程、檢視或使用者定義函式用到了哪些表及欄位,就可以寫:
exec sp_depends <儲存過程名/檢視名/使用者定義函式>
如果是表名要加引號
exec sp_depends ‘<表名>’
SQL Server資料庫中 INFORMATION_SCHEMA的用法如下所示。
1.查詢資料庫的所有表和檢視:
select * from INFORMATION_SCHEMA.TABLES
2.查詢表名為xxx的所有列的資訊
select * from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME=’<表名/檢視名>’
在資料庫內建立的每個物件(約束、預設值、日誌、規則、儲存過程等)都在sysobjects表中佔一行。
對於SSIS包檢視引用的資料庫物件,可以將釋出在SQL Server伺服器上的SSIS包用系統儲存過程匯出為二進位制資料流,然後解析出其中用到資料庫物件和dtsx檔案,然後將得到的依賴性關係的資訊儲存到資料庫或檔案中。遷移ETL程式的時候,可以根據依賴性關係,確定遷移專案的順序和遷移SSIS包或資料庫物件的先後順序。
然而SQL Server的T-SQL語言不擅長處理二進位制資料,故我們可以用SQL CLR儲存過程,用C#程式設計的方法來處理二進位制資料。其實,從本質上來說,SSIS包的ispac檔案或SQL Server資料庫裡儲存的已經發布的包的格式是Zip格式的壓縮包,壓縮包裡面包含多個Xml格式並以dtsx為副檔名的檔案,這些檔案裡面就包含SQL語句和源表或檢視以及目標表的資訊,SQL語句又可以通過程式程式碼的語法分析得到其中的資料庫物件名稱。
對於資料遷移,如果EMR叢集可以連線到SQL Server服務,我們可以用sqoop來同時連線資料庫和Hive數倉匯入資料或用PySpark程式去連線資料庫,然後讀取表資料到DataFrame物件,再儲存為Parquet檔案,連線到Hive外部表。否則,我們只能用bcp、sqlcmd或osql命令列工具將表匯出為資料檔案,比如Csv檔案,匯出的時候需要將資料欄位全部匯出為字串以保留原始資料的精度。用bcp命令匯出欄位帶引號表資料的Csv檔案的命令列如下所示。
bcp <表名> out <Csv檔案路徑> -S<伺服器> -U<使用者名稱> -P<密碼> -t “,” -w -q

bcp <表名> out <Csv檔案路徑> -S<伺服器> -E -t “,” -w -q
然後將Csv檔案壓縮為Zip或Gzip格式檔案之後上傳到S3,在EMR裡跑PySpark程式,將壓縮檔案解壓並將Csv讀入Dataframe物件然後轉換為Parquet格式。另外,我們還可以在公司內網的電腦上安裝Spark,接著用PySpark程式直接連到SQL Server資料庫,將表的資料儲存為Parquet格式資料檔案,然後直接上傳到S3。
然後建立Hive外部表連線到S3上的Parquet格式資料檔案,並根據SQL Server資料庫裡已經存在的檢視定義,在Hive裡面建立對應的檢視。而儲存過程和使用者定義函式的邏輯可以在公共庫裡建立PySpark函式來實現。但是由於SQL Server資料庫的T-SQL程式語言的內建函式和PySpark的內建函式有所不同,所以需要進行轉換。還有Parquet格檔案不能修改或追加記錄,只能刪除後重新建立,所以需要對原來的T-SQL裡面程式碼進行等效程式碼替換。比如,對於更新SQL語句,可以在表建立的時候加入更新SQL語句中的欄位,並以更新的條件作為CASE WHEN篩選的條件。而對於刪除SQL語句,可以在表建立的時候排除刪除的條件。對於插入SQL語句,可以在表建立的時候UNION ALL需要插入的資料集或對Hive表進行分割槽,新建分割槽插入資料。對於Merge合併語句,可以使用將需要合併資料的表和用來合併的表、檢視或查詢FULL JOIN一下,然後取後者的值返回,再通過判斷合併目標表和源資料的欄位值是否為NULL來確定要不要在合併資料的同時插入或刪除資料。當然,SQL Server資料庫裡的表分割槽設定還是可以繼續應用在Hive表分割槽中。
另外,在編寫PySpark程式碼的過程中,還有一些記憶體和磁碟快取以及AQE等其它的技巧來優化執行效能。
等數倉資料和源資料遷移完畢,並且PySpark程式也開發好之後。接下來我們需要一個排程平臺來管理PySpark程式的執行和監控,比如Airflow,用來連線到PySpark程式並按專案將PySpark程式放在不同的目錄下分類儲存在S3中。Airflow是一個編排、排程和監控workflow的web平臺,由Airbnb開源,跑在EC2上面。
資料倉庫的資料集市層使用Snowflake或Redshift雲資料庫儲存PySpark程式跑出來的報表資料,以供Tableau報表抽取展現,同時也儲存企業的基礎資料以供使用者自行編寫SQL查詢資料。Redshift雲資料庫的執行效能較好,而Snowflake雲資料庫的易用性和易管理性較好,管理成本也較低。
另外考慮到團隊開發的需要,我們這裡使用Github Enterprise作為版本控制平臺,並且我們在Github的DevOps裡配置Jenkins來做持續整合,來實現提交程式碼到Git程式碼倉庫時,同步儲存程式碼到AWS S3中,直接就能在Airflow裡看到結果,對於Hive和Snowflake/Redshift的DDL和DCL的SQL語句提交後直接在雲資料庫後直接執行的效果。Git程式碼倉庫分成開發、測試和生產三個分支。測試和生產分支的程式碼保持一致。開發人員在開發分支提交程式碼後,會直接將程式碼應用到開發環境。要釋出時,先建立一個以ticket ID和ticket描述為名稱的程式碼分友,將修改過的程式碼放到該分支裡,並提交到測試分支,記錄下提交的commit id,接著提交程式碼修改的審批申請,審批申請通過後在測試環境下測試。測試通過之後,使用git cherry-pick,用剛才記錄的commit id提交一個補丁到生產分支做程式碼釋出。程式碼釋出時一般Sprint結束的那一天,這個程式碼釋出的分支會和這個Sprint釋出的其它程式碼分支合併,然後使用Github Enterprise的Group Review功能,讓所有參與的開發者Review合併後的程式碼修改,等所有開發者都Review完了,就提交到生產程式碼倉庫,最近由持續整合功能釋出修改程式碼到生產環境並應用相關的DML和DCL SQL語句和Airflow程式碼。