PySpark使用HBase Spark Connector访问Hbase数据

Spark从1.2版开始提供了被称为Spark SQL Data Sources API的扩展机制,允许用户将任意形式的数据对接到Spark作为数据源,在Spark里可以用统一的方式访问这些数据。

All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper for more details on RDD internals.

HBase Spark Connector就是HBase官方提供的这样一个数据源的实现,相当于Spark与HBase之间的桥梁(见下图)。本文介绍如何安装、配置和在Spark中使用此连接器。为节约篇幅,本文中将HBase Spark Connector简称为hsc

file

注意,在github上还有一个名为Spark Hbase Connector的项目,简称shc,是Hortonworks提供的,此项目最后commit停在2020年1月,而hsc截至2022年12月仍有新的代码在提交。因此更推荐使用hsc而不是shc。

环境信息

HBase Spark PySpark Hadoop Maven 操作系统
2.5.1 集群版 3.1.3 3.1.3 3.2.3 3.8.2 CentOS 7.9

获取hsc

首先从github下载项目代码,按项目README.md里给出的mvn命令按指定版本编译:

> git clone https://github.com/apache/hbase-connectors.git
> cd hbase-connectors/spark/hbase-spark
> mvn -Dspark.version=3.1.3 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.3 -Dscala.binary.version=2.12 -Dhbase.version=2.5.1 clean install

编译成功后将得到三个jar文件(hbase-spark.jar、hbase-spark-protocol-shaded.jar和scala-library.jar),按本文环境编译的jar包可以点击下载,如果你的环境版本正好相同可直接使用。

配置hsc

仍然是按项目README.md里的说明配置即可。

客户端配置:
将hbase-site.xml复制到 $SPARK_CONF_DIR 目录下。

服务端配置:
将前面编译得到的三个jar文件复制到hbase region-server安装目录下的lib目录下:

[root@taos-2 lib]# ll /usr/local/hbase-2.5.1/lib
-rw-r--r-- 1 root root   508987 May 30 17:09 hbase-spark-1.0.1-SNAPSHOT.jar
-rw-r--r-- 1 root root    40374 May 30 17:09 hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar
-rw-r--r-- 1 root root  5276900 May 30 16:58 scala-library-2.12.10.jar

在PySpark里读取HBase数据

我们直接在python命令行里进行验证(python环境需要已经安装与spark版本相同的pyspark)。

> python3

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import SQLContext
>>> import pandas as pd

>>> spark = SparkSession.builder.getOrCreate()
>>> sqlContext = SQLContext(spark.sparkContext)

>>> df = sqlContext.read.format( 'org.apache.hadoop.hbase.spark')\
    .option('hbase.table','table1')\
    .option('hbase.columns.mapping','rowKey String :key, col1 String cf1:col1, col2 String cf2:col2')\
    .option("hbase.spark.use.hbasecontext", False)\
    .load()
>>> df.show(2,False)
+----------------------+--------------------------+-----------------------------------------------+
|col1                  |col2                      |rowKey                                         |
+----------------------+--------------------------+-----------------------------------------------+
|{"dq":65012,"cq":3189}|{"stac":1,"ndc":3,"mdc":0}|005e6c9b-d843-4e7c-9fa0-77fe894e42f7_1662393625|
|{"dq":65012,"cq":3189}|{"stac":1,"ndc":3,"mdc":0}|005e6c9b-d843-4e7c-9fa0-77fe894e42f7_1662393686|
+----------------------+--------------------------+-----------------------------------------------+
only showing top 2 rows

至此通过hsc读取hbase数据已经验证通过。上述python命令也可以保存为.py文件,使用spark-submit命令提交。

> spark-submit --name xxx --master yarn --deploy-mode client  spark_hbase_example.py

关于Partition

从hsc代码的HBaseTableScanRDD.scala的实现可以看出,hsc对DataFrame的partition划分是根据hbase region的数量来的,因此如果region数量不合理,就可能需要对DataFrame进行repartition处理从而带来额外的shuffle开销。

@InterfaceAudience.Private
class HBaseTableScanRDD(relation: HBaseRelation,
                       val hbaseContext: HBaseContext,
                       @transient val filter: Option[SparkSQLPushDownFilter] = None,
                        val columns: Seq[Field] = Seq.empty
     ) extends RDD[Result](relation.sqlContext.sparkContext, Nil){

  override def getPartitions: Array[Partition] = {
    val regions = RegionResource(relation)
    logDebug(s"There are ${regions.size} regions")
    val ps = regions.flatMap { x =>
     ...
    }.toArray
    ...
    ps.asInstanceOf[Array[Partition]]
  }
  ...
 }

