Python代码单元测试和性能测试

本文介绍如何使用pytest对python工程进行单元测试和统计测试覆盖率,同时介绍如何测试python程序的运行速度和内存消耗。

一、工程目录结构

测试用例统一放在tests目录下,tests内部目录结构与被测代码的目录结构相同,以便快速定位测试用例代码。

file

如果环境还没有安装过pytest,先执行pip install pytest进行安装。

二、测试用例内容

测试用例的原则是逻辑尽量简单直白,这样当用例运行失败时能够较快的通过调试定位到问题原因,建议在测试代码里添加必要的注释。另外注意下面几个规则:

  • 文件名必须以test_开头,这样pytest命令可以正确识别此文件为测试用例。
  • 文件里每个测试方法必须以test_开头,原因同上。
  • 如果测试用例引用外部文件,可将文件放在同目录下,然后用os.path.dirname(file)引用py文件所在目录,确保从各个目录执行此测试用例都能正确找到外部文件。
  • 如果测试用例生成新的文件,建议用pytest提供的tmp_path参数(文档)作为输出目录,此临时目录下的文件会被自动清理,以免影响测试用例的下次运行。

以下是一个测试用例的例子(test_adapter_csv.py):

from acme.datasets.adapter import *
import os
import pandas as pd

# 验证正常加载csv文件
def test_read_file_to_df():
    df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.BUFFER)
    assert len(df) == 16

# 验证数据正常写出到csv文件
def test_write_df_to_csv(tmp_path):
    df = pd.DataFrame()
    CSVHandler.write_df_to_csv(df, path=str(tmp_path), file_type=CSVOutputFile.LOT_FLOW)
    assert os.path.exists(os.path.join(str(tmp_path), 'RESULT.csv'))

为提高代码覆盖率,我们不能只验证输入正常的情况,还应该对处理异常的情况进行验证:

# 验证要加载的csv文件不存在的情况
def test_read_file_to_df_not_exist():
    with pytest.raises(FileNotFoundError):
        df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.DEMAND)

三、执行测试用例

方式1、在IDE里可直接执行test_xxx.py文件里的指定方法
file

方式2、在IDE里右键点击tests目录选择执行所有测试用例
file

方式3、在命令行里执行pytest命令可以执行所有测试用例
file

四、统计测试覆盖率

PyCharm专业版里内置了覆盖率工具,在运行时选择"Run with coverage“即可,这里介绍PyCharm社区版如何统计测试覆盖率。

首先确保已安装测试覆盖率组件:

pip install pytest-cov
pip install pytest-html

并且确保在tests目录内的每个目录下已包含init.py文件(否则统计时会忽略此目录下的测试用例)。
file

然后在命令行里执行:

pytest --cov=acme --cov-report=html

即可生成测试覆盖率结果文件,放在htmlcov目录下,打开其中的index.html可查看结果。

即使只有一个测试用例,工程的总体覆盖率也已经达到28%,这是因为有很多import、def代码在加载模块时被覆盖到,而实际的业务逻辑代码并没有被覆盖。
file

file

如果忽略 --cov-report=html 参数则会在控制台里输出每个py文件的覆盖率报告,但不会包含逐行的覆盖率结果。
file

五、性能测试(时间)

方案1:cProfile+Snakeviz

首先安装pytest-profiling组件和snakeviz查看工具:

pip install pytest-profiling
pip install snakeviz

在命令行里执行pytest时添加–profile参数即可统计运行时间:

pytest tests/datasets/test_adapter_csv.py --profile

file

同时会生成二进制格式的统计文件:
file

使用snakeviz工具查看指定prof文件的内容:

snakeviz prof/combined.prof

file

方案2、Viztracer(推荐)

viztracer查看结果的交互方式更加友好,有利于更快速的定位问题,因此推荐使用。

首先安装viztracer组件:

pip install viztracer

用viztracer命令运行指定的测试用例:

