大数据Hadoop-MapReduce学习之旅第五篇

语言: CN / TW / HK

「这是我参与11月更文挑战的第14天,活动详情查看:2021最后一次更文挑战」。

一、Join 应用

1、Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

2、Reduce Join 案例实操

  • 需求

    image.png

    image.png

    image.png

  • 需求分析

    通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

    image.png

  • 代码实现

    • 创建商品和订单合并后的TableBean类

    ```java public class TableBean implements Writable {

    private String id; // 订单id
    private String pid; // 商品id
    private int amount; // 商品数量
    private String pname;// 商品名称
    private String flag; // 标记是什么表 order pd
    
    // 空参构造
    public TableBean() {
    }
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public String getPid() {
        return pid;
    }
    
    public void setPid(String pid) {
        this.pid = pid;
    }
    
    public int getAmount() {
        return amount;
    }
    
    public void setAmount(int amount) {
        this.amount = amount;
    }
    
    public String getPname() {
        return pname;
    }
    
    public void setPname(String pname) {
        this.pname = pname;
    }
    
    public String getFlag() {
        return flag;
    }
    
    public void setFlag(String flag) {
        this.flag = flag;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }
    
    @Override
    public String toString() {
        // id  pname  amount
        return id + "\t" + pname + "\t" + amount;
    }
    

    } ```

    • 编写TableMapper类

    ```java public class TableMapper extends Mapper {

    private String fileName;
    private Text outK = new Text();
    private TableBean outV = new TableBean();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 初始化  order  pd
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
    
        // 2 判断是哪个文件的
        if (fileName.contains("order")) {// 处理的是订单表
    
            String[] split = line.split("\t");
    
            // 封装k  v
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
    
        } else {// 处理的是商品表
            String[] split = line.split("\t");
    
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }
    
        // 写出
        context.write(outK, outV);
    }
    

    } ```

    • 编写TableReducer类

    ```java public class TableReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        //  01     1001   1   order
        //  01     1004   4   order
        //  01 小米          pd
        // 准备初始化集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();
    
        // 循环遍历
        for (TableBean value : values) {
    
            if ("order".equals(value.getFlag())) {// 订单表
    
                TableBean tmptableBean = new TableBean();
    
                try {
                    BeanUtils.copyProperties(tmptableBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
    
                orderBeans.add(tmptableBean);
            } else {// 商品表
    
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
    
        // 循环遍历orderBeans,赋值 pdname
        for (TableBean orderBean : orderBeans) {
    
            orderBean.setPname(pdBean.getPname());
    
            context.write(orderBean, NullWritable.get());
        }
    }
    

    } ```

    • 编写TableDriver类

    ```java public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
    
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
    
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
    
        FileInputFormat.setInputPaths(job, new Path("D:\input\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("D:\hadoop\output2"));
    
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
    

    } ```

  • 测试 运行程序查看结果

    txt 1004 小米 4 1001 小米 1 1005 华为 5 1002 华为 2 1006 格力 6 1003 格力 3

  • 总结

    缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

    解决方案:Map端实现数据合并。

3、Map Join

  • 使用场景

    Map Join适用于一张表十分小、一张表很大的场景。

  • 优点

    思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

    在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

  • 具体办法:采用DistributedCache

    在Mapper的setup阶段,将文件读取到缓存集合中。

    在Driver驱动类中加载缓存。

    java //缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

4、Map Join 案例实操

  • 需求

    image.png

  • 需求分析

    MapJoin适用于关联表中有小表的情形。

    image.png

  • 实现代码

    • 先在MapJoinDriver驱动类中添加缓存文件

    ```java public class MapJoinDriver {

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
    
        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
    
        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);
    
        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\input\inputtable2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\hadoop\output8888"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
    

    } ```

    • 在MapJoinMapper类中的setup方法中读取缓存文件

    ```java public class MapJoinMapper extends Mapper {

    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 获取缓存的文件,并把文件内容封装到集合 pd.txt
        URI[] cacheFiles = context.getCacheFiles();
    
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
    
        // 从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
    
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // 切割
            String[] fields = line.split("\t");
    
            // 赋值
            pdMap.put(fields[0], fields[1]);
        }
    
        // 关流
        IOUtils.closeStream(reader);
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
        // 处理 order.txt
        String line = value.toString();
    
        String[] fields = line.split("\t");
    
        // 获取pid
        String pname = pdMap.get(fields[1]);
    
        // 获取订单id 和订单数量
        // 封装
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);
    
        context.write(outK, NullWritable.get());
    }
    

    } ```

二、数据清洗(ETL)

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

1、需求

去除日志中字段个数小于等于11的日志。

image.png

2、需求分析

需要在Map阶段对输入的数据根据规则进行过滤清洗。

3、实现代码

  • 编写WebLogMapper类

```java public class WebLogMapper extends Mapper {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    // 1 获取一行
    String line = value.toString();

    // 2 ETL
    boolean result = parseLog(line, context);

    if (!result) {
        return;
    }

    // 3 写出
    context.write(value, NullWritable.get());
}

private boolean parseLog(String line, Context context) {
    // 切割
    // 1.206.126.5 - - [19/Sep/2013:05:41:41 +0000] "-" 400 0 "-" "-"
    String[] fields = line.split(" ");

    // 2 判断一下日志的长度是否大于11
    if (fields.length > 11) {
        return true;
    } else {
        return false;
    }
}

} ```

  • 编写WebLogDriver类

```java public class WebLogDriver {

public static void main(String[] args) throws Exception {

    // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
    args = new String[]{"D:/input/inputlog", "D:/hadoop/output11111"};

    // 1 获取job信息
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // 2 加载jar包
    job.setJarByClass(LogDriver.class);

    // 3 关联map
    job.setMapperClass(WebLogMapper.class);

    // 4 设置最终输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    // 设置reducetask个数为0
    job.setNumReduceTasks(0);

    // 5 设置输入和输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 6 提交
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
}

} ```

三、MapReduce 开发总结

  1. 输入数据接口:InputFormat
    • 默认使用的实现类是:TextInputFormat
    • TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
    • CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
  2. 逻辑处理接口:Mapper
    • 用户根据业务需求实现其中三个方法:map() setup() cleanup ()
  3. Partitioner分区
    • 有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
    • 如果业务上有特别的需求,可以自定义分区。
  4. Comparable排序
    • 当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
    • 部分排序:对最终输出的每一个文件进行内部排序。
    • 全排序:对所有数据进行排序,通常只有一个Reduce。
    • 二次排序:排序的条件有两个。
  5. Combiner合并
    • Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
  6. 逻辑处理接口:Reducer
    • 用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
  7. 输出数据接口:OutputFormat
    • 默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
    • 用户还可以自定义OutputFormat。

四、友情链接

大数据Hadoop-MapReduce学习之旅第四篇

大数据Hadoop-MapReduce学习之旅第三篇

大数据Hadoop-MapReduce学习之旅第二篇

大数据Hadoop-MapReduce学习之旅第一篇