使用Java nio实现快速合并大量csv文件

需求描述

某项目有一个将小csv文件合并为大csv文件的场景:典型情况每台设备每天产生1440个csv文件(即每分钟一个文件),每个文件大小约100KB,需要将它们合并为一个csv文件。

这样的设备大约有2000台,也就是每天288万个csv文件,按设备合并后应得到2000个csv文件。

问题分析

如果用Apache Commons CSV依次读取小文件再写入大文件,每台设备大约需要20~30秒,2000台设备需要10小时以上,时间太长无法接受。

通过查看样例数据发现,每个设备每天的csv文件表头是完全相同的,跨设备或跨天则不保证表头相同。恰好合并的规则也是按设备按天,因此想到可以利用数据的这个特点优化合并效率。

Java的nio包提供了文件通道(FileChannel)访问文件的方法,允许在两个文件通道间直接传输数据(transferTo),省去了数据在硬件、内核态和用户态之间多次复制的开销(零拷贝)。同时FileChannel允许跳到文件指定位置进行读取,我们可以利用这一点跳过csv的表头区域。

代码实现

以下代码封装了合并csv的逻辑,经测试在普通服务器SAS硬盘下合并1440个文件耗时约1秒,对比之前的20秒提升还是很明显的。

package com.acme;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.channels.FileChannel;

public class MergeCsvDemo {

    final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());

    /**
     * Merge given csv files into one file.
     * @param srcFiles Source csv files, must have same header
     * @param destFile Destination csv file
     * @throws UserException
     */
    public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {

        if (destFile.exists()) {
            throw new UserException("Destination file already exists: " + destFile.getPath());
        }
        if (!destFile.getPath().toUpperCase().contains("CSV")) {
            throw new UserException("Only csv file is allowed: " + destFile.getPath());
        }
        if (srcFiles.length == 0) {
            throw new UserException("Please specify at least one source file");
        }
        if (!destFile.getParentFile().exists()) {
            destFile.getParentFile().mkdirs();
        }

        try {

            // 获取源文件表头长度(假设每个源文件表头相同)
            int headerLength = 0;
            BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
            String line = br.readLine();
            if (line == null) {
                throw new UserException("Empty source file: " + srcFiles[0]);
            }
            headerLength = line.length();
            br.close();

            // 合并文件
            FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
            for (int i = 0; i < srcFiles.length; i++) {
                FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
                // 非第一个文件时,跳过表头
                if (i > 0) {
                    srcChannel.position(headerLength);
                }
                destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
                srcChannel.close();
            }
            destChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
            throw new UserException(e.getMessage());
        }

    }
}

示例下载

为了减小压缩后的文件尺寸,示例里的csv文件是经过脱敏、截取和复制处理的,执行测试用例即可合并80个样例csv文件,合并后约370MB,耗时约0.6秒。

merge-csv-demo.zip

Java读写本地Parquet格式数据文件

Apache Parquet是大数据平台里广泛使用的一种开源的列式文件存储格式,MapReduce和Spark等计算框架都内置了对读写Parquet文件的支持,通常Parquet文件放在HDFS上使用。

有时我们需要用Java直接读写本地的Parquet文件,在没有MapReduce或Spark等工具的情况下,要实现读写Parquet文件可以借助hadoop-clientparquet-hadoop这两个包实现。

一、依赖类库

首先需要在Java工程的pom.xml里添加下面的依赖项(引入hadoop-client会显著增大fat jar包的体积,但目前没有很好的替代方案):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.1</version>
</dependency>

二、将数据写入Parquet文件

Parquet官方提供了org.apache.parquet.example.data.Group作为一条记录的对象,这里演示以此对象写入parquet文件的方法。为了简化示例代码,parquet文件里每一列的类型都使用整型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.*;

import java.io.IOException;

/**
 * 示例程序:数据以 org.apache.parquet.example.data.Group 的形式写入Parquet文件
 */
public class WriteParquetDemoGroup {
    Configuration conf;

    public WriteParquetDemoGroup() {
        conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        conf.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
    }

