玖叶教程网

前端编程开发入门

Hadoop高可用集群搭建及API调用(hadoop3高可用)

NameNode HA 背景

在Hadoop1中NameNode存在一个单点故障问题,如果NameNode所在的机器发生故障,整个集群就将不可用(Hadoop1中虽然有个SecorndaryNameNode,但是它并不是NameNode的备份,它只是NameNode的一个助理,协助NameNode工作,SecorndaryNameNode会对fsimage和edits文件进行合并,并推送给NameNode,防止因edits文件过大,导致NameNode重启变慢),这是Hadoop1的不可靠实现。

在Hadoop2中这个问题得以解决,Hadoop2中的高可靠性是指同时启动NameNode,其中一个处于active工作状态,另外一个处于随时待命standby状态。这样,当一个NameNode所在的服务器宕机时,可以在数据不丢失的情况下,手工或者自动切换到另一个NameNode提供服务。这些NameNode之间通过共享数据,保证数据的状态一致。多个NameNode之间共享数据,可以通过Network File System或者Quorum Journal Node。前者是通过Linux共享的文件系统,属于操作系统的配置,后者是Hadoop自身的东西,属于软件的配置。

注意:

1) NameNode HA 与HDFS Federation都有多个NameNode,当NameNode作用不同,在HDFS Federation联邦机制中多个NameNode解决了内存受限问题,而在NameNode HA中多个NameNode解决了NameNode单点故障问题。
2) 在Hadoop2.x版本中,NameNode HA 支持2个节点,在Hadoop3.x版本中,NameNode高可用可以支持多台节点。

NameNode HA实现原理

NameNode中存储了HDFS中所有元数据信息(包括用户操作元数据和block元数据),在NameNode HA中,当Active NameNode(ANN)挂掉后,StandbyNameNode(SNN)要及时顶上,这就需要将所有的元数据同步到SNN节点。如向HDFS中写入一个文件时,如果元数据同步写入ANN和SNN,那么当SNN挂掉势必会影响ANN,所以元数据需要异步写入ANN和SNN中。如果某时刻ANN刚好挂掉,但却没有及时将元数据异步写入到SNN也会引起数据丢失,所以向SNN同步元数据需要引入第三方存储,在HA方案中叫做“共享存储”。每次向HDFS中写入文件时,需要将edits log同步写入共享存储,这个步骤成功才能认定写文件成功,然后SNN定期从共享存储中同步editslog,以便拥有完整元数据便于ANN挂掉后进行主备切换。

HDFS将Cloudera公司实现的QJM(Quorum Journal Manager)方案作为默认的共享存储实现。在QJM方案中注意如下几点:

* 基于QJM的共享存储系统主要用于保存Editslog,并不保存FSImage文件,FSImage文件还是在NameNode本地磁盘中。
* QJM共享存储采用多个称为JournalNode的节点组成的JournalNode集群来存储EditsLog。每个JournalNode保存同样的EditsLog副本。
* 每次NameNode写EditsLog时,除了向本地磁盘写入EditsLog外,也会并行的向JournalNode集群中每个JournalNode发送写请求,只要大多数的JournalNode节点返回成功就认为向JournalNode集群中写入EditsLog成功。
* 如果有2N+1台JournalNode,那么根据大多数的原则,最多可以容忍有N台JournalNode节点挂掉。

NameNode HA 实现原理图如下:



当客户端操作HDFS集群时,Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog。当处 于 Standby 状态的 NameNode 转换为 Active 状态的时候,有可能上一个 Active NameNode 发生了异常退出,那么 JournalNode 集群中各个 JournalNode 上的 EditLog 就可能会处于不一致的状态,所以首先要做的事情就是让 JournalNode 集群中各个节点上的 EditLog 恢复为一致,然后Standby NameNode会从JournalNode集群中同步EditsLog,然后对外提供服务。

注意:在NameNode HA中不再需要SecondaryNameNode角色,该角色被StandbyNameNode替代。

通过Journal Node实现NameNode HA时,可以手动将Standby NameNode切换成Active NameNode,也可以通过自动方式实现NameNode切换。

