最新版python安装Apache Beam失败问题和解决

Beam安装:
https://beam.apache.org/get-started/quickstart-py/

为安装Apache Beam,首先下载安装了最新版python 3.12.4。

file

> python --version
Python 3.12.4

按文档安装beam时报错无法编译wheel:

> pip install apache-beam
...
      INFO:root:copying apache_beam\portability\api\org\apache\beam\model\pipeline\v1\schema_pb2.pyi -> build\lib.win-amd64-cpython-312\apache_beam\portability\api\org\apache\beam\model\pipeline\v1
      INFO:root:copying apache_beam\portability\api\org\apache\beam\model\pipeline\v1\standard_window_fns_pb2.pyi -> build\lib.win-amd64-cpython-312\apache_beam\portability\api\org\apache\beam\model\pipeline\v1
      INFO:root:copying apache_beam\portability\api\standard_coders.yaml -> build\lib.win-amd64-cpython-312\apache_beam\portability\api
      INFO:root:running build_ext
      INFO:root:building 'apache_beam.coders.coder_impl_row_encoders' extension
      error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/
      [end of output]

  note: This error originates from a subprocess, and is likely not a problem with pip.
  ERROR: Failed building wheel for apache-beam
  Building wheel for pyarrow (pyproject.toml) ... error
  error: subprocess-exited-with-error

  × Building wheel for pyarrow (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> [299 lines of output]
      <string>:34: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
      WARNING setuptools_scm.pyproject_reading toml section missing 'pyproject.toml does not contain a tool.setuptools_scm section'
      Traceback (most recent call last):
        File "C:\Users\zhanghao\AppData\Local\Temp\pip-build-env-0wvk6red\overlay\Lib\site-packages\setuptools_scm\_integration\pyproject_reading.py", line 36, in read_pyproject
          section = defn.get("tool", {})[tool_name]
                    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^
      KeyError: 'setuptools_scm'
      running bdist_wheel
      running build
      running build_py

按报错信息里的提示,从 https://visualstudio.microsoft.com/zh-hans/visual-cpp-build-tools/ 下载了最新版的vc++ buildtools仍然报错。

file

在so上找到类似的问题,其中一个解决方法是安装整个vc++,但需要下载6GB+太重了。发现还有另一个方法,就是不要使用最新版的python,因为最新版python里没有包含一些已编译的包。

https://stackoverflow.com/questions/64261546/how-to-solve-error-microsoft-visual-c-14-0-or-greater-is-required-when-inst

This error can happen when using the latest version of Python, e.g. 3.12, because the package wheels were only built for earlier versions of Python

卸载python 3.12.4,安装python 3.9.13,再重新安装beam成功。

> python --version
Python 3.9.13

> pip install apache-beam
...
Using legacy 'setup.py install' for crcmod, since package 'wheel' is not installed.
Using legacy 'setup.py install' for dill, since package 'wheel' is not installed.
Using legacy 'setup.py install' for hdfs, since package 'wheel' is not installed.
Using legacy 'setup.py install' for pyjsparser, since package 'wheel' is not installed.
Using legacy 'setup.py install' for docopt, since package 'wheel' is not installed.
Installing collected packages: pytz, pyjsparser, docopt, crcmod, zstandard, urllib3, tzdata, typing-extensions, six, rpds-py, regex, pyparsing, pyarrow-hotfix, protobuf, packaging, orjson, objsize, numpy, jsonpickle, idna, grpcio, fasteners, fastavro, dnspython, dill, cloudpickle, charset-normalizer, certifi, attrs, async-timeout, tzlocal, requests, referencing, redis, python-dateutil, pymongo, pydot, pyarrow, proto-plus, httplib2, jsonschema-specifications, js2py, hdfs, jsonschema, apache-beam
  Running setup.py install for pyjsparser ... done
  Running setup.py install for docopt ... done
  Running setup.py install for crcmod ... done
  Running setup.py install for dill ... done
  Running setup.py install for hdfs ... done
Successfully installed apache-beam-2.56.0 async-timeout-4.0.3 attrs-23.2.0 ...

Python代码单元测试和性能测试

本文介绍如何使用pytest对python工程进行单元测试和统计测试覆盖率,同时介绍如何测试python程序的运行速度和内存消耗。

一、工程目录结构

测试用例统一放在tests目录下,tests内部目录结构与被测代码的目录结构相同,以便快速定位测试用例代码。

file

如果环境还没有安装过pytest,先执行pip install pytest进行安装。

二、测试用例内容

测试用例的原则是逻辑尽量简单直白,这样当用例运行失败时能够较快的通过调试定位到问题原因,建议在测试代码里添加必要的注释。另外注意下面几个规则:

  • 文件名必须以test_开头,这样pytest命令可以正确识别此文件为测试用例。
  • 文件里每个测试方法必须以test_开头,原因同上。
  • 如果测试用例引用外部文件,可将文件放在同目录下,然后用os.path.dirname(file)引用py文件所在目录,确保从各个目录执行此测试用例都能正确找到外部文件。
  • 如果测试用例生成新的文件,建议用pytest提供的tmp_path参数(文档)作为输出目录,此临时目录下的文件会被自动清理,以免影响测试用例的下次运行。

以下是一个测试用例的例子(test_adapter_csv.py):

from acme.datasets.adapter import *
import os
import pandas as pd

# 验证正常加载csv文件
def test_read_file_to_df():
    df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.BUFFER)
    assert len(df) == 16

