启动Docker容器超过100个后报错问题

环境信息

AMD EPYC 7551 32-Core Processor 64bit / 452GB / 4.0TB
Kylin Linux Advanced Server V10 (Sword)
Linux-419.90-24.4.v2101.ky10.x86 64-x86 64-with-kylin-10-Sword
Docker 19.03.9

问题现象

Docker容器超过一定数量后无法启动新的容器。

排查和解决

先检查目前正常运行的容器数量,有134个:

> docker ps| wc-l
134

查看了系统可用CPU和内存资源,发现可用16个CPU线程和24GB内存,可用资源还比较宽裕。

考虑到目前运行的容器已经比较多,首先怀疑是打开文件数量(open files)达到上限,检查系统设置值:

> ulimit -n
1024

因为其他类似环境里一般都设置为65535,所以用ulimit -n 65536将打开文件数上限改大,但问题并没有解决。

既然启动新的容器失败,可能docker daemon日志会有线索,查了一下果然发现有错误信息如下:

>  journalctl -ru docker.service
...
04月 26 14:39:35 k2box-211 dockerd[3016]: time="2023-04-26T14:39:35.212798766+08:00" level=error msg="stream copy error: reading from a closed fifo"
04月 26 14:39:35 k2box-211 dockerd[3016]: time="2023-04-26T14:39:35.212674036+08:00" level=error msg="stream copy error: reading from a closed fifo"
04月 26 14:39:26 k2box-211 dockerd[3016]: time="2023-04-26T14:39:26.731792766+08:00" level=warning msg="Health check for container 443a24419f9864ca13a6772c35bb7fe9eabcf317d670d7c73c0383e5473cb397 error: context deadline exceeded"
04月 26 14:39:02 k2box-211 dockerd[3016]: time="2023-04-26T14:39:02.706135654+08:00" level=warning msg="Health check for container d18b85908134d2c45869f68daf825d0c7f6541e77673e8c83a03d34a1c0a3ad9 error: OCI runtime exec failed: exec failed: container_linux.go:318: starting container process caused \"exec: \\\"curl\\\": executable file not found in $PATH\": unknown"
04月 26 14:39:02 k2box-211 dockerd[3016]: time="2023-04-26T14:39:02.699741119+08:00" level=error msg="stream copy error: reading from a closed fifo"
04月 26 14:39:02 k2box-211 dockerd[3016]: time="2023-04-26T14:39:02.699578169+08:00" level=error msg="stream copy error: reading from a closed fifo"
...

日志提示context deadline exceeded,这个错误信息虽然比较模糊,但从时间上看与尝试创建新容器的时间是吻合的,并且应该是某个阈值的限制。在网上google了一下没有找到相同的问题。

尝试停止了一个正在运行的容器,此时启动新容器能正常成功1个,再启动则会失败,因此确认了是容器数量达到了某个阈值,但为什么是134个?

经过仔细分析,发现这134个容器的启动方式有区别,其中34个是通过docker run命令行启动的,另外100个是通过docker-java以程序方式启动的(http接口),查看相关代码发现后者果然有限制:

DockerHttpClient httpClient = new ApacheDockerHttpClient.Builder()
    .dockerHost(config.getDockerHost())
    .sslConfig(config.getSSLConfig())
    .maxConnections(100)  // 这里对连接数量做了限制
    .connectionTimeout(Duration.ofSeconds(30))
    .responseTimeout(Duration.ofSeconds(45))
    .build();

至此定位到问题原因,解决方法是将maxConnections(100)改为maxConnections(1024)并打包更新到问题环境。

使用Docker部署PostgreSql高可用集群(RepMgr方案)

本文记录了某项目中使用docker部署PostgreSQL集群的步骤和注意事项,使用的镜像是bitnami/postgresql-repmgr,其中与用户身份有关的内容在其他bitnami打包的镜像中也适用。

一、环境信息

CentOS 7.9
PostgreSQL 16

二、部署方案

PostgreSQL集群有多种方案,这里我们使用在项目中多次使用比较稳定的repmgr方案,repmgr能够在集群环境管理每个PostgreSQL节点的主从状态,官网介绍如下:

repmgr是一个开源的工具套件,用于管理PostgreSQL服务器集群中的复制和故障转移。它通过提供设置备用服务器、监控复制和执行故障转移或手动切换操作等管理任务的工具,增强了PostgreSQL内置的热备功能。
repmgr自从PostgreSQL 9.0引入内置的复制机制后,就提供了高级的支持。当前的repmgr系列,repmgr 5,支持了PostgreSQL 9.3引入的最新的复制功能,如级联复制、时间线切换和通过复制协议进行基础备份。

