HA整体架构

![HA整体架构图](/blogimgs/HDFS HA机制解析/ha架构图.png “HA整体架构图”)

从上图中,我们可以看出NameNode的高可用架构主要分为下面几个部分:

Active NameNode和Standby NameNode:两台NameNode形成互备,一台处于Active状态,为主NameNode,另外一台处于Standby状态,为备NameNode,只有主NameNode才能对外提供读写服务

主备切换控制器ZKFailoverController:ZKFailoverController作为独立的进程运行,对NameNode的主备切换进行总体控制(ZKFailoverController是抽象类,它的实现类是DFSZKFailoverController)。ZKFailoverController通过HealthMonitor线程能及时检测到NameNode的健康状况,在主NameNode故障时借助Zookeeper实现自动的主备选举和切换,当然NameNode目前也支持不依赖于Zookeeper的手动主备切换。

为啥把监控分开
显然,我们不能在NN进程内进行心跳等信息同步,最简单的原因,一次FullGC就可以让NN挂起十几分钟,所以,必须要有一个独立的短小精悍的watchdog来专门负责监控。这也是一个松耦合的设计,便于扩展或更改。

Zookeeper集群:为主备切换控制器提供主备选举支持。

共享存储系统:共享存储系统是实现NameNode的高可用最为关键的部分,共享存储系统保存了NameNode在运行过程中所产生的HDFS的元数据。只有active namenode才能往共享存储系统中写数据,active NameNode和standby NameNode通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主NameNode在确认元数据完全同步之后才能继续对外提供服务。

DataNode节点:除了通过共享存储系统共享HDFS的元数据信息之外,主NameNode和备NameNode还需要共享HDFS的数据块和DataNode之间的映射关系。_DataNode会同时向主NameNode和备NameNode上报数据块的位置信息,但只接收来自active namenode的读写命令_。

这里主要介绍通过__隔离和Quorum Journal Manager(QJM)共享存储空间__实现HDFS HA。

隔离(Fencing)

隔离(Fencing)是为了防止脑裂,就是保证在任何时候HDFS只有一个Active NN,主要包括三个方面:

  • 共享存储fencing:确保只有一个NN可以写入edits。QJM中每一个JournalNode中均有一个epochnumber,匹配epochnumber的QJM才有权限更新JN。当NN由standby状态切换成active状态时,会重新生成一个epoch number,并更新JN中的epochnumber,以至于以前的ActiveNN中的QJM中的epoch number和JN的epochnumber不匹配,故而原ActiveNN上的QJM没法往JN中写入数据(后面会介绍源码),即形成了fencing
  • 客户端fencing:确保只有一个NN可以响应客户端的请求。
  • DataNode fencing:确保只有一个NN可以向DN下发命令,譬如删除块,复制块,等等。

QJM的Fencing方案只能让原来的Active NN失去对JN的写权限,但是原来的Active NN还是可以响应客户端的请求,对DN进行读。对客户端和DataNode的fence是通过配置dfs.ha.fencing.methods实现的。Hadoop公共库中有两种Fencing实现:sshfence、shell
sshfence:ssh到原Active NN上,使用fuser结束进程(通过tcp端口号定位进程pid,该方法比jps命令更准确)。
shell: run an arbitrary shell command to fence the Active NameNode,即执行一个用户事先定义的shell命令(脚本)完成隔离。

QJM共享存储

Qurom Journal Manager(QJM)是一个基于Paxos算法实现的HDFS 元数据共享存储的方案。QJM的基本原理就是用2N+1台JournalNode存储EditLog,每次写数据操作有大多数(>=N+1)返回成功时即认为该次写成功,数据不会丢失。这个算法所能容忍的是最多有N台机器挂掉,如果多于N台挂掉,这个算法就失效了。这个原理是基于Paxos算法的。
用QJM的方式来实现HA的主要好处有:1)不需要配置额外的高共享存储,这样对于基于commodity hardware的云计算数据中心来说,降低了复杂度和维护成本;2)不在需要单独配置fencing实现,因为QJM本身内置了fencing的功能;3)不存在Single Point Of Failure;4)系统鲁棒性的程度是可配置的(QJM基于Paxos算法,所以如果配置2N+1台JournalNode组成的集群,能容忍最多N台机器挂掉);5)QJM中存储日志的JournalNode不会因为其中一台的延迟而影响整体的延迟,而且也不会因为JournalNode的数量增多而影响性能(因为NN向JournalNode发送日志是并行的)。

