拼多多大数据开发 1 面知识点总结(配视频~超详细)

语言: CN / TW / HK

点击上方公众号进入   3分钟秒懂大数据  主页

然后点击右上角  “设为标星”  

比别人更快接收硬核文章

大家好,我是土哥。

今天给大家分享一波这十天录的视频的知识点,同时这十个知识点被 拼多多、网易 的面试官 都考察过,出现的频率都很高,所以,没看完视频的小伙伴,可以看看文字,一块学习学习~

1 Spark 的任务提交流程你熟悉吗?

当面试官问你,Spark 的任务提交流程你熟悉吗?

你应该这样回答他:

当在命令行执行 spark -submit --master  xxx.jar 的命令后,会执行以下操作:

1、客户端向资源管理器 Master 发送注册和申请资源的请求。Master 主要负责任务资源的分配,是 Spark 集群的老大。

2、Master 收到申请资源的请求后,会向指定的 Worker 发送请求,然后 worker 开启对应的 executor 进程(计算资源)。

3、executor 进程会向 Driver 端发送注册请求,然后申请要计算的 Task。

4、在 Driver 端内部,会执行一些操作,最终会通过 TaskScheduler 提交 Task 到 executor 进程中运行。具体的细节如下:

(1) Driver 端会运行客户端程序中的 main 方法;

(2)在 main 方法中构建了 SparkContext 上下文对象,该对象是所有 Spark 程序执行的入口,在构建 SparkContext 对象的内部,也构建了两个对象,分别是 DAGScheduler 和 TaskScheduler。

(3)因为在用户代码程序中,RDD 算子会涉及大量的转换操作,然后通过一个动作(action)操作,触发任务真正的运行, 在这里会按照 RDD 和 RDD 之间的依赖关系,首先 生成一张 DAG 有向无环图。图的方向就是 RDD 算子的操作顺序,最终会将 DAG 有向无环图发送给 DAGScheduler 对象。

(4)DAGScheduler 获取到 DAG 有向无环图之后, 按照宽依赖,进行 stage 划分,由于RDD算子中包含大量的宽依赖,所以会划分出多个 stage,每一个 stage 内部有很多可以并行运行的 task 线程,然后把这些并行运行的 task 线程封装在一个 taskSet 集合中,最后将 多个 taskSet 集合发送给 TaskScheduler 对象。

(5)TaskScheduler 对象获取得到这些 taskSet 集合之后,按照 多个 stage 之间的依赖关系,前面的 stage 中的 task 先运行,后面 stage 中的 task 后运行。然后 TaskScheduler 对象依次遍历每个 taskSet 集合,获取每一个 task ,最后把每一个 Task 依次提交到 worker 节点上的 Executor 进程中运行。

5 当所有的 Task 任务在Executor 进程中依次运行完成后,Driver 端会向 Master 发送一个注销请求。

6、Master 接收到请求后,然后通知对应的 worker 节点关闭 executor 进程,最后 worker 节点上的的计算资源得到释放。

2 Spark 数据倾斜的解决方案有哪些?

首先,产生数据倾斜的主要原因 是在 shuffle 过程中,由于不同 的 key 对应的数据量不同,从而导致不同的 task 所分配的数据量不均匀所产生的。

所以要解决 Spark 的数据倾斜问题,可以从以下几方面着手处理:

1、 提高 shuffle 操作的并行度。该办法简单粗暴,直接增加 shuffle 读 task 的数量,比如设置 reduceByKey(1000) ,一般默认200

优点:有效缓解数据倾斜。缺点:无法彻底根除问题。

2、使用随机数前缀和扩容 RDD 进行 join 操作。对大量相同的 key 通过附加随机前缀变成不一样的 key,然后将这些处理后的“不同key”分散到多个task中去处理。

优点:对 join 类型的数据倾斜大都可以处理,缺点:对内存要求很高。

3、将 reduce join 转为 map join。适用于两张表 join 时,一张表数据量比较小的情况。

通过将小表 全量数据 进行广播,然后 通过 map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。

优点:不会发生 shuffle ,缺点:只适用于 大表 + 小表。

4、过滤少数导致数据倾斜的 key。 该办法的前提条件是:少数几个数据量特别多的 key 对任务的执行影响不大,可以直接通过 where 子句过滤掉。

优点:实现简单,完全解决掉数据倾斜。缺点:受限于特定场景。

5、使用 Hive ETL 预处理数据。如果导致数据倾斜的是Hive 发生的,直接对Hive 进行预处理操作,从源头规避掉数据倾斜问题。

优点:提升 Spark 作业性能。缺点:Hive ETL 也会发生数据倾斜,治标不治本。

6、两阶段聚合方式。适合  reduceByKey ,group by 分组等场景。通过先局部聚合,再全局聚合等方式实现解决数据倾斜等。

优点:显著提升Spark 作业性能。缺点:局限于聚合类shuffle 操作。

