使用Kettle动态生成页码并实现分页数据同步

语言: CN / TW / HK

需求思路解决方案根据表数据生成页码根据页码循环同步数据总结拓展参考文献

需求

将 DB2 数据库中的表数据导入另一个 DB2 数据库的表里面。

源表(DB2):table1

目标表(DB2):table2

数据量:千万级别

思路

当时直接使用 Kettle 将数据从源表导入到目标表中,但是考虑到数据量过于庞大,实际执行过程花费了很长时间,因此考虑采用分页导入的方式来进行数据传输,即:

根据实际情况设置一个每次处理的数据量,比如:5,000条,然后根据总的数据条数和每次处理的数据量计算出一共分几页。

假设总数据量有:10,000,000,所以页数为:10,000,000/5,000=2000页

注: 若存在小数,小数部分算一页,比如:20.3算21页

解决方案

根据上述思路,我们首先需要考虑如何计算得到总页数,以及页码。可以考虑增加一个辅助配置表来存放页码,这也是我在网上看到的处理方法,但是不符合工作需求,所以我考虑必须自动计算一个表的总页数,并生成页码。最后根据页码来循环导入数据。

主流程如下:onetable_A.kjb

流程说明:

  1. 首先我们建立一个作业流程,然后配置一个 START 节点;
  2. 第一个转换流程用于查询表数据的数目,并计算得到总页数,以及得到一个页码集合;
  3. 再建一个作业流程,来循环处理页码,主要是将第几页的数据从源表同步到目标表中。

根据表数据生成页码

首先我们来重点关注第一个转换流程,这也是本次实现过程中最重要的一点。

流程结构如下:

getTotal 节点用于查询表数据的数目,其实就是在系统表中做查询操作,具体实现如下:

点击预览按钮,结果如下:

Java 代码脚本部分主要是根据上一步得到的数据数目来计算页码,并生成一个页码集合。关于 Java 代码脚本的使用,这里就不做介绍了。构建过程如下:

Java 代码如下:

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;

    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */

  }

  Object[] r = getRow();

  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";

  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */

//此处创建 r,是为了获取输入参数TOTAL_SRC的值
r = createOutputRow(r, data.outputRowMeta.size());

double num = get(Fields.In, "TOTAL_SRC").getNumber(r);
int pageNum = 5000;
int pages = (int)num/pageNum +1;    //计算总页数
//生成页码,并输出
for(int i=0;i<pages;i++){
    //个人觉得r类似于输出器,如果想将每个页码都输出去,则必须独立进行声明,此步骤为本人测试所得
      r = createOutputRow(r, data.outputRowMeta.size());
    get(Fields.Out, "PAGE").setValue(r, i);     //将页码赋值给PAGE
    //get(Fields.Out, "TJOBSEQ").setValue(r, get(Fields.In, "JOB_SEQ").getString(r));
    //get(Fields.Out, "id").setValue(r, i);
  putRow(data.outputRowMeta, r);

  // Send the row on to the next step.

  return true;
}
复制代码

虽然我们想要得到的页码为整数类型,但是在设置 PAGE 类型时需要设置为 String 类型,否则会报错,如下图所示:

并不影响后续的使用。

对于下述流程进行测试验证。

执行结果为:

从结果可以看出,每个页码都被正确输出。那么接下来我们需要将页码复制到结果中,传递到接下来的作业流程中。

根据页码循环同步数据

页码生成完毕后,接下来就是根据页码从源表查询数据,然后同步到目标表中。流程设计如下:

传进来的页码必须先从结果中获取到,然后再定义为变量,才能被后续所使用。

第一个转换流程的内部实现如下:

为了区分,我们将变量名叫做 EPAGE。

接下来就是数据同步,查看 getData_Epage。

首先需要根据页码从源表中获取到数据,注意这里为了使用页码条件,查询得到的结果不得不多了一列结果。

表输出时,注意不要勾选裁剪表,需要指定数据库字段,将上面查到的结果中多的一列给剔除掉。如果不想在此处做数据库字段指定操作,可以修改表输入中的查询语句。

select
REC_CREATOR,
REC_CREATE_TIME,
....
from (select ROW_NUMBER() over() as a, g001.* from
ZGROD112.D112_L2_FV_QD_CPCSEG002 g001) as temp 
where a>=(5*${EPAGE}+1and a<=(5*(${EPAGE}+1))
复制代码

至此,关于大规模数据表之间的同步操作结束。

总结

本例重点讲述了如何根据表数据的数目动态生成页码,从而减少了页码配置表的构建步骤,最终减轻数据同步对于内存资源的占用。

拓展

实际工作中,我们不可能只对一张表做数据同步,往往会针对多张表做同步操作,所以对每张表还会有一个循环处理操作。关于这种情况的处理,需要一个额外的配置表,用来存放源系统和目标系统基本信息,以及源表和目标表信息,如果需要做增量同步操作,还可以加几个字段。后续有时间可以简单写一下。

参考文献

采用Kettle分页处理大数据量抽取任务