Implementing HDFS and Yarn clients in Java follows very similar steps, so both are recorded together here. A client configured this way is convenient for on-site deployment, and it is compatible with both HA and non-HA HDFS and Yarn clusters.
Adding dependencies
Add the following dependencies to pom.xml:
<dependencies>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.3</version>
    </dependency>
    <!-- Yarn Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.2.3</version>
    </dependency>
</dependencies>
Defining the configuration
To point the client at a specific Hadoop cluster, the easiest approach is to first obtain three configuration files from the cluster: core-site.xml, hdfs-site.xml, and yarn-site.xml (ask the cluster administrator for them). The first two are needed by the HDFS client, the last one by the Yarn client.
Put these files in a single directory, define an environment variable pointing to that directory, and load the three files in the client code with addResource(). Spark uses the HADOOP_CONF_DIR environment variable for this purpose, so to follow convention we use the same variable.
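Since both clients load configuration files the same way (for example with export HADOOP_CONF_DIR=/etc/hadoop/conf, where the path is whatever directory holds the three files), the loading logic can be factored into a small helper. Below is a minimal sketch; the class name HadoopConfLoader and the method load() are illustrative, not part of any Hadoop API:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HadoopConfLoader {

    // Adds the named files under $HADOOP_CONF_DIR as resources
    // of the given Configuration
    public static Configuration load(Configuration conf, String... fileNames) {
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            throw new IllegalStateException("HADOOP_CONF_DIR not set");
        }
        for (String fileName : fileNames) {
            conf.addResource(new Path(hadoopConfDir, fileName));
        }
        return conf;
    }
}

With this helper, the HDFS client would call load(new Configuration(), "core-site.xml", "hdfs-site.xml") and the Yarn client load(new YarnConfiguration(), "yarn-site.xml"); the demos below inline the same logic to stay self-contained.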
HDFS client
The HDFS client code is straightforward. The following program downloads a file from HDFS to a local file of the same name:
package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class HdfsClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Load the cluster configuration from the directory named by HADOOP_CONF_DIR
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // Download file from HDFS to local; the 'true' flag closes both streams
        FSDataInputStream in = fs.open(new Path("/tmp/10min_top.csv"));
        OutputStream out = new FileOutputStream("c:/temp/10min_top.csv");
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
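Uploading works the same way in reverse. A minimal sketch, continuing from the fs instance above (the paths are the same placeholders used in the download example):

// Upload a local file to HDFS; delSrc=false keeps the local copy,
// overwrite=true replaces an existing target file
fs.copyFromLocalFile(false, true,
        new Path("c:/temp/10min_top.csv"),
        new Path("/tmp/10min_top.csv"));
fs.close();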
Yarn client
The Yarn client code is equally simple. The following program connects to Yarn and prints basic information about every application:
package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.List;

public class YarnClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        // Load the cluster configuration from the directory named by HADOOP_CONF_DIR
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/yarn-site.xml"));
        System.out.println("yarn.resourcemanager.hostname: " + conf.get("yarn.resourcemanager.hostname"));

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List all applications known to Yarn
        List<ApplicationReport> applications = yarnClient.getApplications();
        for (ApplicationReport app : applications) {
            System.out.println(app.getApplicationId().getId() + ", " + app.getName()
                    + ", " + app.getFinalApplicationStatus().name());
        }

        // close() also stops the underlying service, so a separate stop() is not needed
        yarnClient.close();
    }
}
Sample output of the code above:
yarn.resourcemanager.hostname: namenode1
16, k2a_job_1420, FAILED
17, k2a_job_1420, FAILED
14, k2a_job_1414, FAILED
15, k2a_job_1418, SUCCEEDED
12, k2a_job_1256, SUCCEEDED
13, k2a_job_1417, FAILED
10, k2a_job_1254, SUCCEEDED
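If only a subset of applications is of interest, getApplications() also accepts a set of application states, so the filtering happens on the ResourceManager side. A minimal sketch, continuing from the yarnClient instance above (it additionally requires java.util.EnumSet and org.apache.hadoop.yarn.api.records.YarnApplicationState; the chosen state is just an example):

// Fetch only the applications that are currently running
List<ApplicationReport> running =
        yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
for (ApplicationReport app : running) {
    System.out.println(app.getApplicationId() + ", " + app.getName()
            + ", " + app.getYarnApplicationState().name());
}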