    public void writeParquet(int numRows, String[] fields, Path parquetPath) throws IOException {

        Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
        for (int j = 0; j < fields.length; j++) {
            schemaBuilder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, fields[j]));
        }
        MessageType schema = schemaBuilder.named("record");

        GroupWriteSupport.setSchema(schema, conf);
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        writeSupport.init(conf);

        ParquetWriter<Group> writer = null;
        try {
            writer = new ParquetWriter<Group>(parquetPath,
                    writeSupport,
                    CompressionCodecName.SNAPPY,
                    ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                    ParquetWriter.DEFAULT_WRITER_VERSION,
                    conf);

            for (int i = 0; i < numRows; i++) {
                Group group = new SimpleGroupFactory(schema).newGroup();
                for (int j = 0; j < fields.length; j++) {
                    group.add(fields[j], i * j);
                }
                writer.write(group);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

三、从Parquet文件读取数据

读取Parquet文件的代码本身很简单,只是要特别注意一点,为了能发挥Parquet列式存储的优势,应将要读取的列配置到PARQUET_READ_SCHEMA参数,以便跳过其他不需要扫描的列,从而提高读取性能。

public void readParquetWithReadSchema(Path parquetPath, String[] queryFields) throws IOException {
    // 将要读取的列配置到PARQUET_READ_SCHEMA,如果缺失这一步读取性能将严重降低
    Types.MessageTypeBuilder builder = Types.buildMessage();
    for (int j = 0; j < queryFields.length; j++) {
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, queryFields[j]));
    }
    MessageType messageType = builder.named("record");
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, messageType.toString());

    // 读取Parquet文件
    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetReader.Builder<Group> readerBuilder = ParquetReader.builder(readSupport, parquetPath);
    ParquetReader<Group> reader = readerBuilder.withConf(conf).build();
    Group line = null;
    while ((line = reader.read()) != null) {
        for (String field : queryFields) {
            line.getInteger(field, 0);
        }
    }
}

四、性能测试

写了一个简单的测试用例ParquetDemoTest,验证不同条件下上面的代码写入和读取Parquet文件的耗时。运行环境是普通笔记本电脑,i5 CPU + SSD硬盘。

写入性能:固定每行500列,可以看到写入parquet文件的耗时与写入的行数成正比;

读取性能:从parquet文件中读取少量列时速度是很快的,读取耗时与读取的列数成正比;

错误读取:当没有配置PARQUET_READ_SCHEMA时,读取多少列耗时都与读取500列差不多,未能体现列式存储的优势。

写入Parquet文件(Group), 100 行 x 500 列, 耗时 1580 ms
写入Parquet文件(Group), 500 行 x 500 列, 耗时 6927 ms
写入Parquet文件(Group), 1000 行 x 500 列, 耗时 12424 ms
写入Parquet文件(Group), 2000 行 x 500 列, 耗时 25849 ms
写入Parquet文件(Group), 3000 行 x 500 列, 耗时 36799 ms

读取Parquet文件(过滤列), 3000 行 x 5 列, 耗时 180 ms
读取Parquet文件(过滤列), 3000 行 x 10 列, 耗时 202 ms
读取Parquet文件(过滤列), 3000 行 x 15 列, 耗时 171 ms
读取Parquet文件(过滤列), 3000 行 x 50 列, 耗时 258 ms
读取Parquet文件(过滤列), 3000 行 x 100 列, 耗时 504 ms
读取Parquet文件(过滤列), 3000 行 x 200 列, 耗时 1608 ms
读取Parquet文件(过滤列), 3000 行 x 300 列, 耗时 2544 ms
读取Parquet文件(过滤列), 3000 行 x 400 列, 耗时 3998 ms
读取Parquet文件(过滤列), 3000 行 x 500 列, 耗时 6022 ms

读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6188 ms
读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6795 ms
读取Parquet文件(未过滤列), 3000 行 x 10 列, 耗时 6717 ms
读取Parquet文件(未过滤列), 3000 行 x 15 列, 耗时 6268 ms
读取Parquet文件(未过滤列), 3000 行 x 50 列, 耗时 6311 ms
读取Parquet文件(未过滤列), 3000 行 x 100 列, 耗时 7317 ms
读取Parquet文件(未过滤列), 3000 行 x 200 列, 耗时 6637 ms
读取Parquet文件(未过滤列), 3000 行 x 300 列, 耗时 6676 ms
读取Parquet文件(未过滤列), 3000 行 x 400 列, 耗时 7225 ms
读取Parquet文件(未过滤列), 3000 行 x 500 列, 耗时 6928 ms