bitnami提供的postgresql-repmgr镜像是将PostgreSQL与repmgr打包在一起,形成一个集成的解决方案,以便用户能够快速搭建PostgreSQL集群服务,用户可以从dockerhub获取到这个镜像。

这个PostgreSQL集群解决方案包括PostgreSQL复制管理器,一个用于管理PostgreSQL集群上的复制和故障转移的开源工具。

三、部署步骤

首先确认集群环境各个服务器节点的状态:

关闭防火墙(如果docker已启动需要重启docker服务,否则关闭防火墙后启动容器会报iptables命令错):

systemctl stop firewalld 
systemctl disable firewalld

确保服务器时钟准确:

ntpdate cn.ntp.org.cn

安装和启动docker服务和docker-compose工具:

yum install -y epel-release
yum install -y docker
yum install -y docker-compose
systemctl enable docker
systemctl start docker

安装postgresql-repmgr镜像:

docker pull bitnami/postgresql-repmgr

将各个服务器名称写入hosts文件,vi /etc/hosts添加下面内容:

10.102.9.80 pg-0
10.102.9.81 pg-1
10.102.9.82 pg-2

四、配置

数据目录

bitnami的镜像使用非root用户身份,即容器里的root用户映射到宿主机的非root用户,此用户是ID为1001的无名称用户。要让容器的数据能够持久化到宿主机,需要准备一个数据目录(此例中为/mnt/sda/bitnami/postgresql)并映射到容器内,此目录的owner是1001:

mkdir /mnt/sda/bitnami
mkdir /mnt/sda/bitnami/postgresql
chown 1001:root /mnt/sda/bitnami -R

网络

创建docker网络以便节点间能够通信:

docker network create --subnet=172.25.0.0/24 --gateway=172.25.0.1 pg-network

配置文件

在任意目录创建pg.yml文件,内容如下:

version: '2'
networks:
  default:
    external:
      name: pg-network
services:
  pg:
    container_name: "pg"
    image: bitnami/postgresql-repmgr:latest
    networks:
      default:
        ipv4_address: 172.25.0.110
    ports:
      - "5432:5432"
    restart: always
    volumes:
      - /mnt/sda/bitnami/postgresql:/bitnami/postgresql
      - /etc/hosts:/etc/hosts
    environment:
      - POSTGRESQL_POSTGRES_PASSWORD=adminpassword
      - POSTGRESQL_USERNAME=myuser
      - POSTGRESQL_PASSWORD=mypassword
      - POSTGRESQL_DATABASE=mydatabase
      - REPMGR_PASSWORD=adminpassword
      - REPMGR_PRIMARY_HOST=pg-0
      - REPMGR_PRIMARY_PORT=5432
      - REPMGR_PARTNER_NODES=pg-0,pg-1,pg-2:5432
      - REPMGR_NODE_NAME=pg-0
      - REPMGR_NODE_NETWORK_NAME=pg-0
      - REPMGR_PORT_NUMBER=5432

在第二个节点类似创建pg.yml文件,修改其中的部分内容(ip地址、以及2处节点名称,见代码中的标注)如下所示:

version: '2'
networks:
  default:
    external:
      name: pg-network
services:
  pg:
    container_name: "pg"
    image: bitnami/postgresql-repmgr:latest
    networks:
      default:
        ipv4_address: 172.25.0.111    <-- 修改了这里
    ports:
      - "5432:5432"
    restart: always
    volumes:
      - /mnt/sda/bitnami/postgresql:/bitnami/postgresql
      - /etc/hosts:/etc/hosts
    environment:
      - POSTGRESQL_POSTGRES_PASSWORD=adminpassword
      - POSTGRESQL_USERNAME=myuser
      - POSTGRESQL_PASSWORD=mypassword
      - POSTGRESQL_DATABASE=mydatabase
      - REPMGR_PASSWORD=adminpassword
      - REPMGR_PRIMARY_HOST=pg-0
      - REPMGR_PRIMARY_PORT=5432
      - REPMGR_PARTNER_NODES=pg-0,pg-1,pg-2:5432
      - REPMGR_NODE_NAME=pg-1    <-- 修改了这里
      - REPMGR_NODE_NETWORK_NAME=pg-1    <-- 修改了这里
      - REPMGR_PORT_NUMBER=5432

