spark111

语言: CN / TW / HK

0/处理框架?计算引擎?

什么是处理框架?什么是计算引擎?二者之间又有什么关系呢?
“处理框架”和“计算引擎”之间的区别没有什么权威的定义,
但大多数我们将处理框架定义为承担类似作用的一系列组件,这其中包括分布式存储系统和分布式计算引擎。

例如Apache hadoop可以看作一种以MapReduce作为计算引擎的处理框架,属于批处理框架。
spark是一种内存计算引擎,是一种可以进行流式计算的计算引擎。
例如apache spark可以纳入到hadoop框架中,并取代MapReduce计算引擎,这就是流式处理框架。
如果把分布式存储系统和spark计算引擎结合起来,就是流式处理框架,区别于上面的批处理框架。

也就是说处理框架的范围大于计算引擎,及计算引擎是处理框架的一部分。
spark是继mapreduce之后的下一代分布式内存计算引擎。

image.png

1/spark是什么?以及为什么会在有mapreduce计算引擎的情况下又出现了spark计算引擎。

spark是一种可以快速处理数据的计算引擎。
spark是加州大学伯克利分校的AMP实验室所开源的类Hadoop MapReduce的通用并行计算框架,
spark拥有mapreduce批计算引擎所具有的优点,
同时比mapreduce批计算引擎更好的是:
     spark Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,减少了I/O。
     因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的算法。
     在某些场景下,spark处理速度更快,可以达到低延迟的目的。

在spark计算引擎之前,大家用的是mapreduce批计算引擎。
mapreduce批计算引擎,适用于处理离线数据,也就是数据已经存在了分布式文件系统中了。
mapreduce具有一定的延迟性。

但是当今社会,一切都是快节奏,很多场景都是不允许延迟的,
spark计算引擎的出现,就是解决这个问题的。

2/spark和hadoop mapreduce的区别

<1>spark的前世今生

一提到大数据处理,相信很多人第一时间想到的是 Hadoop MapReduce。
没错,Hadoop MapReduce 为大数据处理技术奠定了基础。
近年来,随着 Spark 的发展,越来越多的声音提到了 Spark。
而Spark相比Hadoop MapReduce有哪些优势?

Spark与Hadoop MapReduce在业界有两种说法:
一是 Spark 将代替 Hadoop MapReduce,成为未来大数据处理发展的方向;
二是 Spark 将会和 Hadoop 结合,形成更大的生态圈。

其实 Spark 和 Hadoop MapReduce 的重点应用场合有所不同。
相对于 Hadoop MapReduce 来说,Spark 有点“青出于蓝而胜于蓝”的感觉,
Spark 是在Hadoop MapReduce 框架上发展起来的,在它的身上我们能明显看到 MapReduce的影子,
所以Spark 并非从头创新,而是站在了巨人“MapReduce”的肩膀上。
千秋功罪,留于日后评说,我们暂且搁下争议。

<2>Hadoop MapReduce,Spark都有哪些优势

1)计算速度快

大数据处理首先追求的是速度。
Spark到底有多快?
用官方的话说,“Spark允许Hadoop集群中的应用程序在内存中以100倍的速度运行,
即使在磁盘上运行也能快10倍”。
可能有的读者看到这里会大为感叹,的确如此,在有迭代计算的领域,Spark的计算速度远远超过MapReduce,
并且迭代次数越多,Spark的优势越明显。
这是因为Spark很好地利用了目前服务器内存越来越大这一优点,通过减少磁盘I/O来达到性能提升。
它们将中间处理数据的中间结果全部放到了内存中,仅在必要时才批量存入硬盘中。

2)应用灵活,上手容易

知道 AMPLab 的 Lester 为什么放弃 MapReduce 吗?
因为他需要把很多精力放到Map和Reduce的编程模型上,极为不便。 
Spark在简单的Map及Reduce操作之外,还支持SQL查询、流式查询及复杂查询,比如开箱即用的机器学习算法。
同时,用户可以在同一个工作流中无缝地搭配这些能力,应用十分灵活。