五、代码下载

文中使用的代码:parquet-demo-1.0.0-src.zip

参考资料

https://www.arm64.ca/post/reading-parquet-files-java/

CentOS 7.6内核SLAB泄露问题

公司产品使用docker-java控制作业在指定的运行环境(docker容器)里执行,因此经常需要反复启动和停止docker容器,在项目中发现存在一段时间(若干天)后启动容器失败的现象。

问题现象

多次启动docker容器并停止后,发现buff/cache内存占用不断增加,且手工释放没有明显效果:
file

file

内存不足的一个表现是无法启动docker容器,报错如下:

OCI runtime create failed: container_linux.go:346: starting container process caused "process_linux.go:319: getting the final child's pid from pipe caused \"EOF\"": unknown

查看meminfo判断主要是slab过多:
file

问题解决

在网上查询后发现有用户遇到类似问题,原因是kernal 3.10的kmem memory limit不稳定,但这个feature是默认开启的。解决方法是升级内核到kernal 3.10.1075以上,或升级CentOS发行版到7.8或以上。

补充:后来发现3.10版的内核仍然没有完全解决这个问题,升级到4.x版内核后问题解决。

参考资料

https://github.com/moby/moby/issues/37722
https://zhuanlan.zhihu.com/p/106757502

关于Apache Drill使用DIR0条件过滤分区的性能问题

问题描述

数据文件按设备号和日期(yyyyMMdd)两层分区,现在想查询指定设备下的所有数据,使用DIR0关键字过滤:

select * from dfs.`table1` where DIR0 = 'device0001'

发现planning时间很长,进一步检查日志发现是在planning阶段扫描了table1下的所有目录,而不是只扫描了device0001目录,而设备数量有8000多个,每个目录下有几百个parquet文件总共300万个文件左右,因此扫描的时间很长。
file

问题解决

通过跟踪DRILL代码发现,DRILL在planning开始的时候,需要创建一个Table对象并扫描此Table下的所有文件,并且未考虑DIR0指定的条件。

在drill的jira上查到相关bug记录:DRILL-2517DRILL-3996,后者2015年记录但至今仍未解决,因此短期内可能无法通过升级DRILL版本解决。

变通的解决方法是,将查询语句修改为:

select * from dfs.`table1/device0001`

即把DIR0里的条件放在FROM子句里,此时planning时间显著降低。

定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?

开发PDI(Kettle) Step Plugin

Pentaho Data Integration (PDI)是著名的ETL工具Kettle的现用名,这个工具允许用户以图形化的方式构造数据处理流程,除了内置丰富的数据处理节点以外,还允许用户自定义开发自己的数据处理节点以便实现更复杂或更定制的处理逻辑,使用的开发语言是Java。

在某项目里为了实现一个项目特定的数据转换,我开发了一个这样的处理节点,整体感受还是十分流畅的,记录如下(代码略):

从github克隆项目:

git clone git@github.com:pentaho/pdi-sdk-plugins.git

里面包含的kettle-sdk-step-plugin模块即是一个例子,可以基于这个模块的代码按需修改。
开发完成后运行自带的三个测试用例,这样部署后成功率比较高。
打包命令仍然是:

mvn package

打好的是一个jar包,放在${pdi_path}/plugins/steps/<demo_plugin_name>/目录下即可,如果目录不存在可按需创建。

使用Apache Drill处理数据文件

本文针对drill版本1.8。

安装Drill

官网下载tar包并解压即可,linux和windows都是如此。
注意:drill要求java版本是8或以上。

命令行使用Drill