上图需要手动进行切换StandbyNamenode为Active NameNode,对于高可用场景时效性较低,那么可以通过zookeeper进行协调自动实现NameNode HA,实现代码通过Zookeeper来检测Activate NameNode节点是否挂掉,如果挂掉立即将Standby NameNode切换成Active NameNode,这种方式也是生产环境中常用情况。其原理如下:



上图中引入了zookeeper作为分布式协调器来完成NameNode自动选主,以上各个角色解释如下:

* AcitveNameNode:主 NameNode,只有主NameNode才能对外提供读写服务。
* Secondby NameNode:备用NameNode,定时同步Journal集群中的editslog元数据。
* ZKFailoverController:ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换。
* Zookeeper集群:分布式协调器,NameNode选主使用。
* Journal集群:Journal集群作为共享存储系统保存HDFS运行过程中的元数据,ANN和SNN通过Journal集群实现元数据同步。
* DataNode节点:除了通过共享存储系统共享 HDFS 的元数据信息之外,主 NameNode 和备 NameNode 还需要共享 HDFS 的数据块和 DataNode 之间的映射关系。DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。

NameNode主备切换流程

NameNode 主备切换主要由 ZKFailoverController、HealthMonitor 和 ActiveStandbyElector 这 3 个组件来协同实现:

* ZKFailoverController 作为 NameNode 机器上一个独立的进程启动 (在 hdfs 集群中进程名为 zkfc),启动的时候会创建 HealthMonitor 和 ActiveStandbyElector 这两个主要的内部组件,ZKFailoverController 在创建 HealthMonitor 和 ActiveStandbyElector 的同时,也会向 HealthMonitor 和 ActiveStandbyElector 注册相应的回调方法。
* HealthMonitor 主要负责检测 NameNode 的健康状态,如果检测到 NameNode 的状态发生变化,会回调 ZKFailoverController 的相应方法进行自动的主备选举。
* ActiveStandbyElector 主要负责完成自动的主备选举,内部封装了 Zookeeper 的处理逻辑,一旦 Zookeeper 主备选举完成,会回调 ZKFailoverController 的相应方法来进行 NameNode 的主备状态切换。

NameNode主备切换流程如下:

1) HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
2) HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
3) 如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
4) ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
5) ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
6) ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。

脑裂问题

当网络抖动时,ZKFC检测不到Active NameNode,此时认为NameNode挂掉了,因此将Standby NameNode切换成Active NameNode,而旧的Active NameNode由于网络抖动,接收不到zkfc的切换命令,此时两个NameNode都是Active状态,这就是脑裂问题。那么HDFS HA中如何防止脑裂问题的呢?

HDFS集群初始启动时,Namenode的主备选举是通过 ActiveStandbyElector 来完成的,ActiveStandbyElector 主要是利用了 Zookeeper 的写一致性和临时节点机制,具体的主备选举实现如下:

**1. 创建锁节点**

如果 HealthMonitor 检测到对应的 NameNode 的状态正常,那么表示这个 NameNode 有资格参加 Zookeeper 的主备选举。

如果目前还没有进行过主备选举的话,那么相应的 ActiveStandbyElector 就会发起一次主备选举,尝试在 Zookeeper 上创建一个路径为/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 的临时节点 (\${dfs.nameservices} 为 Hadoop 的配置参数 dfs.nameservices 的值,下同),Zookeeper 的写一致性会保证最终只会有一个 ActiveStandbyElector 创建成功,那么创建成功的 ActiveStandbyElector 对应的 NameNode 就会成为主 NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的方法进一步将对应的 NameNode 切换为 Active 状态。而创建失败的 ActiveStandbyElector 对应的NameNode成为备用NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的方法进一步将对应的 NameNode 切换为 Standby 状态。

**2. 注册 Watcher 监听**

不管创建/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 节点是否成功,ActiveStandbyElector 随后都会向 Zookeeper 注册一个 Watcher 来监听这个节点的状态变化事件,ActiveStandbyElector 主要关注这个节点的 NodeDeleted 事件。

**3. 自动触发主备选举**