常见问题

在python里获取SparkSession时报错 PythonUtils.getEncryptionEnabled does not exist in the JVM:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  ...
    self._encryption_enabled = self._jvm.PythonUtils.getEncryptionEnabled(self._jsc)
  File "/disk1/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1487, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

原因是pyspark版本与spark版本不匹配,用下面命令可以查看pyspark当前版本,如果过低需要卸载并安装新版。

> pip show pyspark
Name: pyspark
Version: 2.3.0

> pip install pip -U -i https://pypi.tuna.tsinghua.edu.cn/simple
> pip uninstall pyspark
> pip install pyspark==3.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

完整的pyspark版本历史可以在这里查看:https://pypi.org/project/pyspark/#history

启用 SQL Filter pushdown时报错

当使用.option("hbase.spark.pushdown.columnfilter", true)时报错,其实true是默认值,报错信息如下:

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder$

解决方法是,需要将前面提到的scala-library等三个jar包放在每个hbase region server节点的lib目录下。参考链接

参考资料

https://spark.apache.org/docs/latest/hadoop-provided.html
https://hbase.apache.org/book.html#_sparksqldataframes
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html
https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_HBase_Connector.md

在ClouderaManager上安装/更新Spark2

在Cloudera 5.9上安装/更新Spark 2.1.0。

1、下载CSD文件 http://archive.cloudera.com/spark2/csd/,将csd文件(.jar)放在/opt/cloudera/csd目录,并执行命令将文件拥有者修改成cloudera-scm

chown cloudera-scm:cloudera-scm /opt/cloudera/csd/SPARK2_ON_YARN- 2.1.0.cloudera1.jar

重启 cloudera-scm-server服务:

service cloudera-scm-server restart

然后在CM页面选择Cluster -> Cloudera Managerment Service -> 操作 -> 重启。

2、下载Parcel文件和对应的.sha1文件 http://archive.cloudera.com/spark2/parcels,将下载得到的parcel文件和.sha1文件复制到/opt/cloudera/parcel-repo目录下,并把.sha1文件改名为.sha(否则可能无法识别

mv SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha1 SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha

3、点击CM界面右上角的parcel按钮,再点击右上角的检查新Parcel按钮

cm-parcels

4、选择对应的parcel文件分配和激活(如果是新安装Spark2,在CM页面里选择添加服务 -> Spark 2.0)

5、按提示重启相关服务

cm-activate-parcel

Spark安装和使用

安装Spark

略,见参考资料。

用docker安装spark

docker hub上有不少spark镜像,例如p7hb/docker-spark,可以快速安装好。

docker pull p7hb/docker-spark
docker run -it -p 4040:4040 -p 8080:8080 -p 8081:8081 -h spark --name=spark p7hb/docker-spark:2.2.0

进入Spark-shell

$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/04/18 13:08:38 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.1.235.9:4040
Spark context available as 'sc' (master = yarn, app id = application_1491024547163_1752).
Spark session available as 'spark'.
Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 2.0.0.cloudera1
 /_/
 
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

在spark-shell里输入代码时,可以按tab键得到补全提示,很方便。

使用第三方jar包

主要通过下面这两个参数指定,注意两个参数中多个jar之间的分隔符是不一样的。

  • --jars driver和executor都需要的包,多个包之间用逗号(,)分割
  • --driver-class-path driver所依赖的包,多个包之间用冒号(:)分割

注:有一说是--jars里包含的包不需要在--driver-class-path里再次指定,但在spark2.0.0里发现仍然需要在--driver-class-path里指定。

使用java类/方法

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> isNaN(1) 
res57: Boolean = false

或直接使用全限定名:

scala> java.lang.Double.isNaN(1)
res58: Boolean = false

加载外部scala文件

事先写好一个test1.scala文件,然后在spark-shell里:

scala> :load test1.scala

注意load前面带一个冒号(:)

参考资料:

Spark On YARN 集群安装部署 (不错的安装教程,spark 1.3+hadoop 2.6)

Apache Spark技术实战之6 -- spark-submit常见问题及其解决