使用Java nio实现快速合并大量csv文件

需求描述

某项目有一个将小csv文件合并为大csv文件的场景:典型情况每台设备每天产生1440个csv文件(即每分钟一个文件),每个文件大小约100KB,需要将它们合并为一个csv文件。

这样的设备大约有2000台,也就是每天288万个csv文件,按设备合并后应得到2000个csv文件。

问题分析

如果用Apache Commons CSV依次读取小文件再写入大文件,每台设备大约需要20~30秒,2000台设备需要10小时以上,时间太长无法接受。

通过查看样例数据发现,每个设备每天的csv文件表头是完全相同的,跨设备或跨天则不保证表头相同。恰好合并的规则也是按设备按天,因此想到可以利用数据的这个特点优化合并效率。

Java的nio包提供了文件通道(FileChannel)访问文件的方法,允许在两个文件通道间直接传输数据(transferTo),省去了数据在硬件、内核态和用户态之间多次复制的开销(零拷贝)。同时FileChannel允许跳到文件指定位置进行读取,我们可以利用这一点跳过csv的表头区域。

代码实现

以下代码封装了合并csv的逻辑,经测试在普通服务器SAS硬盘下合并1440个文件耗时约1秒,对比之前的20秒提升还是很明显的。

package com.acme;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.channels.FileChannel;

public class MergeCsvDemo {

    final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());

    /**
     * Merge given csv files into one file.
     * @param srcFiles Source csv files, must have same header
     * @param destFile Destination csv file
     * @throws UserException
     */
    public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {

        if (destFile.exists()) {
            throw new UserException("Destination file already exists: " + destFile.getPath());
        }
        if (!destFile.getPath().toUpperCase().contains("CSV")) {
            throw new UserException("Only csv file is allowed: " + destFile.getPath());
        }
        if (srcFiles.length == 0) {
            throw new UserException("Please specify at least one source file");
        }
        if (!destFile.getParentFile().exists()) {
            destFile.getParentFile().mkdirs();
        }

        try {

            // 获取源文件表头长度(假设每个源文件表头相同)
            int headerLength = 0;
            BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
            String line = br.readLine();
            if (line == null) {
                throw new UserException("Empty source file: " + srcFiles[0]);
            }
            headerLength = line.length();
            br.close();

            // 合并文件
            FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
            for (int i = 0; i < srcFiles.length; i++) {
                FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
                // 非第一个文件时,跳过表头
                if (i > 0) {
                    srcChannel.position(headerLength);
                }
                destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
                srcChannel.close();
            }
            destChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
            throw new UserException(e.getMessage());
        }

    }
}

示例下载

为了减小压缩后的文件尺寸,示例里的csv文件是经过脱敏、截取和复制处理的,执行测试用例即可合并80个样例csv文件,合并后约370MB,耗时约0.6秒。

merge-csv-demo.zip

HBase单机版和集群版安装配置

HBase的官方文档十分详细,按照quickstart介绍的步骤安装就可以完成,配置集群版时需要稍微留意一下与hadoop相关的几个配置项。本文记录了自己在某项目集成环境安装HBase集群版的过程。

一、环境信息

> uname -a
Linux node1 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

> java --version
openjdk 11.0.15 2022-04-19

> hbase version
HBase 2.5.1

二、单机版

HBase单机版的安装还是十分简单的。

下载HBase

国内建议用清华镜像比较快:

> wget https://mirrors.tuna.tsinghua.edu.cn/apache/hbase/2.5.1/hbase-2.5.1-bin.tar.gz --no-check-certificate

注:下载了几次2.4.15版解压缩时都报unexpected end of file错误,改为下载2.5.1版。

解压缩到指定目录

这里以安装到/usr/local为例:

> tar zxvf hbase-2.5.1-bin.tar.gz -C /usr/local/

启动HBase

启动HBase服务,成功后可以看到HMaster进程和HRegionServer进程,浏览器访问 http://localhost:16010 可以进入HBase Web界面,bin/hbase shell可进入命令行。