源码分析主备切换

主备切换主要是通过ZKFailoverController实现的,是一个独立的进程,在hdfs启动脚本之中的进程名为_zkfc_。ZKFailoverController是一个抽象类,其实现类的DFSZKFailoverController,其内部有三个组件,分别为_HealthMonitor、ActiveStandbyElector和FailoverController_。其中HealthMonitor是一个独立的线程,循环的检测nn的状态,ActiveStandbyElector主要用来与zk保持心跳和通过创建临时节点进行节点的选举,FailoverController是进行状态转换。

zkfc的程序入口在DFSZKFailoverController中的main方法中,代码如下:

1
2
3
4
5
6
7
8
public static void main(String args[])
throws Exception {
...
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
parser.getConfiguration());

System.exit(zkfc.run(parser.getRemainingArgs()));
}

DFSZKFailoverController通过create创建一个实例,然后调用run方法启动zkfc。create方法里构造了一个NNHAServiceTarget对象,并由此构建DFSZKFailoverController对象。代码如下:

1
2
3
4
5
6
public static DFSZKFailoverController create(Configuration conf) {
...
NNHAServiceTarget localTarget = new NNHAServiceTarget(
localNNConf, nsId, nnId);
return new DFSZKFailoverController(localNNConf, localTarget);
}

NNHAServiceTarget主要是用来存放目标nn的fence相关的属性,源码对类的解释是_One of the NN NameNodes acting as the target of an administrative command(e.g. failover) ,译:NNs中的一个nn作为管理命令的目标_。在其构造方法中对nsId、nnId、addr、zkfc addr和fencer进行赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public NNHAServiceTarget(Configuration conf,
String nsId, String nnId) {
...
this.addr = NetUtils.createSocketAddr(serviceAddr,
NameNode.DEFAULT_PORT);

this.autoFailoverEnabled = targetConf.getBoolean(
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
if (autoFailoverEnabled) {
int port = DFSZKFailoverController.getZkfcPort(targetConf);
if (port != 0) {
setZkfcPort(port);
}
}

try {
this.fencer = NodeFencer.create(targetConf,
DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY);
} catch (BadFencingConfigurationException e) {
this.fenceConfigError = e;
}

this.nnId = nnId;
this.nsId = nsId;
}

DFSZKFailoverController的对象zkfc创建成功之后,会执行run方法,run方法是在ZKFailoverController中实现的,_而run中又调用了doRun方法_,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
try {
initZK(); // new ActiveStandbyElector,与zk建立连接
} catch (KeeperException ke) {
...
return ERR_CODE_NO_ZK;
}
// 在启动zkfc之前,要先对zk进行格式化
// 格式化时输入参数,启动时不加参数
if (args.length > 0) {
if ("-formatZK".equals(args[0])) {
boolean force = false;
boolean interactive = true;
for (int i = 1; i < args.length; i++) {
if ("-force".equals(args[i])) {
force = true;
} else if ("-nonInteractive".equals(args[i])) {
interactive = false;
} else {
badArg(args[i]);
}
}
// 对zk格式化,默认会zk上创建/hadoop-ha节点
return formatZK(force, interactive);
} else {
badArg(args[0]);
}
}
...
initRPC(); // 初始化rpcServer,创建ZKFCRpcServer对象,使用的是hadoop rpc接口和PB序列化
initHM(); // 启动HealthMonitor线程
startRPC(); // 启动rpcServer
try {
mainLoop(); // 循环阻塞等待fatalError错误,然后退出zkfc进程
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true); // 关闭zk连接,退出选举
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}

首先调用initZK(),与zk建立连接,与zk的连接是在ActiveStandbyElector类中,ActiveStandbyElector在构造函数中注册了一个回调函数ElectorCallbacks,而ActiveStandbyElector又实现了StatCallbackStringCallback,分别在调用zkClient.existszkClient.create之后通过processResult对其调用结果状态进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private void initZK() throws HadoopIllegalArgumentException, IOException,
KeeperException {
...
elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, getParentZnode(), zkAcls, zkAuths,
new ElectorCallbacks(), maxRetryNum);
}

