Spark相关知识

Spark相关知识总结

Spark特点

  1. 快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上,Spark实现了高效的DAG执行引擎,可以基于内存来高效处理数据流
  2. 易用:Spark支持Java,Python,R,Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。也支持交互式的python和scala的shell,可以很方便的在shell中使用spark来验证解决问题的方法
  3. 通用:spark提供了统一的解决方案。Spark可以用于批处理,交互式查询Spark SQL,实时流处理Spark Streaming,机器学习MLlib和图计算GraphX。
  4. 兼容性:Spark可以很方便的与其他开源产品进行融合。Spark可以使用Hadoop 的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS,HBase和Cassandra等。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,降低了Spark的使用门槛。

RDD知识点

  1. RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入了什么函数)
  2. RDD中的所有转换都是惰性求值/延迟执行的,也就是说不会直接计算,只有当发生一个要求返回结果给Driver的Action时,这些转换才会真正运行。
  3. 使用惰性求值的原因是可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地执行
  4. RDD的算子分为两类:Transformation(返回一个新的RDD,map,filter等)和Action操作(返回值不是RDD或无返回值,reduce,collect等)。
  5. 实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存

数据分类

  1. 结构化数据:有固定的schema,如关系型数据库中的表
  2. 半结构化数据:没有固定的schema,但是有结构,数据一般是自描述性的,如Json
  3. 非结构化数据:没有固定的schema,也没有结构,如图片或音频之类的
  4. RDD可以处理上述三种数据,SparkSQL主要用于处理结构化数据

Shuffle知识点

  1. 要实现Tungsten-Sort Shuffle机制需要满足一下条件:

    • Shuffle依赖中不带聚合操作或者没有对输出进行排序的要求
    • Shuffle的序列化器支持序列化值的重定位(当前仅支持Kryo Serializer Spark SQL框架自定义的序列化器)
    • Shuffle过程中输出的分区个数少于16777216个
  2. Spark Shuffle分为两种:基于Hash的Shuffle和基于Sort的Shuffle,

    • Hash Shuffle特点:

      • 优点是可以省略不必要的排序开销,避免了排序所需的内存开销;
      • 缺点
        • 生产的文件过多,会对文件系统造成压力
        • 大量小文件的随机读写带来一定的磁盘开销
        • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。
    • Sort Shuffle特点:

      • 优点:

        • 小文件的数量大大减少,mapper端的内存占用变少
        • Spark不仅可以处理小规模的数据,即使处理大规模的数据也不会很容易的达到性能瓶颈
      • 缺点:

        • 如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传数据的过程中到reduce端,reduce会需要同时大量的记录进行反序列化,导致大量内存消耗和GC负担巨大,造成系统缓慢甚至奔溃
        • 强制了mapper端必须要排序,即使数据本身并不需要排序
        • 要基于记录本身进行排序,性能消耗很大

Spark运行流程

  1. SparkContext向资源管理器注册并向资源管理器申请运行Executor
  2. 资源管理器分配Executor然后资源管理器启动executor
  3. Executor发送心跳到资源管理器
  4. SparkContext构建DAG执行图
  5. 将DAG分解成Stage(TaskSet)
  6. 把Stage发送给TaskScheduler
  7. Executor向SparkContext申请Task
  8. TaskScheduler将task发送给Executor运行
  9. 同时SparkContext将应用程序代码发放给Executor
  10. task在Executor上运行,运行完毕释放所有资源

Spark数据倾斜

  1. Spark中的数据倾斜问题主要是指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题
  2. 解决方案:
    • 预聚合原始数据:避免shuffle过程;增大key粒度
    • 预处理导致倾斜的key:过滤;使用随机key;sample采样对倾斜key单独进行join
    • 提高并行度: reduce端并行度的设置,该方法并没有从根本上改变数据倾斜的本质问题,只是尽可能去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题,适用于有校对key对应的数据量都比较大的情况
    • 使用map join:将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中,然后对其创建一个broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用需要的方式连接起来。这种思路不会发生shuffle操作,从根本上杜绝了join操作可能导致的数据倾斜问题,但是适用条件有限,比较适合有join操作产生数据倾斜问题且其中一个RDD的数据量较小。

Spark性能优化

  1. RDD复用
    使用缓存,缓存将DataFrame,数据表或者RDD放入集群中执行器的临时存储区(内存或磁盘),这将使后续读取更快。但是缓存数据会导致序列化,反序列化和存储开销。因此缓存适用于多次重复使用相同数据集的情形。
  2. 尽早filter,减少对内存的占用
  3. 读取大量小文件,使用wholeTextFiles
  4. 合理使用mapPartition和foreachPartition
  5. 并行度设置,即各个stage的task数量。官方推荐,task数量应该设置为spark作业总CPU core数量的2-3倍
  6. 使用持久化和checkpoint
  7. 使用广播变量

Spark相关问题

Spark运行效率比MapReduce效率高在哪里

spark是借鉴了Mapreduce,并在其基础上发展起来的,继承了其分布式计算的优点并进行了改进,spark生态更为丰富,功能更为强大,性能更加适用范围广,mapreduce更简单,稳定性好。主要区别:

  • spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘
  • Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算
  • Spark更通用,提供了transformation和action这两大类的多功能api,另外还有流式处理sparkstreaming模块、图计算等等,mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏
  • Spark框架和生态更为复杂,有RDD,血缘lineage、执行时的有向无环图DAG,stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行
  • Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask。Spark程序运行并行度高
  • Spark对于executor的优化,在JVM虚拟机的基础上对内存弹性利用:storage memory与Execution memory的弹性扩容,使得内存利用效率更高

Hadoop和Spark的相同点和不同点

Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;

Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,两个阶段完了就结束了,所以在一个job里面能做的处理很有限;spark计算模型是基于内存的迭代式计算模型,可以分为n个阶段,根据用户编写的RDD算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。

但是spark也有劣势,由于spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完

RDD持久化原理

spark非常重要的一个功能特性就是可以将RDD持久化在内存中。

调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。

Checkpoint机制

应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。

原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  • 控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。

  • 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。

checkpoint和持久化的区别

最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低

RDD机制

rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。从物理的角度来看rdd存储的是block和node之间的映射。

RDD是spark提供的核心抽象,全称为弹性分布式数据集。

RDD在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作(分布式数据集)

比如有个RDD有90W数据,3个partition,则每个分区上有30W数据。RDD通常通过Hadoop上的文件,即HDFS或者HIVE表来创建,还可以通过应用程序中的集合来创建;RDD最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的RDD partition因为节点故障,导致数据丢失,那么RDD可以通过自己的数据来源重新计算该partition。这一切对使用者都是透明的。

RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。比如某结点内存只能处理20W数据,那么这20W数据就会放入内存中计算,剩下10W放到磁盘中。RDD的弹性体现在于RDD上自动进行内存和磁盘之间权衡和切换的机制。