第三个节点的情况类似,为节约篇幅这里不再贴配置文件内容。

启动服务

在每个节点分别使用docker-compose命令启动服务:

docker-compose -f /root/pg-ha/pg.yml up -d

查看repmgr状态,例如当前primary节点是哪一个:

docker exec -ti pg /opt/bitnami/scripts/postgresql-repmgr/entrypoint.sh repmgr -f /opt/bitnami/repmgr/conf/repmgr.conf service status

 ID | Name | Role    | Status    | Upstream | repmgrd | PID | Paused? | Upstream last seen
----+------+---------+-----------+----------+---------+-----+---------+--------------------
 1000 | pg-0 | standby |   running | pg-1     | running | 1   | no      | 1 second(s) ago
 1001 | pg-1 | primary | * running |          | running | 1   | no      | n/a
 1002 | pg-2 | standby |   running | pg-1     | running | 1   | no      | 0 second(s) ago

尝试连接数据库,验证服务是否正常(-U参数很重要):

docker exec -ti pg psql -U myuser -d mydatabase

若需要手工切换standby节点为primary执行下面的命令,需要节点之间配置过免密:

docker exec -it pg /opt/bitnami/scripts/postgresql-repmgr/entrypoint.sh repmgr -f /opt/bitnami/repmgr/conf/repmgr.conf standby switchover

Linux inode和常用操作

Linux系统里的inode中文名索引文件,用于保存文件的大小、时间、权限等元信息。每个文件都对应一个inode,每个inode也对应一个文件。因为在Linux里目录、设备等也是文件,所以也对应一个inode。

inode与文件块是分开保存的,它们都会占用磁盘空间,通常情况下是后者先占满,但当磁盘上有大量小文件时,可能前者会先达到上限,从而导致磁盘虽然还有空间但无法写入新文件。这种场景应提前考虑增加inode的上限数量,例如将2KB降为1KB。

file
图片来源:Wikipedia

让我们做一些实验来验证一下inode的行为:

查看磁盘已用和可用inode数量

执行df -i可查看分区里inode数量,一个分区下最大inode数量是格式化时确定的,默认最大数量是分区大小除以2KB得到
file

查看文件inode信息

执行ls -i可查看当前目录每个文件的inode值:
file

验证软链接有自己的inode

软链接是一个文件,其inode值与链接目标文件是不相同的。
file

验证硬链接共享同一个inode

硬链接不是一个文件,其inode号与链接目标文件相同。其实很正常,linux内部对文件的管理都是通过inode完成的,文件名只是一个易读的符号。
file

验证.和..是硬链接

分别对应当前目录的inode和上一级目录的inode。
file

验证通过inode号反查文件名

find -inum命令:
file

文件名存储在哪里?

inode里并不保存文件名(否则硬链接无法实现),那么文件名存储在哪里?答案是存储在“目录文件”里,当我们ls时,Linux就查询目录文件(但我们不能直接cat目录),此文件包含此目录下所有文件名到inode的映射关系。
file

注:以上均默认ext2文件系统。

使用Java nio实现快速合并大量csv文件

需求描述

某项目有一个将小csv文件合并为大csv文件的场景:典型情况每台设备每天产生1440个csv文件(即每分钟一个文件),每个文件大小约100KB,需要将它们合并为一个csv文件。

这样的设备大约有2000台,也就是每天288万个csv文件,按设备合并后应得到2000个csv文件。

问题分析

如果用Apache Commons CSV依次读取小文件再写入大文件,每台设备大约需要20~30秒,2000台设备需要10小时以上,时间太长无法接受。

通过查看样例数据发现,每个设备每天的csv文件表头是完全相同的,跨设备或跨天则不保证表头相同。恰好合并的规则也是按设备按天,因此想到可以利用数据的这个特点优化合并效率。

Java的nio包提供了文件通道(FileChannel)访问文件的方法,允许在两个文件通道间直接传输数据(transferTo),省去了数据在硬件、内核态和用户态之间多次复制的开销(零拷贝)。同时FileChannel允许跳到文件指定位置进行读取,我们可以利用这一点跳过csv的表头区域。

代码实现

以下代码封装了合并csv的逻辑,经测试在普通服务器SAS硬盘下合并1440个文件耗时约1秒,对比之前的20秒提升还是很明显的。

package com.acme;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.channels.FileChannel;

public class MergeCsvDemo {