如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时, ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock,这样处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件。收到这个事件之后,会马上再次进入到创建/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 节点的流程,如果创建成功,这个本来处于 Standby 状态的 NameNode 就选举为主 NameNode 并随后开始切换为 Active 状态。

当然,如果是 Active 状态的 NameNode 所在的机器整个宕掉的话,那么根据 Zookeeper 的临时节点特性,/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换。

以上过程中,**Standby NameNode成功创建 Zookeeper 节点/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 成为Active NameNode之后,还会创建另外一个路径为/hadoop-ha/\${dfs.nameservices}/ActiveBreadCrumb 的持久节点,这个节点里面保存了这个 Active NameNode 的地址信息**。

Active NameNode 的ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候 (注意由于/hadoop-ha/\${dfs.nameservices}/ActiveStandbyElectorLock 是临时节点,也会随之删除)会一起删除节点/hadoop-ha/\${dfs.nameservices}/ActiveBreadCrumb。但是如果 ActiveStandbyElector 在异常的状态下 Zookeeper Session 关闭 (比如 Zookeeper 假死),那么由于/hadoop-ha/\${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来。后面当另一个 NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行隔离(fencing)操作以避免出现脑裂问题,fencing操作会通过SSH将旧的Active NameNode进程尝试转换成Standby状态,如果不能转换成Standby状态就直接将对应进程杀死。

NameNode自动HA集群搭建

zookeeper集群搭建

这里搭建zookeeper版本为3.6.3,搭建zookeeper对应的角色分布如下:

| **节点IP** | **节点名称** | **Zookeeper** |
| ---------------- | ------------------ | ------------------- |
| 192.168.179.4 | node1 | |
| 192.168.179.5 | node2 | |
| 192.168.179.6 | node3 | ★ |
| 192.168.179.7 | node4 | ★ |
| 192.168.179.8 | node5 | ★ |

具体搭建步骤如下:

**1) 上传zookeeper并解压,配置环境变量**

将zookeeper安装包上传到node3节点/software目录下并解压:
[root@node3 software]# tar -zxvf ./apache-zookeeper-3.6.3-bin.tar.gz


在node3节点配置环境变量:

#进入vim /etc/profile,在最后加入:
export ZOOKEEPER_HOME=/software/apache-zookeeper-3.6.3-bin/
export PATH=$PATH:$ZOOKEEPER_HOME/bin

#使配置生效
source /etc/profile


**2) 在node3节点配置zookeeper**

进入“$ZOOKEEPER\_HOME/conf”修改zoo\_sample.cfg为zoo.cfg:

[root@node3 ~]# cd $ZOOKEEPER_HOME/conf
[root@node3 conf]# mv zoo_sample.cfg zoo.cfg


配置zoo.cfg中内容如下:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/data/zookeeper
clientPort=2181
server.1=node3:2888:3888
server.2=node4:2888:3888
server.3=node5:2888:3888


**3) 将配置好的zookeeper发送到node4,node5节点**

[root@node3 software]# scp -r apache-zookeeper-3.6.3-bin node4:/software/
[root@node3 software]# scp -r apache-zookeeper-3.6.3-bin node5:/software/


**4) 各个节点上创建数据目录,并配置zookeeper环境变量**

在node3,node4,node5各个节点上创建zoo.cfg中指定的数据目录“/opt/data/zookeeper”。

mkdir -p /opt/data/zookeeper

在node4,node5节点配置zookeeper环境变量

#进入vim /etc/profile,在最后加入:
export ZOOKEEPER_HOME=/software/apache-zookeeper-3.6.3-bin/
export PATH=$PATH:$ZOOKEEPER_HOME/bin

#使配置生效
source /etc/profile

**5) 各个节点创建节点ID**

在node3,node4,node5各个节点路径“/opt/data/zookeeper”中添加myid文件分别写入1,2,3:

#在node3的/opt/data/zookeeper中创建myid文件写入1
#在node4的/opt/data/zookeeper中创建myid文件写入2
#在node5的/opt/data/zookeeper中创建myid文件写入3

**6) 各个节点启动zookeeper,并检查进程状态**

#各个节点启动zookeeper命令
zkServer.sh start

