Spark笔记(3):集群运行Spark程序实例讲解

之前都是在spark shell上执行,当数据量达到一定程度,我们可以利用Spark的集群模式来运行,增加算力,而且本地小数据量验证成功的代码可以直接放到集群上跑。

这一小节从提交一个集群环境下的Spark Job出发,讨论了在集群运行Spark Job时的配置项,再讲到Spark基础的架构,最后讲解了一下Spark Job性能调试的经验。

一、spark-submit应用部署

Spark使用spark-submit脚本来启动集群部署模式,进入/usr/bin/目录,可以看到一个spark-submit脚本,输入spark-submit –help可以看到一些配置选项介绍,截取一部分如下图:

以下是一个spark-submit提交scala应用的shell脚本的示例,我们可以看到在主程序job_submit中,我们从控制台传入了四个所需参数,对spark-submit进行了配置,比如class/master/queue等等,以下会逐一讲解,最后我们传入了JAR_FILE及其参数,这是我们的需要执行的打包的scala程序:

所以spark-submit的一般格式是这样的:

1
2
usr/bin/spark-submit [options] <app jar | python file> [app options]
usr/bin/spark-submit [选项配置] <程序jar包或者python脚本文件> [程序参数]

这里主要讲几个比较常用的配置项:

  • – master: 集群的master URL,上面的示例中我们传入了yarn集群

有以下可以接收的值:

描述
spark://host:port 连接到指定端口的Spark独立集群,默认Saprk独立主节点端口为7077,例如spark://23.195.26.187:7077
mesos://host:port 连接到指定端口的Mesos集群,默认为5050端口
yarn yarn集群,需设置环境变量HADOOP_CONF_DIR为HADOOP配置目录
local 本地模式,使用单核
local[N] 本地模式,N个核
local[*] 本地模式 尽可能多的核
  • – class: 运行Java或scala程序时应用的主类,示例中为com.stat.UserAdProfileGenerator

  • – delopy-mode: 选择在客户端client还是在集群cluster启动驱动器

  • – name: 该应用显示在Spark网页用户界面上的名称,示例中未设置该参数

  • – files: 需放到该应用工作目录中的文件列表,我们程序中往往会使用一些数据文件,可以放在这里

  • – py-files: 需添加到PYTHONPATH中的文件列表,可以包含.py/.egg/.zip文件

  • – driver-memory: 驱动器进程使用的内存量,示例为16G

  • – executor-memory: 执行器进程使用的内存量,示例为16G

  • – num-executors: 指定执行器数量,只在yarn集群模式下使用,示例为50个

  • – exectors-cores: 执行器节点的核心数,只在yarn集群模式下使用,示例为1个

以上这些选项主要包括两部分,第一部分为调度的信息,比如master、driver-memory、exectors-cores,这部分和我们的性能调试息息相关,后面会讲到怎么去做调整;第二部分是依赖,比如files、py-files。

上面出现了一些陌生的概念,比如驱动器、执行器、集群等等,这些都是Spark架构中的概念,分别看一下。

二、Spark基础架构

下图为Spark架构组成图,有三个主要组成部分,Driver Program、Cluster Manager和Worker Node。

  • Driver Program:驱动器程序,负责中央协调,调度各个分布式工作节点;
  • Cluster Manager:集群管理器,控制整个集群,监控worker。自带的集群管理器称为独立集群管理器,也可以运行在Hadoop YARN或者Apache Mesos上;
  • Worker Node:工作节点,负责控制计算节点,启动Executor或者Driver。
  • Executor:执行器节点,运行在worker node上的一个进程;

分别具体介绍一下驱动器节点、执行器节点和集群管理器,如下:

驱动器节点

  • 作用:执行main()方法的进程,比如创建SparkContext、创建RDD、RDD转化和行动操作的代码,当我们启动Spark shell时,就启动了Spark驱动器程序,而当驱动器程序一旦终止,Spark应用也就结束了。

  • 职责:

      1. 所有的Spark程序不外乎创建RDD,RDD转化操作生成新的RDD,最后行动操作存储RDD的数据,这样就构成了一个有向无环图(DAG)的操作逻辑图,当驱动器节点启动时,Spark把逻辑图转为物理执行计划,即转变为一系列步骤(Stage),每个步骤包含多个任务(Task),这些任务打包后被送到集群进行分布式计算。Task是Spark中最小的工作单元。
      1. 驱动器节点启动,生成物理执行计划,驱动器程序负责协调各执行器进程之间的各个任务,执行器进程启动后会在驱动器上注册自己,以在驱动器上有每个执行器进程的完整记录,每个执行器节点代表一个能够处理任务和存储RDD数据的进程。Spark会把分配任务给合适的执行器进程,执行器进程会缓存数据,驱动器跟踪数据缓存任务,从而调度之后的任务,尽可能减少数据网络传输。

执行器节点

  • 作用:负责在Spark作业中运行任务,任务间相互独立;与Spark任务同时启动,伴随整个生命周期,即使执行器节点发生了异常奔溃,Saprk也可继续执行
  • 职责:
      1. 负责运行组成Spark应用的任务,并将结果返回给驱动器进程;
      1. 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在执行器进程中的,所以可以在运行时充分利用缓存数据提高运算速度。

集群管理器

  • 作用:Spark依赖于集群管理器启动执行器节点,在某些特殊情况下,也会依赖集群管理器来启动驱动器节点。Spark有自带的独立集群管理器,也可以运行在其他外部集群管理器上,如YARN和Mesos等。

这里主要说一下YARN集群管理器,Yarn(Yet Another Resource Negotiator)是Hadoop的资源管理层,它旨在提供一个通用且灵活的框架来管理Hadoop集群中的计算资源。它也是一种主/从结构,由三个部件组成,分别是:

  • Resource Manager (RM)
  • Node Manager (NM)
  • Application Master (AM)

想了解这三个组成部分的细节,可以细读这篇文章Hadoop Yarn的架构

在使用层面,我们看一下如何设置YARN作为集群管理器:

  1. 设置指向HADOOP配置目录的环境变量:找到HADOOP的配置目录,设为环境变量HADOOP_CONF_DIR => export HADOOP_CONF_DIR = “…”
  2. 使用spark-submit向主节点URL提交作业 => spark-submit –master yarn ···

再看一下如何配置资源用量,这个在之前已经提到过了:

  • –num -executors :设置执行器节点,默认值为2,一般要提高这个值
  • –executor -memory: 设置每个执行器的内存用量
  • –executor -cores: 设置每个执行器进程从YARN中占用的核心数目
  • –queue:设置队列名称,YARN可以将应用调度到多个队列中

这里几个参数的调优会在第四小节讲到。

讲完了spark-summit和架构部分,那如果想在代码层面进行debug,有哪些比较好的方法,下一章节分别介绍一下。

三、Spark UI与日志

Spark可以通过单机和集群模式来部署代码,单机部署的时候我们可以通过断点调试来修正代码,但集群部署却很难用该方法去调试,但我们可以通过日志分析或者Spark UI界面来进行调试和优化我们的代码。

Spark UI

Spark UI是呈现Spark应用性能以及其他信息的前端展示页面,

看一下导航栏,按照数字标注分别是:

  1. job页面:列出了当前应用的所有任务,以及所有的excutors中action的执行时间、耗时、完成情况;
  2. Stages:查看应用的所有stage,粒度上要比job更细一些;
  3. storage:查看应用目前使用了多少缓存,一般由cache persist等操作触发
  4. environment:展示当前spark所依赖的环境,比如jdk,lib等等
  5. executors:查看申请使用的内存大小以及shuffle中input和output等数据
  6. AppName:显示代码中使用setAppName设定应用名字

下图为Spark的UI主页面,这里展示了已完成的Job信息,点击Job进入详细页面,分为两部分,一部分是event timeline,另一部分是进行中和完成的job任务。

更多页面的详细介绍可以参考这个链接

基于SparkUI性能优化与调试

yarn logs

如果我们没法从UI界面中排查出错原因,这时候只有把driver和executor进程所生成的日志来得到更多的信息,因为日志会详细记录各种异常事件,比如内部警告以及用户代码输出的详细异常信息。

