SQL Server的数据和SSIS包及视图、用户定义函数和存储过程迁移到AWS的Hive数据仓库基本流程

语言: CN / TW / HK

首先,我们需要注册AWS帐号,并启用EC2(Elastic Cloud Computing,高性能云计算)、EMR(Elastic Map-Reduce,高性能大数据计算)、Redshift/Snowflake数据仓库和S3(Simple Storage Service,简单存储服务)、DevOps(用于Github持续集成)和Github Enterprise这些服务。
其次,我们需要梳理清楚SQL Server数据库中SSIS包及表、视图、用户定义函数和存储过程之间的依赖关系。操作上可以使用SQL Server的系统存储过程sp_depends、sysobjects和INFORMATION_SCHEMA架构下面的系统视图来查看数据库对象之间的关系。
这个存储过程专门用来查看一个数据库对象引用了哪些其它的数据库对象
如果要用sp_depends看一个存储过程、视图或用户定义函数用到了哪些表及字段,就可以写:
exec sp_depends <存储过程名/视图名/用户定义函数>
如果是表名要加引号
exec sp_depends ‘<表名>’
SQL Server数据库中 INFORMATION_SCHEMA的用法如下所示。
1.查询数据库的所有表和视图:
select * from INFORMATION_SCHEMA.TABLES
2.查询表名为xxx的所有列的信息
select * from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME=’<表名/视图名>’
在数据库内创建的每个对象(约束、默认值、日志、规则、存储过程等)都在sysobjects表中占一行。
对于SSIS包查看引用的数据库对象,可以将发布在SQL Server服务器上的SSIS包用系统存储过程导出为二进制数据流,然后解析出其中用到数据库对象和dtsx文件,然后将得到的依赖性关系的信息存储到数据库或文件中。迁移ETL程序的时候,可以根据依赖性关系,确定迁移项目的顺序和迁移SSIS包或数据库对象的先后顺序。
然而SQL Server的T-SQL语言不擅长处理二进制数据,故我们可以用SQL CLR存储过程,用C#编程的方法来处理二进制数据。其实,从本质上来说,SSIS包的ispac文件或SQL Server数据库里存储的已经发布的包的格式是Zip格式的压缩包,压缩包里面包含多个Xml格式并以dtsx为扩展名的文件,这些文件里面就包含SQL语句和源表或视图以及目标表的信息,SQL语句又可以通过程序代码的语法分析得到其中的数据库对象名称。
对于数据迁移,如果EMR集群可以连接到SQL Server服务,我们可以用sqoop来同时连接数据库和Hive数仓导入数据或用PySpark程序去连接数据库,然后读取表数据到DataFrame对象,再存储为Parquet文件,连接到Hive外部表。否则,我们只能用bcp、sqlcmd或osql命令行工具将表导出为数据文件,比如Csv文件,导出的时候需要将数据字段全部导出为字符串以保留原始数据的精度。用bcp命令导出字段带引号表数据的Csv文件的命令行如下所示。
bcp <表名> out <Csv文件路径> -S<服务器> -U<用户名> -P<密码> -t “,” -w -q

bcp <表名> out <Csv文件路径> -S<服务器> -E -t “,” -w -q
然后将Csv文件压缩为Zip或Gzip格式文件之后上传到S3,在EMR里跑PySpark程序,将压缩文件解压并将Csv读入Dataframe对象然后转换为Parquet格式。另外,我们还可以在公司内网的电脑上安装Spark,接着用PySpark程序直接连到SQL Server数据库,将表的数据保存为Parquet格式数据文件,然后直接上传到S3。
然后建立Hive外部表连接到S3上的Parquet格式数据文件,并根据SQL Server数据库里已经存在的视图定义,在Hive里面创建对应的视图。而存储过程和用户定义函数的逻辑可以在公共库里创建PySpark函数来实现。但是由于SQL Server数据库的T-SQL编程语言的内置函数和PySpark的内置函数有所不同,所以需要进行转换。还有Parquet格文件不能修改或追加记录,只能删除后重新创建,所以需要对原来的T-SQL里面代码进行等效代码替换。比如,对于更新SQL语句,可以在表创建的时候加入更新SQL语句中的字段,并以更新的条件作为CASE WHEN筛选的条件。而对于删除SQL语句,可以在表创建的时候排除删除的条件。对于插入SQL语句,可以在表创建的时候UNION ALL需要插入的数据集或对Hive表进行分区,新建分区插入数据。对于Merge合并语句,可以使用将需要合并数据的表和用来合并的表、视图或查询FULL JOIN一下,然后取后者的值返回,再通过判断合并目标表和源数据的字段值是否为NULL来确定要不要在合并数据的同时插入或删除数据。当然,SQL Server数据库里的表分区设置还是可以继续应用在Hive表分区中。
另外,在编写PySpark代码的过程中,还有一些内存和磁盘缓存以及AQE等其它的技巧来优化运行性能。
等数仓数据和源数据迁移完毕,并且PySpark程序也开发好之后。接下来我们需要一个调度平台来管理PySpark程序的运行和监控,比如Airflow,用来连接到PySpark程序并按项目将PySpark程序放在不同的目录下分类存储在S3中。Airflow是一个编排、调度和监控workflow的web平台,由Airbnb开源,跑在EC2上面。
数据仓库的数据集市层使用Snowflake或Redshift云数据库存储PySpark程序跑出来的报表数据,以供Tableau报表抽取展现,同时也存储企业的基础数据以供用户自行编写SQL查询数据。Redshift云数据库的运行性能较好,而Snowflake云数据库的易用性和易管理性较好,管理成本也较低。
另外考虑到团队开发的需要,我们这里使用Github Enterprise作为版本控制平台,并且我们在Github的DevOps里配置Jenkins来做持续集成,来实现提交代码到Git代码仓库时,同步保存代码到AWS S3中,直接就能在Airflow里看到结果,对于Hive和Snowflake/Redshift的DDL和DCL的SQL语句提交后直接在云数据库后直接运行的效果。Git代码仓库分成开发、测试和生产三个分支。测试和生产分支的代码保持一致。开发人员在开发分支提交代码后,会直接将代码应用到开发环境。要发布时,先创建一个以ticket ID和ticket描述为名称的代码分友,将修改过的代码放到该分支里,并提交到测试分支,记录下提交的commit id,接着提交代码修改的审批申请,审批申请通过后在测试环境下测试。测试通过之后,使用git cherry-pick,用刚才记录的commit id提交一个补丁到生产分支做代码发布。代码发布时一般Sprint结束的那一天,这个代码发布的分支会和这个Sprint发布的其它代码分支合并,然后使用Github Enterprise的Group Review功能,让所有参与的开发者Review合并后的代码修改,等所有开发者都Review完了,就提交到生产代码仓库,最近由持续集成功能发布修改代码到生产环境并应用相关的DML和DCL SQL语句和Airflow代码。