    final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());

    /**
     * Merge given csv files into one file.
     * @param srcFiles Source csv files, must have same header
     * @param destFile Destination csv file
     * @throws UserException
     */
    public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {

        if (destFile.exists()) {
            throw new UserException("Destination file already exists: " + destFile.getPath());
        }
        if (!destFile.getPath().toUpperCase().contains("CSV")) {
            throw new UserException("Only csv file is allowed: " + destFile.getPath());
        }
        if (srcFiles.length == 0) {
            throw new UserException("Please specify at least one source file");
        }
        if (!destFile.getParentFile().exists()) {
            destFile.getParentFile().mkdirs();
        }

        try {

            // 获取源文件表头长度(假设每个源文件表头相同)
            int headerLength = 0;
            BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
            String line = br.readLine();
            if (line == null) {
                throw new UserException("Empty source file: " + srcFiles[0]);
            }
            headerLength = line.length();
            br.close();

            // 合并文件
            FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
            for (int i = 0; i < srcFiles.length; i++) {
                FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
                // 非第一个文件时,跳过表头
                if (i > 0) {
                    srcChannel.position(headerLength);
                }
                destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
                srcChannel.close();
            }
            destChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
            throw new UserException(e.getMessage());
        }

    }
}

示例下载

为了减小压缩后的文件尺寸,示例里的csv文件是经过脱敏、截取和复制处理的,执行测试用例即可合并80个样例csv文件,合并后约370MB,耗时约0.6秒。

merge-csv-demo.zip

Java读写本地Parquet格式数据文件

Apache Parquet是大数据平台里广泛使用的一种开源的列式文件存储格式,MapReduce和Spark等计算框架都内置了对读写Parquet文件的支持,通常Parquet文件放在HDFS上使用。

有时我们需要用Java直接读写本地的Parquet文件,在没有MapReduce或Spark等工具的情况下,要实现读写Parquet文件可以借助hadoop-clientparquet-hadoop这两个包实现。

一、依赖类库

首先需要在Java工程的pom.xml里添加下面的依赖项(引入hadoop-client会显著增大fat jar包的体积,但目前没有很好的替代方案):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.1</version>
</dependency>

二、将数据写入Parquet文件

Parquet官方提供了org.apache.parquet.example.data.Group作为一条记录的对象,这里演示以此对象写入parquet文件的方法。为了简化示例代码,parquet文件里每一列的类型都使用整型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.*;

import java.io.IOException;

/**
 * 示例程序:数据以 org.apache.parquet.example.data.Group 的形式写入Parquet文件
 */
public class WriteParquetDemoGroup {
    Configuration conf;

    public WriteParquetDemoGroup() {
        conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        conf.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
    }

    public void writeParquet(int numRows, String[] fields, Path parquetPath) throws IOException {

        Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
        for (int j = 0; j < fields.length; j++) {
            schemaBuilder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, fields[j]));
        }
        MessageType schema = schemaBuilder.named("record");

        GroupWriteSupport.setSchema(schema, conf);
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        writeSupport.init(conf);

        ParquetWriter<Group> writer = null;
        try {
            writer = new ParquetWriter<Group>(parquetPath,
                    writeSupport,
                    CompressionCodecName.SNAPPY,
                    ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                    ParquetWriter.DEFAULT_WRITER_VERSION,
                    conf);

            for (int i = 0; i < numRows; i++) {
                Group group = new SimpleGroupFactory(schema).newGroup();
                for (int j = 0; j < fields.length; j++) {
                    group.add(fields[j], i * j);
                }
                writer.write(group);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

三、从Parquet文件读取数据

读取Parquet文件的代码本身很简单,只是要特别注意一点,为了能发挥Parquet列式存储的优势,应将要读取的列配置到PARQUET_READ_SCHEMA参数,以便跳过其他不需要扫描的列,从而提高读取性能。

public void readParquetWithReadSchema(Path parquetPath, String[] queryFields) throws IOException {
    // 将要读取的列配置到PARQUET_READ_SCHEMA,如果缺失这一步读取性能将严重降低
    Types.MessageTypeBuilder builder = Types.buildMessage();
    for (int j = 0; j < queryFields.length; j++) {
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, queryFields[j]));
    }
    MessageType messageType = builder.named("record");
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, messageType.toString());

    // 读取Parquet文件
    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetReader.Builder<Group> readerBuilder = ParquetReader.builder(readSupport, parquetPath);
    ParquetReader<Group> reader = readerBuilder.withConf(conf).build();
    Group line = null;
    while ((line = reader.read()) != null) {
        for (String field : queryFields) {
            line.getInteger(field, 0);
        }
    }
}