# 验证数据正常写出到csv文件
def test_write_df_to_csv(tmp_path):
    df = pd.DataFrame()
    CSVHandler.write_df_to_csv(df, path=str(tmp_path), file_type=CSVOutputFile.LOT_FLOW)
    assert os.path.exists(os.path.join(str(tmp_path), 'RESULT.csv'))

为提高代码覆盖率,我们不能只验证输入正常的情况,还应该对处理异常的情况进行验证:

# 验证要加载的csv文件不存在的情况
def test_read_file_to_df_not_exist():
    with pytest.raises(FileNotFoundError):
        df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.DEMAND)

三、执行测试用例

方式1、在IDE里可直接执行test_xxx.py文件里的指定方法
file

方式2、在IDE里右键点击tests目录选择执行所有测试用例
file

方式3、在命令行里执行pytest命令可以执行所有测试用例
file

四、统计测试覆盖率

PyCharm专业版里内置了覆盖率工具,在运行时选择"Run with coverage“即可,这里介绍PyCharm社区版如何统计测试覆盖率。

首先确保已安装测试覆盖率组件:

pip install pytest-cov
pip install pytest-html

并且确保在tests目录内的每个目录下已包含init.py文件(否则统计时会忽略此目录下的测试用例)。
file

然后在命令行里执行:

pytest --cov=acme --cov-report=html

即可生成测试覆盖率结果文件,放在htmlcov目录下,打开其中的index.html可查看结果。

即使只有一个测试用例,工程的总体覆盖率也已经达到28%,这是因为有很多import、def代码在加载模块时被覆盖到,而实际的业务逻辑代码并没有被覆盖。
file

file

如果忽略 --cov-report=html 参数则会在控制台里输出每个py文件的覆盖率报告,但不会包含逐行的覆盖率结果。
file

五、性能测试(时间)

方案1:cProfile+Snakeviz

首先安装pytest-profiling组件和snakeviz查看工具:

pip install pytest-profiling
pip install snakeviz

在命令行里执行pytest时添加–profile参数即可统计运行时间:

pytest tests/datasets/test_adapter_csv.py --profile

file

同时会生成二进制格式的统计文件:
file

使用snakeviz工具查看指定prof文件的内容:

snakeviz prof/combined.prof

file

方案2、Viztracer(推荐)

viztracer查看结果的交互方式更加友好,有利于更快速的定位问题,因此推荐使用。

首先安装viztracer组件:

pip install viztracer

用viztracer命令运行指定的测试用例:

viztracer --min_duration 0.1ms tests/datasets/test_adapter_csv.py
vizviewer result.json

运行结束后会生成result.json文件:
file

用vizviewer命令查看result.json的内容(会自动启动浏览器):