> bin/start-hbase.sh
running master, logging to /usr/local/hbase-2.5.1/bin/../logs/hbase-root-master-node1.out
: running regionserver, logging to /usr/local/hbase-2.5.1/bin/../logs/hbase-root-regionserver-node1.out

> jps
55825 HMaster
56062 HRegionServer

> bin/hbase shell
hbase:001:0>

如果提示Could not start ZK at requested port of 2181,修改hbase-site.xml设置hbase.cluster.distributed=true,这样其实HBase的运行模式是伪分布式(而非单机模式),即HMaster与HRegionServer运行在同一物理机的不同jvm进程。

停止HBase

> bin/stop-hbase.sh

停止后用jps命令确认看HMaster和HRegionServer进程是否已结束。

三、集群版

HBase单机版一般无法支撑生产环境的需要,这里介绍如何在单机版基础上配置集群版,这里以三台服务器为例,其中节点1和节点2作为HMaster(主备),节点1、节点2和节点3同时作为RegionServer。

配置ssh免密

主节点需要能够免密登录到其他节点,备份主节点也需要能够免密登录到其他节点。

> ssh-keygen
> ssh-copy-id root@node2
> ssh-copy-id root@node3

修改hbase-site.xml

HBase默认的hbase-site.xml内容是单机版配置,只适合测试和学习使用,现在需要改为集群版的配置。一是修改运行模式为集群模式;二是将HBase的数据目录由本地目录改为HDFS目录,其中host和port应参考core-site.xml里配置的fs.defaultFS的值:

> vi hbase-site.xml
<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://namenode1:8020/hbase</value>
</property>

同时删除单机版需要的hbase.tmp.dirhbase.unsafe.stream.capability.enforce这两个配置项。

此时重新启动HBase后,会自动在hdfs上创建hbase目录:

[root@node1 hbase-2.5.1]# hdfs dfs -ls /hbase
Found 12 items
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/.hbck
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/.tmp
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/MasterData
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/WALs
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/archive
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/corrupt
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/data
-rw-r--r--   3 hdfs supergroup         42 2022-01-05 13:34 /hbase/hbase.id
-rw-r--r--   3 hdfs supergroup          7 2022-01-05 13:34 /hbase/hbase.version
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/mobdir
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/oldWALs
drwx--x--x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/staging

修改regionservers

修改regionservers文件,每行填写一台服务器的名称:

> vi conf/regionservers
node1
node2
node3

创建备份HMaster

生产环境一般需要启动在两个节点上启动HMaster以提供高可用能力。在conf目录下创建backup-masters文件,里面填写备份HMaster服务器的名称:

> vi conf/backup-masters
node2

配置zookeeper

在hbase-site.xml里配置zookeeper:

> vi hbase-site.xml
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node1,node2,node3</value>
</property>
<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/usr/local/zookeeper</value>
</property>

配置从节点

将整个目录复制到其他从节点,我习惯用rsync比较快,如果没有装rsync也可以用scp命令:

> rsync -avr /usr/local/hbase-2.5.1 root@node2:/usr/local
> rsync -avr /usr/local/hbase-2.5.1 root@node3:/usr/local

启动HBase集群

在主节点上启动集群:

> bin/start-hbase.sh

启动后在各节点上执行jps检查HMaster和HRegionServer进程是否存在,如果启动失败查看日志以便定位原因。

四、常见问题

HMaster启动了,但所有RegionServer都没启动

检查日志文件发现HMaster在等待RegionServer,而RegionServer日志里则没有任何信息。