public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
HadoopIllegalArgumentException, KeeperException {
if (app == null || acl == null || parentZnodeName == null
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
throw new HadoopIllegalArgumentException("Invalid argument");
}
zkHostPort = zookeeperHostPorts;
zkSessionTimeout = zookeeperSessionTimeout;
zkAcl = acl;
zkAuthInfo = authInfo;
appClient = app;
znodeWorkingDir = parentZnodeName;
// 临时节点ActiveStandbyElectorLock,用于标识锁
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
// 永久节点ActiveBreadCrumb,用于存放active信息
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
this.maxRetryNum = maxRetryNum;

// createConnection for future API calls
// 创建zk连接
createConnection();
}

private void createConnection() throws IOException, KeeperException {
...
zkClient = getNewZooKeeper();
LOG.debug("Created new connection for " + this);
}

protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
KeeperException {

// Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
// may trigger the Connected event immediately. So, if we register the
// watcher after constructing ZooKeeper, we may miss that event. Instead,
// we construct the watcher first, and have it block any events it receives
// before we can set its ZooKeeper reference.
// 不幸的是,zk的构造方法连接上zk之后,可能马上触发连接事件。
// 因此如果构造zk之后注册watcher,可能不会捕获到连接事件。
// 取而代之的方法是,先构造Watcher,在设置了zk的引用之前,使它阻塞所有的事件
watcher = new WatcherWithClientRef();
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
// 在watcher中设置zk的引用
watcher.setZooKeeperRef(zk);

// Wait for the asynchronous success/failure. This may throw an exception
// if we don't connect within the session timeout.
watcher.waitForZKConnectionEvent(zkSessionTimeout);

for (ZKAuthInfo auth : zkAuthInfo) {
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
}
return zk;
}

initZk主要是通过实例化ActiveStandbyElector,从而得到zkClient,并向其注册一个watcher,这里简单介绍下watcher的实现类WatcherWithClientRef

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private final class WatcherWithClientRef implements Watcher {
private ZooKeeper zk;

/**
* Latch fired whenever any event arrives. This is used in order
* to wait for the Connected event when the client is first created.
*/
private CountDownLatch hasReceivedEvent = new CountDownLatch(1);

/**
* Latch used to wait until the reference to ZooKeeper is set.
*/
private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);

/**
* Waits for the next event from ZooKeeper to arrive.
*
* @param connectionTimeoutMs zookeeper connection timeout in milliseconds
* @throws KeeperException if the connection attempt times out. This will
* be a ZooKeeper ConnectionLoss exception code.
* @throws IOException if interrupted while connecting to ZooKeeper
*/
// 接收连接是否连接成功
private void waitForZKConnectionEvent(int connectionTimeoutMs)
throws KeeperException, IOException {
try {
// await() 如果hasReceivedEvent为0则立即返回true,
// 如果在connectionTimeoutMs内hasReceivedEvent依然不为0,线程依然阻塞则返回false
// 当调用countDown()之后,hasReceivedEvent的值会发生变化减1
if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
+ connectionTimeoutMs + " milliseconds");
zk.close();
throw KeeperException.create(Code.CONNECTIONLOSS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
"Interrupted when connecting to zookeeper server", e);
}
}

private void setZooKeeperRef(ZooKeeper zk) {
Preconditions.checkState(this.zk == null,
"zk already set -- must be set exactly once");
this.zk = zk;
hasSetZooKeeper.countDown();
}

@Override
public void process(WatchedEvent event) {
// 接收到事件之后,hasReceivedEvent的值减1
hasReceivedEvent.countDown();
try {
hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS);
// 捕获到事件之后具体的处理逻辑
ActiveStandbyElector.this.processWatchEvent(
zk, event);
} catch (Throwable t) {
fatalError(
"Failed to process watcher event " + event + ": " +
StringUtils.stringifyException(t));
}
}
}

WatcherWithClientRef在构造zk时被注册为默认watcher,主要监听连接或者断开事件。当调用initZk之后,watcher.process会对事件进行处理,连接、断开、过期的状态类型都是EventType.None。

