Java实现HDFS和Yarn的客户端

Java实现HDFS和Yarn的客户端的方法十分类似,因此放在一起记录。使用配置好的客户端在现场实施比较方便,而且能投同时兼容HA和非HA的HDFS和Yarn集群。

添加依赖项

pom.xml里添加下面的依赖项:

<dependencies>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.3</version>
    </dependency>

    <!-- Yarn Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.2.3</version>
    </dependency>
</dependencies>

定义配置项

客户端要对接到指定的Hadoop集群,最方便的方式是从集群先获取到core-site.xmlhdfs-site.xmlyarn-site.xml这三个配置文件(可联系集群管理员获取),前两者是HDFS客户端需要,后者是Yarn客户端需要。

然后将这些文件放在同一目录下,然后我们定义一个环境变量指向此目录,在客户端代码里用addResource()方法加载这三个配置文件。Spark使用的是HADOOP_CONF_DIR这个环境变量,为符合习惯我们也用这个环境变量。

HDFS客户端

实现HDFS客户端的代码很简单,下面的代码从HDFS下载指定的文件到本地同名文件:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class HdfsClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // Download file from HDFS to local
        FSDataInputStream in = fs.open(new Path("/tmp/10min_top.csv"));
        OutputStream out = new FileOutputStream("c:/temp/10min_top.csv");
        IOUtils.copyBytes(in, out, 4096, true);
    }
}

Yarn客户端

实现Yarn客户端的代码很简单,下面的代码连接到Yarn并输出所有application的基本信息:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.List;

public class YarnClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/yarn-site.xml"));

        System.out.println("yarn.resourcemanager.hostname: " + conf.get("yarn.resourcemanager.hostname"));

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List all applications from yarn
        List<ApplicationReport> applications = yarnClient.getApplications();
        for (ApplicationReport app : applications) {
            System.out.println(app.getApplicationId().getId() + ", " + app.getName() + ", " + app.getFinalApplicationStatus().name());
        }
        yarnClient.stop();
        yarnClient.close();
    }
}

上面代码执行结果示例:

yarn.resourcemanager.hostname: namenode1
16, k2a_job_1420, FAILED
17, k2a_job_1420, FAILED
14, k2a_job_1414, FAILED
15, k2a_job_1418, SUCCEEDED
12, k2a_job_1256, SUCCEEDED
13, k2a_job_1417, FAILED
10, k2a_job_1254, SUCCEEDED

代码下载

hdfs-client-demo.zip

hadoop-conf-files.zip

Windows下使用hadoop

1、从apache下载hadoop,并解压缩,例如hadoop-2.7.3.tar.gz

2、在hadoop-env.cmd里修改设置JAVA_HOME和HADOOP_HOME

set JAVA_HOME="C:\Program Files\Java\jdk1.7.0_79"
set HADOOP_HOME="C:\install\hadoop-2.7.3"

注意如果JAVA_HOME有空格,要用双引号,否则提示JAVA_HOME incorrect set。

3、下载winutils

winutils包含winutils.exe和hadoop.dll,将这两个文件复制到hadoop/bin目录下,否则执行hadoop命令会提示如下错误:

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/St ring;JZ)V at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(Native Method) at org.apache.hadoop.util.NativeCrc32.calculateChunkedSumsByteArray(NativeCrc32.java:86) at org.apache.hadoop.util.DataChecksum.calculateChunkedSums(DataChecksum.java:430) at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:202)

下载地址:https://github.com/steveloughran/winutils

注意与所使用的hadoop版本要匹配。参考链接

4、查看远程hdfs文件列表(根目录)

hadoop fs -ls hdfs://192.168.130.100/

其他文件操作类似,可以执行hadoop fs命令查看。

5、在Eclipse里调试mapreduce程序

在eclipse里直接运行mapreduce程序时可能会提示:

ERROR [main] util.Shell (Shell.java:getWinUtilsPath(303)) - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)

原因是没有配置HADOOP_HOME环境变量,在run configuration里加上,或者在windows系统环境变量里加上这个环境变量即可。参考

 

 

Hadoop安装配置和使用问题记录

关于Hadoop如何配置的教程网上已经有不少了,配合hadoop主页上的说明,基本可以顺利在多台机器上配置好hadoop集群。这里记录的是我在实际配置和使用hadoop时遇到的问题,其中一些属于hadoop周边,如ssh配置时容易遇到的问题和解决方法,供大家参考。我目前是在windows xp里通过cygwin安装的hadoop,版本是0.17.2.1。

Cygwin部分

1、安装cygwin。

cygwin.com下载网络安装包,在选择组件的时候建议直接把openssh组件选中,有些版本的cygwin似乎不会自动安装diffutils组件,所以需要手工选中一下,否则配置ssh时会提示缺少该组件包。

2、cygwin控制台里的缺省提示符不太习惯

export PS1="u@w$"设一下好多了。

3、配置sshd

比较简单,可以参考这个链接。在ssh-host-config时问到Should privilege separation be used? (yes/no)时回答no即可,如果由于安全因素要回答yes时,在启动sshd时有可能遇到Privilege separation user sshd does not exist的错误信息,解决方法可参考这个链接

4、配置ssh使用证书方式自动验证