> tail logs/hbase-root-master-node1.out
2022-01-05 17:36:36,007 INFO  [master/zookeeper-2:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=0; waited=862160ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=862160ms
2022-01-05 17:36:37,512 INFO  [master/zookeeper-2:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=0; waited=863665ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=863665ms

向前翻日志发现有警告JAVA_HOME没有配置(但在命令行里echo $JAVA_HOME其实有值),在conf/hbase-env.sh里再指定一次并重启HBase问题解决。

> vi conf/hbase-env.sh
# The java implementation to use.  Java 1.8+ required.
export JAVA_HOME=/home/k2data/jdk-11

RegionServer报错ClockOutOfSyncException

如果主节点与从节点的时钟不同步,RegionServer会报错退出,可以手动更新时间错误的节点。生产环境建议ntpdate或chrony保持时钟同步。

停止HBase服务时提示"no hbase master found"

原因是记录hbase各个服务的pid文件默认放在/tmp目录下,由于某些原因文件丢失了。可以手工在每个节点jps查看hbase服务(H开头的)的进程id,然后kill掉。

SLF4J提示找到多个实现

启动hbase或在hbase shell里执行命令时总是出现如下的slf4j警告提示:

[root@taos-1 target]# start-hbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hdfs/hadoop-3.2.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hbase-2.5.1/lib/client-facing-thirdparty/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.

一般是与hadoop自带的log4j2版本不一致导致的,按提示中的路径删除旧版的log4j2包即可

五、参考链接

https://hbase.apache.org/book.html#quickstart

Java读写本地Parquet格式数据文件

Apache Parquet是大数据平台里广泛使用的一种开源的列式文件存储格式,MapReduce和Spark等计算框架都内置了对读写Parquet文件的支持,通常Parquet文件放在HDFS上使用。

有时我们需要用Java直接读写本地的Parquet文件,在没有MapReduce或Spark等工具的情况下,要实现读写Parquet文件可以借助hadoop-clientparquet-hadoop这两个包实现。

一、依赖类库

首先需要在Java工程的pom.xml里添加下面的依赖项(引入hadoop-client会显著增大fat jar包的体积,但目前没有很好的替代方案):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.1</version>
</dependency>

二、将数据写入Parquet文件

Parquet官方提供了org.apache.parquet.example.data.Group作为一条记录的对象,这里演示以此对象写入parquet文件的方法。为了简化示例代码,parquet文件里每一列的类型都使用整型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.*;

import java.io.IOException;

/**
 * 示例程序:数据以 org.apache.parquet.example.data.Group 的形式写入Parquet文件
 */
public class WriteParquetDemoGroup {
    Configuration conf;

    public WriteParquetDemoGroup() {
        conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        conf.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
    }

    public void writeParquet(int numRows, String[] fields, Path parquetPath) throws IOException {

        Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
        for (int j = 0; j < fields.length; j++) {
            schemaBuilder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, fields[j]));
        }
        MessageType schema = schemaBuilder.named("record");

        GroupWriteSupport.setSchema(schema, conf);
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        writeSupport.init(conf);

        ParquetWriter<Group> writer = null;
        try {
            writer = new ParquetWriter<Group>(parquetPath,
                    writeSupport,
                    CompressionCodecName.SNAPPY,
                    ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                    ParquetWriter.DEFAULT_WRITER_VERSION,
                    conf);

            for (int i = 0; i < numRows; i++) {
                Group group = new SimpleGroupFactory(schema).newGroup();
                for (int j = 0; j < fields.length; j++) {
                    group.add(fields[j], i * j);
                }
                writer.write(group);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

三、从Parquet文件读取数据

读取Parquet文件的代码本身很简单,只是要特别注意一点,为了能发挥Parquet列式存储的优势,应将要读取的列配置到PARQUET_READ_SCHEMA参数,以便跳过其他不需要扫描的列,从而提高读取性能。

public void readParquetWithReadSchema(Path parquetPath, String[] queryFields) throws IOException {
    // 将要读取的列配置到PARQUET_READ_SCHEMA,如果缺失这一步读取性能将严重降低
    Types.MessageTypeBuilder builder = Types.buildMessage();
    for (int j = 0; j < queryFields.length; j++) {
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, queryFields[j]));
    }
    MessageType messageType = builder.named("record");
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, messageType.toString());

    // 读取Parquet文件
    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetReader.Builder<Group> readerBuilder = ParquetReader.builder(readSupport, parquetPath);
    ParquetReader<Group> reader = readerBuilder.withConf(conf).build();
    Group line = null;
    while ((line = reader.read()) != null) {
        for (String field : queryFields) {
            line.getInteger(field, 0);
        }
    }
}

四、性能测试

写了一个简单的测试用例ParquetDemoTest,验证不同条件下上面的代码写入和读取Parquet文件的耗时。运行环境是普通笔记本电脑,i5 CPU + SSD硬盘。

写入性能:固定每行500列,可以看到写入parquet文件的耗时与写入的行数成正比;

读取性能:从parquet文件中读取少量列时速度是很快的,读取耗时与读取的列数成正比;

错误读取:当没有配置PARQUET_READ_SCHEMA时,读取多少列耗时都与读取500列差不多,未能体现列式存储的优势。

写入Parquet文件(Group), 100 行 x 500 列, 耗时 1580 ms
写入Parquet文件(Group), 500 行 x 500 列, 耗时 6927 ms
写入Parquet文件(Group), 1000 行 x 500 列, 耗时 12424 ms
写入Parquet文件(Group), 2000 行 x 500 列, 耗时 25849 ms
写入Parquet文件(Group), 3000 行 x 500 列, 耗时 36799 ms

读取Parquet文件(过滤列), 3000 行 x 5 列, 耗时 180 ms
读取Parquet文件(过滤列), 3000 行 x 10 列, 耗时 202 ms
读取Parquet文件(过滤列), 3000 行 x 15 列, 耗时 171 ms
读取Parquet文件(过滤列), 3000 行 x 50 列, 耗时 258 ms
读取Parquet文件(过滤列), 3000 行 x 100 列, 耗时 504 ms
读取Parquet文件(过滤列), 3000 行 x 200 列, 耗时 1608 ms
读取Parquet文件(过滤列), 3000 行 x 300 列, 耗时 2544 ms
读取Parquet文件(过滤列), 3000 行 x 400 列, 耗时 3998 ms
读取Parquet文件(过滤列), 3000 行 x 500 列, 耗时 6022 ms

读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6188 ms
读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6795 ms
读取Parquet文件(未过滤列), 3000 行 x 10 列, 耗时 6717 ms
读取Parquet文件(未过滤列), 3000 行 x 15 列, 耗时 6268 ms
读取Parquet文件(未过滤列), 3000 行 x 50 列, 耗时 6311 ms
读取Parquet文件(未过滤列), 3000 行 x 100 列, 耗时 7317 ms
读取Parquet文件(未过滤列), 3000 行 x 200 列, 耗时 6637 ms
读取Parquet文件(未过滤列), 3000 行 x 300 列, 耗时 6676 ms
读取Parquet文件(未过滤列), 3000 行 x 400 列, 耗时 7225 ms
读取Parquet文件(未过滤列), 3000 行 x 500 列, 耗时 6928 ms

五、代码下载

文中使用的代码:parquet-demo-1.0.0-src.zip

参考资料

https://www.arm64.ca/post/reading-parquet-files-java/

Python处理csv速度慢问题一例和解决

问题现象

一段python代码处理一个60万行csv文件耗时过长,从内存增长图和作业日志上看,处理此csv文件用了十几分钟,但在本地python命令行里测试读取200万行的csv(列数相同)文件只需要不到30秒,因此猜测是在pandas处理此文件数据时存在未优化的代码。

file

分析和解决

经过排查,发现是在讲csv文件里的时间戳转换为datetime类型时,消耗了大量时间,更换timestamp转datetime的函数:

# input[key]['k_ts'] = input[key]['k_ts'].apply(lambda x:pd.to_datetime(x, utc=True, unit='ms').tz_convert('Asia/Shanghai'))

input[key]['k_ts'] = input[key]['k_ts'].apply(lambda x:datetime.datetime.fromtimestamp(x/1000))

同时反方向的datetime转换到timestamp函数也做相应更改:

# output[key][ts_col] = output[key][ts_col].apply(lambda x:x.timestamp() * 1000).astype('int64')

output[key][ts_col] = output[key][ts_col].apply(lambda x:int(time.mktime(x.timetuple())*1e3 + x.microsecond/1e3))

修改后问题解决。

Flutter环境配置要点

修改pub镜像地址:

export PUB_HOSTED_URL=https://pub.flutter-io.cn
export FLUTTER_STORAGE_BASE_URL=https://storage.flutter-io.cn

修改项目中的build.gradle文件(2处):

repositories {
    //google()
    //jcenter()
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
}

修改全局的flutter.gradle文件:

此修改在执行flutter upgrade后会被覆盖。

repositories {
    //google()
    //jcenter()
    maven { url 'https://maven.aliyun.com/repository/google' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
    maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
    }

修改android sdk位置:

flutter config --android-sdk c:/android-sdk
flutter config --android-studio-dir c:/android-studio-4.1.1

若flutter build apk打包时报错:

使用flutter build apk --verbose查看原因
遇到的一种情况是asset里的图标造成的,需要执行下面的命令:

flutter build apk --verbose --no-tree-shake-icons

减小apk尺寸

1、打包时添加--analyze-size参数可查看各个依赖包的大小,视情况可以去掉或替换一些依赖包;

2、在Android Studio里安装CMake和NDK(版本号20.1.x,更高版本去掉了一些toolchain所以打包时报错缺少arm-linux-androideabi,见这个issue),这样打包时会stripping以便减小apk体积;

注:实际测试减尺寸效果不明显

3、分架构打包,目前主流机型是arm64-v8a:

flutter build apk --verbose --no-tree-shake-icons --split-per-abi

4、混淆代码
打包时加上--split-debug-info参数,起到混淆代码的作用,同时能够减小apk体积。实测从8.7MB减小到8.2MB。

flutter build apk --verbose --no-tree-shake-icons --split-per-abi --release --target-platform android-arm64 --split-debug-info=/tmp/myapp/symbols

CentOS 7.6内核SLAB泄露问题

公司产品使用docker-java控制作业在指定的运行环境(docker容器)里执行,因此经常需要反复启动和停止docker容器,在项目中发现存在一段时间(若干天)后启动容器失败的现象。

问题现象

多次启动docker容器并停止后,发现buff/cache内存占用不断增加,且手工释放没有明显效果:
file

file

内存不足的一个表现是无法启动docker容器,报错如下:

OCI runtime create failed: container_linux.go:346: starting container process caused "process_linux.go:319: getting the final child's pid from pipe caused \"EOF\"": unknown

查看meminfo判断主要是slab过多:
file

问题解决

在网上查询后发现有用户遇到类似问题,原因是kernal 3.10的kmem memory limit不稳定,但这个feature是默认开启的。解决方法是升级内核到kernal 3.10.1075以上,或升级CentOS发行版到7.8或以上。

补充:后来发现3.10版的内核仍然没有完全解决这个问题,升级到4.x版内核后问题解决。

参考资料

https://github.com/moby/moby/issues/37722
https://zhuanlan.zhihu.com/p/106757502

Java实现HDFS和Yarn的客户端

Java实现HDFS和Yarn的客户端的方法十分类似,因此放在一起记录。使用配置好的客户端在现场实施比较方便,而且能投同时兼容HA和非HA的HDFS和Yarn集群。

添加依赖项

pom.xml里添加下面的依赖项:

<dependencies>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.3</version>
    </dependency>

    <!-- Yarn Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.2.3</version>
    </dependency>
</dependencies>

定义配置项

客户端要对接到指定的Hadoop集群,最方便的方式是从集群先获取到core-site.xmlhdfs-site.xmlyarn-site.xml这三个配置文件(可联系集群管理员获取),前两者是HDFS客户端需要,后者是Yarn客户端需要。

然后将这些文件放在同一目录下,然后我们定义一个环境变量指向此目录,在客户端代码里用addResource()方法加载这三个配置文件。Spark使用的是HADOOP_CONF_DIR这个环境变量,为符合习惯我们也用这个环境变量。

HDFS客户端

实现HDFS客户端的代码很简单,下面的代码从HDFS下载指定的文件到本地同名文件:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class HdfsClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // Download file from HDFS to local
        FSDataInputStream in = fs.open(new Path("/tmp/10min_top.csv"));
        OutputStream out = new FileOutputStream("c:/temp/10min_top.csv");
        IOUtils.copyBytes(in, out, 4096, true);
    }
}

