Implementing HDFS and Yarn Clients in Java

The Java implementations of an HDFS client and a Yarn client are very similar, so they are documented together here. A client driven by configuration files is convenient for on-site deployment, and it is compatible with both HA and non-HA HDFS and Yarn clusters.

Add dependencies

Add the following dependencies to pom.xml:

<dependencies>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.3</version>
    </dependency>

    <!-- Yarn Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.2.3</version>
    </dependency>
</dependencies>

Define the configuration

To connect the client to a specific Hadoop cluster, the easiest approach is to first obtain the three configuration files core-site.xml, hdfs-site.xml, and yarn-site.xml from the cluster (your cluster administrator can provide them). The first two are required by the HDFS client, the last one by the Yarn client.

Then put these files in a single directory, define an environment variable pointing to that directory, and load the three configuration files in the client code with addResource(). Spark uses the HADOOP_CONF_DIR environment variable for this purpose, so we follow the same convention.
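
Since both clients load configuration files the same way, the loading logic can be factored into a small helper. The sketch below is illustrative and not part of the demo code; the class and method names are our own:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HadoopConfLoader {

    // Build a Configuration from the named files under HADOOP_CONF_DIR,
    // e.g. load("core-site.xml", "hdfs-site.xml") for an HDFS client
    public static Configuration load(String... fileNames) {
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            throw new IllegalStateException("HADOOP_CONF_DIR not set");
        }
        Configuration conf = new Configuration();
        for (String fileName : fileNames) {
            conf.addResource(new Path(hadoopConfDir + "/" + fileName));
        }
        return conf;
    }
}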

HDFS client

The HDFS client code is straightforward; the following program downloads a file from HDFS to a local file of the same name:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class HdfsClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        // Load the HDFS-related configuration files from HADOOP_CONF_DIR
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // Download a file from HDFS to a local file of the same name;
        // the final 'true' tells copyBytes to close both streams when done
        FSDataInputStream in = fs.open(new Path("/tmp/10min_top.csv"));
        OutputStream out = new FileOutputStream("c:/temp/10min_top.csv");
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
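
Uploading works symmetrically. Below is a minimal sketch of the reverse direction, assuming the same configuration loading as above (the paths are just examples):

// Upload a local file to HDFS; copyFromLocalFile(delSrc, overwrite, src, dst)
// keeps the local file and overwrites the target if it already exists
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(false, true,
        new Path("c:/temp/10min_top.csv"), new Path("/tmp/10min_top.csv"));
fs.close();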

Yarn client

The Yarn client code is just as simple; the following program connects to Yarn and prints basic information about all applications:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.List;

public class YarnClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        // Load the Yarn configuration file from HADOOP_CONF_DIR
        conf.addResource(new Path(hadoopConfDir + "/yarn-site.xml"));

        System.out.println("yarn.resourcemanager.hostname: " + conf.get("yarn.resourcemanager.hostname"));

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List all applications from yarn
        List<ApplicationReport> applications = yarnClient.getApplications();
        for (ApplicationReport app : applications) {
            System.out.println(app.getApplicationId().getId() + ", " + app.getName() + ", " + app.getFinalApplicationStatus().name());
        }
        // close() stops the client service, so an explicit stop() is redundant
        yarnClient.close();
    }
}

Sample output from the code above:

yarn.resourcemanager.hostname: namenode1
16, k2a_job_1420, FAILED
17, k2a_job_1420, FAILED
14, k2a_job_1414, FAILED
15, k2a_job_1418, SUCCEEDED
12, k2a_job_1256, SUCCEEDED
13, k2a_job_1417, FAILED
10, k2a_job_1254, SUCCEEDED
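
getApplications() also has overloads that filter on the server side, so the ResourceManager returns only what you need. A sketch that lists running applications only, as a variation on the loop above (additional imports: java.util.EnumSet and org.apache.hadoop.yarn.api.records.YarnApplicationState):

// Fetch only applications currently in the RUNNING state
List<ApplicationReport> running =
        yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
for (ApplicationReport app : running) {
    System.out.println(app.getApplicationId() + ", " + app.getName());
}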

Code download

hdfs-client-demo.zip

hadoop-conf-files.zip