initZK()之后也就得到了zkClient,继续doRun,查看是否有参数_-formatZK_,有则执行formatZK,对zk进行格式化,然后结束程序。这里主要介绍对nn的FailoverController,是没有参数的情况,接下来是初始化rpc服务initRPC,zkfc的rpc类是ZKFCRpcServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected void initRPC() throws IOException {
InetSocketAddress bindAddr = getRpcAddressToBindTo();
rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
}
// ZKFCRpcServer.class
ZKFCRpcServer(Configuration conf,
InetSocketAddress bindAddr,
ZKFailoverController zkfc,
PolicyProvider policy) throws IOException {
this.zkfc = zkfc;
// 使用protocol buffer序列化
RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
ProtobufRpcEngine.class);
ZKFCProtocolServerSideTranslatorPB translator =
new ZKFCProtocolServerSideTranslatorPB(this);
BlockingService service = ZKFCProtocolService
.newReflectiveBlockingService(translator);
// 使用hadoop rpc接口得到rpc server
// ZKFCProtocol是rpc协议,service是rpc协议的实现类
// ZKFCProtocolPB是protobuf rpc接口的一个过渡类
this.server = new RPC.Builder(conf).setProtocol(ZKFCProtocolPB.class)
.setInstance(service).setBindAddress(bindAddr.getHostName())
.setPort(bindAddr.getPort()).setNumHandlers(HANDLER_COUNT)
.setVerbose(false).build();

// set service-level authorization security policy
if (conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
server.refreshServiceAcl(conf, policy);
}
}

这里主要是构造了一个zkfc的rpc server,此rpc使用protocol buffer序列化消息,hadoop rpc提供的接口来构造rpc server。rpc协议是在ZKFCProtocol接口中定义的,包含两个方法cedeActivegracefulFailover,其接口的实现类是ZKFCProtocolServerSideTranslatorPB,消息格式传输的格式是在ZKFCProtocol.proto中定义的。有关hadoop rpc更详细的内容可以查看先前的博客[Hadoop RPC 解析](http://bigdatadecode.top/Hadoop RPC 解析.html)。

initRPC之后开始调用initHM,初始化HealthMonitor

1
2
3
4
5
6
7
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
// 添加回调函数
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start();
}