viztracer --min_duration 0.1ms tests/datasets/test_adapter_csv.py
vizviewer result.json

运行结束后会生成result.json文件:
file

用vizviewer命令查看result.json的内容(会自动启动浏览器):

vizviewer result.json

file

六、性能测试(内存)

a. 整体内存分析

首先安装必要的组件:

pip install memory_profiler
pip install matplotlib

使用mprof运行指定的测试用例:

mprof run tests/datasets/test_adapter_csv.py

每次运行结束后会生成一个.dat文件:
file

使用mprof plot命令查看最新一个.dat文件的内容,若不指定文件名则自动查看最新一个文件,图形展示了测试用例执行过程中消耗的内存变化情况:

mprof plot mprofile_20240410185255.dat

file

b. 逐行内存分析

有时我们希望分析某个函数或者某几行代码的内存占用,可以用@profile修饰要分析的函数,然后在PyCharm里点击测试函数旁边的运行按钮启动测试用例(命令行里启动不行)。当运行完成后,在控制台里会输出此函数的逐行内存变化。

file

逐行内存占用情况如下图所示,从图中可以看到,读取DataFrame的操作新增占用了0.6MB的内存。

file

我们也可以用@profile标记多个函数,在PyCharm里批量运行测试用例,这样在测试报告里可以分别查看这些函数的逐行内存占用情况:
file

HBase建表超时问题和解决

问题描述

尝试用下面的命令在hbase shell里创建启用SNAPPY压缩的表,建表命令一直没有返回,直到过了10分钟左右提示错误信息如下:

hbase:012:0> create 'hb_data_2',{NAME=>'DATA', 'COMPRESSION' => 'SNAPPY'}, {NUMREGIONS => 64, SPLITALGO => 'HexStringSplit' }

2022-12-05 19:54:03,386 INFO  [ReadOnlyZKClient-taos-1:2181,taos-2:2181,taos-3:2181@0x0e0d9e3f] zookeeper.ZooKeeper (ZooKeeper.java:close(1422)) - Session: 0x284b286aa820049 closed
2022-12-05 19:54:03,387 INFO  [ReadOnlyZKClient-taos-1:2181,taos-2:2181,taos-3:2181@0x0e0d9e3f-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(524)) - EventThread shut down for session: 0x284b286aa820049
ERROR: The procedure 844 is still running
For usage try 'help "create"'
Took 608.2809 seconds
hbase:013:0>

在web-ui里看到:

file

查看日志:

> tail /usr/local/hbase-2.5.1/logs/hbase-root-regionserver-taos-1.log -n200
...
2022-12-05T22:51:11,801 WARN  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-0] handler.AssignRegionHandler: Failed to open region hb_data_2,f0000000,1670241238417.d5ced9638670a54fb4172157ee539d34., will report to master
org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more
2022-12-05T22:51:11,801 WARN  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-2] handler.AssignRegionHandler: Failed to open region hb_data_2,e4000000,1670241238417.722ed7fb3b36599c9ea0176774e3f91c., will report to master
org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more
2022-12-05T22:51:11,801 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] handler.AssignRegionHandler: Open hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
2022-12-05T22:51:11,802 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] regionserver.HRegion: Closing region hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
2022-12-05T22:51:11,802 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] regionserver.HRegion: Closed hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
...

此时表处于ENABLING状态:

file

尝试删除,但删除命令也会死住直到超时:

hbase:017:0> drop 'hb_data_2'
ERROR: The procedure 2829 is still running
For usage try 'help "drop"'

Took 668.4171 seconds
hbase:018:0>

尝试更换算法为LZ4,仍然提示类似错误:

org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'lz4' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'lz4' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more

解决方案

在每个节点的hbase-site.xml里添加如下配置:

<property>
  </name>hbase.table.sanity.checks</name>
  <value>false</value>
</property>

然后重启hbase集群:

> stop-hbase.sh
> start-hbase.sh

重新尝试建表(要换一个表名):

