A Case of ZooKeeper Startup Failure and Its Resolution

After a server reboot, ZooKeeper failed to start. The error log is as follows:

zookeeper    | ZooKeeper JMX enabled by default
zookeeper    | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper    | 2022-10-05 17:24:53,976 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper    | 2022-10-05 17:24:53,979 [myid:] - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
zookeeper    | 2022-10-05 17:24:53,979 [myid:] - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
zookeeper    | 2022-10-05 17:24:53,980 [myid:] - WARN  [main:QuorumPeerMain@116] - Either no config or no quorum defined in config, running  in standalone mode
zookeeper    | 2022-10-05 17:24:53,980 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
zookeeper    | 2022-10-05 17:24:53,990 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper    | 2022-10-05 17:24:53,990 [myid:] - INFO  [main:ZooKeeperServerMain@98] - Starting server
zookeeper    | 2022-10-05 17:24:53,995 [myid:] - INFO  [main:Environment@100] - Server environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:host.name=fdf9214b4b7e
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.version=1.7.0_65
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.vendor=Oracle Corporation
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.class.path=/opt/zookeeper-3.4.13/bin/../build/classes:/opt/zookeeper-3.4.13/bin/../build/lib/*.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/opt/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/opt/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/opt/zookeeper-3.4.13/bin/../conf:
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.io.tmpdir=/tmp
zookeeper    | 2022-10-05 17:24:53,996 [myid:] - INFO  [main:Environment@100] - Server environment:java.compiler=<NA>
zookeeper    | 2022-10-05 17:24:53,998 [myid:] - INFO  [main:Environment@100] - Server environment:os.name=Linux
zookeeper    | 2022-10-05 17:24:53,998 [myid:] - INFO  [main:Environment@100] - Server environment:os.arch=amd64
zookeeper    | 2022-10-05 17:24:53,998 [myid:] - INFO  [main:Environment@100] - Server environment:os.version=4.4.249-1.el7.elrepo.x86_64
zookeeper    | 2022-10-05 17:24:53,998 [myid:] - INFO  [main:Environment@100] - Server environment:user.name=root
zookeeper    | 2022-10-05 17:24:53,999 [myid:] - INFO  [main:Environment@100] - Server environment:user.home=/root
zookeeper    | 2022-10-05 17:24:53,999 [myid:] - INFO  [main:Environment@100] - Server environment:user.dir=/opt/zookeeper-3.4.13
zookeeper    | 2022-10-05 17:24:54,000 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
zookeeper    | 2022-10-05 17:24:54,000 [myid:] - INFO  [main:ZooKeeperServer@836] - tickTime set to 2000
zookeeper    | 2022-10-05 17:24:54,000 [myid:] - INFO  [main:ZooKeeperServer@845] - minSessionTimeout set to -1
zookeeper    | 2022-10-05 17:24:54,000 [myid:] - INFO  [main:ZooKeeperServer@854] - maxSessionTimeout set to -1
zookeeper    | 2022-10-05 17:24:54,007 [myid:] - INFO  [main:ServerCnxnFactory@117] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
zookeeper    | 2022-10-05 17:24:54,010 [myid:] - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
zookeeper    | 2022-10-05 17:24:54,799 [myid:] - ERROR [main:ZooKeeperServerMain@66] - Unexpected exception, exiting abnormally
zookeeper    | java.io.EOFException
zookeeper    |  at java.io.DataInputStream.readInt(DataInputStream.java:392)
zookeeper    |  at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileHeader.deserialize(FileHeader.java:66)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnLog$FileTxnIterator.inStreamCreated(FileTxnLog.java:585)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnLog$FileTxnIterator.createInputArchive(FileTxnLog.java:604)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnLog$FileTxnIterator.goToNextLog(FileTxnLog.java:570)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnLog$FileTxnIterator.next(FileTxnLog.java:650)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnSnapLog.fastForwardFromEdits(FileTxnSnapLog.java:219)
zookeeper    |  at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:176)
zookeeper    |  at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:217)
zookeeper    |  at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:284)
zookeeper    |  at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:407)

At first I suspected a corrupted zoo.cfg, but inspecting it showed nothing obviously wrong. After renaming the /opt/zookeeper-3.4.13/data/version-2 directory to version-2.bak, ZooKeeper restarted normally (although clients then failed to connect with "zxid 0x02 our last zxid is 0x0 client must try another server", because this operation reset the transaction id on the ZooKeeper side), which suggested that some file under the data directory was corrupted.

Inspecting the files under version-2.bak further, I found a zero-byte transaction log file.

After deleting this file and renaming version-2.bak back to version-2, ZooKeeper started successfully and clients could connect again.
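For future incidents of this kind, the suspect files can be spotted before touching the data directory by scanning for zero-byte transaction logs and snapshots. A minimal sketch in Python, assuming the data directory path of this deployment (adjust DATA_DIR to your own dataDir):

import os

# Path taken from this deployment; point it at your own dataDir/version-2
DATA_DIR = "/opt/zookeeper-3.4.13/data/version-2"

for name in sorted(os.listdir(DATA_DIR)):
    path = os.path.join(DATA_DIR, name)
    # ZooKeeper names transaction logs "log.<zxid>" and snapshots "snapshot.<zxid>"
    if (name.startswith("log.") or name.startswith("snapshot.")) and os.path.getsize(path) == 0:
        print("zero-byte file:", path)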

Accessing HBase Data from PySpark with the HBase Spark Connector

Since version 1.2, Spark has provided an extension mechanism called the Spark SQL Data Sources API, which lets users plug arbitrary data into Spark as a data source and access it in a uniform way.

All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper for more details on RDD internals.

The HBase Spark Connector is exactly such a data source implementation, provided officially by HBase, and acts as a bridge between Spark and HBase (see the figure below). This article describes how to install, configure, and use this connector from Spark. For brevity, the HBase Spark Connector is abbreviated as hsc below.

(figure: the HBase Spark Connector acting as a bridge between Spark and HBase)

Note that there is another project on GitHub named Spark HBase Connector (shc), provided by Hortonworks; its last commit dates from January 2020, whereas hsc was still receiving new code as of December 2022. For this reason, hsc is recommended over shc.

Environment

HBase            Spark   PySpark   Hadoop   Maven   OS
2.5.1 (cluster)  3.1.3   3.1.3     3.2.3    3.8.2   CentOS 7.9

Getting hsc

First, clone the project from GitHub and build it with the mvn command given in the project's README.md, specifying the matching versions:

> git clone https://github.com/apache/hbase-connectors.git
> cd hbase-connectors/spark/hbase-spark
> mvn -Dspark.version=3.1.3 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.3 -Dscala.binary.version=2.12 -Dhbase.version=2.5.1 clean install

A successful build produces three jar files (hbase-spark.jar, hbase-spark-protocol-shaded.jar, and scala-library.jar). The jars built against the environment used in this article can be downloaded from the link; if your environment happens to use the same versions, you can use them directly.

Configuring hsc

Again, simply follow the instructions in the project's README.md.

Client-side configuration:
Copy hbase-site.xml into the $SPARK_CONF_DIR directory.

Server-side configuration:
Copy the three jars built above into the lib directory of the HBase region server installation:

[root@taos-2 lib]# ll /usr/local/hbase-2.5.1/lib
-rw-r--r-- 1 root root   508987 May 30 17:09 hbase-spark-1.0.1-SNAPSHOT.jar
-rw-r--r-- 1 root root    40374 May 30 17:09 hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar
-rw-r--r-- 1 root root  5276900 May 30 16:58 scala-library-2.12.10.jar

Reading HBase Data in PySpark

We verify directly in the Python REPL (the Python environment must have a pyspark installed whose version matches the Spark version).

> python3

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import SQLContext
>>> import pandas as pd

>>> spark = SparkSession.builder.getOrCreate()
>>> sqlContext = SQLContext(spark.sparkContext)

>>> df = sqlContext.read.format( 'org.apache.hadoop.hbase.spark')\
    .option('hbase.table','table1')\
    .option('hbase.columns.mapping','rowKey String :key, col1 String cf1:col1, col2 String cf2:col2')\
    .option("hbase.spark.use.hbasecontext", False)\
    .load()
>>> df.show(2,False)
+----------------------+--------------------------+-----------------------------------------------+
|col1                  |col2                      |rowKey                                         |
+----------------------+--------------------------+-----------------------------------------------+
|{"dq":65012,"cq":3189}|{"stac":1,"ndc":3,"mdc":0}|005e6c9b-d843-4e7c-9fa0-77fe894e42f7_1662393625|
|{"dq":65012,"cq":3189}|{"stac":1,"ndc":3,"mdc":0}|005e6c9b-d843-4e7c-9fa0-77fe894e42f7_1662393686|
+----------------------+--------------------------+-----------------------------------------------+
only showing top 2 rows

At this point, reading HBase data through hsc has been verified. The Python commands above can also be saved to a .py file and submitted with spark-submit.

> spark-submit --name xxx --master yarn --deploy-mode client  spark_hbase_example.py
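For reference, a minimal sketch of what such a spark_hbase_example.py might contain, assembled from the interactive commands above (spark.read is used here instead of the SQLContext wrapper; the table name and column mapping are the same assumptions as before):

# spark_hbase_example.py -- minimal sketch assembled from the interactive session above
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark_hbase_example").getOrCreate()

# Read the HBase table through hsc, using the same options as in the REPL example
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.table", "table1") \
    .option("hbase.columns.mapping",
            "rowKey String :key, col1 String cf1:col1, col2 String cf2:col2") \
    .option("hbase.spark.use.hbasecontext", False) \
    .load()

df.show(2, False)

spark.stop()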

About Partitions

As the implementation in hsc's HBaseTableScanRDD.scala shows, hsc partitions the DataFrame according to the number of HBase regions. If the region count is unreasonable, you may therefore need to repartition the DataFrame, which incurs extra shuffle overhead (see the sketch after the code below).

@InterfaceAudience.Private
class HBaseTableScanRDD(relation: HBaseRelation,
                       val hbaseContext: HBaseContext,
                       @transient val filter: Option[SparkSQLPushDownFilter] = None,
                        val columns: Seq[Field] = Seq.empty
     ) extends RDD[Result](relation.sqlContext.sparkContext, Nil){

  override def getPartitions: Array[Partition] = {
    val regions = RegionResource(relation)
    logDebug(s"There are ${regions.size} regions")
    val ps = regions.flatMap { x =>
     ...
    }.toArray
    ...
    ps.asInstanceOf[Array[Partition]]
  }
  ...
 }
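As a rough illustration of the repartition workaround mentioned above, continuing the PySpark session from earlier (df as defined there; the target partition count of 32 is an arbitrary example and should be tuned to the actual data volume and cluster):

# The initial partition count follows the number of HBase regions
print(df.rdd.getNumPartitions())

# Repartitioning changes the parallelism but triggers a full shuffle
df_repartitioned = df.repartition(32)
print(df_repartitioned.rdd.getNumPartitions())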

Common Issues

Getting a SparkSession in Python fails with "PythonUtils.getEncryptionEnabled does not exist in the JVM":

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  ...
    self._encryption_enabled = self._jvm.PythonUtils.getEncryptionEnabled(self._jsc)
  File "/disk1/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1487, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

The cause is a mismatch between the pyspark and Spark versions. The commands below show the current pyspark version; if it is too old, uninstall it and install a newer one.

> pip show pyspark
Name: pyspark
Version: 2.3.0

> pip install pip -U -i https://pypi.tuna.tsinghua.edu.cn/simple
> pip uninstall pyspark
> pip install pyspark==3.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

The full pyspark release history is available at: https://pypi.org/project/pyspark/#history
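The installed pyspark version can also be checked from Python itself; a small sketch:

import pyspark

# Should match the Spark cluster version, e.g. 3.1.3 in this article's environment
print(pyspark.__version__)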

Error When SQL Filter Pushdown Is Enabled

Using .option("hbase.spark.pushdown.columnfilter", true) raises an error (true is actually the default value). The error message is:

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder$

The fix is to place the three jars mentioned earlier (scala-library and the others) in the lib directory of every HBase region server node. See the reference link.
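Once the jars are in place on every region server, pushdown can stay at its default. As a hedged sketch of a read that benefits from it, reusing the same table and column mapping assumed earlier:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hbase.spark.pushdown.columnfilter defaults to true, so setting it is optional
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.table", "table1") \
    .option("hbase.columns.mapping",
            "rowKey String :key, col1 String cf1:col1, col2 String cf2:col2") \
    .option("hbase.spark.pushdown.columnfilter", True) \
    .load()

# With pushdown enabled, simple predicates on mapped columns such as the row key
# can be evaluated on the HBase side instead of in Spark
df.filter(df.rowKey == "005e6c9b-d843-4e7c-9fa0-77fe894e42f7_1662393625").show()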

References

https://spark.apache.org/docs/latest/hadoop-provided.html
https://hbase.apache.org/book.html#_sparksqldataframes
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html
https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_HBase_Connector.md

Manually Removing an HBase Table with Regions Stuck in Transition

Symptoms

Creating a pre-split table once got stuck; after forcibly restarting HBase, quite a few regions were left in RIT (Region In Transition) state.

After starting HBase, the region servers kept shutting down within a few minutes of starting. The logs showed a problem opening a particular table:

2022-02-06T23:22:59,303 WARN  [RS_OPEN_REGION-regionserver/zookeeper-3:16020-0] handler.AssignRegionHandler: Failed to open region hb_data_2,e0000000,1670290921940.99fbb79e62c2182aab011931a36e210f., will report to master
2022-02-06T23:55:08,541 ERROR [RS_OPEN_REGION-regionserver/zookeeper-3:16020-1] regionserver.HRegionServer: ***** ABORTING region server zookeeper-3,16020,1670342087370: Replay of WAL required. Forcing server shutdown *****
org.apache.hadoop.hbase.DroppedSnapshotException: region: hb_data_2,60000000,1670311834912.9887284106bf70971f4a457e16558183.
        at org.apache.hadoop.hbase.regionserver.HRegion.internalFlushCacheAndCommit(HRegion.java:2903) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.internalFlushcache(HRegion.java:2580) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.replayRecoveredEditsIfAny(HRegion.java:5144) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.initializeRegionInternals(HRegion.java:1003) ~[hbase-server-2.5.1.jar:2.5.1]
        at org.apache.hadoop.hbase.regionserver.HRegion.initialize(HRegion.java:941) ~[hbase-server-2.5.1.jar:2.5.1]

Deleting the Corrupted Table

To avoid impacting the business, we decided to first delete the table hb_data_2 whose creation had failed.

> zkCli.sh
> rmr /hbase/table/hb_data_2

> hdfs dfs -rmr /hbase/data/default/hb_data_2

There were roughly 200-plus regions to delete. The approach usually described in articles is to delete them from the hbase:meta table:

deleteall 'hbase:meta', '...'

Since there were many problem regions this time, we used the extraRegionsInMeta --fix command provided by the HBCK tool to delete them in bulk. This command scans for regions that exist in hbase:meta but have no corresponding directory; with the --fix option, those regions are removed from hbase:meta automatically:

> hbase hbck -j /usr/local/hbase-hbck2/hbase-hbck2-1.2.0.jar extraRegionsInMeta  --fix default

Regions that had no dir on the FileSystem and got removed from Meta: 64
Regions in Meta but having no equivalent dir, for each table:
        hb_data_2-> ec86493893d7742525919263abd01c2c e23d0e908125730a5e1095720bf13e50 ...

After the cleanup, restarting HBase showed that the RIT regions were gone.

A Few Regions That Would Not Go Away

After cleaning up as above, a few regions were still stuck in OPENING state; even after deleting them from hbase:meta, they came back once the HBase service was restarted. In this case, the corresponding procedures and their ids can be seen under "Procedures & Locks" in the HBase web UI. They can be cleared with the hbck2 bypass command; be sure to add the -o flag, otherwise it may fail:

> hbase hbck -j /usr/local/hbase-hbck2/hbase-hbck2-1.2.0.jar bypass -r -o 412150

Then delete the corresponding records for the problem regions from hbase:meta once more:

scan 'hbase:meta', {STARTROW=>'hb_data_3', LIMIT=>2}
hbase:011:0> deleteall 'hbase:meta','hb_data_3,f8000000,1670290921940.e4bb0233b1f0130e661a347127087e7d.'

Installing HBCK2

HBase 2 ships with the old HBCK1, so HBCK2 has to be installed first. Building it from source is recommended so that the version matches your HBase exactly; a directly downloaded binary release is fairly likely to throw exceptions at runtime.

First download the source (a domestic mirror is faster in China):

> wget https://mirrors.tuna.tsinghua.edu.cn/apache/hbase/hbase-operator-tools-1.2.0/hbase-operator-tools-1.2.0-src.tar.gz --no-check-certificate
> tar zxvf hbase-operator-tools-1.2.0-src.tar.gz

Then manually edit pom.xml so that hbase.version matches the version in use (note that the log4j2 version also needs to match HBase's):

> vi pom.xml
<hbase.version>2.5.1</hbase.version>
<log4j2.version>2.17.2</log4j2.version>

Build the source. This step takes about an hour, mostly because downloading the dependencies is slow:

> mvn install

Building hbck2 1.2.0 against HBase 2.5.1 fails two test cases:

[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR]   TestMissingTableDescriptorGenerator.shouldGenerateTableInfoBasedOnCachedTableDescriptor:91 » IllegalArgument
[ERROR]   TestMissingTableDescriptorGenerator.shouldGenerateTableInfoBasedOnFileSystem:121 » IllegalArgument
[INFO]
[ERROR] Tests run: 72, Failures: 0, Errors: 2, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Apache HBase Operator Tools 1.2.0:
[INFO]
[INFO] Apache HBase Operator Tools ........................ SUCCESS [09:14 min]
[INFO] Apache HBase - Table Reporter ...................... SUCCESS [13:53 min]
[INFO] Apache HBase - HBCK2 ............................... FAILURE [33:07 min]
[INFO] Apache HBase - HBase Tools ......................... SKIPPED
[INFO] Apache HBase Operator Tools - Assembly ............. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  56:15 min
[INFO] Finished at: 2022-02-06T21:53:23+08:00
[INFO] ------------------------------------------------------------------------

Presumably the hbck2 version has not caught up with the HBase version; skipping the test cases works around it for now:

> mvn install -Dmaven.test.skip=true

References

https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2
https://zhuanlan.zhihu.com/p/373957937

Exporting and Importing HBase Table Data (Export/Import)

This article assumes we want to export an HBase table named table1 containing a column family named cf1, and then import the exported files into another environment.

Environment

HBase            ZooKeeper   Hadoop   OS
2.5.1 (cluster)  3.4.8       3.2.3    CentOS 7.9

Export

1. Record the HBase table schema

HBase does not provide a way to export the create-table statement directly. We can run describe in the HBase shell to see the table name and column family names, then assemble the create statement by hand; it will be needed at import time:

> hbase shell
hbase:001:0> describe 'table1'
Table table1 is ENABLED
table1, {TABLE_ATTRIBUTES => {METADATA => {'hbase.store.file-tracker.impl' => 'DEFAULT'}}}
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', INDEX_BLOCK_ENCODING => 'NONE', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOM
FILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536 B (64KB)'}

After tidying this up, the complete create statement looks like this:

create 'table1','cf1', 
    {VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', 
    DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', 
    REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', 
    COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true'}

2. Export the HBase table data

HBase's Export utility can export the data of table1. The output goes to the specified HDFS directory /foo/table1 (which must not already exist), with file names of the form part-m-00000_:

> hbase org.apache.hadoop.hbase.mapreduce.Export -Dmapred.output.compress=true table1 /foo/table1

The command above launches a MapReduce job. Some MapReduce parameters can be set with -D, for example whether to compress the output; you can also specify which data versions and what timestamp range to export, and if unspecified the latest version and the full time range are exported. I found that the official documentation's coverage of the Export tool is incomplete and omits many -D parameters; if needed, see the appendix at the end of this article.

How long the export job takes depends mainly on the data volume. Once the job finishes, the results can be seen on HDFS as compressed SequenceFiles:

> hdfs dfs -ls /foo/table1
Found 3 items
-rw-r--r--   3 hdfs supergroup          0 2022-03-22 13:20 /foo/table1/_SUCCESS
-rw-r--r--   3 hdfs supergroup 1609084326 2022-03-22 13:20 /foo/table1/part-m-00000_
-rw-r--r--   3 hdfs supergroup 1151428929 2022-03-22 13:20 /foo/table1/part-m-00001_

Some articles mention that a path of the form file:///local/path can be used to export to local disk. In practice, the exported files may end up on the local disk of any node in the cluster, and the import has a similar problem, very likely complaining that the file does not exist. It is therefore better to export to HDFS first and then download.

To download the files from HDFS to local disk, use:

> hdfs dfs -get /foo/table1/* /my/local/path

Import

1. Create the HBase table

Before importing, the table and column family must be created manually. The table name may differ from the exported table's name, but the column family names must stay the same. The table can be created with parameters different from the original, for example a pre-split configuration; for brevity, the default create options are used here instead of the full create statement assembled above:

> hbase shell
hbase:001:0> create 'table1', 'cf1'

Column names within the column family do not need to be created by hand; they are restored automatically when the data is imported in the next step.

2. Import the HBase table data

If the data files are on local disk, upload them to HDFS first:

> hdfs dfs -put /my/local/path/* /bar/table1

Restore the previously exported data into table1; this command also works by launching a MapReduce job:

> hbase org.apache.hadoop.hbase.mapreduce.Import table1 /bar/table1/*

After the import job completes, check that the first three rows look correct:

> hbase shell
hbase:001:0> scan 'table1', {LIMIT=>3}

Common Issues

Because the import and export run as MapReduce jobs, the HBase Export/Import commands may complain that several required configuration properties are missing. Following the hint, add the configuration below to mapred-site.xml (fill in the values for your environment):

<property>
  <name>yarn.app.mapreduce.am.env</name>
  <value>HADOOP_MAPRED_HOME=...</value>
</property>
<property>
  <name>mapreduce.map.env</name>
  <value>HADOOP_MAPRED_HOME=...</value>
</property>
<property>
  <name>mapreduce.reduce.env</name>
  <value>HADOOP_MAPRED_HOME=...</value>
</property>

Appendix

The official HBase documentation does not list all of Export's and Import's parameters, and few articles online cover them. In fact, simply running either command without arguments prints the usage for every parameter.

Export command reference

> hbase org.apache.hadoop.hbase.mapreduce.Export
ERROR: Wrong number of arguments: 0
Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> [<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]

  Note: -D properties will be applied to the conf used.
  For example:
   -D mapreduce.output.fileoutputformat.compress=true
   -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
   -D mapreduce.output.fileoutputformat.compress.type=BLOCK
  Additionally, the following SCAN properties can be specified
  to control/limit what is exported..
   -D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...
   -D hbase.mapreduce.include.deleted.rows=true
   -D hbase.mapreduce.scan.row.start=<ROWSTART>
   -D hbase.mapreduce.scan.row.stop=<ROWSTOP>
   -D hbase.client.scanner.caching=100
   -D hbase.export.visibility.labels=<labels>
For tables with very wide rows consider setting the batch size as below:
   -D hbase.export.scanner.batch=10
   -D hbase.export.scanner.caching=100
   -D mapreduce.job.name=jobName - use the specified mapreduce job name for the export
For MR performance consider the following properties:
   -D mapreduce.map.speculative=false
   -D mapreduce.reduce.speculative=false

Import command reference

> hbase org.apache.hadoop.hbase.mapreduce.Import
ERROR: Wrong number of arguments: 0
Usage: Import [options] <tablename> <inputdir>
By default Import will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimport.bulk.output=/path/for/output
If there is a large result that includes too much Cell whitch can occur OOME caused by the memery sort in reducer, pass the option:
  -Dimport.bulk.hasLargeResult=true
 To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use
  -Dimport.filter.class=<name of filter class>
  -Dimport.filter.args=<comma separated list of args for filter
 NOTE: The filter will be applied BEFORE doing key renames via the HBASE_IMPORTER_RENAME_CFS property. Futher, filters will only use the Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify  whether the current row needs to be ignored completely for processing and  Filter#filterCell(Cell) method to determine if the Cell should be added; Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including the Cell.
To import data exported from HBase 0.94, use
  -Dhbase.import.version=0.94
  -D mapreduce.job.name=jobName - use the specified mapreduce job name for the import
For performance consider the following options:
  -Dmapreduce.map.speculative=false
  -Dmapreduce.reduce.speculative=false
  -Dimport.wal.durability=<Used while writing data to hbase. Allowed values are the supported durability values like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>

References

https://hbase.apache.org/book.html#export
https://hbase.apache.org/book.html#import
https://stackoverflow.com/questions/34243134/what-is-sequence-file-in-hadoop

HBase Table Creation Fails with "hbase:meta is NOT online"

Environment

HBase            ZooKeeper   OS
2.5.1 (cluster)  3.4.8       CentOS 7.9

Symptoms

Running list in the hbase shell works fine, but create fails with "PleaseHoldException: Master is initializing":

hbase:001:0> list
TABLE
0 row(s)
Took 0.5356 seconds
=> []

hbase:002:0> create 'table1', 'cf1'
2022-03-15 12:41:58,876 INFO  [main] client.RpcRetryingCallerImpl (RpcRetryingCallerImpl.java:callWithRetries(132)) - Call exception, tries=6, retries=8, started=5249 ms ago, cancelled=false, msg=org.apache.hadoop.hbase.PleaseHoldException: Master is initializing
        at org.apache.hadoop.hbase.master.HMaster.checkInitialized(HMaster.java:3168)
        at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:2301)
        at org.apache.hadoop.hbase.master.MasterRpcServices.createTable(MasterRpcServices.java:690)
        at org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:387)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:124)
        at org.apache.hadoop.hbase.ipc.RpcHandler.run(RpcHandler.java:102)
        at org.apache.hadoop.hbase.ipc.RpcHandler.run(RpcHandler.java:82)
, details=, see https://s.apache.org/timeout

The HMaster log contains the warning "hbase:meta,,1.1588230740 is NOT online;":

> tail hbase-root-master-host-1.out -n100
2022-03-15 12:29:24,336 INFO  [RegionServerTracker-0] master.RegionServerTracker (RegionServerTracker.java:processAsActiveMaster(179)) - RegionServer ephemeral node created, adding [zookeeper-2,16020,1669350558974]
2022-03-15 12:29:24,337 INFO  [RegionServerTracker-0] master.RegionServerTracker (RegionServerTracker.java:processAsActiveMaster(179)) - RegionServer ephemeral node created, adding [zookeeper-3,16020,1669350556241]
2022-03-15 12:29:24,337 INFO  [RegionServerTracker-0] master.RegionServerTracker (RegionServerTracker.java:processAsActiveMaster(179)) - RegionServer ephemeral node created, adding [zookeeper-1,16020,1669350559278]
2022-03-15 12:29:25,833 INFO  [master/zookeeper-1:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=3; waited=1755ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=1504ms
2022-03-15 12:29:27,339 INFO  [master/zookeeper-1:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=3; waited=3261ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=3010ms
2022-03-15 12:29:28,594 INFO  [master/zookeeper-1:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(825)) - Finished waiting on RegionServer count=3; waited=4516ms, expected min=1 server(s), max=NO_LIMIT server(s), master is running
2022-03-15 12:29:28,597 WARN  [master/zookeeper-1:16000:becomeActiveMaster] master.HMaster (HMaster.java:isRegionOnline(1344)) - hbase:meta,,1.1588230740 is NOT online; state={1588230740 state=OPEN, ts=1669350563745, server=zookeeper-1,16020,1667491986634}; ServerCrashProcedures=true. Master startup cannot progress, in holding-pattern until region onlined.
2022-03-15 12:29:29,598 WARN  [master/zookeeper-1:16000:becomeActiveMaster] master.HMaster (HMaster.java:isRegionOnline(1344)) - hbase:meta,,1.1588230740 is NOT online; state={1588230740 state=OPEN, ts=1669350563745, server=zookeeper-1,16020,1667491986634}; ServerCrashProcedures=true. Master startup cannot progress, in holding-pattern until region onlined.
2022-03-15 12:29:31,599 WARN  [master/zookeeper-1:16000:becomeActiveMaster] master.HMaster (HMaster.java:isRegionOnline(1344)) - hbase:meta,,1.1588230740 is NOT online; state={1588230740 state=OPEN, ts=1669350563745, server=zookeeper-1,16020,1667491986634}; ServerCrashProcedures=true. Master startup cannot progress, in holding-pattern until region onlined.
...

The region servers look normal in the HBase web UI.

Resolution

(Strictly speaking this is not a clean fix, because it wipes HBase's metadata; if HBase already holds data, consider other approaches.)

Some articles online say that deleting the /hbase node in ZooKeeper is enough, but in my tests the error persisted after deleting it and restarting HBase. The reason is that the data under the /hbase directory in HDFS is bad, and on restart HBase restores it into ZooKeeper. The /hbase directory in HDFS therefore has to be removed as well:

# Stop the HBase service
> stop-hbase.sh

# Enter the ZooKeeper CLI
> zkCli.sh

# Delete the /hbase node
[zk: localhost:2181(CONNECTED) 0] rmr /hbase

# Delete the /hbase directory on HDFS
> hdfs dfs -rm -R /hbase

# Start the HBase service
> start-hbase.sh

# Verify the fix: table creation now succeeds
> hbase shell
hbase:002:0> create 'table1', 'cf1'
2022-03-15 12:59:12,870 INFO  [main] client.HBaseAdmin (HBaseAdmin.java:postOperationResult(3591)) - Operation: CREATE, Table Name: default:table1, procId: 9 completed
Created table table1
Took 1.2251 seconds
=> Hbase::Table - table1

References

https://blog.csdn.net/macfei/article/details/108267999

Fast Merging of Many CSV Files with Java NIO

Requirements

A project has a scenario where small CSV files are merged into large ones: typically each device produces 1,440 CSV files per day (one per minute), each about 100 KB, and they need to be merged into a single CSV file.

There are about 2,000 such devices, i.e. 2.88 million CSV files per day; merging per device should yield 2,000 CSV files.

Analysis

Reading the small files one by one with Apache Commons CSV and writing them into the large file takes about 20-30 seconds per device; for 2,000 devices that is more than 10 hours, which is unacceptably long.

Looking at sample data showed that, for a given device and day, all the CSV files share exactly the same header, while headers are not guaranteed to match across devices or days. Since the merge happens to be per device per day anyway, this property of the data can be exploited to speed up the merge.

Java's NIO package provides FileChannel for file access and allows data to be transferred directly between two file channels (transferTo), avoiding the overhead of copying data repeatedly between hardware, kernel space, and user space (zero copy). FileChannel also allows seeking to a given position before reading, which we can use to skip the CSV header region.

Implementation

The code below encapsulates the CSV-merging logic. In tests on an ordinary server with SAS disks, merging 1,440 files takes about 1 second, a clear improvement over the previous 20 seconds.

package com.acme;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.channels.FileChannel;

public class MergeCsvDemo {

    final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());

    /**
     * Merge given csv files into one file.
     * @param srcFiles Source csv files, must have same header
     * @param destFile Destination csv file
     * @throws UserException
     */
    public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {

        if (destFile.exists()) {
            throw new UserException("Destination file already exists: " + destFile.getPath());
        }
        if (!destFile.getPath().toUpperCase().contains("CSV")) {
            throw new UserException("Only csv file is allowed: " + destFile.getPath());
        }
        if (srcFiles.length == 0) {
            throw new UserException("Please specify at least one source file");
        }
        if (!destFile.getParentFile().exists()) {
            destFile.getParentFile().mkdirs();
        }

        try {

            // Determine the header length of the source files (assuming all source files share the same header)
            int headerLength = 0;
            BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
            String line = br.readLine();
            if (line == null) {
                throw new UserException("Empty source file: " + srcFiles[0]);
            }
            headerLength = line.length();
            br.close();

            // Merge the files
            FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
            for (int i = 0; i < srcFiles.length; i++) {
                FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
                // Skip the header for all but the first file
                if (i > 0) {
                    srcChannel.position(headerLength);
                }
                destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
                srcChannel.close();
            }
            destChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
            throw new UserException(e.getMessage());
        }

    }
}

Sample Download

To keep the archive small, the CSV files in the sample have been anonymized, truncated, and duplicated. Running the test case merges 80 sample CSV files into a file of about 370 MB in roughly 0.6 seconds.

merge-csv-demo.zip

Installing and Configuring HBase in Standalone and Cluster Modes

HBase's official documentation is very thorough, and following the quickstart steps is enough to complete the installation; for the cluster setup you just need to pay a little attention to a few Hadoop-related configuration items. This article records my installation of cluster-mode HBase in a project's integration environment.

1. Environment

> uname -a
Linux node1 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

> java --version
openjdk 11.0.15 2022-04-19

> hbase version
HBase 2.5.1

2. Standalone Mode

Installing standalone HBase is quite straightforward.

Download HBase

In China, the Tsinghua mirror is faster:

> wget https://mirrors.tuna.tsinghua.edu.cn/apache/hbase/2.5.1/hbase-2.5.1-bin.tar.gz --no-check-certificate

Note: several downloads of version 2.4.15 all failed to extract with an "unexpected end of file" error, so I switched to version 2.5.1.

Extract to the target directory

Here we install under /usr/local as an example:

> tar zxvf hbase-2.5.1-bin.tar.gz -C /usr/local/

Start HBase

Start the HBase service. On success you should see the HMaster and HRegionServer processes, the HBase web UI is reachable at http://localhost:16010, and bin/hbase shell opens the command line.

> bin/start-hbase.sh
running master, logging to /usr/local/hbase-2.5.1/bin/../logs/hbase-root-master-node1.out
: running regionserver, logging to /usr/local/hbase-2.5.1/bin/../logs/hbase-root-regionserver-node1.out

> jps
55825 HMaster
56062 HRegionServer

> bin/hbase shell
hbase:001:0>

If you see "Could not start ZK at requested port of 2181", edit hbase-site.xml and set hbase.cluster.distributed=true. Strictly speaking HBase then runs in pseudo-distributed rather than standalone mode, i.e. HMaster and HRegionServer run in separate JVM processes on the same physical machine.

Stop HBase

> bin/stop-hbase.sh

After stopping, use jps to confirm that the HMaster and HRegionServer processes have exited.

3. Cluster Mode

Standalone HBase is generally not enough for production, so this section describes how to turn the standalone setup into a cluster. We use three servers as an example: node1 and node2 act as HMaster (active and backup), and node1, node2, and node3 all act as RegionServers.

Configure passwordless SSH

The master node must be able to SSH to the other nodes without a password, and so must the backup master node.

> ssh-keygen
> ssh-copy-id root@node2
> ssh-copy-id root@node3

Edit hbase-site.xml

The default hbase-site.xml contains the standalone configuration, which is only suitable for testing and learning, so it now needs to be changed for cluster mode. First, switch the run mode to distributed; second, change HBase's data directory from a local directory to an HDFS directory, where the host and port should follow the fs.defaultFS value configured in core-site.xml:

> vi hbase-site.xml
<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://namenode1:8020/hbase</value>
</property>

At the same time, remove the two properties needed only for standalone mode: hbase.tmp.dir and hbase.unsafe.stream.capability.enforce.

After restarting HBase at this point, the hbase directory is created automatically on HDFS:

[root@node1 hbase-2.5.1]# hdfs dfs -ls /hbase
Found 12 items
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/.hbck
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/.tmp
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/MasterData
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/WALs
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/archive
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/corrupt
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/data
-rw-r--r--   3 hdfs supergroup         42 2022-01-05 13:34 /hbase/hbase.id
-rw-r--r--   3 hdfs supergroup          7 2022-01-05 13:34 /hbase/hbase.version
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/mobdir
drwxr-xr-x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/oldWALs
drwx--x--x   - hdfs supergroup          0 2022-01-05 13:34 /hbase/staging

Edit regionservers

Edit the regionservers file, with one server name per line:

> vi conf/regionservers
node1
node2
node3

Create a backup HMaster

Production environments usually run HMaster on two nodes for high availability. Create a backup-masters file in the conf directory containing the name of the backup HMaster server:

> vi conf/backup-masters
node2

Configure ZooKeeper

Configure ZooKeeper in hbase-site.xml:

> vi hbase-site.xml
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node1,node2,node3</value>
</property>
<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/usr/local/zookeeper</value>
</property>

Configure the other nodes

Copy the whole directory to the other nodes. I usually use rsync because it is faster; if rsync is not installed, scp works too:

> rsync -avr /usr/local/hbase-2.5.1 root@node2:/usr/local
> rsync -avr /usr/local/hbase-2.5.1 root@node3:/usr/local

Start the HBase cluster

Start the cluster on the master node:

> bin/start-hbase.sh

After startup, run jps on each node to check that the HMaster and HRegionServer processes exist; if startup fails, check the logs to find the cause.

4. Common Issues

HMaster starts but none of the RegionServers do

The log files show HMaster waiting for RegionServers, while the RegionServer logs contain nothing at all.

> tail logs/hbase-root-master-node1.out
2022-01-05 17:36:36,007 INFO  [master/zookeeper-2:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=0; waited=862160ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=862160ms
2022-01-05 17:36:37,512 INFO  [master/zookeeper-2:16000:becomeActiveMaster] master.ServerManager (ServerManager.java:waitForRegionServers(805)) - Waiting on regionserver count=0; waited=863665ms, expecting min=1 server(s), max=NO_LIMIT server(s), timeout=4500ms, lastChange=863665ms

Scrolling back through the log revealed a warning that JAVA_HOME was not set (even though echo $JAVA_HOME on the command line does print a value). Setting it again in conf/hbase-env.sh and restarting HBase fixed the problem.

> vi conf/hbase-env.sh
# The java implementation to use.  Java 1.8+ required.
export JAVA_HOME=/home/k2data/jdk-11

RegionServer fails with ClockOutOfSyncException

If the clocks of the master and worker nodes are out of sync, the RegionServer exits with this error. You can manually correct the time on the offending node; in production, keeping the clocks synchronized with ntpdate or chrony is recommended.

Stopping the HBase service reports "no hbase master found"

The cause is that the pid files for the HBase services are kept under /tmp by default and were lost for some reason. You can run jps on each node to find the process ids of the HBase services (the ones starting with H) and kill them manually.

SLF4J reports multiple bindings

Starting HBase or running commands in the hbase shell always prints the following SLF4J warning:

[root@taos-1 target]# start-hbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hdfs/hadoop-3.2.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hbase-2.5.1/lib/client-facing-thirdparty/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.

This is usually caused by a version mismatch with the log4j2 bundled with Hadoop; deleting the older log4j2 jar at the path shown in the warning resolves it.

5. References

https://hbase.apache.org/book.html#quickstart

Implementing HDFS and Yarn Clients in Java

Implementing HDFS and Yarn clients in Java works very similarly for both, so they are recorded together here. A pre-configured client is convenient for on-site deployment and is compatible with both HA and non-HA HDFS and Yarn clusters.

Add dependencies

Add the following dependencies to pom.xml:

<dependencies>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.3</version>
    </dependency>

    <!-- Yarn Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.2.3</version>
    </dependency>
</dependencies>

Define the configuration

For the client to talk to a given Hadoop cluster, the most convenient approach is to first obtain the three configuration files core-site.xml, hdfs-site.xml, and yarn-site.xml from the cluster (ask the cluster administrator if needed); the first two are needed by the HDFS client and the last one by the Yarn client.

Then put these files in one directory and define an environment variable pointing to it; the client code loads the three files with addResource(). Spark uses the HADOOP_CONF_DIR environment variable for this purpose, so to stay consistent with that convention we use the same variable.

HDFS client

The HDFS client code is simple; the code below downloads a given file from HDFS to a local file of the same name:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.FileOutputStream;
import java.io.OutputStream;

public class HdfsClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        FileSystem fs = FileSystem.get(conf);

        // Download file from HDFS to local
        FSDataInputStream in = fs.open(new Path("/tmp/10min_top.csv"));
        OutputStream out = new FileOutputStream("c:/temp/10min_top.csv");
        IOUtils.copyBytes(in, out, 4096, true);
    }
}

Yarn client

The Yarn client code is equally simple; the code below connects to Yarn and prints basic information about all applications:

package com.acme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.List;

public class YarnClientDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            System.err.println("HADOOP_CONF_DIR not set");
            return;
        }
        conf.addResource(new Path(hadoopConfDir + "/yarn-site.xml"));

        System.out.println("yarn.resourcemanager.hostname: " + conf.get("yarn.resourcemanager.hostname"));

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // List all applications from yarn
        List<ApplicationReport> applications = yarnClient.getApplications();
        for (ApplicationReport app : applications) {
            System.out.println(app.getApplicationId().getId() + ", " + app.getName() + ", " + app.getFinalApplicationStatus().name());
        }
        yarnClient.stop();
        yarnClient.close();
    }
}

Sample output of the code above:

yarn.resourcemanager.hostname: namenode1
16, k2a_job_1420, FAILED
17, k2a_job_1420, FAILED
14, k2a_job_1414, FAILED
15, k2a_job_1418, SUCCEEDED
12, k2a_job_1256, SUCCEEDED
13, k2a_job_1417, FAILED
10, k2a_job_1254, SUCCEEDED

Code Download

hdfs-client-demo.zip

hadoop-conf-files.zip

Customizing the Parquet File Names Written by Flink

Problem

When using Flink to process a stream from a source such as Kafka and write the result to files, we usually write code like this:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

In practice, the generated parquet files are named in the "part-1-2" style and have no extension. Since our application has naming conventions for parquet files and embeds useful information in the names (such as the number of records in the file), this does not meet our requirements.

However, this naming rule is hard-coded in Flink's Bucket.java and cannot be changed directly, so a workaround is needed.

Solution

The second argument of StreamingFileSink.forBulkFormat() is a factory that creates BulkWriter instances. This is where we can hook in: inject a custom BulkWriter and rename the parquet file when it is written.

Below are the relevant classes, which have been verified in practice. The biggest problem with this approach is that it obtains the targetFile name via reflection, so it may break in future Flink versions.

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * Wraps Flink's ParquetBulkWriter to change the part file name format.
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; // count the records; the total becomes part of the file name
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // Renaming after finish() fails, because the final file has not been created yet at that point.
        // Directly modifying the targetFile name inside the stream via reflection does work.
        // This is the key part that changes the part file name.
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder (avroSchema is assigned externally):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

References

Flink streaming - Change part file names when using StreamingFileSink?

Processing Data Files with Apache Drill

This article targets Drill version 1.8.

Installing Drill

Download the tar package from the official site and extract it; the same applies on both Linux and Windows.
Note: Drill requires Java 8 or above.

Using Drill from the Command Line

The simplest way is to start Drill in embedded mode (once started, Drill can be managed in the browser at http://localhost:8047):

bin/drill-embedded

This starts the Drill service in embedded mode and drops you into the built-in sqlline command line. If the Drill service is already running, the sqlline command line can be entered like this (see the reference):

bin/sqlline -u jdbc:drill:drillbit=localhost

As an example, query the data bundled with Drill using SQL (cp in the query refers to the classpath; note that the statement must end with a semicolon):

apache drill> SELECT * FROM cp.`employee.json` LIMIT 3;

Query the contents of an arbitrary data file:

apache drill> SELECT * FROM dfs.`/home/hao/apache-drill-1.16.0/sample-data/region.parquet`;

To exit the command line, use !quit.

Configuring and Inspecting Drill Options

To change an option permanently, edit the $DRILL_HOME/conf/drill-override.conf file (see the documentation); the SET and RESET commands change option values for the current session (documentation).

Set an option:

SET `store.parquet.block-size` = 536870912

Reset an option to its default:

RESET `store.parquet.block-size`

Inspect an option:

select * from sys.options where name = 'store.parquet.block-size'

Using Drill from Java Code

Below is example Java code that uses Drill. One thing to note is that the JDBC URL is jdbc:drill:drillbit=localhost, not jdbc:drill:zk=localhost as many tutorials claim.

package com.acme;

import java.sql.*;

public class DrillJDBCExample {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    //static final String DB_URL = "jdbc:drill:zk=localhost:2181";
    static final String DB_URL = "jdbc:drill:drillbit=localhost"; //for embedded mode installation

    static final String USER = "admin";
    static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL,USER,PASS);

            stmt = conn.createStatement();
            /* Perform a select on data in the classpath storage plugin. */
            String sql = "select employee_id,first_name,last_name from cp.`employee.json`";
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
                int id  = rs.getInt("employee_id");
                String first = rs.getString("first_name");
                String last = rs.getString("last_name");

                System.out.print("ID: " + id);
                System.out.print(", First: " + first);
                System.out.println(", Last: " + last);
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

Letting Drill Access Databases

Depending on the database to be accessed, the corresponding driver has to be added to Drill; see RDBMS Storage Plugin for how.

Converting CSV to Parquet with Drill

The idea is to create a table in Drill whose format is parquet; the table's path (/parquet1 in the example below) corresponds to a directory on disk.

ALTER SESSION SET `store.format`='parquet';
ALTER SESSION SET `store.parquet.compression` = 'snappy';

CREATE TABLE dfs.tmp.`/parquet1` AS 
SELECT * FROM dfs.`/my/csv/file.csv`;

Supporting .zip and .arc Compressed Formats in Drill

(To be added)

References

Drill in 10 Minutes
How to convert a csv file to parquet