这里花了一些时间,后来觉得可能是由于机器上有两个版本的cygwin的缘故,因为在另一台机器上安装就没有出现问题。在使用ssh-keygen -t rsa命令后提示输入密码时,直接按两次回车即可。公钥的复制等过程这里不再赘述。

Hadoop部分

5、在Eclipse里运行hadoop的WordCount程序时提示异常:javax.security.auth.login.LoginException: Login failed: CreateProcess: whoami error=2

解决方法是把c:\cygwin\bin加入系统的path环境变量,然后重启Eclipse以便改动生效。

6、在Eclipse里运行WordCount时遇到java heap size不够的异常

在运行配置里加入-Xms200M就可以解决。(hadoop的helloworld要求的内存比较多?)

7、当要运行的运算依赖第三方类库时的问题

这个链接有所讨论,但暂时没看到除了在命令行里使用hadoop jar命令以外的解决方法,比如在0.17.2和0.18.1版本里我都没有看到JobConf类里有类似addJar()这样的方法,在JobConf#setJar()里使用逗号分隔多个jar文件的方式则会报找不到文件的错误。解决方式可能有两个:

  • a) 把所需要的第三方jar文件复制到每个节点机器的jre里(暂时没有试验)

  • b) 把第三方jar包和自己的类打到一个包里。

update: 在网上找到另一种方式,通过DistributedCache实现,原文里可能有笔误,我试验正确的方法是 调用DistributedCache.addArchiveToClassPath()方法,注意其第一个参数必须是相对路径,如/test/lib /my.jar,而不能是像hdfs://192.168.0.5:47110/test/lib/my.jar这样的绝对路径。关于DistributedCache的说明在里有一些。

8、调试mapreduce程序的方式

这个链接里讲得比较清楚了,因为很有用所以特意重复一次。如果文件存放在HDFS里,那么只需要调用JobConf#.set("mapred.job.tracker", "local")即可;如果文件也是存在本地的,还需要调用JobConf#set("fs.default.name", "local")方法。我通常让文件存在HDFS里调试,因为要使用本地文件要么参数需要改变,要么代码需要改变,维护两个环境很麻烦。在程序里用System.out.println()输出的内容可以在datanode的hadoop安装路径的logs/userlogs目录里找到。

9、使用自定义InputFormat时获取xmi:id

使用EMF模型元素作为key的时候,需要注意并不是在代码的任何地方都能得到xmi:id的值的。具体来说,在WritableComparable#write()方法里能得到(前提是该对象本来就有resource,即eobj.eResource()!=null),而在WritableComparable#readFields()里是不能得到的,在RecordWriter#write()方法里同样不能得到,因为后两者的EMF元素对象都是反序列化得到的,已经不是内存里原来的那个实例了。

10、map进行到100%后,reduce过程进行到某个数值(如16%)后就不再继续,直到被hadoop强制关闭。

在tasknode的log里记录如下:

2008-11-20 11:17:06,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:09,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:15,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:18,705 FATAL org.apache.hadoop.mapred.TaskTracker: Task: task_200811191041_0015_r_000000_0 - Killed due to Shuffle Failure: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out. 2008-11-20 11:17:18,705 INFO org.apache.hadoop.mapred.TaskTracker: About to purge task: task_200811191041_0015_r_000000_0 2008-11-20 11:17:18,705 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_r_000000_0 done; removing files. 2008-11-20 11:17:18,705 WARN org.apache.hadoop.mapred.TaskTracker: Unknown child task finshed: task_200811191041_0015_r_000000_0. Ignored. 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskTracker: Received 'KillJobAction' for job: job_200811191041_0015 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_m_000011_0 done; removing files. 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_m_000005_0 done; removing files. 

在我的java application的控制台里的输入如下:

08/11/20 11:06:39 INFO mapred.JobClient:  map 96% reduce 11%
08/11/20 11:06:40 INFO mapred.JobClient:  map 100% reduce 11%
08/11/20 11:06:43 INFO mapred.JobClient:  map 100% reduce 13%
08/11/20 11:06:47 INFO mapred.JobClient:  map 100% reduce 16% (在这里停很久) 08/11/20 11:17:12 INFO mapred.JobClient:  map 100% reduce 0%
08/11/20 11:17:12 INFO mapred.JobClient: Task Id : task_200811191041_0015_r_000000_0, Status : FAILED
Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out. 08/11/20 11:17:14 WARN mapred.JobClient: Error reading task outputnode2 08/11/20 11:17:14 WARN mapred.JobClient: Error reading task outputnode2 08/11/20 11:17:25 INFO mapred.JobClient:  map 100% reduce 16%
08/11/20 11:17:30 INFO mapred.JobClient:  map 100% reduce 25%
08/11/20 11:17:31 INFO mapred.JobClient:  map 100% reduce 100%
08/11/20 11:17:32 INFO mapred.JobClient: Job complete: job_200811191041_0015

我想找到这个问题的所在了。是secondary name node所在的机器没有配置dfs.http.address这个参数,该参数在hadoop-default.xml里的缺省值是0.0.0.0:50070,应改为name node所在机器的ip地址。参考链接

11、一些参考链接。

http://hayesdavis.net/2008/06/14/running-hadoop-on-windows/
http://hi.baidu.com/shirdrn/blog/category/Hadoop
http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/index.html
http://blog.ring.idv.tw/comment.ser?i=231

搬家前链接:https://www.cnblogs.com/bjzhanghao/archive/2008/11/02/1325113.html