hbase:008:0> create 'hb_data_5',{NAME=>'DATA', 'COMPRESSION' => 'SNAPPY'}, {NUMREGIONS => 64, SPLITALGO => 'HexStringSplit' }

2022-12-06 15:30:37,106 INFO  [main] client.HBaseAdmin (HBaseAdmin.java:postOperationResult(3591)) - Operation: CREATE, Table Name: default:hb_data_5, procId: 13870 completed
Created table hb_data_5
Took 2.2455 seconds
=> Hbase::Table - hb_data_5
hbase:009:0>

但是返现原先的表无法禁用并删除(HBase里删表必须先disable):

hbase:005:0> disable 'hb_data_2'
2022-12-06 15:28:16,517 INFO  [main] client.HBaseAdmin (HBaseAdmin.java:rpcCall(926)) - Started disable of hb_data_2

ERROR: Table hb_data_2 is disabled!

For usage try 'help "disable"'

Took 0.0529 seconds
hbase:006:0> drop 'hb_data_2'

ERROR: Table org.apache.hadoop.hbase.TableNotDisabledException: Not DISABLED; tableName=hb_data_2, state=ENABLING
        at org.apache.hadoop.hbase.master.HMaster.checkTableModifiable(HMaster.java:2786)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.prepareDelete(DeleteTableProcedure.java:241)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.executeFromState(DeleteTableProcedure.java:90)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.executeFromState(DeleteTableProcedure.java:58)
        at org.apache.hadoop.hbase.procedure2.StateMachineProcedure.execute(StateMachineProcedure.java:188)
        at org.apache.hadoop.hbase.procedure2.Procedure.doExecute(Procedure.java:922)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execProcedure(ProcedureExecutor.java:1648)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.executeProcedure(ProcedureExecutor.java:1394)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.access$1000(ProcedureExecutor.java:75)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor$WorkerThread.runProcedure(ProcedureExecutor.java:1960)
        at org.apache.hadoop.hbase.trace.TraceUtil.trace(TraceUtil.java:216)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor$WorkerThread.run(ProcedureExecutor.java:1987)
 should be disabled!

For usage try 'help "drop"'

Took 0.0226 seconds

使用JMeter测试TDengine数据库性能

TDengine 是一款专为时序数据打造的国产数据库产品,它可对TB量级的时序数据进行存储、分析和分发,常备应用于设备运行状态实时监测和预警等场景。

在前面的文章中我们曾经介绍过使用sysbench进行过数据库性能测试,但sysbench聚焦于mysql和postgresql这两种关系数据库,对其他数据库的支持几乎为零(曾支持过oracle但后来取消了)。因此这里我们使用更通用的测试框架JMeter进行性能测试。

环境信息

TDengine服务器 TDengine驱动 jmeter
3.0.0 3.0.2 5.5

添加依赖

将连接TDengine所需的jar包复制到jmeter安装目录下的lib目录:

taos-jdbcdriver-3.0.2.jar
fastjson-1.2.29.jar

准备数据库

在TDengine数据库里预先创建用于测试的库和表:

create database if not exists test vgroups 10 buffer 10;

测试发现create table既可以创建stable也可以创建table,为避免歧义我们严格用create stable来创建超级表:

create stable test.stable01 (time timestamp, col001 double, col002 double, col003 double, \
    col004 double, col005 double, col006 double, col007 double, col008 double, col009 double, \
    col010 double) tags (device varchar(20));

创建子表:

create table test.table01 using stable01 tags ('device01');

测试计划

在GUI模式下创建测试计划(tdengine_test.jmx):
file

执行测试

执行测试(-n表示不启动GUI,-t参数指定测试计划文件,-l指定测试csv格式的结果文件,-e表示测试结束后生成html格式报告,-o表示指定报告的路径名):

jmeter -n -t tdengine_test.jmx -l log1.jtl -e -o report1

测试机作为客户端,与TDengine服务不在同一个局域网内的情况下测试结果:
file