#检查各个节点zookeeper进程状态
zkServer.sh status

HDFS节点规划

搭建HDFS NameNode HA不再需要原来的SecondaryNameNode角色,对应的角色有NameNode、DataNode、ZKFC、JournalNode在各个节点分布如下:

| **节点IP** | **节点名称** | **NN** | **DN** | **ZKFC** | **JN** |
| ---------------- | ------------------ | ------------ | ------------ | -------------- | ------------ |
| 192.168.179.4 | node1 | ★ | | ★ | |
| 192.168.179.5 | node2 | [★]() | | ★ | |
| 192.168.179.6 | node3 | ★ | ★ | ★ | ★ |
| 192.168.179.7 | node4 | | ★ | | ★ |
| 192.168.179.8 | node5 | | ★ | | ★ |

安装JDK

按照以下步骤在各个节点上安装JDK8。

**1) 各个节点创建/software目录,上传并安装jdk8 rpm包**

rpm -ivh /software/jdk-8u181-linux-x64.rpm

以上命令执行完成后,会在每台节点的/user/java下安装jdk8。

**2) 配置jdk环境变量**

在每台节点上配置jdk的环境变量:

export JAVA_HOME=/usr/java/jdk1.8.0_181-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar


每台节点配置完成后,最后执行“source /etc/profile”使配置生效。

HDFS HA集群搭建

在搭建HDFS HA之前,首先将node1~node5节点上之前搭建的Hadoop集群数据目录和安装文件删除,重新进行搭建,搭建步骤如下。

**1) 各个节点安装HDFS HA自动切换必须的依赖**

在HDFS集群搭建完成后,在Namenode HA切换进行故障转移时采用SSH方式进行,底层会使用到fuster包,有可能我们安装Centos7系统没有fuster程序包,导致不能进行NameNode HA 切换,我们可以通过安装Psmisc包达到安装fuster目的,因为此包中包含了fuster程序,安装方式如下,在各个节点上执行如下命令,安装Psmisc包:

yum -y install psmisc

**2) 下载安装包并解压**

我们安装Hadoop3.3.6版本,此版本目前是比较新的版本,搭建HDFS集群前,首先需要在官网下载安装包,地址如下:[https://hadoop.apache.org/releases.html](https://hadoop.apache.org/releases.html)。下载完成安装包后,上传到node1节点的/software目录下并解压,没有此目录,可以先创建此目录。

#将下载好的hadoop安装包上传到node1节点上
[root@node1 ~]# ls /software/
hadoop-3.3.6.tar.gz

[root@node1 ~]# cd /software/
[root@node1 software]# tar -zxvf ./hadoop-3.3.6.tar.gz

**3) 在node1节点上配置Hadoop的环境变量**

[root@node1 software]# vim /etc/profile
export HADOOP_HOME=/software/hadoop-3.3.6/
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:

#使配置生效
source /etc/profile

**4) 配置hadoop-env.sh**

由于通过SSH远程启动进程的时候默认不会加载/etc/profile设置,JAVA\_HOME变量就加载不到,而Hadoop启动需要读取到JAVA\_HOME信息,所有这里需要手动指定。在对应的\$HADOOP\_HOME/etc/hadoop路径中,找到hadoop-env.sh文件加入以下配置(大概在54行有默认注释配置的JAVA\_HOME):

#vim /software/hadoop-3.3.6/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181-amd64/

**5) 配置core-site.xml**

进入 $HADOOP\_HOME/etc/hadoop路径下,修改core-site.xml文件,指定HDFS集群数据访问地址及集群数据存放路径。

#vim /software/hadoop-3.3.6/etc/hadoop/core-site.xml
<configuration>
    <property>
        <!-- 为Hadoop 客户端配置默认的高可用路径  -->
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
    </property>
    <property>
        <!-- Hadoop 数据存放的路径,namenode,datanode 数据存放路径都依赖本路径,不要使用 file:/ 开头,使用绝对路径即可
            namenode 默认存放路径 :file://${hadoop.tmp.dir}/dfs/name
            datanode 默认存放路径 :file://${hadoop.tmp.dir}/dfs/data
        -->
        <name>hadoop.tmp.dir</name>
        <value>/opt/data/hadoop/</value>
    </property>

    <property>
        <!-- 指定zookeeper所在的节点 -->
        <name>ha.zookeeper.quorum</name>
        <value>node3:2181,node4:2181,node5:2181</value>
    </property>