Spark核心部分的代码为63个Scala文件,非常的轻量级。
并且允许 Java、Scala、Python 开发者在自己熟悉的语言环境下进行工作,通过建立在Java、Scala、Python、SQL(应对交互式查询)的标准API以方便各行各业使用,同时还包括大量开箱即用的机器学习库。
它自带80多个高等级操作符,允许在Shell中进行交互式查询。即使是新手,也能轻松上手应用。

3)兼容竞争对手

Spark可以独立运行,除了可以运行在当下的YARN集群资源管理外,还可以读取已有的任何Hadoop数据。
它可以运行在任何Hadoop数据源上,比如HBase、HDFS等。
有了这个特性,让那些想从Hadoop应用迁移到Spark上的用户方便了很多。
Spark有兼容竞争对手的胸襟,何愁大事不成?

4)实时处理性能非凡

MapReduce更加适合处理离线数据,
而Spark很好地支持实时的流计算,依赖Spark Streaming对数据进行实时处理。
Spark Streaming 具备功能强大的API,允许用户快速开发流应用程序。
Spark Streaming 无须额外的代码和配置,就可以做大量的恢复和交付工作。

5)/其他

1/mapreduce和Spark都是大数据计算引擎,数据量最起码是GB和TB级别的。
2/Hadoop包括hdfs分布式文件存储系统和mapreduce分布式计算引擎。
3/spark没有分布式文件存储系统,它只是一个更高性能(比mapreduce性能好)的大数据计算引擎,
   所以如果想使用spark来处理数据,还需要依托其他的分布式文件存储系统来作为spark的数据输入,
   大家最常用的还是hdfs,所以hdfs和spark的组合还是很受欢迎的。
4/Hadoop mapreduce实质上更多是一个分布式数据基础设施: 它将巨大的数据集分派到一个由多台计算机组成的集群中,在集群中的多个节点上进行存储,
  意味着您不需要购买和维护昂贵的硬件,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。
5/Hadoop除了提供了一个HDFS分布式数据存储功能之外,还提供了叫做MapReduce的大数据处理功能,
    所以我们完全可以抛开Spark,使用Hadoop自身的MapReduce计算引擎来完成数据的处理,
    所以如果你处理的的是离线数据,或者你可以容忍延迟,你完全可以使用mapreduce,而不是使用spark。

6/spark必须依附其他的分布式文件存储系统,除了hdfs,还可以是其他的。
   但Spark默认来说还是被用在Hadoop上面的,毕竟大家都认为它们的结合是最好的。

8/Spark数据处理速度秒杀MapReduce
    Spark因为其处理数据的方式不一样DAG,RDD等机制,会比MapReduce快很多,
    MapReduce是分步对数据进行处理的: 
       “从集群中读取数据,进行一次处理,将结果写到集群,从集群中读取更新后的数据,进行下一次的处理,将结果写到集群,等等”
    反观Spark,它会在内存中以接近“实时”的时间完成所有的数据分析:“从集群中读取数据,完成所有必须的分析处理,将结果写回集群,最终完成” ,
    Spark的批处理速度比MapReduce快近10倍,内存中的数据分析速度则快近100倍,
    如果需要处理的数据和结果需求大部分情况下是静态的,且你也有耐心等待批处理的完成的话,MapReduce的处理方式也是完全可以接受的,
    但如果你需要对流数据进行分析,比如那些来自于工厂的传感器收集回来的数据,又或者说你的应用是需要多重数据处理的,那么你也许更应该使用Spark进行处理,
    大部分机器学习算法都是需要多重数据处理的,此外,通常会用到Spark的应用场景有以下方面:
         实时的市场活动,
         在线产品推荐,
         网络安全分析,
         机器日记监控等。

9/灾难恢复
  两者的灾难恢复方式迥异,但是都很不错。
  因为Hadoop将每次处理后的数据都写入到磁盘上,所以其天生就能很有弹性的对系统错误进行处理;
  Spark的数据对象存储在分布于数据集群中的叫做弹性分布式数据集(RDD: Resilient Distributed Dataset)中,这些数据对象既可以放在内存,也可以放在磁盘,
  所以RDD同样也可以提供完成的灾难恢复功能。

3/spark中的2个重要概念