3 Spark 任务调度的方式有哪些?

在 Spark 中,任务调度的方式包含 Stage 级的调度和 Task 级的调度。

首先 在创建 SparkContext 上下文对象时,会创建出 DAGScheduler  和 TaskScheduler 。

其中 DAGScheduler 负责 Stage 级的调度,主要是将 job 根据宽依赖切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。

TaskScheduler 负责 Task 级的调度,将 DAGScheduler 发送过来的 TaskSet 按照指定的调度 策略分发到 Executor 上执行。

TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的先进先出调度策略,另一种是 FAIR,公平策略。

4 在 Flink 中,反压有哪些危害呢?

反压如果不能得到正确的处理,可能会影响到 checkpoint 时长和 state 大小,甚至 可能会导致资源耗尽甚至系统崩溃。

1)影响 checkpoint 时长:barrier 不会越过普通数据,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,导致 checkpoint 总体时间(End to End Duration)变长。

2)影响 state 大小:barrier 对齐时,接受到较快的输入管道的 barrier 后,它后面数 据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会 被放到 state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一 致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。

5 Flink  SQL 支持哪些 Join 操作呢?

Flink SQL 支持的 Join 操作主要包括以下 3 大类:

1、流表与流表的 Join 2    流表与维表的 Join 3、动态表字段的列转行等。

1 流表与流表的 Join 包含  Regular Join 、 Interval Join、Temporal Join。

其中 Regular Join 主要是用于 两条流之间的 操作,可以是 inner join 、或者 out join ,full join 等,适用场景如计算点击率等。缺点 会产生回撤流。

其次 Interval Join  主要是计算两条流在一段时间区间内的 Join,可以让一条流去 Join 另一条流中前后一段时间内的数据。

而 Temporal Join 主要是用于 快照 join ,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join,可以用于汇率计算的场景。

2 流表与维表的 Join ,如 Lookup Join , Lookup Join 主要用于 流与外部维表的 Join 操作,因为一般的用户画像数据会存储在Mysql、Hbase 或者 Redis 中,当用户日志流过来后,需要实时查询数据,就需要用到 LookUp join 操作。

3 动态表字段的列转行。包含 Table Function 和 Array Expansion。

其中  Array Expansion 是将表中 ARRAY 类型字段(列)压平,转为多行,适用于将一行转多行的操作。

而 Table Function 和  Array Expansion 功能类似,但本质是一个 UDTF 函数,即用户可以自定义UDF 等实现逻辑处理。

6、Flink SQL 应用提交流程

(1) 调用 parse() 方法,将 sql 转为 未经校验的 AST 抽象语法树(sqlNode) ,在解析阶段主要用到词法解析和语法解析。

词法解析将 sql 语句转为一组 token,语法解析对 token 进行递归下降语法分析。

(2)调用 validate() 方法,将 AST 抽象语法树转为经过校验的抽象语法树(SqlNode).在校验阶段主要校验 两部分:

校验表名、字段名、函数名是否正确,

校验特殊的类型是否正确,如 join 操作、groupby 是否有嵌套等。

(3)调用 rel() 方法,将 抽象语法树 SqlNode 转为 关系代数树 RelNode(关系表达式) 和 RexNode(行表达式) ,在这个步骤中,DDL 不执行 rel 方法,因为 DDL 实际是对元数据的修改,不涉及复杂查询。

(4)调用 convert()方法,将 RelNode 转为 operation ,operation 包含多种类型,但最终都会生成根节点 ModifyOperation。

在 Flink 内部的 operation 之后,会调用 translate 方法将 operation 转为 transformation。在这中间也经历了四大步骤:

(1) 调用 translateToRel() 方法 先将 ModifyOperation 转换成 Calcite RelNode 逻辑计划树,再对应转换成 FlinkLogicalRel( RelNode 逻辑计划树);

(2) 调用 optimize() 方法 将 FlinkLogicalRel 优化成 FlinkPhysicalRel。在这中间的优化规则包含 基于规则优化 RBO 和 基于代价优化 CBO 。

(3) 调用 TranslateToExecNodeGraph() 方法 将物理计划转为 execGraph。

(4) 调用 TranslateToPlan() 方法 将 execGraph 转为 transformation。

7  kafka 是如何保证对应类型数据被写到相同分区的?

主要是通过 消息键 和 分区器 来实现,分区器为键生成一个 offset,

然后使用 offset 对主题分区进行取模,为消息选取分区,这样就可以保证包含同一个键的消息会被写到同一个分区上。

如果 ProducerRecord 没有指定分区,且消息的 key 不为空,则使用 Hash 算法来计算分区分配。

如果 ProducerRecord 没有指定分区,且消息的 key 也是空,则用 轮询 的方式选择一个分区。

8 Kafka 的文件存储机制了解吗?