</configuration>

**6) 配置hdfs-site.xml**

进入 $HADOOP\_HOME/etc/hadoop路径下,修改hdfs-site.xml文件,指定NameNode和JournalNode节点和端口。这里配置NameNode节点为3个。

#vim /software/hadoop-3.3.6/etc/hadoop/hdfs-site.xml
<configuration>
    <!-- 指定副本的数量 -->
    <property>
      <name>dfs.replication</name>
      <value>3</value>
    </property>
  
    <!-- 解析参数dfs.nameservices值hdfs://mycluster的地址 -->
    <property>
      <name>dfs.nameservices</name>
      <value>mycluster</value>
    </property>
  
    <!-- mycluster由以下三个namenode支撑 -->
    <property>
      <name>dfs.ha.namenodes.mycluster</name>
      <value>nn1,nn2,nn3</value>
    </property>
  
    <property>
      <!-- dfs.namenode.rpc-address.[nameservice ID].[name node ID] namenode 所在服务器名称和RPC监听端口号  -->
      <name>dfs.namenode.rpc-address.mycluster.nn1</name>
      <value>node1:8020</value>
    </property>
  
    <property>
      <!-- dfs.namenode.rpc-address.[nameservice ID].[name node ID] namenode 所在服务器名称和RPC监听端口号  -->
      <name>dfs.namenode.rpc-address.mycluster.nn2</name>
      <value>node2:8020</value>
    </property>

    <property>
      <!-- dfs.namenode.rpc-address.[nameservice ID].[name node ID] namenode 所在服务器名称和RPC监听端口号  -->
      <name>dfs.namenode.rpc-address.mycluster.nn3</name>
      <value>node3:8020</value>
    </property>
  
    <property>
      <!-- dfs.namenode.http-address.[nameservice ID].[name node ID] namenode 监听的HTTP协议端口 -->
      <name>dfs.namenode.http-address.mycluster.nn1</name>
      <value>node1:9870</value>
    </property>
    <property>
      <!-- dfs.namenode.http-address.[nameservice ID].[name node ID] namenode 监听的HTTP协议端口 -->
      <name>dfs.namenode.http-address.mycluster.nn2</name>
      <value>node2:9870</value>
    </property>
    <property>
      <!-- dfs.namenode.http-address.[nameservice ID].[name node ID] namenode 监听的HTTP协议端口 -->
      <name>dfs.namenode.http-address.mycluster.nn3</name>
      <value>node3:9870</value>
    </property>

    <!-- namenode高可用代理类 -->
    <property>
      <name>dfs.client.failover.proxy.provider.mycluster</name>
      <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
  
    <!-- 指定三台journal node服务器的地址 -->
    <property>
      <name>dfs.namenode.shared.edits.dir</name>
      <value>qjournal://node3:8485;node4:8485;node5:8485/mycluster</value>
    </property>
  
    <!-- journalnode 存储数据的地方 -->
    <property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/opt/data/journal/node/local/data</value>
    </property>
  
    <!--启动NN故障自动切换 -->
    <property>
      <name>dfs.ha.automatic-failover.enabled</name>
      <value>true</value>
    </property>
  
    <!-- 当active nn出现故障时,ssh到对应的服务器,将namenode进程kill掉  -->
    <property>
      <name>dfs.ha.fencing.methods</name>
      <value>sshfence</value>
    </property>
    <property>
      <name>dfs.ha.fencing.ssh.private-key-files</name>
      <value>/root/.ssh/id_rsa</value>
    </property>
</configuration>

**7) 配置workers指定DataNode节点**

进入 $HADOOP\_HOME/etc/hadoop路径下,修改workers配置文件,加入以下内容:

#vim /software/hadoop-3.3.6/etc/hadoop/workers
node3
node4
node5

**8) 配置start-dfs.sh&stop-dfs.sh**