vizviewer result.json

file

六、性能测试(内存)

a. 整体内存分析

首先安装必要的组件:

pip install memory_profiler
pip install matplotlib

使用mprof运行指定的测试用例:

mprof run tests/datasets/test_adapter_csv.py

每次运行结束后会生成一个.dat文件:
file

使用mprof plot命令查看最新一个.dat文件的内容,若不指定文件名则自动查看最新一个文件,图形展示了测试用例执行过程中消耗的内存变化情况:

mprof plot mprofile_20240410185255.dat

file

b. 逐行内存分析

有时我们希望分析某个函数或者某几行代码的内存占用,可以用@profile修饰要分析的函数,然后在PyCharm里点击测试函数旁边的运行按钮启动测试用例(命令行里启动不行)。当运行完成后,在控制台里会输出此函数的逐行内存变化。

file

逐行内存占用情况如下图所示,从图中可以看到,读取DataFrame的操作新增占用了0.6MB的内存。

file

我们也可以用@profile标记多个函数,在PyCharm里批量运行测试用例,这样在测试报告里可以分别查看这些函数的逐行内存占用情况:
file

HBase建表超时问题和解决

问题描述

尝试用下面的命令在hbase shell里创建启用SNAPPY压缩的表,建表命令一直没有返回,直到过了10分钟左右提示错误信息如下:

hbase:012:0> create 'hb_data_2',{NAME=>'DATA', 'COMPRESSION' => 'SNAPPY'}, {NUMREGIONS => 64, SPLITALGO => 'HexStringSplit' }

2022-12-05 19:54:03,386 INFO  [ReadOnlyZKClient-taos-1:2181,taos-2:2181,taos-3:2181@0x0e0d9e3f] zookeeper.ZooKeeper (ZooKeeper.java:close(1422)) - Session: 0x284b286aa820049 closed
2022-12-05 19:54:03,387 INFO  [ReadOnlyZKClient-taos-1:2181,taos-2:2181,taos-3:2181@0x0e0d9e3f-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(524)) - EventThread shut down for session: 0x284b286aa820049
ERROR: The procedure 844 is still running
For usage try 'help "create"'
Took 608.2809 seconds
hbase:013:0>

在web-ui里看到:

file

查看日志:

> tail /usr/local/hbase-2.5.1/logs/hbase-root-regionserver-taos-1.log -n200
...
2022-12-05T22:51:11,801 WARN  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-0] handler.AssignRegionHandler: Failed to open region hb_data_2,f0000000,1670241238417.d5ced9638670a54fb4172157ee539d34., will report to master
org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more
2022-12-05T22:51:11,801 WARN  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-2] handler.AssignRegionHandler: Failed to open region hb_data_2,e4000000,1670241238417.722ed7fb3b36599c9ea0176774e3f91c., will report to master
org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'snappy' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more
2022-12-05T22:51:11,801 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] handler.AssignRegionHandler: Open hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
2022-12-05T22:51:11,802 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] regionserver.HRegion: Closing region hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
2022-12-05T22:51:11,802 INFO  [RS_OPEN_REGION-regionserver/zookeeper-1:16020-1] regionserver.HRegion: Closed hb_data_2,,1670241238417.ec86493893d7742525919263abd01c2c.
...

此时表处于ENABLING状态:

file

尝试删除,但删除命令也会死住直到超时:

hbase:017:0> drop 'hb_data_2'
ERROR: The procedure 2829 is still running
For usage try 'help "drop"'

Took 668.4171 seconds
hbase:018:0>

尝试更换算法为LZ4,仍然提示类似错误:

org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'lz4' previously failed test. Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.warnOrThrowExceptionForFailure(TableDescriptorChecker.java:339) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:306) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7220) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegionFromTableDir(HRegion.java:7183) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7159) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7118) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:7074) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler.process(AssignRegionHandler.java:147) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.executor.EventHandler.run(EventHandler.java:100) ~[hbase-server-2.5.1.jar:2.5.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Compression algorithm 'lz4' previously failed test.
        at org.apache.hadoop.hbase.util.CompressionTest.testCompression(CompressionTest.java:90) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.util.TableDescriptorChecker.checkCompression(TableDescriptorChecker.java:300) ~[hbase-server-2.5.1.jar:2.5.1]
        ... 10 more

解决方案

在每个节点的hbase-site.xml里添加如下配置:

<property>
  </name>hbase.table.sanity.checks</name>
  <value>false</value>
</property>

然后重启hbase集群:

> stop-hbase.sh
> start-hbase.sh

重新尝试建表(要换一个表名):

hbase:008:0> create 'hb_data_5',{NAME=>'DATA', 'COMPRESSION' => 'SNAPPY'}, {NUMREGIONS => 64, SPLITALGO => 'HexStringSplit' }

2022-12-06 15:30:37,106 INFO  [main] client.HBaseAdmin (HBaseAdmin.java:postOperationResult(3591)) - Operation: CREATE, Table Name: default:hb_data_5, procId: 13870 completed
Created table hb_data_5
Took 2.2455 seconds
=> Hbase::Table - hb_data_5
hbase:009:0>

但是返现原先的表无法禁用并删除(HBase里删表必须先disable):

hbase:005:0> disable 'hb_data_2'
2022-12-06 15:28:16,517 INFO  [main] client.HBaseAdmin (HBaseAdmin.java:rpcCall(926)) - Started disable of hb_data_2

ERROR: Table hb_data_2 is disabled!

For usage try 'help "disable"'

Took 0.0529 seconds
hbase:006:0> drop 'hb_data_2'

ERROR: Table org.apache.hadoop.hbase.TableNotDisabledException: Not DISABLED; tableName=hb_data_2, state=ENABLING
        at org.apache.hadoop.hbase.master.HMaster.checkTableModifiable(HMaster.java:2786)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.prepareDelete(DeleteTableProcedure.java:241)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.executeFromState(DeleteTableProcedure.java:90)
        at org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure.executeFromState(DeleteTableProcedure.java:58)
        at org.apache.hadoop.hbase.procedure2.StateMachineProcedure.execute(StateMachineProcedure.java:188)
        at org.apache.hadoop.hbase.procedure2.Procedure.doExecute(Procedure.java:922)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execProcedure(ProcedureExecutor.java:1648)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.executeProcedure(ProcedureExecutor.java:1394)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.access$1000(ProcedureExecutor.java:75)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor$WorkerThread.runProcedure(ProcedureExecutor.java:1960)
        at org.apache.hadoop.hbase.trace.TraceUtil.trace(TraceUtil.java:216)
        at org.apache.hadoop.hbase.procedure2.ProcedureExecutor$WorkerThread.run(ProcedureExecutor.java:1987)
 should be disabled!

For usage try 'help "drop"'

Took 0.0226 seconds

使用JMeter测试TDengine数据库性能

TDengine 是一款专为时序数据打造的国产数据库产品,它可对TB量级的时序数据进行存储、分析和分发,常备应用于设备运行状态实时监测和预警等场景。

在前面的文章中我们曾经介绍过使用sysbench进行过数据库性能测试,但sysbench聚焦于mysql和postgresql这两种关系数据库,对其他数据库的支持几乎为零(曾支持过oracle但后来取消了)。因此这里我们使用更通用的测试框架JMeter进行性能测试。

环境信息

TDengine服务器 TDengine驱动 jmeter
3.0.0 3.0.2 5.5

添加依赖

将连接TDengine所需的jar包复制到jmeter安装目录下的lib目录:

taos-jdbcdriver-3.0.2.jar
fastjson-1.2.29.jar

准备数据库

在TDengine数据库里预先创建用于测试的库和表:

create database if not exists test vgroups 10 buffer 10;

测试发现create table既可以创建stable也可以创建table,为避免歧义我们严格用create stable来创建超级表:

create stable test.stable01 (time timestamp, col001 double, col002 double, col003 double, \
    col004 double, col005 double, col006 double, col007 double, col008 double, col009 double, \
    col010 double) tags (device varchar(20));

创建子表:

create table test.table01 using stable01 tags ('device01');

测试计划

在GUI模式下创建测试计划(tdengine_test.jmx):
file

执行测试

执行测试(-n表示不启动GUI,-t参数指定测试计划文件,-l指定测试csv格式的结果文件,-e表示测试结束后生成html格式报告,-o表示指定报告的路径名):

jmeter -n -t tdengine_test.jmx -l log1.jtl -e -o report1

测试机作为客户端,与TDengine服务不在同一个局域网内的情况下测试结果:
file

测试机作为客户端,与TDengine服务在同一个局域网内的情况下测试结果:
file

参考资料

https://jmeter.apache.org/usermanual/build-db-test-plan.html
https://jmeter.apache.org/usermanual/generating-dashboard.html
https://docs.taosdata.com/connector/java/
https://docs.taosdata.com/taos-sql/table/

理解TDengine里的虚拟节点(vgroup)

这篇是速记,基于TDengine 2.6版,详情建议看官方文档:数据节点管理

TDengine集群版通过虚拟节点(vgroup)实现database的负载均衡和水平扩展,每个物理节点(dnode)可以包含多个vgroup。每个database由多个table组成,这些table根据创建顺序被分布到不同的vgroup。

database1
    vgroup1
        table1
        table2
        ...
        table10
    vgroup2
        table11
        ...
        table20

table分布到不同vgroup的规则是由taos.cfg里的minTablesPerVnode参数和tableIncStepPerVnode参数控制的,每当database里当前vnode的table数量超过minTablesPerVnode时就会创建一个新的vgroup。这两个参数的默认值很大,如果database里table的数量不多,建议手工配置减小参数值,否则可能所有数据都分配到同一个物理节点产生数据倾斜。

注意:这两个参数修改后需要重启taosd服务,已经创建好的database需要删除重建才会生效。

[root@taos-1 ~]# tail /etc/taos/taos.cfg
...
minTablesPerVnode                 10
tableIncStepPerVnode              10

注意:在tdengine 3.0里minTablesPerVnode和tableIncStepPerVnode这两个参数已经被移除。

TDengine允许指定vgroup的副本数,每个副本是一个vnode。如果vgroup的副本数大于1,则此vgroup里会包含多个vnode,不同的vnode会被分配到不同的物理节点上。

可以在taos命令行里查看vgroups的情况。如下面的命令可以看到,集群包含3个物理节点,数据库里共有80个表,每10个表共用一个vgroup,这些vgroup比较均匀的分布到3个物理节点上。

[root@taos-1 ~]# taos
taos> use database1;
taos> show dnodes;
   id   |           end_point            | vnodes | cores  |   status   | role  |       create_time       |      offline reason      |
======================================================================================================================================
      1 | taos-1:6030                    |     61 |      8 | ready      | any   | 2022-04-03 17:58:10.049 |                          |
      2 | taos-2:6030                    |     61 |      8 | ready      | any   | 2022-04-03 17:58:15.756 |                          |
      3 | taos-3:6030                    |     62 |      8 | ready      | any   | 2022-02-03 17:58:16.287 |                          |
Query OK, 3 row(s) in set (0.008325s)

taos> show vgroups;
    vgId     |   tables    |  status  |   onlines   | v1_dnode | v1_status | compacting  |
==========================================================================================
         536 |          10 | ready    |           1 |        3 | leader    |           0 |
         537 |          10 | ready    |           1 |        2 | leader    |           0 |
         538 |          10 | ready    |           1 |        1 | leader    |           0 |
         539 |          10 | ready    |           1 |        3 | leader    |           0 |
         540 |          10 | ready    |           1 |        2 | leader    |           0 |
         541 |          10 | ready    |           1 |        1 | leader    |           0 |
         542 |          10 | ready    |           1 |        3 | leader    |           0 |
         543 |          10 | ready    |           1 |        2 | leader    |           0 |
Query OK, 8 row(s) in set (0.005023s)

使用Apache Flink处理Kafka数据流

公司产品为支持流式数据处理采用了Flink+Kafka的组合,在此记录环境安装部署和开发要点。

安装和启动kafka

kafka官网下载并解压到本地即可。

建议:如果从远程访问这个kafka,需要修改config/server.properties里的listeners属性为实际ip地址,否则producer发送数据时会提示“Connection to node 0 (localhost/127.0.0.1:9092) could not be established”:

listeners=PLAINTEXT://<ipaddress>:9092

启动kafka(如果是windows环境,将bin改为bin\windows,.sh改为.bat):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

安装flink

flink官网下载压缩包,解压到本地即可。

启动flink:

bin/start-cluster

启动后访问 localhost:8081 可打开Flink Web Dashboard:

file

创建flink项目

用maven自动创建项目框架,这一步根据网络情况可能比较慢,耐心等待10分钟左右:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=com.test -DinteractiveMode=false

在生成的pom.xml里添加flink-kafka-connector依赖(注意scala版本要与下载的kafka的scala版本一致):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

我们要处理流式数据,因此在生成的StreamingJob.java基础上修改。

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "10.1.10.76:9092");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_test", new SimpleStringSchema(), props);
    DataStream<String> stream = env.addSource(consumer);

    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("c:/temp/kafka-loader"), new SimpleStringEncoder<String>())
            .withBucketAssigner(new MyBucketAssigner())
            .build();
    stream.addSink(sink);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}