在 Kafka 中,一个 Topic 会被分割成多个 Partition 分区,当用户查看创建的一个 Partition 时,可以看到里面包含 3 个文件,分别为 log 文件、index 文件,以及 timeindex 文件。

这三个文件中存储的都是二进制格式的数据,其中 log 文件存储的是 BatchRecords 消息内容,而 index 和 timeindex 分别存储的是一些索引信息。

这三个文件共同组成一个 Segment,而文件名中的(0)表示的是一个 Segment 的起始 Offset。

Kafka 会根据 log.segment.bytes 的配置来决定单个 Segment 文件(log)的大小,当写入数据达到这个大小时就会创建一个新的 Segment。

在 三个文件中:timeindex 文件包含两个字段,分别为:timestamp 和 offset , i ndex 文件 包含两个字段,分别为: offset  和 position,  log 文件包含多个字段,其中最重要的就是 records 字段。

根据这三个文件,就可以基于 offset 找到对应的 Message。

9 Kafka 的 消息确认机制了解吗?

为保证 producer 发送的数据,能可靠的达到指定的 topic ,Producer 提供了消息确认机制也就是(ack 机制)。生产者往 Broker 的 topic 中发送消息时,通过配置 ack 的值来决定消息发送成功与否。

当 acks = 0:producer 不会等待任何来自 broker 的响应。

特点:低延迟,高吞吐,数据可能会丢失。

如果当中出现问题,导致 broker 没有收到消息,那么 producer 无从得知,会造成消息丢失。

当 acks = 1(默认值)时,只要集群中 partition 的 Leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。

这种情况,如果在 follower 同步之前,leader 出现故障,将会丢失数据。

当 acks = -1时,只有当所有参与复制的节点全部都收到消息时,生产者才会收到一个来自服务器的成功响应。

这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行。

根据实际的应用场景,选择设置不同的 acks,以此保证数据的可靠性。

另外,Producer 发送消息还可以选择同步或异步模式,如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。

如果需要确保消息的可靠性,必须将 producer.type 设置为 sync

10 你能谈一下 yarn 的基础架构及调度流程吗?

yarn 的基础架构主要包含 3 大组件,分别为 ResourceManager、ApplicationMaster、NodeManager.

其中:

ResourceManager 是一个全局的资源管理器,负责整个系统的资源管理和分配,主要包括两个组件,即调度器(Scheduler)和应用程序管理器(Applications Manager)。

ApplicationMaster:ApplicationMaster 是 Resource Manager 根据 接收用户提交的作业,按照作业的上下文信息等 分配出一个 container 资源,然后 通知 NodeManager 为用户作业创建出一个 ApplicationMaster。

NodeManager:NodeManager 管理 YARN 集群中的每个节点,对节点进行资源监控和健康状态管理。

yarn 的调度流程简单总结如下:

1、客户端提交应用程序给 ResourceManager ResouceManager 收到请求后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster。

2、applicationMaster 来运行和管理 container 里面的任务,其中 container 会通过心跳机制向 applicationMaster 来发送运行信息。

3、任务完成之后,application 向 ResourceManager 报告,任务完成,container 进行资源释放。

拼多多一面考察的算法

举例:

(1)输入:1,3,6,2,7,5    输出:4

含义:当去除2时,最大递增子序列长度为1,3,6,7,所以长度为 4

(1)输入:1,3,8,2,6,5,7,9,3,11,1,2,3,4,3,5,6,7,8   输出:8

含义:当去除数字 3 时,最大长度为1,2,3,4,5,6,7,8 所以长度为8

解题思路:

该题的做法就是,定义最大递增长度初始值 max = 0,同时定义 累加器初始值 count = 1, 并定义开关初始值 flag = 0, 因为题目要求最多只能去除一个数字,所以让 flag 刚开始为0,当遇到要去除的数字时,flag ++,然后判断 flag <=1 里面的逻辑即可。

代码如下:

    public static int test(int[] nums){
if(nums.length==0||nums==null){
return 0;
}
// 累加器初始值为 1
int count = 1;
// 定义最大递增序列初始值为 1
int max = 1;
// 定义开关 flag 初始值为 0
int flag = 0;

for(int i=1;i<nums.length;i++){
if(flag<=1){
if(nums[i] - nums[i-1] >0){
count++;
}else {
if (i==nums.length-1) {
break;
}
if(nums[i+1]-nums[i-1]>0&&nums[i] - nums[i-1]<=0){
flag++;
}else {
count = 1;
}
}
}else{
flag = 0;
count = 1;
}
max = Math.max(count,max);
}
return max;
}

以上就是 10 个视频的全部内容,想看视频教程的,可以关注土哥抖音号,每天都会分享一道知识点,坚持打卡 100 天~

之前看到许多小伙伴的留言,建议视频多添加一些图片,后面视频会不断优化,希望能帮助大家更好的学习。 觉得对你有帮助,请关注点赞、转发、在看 三连走起~