进入 $HADOOP\_HOME/sbin路径下,在start-dfs.sh和stop-dfs.sh文件顶部添加操作HDFS的用户为root,防止启动错误。

#分别在start-dfs.sh 和stop-dfs.sh文件顶部添加如下内容
HDFS_NAMENODE_USER=root
HDFS_DATANODE_USER=root
HDFS_JOURNALNODE_USER=root
HDFS_ZKFC_USER=root

**9) 分发安装包**

将node1节点上配置好的hadoop安装包发送到node2~node5节点上。这里由于Hadoop安装包比较大,也可以先将原有hadoop安装包上传到其他节点解压,然后在node1节点上只分发hfds-site.xml 、core-site.xml文件即可。

#在node1节点上执行如下分发命令
[root@node1 ~]# cd /software/
[root@node1 software]# scp -r ./hadoop-3.3.6/ node2:/software/
[root@node1 software]# scp -r ./hadoop-3.3.6/ node3:/software/
[root@node1 software]# scp -r ./hadoop-3.3.6/ node4:/software/
[root@node1 software]# scp -r ./hadoop-3.3.6/ node5:/software/

**10) 在node2、node3、node4、node5节点上配置HADOOP\_HOME**

#分别在node2、node3、node4、node5节点上配置HADOOP_HOME
vim /etc/profile
export HADOOP_HOME=/software/hadoop-3.3.6/
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:

#最后记得Source
source /etc/profile

格式化并启动HDFS集群

HDFS HA 集群搭建完成后,首次使用需要进行格式化。步骤如下:

#在node3,node4,node5节点上启动zookeeper
zkServer.sh start


#在node1上格式化zookeeper
[root@node1 ~]# hdfs zkfc -formatZK


#在每台journalnode中启动所有的journalnode,这里就是node3,node4,node5节点上启动
hdfs --daemon start journalnode

#在node1中格式化namenode,只有第一次搭建做,以后不用做
[root@node1 ~]# hdfs namenode -format


#在node1中启动namenode,以便同步其他namenode
[root@node1 ~]# hdfs --daemon start namenode


#高可用模式配置namenode,使用下列命令来同步namenode(在需要同步的namenode中执行,这里就是在node2、node3上执行):
[root@node2 software]# hdfs namenode -bootstrapStandby
[root@node3 software]# hdfs namenode -bootstrapStandby

以上格式化集群完成后就可以在NameNode节点上执行如下命令启动集群:

#在node1节点上启动集群
[root@node1 ~]# start-dfs.sh

至此,HDFS HA搭建完成,可以浏览器访问HDFS WebUI界面,通过此界面方便查看和操作HDFS集群。

访问三个NameNode WebUI:

以上三个NameNode只有一个是Active状态,其余两个都是Standby状态,三个NameNode节点不一定哪个为active节点,这取决于争夺zookeeper锁,哪个节点先启动争夺到锁,那么就是active节点。

停止集群时只需要在NameNode节点上执行stop-dfs.sh命令即可。后续再次启动HDFS集群只需要在NameNode节点执行start-dfs.sh命令,不需要再次格式化集群。

测试NameNode HA

首先查看Zookeeper中的数据目录内容,可以看到当前Active NameNode节点为Node3。