<1>DAG
   有向无环图。
   有方向的,没有闭环的,
   相对于mapreduce计算引擎,spark支持DAG,能缓存中间数据,
   减少数据落盘次数,不用频繁对磁盘进行IO。


<2>RDD
   RDD:即弹性分布式数据集
   Resilient(弹性的) Distributed(分布式) Datasets(数据集)

   弹性可理解为:数据存储是弹性的,可存储在内存,也可存储在磁盘; 
   分布式可理解为:数据分布在集群的不同的节点上。
   它是对数据的高度抽象概念。

   RDD是弹性分布式数据集的逻辑抽象概念,物理数据存储在不同的节点上,但对用户透明,用户不需要知道数据实际存在哪台机器。
   RDD包含的内容下图所示:

image.png

   只读分区的集合:这保证了RDD的一致性,在计算过程中更安全可靠,此外RDD可能包含多个分区,这些分区可能在不同的机器上。
   对数据的计算函数:RDD包含了对数据的计算函数,也就是得到这个RDD所要经过的计算。
   计算数据的位置:对用户而言不需要知道数据在哪里,这些信息隐含在RDD的结构中。
   分区器:对数据分区依赖的分区算法,如hash分区器
   依赖的RDD信息:该RDD可能依赖的父RDD信息,用于失败重算或计算的DAG划分。

  RDD的特性:
    ①一个RDD由多个分区/分片组成
    ②对RDD进行一个函数操作,会对RDD的所有分区都执行相同函数操作
    ③一个RDD依赖于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某节点数据丢失,
     后面的RDD会根据前面的信息进行重新计算
    ④对于Key-Value的RDD可以制定一个partitioner,告诉他如何分片。常用hash/range
    ⑤移动数据不如移动计算,注:移动数据,不如移动计算。考虑磁盘IO和网络资源传输等

image.png

<3>RDD和DAG之间的关系
   多个RDD之间通过便相连,最后这些RDD和他们之间的边组成一个有向无环图,就是这个DAG。
   最终构成了一个apark job的拓扑结构,有向无环图。
   不只是spark,现在很多计算引擎都是dag模型的,例如tez就是mr的dag改进。

image.png

4/spark是怎么进行分布式计算的

image.png

上图是一个spark的wordcount例子,根据上述stage划分原则,这个spark job划分为2个stage,
有三行,分别是数据读取、计算和存储过程。

仅看代码,用户根本体会不到背后,数据是并行计算的。
从图中能看出数据分布在不同分区(也可以理解不同机器节点上),
数据经过flatMap、map和reduceByKey算子在不同RDD的分区中流转,
这些算子(action)就是上面所说对RDD对数据进行计算的函数.

下图从更高角度看:
   spark的运行架构由Driver(可理解为master)和Executor(可理解为worker或slaver)组成,
   Driver负责把用户代码进行DAG切分,划分为不同的Stage,
   然后把每个Stage对应的task调度提交到Executor进行计算,
   这样Executor就并行执行同一个Stage的task。

image.png

5/spark数据倾斜

数据倾斜,简单来说就是数据分配不均匀,有的计算节点分的多,有的计算节点分的少。
从而导致分的多的计算节点需要更多的时间才能完成计算,所以整个job的计算时间也就很多。

数据倾斜一般发生在对数据进行重新划分以及聚合的处理过程中。
执行spark job时,数据倾斜一般发生在shuffle过程中,因为spark的shuffle过程需要进行数据的重新划分
在执行shuffle过程中,spark把各个节点上相同key的数据拉取到同一个处理节点的task中进行处理,
比如对数据按照某个维度key进行聚合或者join等含shuffle操作。

在此过程中,如果各个key对应的数据量相差很大,存在一个或者几个key对应的数据量特别大,这个时候就会发生数据倾斜。
例如一个聚合作业中,大部分key对应100条数据,但是少数个别key却对应了100万条左右的数据,
那么在执行时若一个task处理一个key对应的数据,则大部分task节点很快计算完成,个别task要处理100万条数据,所以需要花费较多的时间才能计算完成。
而整个spark作业的运行速度是由运行时间最长的task决定的,这里整个作业的运行时间变得很长,不能充分利用Spark的并行能力,极大地影响作业的处理性能。

image.png