Yarn客户端

实现Yarn客户端的代码很简单,下面的代码连接到Yarn并输出所有application的基本信息:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.List;

public class YarnClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/yarn-site.xml"));

        System.out.println("yarn.resourcemanager.hostname: " + conf.get("yarn.resourcemanager.hostname"));

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List all applications from yarn
        List<ApplicationReport> applications = yarnClient.getApplications();
        for (ApplicationReport app : applications) {
            System.out.println(app.getApplicationId().getId() + ", " + app.getName() + ", " + app.getFinalApplicationStatus().name());
        }
        yarnClient.stop();
        yarnClient.close();
    }
}

上面代码执行结果示例:

yarn.resourcemanager.hostname: namenode1
16, k2a_job_1420, FAILED
17, k2a_job_1420, FAILED
14, k2a_job_1414, FAILED
15, k2a_job_1418, SUCCEEDED
12, k2a_job_1256, SUCCEEDED
13, k2a_job_1417, FAILED
10, k2a_job_1254, SUCCEEDED

代码下载

hdfs-client-demo.zip

hadoop-conf-files.zip

使用Netdata实时监控服务器资源

Netdata是一款轻量的服务器资源监控软件,与Promethues这样的套件相比,其在Linux下的安装部署十分简便,即装即用,本身占用的资源不算高(但默认只能记录1天左右的数据,要记录更长时间数据需要配置数据库)。Netdata还支持监控每个docker容器的资源占用,对使用docker作为底座的系统十分有用,可以更方便的找出有问题的容器。