四、性能测试

写了一个简单的测试用例ParquetDemoTest,验证不同条件下上面的代码写入和读取Parquet文件的耗时。运行环境是普通笔记本电脑,i5 CPU + SSD硬盘。

写入性能:固定每行500列,可以看到写入parquet文件的耗时与写入的行数成正比;

读取性能:从parquet文件中读取少量列时速度是很快的,读取耗时与读取的列数成正比;

错误读取:当没有配置PARQUET_READ_SCHEMA时,读取多少列耗时都与读取500列差不多,未能体现列式存储的优势。

写入Parquet文件(Group), 100 行 x 500 列, 耗时 1580 ms
写入Parquet文件(Group), 500 行 x 500 列, 耗时 6927 ms
写入Parquet文件(Group), 1000 行 x 500 列, 耗时 12424 ms
写入Parquet文件(Group), 2000 行 x 500 列, 耗时 25849 ms
写入Parquet文件(Group), 3000 行 x 500 列, 耗时 36799 ms

读取Parquet文件(过滤列), 3000 行 x 5 列, 耗时 180 ms
读取Parquet文件(过滤列), 3000 行 x 10 列, 耗时 202 ms
读取Parquet文件(过滤列), 3000 行 x 15 列, 耗时 171 ms
读取Parquet文件(过滤列), 3000 行 x 50 列, 耗时 258 ms
读取Parquet文件(过滤列), 3000 行 x 100 列, 耗时 504 ms
读取Parquet文件(过滤列), 3000 行 x 200 列, 耗时 1608 ms
读取Parquet文件(过滤列), 3000 行 x 300 列, 耗时 2544 ms
读取Parquet文件(过滤列), 3000 行 x 400 列, 耗时 3998 ms
读取Parquet文件(过滤列), 3000 行 x 500 列, 耗时 6022 ms

读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6188 ms
读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6795 ms
读取Parquet文件(未过滤列), 3000 行 x 10 列, 耗时 6717 ms
读取Parquet文件(未过滤列), 3000 行 x 15 列, 耗时 6268 ms
读取Parquet文件(未过滤列), 3000 行 x 50 列, 耗时 6311 ms
读取Parquet文件(未过滤列), 3000 行 x 100 列, 耗时 7317 ms
读取Parquet文件(未过滤列), 3000 行 x 200 列, 耗时 6637 ms
读取Parquet文件(未过滤列), 3000 行 x 300 列, 耗时 6676 ms
读取Parquet文件(未过滤列), 3000 行 x 400 列, 耗时 7225 ms
读取Parquet文件(未过滤列), 3000 行 x 500 列, 耗时 6928 ms

五、代码下载

文中使用的代码:parquet-demo-1.0.0-src.zip

参考资料

https://www.arm64.ca/post/reading-parquet-files-java/

CentOS 7.6内核SLAB泄露问题

公司产品使用docker-java控制作业在指定的运行环境(docker容器)里执行,因此经常需要反复启动和停止docker容器,在项目中发现存在一段时间(若干天)后启动容器失败的现象。

问题现象

多次启动docker容器并停止后,发现buff/cache内存占用不断增加,且手工释放没有明显效果:
file

file

内存不足的一个表现是无法启动docker容器,报错如下:

OCI runtime create failed: container_linux.go:346: starting container process caused "process_linux.go:319: getting the final child's pid from pipe caused \"EOF\"": unknown

查看meminfo判断主要是slab过多:
file

问题解决

在网上查询后发现有用户遇到类似问题,原因是kernal 3.10的kmem memory limit不稳定,但这个feature是默认开启的。解决方法是升级内核到kernal 3.10.1075以上,或升级CentOS发行版到7.8或以上。

补充:后来发现3.10版的内核仍然没有完全解决这个问题,升级到4.x版内核后问题解决。

参考资料

https://github.com/moby/moby/issues/37722
https://zhuanlan.zhihu.com/p/106757502

关于Apache Drill使用DIR0条件过滤分区的性能问题

问题描述

数据文件按设备号和日期(yyyyMMdd)两层分区,现在想查询指定设备下的所有数据,使用DIR0关键字过滤:

select * from dfs.`table1` where DIR0 = 'device0001'

发现planning时间很长,进一步检查日志发现是在planning阶段扫描了table1下的所有目录,而不是只扫描了device0001目录,而设备数量有8000多个,每个目录下有几百个parquet文件总共300万个文件左右,因此扫描的时间很长。
file

问题解决

