在ClouderaManager上安装/更新Spark2

在Cloudera 5.9上安装/更新Spark 2.1.0。

1、下载CSD文件 http://archive.cloudera.com/spark2/csd/,将csd文件(.jar)放在/opt/cloudera/csd目录,并执行命令将文件拥有者修改成cloudera-scm:

chown cloudera-scm:cloudera-scm /opt/cloudera/csd/SPARK2_ON_YARN- 2.1.0.cloudera1.jar

重启 cloudera-scm-server服务:

service cloudera-scm-server restart

然后在CM页面选择Cluster -> Cloudera Managerment Service -> 操作 -> 重启。

2、下载Parcel文件和对应的.sha1文件 http://archive.cloudera.com/spark2/parcels,将下载得到的parcel文件和.sha1文件复制到/opt/cloudera/parcel-repo目录下,并把.sha1文件改名为.sha(否则可能无法识别)

mv SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha1 SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha

3、点击CM界面右上角的parcel按钮,再点击右上角的检查新Parcel按钮

cm-parcels

4、选择对应的parcel文件分配和激活(如果是新安装Spark2,在CM页面里选择添加服务 -> Spark 2.0)

5、按提示重启相关服务

cm-activate-parcel

Spark安装和使用

安装Spark

略,见参考资料。

用docker安装spark

docker hub上有不少spark镜像,例如p7hb/docker-spark,可以快速安装好。

docker pull p7hb/docker-spark
docker run -it -p 4040:4040 -p 8080:8080 -p 8081:8081 -h spark --name=spark p7hb/docker-spark:2.2.0

进入Spark-shell

$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/04/18 13:08:38 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.1.235.9:4040
Spark context available as 'sc' (master = yarn, app id = application_1491024547163_1752).
Spark session available as 'spark'.
Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 2.0.0.cloudera1
 /_/
 
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

在spark-shell里输入代码时,可以按tab键得到补全提示,很方便。

使用第三方jar包

主要通过下面这两个参数指定,注意两个参数中多个jar之间的分隔符是不一样的。

  • --jars driver和executor都需要的包,多个包之间用逗号(,)分割
  • --driver-class-path driver所依赖的包,多个包之间用冒号(:)分割

注:有一说是--jars里包含的包不需要在--driver-class-path里再次指定,但在spark2.0.0里发现仍然需要在--driver-class-path里指定。

使用java类/方法

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> isNaN(1) 
res57: Boolean = false

或直接使用全限定名:

scala> java.lang.Double.isNaN(1)
res58: Boolean = false

加载外部scala文件

事先写好一个test1.scala文件,然后在spark-shell里:

scala> :load test1.scala

注意load前面带一个冒号(:)

参考资料:

Spark On YARN 集群安装部署 (不错的安装教程,spark 1.3+hadoop 2.6)

Apache Spark技术实战之6 -- spark-submit常见问题及其解决

关于SSD磁盘的写入放大

SSD的写入性能与剩余空间有(很大)关系,原理是向SSD写入数据时不能直接覆盖,而是需要先把整个块(如512KB)的内容读出来(备份,因为这个块上还有其他不需要删除的数据),擦除这个块,再把需要写入的数据连通刚才备份出来的数据合并到一起写回去。

由于上面的操作,物理写的数据量(如512KB+512KB)通常大于逻辑上的数据量(如4KB),这个放大的倍数被称为写入放大倍数(WA,Write Amplification)

ssd_wa

一个日常感受到的例子,在一个将要满的磁盘上,删除30000个小文件,发现删除速度越来越快:

ssd_del_1

ssd_del_2

ssd_del_3

参考资料:

关于Kafka的ISR

ISR代表In-Sync Replicas,在Kafka里表示目前处于同步状态的那些副本(replica)。

Kafka规定一条消息只有当ISR中所有的副本都复制成功时,才能被消费。

例如下图中的情况,id为1的节点处于失效状态,相应的可以看到有些partition(例如partition1、partition2)的Isr只有一个,则这些partition里有些message在节点1里还没有复制成功,因此不能被消费。   -- 这段有问题

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set,But the messages are still available for consumption even though the leader that took the writes originally is down:

min.insync.replicas

kafka-isr-2

参考资料:

Hadoop安装配置和使用问题记录

关于Hadoop如何配置的教程网上已经有不少了,配合hadoop主页上的说明,基本可以顺利在多台机器上配置好hadoop集群。这里记录的是我在实际配置和使用hadoop时遇到的问题,其中一些属于hadoop周边,如ssh配置时容易遇到的问题和解决方法,供大家参考。我目前是在windows xp里通过cygwin安装的hadoop,版本是0.17.2.1。

Cygwin部分

1、安装cygwin。

cygwin.com下载网络安装包,在选择组件的时候建议直接把openssh组件选中,有些版本的cygwin似乎不会自动安装diffutils组件,所以需要手工选中一下,否则配置ssh时会提示缺少该组件包。

2、cygwin控制台里的缺省提示符不太习惯

export PS1="u@w$"设一下好多了。

3、配置sshd

比较简单,可以参考这个链接。在ssh-host-config时问到Should privilege separation be used? (yes/no)时回答no即可,如果由于安全因素要回答yes时,在启动sshd时有可能遇到Privilege separation user sshd does not exist的错误信息,解决方法可参考这个链接

4、配置ssh使用证书方式自动验证

这里花了一些时间,后来觉得可能是由于机器上有两个版本的cygwin的缘故,因为在另一台机器上安装就没有出现问题。在使用ssh-keygen -t rsa命令后提示输入密码时,直接按两次回车即可。公钥的复制等过程这里不再赘述。