在YARN模式下,我们可以在控制台输入:

1
yarn logs -applicationId <app ID>

示例如下:

app ID一般会在程序执行的时候打印到控制台,或者一般把日志重定向到log文件中,在执行过程中用tail -f 查看执行日志,实时监控执行进程。

四、Spark 性能调优经验

打印信息

可以在程序中打印出一些输出信息,数据量很大的时候,做一些take(10)/show(10)/count()这样的action操作,就可以看到一些变量的中间状态了,比如将yarn作为资源管理器时,直接可以在yarn logs中就可以看到这些信息。这对我们调试也是很有帮助的,可以知道我们的程序是否得到了我们想得到的结果。

并行度调优

在物理执行期间,RDD会分为多个分区,每个分区存储了全体数据的一个子集,然后Spark会给每个分区的数据创建一个Task,Spark一般会根据其底层的存储系统自动推断出合适的并行度,比如从HDFS读数据会为每个文件区块创建一个分区,从混洗后的RDD派生下来的RDD的并行度保持和父一致。

并行度会从两方面影响性能:

  1. 并行度过低时,会出现资源限制的情况,可以提高并行度来充分利用更多的计算核心
  2. 并行度过高时,每个分区产生的间接开销累计起来会更大。评价并行度是否过高可以看你的任务是不是在瞬间(毫秒级)完成的,或者任务是不是没有读写任何数据。

如何调优:

  1. 在数据混洗操作时,对混洗后的RDD指定并行度
  2. 对任何已有的RDD进行重新分区来获取更多或更少的分区数,重新分区可用repartition(),减少分区可用coalesce(),coalesce不会打乱数据,所以比repartition效率高

以下是一个实例:

1
2
3
4
5
6
7
8
9
10
input = sc.textFile("s3o://···")
input.getNumPartitions()
10000

lines = input.filter(lambda x: x.startwith('20190625'))
lines.getNumPartitions()
10000

lines = lines.coalesce(5).cache()
4

内存管理

1.改变内存比例

  • RDD存储(60%):当调用RDD的persist()或cache()方法时,这个RDD分区会被存储到缓存中,Spark会根据spark.storage.memoryFraction限制用来缓存的内存占整个JVM堆空间的比例大小。若超出限制,旧的分区数据会被移出内存。
  • 数据混洗与聚合缓存区(20%):当数据进行数据混洗时,Spark会创造一些中间缓存区来存储数据混洗的输出数据。根据spark.shuffle.memoryFraction限定这种缓存区占总内存的比例。
  • 用户的代码(20%):spark可以执行任意代码,所以用户的代码可以申请大量内存,它可以访问JVM堆空间中除了分配给RDD存储和数据混洗存储以外的全部空间。

2.改进缓存等级

  • cache()操作:它以内存优先即MEMORY_ONLY的存储等级持久化数据,分区不够用的话,旧分区会直接删除。需要用到时会再重新算,这样效率会低。一般不太建议直接使用cache,一旦cache的量很大,就会导致内存溢出。
  • persist()操作可以持久化级别,比如使用MEMORY_AND_DISK级别调用persist(),内存如果放不下的旧分区会被写入磁盘,当再次需要用到的时候再从磁盘上读取,代价比重算要低,性能也会稳定一些,比如从数据库中读取数据时,重算(即读取)的代价很大,这个时候就很有效

硬件资源

硬件资源会很大程度上影响Job的效率,主要有以下几个:

  • – executor-memory: executor的内存,各种部署模式都可以,当任务失败,报错信息为sparkContext shutdown时,基本是内存不足导致的。可以尝试调大–excutor-memory参数,但若系统条件受限,无法加大内存,可以局部进行调试,把程序按执行步骤检查问题点。
  • – executor-cores: executor的核数,仅在yarn模式下
  • – num–executors: executor节点的总数,仅在yarn模式下

性能调优可以在实践中不断积累经验,以上只是一些参考。如果想了解更多的调优方法,可以访问Tuning Spark

参考资料:

《Spark大数据分析》

分享到