private INodeFile checkLease(String src, String holder, INode inode, long fileId)
    throws LeaseExpiredException, FileNotFoundException {
  assert hasReadLock();
  final String ident = src + " (inode " + fileId + ")";
  if (inode == null) {
    Lease lease = leaseManager.getLease(holder);
    throw new LeaseExpiredException(
        "No lease on " + ident + ": File does not exist. "
        + (lease != null ? lease.toString()
            : "Holder " + holder + " does not have any open files."));
  }
  if (!inode.isFile()) {
    Lease lease = leaseManager.getLease(holder);
    throw new LeaseExpiredException(
        "No lease on " + ident + ": INode is not a regular file. "
        + (lease != null ? lease.toString()
            : "Holder " + holder + " does not have any open files."));
  }
  final INodeFile file = inode.asFile();
  if (!file.isUnderConstruction()) {
    Lease lease = leaseManager.getLease(holder);
    throw new LeaseExpiredException(
        "No lease on " + ident + ": File is not open for writing. "
        + (lease != null ? lease.toString()
            : "Holder " + holder + " does not have any open files."));
  }
  // No further modification is allowed on a deleted file.
  // A file is considered deleted, if it is not in the inodeMap or is marked
  // as deleted in the snapshot feature.
  if (isFileDeleted(file)) {
    throw new FileNotFoundException(src);
  }
  // Read the clientName recorded on the file and check whether the caller
  // actually holds the lease. Could the file ever carry two client names
  // here? That should never happen, but where is it enforced?
  String clientName = file.getFileUnderConstructionFeature().getClientName();
  if (holder != null && !clientName.equals(holder)) {
    throw new LeaseExpiredException("Lease mismatch on " + ident + " owned by "
        + clientName + " but is accessed by " + holder);
  }
  return file;
}
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
  ...
  while (true) {
    boolean result = false;
    DataOutputStream out = null;
    try {
      // Open a socket to the first datanode in the pipeline
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
      long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);

      // Stream used to send requests downstream
      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      // Stream used to receive the acks coming back from downstream
      InputStream unbufIn = NetUtils.getInputStream(s);
      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
          unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      blockReplyStream = new DataInputStream(unbufIn);
      ...
      // Send the write request (effectively a "connect" request) to the first
      // datanode. It is accepted by the DataXceiverServer on that datanode,
      // which creates a new DataXceiver to handle it.
      new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
          dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
          nodes.length, block.getNumBytes(), bytesSent, newGS,
          checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
      // receive ack for connect
      // After sending the write request downstream, wait for the connect-ack;
      // packets are only sent into the pipeline once the connect-ack arrives.
      BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
          PBHelper.vintPrefixed(blockReplyStream));
      pipelineStatus = resp.getStatus();
      firstBadLink = resp.getFirstBadLink();

      // OOB is short for "out of band".
      // Got a restart OOB ack.
      // If a node is already restarting, this status is not likely from
      // the same node. If it is from a different node, it is not
      // from the local datanode. Thus it is safe to treat this as a
      // regular node error.
      if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
          restartingNodeIndex == -1) {
        checkRestart = true;
        throw new IOException("A datanode is restarting.");
      }
      // Pipeline setup did not succeed; throw and handle it in the catch block.
      if (pipelineStatus != SUCCESS) {
        if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
          throw new InvalidBlockTokenException(
              "Got access token error for connect ack with firstBadLink as "
              + firstBadLink);
        } else {
          throw new IOException("Bad connect ack with firstBadLink as "
              + firstBadLink);
        }
      }
      assert null == blockStream : "Previous blockStream unclosed";
      blockStream = out;
      result = true; // success
      restartingNodeIndex = -1;
      hasError = false;
    } catch (IOException ie) {
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
      }
      if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to "
            + nodes[0] + " : " + ie);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
        // Don't close the socket/exclude this node just yet. Try again with
        // a new encryption key.
        continue;
      }
      // find the datanode that matches
      if (firstBadLink.length() != 0) {
        for (int i = 0; i < nodes.length; i++) {
          // NB: Unconditionally using the xfer addr w/o hostname
          if (firstBadLink.equals(nodes[i].getXferAddr())) {
            errorIndex = i;
            break;
          }
        }
      } else {
        assert checkRestart == false;
        errorIndex = 0;
      }
      // Check whether there is a restart worth waiting for.
      if (checkRestart && shouldWaitForRestart(errorIndex)) {
        restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.now();
        restartingNodeIndex = errorIndex;
        errorIndex = -1;
        DFSClient.LOG.info("Waiting for the datanode to be restarted: "
            + nodes[restartingNodeIndex]);
      }
      hasError = true;
      setLastException(ie);
      result = false;  // error
    } finally {
      if (!result) {
        IOUtils.closeSocket(s);
        s = null;
        IOUtils.closeStream(out);
        out = null;
        IOUtils.closeStream(blockReplyStream);
        blockReplyStream = null;
      }
    }
    return result;
  }
}
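On failure, the connect-ack's firstBadLink names the first datanode that could not be reached, and the client maps that transfer address back to an index in its own pipeline so the node can be dropped during recovery. A self-contained sketch of that mapping (illustrative only, not HDFS code; the helper name is made up):

  // Illustrative helper, not HDFS code: map the transfer address reported in
  // firstBadLink back to a pipeline index; fall back to 0 if nothing matched.
  static int indexOfFirstBadLink(String firstBadLink, String[] xferAddrs) {
    if (firstBadLink == null || firstBadLink.isEmpty()) {
      return 0; // no specific node reported, blame the first datanode
    }
    for (int i = 0; i < xferAddrs.length; i++) {
      if (firstBadLink.equals(xferAddrs[i])) {
        return i;
      }
    }
    return 0;
  }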
// DataXceiver.writeBlock
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  ...
  // reply to upstream datanode or client
  final DataOutputStream replyOut = new DataOutputStream(
      new BufferedOutputStream(
          getOutputStream(),
          HdfsConstants.SMALL_BUFFER_SIZE));
  checkAccess(replyOut, isClient, block, blockToken,
      Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
  DataOutputStream mirrorOut = null;  // stream to next target
  DataInputStream mirrorIn = null;    // reply from next target
  Socket mirrorSock = null;           // socket to next target
  String mirrorNode = null;           // the name:port of next target
  String firstBadLink = "";           // first datanode that failed in connection setup
  Status mirrorInStatus = SUCCESS;
  final String storageUuid;
  try {
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // open a block receiver
      blockReceiver = new BlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist);
try {
  one.writeTo(blockStream);
  blockStream.flush();
} catch (IOException e) {
  // HDFS-3398: treat the primary DN as down, since the client is unable
  // to write to it. If a failed or restarting node has already
  // been recorded by the responder, the following call will have no
  // effect. Pipeline recovery can handle only one node error at a
  // time. If the primary node fails again during the recovery, it
  // will be taken out then.
  tryMarkPrimaryDatanodeFailed();
  throw e;
}
void receiveBlock(
    DataOutputStream mirrOut,   // output to next datanode
    DataInputStream mirrIn,     // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr,
    DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams,
    boolean isReplaceBlock) throws IOException {
  ...
  try {
    if (isClient && !isTransfer) {
      // Start a PacketResponder daemon thread
      responder = new Daemon(datanode.threadGroup,
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start(); // start thread to process responses
    }

    // Keep receiving packets from the socket
    while (receivePacket() >= 0) { /* Receive until the last packet */ }

    // After all packets of this block have been received, wait for all
    // outstanding packet responses, then tell the responder to shut down
    // gracefully. Mark that the responder has been closed for future
    // processing.
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      responderClosed = true;
    }
    ...
  } catch (IOException ioe) {
    if (datanode.isRestarting()) {
      // Do not throw if shutting down for restart. Otherwise, it will cause
      // premature termination of responder.
      LOG.info("Shutting down for restart (" + block + ").");
    } else {
      LOG.info("Exception for " + block, ioe);
      throw ioe;
    }
  } finally {
    ...
  }
}
/**
 * While writing to mirrorOut, failure to write to mirror should not
 * affect this datanode unless it is caused by interruption.
 */
private void handleMirrorOutError(IOException ioe) throws IOException {
  String bpid = block.getBlockPoolId();
  LOG.info(datanode.getDNRegistrationForBP(bpid)
      + ":Exception writing " + block + " to mirror " + mirrorAddr, ioe);
  if (Thread.interrupted()) {
    // shut down if the thread is interrupted
    throw ioe;
  } else {
    // encounter an error while writing to mirror
    // continue to run even if can not write to mirror
    // notify client of the error
    // and wait for the client to shut down the pipeline
    mirrorError = true;
  }
}
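The pattern here is that a downstream (mirror) failure only raises a flag: this datanode keeps receiving from upstream and lets the client notice the error through the ack stream and rebuild the pipeline; only an interruption of the receiver thread aborts it. A compact, self-contained sketch of that decision (illustrative only, not HDFS code):

  // Illustrative sketch, not HDFS code: tolerate downstream write failures,
  // but propagate the exception when the receiver thread is being interrupted.
  class MirrorErrorPolicy {
    volatile boolean mirrorError = false;

    void onDownstreamWriteError(java.io.IOException ioe) throws java.io.IOException {
      if (Thread.interrupted()) {
        throw ioe;         // shutting down: stop receiving
      }
      mirrorError = true;  // keep running; the client will notice via the acks
    }
  }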
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      // Create an empty PipelineAck to read the downstream ack into
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        // mirrorError may have been set by an error while forwarding packets
        // downstream or while reading a downstream ack; if it is set, this
        // branch is skipped and no further downstream acks are read.
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          // read an ack from downstream datanode
          ack.readFields(downstreamIn);
          ackRecvNanoTime = System.nanoTime();
          if (LOG.isDebugEnabled()) {
            LOG.debug(myString + " got " + ack);
          }
          // Process an OOB ACK.
          ...
          seqno = ack.getSeqno();
        }
        if (seqno != PipelineAck.UNKOWN_SEQNO
            || type == PacketResponderType.LAST_IN_PIPELINE) {
          // Wait for the packet at the head of the ackQueue
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          expected = pkt.seqno;
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
              && seqno != expected) {
            throw new IOException(myString + "seqno: expected=" + expected
                + ", received=" + seqno);
          }
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
            // The total ack time includes the ack times of downstream
            // nodes.
            // The value is 0 if this responder doesn't have a downstream
            // DN in the pipeline.
            totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
            // Report the elapsed time from ack send to ack receive minus
            // the downstream ack time.
            long ackTimeNanos = totalAckTimeNanos
                - ack.getDownstreamAckTimeNanos();
            if (ackTimeNanos < 0) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
              }
            } else {
              datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
            }
          }
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        isInterrupted = true;
      } catch (IOException ioe) {
        if (Thread.interrupted()) {
          isInterrupted = true;
        } else {
          // continue to run even if can not read from mirror
          // notify client of the error
          // and wait for the client to shut down the pipeline
          mirrorError = true;
          LOG.info(myString, ioe);
        }
      }
      if (Thread.interrupted() || isInterrupted) {
        /*
         * The receiver thread cancelled this thread. We could also check
         * any other status updates from the receiver thread (e.g. if it is
         * ok to write to replyOut). It is prudent to not send any more
         * status back to the client because this datanode has a problem.
         * The upstream datanode will detect that this datanode is bad, and
         * rightly so.
         *
         * The receiver thread can also interrupt this thread for sending
         * an out-of-band response upstream.
         */
        LOG.info(myString + ": Thread is interrupted.");
        running = false;
        continue;
      }
      if (lastPacketInBlock) {
        // Finalize the block and close the block file
        finalizeBlock(startTime);
      }

      // Send an ack upstream; it combines the ack received from downstream
      // with this datanode's own status.
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          (pkt != null ? pkt.ackStatus : Status.SUCCESS));

      if (pkt != null) {
        // remove the packet from the ack queue
        removeAckHead();
      }
    } catch (IOException e) {
      LOG.warn("IOException in BlockReceiver.run(): ", e);
      if (running) {
        datanode.checkDiskErrorAsync();
        LOG.info(myString, e);
        running = false;
        if (!Thread.interrupted()) { // failure not caused by interruption
          receiverThread.interrupt();
        }
      }
    } catch (Throwable e) {
      if (running) {
        LOG.info(myString, e);
        running = false;
        receiverThread.interrupt();
      }
    }
  }
  LOG.info(myString + " terminating");
}
public void run() {
  setName("ResponseProcessor for block " + block);
  PipelineAck ack = new PipelineAck();

  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      ack.readFields(blockReplyStream);
      ...
      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = ack.getReply(i);
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.now();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply + " for block " + block
              + " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO :
          "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }
      // a success ack for a data packet
      Packet one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      // If the received seqno does not match the packet at the head of the
      // ackQueue, throw.
      if (one.seqno != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno "
            + one.seqno + " for block " + block
            + " but received " + seqno);
      }
      isLastPacketInBlock = one.lastPacketInBlock;
      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
        failPacket = true;
        throw new IOException("Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());
private boolean processDatanodeError() throws IOException {
  if (response != null) {
    DFSClient.LOG.info("Error Recovery for " + block
        + " waiting for responder to exit. ");
    return true;
  }
  // Close the streams to the current pipeline
  closeStream();

  // Move packets from the ack queue back to the front of the data queue:
  // packets still waiting for acks will be resent through the new pipeline.
  synchronized (dataQueue) {
    dataQueue.addAll(0, ackQueue);
    ackQueue.clear();
  }
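The requeue above preserves ordering: the un-acked packets go back ahead of the not-yet-sent ones, so after the pipeline is rebuilt they are retransmitted first and in their original sequence. A self-contained illustration (not HDFS code; integers stand in for packets):

  import java.util.LinkedList;

  // Self-contained illustration, not HDFS code: requeue un-acked packets
  // ahead of unsent ones, exactly like dataQueue.addAll(0, ackQueue) above.
  public class RequeueDemo {
    public static void main(String[] args) {
      LinkedList<Integer> dataQueue = new LinkedList<>();
      LinkedList<Integer> ackQueue = new LinkedList<>();
      dataQueue.add(5); dataQueue.add(6);   // not yet sent
      ackQueue.add(3);  ackQueue.add(4);    // sent, still waiting for acks
      dataQueue.addAll(0, ackQueue);        // un-acked packets go to the front
      ackQueue.clear();
      System.out.println(dataQueue);        // prints [3, 4, 5, 6]
    }
  }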
  // Record the new pipeline failure recovery.
  if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
    lastAckedSeqnoBeforeFailure = lastAckedSeqno;
    pipelineRecoveryCount = 1;
  } else {
    // If we had to recover the pipeline five times in a row for the
    // same packet, this client likely has corrupt data or the data is
    // being corrupted during transmission.
    if (++pipelineRecoveryCount > 5) {
      DFSClient.LOG.warn("Error recovering pipeline for writing " + block
          + ". Already retried 5 times for the same packet.");
      lastException.set(new IOException("Failing write. Tried pipeline "
          + "recovery 5 times without success."));
      streamerClosed = true;
      return false;
    }
  }

  boolean doSleep = setupPipelineForAppendOrRecovery();

  if (!streamerClosed && dfsClient.clientRunning) {
    if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
      // If we had an error while closing the pipeline, we go through a fast-path
      // where the BlockReceiver does not run. Instead, the DataNode just finalizes
      // the block immediately during the 'connect ack' process. So, we want to pull
      // the end-of-block packet from the dataQueue, since we don't actually have
      // a true pipeline to send it over.
      //
      // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
      // a client waiting on close() will be aware that the flush finished.
      synchronized (dataQueue) {
        Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
        assert endOfBlockPacket.lastPacketInBlock;
        assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
        lastAckedSeqno = endOfBlockPacket.seqno;
        dataQueue.notifyAll();
      }
      endBlock();
    } else {
      initDataStreaming();
    }
  }
  return doSleep;
}
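The reason lastAckedSeqno is bumped and dataQueue.notifyAll() is called is that close() blocks until the end-of-block packet's seqno has been acked; on this fast path no real ack will ever arrive, so the streamer settles it locally. A minimal, self-contained sketch of that wait/notify contract (illustrative only; the class and method names below are made up, not the HDFS ones):

  // Illustrative sketch, not HDFS code: a waiter blocks until a seqno is
  // acknowledged; the fast path releases it by bumping the counter directly.
  public class AckWaitDemo {
    private final Object lock = new Object();
    private long lastAckedSeqno = 4;

    void waitForAckedSeqno(long seqno) throws InterruptedException {
      synchronized (lock) {
        while (lastAckedSeqno < seqno) {
          lock.wait();
        }
      }
    }

    void fastPathAck(long endOfBlockSeqno) {
      synchronized (lock) {
        lastAckedSeqno = endOfBlockSeqno; // pretend the last packet was acked
        lock.notifyAll();
      }
    }

    public static void main(String[] args) throws Exception {
      AckWaitDemo demo = new AckWaitDemo();
      new Thread(() -> demo.fastPathAck(5)).start();
      demo.waitForAckedSeqno(5);
      System.out.println("close() would unblock here");
    }
  }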
private boolean setupPipelineForAppendOrRecovery() throws IOException {
  // check number of datanodes
  if (nodes == null || nodes.length == 0) {
    String msg = "Could not get block locations. " + "Source file \""
        + src + "\" - Aborting...";
    DFSClient.LOG.warn(msg);
    setLastException(new IOException(msg));
    streamerClosed = true;
    return false;
  }

  boolean success = false;
  long newGS = 0L;
  while (!success && !streamerClosed && dfsClient.clientRunning) {
    // Sleep before reconnect if a dn is restarting.
    // This process will be repeated until the deadline or the datanode
    // starts back up.
    if (restartingNodeIndex >= 0) {
      // 4 seconds or the configured deadline period, whichever is shorter.
      // This is the retry interval and recovery will be retried in this
      // interval until timeout or success.
      long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, 4000L);
      try {
        Thread.sleep(delay);
      } catch (InterruptedException ie) {
        lastException.set(new IOException("Interrupted while waiting for "
            + "datanode to restart. " + nodes[restartingNodeIndex]));
        streamerClosed = true;
        return false;
      }
    }
    // At this point hasError is true
    boolean isRecovery = hasError;
    // remove bad datanode from list of datanodes.
    // If errorIndex was not set (i.e. appends), then do not remove
    // any datanodes
    if (errorIndex >= 0) {
      StringBuilder pipelineMsg = new StringBuilder();
      for (int j = 0; j < nodes.length; j++) {
        pipelineMsg.append(nodes[j]);
        if (j < nodes.length - 1) {
          pipelineMsg.append(", ");
        }
      }
      if (nodes.length <= 1) {
        lastException.set(new IOException("All datanodes " + pipelineMsg
            + " are bad. Aborting..."));
        streamerClosed = true;
        return false;
      }
      DFSClient.LOG.warn("Error Recovery for block " + block
          + " in pipeline " + pipelineMsg
          + ": bad datanode " + nodes[errorIndex]);
      failed.add(nodes[errorIndex]);
      DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length - 1];
      arraycopy(nodes, newnodes, errorIndex);
      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
      arraycopy(storageTypes, newStorageTypes, errorIndex);
      final String[] newStorageIDs = new String[newnodes.length];
      arraycopy(storageIDs, newStorageIDs, errorIndex);

      // Form a new pipeline from the remaining datanodes
      setPipeline(newnodes, newStorageTypes, newStorageIDs);
      // Just took care of a node error while waiting for a node restart
      if (restartingNodeIndex >= 0) {
        // If the error came from a node further away than the restarting
        // node, the restart must have been complete.
        if (errorIndex > restartingNodeIndex) {
          restartingNodeIndex = -1;
        } else if (errorIndex < restartingNodeIndex) {
          // the node index has shifted.
          restartingNodeIndex--;
        } else {
          // this shouldn't happen...
          assert false;
        }
      }
      if (restartingNodeIndex == -1) {
        hasError = false;
      }
      lastException.set(null);
      errorIndex = -1;
    }

    // Check whether the failed datanode should be replaced so that a new
    // pipeline can be formed; the policy is controlled by the
    // dfs.client.block.write.replace-datanode-on-failure.policy setting.
    if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
        nodes, isAppend, isHflushed)) {
      try {
        // Add a new datanode to the pipeline
        addDatanode2ExistingPipeline();
      } catch (IOException ioe) {
        if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
          throw ioe;
        }
        DFSClient.LOG.warn("Failed to replace datanode."
            + " Continue with the remaining datanodes since "
            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
            + " is set to true.", ioe);
      }
    }

    // get a new generation stamp and an access token
    LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block,
        dfsClient.clientName);
    newGS = lb.getBlock().getGenerationStamp();
    accessToken = lb.getBlockToken();

    // set up the pipeline again with the remaining nodes
    if (failPacket) { // for testing
      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
      failPacket = false;
      try {
        // Give DNs time to send in bad reports. In real situations,
        // good reports should follow bad ones, if client committed
        // with those nodes.
        Thread.sleep(2000);
      } catch (InterruptedException ie) {}
    } else {
      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
    }
    if (restartingNodeIndex >= 0) {
      assert hasError == true;
      // check errorIndex set above
      if (errorIndex == restartingNodeIndex) {
        // ignore, if came from the restarting node
        errorIndex = -1;
      }
      // still within the deadline
      if (Time.now() < restartDeadline) {
        continue; // within the deadline
      }
      // expired. declare the restarting node dead
      restartDeadline = 0;
      int expiredNodeIndex = restartingNodeIndex;
      restartingNodeIndex = -1;
      DFSClient.LOG.warn("Datanode did not restart in time: "
          + nodes[expiredNodeIndex]);
      // Mark the restarting node as failed. If there is any other failed
      // node during the last pipeline construction attempt, it will not be
      // overwritten/dropped. In this case, the restarting node will get
      // excluded in the following attempt, if it still does not come up.
      if (errorIndex == -1) {
        errorIndex = expiredNodeIndex;
      }
      // From this point on, normal pipeline recovery applies.
    }
  } // while
  if (success) {
    // update pipeline at the namenode
    ExtendedBlock newBlock = new ExtendedBlock(
        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
        nodes, storageIDs);
    // update client side generation stamp
    block = newBlock;
  }
  return false; // do not sleep, continue processing
}
/**
 * DEFAULT condition:
 * Let r be the replication number.
 * Let n be the number of existing datanodes.
 * Add a new datanode only if r >= 3 and either
 * (1) floor(r/2) >= n; or
 * (2) r > n and the block is hflushed/appended.
 */
public boolean satisfy(final short replication,
    final DatanodeInfo[] existings, final int n, final boolean isAppend,
    final boolean isHflushed) {
  if (replication < 3) {
    return false;
  } else {
    if (n <= (replication / 2)) {
      return true;
    } else {
      return isAppend || isHflushed;
    }
  }
}
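A quick way to read the DEFAULT condition is with a few concrete pipelines. The snippet below (illustrative only, not HDFS code; the DatanodeInfo[] argument is dropped) restates the same decision logic and evaluates it for a replication-3 file:

  // Illustrative restatement of the DEFAULT condition above, not HDFS code.
  public class ReplacePolicyDemo {
    static boolean satisfy(short replication, int n, boolean isAppend, boolean isHflushed) {
      if (replication < 3) {
        return false;
      }
      return n <= replication / 2 || isAppend || isHflushed;
    }

    public static void main(String[] args) {
      // r=3, only 1 datanode left: floor(3/2)=1 >= 1, so add a replacement.
      System.out.println(satisfy((short) 3, 1, false, false)); // true
      // r=3, 2 datanodes left, plain create with no hflush: keep going with 2.
      System.out.println(satisfy((short) 3, 2, false, false)); // false
      // r=3, 2 datanodes left, but the file was hflushed: add a replacement.
      System.out.println(satisfy((short) 3, 2, false, true));  // true
    }
  }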
if (!streamerClosed && dfsClient.clientRunning) {
  if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
    // If we had an error while closing the pipeline, we go through a fast-path
    // where the BlockReceiver does not run. Instead, the DataNode just finalizes
    // the block immediately during the 'connect ack' process. So, we want to pull
    // the end-of-block packet from the dataQueue, since we don't actually have
    // a true pipeline to send it over.
    //
    // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
    // a client waiting on close() will be aware that the flush finished.
    //
    // In other words: if the error happened during the pipeline-close stage,
    // there is a fast path that never creates a BlockReceiver on the datanode;
    // the datanode finalizes the block right away when the pipeline is rebuilt.
    // So the last packet is simply removed from the dataQueue and never sent
    // through a pipeline.
    synchronized (dataQueue) {
      Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
      assert endOfBlockPacket.lastPacketInBlock;
      assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
      lastAckedSeqno = endOfBlockPacket.seqno;
      dataQueue.notifyAll();
    }
    endBlock();
  } else {
    initDataStreaming();
  }
}