最简单的方式是用embedded模式启动drill(启动后可以在浏览器里访问 http://localhost:8047 来管理drill):

bin/drill-embedded

这样就以嵌入模式启动了drill服务并同时进入了内置的sqline命令行。如果drill服务已经启动,则可以这样进入sqline命令行(参考):

bin/sqline -u jdbc:drill:drillbit=localhost

作为例子,用SQL语法查询一下drill自带的数据(命令行里的cp表示classpath,注意查询语句最后要加分号结尾):

apache drill> SELECT * FROM cp.`employee.json` LIMIT 3;

查询任意数据文件的内容:

apache drill> SELECT * FROM dfs.`/home/hao/apache-drill-1.16.0/sample-data/region.parquet`;

退出命令行用!quit

配置和查看Drill参数

如果要永久性修改参数值,需要修改$DRILL_HOME/conf/drill-override.conf文件(见文档);SET、RESET命令可以在当前session里修改参数值(文档)。

配置参数:

SET `store.parquet.block-size` = 536870912

重置参数为缺省值:

RESET `store.parquet.block-size`

查看参数:

select * from sys.options where name = 'store.parquet.block-size'

在java代码里使用Drill

下面是在java代码里使用Drill的例子代码,要注意的一点是,JDBC的URL是jdbc:drill:drillbit=localhost,而不是很多教程上说的jdbc:drill:zk=localhost

package com.acme;

import java.sql.*;

public class DrillJDBCExample {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    //static final String DB_URL = "jdbc:drill:zk=localhost:2181";
    static final String DB_URL = "jdbc:drill:drillbit=localhost"; //for embedded mode installation

    static final String USER = "admin";
    static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL,USER,PASS);

            stmt = conn.createStatement();
            /* Perform a select on data in the classpath storage plugin. */
            String sql = "select employee_id,first_name,last_name from cp.`employee.json`";
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
                int id  = rs.getInt("employee_id");
                String first = rs.getString("first_name");
                String last = rs.getString("last_name");

                System.out.print("ID: " + id);
                System.out.print(", First: " + first);
                System.out.println(", Last: " + last);
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

让Drill访问数据库

根据要访问的数据库的不同,需要为Drill添加相应的驱动,方法见RDBMS Storage Plugin

利用Drill将csv格式转换为parquet格式

原理是在drill里创建一张格式为parquet的表,该表的路径(下例中的/parquet1)对应的是磁盘上的一个目录。

ALTER SESSION SET `store.format`='parquet';
ALTER SESSION SET `store.parquet.compression` = 'snappy';

CREATE TABLE dfs.tmp.`/parquet1` AS 
SELECT * FROM dfs.`/my/csv/file.csv`;

让drill支持.zip、.arc压缩格式

(暂缺)

参考资料

Drill in 10 Minutes
How to convert a csv file to parquet

pytorch环境搭建和简单的人脸识别应用

注:本文是一边安装一边记录形成的,其中有不少问题是由于centos版本较低导致的,建议使用高版本(>=7.6)安装以节约时间。

安装python

使用Anaconda管理环境

设置anaconda使用tuna(清华大学镜像):

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes

tuna还维护了一些第三方镜像,包括conda-forge, menpo等,需要的话也可以像上面这样添加,tuna页面有说明。

在conda环境里安装pytorch:

conda install pytorch-cpu -c pytorch

CUDA toolkit下载:https://developer.nvidia.com/accelerated-computing-toolkit

cuDNN下载:https://developer.nvidia.com/cudnn

用pip安装dlib(win下未成功,linux下没试):

https://pypi.org/simple/dlib/ 下载.whl格式的dlib,然后用pip安装:

pip install xxx.whl

用conda安装dlib:

用conda安装dlib(如果已经将menpo源添加到conda,则可以省略-c menpo选项):

conda install -c menpo dlib

conda提示Killed:

Update 201902: 另一个引发Killed的原因是内存不足,例如这台ec2上只有约300MB可用内存,经常出现Killed,增加512MB(总共800MB可用内存)后Killed的情况就少了很多。

这里遇到一个坑:用conda安装各种包时总报错“Killed”,网上也没找到类似问题,最后发现居然是磁盘空间满了,原来anaconda3居然占用了接近4GB的空间把硬盘填满了。另一个原因,需要conda update conda更新一下conda版本,之后就能正常安装了。

解决上述磁盘空间满问题后,发现conda install dlib时还会报错"Killed",加-vvv参数后发现可能与TUNA源有关:

从~/.condarc里删除相关TUNA源后问题即解决(应该主要是conda-forge这个镜像源的问题)。

偶尔还会遇到killed提示,但再试一次通常可以成功,因此猜测网络条件也会产生影响。

dlib与python版本:

在python 3.5环境里安装的dlib,虽然安装成功,但import dlib失败:

提示“ImportError: /root/miniconda3/envs/py35/lib/python3.5/site-packages/dlib.cpython-35m-x86_64-linux-gnu.so: undefined symbol: _PyThreadState_UncheckedGet”:

重新换python 3.6环境里安装dlib,可以import dlib成功:

升级glibc

由于是在centos6.5环境里,glibc版本比较低(2.12),而menpo源的dlib 19要求glibc 2.14,因此需要手工升级glibc。链接

安装必要的python库

conda install dlib -c menpo
conda install opencv -c menpo
conda install numpy
pip install imutils

如果安装opencv报内存不足(300MB剩余内存有此问题,700MB可以通过),可以尝试用pip安装opencv:

pip install opencv-python==3.4.1.15

Python编码问题

print()命令打印来自mysql的中文时报错如下:

UnicodeEncodeError: 'ascii' codec can't encode character '\xe9'

解决(绕过)方法:用logging.info()替代print()命令。

import logging
logging.basicConfig(level=logging.DEBUG,format='[%(levelname)s][%(asctime)s] %(message)s')
logging.info("my messages")

注意,默认logging输出到的是stderr而不是stdout。

一个简单的人脸检测程序

检测一张图片里的多个人脸,用绿色矩形标记并输出为图片。参考链接

from imutils import face_utils
import imutils
import dlib
import cv2
image = cv2.imread('/root/person01.jpg')
image = imutils.resize(image, width=500)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
detector = dlib.get_frontal_face_detector()
rects = detector(gray, 1)
for (i, rect) in enumerate(rects):
    (x, y, w, h) = face_utils.rect_to_bb(rect)
    cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 1)