如上图,是一个简单的数据倾斜示意图,在shuffle过程中对数据按照key进行重新划分,
其中一个key("hello")对应的数据量远大于其他key("world","start")的数据量,从而出现了数据倾斜。
这就导致后续的task处理任务中,task1的运行时间肯定远远高于其他的task2、task3的运行时间。
该spark job的运行时间也由task1的运行时间决定。
因此,在处理过程中出现数据倾斜时,spark作业的运行会非常缓慢,无法体现出spark处理大数据的高效并行优势,
甚至可能因为某些task处理的数据量过大导致内存(OOM,out of memory)溢出,使得整个作业无法正常执行。

6/spark数据倾斜的现象

数据倾斜发生后现现象会有两种情况:
<1>大部份的Task执行的时候会很快,当发生数据倾斜的task会执行很长时间。
<2>有时候数据倾斜直接报OOM即:JVM Out Of Memory内存溢出的错误。
   这就不是简单的计算速度慢的问题了,而是根本没有内存去处理了,导致spark job任务挂掉。

7/spark数据倾斜的解决方法

<1>过滤倾斜key优化法

 在spark job作业中发生数据倾斜,
 如果我们能够确定只是少数个别的key数据量倾斜较大,而这些key又对计算结果影响不大,
 在这种情况下,我们可以采用这种过滤key的方法。
 针对发生倾斜的key,预先直接过滤掉它们,使得后面的计算能够高效地快速完成。
 这就是在执行spark作业的时候,先对数据做一个调研,看看数据量比较大的key都是什么样的数据,
 如果都是一些空值,或者是一些不重要的key,则可以直接过滤掉。

<2>提升shuffle操作的并行度优化法,人为指定并行度。

在Spark的操作算子中,有一些常用的操作算子会触发shuffle操作,
如reduceByKey、aggregateByKey、groupByKey、cogroup、join、repartition等。

spark jo出现数据倾斜时,很可能就是我们的开发代码中使用的这些算子导致的,
因为这些算子的执行会触发shuffle过程,进而引起数据的重新划分,导致数据倾斜的产生。
默认情况下,我们不会考虑shuffle操作的并行度分配,而是交由spark控制。
但是spark的默认shuffle并行度(即shuffle read task的数量)为200,这对于很多场景都有点过小。
在出现数据倾斜时,我们可以显式提高shuffle操作的并行度,以缓解某些一般的数据倾斜情况。

 这种方法需要在使用shuffle类算子时,自己指定并行参数。
 在开发中我们可以对使用到的shuffle类算子预先传入一个并行参数,
 如aggregateByKey(1000),显式设置该算子执行时shuffle read task的数量为1000,
 增加实际执行的并行度到1000。

 这样增加shuffle read task的数量后,可以让原本分配给一个task的多个key数据重新划分给多个task,通过提高并行度再次分散数据,从而减少每个task的处理数据量,有效缓解和减轻数据倾斜的影响。

image.png

上图展示了一个并行度优化的简单情形。
如上图所示,由于低并行度发生数据倾斜问题时,在提高了shuffle操作的并行度之后,
之前由一个task处理的数据量被重新分散到不同的多个task(task1、task2、task3)中进行处理,
这样原来的各个task每次处理的数据量减少很多,从而缓解了存在的数据倾斜问题,能够明显提高处理速度。

显式提高shuffle操作的并行度的方法也是一种易于实现的优化方法。
这种方法可以有效缓解数据量较大而并行度较低的数据倾斜问题,但是它并不能彻底根除倾斜问题。
如果数据倾斜的原因是某些(个)key数据量较大,则这时提高并行度并不能改善数据倾斜的低性能。
同样的这种方法适用场景也很有限,只适用于一些普通场景。

<3>加随机数,及先聚合一次,再聚合一次。

在spark执行作业中,涉及聚合操作时往往会比较容易出现数据倾斜现象。
如果在spark作业执行聚合类操作算子如reduceByKey、aggregateByKey等的过程中,发生数据倾斜时,
即出现某些key要聚合的数据量较大的情况下,我们可以采用两阶段聚合优化方法,先将相同的数据打乱成不同的局部数据,进行局部聚合后再统一合并聚合。

image.png