QJM's built-in fencing only revokes the old Active NN's write access to the JournalNodes; the old Active NN can still respond to client requests and read from DataNodes. Fencing the old Active off from clients and DataNodes is done through the dfs.ha.fencing.methods configuration. The Hadoop common library ships two fencing implementations:

sshfence: SSH to the old Active NN and kill its process with fuser (the pid is located by TCP port number, which is more accurate than the jps command).
shell: run an arbitrary shell command to fence the Active NameNode, i.e. execute a user-defined shell command (script) to carry out the isolation.
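For reference, a typical hdfs-site.xml fragment might look like the sketch below (hedged: the script path and private-key path are placeholders; multiple methods are newline-separated and tried in order):

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence
shell(/path/to/fence-script.sh)</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/hdfs/.ssh/id_rsa</value>
</property>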
QJM Shared Storage
Quorum Journal Manager (QJM) is a shared-storage scheme for HDFS metadata based on the Paxos algorithm. Its basic principle: store the EditLog on 2N+1 JournalNodes, and consider each write successful, with no data loss, once a majority (>= N+1) of them acknowledge it. The scheme tolerates at most N failed machines; if more than N fail, writes can no longer commit.

The main benefits of implementing HA with QJM:
1) No extra highly available shared storage needs to be provisioned, which lowers complexity and maintenance cost for data centers built on commodity hardware.
2) No separate fencing mechanism needs to be configured, because QJM has fencing built in.
3) There is no Single Point Of Failure.
4) The degree of robustness is configurable (being quorum-based, a cluster of 2N+1 JournalNodes tolerates up to N failures).
5) One slow JournalNode does not drag down overall latency, and adding JournalNodes does not hurt performance, because the NN sends edits to the JournalNodes in parallel.
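As a quick illustration of the quorum arithmetic (a minimal sketch, not Hadoop code):

// Quorum sizing for an ensemble of 2N+1 JournalNodes (illustrative only).
public class QuorumMath {
  // A write commits once a majority of JournalNodes acknowledge it.
  static int quorumSize(int journalNodes) {
    return journalNodes / 2 + 1;      // 3 -> 2, 5 -> 3
  }

  // At most N of 2N+1 JournalNodes may fail before writes stop committing.
  static int failuresTolerated(int journalNodes) {
    return (journalNodes - 1) / 2;    // 3 -> 1, 5 -> 2
  }
}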
NNHAServiceTarget mainly holds the fencing-related attributes of the target NN. The source comment describes the class as "One of the NN NameNodes acting as the target of an administrative command (e.g. failover)". Its constructor assigns nsId, nnId, addr, zkfc addr and the fencer.
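A hedged usage sketch (assuming the Hadoop 2.x constructor NNHAServiceTarget(Configuration, String, String); the nameservice and NameNode IDs are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;

public class TargetDemo {
  public static void main(String[] args) {
    // Resolve NameNode "nn1" of nameservice "mycluster" as the target of an
    // administrative command; the constructor looks up the NN address, ZKFC
    // address and configured fencer from the configuration.
    Configuration conf = new HdfsConfiguration();
    NNHAServiceTarget target = new NNHAServiceTarget(conf, "mycluster", "nn1");
    System.out.println(target.getAddress()); // the target NN's RPC address
  }
}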
  // createConnection for future API calls
  // establish the ZooKeeper connection
  createConnection();
}
private void createConnection() throws IOException, KeeperException {
  ...
  zkClient = getNewZooKeeper();
  LOG.debug("Created new connection for " + this);
}
protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
    KeeperException {
  // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
  // may trigger the Connected event immediately. So, if we register the
  // watcher after constructing ZooKeeper, we may miss that event. Instead,
  // we construct the watcher first, and have it block any events it receives
  // before we can set its ZooKeeper reference.
  watcher = new WatcherWithClientRef();
  ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
  // hand the watcher its ZooKeeper reference
  watcher.setZooKeeperRef(zk);

  // Wait for the asynchronous success/failure. This may throw an exception
  // if we don't connect within the session timeout.
  watcher.waitForZKConnectionEvent(zkSessionTimeout);

  for (ZKAuthInfo auth : zkAuthInfo) {
    zk.addAuthInfo(auth.getScheme(), auth.getAuth());
  }
  return zk;
}
private final class WatcherWithClientRef implements Watcher {
  private ZooKeeper zk;

  /**
   * Latch fired whenever any event arrives. This is used in order
   * to wait for the Connected event when the client is first created.
   */
  private CountDownLatch hasReceivedEvent = new CountDownLatch(1);

  /**
   * Latch used to wait until the reference to ZooKeeper is set.
   */
  private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);

  /**
   * Waits for the next event from ZooKeeper to arrive.
   *
   * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
   * @throws KeeperException if the connection attempt times out. This will
   * be a ZooKeeper ConnectionLoss exception code.
   * @throws IOException if interrupted while connecting to ZooKeeper
   */
  // waits to learn whether the connection attempt succeeded
  private void waitForZKConnectionEvent(int connectionTimeoutMs)
      throws KeeperException, IOException {
    try {
      // await() returns immediately if hasReceivedEvent is already 0; if the
      // latch is still non-zero after connectionTimeoutMs it returns false.
      // countDown() (called when an event arrives) decrements the latch by one.
      if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
        LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
            + connectionTimeoutMs + " milliseconds");
        zk.close();
        throw KeeperException.create(Code.CONNECTIONLOSS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(
          "Interrupted when connecting to zookeeper server", e);
    }
  }

  private void setZooKeeperRef(ZooKeeper zk) {
    Preconditions.checkState(this.zk == null,
        "zk already set -- must be set exactly once");
    this.zk = zk;
    hasSetZooKeeper.countDown();
  }
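The two latches implement a simple handshake: any event delivered before the ZooKeeper reference is published blocks until setZooKeeperRef fires hasSetZooKeeper. A stripped-down sketch of the pattern (hypothetical class name, not the Hadoop code):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

final class HandshakeWatcher implements Watcher {
  private ZooKeeper zk;
  private final CountDownLatch hasSetZooKeeper = new CountDownLatch(1);

  @Override
  public void process(WatchedEvent event) {
    try {
      // block events delivered before the client reference is available
      hasSetZooKeeper.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    // from here on it is safe to touch this.zk
  }

  void setZooKeeperRef(ZooKeeper zk) {
    this.zk = zk;
    hasSetZooKeeper.countDown(); // release any blocked event threads
  }
}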
private void recheckElectability() {
  // Maintain lock ordering of elector -> ZKFC
  synchronized (elector) {
    synchronized (this) {
      boolean healthy = lastHealthState == State.SERVICE_HEALTHY;

      long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
      if (remainingDelay > 0) {
        if (healthy) {
          LOG.info("Would have joined master election, but this node is " +
              "prohibited from doing so for " +
              TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
        }
        scheduleRecheck(remainingDelay);
        return;
      }

      switch (lastHealthState) {
      case SERVICE_HEALTHY:
        elector.joinElection(targetToData(localTarget));
        if (quitElectionOnBadState) {
          quitElectionOnBadState = false;
        }
        break;

      case INITIALIZING:
        LOG.info("Ensuring that " + localTarget + " does not " +
            "participate in active master election");
        elector.quitElection(false);
        serviceState = HAServiceState.INITIALIZING;
        break;

      case SERVICE_UNHEALTHY:
      case SERVICE_NOT_RESPONDING:
        LOG.info("Quitting master election for " + localTarget +
            " and marking that fencing is necessary");
        elector.quitElection(true);
        serviceState = HAServiceState.INITIALIZING;
        break;

      case HEALTH_MONITOR_FAILED:
        fatalError("Health monitor failed!");
        break;

      default:
        throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
      }
    }
  }
}
// needFence == true means the old active must be fenced
public synchronized void quitElection(boolean needFence) {
  LOG.info("Yielding from election");
  if (!needFence && state == State.ACTIVE) {
    // If active is gracefully going back to standby mode, remove
    // our permanent znode so no one fences us.
    tryDeleteOwnBreadCrumbNode();
  }
  reset();
  wantToBeInElection = false;
}

private void reset() {
  state = State.INIT;
  terminateConnection();
}
  if (eventType == Event.EventType.None) {
    // the connection state has changed
    switch (event.getState()) {
    case SyncConnected:
      LOG.info("Session connected.");
      // if the listener was asked to move to safe state then it needs to
      // be undone
      ConnectionState prevConnectionState = zkConnectionState;
      zkConnectionState = ConnectionState.CONNECTED;
      if (prevConnectionState == ConnectionState.DISCONNECTED &&
          wantToBeInElection) {
        monitorActiveStatus();
      }
      break;

    case Disconnected:
      LOG.info("Session disconnected. Entering neutral mode...");
      // ask the app to move to safe state because zookeeper connection
      // is not active and we dont know our state
      zkConnectionState = ConnectionState.DISCONNECTED;
      enterNeutralMode();
      break;

    case Expired:
      // the connection got terminated because of session timeout
      // call listener to reconnect
      LOG.info("Session expired. Entering neutral mode and rejoining...");
      enterNeutralMode();
      reJoinElection(0);
      break;

    case SaslAuthenticated:
      LOG.info("Successfully authenticated to ZooKeeper using SASL.");
      break;

    default:
      fatalError("Unexpected Zookeeper watch event state: " + event.getState());
      break;
    }

    return;
  }

  // a watch on lock path in zookeeper has fired. so something has changed on
  // the lock. ideally we should check that the path is the same as the lock
  // path but trusting zookeeper for now
  String path = event.getPath();
  if (path != null) {
    switch (eventType) {
    case NodeDeleted:
      if (state == State.ACTIVE) {
        enterNeutralMode();
      }
      joinElectionInternal();
      break;

    case NodeDataChanged:
      monitorActiveStatus();
      break;

    default:
      LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
      monitorActiveStatus();
    }

    return;
  }

  // some unexpected error has occurred
  fatalError("Unexpected watch error from Zookeeper");
}
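To summarize the event handling above: SyncConnected resumes monitoring of the active status, Disconnected moves the elector into neutral mode, Expired rejoins the election with a fresh session, NodeDeleted on the lock path triggers a new election attempt, and NodeDataChanged re-checks who currently holds the lock.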
private void joinElectionInternal() {
  Preconditions.checkState(appData != null,
      "trying to join election without any app data");
  if (zkClient == null) {
    if (!reEstablishSession()) {
      fatalError("Failed to reEstablish connection with ZooKeeper");
      return;
    }
  }

  createRetryCount = 0;
  wantToBeInElection = true;
  // asynchronously try to create the ephemeral lock znode; the result is
  // delivered to the create callback below
  createLockNodeAsync();
}
/**
 * interface implementation of Zookeeper callback for create
 */
@Override
public synchronized void processResult(int rc, String path, Object ctx,
    String name) {
  ...
  Code code = Code.get(rc);
  if (isSuccess(code)) {
    // we successfully created the znode. we are the leader. start monitoring
    // the create succeeded, so the role transition begins
    if (becomeActive()) {
      monitorActiveStatus();
    } else {
      reJoinElectionAfterFailureToBecomeActive();
    }
    return;
  }

  // the znode already exists
  if (isNodeExists(code)) {
    if (createRetryCount == 0) {
      // znode exists and we did not retry the operation. so a different
      // instance has created it. become standby and monitor lock.
      becomeStandby();
    }
    // if we had retried then the znode could have been created by our first
    // attempt to the server (that we lost) and this node exists response is
    // for the second attempt. verify this case via ephemeral node owner. this
    // will happen on the callback for monitoring the lock.
    monitorActiveStatus();
    return;
  }
  ...
  fatalError(errorMessage);
}
If the znode is created successfully, the current node is promoted to active via becomeActive, whose code follows:
private boolean becomeActive() {
  ...
  try {
    Stat oldBreadcrumbStat = fenceOldActive();
    writeBreadCrumbNode(oldBreadcrumbStat);

    LOG.debug("Becoming active for " + this);
    appClient.becomeActive();
    state = State.ACTIVE;
    return true;
  } catch (Exception e) {
    LOG.warn("Exception handling the winning of election", e);
    // Caller will handle quitting and rejoining the election.
    return false;
  }
}
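fenceOldActive reads the persistent "breadcrumb" znode that the previous active wrote when it won the election; if the breadcrumb still carries another node's data, that node must be fenced before we take over: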
private Stat fenceOldActive() throws InterruptedException, KeeperException {
  final Stat stat = new Stat();
  byte[] data;
  LOG.info("Checking for any old active which needs to be fenced...");
  try {
    // read the current contents of the breadcrumb znode
    data = zkDoWithRetries(new ZKAction<byte[]>() {
      @Override
      public byte[] run() throws KeeperException, InterruptedException {
        return zkClient.getData(zkBreadCrumbPath, false, stat);
      }
    });
  } catch (KeeperException ke) {
    if (isNodeDoesNotExist(ke.code())) {
      LOG.info("No old node to fence");
      return null;
    }
    // If we failed to read for any other reason, then likely we lost
    // our session, or we don't have permissions, etc. In any case,
    // we probably shouldn't become active, and failing the whole
    // thing is the best bet.
    throw ke;
  }

  LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
  // appData is the data this node registered when it joined the election,
  // i.e. our own (formerly standby) node information
  if (Arrays.equals(data, appData)) {
    LOG.info("But old node has our own data, so don't need to fence it.");
  } else {
    // the breadcrumb belongs to a different node, so fence it
    appClient.fenceOldActive(data);
  }
  return stat;
}
private void doFence(HAServiceTarget target) {
  LOG.info("Should fence: " + target);
  // first try a graceful transition via the FailoverController;
  // only fence if that fails
  boolean gracefulWorked = new FailoverController(conf,
      RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
  if (gracefulWorked) {
    // It's possible that it's in standby but just about to go into active,
    // no? Is there some race here?
    LOG.info("Successfully transitioned " + target + " to standby " +
        "state without fencing");
    return;
  }

  try {
    target.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.error("Couldn't fence old active " + target, e);
    recordActiveAttempt(new ActiveAttemptRecord(false,
        "Unable to fence old active"));
    throw new RuntimeException(e);
  }

  // delegate to NodeFencer to run the configured fencing methods
  if (!target.getFencer().fence(target)) {
    throw new RuntimeException("Unable to fence " + target);
  }
}
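Besides sshfence and shell, NodeFencer can run custom fencers implementing the org.apache.hadoop.ha.FenceMethod interface. A hedged sketch of such a fencer (hypothetical class, assuming the Hadoop 2.x interface):

import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.FenceMethod;
import org.apache.hadoop.ha.HAServiceTarget;

// Hypothetical custom fencer; NodeFencer tries each configured method in
// order until one reports success.
public class NoopFencer implements FenceMethod {
  @Override
  public void checkArgs(String args) throws BadFencingConfigurationException {
    // validate the static argument string from the configuration, if any
  }

  @Override
  public boolean tryFence(HAServiceTarget target, String args) {
    // return true only if the old active is guaranteed to be isolated;
    // returning false lets NodeFencer fall through to the next method
    return false;
  }
}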
/**
 * interface implementation of Zookeeper callback for monitor (exists)
 */
@Override
public synchronized void processResult(int rc, String path, Object ctx,
    Stat stat) {
  ...
  Code code = Code.get(rc);
  if (isSuccess(code)) {
    // the following owner check completes verification in case the lock znode
    // creation was retried
    if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
      // we own the lock znode. so we are the leader
      if (!becomeActive()) {
        reJoinElectionAfterFailureToBecomeActive();
      }
    } else {
      // we dont own the lock znode. so we are a standby.
      becomeStandby();
    }
    // the watch set by us will notify about changes
    return;
  }

  if (isNodeDoesNotExist(code)) {
    // the lock znode disappeared before we started monitoring it
    enterNeutralMode();
    joinElectionInternal();
    return;
  }
  ...
}