JavaAPI提交Spark任务
[toc]
# Java代码提交Spark2任务
# 实现功能
通过Java将Spark自带的Example提交到Yarn中
# 基础API介绍
# org.apache.spark.deploy.yarn.Client
org.apache.spark.deploy.yarn.Client
用于在 YARN 上启动和管理 Spark 应用程序。以下是对 Client
类的详细介绍:
# 功能概述
Client
类的主要职责是与 YARN ResourceManager 通信,申请运行 Spark 应用程序所需的资源,并启动相应的容器。具体功能包括:
- 资源申请:向 YARN ResourceManager 发送资源请求,以便为 Spark 应用程序分配计算资源(如 CPU、内存)。
- 应用程序启动:在获得资源后,启动 Spark 应用程序的各个组件(如 Driver 和 Executor)。
- 监控与管理:持续监控应用程序的运行状态,处理可能出现的错误和资源不足等问题。
# 主要方法
run()
: 这是 Client 类的主要入口点,用于启动整个应用程序的执行过程。prepareLocalResources()
: 准备 Spark 应用程序运行所需的本地资源(如 JAR 包、配置文件)。monitorApplication()
: 监控应用程序的运行状态,确保其按预期运行。
# org.apache.spark.deploy.yarn.ClientArguments
org.apache.spark.deploy.yarn.ClientArguments
是 Client
类的辅助类,用于解析和管理命令行参数。以下是对 ClientArguments
类的详细介绍:
# 功能概述
ClientArguments
类的主要作用是从命令行参数中提取出应用程序的配置信息和资源需求,并将这些信息传递给 Client
类。它解析的参数包括:
- 应用程序的 JAR 包路径
- 应用程序的主类
- 应用程序的主类参数
# org.apache.hadoop.yarn.client.api.YarnClient
org.apache.hadoop.yarn.client.api.YarnClient
是 YARN 的客户端 API,用于与 YARN ResourceManager 和 NodeManager 交互,以便管理应用程序的生命周期。以下是对 YarnClient
类的详细介绍:
# 功能概述
YarnClient
类提供了一组方法来提交、监控和管理 YARN 应用程序。它的主要职责包括:
- 应用程序提交:将应用程序提交到 YARN 集群,并获取应用程序的应用程序 ID。
- 应用程序监控:监控应用程序的运行状态,包括查询应用程序的状态、日志和进度。
- 应用程序管理:提供杀死和停止应用程序的方法。
# 主要方法
createYarnClient()
: 创建一个新的YarnClient
实例。init(Configuration conf)
: 初始化YarnClient
,使用给定的配置。start()
: 启动YarnClient
。submitApplication(ApplicationSubmissionContext appContext)
: 提交应用程序到 YARN。getApplicationReport(ApplicationId appId)
: 获取应用程序的报告,包括状态和其他详细信息。killApplication(ApplicationId appId)
: 杀死正在运行的应用程序。
# org.apache.hadoop.yarn.api.records.ApplicationReport
ApplicationReport
类是 Hadoop YARN API 的一部分,提供有关正在运行或已完成的应用程序的详细信息。以下是 ApplicationReport
可以获取的一些重要信息:
ApplicationReport 可以获取的信息
Application ID
- 方法:
getApplicationId()
- 描述:获取应用程序的唯一标识符。
- 方法:
Application Name
- 方法:
getName()
- 描述:获取应用程序的名称。
- 方法:
Application Type
- 方法:
getApplicationType()
- 描述:获取应用程序的类型(例如 MapReduce、Spark)。
- 方法:
User
- 方法:
getUser()
- 描述:获取提交应用程序的用户。
- 方法:
Queue
- 方法:
getQueue()
- 描述:获取应用程序运行所在的队列。
- 方法:
YARN Application State
- 方法:
getYarnApplicationState()
- 描述:获取应用程序的当前状态。
- 方法:
Final Application Status
- 方法:
getFinalApplicationStatus()
- 描述:获取应用程序的最终状态(例如 SUCCEEDED、FAILED、KILLED)。
- 方法:
Progress
- 方法:
getProgress()
- 描述:获取应用程序的进度,范围是 0 到 100。
- 方法:
Tracking URL
- 方法:
getTrackingUrl()
- 描述:获取用于跟踪应用程序状态的 URL。
- 方法:
Diagnostics
- 方法:
getDiagnostics()
- 描述:获取应用程序的诊断信息,通常在应用程序失败时提供。
- 方法:
Start Time
- 方法:
getStartTime()
- 描述:获取应用程序的启动时间。
- 方法:
Finish Time
- 方法:
getFinishTime()
- 描述:获取应用程序的完成时间。
- 方法:
Application Resource Usage Report
- 方法:
getApplicationResourceUsageReport()
- 描述:获取应用程序的资源使用报告,包括内存和 CPU 的使用情况。
- 方法:
Host
- 方法:
getHost()
- 描述:获取应用程序运行所在的主机。
- 方法:
RPC Port
- 方法:
getRpcPort()
- 描述:获取应用程序 RPC 端口。
- 方法:
AM Container Logs
- 方法:
getAMContainerLogs()
- 描述:获取应用程序主程序(Application Master)容器的日志 URL。
- 方法:
Application Priority
- 方法:
getPriority()
- 描述:获取应用程序的优先级。
- 方法:
# 使用示例
以下是一个示例代码,展示了如何获取并打印 ApplicationReport
中的详细信息:
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.conf.Configuration;
public class ApplicationReportExample {
public static void main(String[] args) {
try {
// 这里为了代码不报错,自动创建了一个applicationId,实际应用时需要获取实际applicationId
ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
// 创建并初始化 YarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
Configuration yarnConf = new Configuration();
yarnClient.init(yarnConf);
yarnClient.start();
// 获取应用程序的 ApplicationReport
ApplicationReport report = yarnClient.getApplicationReport(applicationId);
// 打印应用程序的 ID
System.out.println("Application ID: " + report.getApplicationId());
// 打印应用程序的名称
System.out.println("Application Name: " + report.getName());
// 打印应用程序的类型
System.out.println("Application Type: " + report.getApplicationType());
// 打印提交应用程序的用户
System.out.println("User: " + report.getUser());
// 打印应用程序运行所在的队列
System.out.println("Queue: " + report.getQueue());
// 打印应用程序的当前 YARN 状态
System.out.println("YARN Application State: " + report.getYarnApplicationState());
// 打印应用程序的最终状态
System.out.println("Final Application Status: " + report.getFinalApplicationStatus());
// 打印应用程序的进度
System.out.println("Progress: " + report.getProgress());
// 打印跟踪应用程序状态的 URL
System.out.println("Tracking URL: " + report.getTrackingUrl());
// 打印应用程序的诊断信息
System.out.println("Diagnostics: " + report.getDiagnostics());
// 打印应用程序的启动时间
System.out.println("Start Time: " + report.getStartTime());
// 打印应用程序的完成时间
System.out.println("Finish Time: " + report.getFinishTime());
// 打印应用程序运行所在的主机
System.out.println("Host: " + report.getHost());
// 打印应用程序的 RPC 端口
System.out.println("RPC Port: " + report.getRpcPort());
// 打印应用程序的优先级
System.out.println("Application Priority: " + report.getPriority());
// 打印应用程序的资源使用报告(如果可用)
if (report.getApplicationResourceUsageReport() != null) {
System.out.println("Memory Seconds: " + report.getApplicationResourceUsageReport().getMemorySeconds());
System.out.println("Vcore Seconds: " + report.getApplicationResourceUsageReport().getVcoreSeconds());
}
// 停止 YarnClient
yarnClient.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 完整提交代码
# java代码
@Test
void spark2SubmitTask() {
// 设置 Hadoop 用户名
System.setProperty("HADOOP_USER_NAME", "hdfs");
// 创建 Spark 配置对象并设置应用名称
SparkConf sparkConf = new SparkConf().setAppName("SparkSubmitter");
// 设置 Spark 提交模式为集群模式
sparkConf.set("spark.submit.deployMode", "cluster");
// 设置 Spark 驱动程序的额外 Java 选项
sparkConf.set("spark.driver.extraJavaOptions", "-Dhdp.version=3.1.5.0-152");
// 设置 Spark Master 为 YARN
sparkConf.set("spark.master", "yarn");
// 设置 Spark YARN JAR 包的路径,即依赖的jar包
sparkConf.set("spark.yarn.jars", "hdfs:///spark-jars/*");
// 设置 Spark Executor 的实例数量
sparkConf.set("spark.executor.instances", "5");
// 设置 Spark 驱动程序的内存
sparkConf.set("spark.driver.memory", "5g");
// 设置 Spark 驱动程序的核心数
sparkConf.set("spark.driver.cores", "5");
// 设置 Spark Executor 的内存
sparkConf.set("spark.executor.memory", "6g");
// 设置 Spark Executor 的核心数
sparkConf.set("spark.executor.cores", "6");
// 设置 Spark YARN 队列名称
sparkConf.set("spark.yarn.queue", "default");
// 创建提交参数列表
List<String> submitArgs = new ArrayList<>();
submitArgs.add("--jar");
// 我们自己程序的代码,这里使用Spark自带的Example进行测试,
submitArgs.add("hdfs:///spark-example/spark-examples_2.11-2.3.2.3.1.5.0-152.jar");
submitArgs.add("--class");
submitArgs.add("org.apache.spark.examples.SparkPi");
submitArgs.add("--arg");
submitArgs.add("100");
// 创建客户端参数对象
ClientArguments clientArguments = new ClientArguments(submitArgs.toArray(new String[0]));
// 记录提交的应用参数日志
log.info("Submitting application with arguments: " + Arrays.toString(submitArgs.toArray()));
// 获取 YARN 配置对象
Configuration yarnConf = SparkHadoopUtil.get().newConfiguration(sparkConf);
// 创建并初始化 YARN 客户端
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
Client client = new Client(clientArguments, sparkConf);
yarnClient.init(yarnConf);
yarnClient.start();
// 提交应用程序并获取应用 ID
ApplicationId applicationId = client.submitApplication();
log.info("Application submitted successfully with Application ID: " + applicationId);
boolean isRunning = true;
// 循环检查应用程序状态
while (isRunning) {
ApplicationReport report = yarnClient.getApplicationReport(applicationId);
YarnApplicationState state = report.getYarnApplicationState();
// 输出应用程序报告信息
System.out.println("Application ID: " + report.getApplicationId());
System.out.println("Application Name: " + report.getName());
System.out.println("Application Type: " + report.getApplicationType());
System.out.println("User: " + report.getUser());
System.out.println("Queue: " + report.getQueue());
System.out.println("YARN Application State: " + report.getYarnApplicationState());
System.out.println("Final Application Status: " + report.getFinalApplicationStatus());
System.out.println("Progress: " + report.getProgress());
System.out.println("Tracking URL: " + report.getTrackingUrl());
System.out.println("Diagnostics: " + report.getDiagnostics());
System.out.println("Start Time: " + report.getStartTime());
System.out.println("Finish Time: " + report.getFinishTime());
System.out.println("Host: " + report.getHost());
System.out.println("RPC Port: " + report.getRpcPort());
System.out.println("Application Priority: " + report.getPriority());
if (report.getApplicationResourceUsageReport() != null) {
System.out.println("Memory Seconds: " + report.getApplicationResourceUsageReport().getMemorySeconds());
System.out.println("Vcore Seconds: " + report.getApplicationResourceUsageReport().getVcoreSeconds());
}
// 根据应用程序状态执行相应的操作
switch (state) {
case FINISHED:
if (report.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
log.info("Application completed successfully.");
} else {
log.error("Application finished with status: " + report.getFinalApplicationStatus());
}
isRunning = false;
break;
case KILLED:
log.error("Application was killed.");
isRunning = false;
break;
case FAILED:
log.error("Application failed with status: " + report.getFinalApplicationStatus());
isRunning = false;
break;
default:
log.info("Application is still in state: " + state);
Thread.sleep(10000); // 等待一段时间后再次检查状态
}
}
// 停止客户端
client.stop();
} catch (Exception e) {
log.error("Spark Application failed: " + e.getMessage(), e);
throw new RuntimeException("Spark Application failed", e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# pom.yml
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-reload4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.27</version>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# 注意事项
# 依赖注意
sparkConf.set("spark.yarn.jars", "hdfs:///spark-jars/*");
hdfs:///spark-jars/*
是我们项目需要用到的依赖,将这些依赖上传到HDFS中,依赖可以从以下两个渠道上传:
- Spark自带的jars目录,原生Spark对应的目录就是$SPARK_HOME/jars;Ambari对应的目录是/usr/hdp/3.1.5.0-152/spark2/jars。将这些依赖可以都传上去
- 项目代码中将我们用到的所有依赖都打包上传到HDFS中
tip:
无论那种上传到服务器后,运行程序,都有可能出现JAR包冲突,根据冲突问题自行解决就可以了。
# 问题记录
# bad substitution
问题日志
24/05/24 20:42:19 WARN yarn.YarnAllocator: Container marked as failed: container_e06_1710811188190_0193_01_000005 on host: ip--199. Exit status: 1. Diagnostics: [2024-05-24 20:42:17.341]Exception from container-launch.
Container id: container_e06_1710811188190_0193_01_000005
Exit code: 1
[2024-05-24 20:42:17.345]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
/hadoop/yarn/local/usercache/hdfs/appcache/application_1710811188190_0193/container_e06_1710811188190_0193_01_000005/launch_container.sh: line 33: $PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CONF_DIR:/usr/hdp/3.1.5.0-152/hadoop/*:/usr/hdp/3.1.5.0-152/hadoop/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:$PWD/__spark_conf__/__hadoop_conf__: bad substitution
[2024-05-24 20:42:17.349]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
/hadoop/yarn/local/usercache/hdfs/appcache/application_1710811188190_0193/container_e06_1710811188190_0193_01_000005/launch_container.sh: line 33: $PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CONF_DIR:/usr/hdp/3.1.5.0-152/hadoop/*:/usr/hdp/3.1.5.0-152/hadoop/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:$PWD/__spark_conf__/__hadoop_conf__: bad substitution
2
3
4
5
6
7
8
9
10
11
原因
程序找不到hdp.version
环境变量
解决方法
通过spark.driver.extraJavaOptions
设置版本
// Ambari 版本需要设置,否则会报错
sparkConf.set("spark.driver.extraJavaOptions", "-Dhdp.version=3.1.5.0-152");
2
# 项目源码位置
All-In-One/spark/springboot-submit-spark