cv2.imwrite('/root/output.jpg',image)

python连接mysql数据库

过去常用的MySQLdb是c语言实现的,目前更建议使用MySQL官方提供的Connector/Python,是一个纯python的实现。

pip install mysql-connector-python

不过Connector/Python对mysql服务器版本有要求,例如2.0版以上就无法连接5.1版的mysql服务器,会报Bad Handshake错误。

另一个纯python的实现是PyMySQL,使用起来兼容性比较好。安装方法是:

pip install PyMySQL

用PyMySQL的一个简单例子:

import pymysql.cursors
cnx = pymysql.connect(user='xxx',password='xxx', host='localhost', database='xxx', charset='utf8mb4')
try:
    with cnx.cursor() as cursor:
        sql = "SELECT id, name, gender FROM user"
        cursor.execute(sql)
        for row in cursor:
            print(row)
finally:
    cnx.close()

安装tensorflow

tensorflow支持python3.5,所以用conda创建一个python 3.5环境后安装(参考链接):

conda create -n tensorflow python=3.5
source activate tensorflow
pip install tensorflow

如果是在centos 6上,由于自带的gcc是4.4.7版本过低,所以需要升级到gcc 4.8版本。直接下载编译的时间比较长(也比较容易遇到问题),可以使用devtools2辅助升级(参考链接):