测试机作为客户端,与TDengine服务在同一个局域网内的情况下测试结果:
file

参考资料

https://jmeter.apache.org/usermanual/build-db-test-plan.html
https://jmeter.apache.org/usermanual/generating-dashboard.html
https://docs.taosdata.com/connector/java/
https://docs.taosdata.com/taos-sql/table/

理解TDengine里的虚拟节点(vgroup)

这篇是速记,基于TDengine 2.6版,详情建议看官方文档:数据节点管理

TDengine集群版通过虚拟节点(vgroup)实现database的负载均衡和水平扩展,每个物理节点(dnode)可以包含多个vgroup。每个database由多个table组成,这些table根据创建顺序被分布到不同的vgroup。

database1
    vgroup1
        table1
        table2
        ...
        table10
    vgroup2
        table11
        ...
        table20

table分布到不同vgroup的规则是由taos.cfg里的minTablesPerVnode参数和tableIncStepPerVnode参数控制的,每当database里当前vnode的table数量超过minTablesPerVnode时就会创建一个新的vgroup。这两个参数的默认值很大,如果database里table的数量不多,建议手工配置减小参数值,否则可能所有数据都分配到同一个物理节点产生数据倾斜。

注意:这两个参数修改后需要重启taosd服务,已经创建好的database需要删除重建才会生效。

[root@taos-1 ~]# tail /etc/taos/taos.cfg
...
minTablesPerVnode                 10
tableIncStepPerVnode              10

注意:在tdengine 3.0里minTablesPerVnode和tableIncStepPerVnode这两个参数已经被移除。

TDengine允许指定vgroup的副本数,每个副本是一个vnode。如果vgroup的副本数大于1,则此vgroup里会包含多个vnode,不同的vnode会被分配到不同的物理节点上。

可以在taos命令行里查看vgroups的情况。如下面的命令可以看到,集群包含3个物理节点,数据库里共有80个表,每10个表共用一个vgroup,这些vgroup比较均匀的分布到3个物理节点上。

[root@taos-1 ~]# taos
taos> use database1;
taos> show dnodes;
   id   |           end_point            | vnodes | cores  |   status   | role  |       create_time       |      offline reason      |
======================================================================================================================================
      1 | taos-1:6030                    |     61 |      8 | ready      | any   | 2022-04-03 17:58:10.049 |                          |
      2 | taos-2:6030                    |     61 |      8 | ready      | any   | 2022-04-03 17:58:15.756 |                          |
      3 | taos-3:6030                    |     62 |      8 | ready      | any   | 2022-02-03 17:58:16.287 |                          |
Query OK, 3 row(s) in set (0.008325s)

taos> show vgroups;
    vgId     |   tables    |  status  |   onlines   | v1_dnode | v1_status | compacting  |
==========================================================================================
         536 |          10 | ready    |           1 |        3 | leader    |           0 |
         537 |          10 | ready    |           1 |        2 | leader    |           0 |
         538 |          10 | ready    |           1 |        1 | leader    |           0 |
         539 |          10 | ready    |           1 |        3 | leader    |           0 |
         540 |          10 | ready    |           1 |        2 | leader    |           0 |
         541 |          10 | ready    |           1 |        1 | leader    |           0 |
         542 |          10 | ready    |           1 |        3 | leader    |           0 |
         543 |          10 | ready    |           1 |        2 | leader    |           0 |
Query OK, 8 row(s) in set (0.005023s)

使用Apache Flink处理Kafka数据流

公司产品为支持流式数据处理采用了Flink+Kafka的组合,在此记录环境安装部署和开发要点。

安装和启动kafka

kafka官网下载并解压到本地即可。

建议:如果从远程访问这个kafka,需要修改config/server.properties里的listeners属性为实际ip地址,否则producer发送数据时会提示“Connection to node 0 (localhost/127.0.0.1:9092) could not be established”:

listeners=PLAINTEXT://<ipaddress>:9092