在CentOS里安装netdata:

yum install netdata

安装后服务即自动启动。需要修改/etc/netdata/netdata.conf里的ip地址(默认值是localhost,阿里云环境需要修改为网卡的内网地址):

bind to xx.xx.xx.xx

重启netdata服务:

service netdata restart

必要时在防火墙上开启19999端口。

通过浏览器访问netdata地址http://xx.xx.xx.xx:19999(阿里云环境使用公网ip访问)

file

关于Apache Drill使用DIR0条件过滤分区的性能问题

问题描述

数据文件按设备号和日期(yyyyMMdd)两层分区,现在想查询指定设备下的所有数据,使用DIR0关键字过滤:

select * from dfs.`table1` where DIR0 = 'device0001'

发现planning时间很长,进一步检查日志发现是在planning阶段扫描了table1下的所有目录,而不是只扫描了device0001目录,而设备数量有8000多个,每个目录下有几百个parquet文件总共300万个文件左右,因此扫描的时间很长。
file

问题解决

通过跟踪DRILL代码发现,DRILL在planning开始的时候,需要创建一个Table对象并扫描此Table下的所有文件,并且未考虑DIR0指定的条件。

在drill的jira上查到相关bug记录:DRILL-2517DRILL-3996,后者2015年记录但至今仍未解决,因此短期内可能无法通过升级DRILL版本解决。

变通的解决方法是,将查询语句修改为:

select * from dfs.`table1/device0001`

即把DIR0里的条件放在FROM子句里,此时planning时间显著降低。

定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?