先来看下HealthMonitor的构造函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
HealthMonitor(Configuration conf, HAServiceTarget target) {
this.targetToMonitor = target;
this.conf = conf;
// ha.health-monitor.sleep-after-disconnect.ms
this.sleepAfterDisconnectMillis = conf.getLong(
HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
// ha.health-monitor.check-interval.ms 隔多久去检查nn是否健康,默认是1000
this.checkIntervalMillis = conf.getLong(
HA_HM_CHECK_INTERVAL_KEY,
HA_HM_CHECK_INTERVAL_DEFAULT);
// ha.health-monitor.connect-retry-interval.ms 默认是1000
this.connectRetryInterval = conf.getLong(
HA_HM_CONNECT_RETRY_INTERVAL_KEY,
HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
// ha.health-monitor.rpc-timeout.ms
this.rpcTimeout = conf.getInt(
HA_HM_RPC_TIMEOUT_KEY,
HA_HM_RPC_TIMEOUT_DEFAULT);

this.daemon = new MonitorDaemon();
}

HealthMonitor构造完之后,添加回调类,然后将HealthMonitor线程作为一个_守护进程_去启动。查看其run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// MonitorDaemon.run HealthMonitor的内部类
public void run() {
while (shouldRun) {
try {
// 得到rpc的客户端,此时的rpc server是在nn中启动的rpc server
loopUntilConnected();
// 对nn进行循环检查
doHealthChecks();
} catch (InterruptedException ie) {
Preconditions.checkState(!shouldRun,
"Interrupted but still supposed to run");
}
}
}

private void doHealthChecks() throws InterruptedException {
while (shouldRun) {
HAServiceStatus status = null;
boolean healthy = false;
try {
status = proxy.getServiceStatus();
// 健康状态检查,异常会抛出
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
LOG.warn("Service health check failed for " + targetToMonitor
+ ": " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
targetToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis);
return;
}

if (status != null) {
setLastServiceStatus(status);
}
if (healthy) {
enterState(State.SERVICE_HEALTHY);
}
// 间隔checkIntervalMillis之后继续check healthy
Thread.sleep(checkIntervalMillis);
}
}

健康状态的检测主要是在doHealthChecks中调用monitorHealth,其调用逻辑是NameNodeRpcServer.monitorHealth–>NameNode.monitorHealth,该方法会调用getNamesystem().checkAvailableResources()去检查磁盘是否有足够的可用空间,不够则抛出HealthCheckFailedException异常,在doHealthChecks中被捕获,设置状态为_SERVICE_UNHEALTHY_,该方法也可能抛出别的异常被Throwable捕获,设置状态为_SERVICE_NOT_RESPONDING_。然后调用enterState进行状态的更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private synchronized void enterState(State newState) {
if (newState != state) {
LOG.info("Entering state " + newState);
state = newState;
synchronized (callbacks) {
// 调用在构造HealthMonitor时注册的回调类
for (Callback cb : callbacks) {
cb.enteredState(newState);
}
}
}
}

class HealthCallbacks implements HealthMonitor.Callback {
@Override
public void enteredState(HealthMonitor.State newState) {
setLastHealthState(newState);
// 检查当前nn的状态,判断是否可用进行选举
recheckElectability();
}
}

private void recheckElectability() {
// Maintain lock ordering of elector -> ZKFC
synchronized (elector) {
synchronized (this) {
boolean healthy = lastHealthState == State.SERVICE_HEALTHY;

long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
if (remainingDelay > 0) {
if (healthy) {
LOG.info("Would have joined master election, but this node is " +
"prohibited from doing so for " +
TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
}
scheduleRecheck(remainingDelay);
return;
}

switch (lastHealthState) {
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
if (quitElectionOnBadState) {
quitElectionOnBadState = false;
}
break;

case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
serviceState = HAServiceState.INITIALIZING;
break;

case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
serviceState = HAServiceState.INITIALIZING;
break;

case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;

default:
throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
}
}
}
}

recheckElectability中匹配到nn的状态是SERVICE_UNHEALTHY或者SERVICE_NOT_RESPONDING则调用elector.quitElection(true)放弃选举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// needFence为true则需要执行fence
public synchronized void quitElection(boolean needFence) {
LOG.info("Yielding from election");
if (!needFence && state == State.ACTIVE) {
// If active is gracefully going back to standby mode, remove
// our permanent znode so no one fences us.
tryDeleteOwnBreadCrumbNode();
}
reset();
wantToBeInElection = false;
}

private void reset() {
state = State.INIT;
terminateConnection();
}

public synchronized void terminateConnection() {
if (zkClient == null) {
return;
}
LOG.debug("Terminating ZK connection for " + this);
ZooKeeper tempZk = zkClient;
zkClient = null;
watcher = null;
try {
tempZk.close();
} catch(InterruptedException e) {
LOG.warn(e);
}
zkConnectionState = ConnectionState.TERMINATED;
wantToBeInElection = false;
}

放弃选举其实就是调用ActiveStandbyElector的quitElection,将zkClinet关闭,zk连接关闭之后,之前创建的临时节点ActiveStandbyElectorLock被删除,此时standby节点上的ActiveStandbyElector通过watcher监听到_NodeDeleted_事件进行选举抢锁(只有standby节点上的watcher能监听到NodeDeleted事件,因为active节点上的watcher随着zkClinet的关闭已经消失了,无法进行监听)。

active节点上的ActiveStandbyElector退出选举之后,HealthMonitor线程继续check nn,等到active nn恢复正常之后重新进行选举。而standby节点上的watcher监听到NodeDeleted事件,由watcher.process处理事件,在process中又调用了ActiveStandbyElector.this.processWatchEvent,下面看下processWatchEvent的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
if (isStaleClient(zk)) return;
LOG.debug("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath()
+ " connectionState: " + zkConnectionState
+ " for " + this);

if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to
// be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
if (prevConnectionState == ConnectionState.DISCONNECTED &&
wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");

// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: "
+ event.getState());
break;
}

return;
}

// a watch on lock path in zookeeper has fired. so something has changed on
// the lock. ideally we should check that the path is the same as the lock
// path but trusting zookeeper for now
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
monitorActiveStatus();
break;
default:
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
monitorActiveStatus();
}