启动kafka(如果是windows环境,将bin改为bin\windows,.sh改为.bat):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

安装flink

flink官网下载压缩包,解压到本地即可。

启动flink:

bin/start-cluster

启动后访问 localhost:8081 可打开Flink Web Dashboard:

file

创建flink项目

用maven自动创建项目框架,这一步根据网络情况可能比较慢,耐心等待10分钟左右:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=com.test -DinteractiveMode=false

在生成的pom.xml里添加flink-kafka-connector依赖(注意scala版本要与下载的kafka的scala版本一致):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

我们要处理流式数据,因此在生成的StreamingJob.java基础上修改。

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "10.1.10.76:9092");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_test", new SimpleStringSchema(), props);
    DataStream<String> stream = env.addSource(consumer);

    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("c:/temp/kafka-loader"), new SimpleStringEncoder<String>())
            .withBucketAssigner(new MyBucketAssigner())
            .build();
    stream.addSink(sink);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}

static class MyBucketAssigner implements BucketAssigner<String, String> {
    @Override
    public String getBucketId(String element, Context context) {
        return "" + element.charAt(0);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

运行flink作业

方式1:在IDE里直接运行main()方法即可,此处不细述。

方式2:mvn clean package打成jar包,然后打开localhost:8081控制台页面,选择左侧的Submit New Job菜单上传生成的jar包(按前述maven生成的pom.xml配置会自动生成两个jar包,要上传其中比较大的那个fatjar包)。上传成功后点击jar包,再点击Submit按钮即可。

测试发送数据

bin\windows\kafka-console-producer --broker-list <ipaddress>:9092 --topic flink_test

随机输入一些字符串并按回车键,在/tmp/kafka-loader目录下,应该会按字符串首字母生成相应的目录,里面的文件内容是所输入的字符串。

一些坑

FlinkKafkaConsumer

FlinkKafkaConsumer()的构造方法里,第二个参数可以是DeserializationSchema类型,也可以是KafkaDeserializationSchema类型,之前为了将flink-connector-kafka里自带的JSONKeyValueDeserializationSchema等(详见链接)转为前者找了半天,其实不用转直接用就可以。

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema适合kafka消息内容为json格式的,如果不是json格式,比如是逗号分隔的格式,还是自己实现KafkaDeserializationSchema,并不复杂。比如之前我用SimpleStringSchema无法获取到消息里的key信息,就需要用flink-connector-kafka提供的Deser:

static class MyKeyedDeserializationSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        return consumerRecord.key() + "," + consumerRecord.value();  // key()是repo名称,将其插入消息体以便后续处理
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Part文件名

为方便进行数据统计,对parquet文件名做了设计要求,但flink本身没有支持定制parquet文件名称,经过研究找到一个方法单独记录了一篇文章:定制Flink输出的parquet文件名

参考资料:

使用 Apache Flink 开发实时 ETL
Flink Kafka Connector

Fragment里getActivity()为空问题

问题

一个常见的场景:fragment在onViewCreated()里执行一个任务(AsyncTask)加载一些数据,任务没有完成时,用户切换到其他界面,这时如果在代码里调用了getActivity()方法,得到的将是空值。

这是一个在本地不容易发现的问题,但通过在线异常报告(例如bugly或友盟)容易发现。

解决方法

同时采取以下两项措施:

  1. 在任务的doInBackground()的一开始就获取context,而不是在每处都是用getActivity()获取;
  2. onStop()onDetach()方法里取消正在执行的任务以避免onPostExecute()被继续执行。

参考资料

getActivity() returns null in Fragment function

应用计时手机权限配置指南

应用计时需要常驻手机后台才能准确计时,目前各个品牌的手机默认都会对后台应用做限制,以达到节电的目的,因此需要手工开启一些权限。

各品牌手机开启权限的方法如下:

1. 小米手机(MIUI)

开机自启动(必须)

方法1:安全中心 -> 应用管理 -> 权限 -> 自启动 -> 自启动管理
方法2:设置 -> 授权管理 -> 自启动管理

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法1:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗
方法2:设置 -> 授权管理 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗

无障碍(建议)

当计时模式选择的是辅助模式时,这一项是必须配置的。
方法:设置 -> 更多设置 -> 无障碍

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
MIUI10:打开最近任务列表,按住应用计时,在弹出菜单里选择加锁。
MIUI9:打开最近任务列表,下拉应用计时加锁。

后台弹出界面(建议)

强制模式下,如果选择了超时直接退出应用,需要此项权限。
方法:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 后台弹出界面

2. 华为手机(EMUI 10)

开机自启动(必须)

方法:设置 > 应用启动管理 > 应用计时 > 手动管理 > 允许自启动允许关联启动允许后台活动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 应用 > 应用计时 > 权限 > 悬浮窗

无障碍(建议)

方法:设置 > 辅助功能 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:打开最近任务列表,下拉应用计时加锁。

3. 华为手机(EMUI 9)

开机自启动(必须)

方法:设置 > 应用 > 自启动 > 应用计时 > 允许自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 应用 > 应用计时 > 显示在其他应用之上

无障碍(建议)

方法:设置 > 智能辅助 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:打开最近任务列表,下拉应用计时加锁。

4. 魅族手机 (Flyme 7.2)

开机自启动(必须)

待补充

后台运行(建议)

方法: 手机管家 > 权限管理 > 应用管理 > 应用计时 > 后台管理 > 允许后台运行

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:手机管家 > 权限管理 > 应用管理 > 应用计时 > 悬浮窗

无障碍(建议)

方法:设置 > 辅助功能 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时加锁。

5. VIVO手机 (Funtouch OS)

开机自启动(必须)

方法:设置 > 更多设置权限管理应用计时单项权限设置自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 更多设置权限管理应用计时单项权限设置悬浮窗

无障碍(建议)

方法:设置 > 更多设置 > 辅助功能 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时图标加锁。

后台弹出界面(建议)

方法:设置 > 更多设置权限管理应用计时单项权限设置后台弹出界面

高耗电运行(建议)

方法:设置 > 电池 > 后台高耗电 > 允许高耗电继续运行

由于市面手机机型和版本数量众多,以上内容难免有不完整或错误之处,请加QQ群763350557反馈您的宝贵意见。

感谢以下网友的贡献:

  • 吃瓜群众
  • 冬日可爱
  • 清风朗月

Intellij IDEA输出jar包注意事项

Intellij IDEA Ultimate 2016.1

菜单File -> Project Structures -> Artifacts

注意:MANIFEST.MF文件不要使用默认的路径,应放在项目根目录下,否则打包后此文件不在jar包内或内容不正确。

 

输出"uber-jar"运行时可能会遇到错误“Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes”,此时可尝试删除jar包里META-INF目录下所有的.SF、.DSA和.RSA文件。

参考:https://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar

Win7访问家庭组共享文件夹提示没有权限

问题:Win7电脑访问Win10电脑的共享文件夹时,提示下面的错误:

解决方法:

1、在Win10高级共享设置里,勾选“关闭密码保护共享”。这样客户端访问时就不会提示输入用户名密码了(我遇到的情况是即使输入了正确的用户名密码也提示不正确,这个问题并没解决)。

2、在Win10共享的文件夹属性的“安全”页里,加入"Everyone"这个用户并设置足够的权限即可。并不像网上说的那样,需要修改组策略“网络安全:LAN 管理器身份验证级别”。

查看Android应用占用内存情况

要查看指定app在手机上占用多少运行内存,首先将手机连接到电脑,然后在命令行执行下面的命令(其中com.my.package.name是app的包名):

adb shell dumpsys meminfo com.my.package.name

执行结果通常如下,其中Pss那一列的值(单位:kB)是我们主要需要关注的:

adb-dumpsys

参考链接:

adb shell dumpsys meminfo - What is the meaning of each cell of its output?