通过跟踪DRILL代码发现,DRILL在planning开始的时候,需要创建一个Table对象并扫描此Table下的所有文件,并且未考虑DIR0指定的条件。

在drill的jira上查到相关bug记录:DRILL-2517DRILL-3996,后者2015年记录但至今仍未解决,因此短期内可能无法通过升级DRILL版本解决。

变通的解决方法是,将查询语句修改为:

select * from dfs.`table1/device0001`

即把DIR0里的条件放在FROM子句里,此时planning时间显著降低。

定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?

开发PDI(Kettle) Step Plugin

Pentaho Data Integration (PDI)是著名的ETL工具Kettle的现用名,这个工具允许用户以图形化的方式构造数据处理流程,除了内置丰富的数据处理节点以外,还允许用户自定义开发自己的数据处理节点以便实现更复杂或更定制的处理逻辑,使用的开发语言是Java。

在某项目里为了实现一个项目特定的数据转换,我开发了一个这样的处理节点,整体感受还是十分流畅的,记录如下(代码略):

从github克隆项目:

git clone git@github.com:pentaho/pdi-sdk-plugins.git

里面包含的kettle-sdk-step-plugin模块即是一个例子,可以基于这个模块的代码按需修改。
开发完成后运行自带的三个测试用例,这样部署后成功率比较高。
打包命令仍然是:

mvn package

打好的是一个jar包,放在${pdi_path}/plugins/steps/<demo_plugin_name>/目录下即可,如果目录不存在可按需创建。

使用Apache Drill处理数据文件

本文针对drill版本1.8。

安装Drill

官网下载tar包并解压即可,linux和windows都是如此。
注意:drill要求java版本是8或以上。

命令行使用Drill

最简单的方式是用embedded模式启动drill(启动后可以在浏览器里访问 http://localhost:8047 来管理drill):

bin/drill-embedded

这样就以嵌入模式启动了drill服务并同时进入了内置的sqline命令行。如果drill服务已经启动,则可以这样进入sqline命令行(参考):

bin/sqline -u jdbc:drill:drillbit=localhost

作为例子,用SQL语法查询一下drill自带的数据(命令行里的cp表示classpath,注意查询语句最后要加分号结尾):

apache drill> SELECT * FROM cp.`employee.json` LIMIT 3;

查询任意数据文件的内容:

apache drill> SELECT * FROM dfs.`/home/hao/apache-drill-1.16.0/sample-data/region.parquet`;

退出命令行用!quit

配置和查看Drill参数

如果要永久性修改参数值,需要修改$DRILL_HOME/conf/drill-override.conf文件(见文档);SET、RESET命令可以在当前session里修改参数值(文档)。

配置参数:

SET `store.parquet.block-size` = 536870912

重置参数为缺省值:

RESET `store.parquet.block-size`

查看参数:

select * from sys.options where name = 'store.parquet.block-size'

在java代码里使用Drill

下面是在java代码里使用Drill的例子代码,要注意的一点是,JDBC的URL是jdbc:drill:drillbit=localhost,而不是很多教程上说的jdbc:drill:zk=localhost

package com.acme;

import java.sql.*;

public class DrillJDBCExample {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    //static final String DB_URL = "jdbc:drill:zk=localhost:2181";
    static final String DB_URL = "jdbc:drill:drillbit=localhost"; //for embedded mode installation

    static final String USER = "admin";
    static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL,USER,PASS);

            stmt = conn.createStatement();
            /* Perform a select on data in the classpath storage plugin. */
            String sql = "select employee_id,first_name,last_name from cp.`employee.json`";
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
                int id  = rs.getInt("employee_id");
                String first = rs.getString("first_name");
                String last = rs.getString("last_name");

                System.out.print("ID: " + id);
                System.out.print(", First: " + first);
                System.out.println(", Last: " + last);
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

让Drill访问数据库

根据要访问的数据库的不同,需要为Drill添加相应的驱动,方法见RDBMS Storage Plugin

利用Drill将csv格式转换为parquet格式

原理是在drill里创建一张格式为parquet的表,该表的路径(下例中的/parquet1)对应的是磁盘上的一个目录。

ALTER SESSION SET `store.format`='parquet';
ALTER SESSION SET `store.parquet.compression` = 'snappy';

CREATE TABLE dfs.tmp.`/parquet1` AS 
SELECT * FROM dfs.`/my/csv/file.csv`;

让drill支持.zip、.arc压缩格式

(暂缺)

参考资料

Drill in 10 Minutes
How to convert a csv file to parquet