return;
}

// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}

匹配到NodeDeleted事件,然后执行joinElectionInternal去创建临时节点,进行抢锁,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}

createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync();
}

private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
this, zkClient);
}

是否创建成功的结果是在processResult中进行捕获的,此时的ActiveStandbyElector.processResult是实现StringCallback接口需要重写的方法,该类中还有个同名的方法,是实现StatCallback接口需要重写的方法。先来看下捕获create结果的processResult代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* interface implementation of Zookeeper callback for create
*/
@Override
public synchronized void processResult(int rc, String path, Object ctx,
String name) {
...
Code code = Code.get(rc);
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
// 创建成功,则进行角色的转变
if (becomeActive()) {
monitorActiveStatus();
} else {
reJoinElectionAfterFailureToBecomeActive();
}
return;
}
// 节点已存在
if (isNodeExists(code)) {
if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock.
becomeStandby();
}
// if we had retried then the znode could have been created by our first
// attempt to the server (that we lost) and this node exists response is
// for the second attempt. verify this case via ephemeral node owner. this
// will happen on the callback for monitoring the lock.
monitorActiveStatus();
return;
}
...
fatalError(errorMessage);
}

如果创建成功则将当前节点转换为active,执行becomeActive,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean becomeActive() {
...
try {
Stat oldBreadcrumbStat = fenceOldActive();
writeBreadCrumbNode(oldBreadcrumbStat);

LOG.debug("Becoming active for " + this);
appClient.becomeActive();
state = State.ACTIVE;
return true;
} catch (Exception e) {
LOG.warn("Exception handling the winning of election", e);
// Caller will handle quitting and rejoining the election.
return false;
}
}

为防止脑裂,在变为active之前,先检查下是否需要fence,判断是否需要fence的依据是zk上是否有_breadcrumb_节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Stat fenceOldActive() throws InterruptedException, KeeperException {
final Stat stat = new Stat();
byte[] data;
LOG.info("Checking for any old active which needs to be fenced...");
try {
// 读取当前zk中节点的内容
data = zkDoWithRetries(new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
return zkClient.getData(zkBreadCrumbPath, false, stat);
}
});
} catch (KeeperException ke) {
if (isNodeDoesNotExist(ke.code())) {
LOG.info("No old node to fence");
return null;
}

// If we failed to read for any other reason, then likely we lost
// our session, or we don't have permissions, etc. In any case,
// we probably shouldn't become active, and failing the whole
// thing is the best bet.
throw ke;
}

LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
// appData是当前节点加入选举时的节点信息,也就是standby节点的信息
if (Arrays.equals(data, appData)) {
LOG.info("But old node has our own data, so don't need to fence it.");
} else {
// 当前zk节点的信息与APPData的信息不符,进行fence
appClient.fenceOldActive(data);
}
return stat;
}

当前zk的ActiveBreadCrumb节点记录的是active节点的信息,与appData(记录的是standby节点的信息)不一样,则进行fenceOldActive操作,fenceOldActive调用的是在ActiveStandbyElector初始化时注册的回调类里的方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ElectorCallbacks.class
public void fenceOldActive(byte[] data) {
ZKFailoverController.this.fenceOldActive(data);
}
// ZKFailoverController.class
private synchronized void fenceOldActive(byte[] data) {
HAServiceTarget target = dataToTarget(data);

try {
doFence(target);
} catch (Throwable t) {
recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
Throwables.propagate(t);
}
}