Hadoop部分

5、在Eclipse里运行hadoop的WordCount程序时提示异常:javax.security.auth.login.LoginException: Login failed: CreateProcess: whoami error=2

解决方法是把c:\cygwin\bin加入系统的path环境变量,然后重启Eclipse以便改动生效。

6、在Eclipse里运行WordCount时遇到java heap size不够的异常

在运行配置里加入-Xms200M就可以解决。(hadoop的helloworld要求的内存比较多?)

7、当要运行的运算依赖第三方类库时的问题

这个链接有所讨论,但暂时没看到除了在命令行里使用hadoop jar命令以外的解决方法,比如在0.17.2和0.18.1版本里我都没有看到JobConf类里有类似addJar()这样的方法,在JobConf#setJar()里使用逗号分隔多个jar文件的方式则会报找不到文件的错误。解决方式可能有两个:

  • a) 把所需要的第三方jar文件复制到每个节点机器的jre里(暂时没有试验)

  • b) 把第三方jar包和自己的类打到一个包里。

update: 在网上找到另一种方式,通过DistributedCache实现,原文里可能有笔误,我试验正确的方法是 调用DistributedCache.addArchiveToClassPath()方法,注意其第一个参数必须是相对路径,如/test/lib /my.jar,而不能是像hdfs://192.168.0.5:47110/test/lib/my.jar这样的绝对路径。关于DistributedCache的说明在里有一些。

8、调试mapreduce程序的方式

这个链接里讲得比较清楚了,因为很有用所以特意重复一次。如果文件存放在HDFS里,那么只需要调用JobConf#.set("mapred.job.tracker", "local")即可;如果文件也是存在本地的,还需要调用JobConf#set("fs.default.name", "local")方法。我通常让文件存在HDFS里调试,因为要使用本地文件要么参数需要改变,要么代码需要改变,维护两个环境很麻烦。在程序里用System.out.println()输出的内容可以在datanode的hadoop安装路径的logs/userlogs目录里找到。

9、使用自定义InputFormat时获取xmi:id

使用EMF模型元素作为key的时候,需要注意并不是在代码的任何地方都能得到xmi:id的值的。具体来说,在WritableComparable#write()方法里能得到(前提是该对象本来就有resource,即eobj.eResource()!=null),而在WritableComparable#readFields()里是不能得到的,在RecordWriter#write()方法里同样不能得到,因为后两者的EMF元素对象都是反序列化得到的,已经不是内存里原来的那个实例了。

10、map进行到100%后,reduce过程进行到某个数值(如16%)后就不再继续,直到被hadoop强制关闭。

在tasknode的log里记录如下:

2008-11-20 11:17:06,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:09,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:15,455 INFO org.apache.hadoop.mapred.TaskTracker: task_200811191041_0015_r_000000_0 0.16666667% reduce > copy (6 of 12 at 0.00 MB/s) > 
2008-11-20 11:17:18,705 FATAL org.apache.hadoop.mapred.TaskTracker: Task: task_200811191041_0015_r_000000_0 - Killed due to Shuffle Failure: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out. 2008-11-20 11:17:18,705 INFO org.apache.hadoop.mapred.TaskTracker: About to purge task: task_200811191041_0015_r_000000_0 2008-11-20 11:17:18,705 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_r_000000_0 done; removing files. 2008-11-20 11:17:18,705 WARN org.apache.hadoop.mapred.TaskTracker: Unknown child task finshed: task_200811191041_0015_r_000000_0. Ignored. 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskTracker: Received 'KillJobAction' for job: job_200811191041_0015 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_m_000011_0 done; removing files. 2008-11-20 11:17:40,845 INFO org.apache.hadoop.mapred.TaskRunner: task_200811191041_0015_m_000005_0 done; removing files. 

在我的java application的控制台里的输入如下:

08/11/20 11:06:39 INFO mapred.JobClient:  map 96% reduce 11%
08/11/20 11:06:40 INFO mapred.JobClient:  map 100% reduce 11%
08/11/20 11:06:43 INFO mapred.JobClient:  map 100% reduce 13%
08/11/20 11:06:47 INFO mapred.JobClient:  map 100% reduce 16% (在这里停很久) 08/11/20 11:17:12 INFO mapred.JobClient:  map 100% reduce 0%
08/11/20 11:17:12 INFO mapred.JobClient: Task Id : task_200811191041_0015_r_000000_0, Status : FAILED
Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out. 08/11/20 11:17:14 WARN mapred.JobClient: Error reading task outputnode2 08/11/20 11:17:14 WARN mapred.JobClient: Error reading task outputnode2 08/11/20 11:17:25 INFO mapred.JobClient:  map 100% reduce 16%
08/11/20 11:17:30 INFO mapred.JobClient:  map 100% reduce 25%
08/11/20 11:17:31 INFO mapred.JobClient:  map 100% reduce 100%
08/11/20 11:17:32 INFO mapred.JobClient: Job complete: job_200811191041_0015

我想找到这个问题的所在了。是secondary name node所在的机器没有配置dfs.http.address这个参数,该参数在hadoop-default.xml里的缺省值是0.0.0.0:50070,应改为name node所在机器的ip地址。参考链接

11、一些参考链接。

http://hayesdavis.net/2008/06/14/running-hadoop-on-windows/
http://hi.baidu.com/shirdrn/blog/category/Hadoop
http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/index.html
http://blog.ring.idv.tw/comment.ser?i=231

搬家前链接:https://www.cnblogs.com/bjzhanghao/archive/2008/11/02/1325113.html