static class MyBucketAssigner implements BucketAssigner<String, String> {
    @Override
    public String getBucketId(String element, Context context) {
        return "" + element.charAt(0);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

运行flink作业

方式1:在IDE里直接运行main()方法即可,此处不细述。

方式2:mvn clean package打成jar包,然后打开localhost:8081控制台页面,选择左侧的Submit New Job菜单上传生成的jar包(按前述maven生成的pom.xml配置会自动生成两个jar包,要上传其中比较大的那个fatjar包)。上传成功后点击jar包,再点击Submit按钮即可。

测试发送数据

bin\windows\kafka-console-producer --broker-list <ipaddress>:9092 --topic flink_test

随机输入一些字符串并按回车键,在/tmp/kafka-loader目录下,应该会按字符串首字母生成相应的目录,里面的文件内容是所输入的字符串。

一些坑

FlinkKafkaConsumer

FlinkKafkaConsumer()的构造方法里,第二个参数可以是DeserializationSchema类型,也可以是KafkaDeserializationSchema类型,之前为了将flink-connector-kafka里自带的JSONKeyValueDeserializationSchema等(详见链接)转为前者找了半天,其实不用转直接用就可以。

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema适合kafka消息内容为json格式的,如果不是json格式,比如是逗号分隔的格式,还是自己实现KafkaDeserializationSchema,并不复杂。比如之前我用SimpleStringSchema无法获取到消息里的key信息,就需要用flink-connector-kafka提供的Deser:

static class MyKeyedDeserializationSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        return consumerRecord.key() + "," + consumerRecord.value();  // key()是repo名称,将其插入消息体以便后续处理
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Part文件名

为方便进行数据统计,对parquet文件名做了设计要求,但flink本身没有支持定制parquet文件名称,经过研究找到一个方法单独记录了一篇文章:定制Flink输出的parquet文件名

参考资料:

使用 Apache Flink 开发实时 ETL
Flink Kafka Connector

Fragment里getActivity()为空问题

问题

一个常见的场景:fragment在onViewCreated()里执行一个任务(AsyncTask)加载一些数据,任务没有完成时,用户切换到其他界面,这时如果在代码里调用了getActivity()方法,得到的将是空值。

这是一个在本地不容易发现的问题,但通过在线异常报告(例如bugly或友盟)容易发现。

解决方法

同时采取以下两项措施:

  1. 在任务的doInBackground()的一开始就获取context,而不是在每处都是用getActivity()获取;
  2. onStop()onDetach()方法里取消正在执行的任务以避免onPostExecute()被继续执行。

参考资料

getActivity() returns null in Fragment function

应用计时手机权限配置指南

应用计时需要常驻手机后台才能准确计时,目前各个品牌的手机默认都会对后台应用做限制,以达到节电的目的,因此需要手工开启一些权限。

各品牌手机开启权限的方法如下:

1. 小米手机(MIUI)

开机自启动(必须)

方法1:安全中心 -> 应用管理 -> 权限 -> 自启动 -> 自启动管理
方法2:设置 -> 授权管理 -> 自启动管理

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法1:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗
方法2:设置 -> 授权管理 -> 应用权限管理 -> 应用计时 -> 显示悬浮窗

无障碍(建议)

当计时模式选择的是辅助模式时,这一项是必须配置的。
方法:设置 -> 更多设置 -> 无障碍

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
MIUI10:打开最近任务列表,按住应用计时,在弹出菜单里选择加锁。
MIUI9:打开最近任务列表,下拉应用计时加锁。

后台弹出界面(建议)

强制模式下,如果选择了超时直接退出应用,需要此项权限。
方法:安全中心 -> 应用管理 -> 权限 -> 应用权限管理 -> 应用计时 -> 后台弹出界面

2. 华为手机(EMUI 10)

开机自启动(必须)

方法:设置 > 应用启动管理 > 应用计时 > 手动管理 > 允许自启动允许关联启动允许后台活动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 应用 > 应用计时 > 权限 > 悬浮窗

无障碍(建议)

方法:设置 > 辅助功能 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:打开最近任务列表,下拉应用计时加锁。

3. 华为手机(EMUI 9)

开机自启动(必须)

方法:设置 > 应用 > 自启动 > 应用计时 > 允许自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 应用 > 应用计时 > 显示在其他应用之上

无障碍(建议)

方法:设置 > 智能辅助 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:打开最近任务列表,下拉应用计时加锁。

4. 魅族手机 (Flyme 7.2)

开机自启动(必须)

待补充

后台运行(建议)

方法: 手机管家 > 权限管理 > 应用管理 > 应用计时 > 后台管理 > 允许后台运行

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:手机管家 > 权限管理 > 应用管理 > 应用计时 > 悬浮窗

无障碍(建议)

方法:设置 > 辅助功能 > 无障碍 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时加锁。

5. VIVO手机 (Funtouch OS)

开机自启动(必须)

方法:设置 > 更多设置权限管理应用计时单项权限设置自启动

悬浮窗(建议)

悬浮窗提醒时长,以及强制模式禁用当前应用,这两项功能需要悬浮窗权限。
方法:设置 > 更多设置权限管理应用计时单项权限设置悬浮窗

无障碍(建议)

方法:设置 > 更多设置 > 辅助功能 > 应用计时

锁定后台(建议)

锁定后台的目的是避免一键内存清理。
方法:多任务列表,下拉应用计时图标加锁。

后台弹出界面(建议)

方法:设置 > 更多设置权限管理应用计时单项权限设置后台弹出界面

高耗电运行(建议)

方法:设置 > 电池 > 后台高耗电 > 允许高耗电继续运行

由于市面手机机型和版本数量众多,以上内容难免有不完整或错误之处,请加QQ群763350557反馈您的宝贵意见。

感谢以下网友的贡献:

  • 吃瓜群众
  • 冬日可爱
  • 清风朗月

Intellij IDEA输出jar包注意事项

Intellij IDEA Ultimate 2016.1

菜单File -> Project Structures -> Artifacts

注意:MANIFEST.MF文件不要使用默认的路径,应放在项目根目录下,否则打包后此文件不在jar包内或内容不正确。

 

输出"uber-jar"运行时可能会遇到错误“Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes”,此时可尝试删除jar包里META-INF目录下所有的.SF、.DSA和.RSA文件。

参考:https://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar

Win7访问家庭组共享文件夹提示没有权限

问题:Win7电脑访问Win10电脑的共享文件夹时,提示下面的错误:

解决方法:

1、在Win10高级共享设置里,勾选“关闭密码保护共享”。这样客户端访问时就不会提示输入用户名密码了(我遇到的情况是即使输入了正确的用户名密码也提示不正确,这个问题并没解决)。

2、在Win10共享的文件夹属性的“安全”页里,加入"Everyone"这个用户并设置足够的权限即可。并不像网上说的那样,需要修改组策略“网络安全:LAN 管理器身份验证级别”。