private void doFence(HAServiceTarget target) {
LOG.info("Should fence: " + target);
// 由FailoverController进行切换,如果没有切换成功则进行fence
boolean gracefulWorked = new FailoverController(conf,
RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
if (gracefulWorked) {
// It's possible that it's in standby but just about to go into active,
// no? Is there some race here?
LOG.info("Successfully transitioned " + target + " to standby " +
"state without fencing");
return;
}

try {
target.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
LOG.error("Couldn't fence old active " + target, e);
recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
throw new RuntimeException(e);
}
// 调用NodeFencer类,进行fence
if (!target.getFencer().fence(target)) {
throw new RuntimeException("Unable to fence " + target);
}
}

doFence时先调用FailoverController.tryGracefulFence进行状态的转换,该方法通过rpc最终调用*NameNode.transitionToStandby()*方法,通过state.setState(haContext, STANDBY_STATE)将状态设置为standby,如果设置失败则调用NodeFencer类的fence方法进行强制fence。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean fence(HAServiceTarget fromSvc) {
LOG.info("====== Beginning Service Fencing Process... ======");
int i = 0;
// 可以设置多个fence method,以回车分隔
for (FenceMethodWithArg method : methods) {
LOG.info("Trying method " + (++i) + "/" + methods.size() +": " + method);

try {
// SshFenceByTcpPort 和 ShellCommandFencer 实现了tryFence的具体逻辑
if (method.method.tryFence(fromSvc, method.arg)) {
LOG.info("====== Fencing successful by method " + method + " ======");
return true;
}
}
...
}
...
}

fenceOldActive成功之后则调用writeBreadCrumbNode将当前standby节点的信息写入zk中,通过回调类ElectorCallbacks的becomeActive方法将standby节点变为active。状态转变的过程是通过rpc最终调用NameNode的transitionToActive方法将ACTIVE_STATE设置为当前节点的状态。切换成功之后则进入监控,调用monitorActiveStatus

1
2
3
4
5
6
7
8
9
10
11
12
private void monitorActiveStatus() {
assert wantToBeInElection;
LOG.debug("Monitoring active leader for " + this);
statRetryCount = 0;
monitorLockNodeAsync();
}

private void monitorLockNodeAsync() {
zkClient.exists(zkLockFilePath,
watcher, this,
zkClient);
}

zkClient.exists将WatcherWithClientRef注册为watcher,监听ActiveStandbyElectorLock znode的状态,exists的执行结果在processResult中捕获,此处的processResult是ActiveStandbyElector实现StatCallback接口需要重写的方法。先来看下捕获exists结果的processResult代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* interface implementation of Zookeeper callback for monitor (exists)
*/
@Override
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
...
Code code = Code.get(rc);
if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// we own the lock znode. so we are the leader
if (!becomeActive()) {
reJoinElectionAfterFailureToBecomeActive();
}
} else {
// we dont own the lock znode. so we are a standby.
becomeStandby();
}
// the watch set by us will notify about changes
return;
}

if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it
enterNeutralMode();
joinElectionInternal();
return;
}
...
}

此时状态已经切换完成,原active节点循环进行doHealthChecks,当节点恢复正常之后则调用ActiveStandbyElector.joinElection恢复选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public synchronized void joinElection(byte[] data)
throws HadoopIllegalArgumentException {

if (data == null) {
throw new HadoopIllegalArgumentException("data cannot be null");
}

if (wantToBeInElection) {
LOG.info("Already in election. Not re-connecting.");
return;
}
// 将节点信息写入appData中
appData = new byte[data.length];
System.arraycopy(data, 0, appData, 0, data.length);

LOG.debug("Attempting active election for " + this);
joinElectionInternal();
}

整个HA的故障转移到此分析完毕

总结

本文主要分析了HA模式下,active与standby进行切换的流程,其大致流程是active nn通过HealthMonitor线程(ha.health-monitor.check-interval.ms 默认1000)检测到磁盘空间不足或者rpc调用没有响应,捕获到SERVICE_UNHEALTHY或者SERVICE_NOT_RESPONDING状态,将退出选举,关闭zk连接,此时zk上ActiveStandbyElectorLock临时节点自动删除,standby节点上的watcher监听到NODEDELETED事件,进行抢锁,去zk上创建ActiveStandbyElectorLock节点,创建成功之后进行状态的转换becomActive,在becomeActive时会判断zk上是否存有原active节点创建的ActiveBreadCrumb节点,如果有则进行fence操作,先由FailoverController执行gracefullFence,如果不成功则执行NodeFence的fence方法(fence method有两个,分别是SSHFence和ShellFence),fence成功之后将当前节点的信息写入ActiveBreadCrumb节点,并将当前节点的状态转换为active。

原active节点的HealthMonitor线程一直循环检测nn的健康状况,等到nn健康之后再将其加入选举,加入选举也就是创建于zk的连接。