[root@node3 ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 3] get /hadoop-ha/mycluster/ActiveBreadCrumb
myclusternn3node3 ?>(?>

当在node3节点kill掉对应的NameNode进程时,会在node1和node2中重新选取Active NameNode,如下:

#在对应Active节点kill NameNode进程
[root@node3 ~]# kill -9 38111

#再次查看zookeeper中节点信息
[zk: localhost:2181(CONNECTED) 5] get /hadoop-ha/mycluster/ActiveBreadCrumb

myclusternn2node2 ?>(?>

此时,可以执行命令重新启动node3上的NameNode,但启动后状态为Standby。

#重新启动挂掉的NameNode节点
[root@node3 ~]# hdfs --daemon start namenode

注意:如果以上测试NameNode节点不能正常切换,那么就查看各个NameNode节点$HADOOP_HOME/logs目录下对应的进行日志错误,根据具体错误来解决问题。

HDFS启动脚本和停止脚本编写

每次启动HDFS集群需要首先去各个zookeeper节点上启动zookeeper,然后再去namenode节点上启动HDFS集群,关闭集群时也是一样,先在namenode节点上停止HDFS集群,然后去zookeeper每台节点上关闭zookeeper。为了操作方便我们可以编写HDFS启动脚本和HDFS关闭脚本来方便以上操作。

这里在node1节点的$HADOOP\_HOME/sbin目录下准备start-hdfs-cluster.sh和stop-hdfs-cluster.sh两个脚本,内容如下:

start-hdfs-cluster.sh

#!/bin/bash
for zknode in node3 node4 node5
do
    ssh $zknode "source /etc/profile;zkServer.sh start"
done

sleep 1

start-dfs.sh
sleep 1

echo "=====node1 jps====="
jps

for other_node in node2 node3 node4 node5
do
   echo "=====$other_node jps====="
   ssh $other_node "source /etc/profile;jps"
done

给脚本赋予权限,在/software/hadoop-3.3.6/sbin目录下执行:

[root@node1 sbin]# chmod +x start-hdfs-cluster.sh

复制文件,可以省去给stop脚本赋予权限的步骤

[root@node1 sbin]# cp start-hdfs-cluster.sh stop-hdfs-cluster.sh

stop-hdfs-cluster.sh

#!/bin/bash
stop-dfs.sh
sleep 1

for zknode in node3 node4 node5
do
    ssh $zknode "source /etc/profile;zkServer.sh stop"
done

echo "=====node1 jps====="
jps

for other_node in node2 node3 node4 node5
do
   echo "=====$other_node jps====="
   ssh $other_node "source /etc/profile;jps"
done

修改以上两个脚本权限:

[root@node1 ~]# cd $HADOOP_HOME/sbin
[root@node1 sbin]# chmod 755 ./start-hdfs-cluster.sh ./stop-hdfs-cluster.sh

以上脚本准备好之后,可以在node1节点上运行脚本进行集群启停。建议初学者按照原始方式来启动关闭集群,可以加深HDFS原理的理解。

NameNode HA API操作

当搭建好HDFS HA后,同样可以在IDEA中编写代码通过API方式操作HDFS集群。与非HA方式配置不同,有以下两个点需要注意:

1) 将HDFS集群配置文件core-site.xml、hdfs-site.xml需要放在项目resource资源目录下。
2) 在代码中访问HDFS HA集群时URI写法为:“hdfs://mycluster”,对应HDFS集群配置文件core-site.xml中fs.defaultFS配置项。

API操作HDFS HA 集群代码如下:

public class TestHAHDFS {
    public static FileSystem fs = null;

    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        //创建FileSystem对象
        fs = FileSystem.get(URI.create("hdfs://mycluster/"),conf,"root");

        //查看HDFS路径文件
        listHDFSPathDir("/");
        System.out.println("=====================================");

        //创建目录
        mkdirOnHDFS("/laowu/testdir");
        System.out.println("=====================================");

        //向HDFS 中上传数据
        writeFileToHDFS("./data/test.txt","/laowu/testdir/test.txt");
        System.out.println("=====================================");

        //重命名HDFS文件
        renameHDFSFile("/laowu/testdir/test.txt","/laowu/testdir/new_test.txt");
        System.out.println("=====================================");

        //查看文件详细信息
        getHDFSFileInfos("/laowu/testdir/new_test.txt");
        System.out.println("=====================================");

        //读取HDFS中的数据
        readFileFromHDFS("/laowu/testdir/new_test.txt");
        System.out.println("=====================================");

        //删除HDFS中的目录或者文件
        deleteFileOrDirFromHDFS("/laowu/testdir");
        System.out.println("=====================================");