wget http://people.centos.org/tru/devtools-2/devtools-2.repo -O /etc/yum.repos.d/devtools-2.repo
yum install devtoolset-2-gcc devtoolset-2-binutils devtoolset-2-gcc-c++
ln -s /opt/rh/devtoolset-2/root/usr/bin/* /usr/local/bin/

此时执行tensorflow程序可能会提示找不到库GLIBCXX_3.4.17的问题:

ImportError: /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.17' not found (required by /root/miniconda3/envs/tensorflow/lib/python3.5/site-packages/tensorflow/python/_pywrap_tensorflow_internal.so)

此时可以从conda自带的lib目录里复制一个libstdc++.so.6.0.24版本并更新软连接解决:

cp /root/miniconda3/lib/libstdc++.so.6.0.24 /usr/lib64/
ln -s /usr/lib64/libstdc++.so.6.0.24 /usr/lib64/libstdc++.so.6

这时执行tensorflow程序遇到glibc版本过低的提示:

ImportError: /lib64/libc.so.6: version `GLIBC_2.16' not found

查看系统的glibc版本是2.14,因此需要升级。但是按照前述升级到glibc 2.14的方法(链接),升级为2.16时却遇到了下面的问题:

error while loading shared libraries: __vdso_time: invalid mode for dlopen(): Invalid argument

由于libc.so.6有问题,导致一些基本命令(例如rm)不可用,此时要切换回glibc 2.14的方法如下:

LD_PRELOAD=/opt/glibc-2.14/lib/libc-2.14.so rm -f /lib64/libc.so.6
LD_PRELOAD=/opt/glibc-2.14/lib/libc-2.14.so ln -s /opt/glibc-2.14/lib/libc-2.14.so /lib64/libc.so.6

原因(可能)是因为升级glibc时没有包含完整环境,使用下面的configure参数:

../configure --prefix=/opt/glibc-2.17 --disable-profile --enable-add-ons --with-headers=/usr/include --with-binutils=/usr/bin

上面命令有可能在check过程中出错(不支持--no-whole-archive),原因是binutils版本过低(2.23),需要升级到2.25,方法见链接

升级binutils后虽然能够支持--no-whole-archive参数了,但升级glibc 2.16后的异常却没有任何变化。

至此,决定升级linux版本解决(从centos 6.5升级到7.6)。Update: 在centos 7.6下安装tensorflow十分顺利。

 

参考资料:

https://github.com/jcjohnson/pytorch-examples

Anaconda使用总结

mysql数据库ibdata1文件过大问题处理

问题

一个mysql数据库,由于数据不断积累,目前ibdata1文件已经达到100GB大小,接近磁盘存储空间上限。

使用mysqlshow命令查看各个表占用的空间大小,发现dap_app_usage表占用的空间最大。其中data_length是数据大小,index_length是索引大小,data_free是未被使用的空间(通常是删除数据后留下的,这些空间不会被文件系统回收,但未来新插入的数据可以使用)。

mysqlshow --status apptimer

由于dap_app_usage表里保存了2014~2018年的数据,其中2015年及以前的数据属于冷数据,因此可以备份这些数据并从数据库里删除。

备份数据

我们按月备份数据以方便未来使用这些数据,每个月预估压缩后的文件大小为100MB,写一个脚本循环调用mysqldump命令以节约手工输入命令的时间并避免出错:

#!/bin/sh
#Dump dap_app_usage table to files by month
for i in {201401..201412}
do
    echo [date] Backing up $i ...
    mysqldump -uroot apptimer dap_app_usage -t --where="date between '${i}01' and '${i}31'"|gzip>~/dap_app_usage_${i}_data.sql.gz
done

删除数据

一次删除大量(如一亿条)数据效率很低,编写一个脚本小批量多次删除:

#!/bin/sh
# Delete specified range data from dap_app_usage table
# Make sure you have backup of these data

st=20140101
ed=20141231

if [[ ${st} -lt 19700101 ]]; then
    echo 'Wrong start date'
    exit
fi

if [[ ${ed} -lt ${st} ]]; then
        echo 'End date must be greater than start date'
        exit
fi

if [[ $((${ed}-${st})) -gt 1130 ]]; then
    echo 'Date range too large (1 year max)'
    exit
fi

echo "Delete data from ${st} to ${ed}"
for i in {1..99999}
do
    echo [date] delete iteration ${i}
    mysql -uroot apptimer -e "select count(*) from dap_app_usage where date between '${st}' and '${ed}'"
    mysql -uroot apptimer -e "delete from dap_app_usage where date between '${st}' and '${ed}' limit 1000"
    sleep 1
done

后续

为避免以后再出现类似问题,应定期执行上述清理操作,保证在dap_app_usage表里只保存近期一段时间的数据,这样可以维持数据库文件大小在一个固定值。

参考资料:

How to shrink/purge ibdata1 file in MySQL

JVM内存模型

JVM的内存分为堆(heap)和栈(thread stack)两类区域,分别存放不同数据,规则如下。

以下数据存放在heap中:

  • 所有对象(object)。但对象的method里的local variable存放在stack中。
  • 所有对象的member variable,不论是primitive或是指向一个对象。
  • 所有静态类的variable

以下数据存放在stack中(每个线程有自己的thread stack,互相不可见):

  • 当前线程执行过的所有方法(method)
  • 当前线程内所有local variable(method里的变量)。如果此变量指向一个对象,则变量存放在stack中而对象仍然存放在heap中。
  • 当前线程内所有primitive类型(如int, long, boolean)的local variable

java-memory-model-3

参考资料:

Java Memory Model

Java 内存区域和GC机制