        //关闭fs对象
        fs.close();
    }

    private static void listHDFSPathDir(String hdfsPath) throws IOException {
        FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
        for (FileStatus fileStatus : fileStatuses) {
            if(fileStatus.isDirectory()){
                listHDFSPathDir(fileStatus.getPath().toString());
            }
            System.out.println(fileStatus.getPath());
        }
    }

    private static void mkdirOnHDFS(String dirpath) throws IOException {
        Path path = new Path(dirpath);

        //判断目录是否存在
        if(fs.exists(path)) {
            System.out.println("目录" + dirpath + "已经存在");
            return;
        }

        //创建HDFS目录
        boolean result = fs.mkdirs(path);
        if(result) {
            System.out.println("创建目录" + dirpath + "成功");
        } else {
            System.out.println("创建目录" + dirpath + "失败");
        }
    }

    private static void writeFileToHDFS(String localFilePath, String hdfsFilePath) throws IOException {
        //判断HDFS文件是否存在,存在则删除
        Path hdfsPath = new Path(hdfsFilePath);
        if(fs.exists(hdfsPath)) {
            fs.delete(hdfsPath, true);
        }

        //创建HDFS文件路径
        Path path = new Path(hdfsFilePath);
        FSDataOutputStream out = fs.create(path);

        //读取本地文件写入HDFS路径中
        FileReader fr = new FileReader(localFilePath);
        BufferedReader br = new BufferedReader(fr);
        String newLine = "";
        while ((newLine = br.readLine()) != null) {
            out.write(newLine.getBytes());
            out.write("\n".getBytes());
        }

        //关闭流对象
        out.close();
        br.close();
        fr.close();

        //以上代码也可以调用copyFromLocalFile方法完成
        //参数解释如下:上传完成是否删除原数据;是否覆盖写入;本地文件路径;写入HDFS文件路径
        fs.copyFromLocalFile(false,true,new Path(localFilePath),new Path(hdfsFilePath));
        System.out.println("本地文件 ./data/test.txt 写入了HDFS中的"+hdfsFilePath+"文件中");

    }

    private static void renameHDFSFile(String hdfsOldFilePath,String hdfsNewFilePath) throws IOException {
        fs.rename(new Path(hdfsOldFilePath),new Path(hdfsNewFilePath));
        System.out.println("成功将"+hdfsOldFilePath+"命名为:"+hdfsNewFilePath);
    }

    private static void getHDFSFileInfos(String hdfsFilePath) throws IOException {
        Path file = new Path(hdfsFilePath);
        RemoteIterator<LocatedFileStatus> listFilesIterator = fs.listFiles(file, true);//是否递归
        while(listFilesIterator.hasNext()){
            LocatedFileStatus fileStatus = listFilesIterator.next();
            System.out.println("文件详细信息如下:");
            System.out.println("权限:" + fileStatus.getPermission());
            System.out.println("所有者:" + fileStatus.getOwner());
            System.out.println("组:" + fileStatus.getGroup());
            System.out.println("大小:" + fileStatus.getLen());
            System.out.println("修改时间:" + fileStatus.getModificationTime());
            System.out.println("副本数:" + fileStatus.getReplication());
            System.out.println("块大小:" + fileStatus.getBlockSize());
            System.out.println("文件名:" + fileStatus.getPath().getName());

            //获取当前文件block所在节点信息
            BlockLocation[] blks = fileStatus.getBlockLocations();
            for (BlockLocation nd : blks) {
                System.out.println("block信息:"+nd);
            }
        }
    }

    private static void readFileFromHDFS(String hdfsFilePath) throws IOException {
        //读取HDFS文件
        Path path= new Path(hdfsFilePath);
        FSDataInputStream in = fs.open(path);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String newLine = "";
        while((newLine = br.readLine()) != null) {
            System.out.println(newLine);
        }

        //关闭流对象
        br.close();
        in.close();
    }

    private static void deleteFileOrDirFromHDFS(String hdfsFileOrDirPath) throws IOException {
        //判断HDFS目录或者文件是否存在
        Path path = new Path(hdfsFileOrDirPath);
        if(!fs.exists(path)) {
            System.out.println("HDFS目录或者文件不存在");
            return;
        }

        //第二个参数表示是否递归删除
        boolean result = fs.delete(path, true);
        if(result){
            System.out.println("HDFS目录或者文件 "+path+" 删除成功");
        } else {
            System.out.println("HDFS目录或者文件 "+path+" 删除成功");
        }

    }

}

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言