static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  HdfsFileStatus stat = null;

  // Retry the create if we get a RetryStartFileException up to a maximum
  // number of times
  boolean shouldRetry = true;
  // retryCount is 10
  int retryCount = CREATE_RETRY_COUNT;
  while (shouldRetry) {
    shouldRetry = false;
    try {
      // RPC call to the NameNode
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize, SUPPORTED_CRYPTO_VERSIONS);
      break;
    } catch (RemoteException re) {
      ...
      if (e instanceof RetryStartFileException) {
        if (retryCount > 0) {
          shouldRetry = true;
          retryCount--;
        } else {
          throw new IOException("Too many retries because of encryption" +
              " zone operations", e);
        }
      } else {
        throw e;
      }
    }
  }
  Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  // start the DataStreamer thread
  out.start();
  return out;
}
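For orientation, here is a minimal, hedged usage sketch of the client side that ultimately drives this code path: FileSystem.create() on a DistributedFileSystem ends up in DFSOutputStream.newStreamForCreate(). The path and payload below are made up for illustration, and the cluster address is assumed to come from core-site.xml.

```java
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteDemo {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at a reachable HDFS cluster.
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         // create() issues the NameNode create RPC and starts the DataStreamer
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true)) {
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
      out.hflush(); // push buffered packets into the pipeline
    }
  }
}
```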
/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out)
    throws IOException {
  getLeaseRenewer().put(inodeId, out, this);
}

// LeaseRenewer is the client-side helper that checks whether the lease needs renewing.
// A thread per namenode per user
synchronized void put(final long inodeId, final DFSOutputStream out,
    final DFSClient dfsc) {
  if (dfsc.isClientRunning()) {
    if (!isRunning() || isRenewerExpired()) {
      // start a new daemon with a new id.
      final int id = ++currentId;
      daemon = new Daemon(new Runnable() {
        @Override
        public void run() {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " started");
            }
            // calls LeaseRenewer.run(final int id), which in turn
            // calls renew() to renew the lease
            LeaseRenewer.this.run(id);
          } catch(InterruptedException e) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                  + " is interrupted.", e);
            }
          } finally {
            synchronized (LeaseRenewer.this) {
              Factory.INSTANCE.remove(LeaseRenewer.this);
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " exited");
            }
          }
        }

        @Override
        public String toString() {
          return String.valueOf(LeaseRenewer.this);
        }
      });
      daemon.start();
    }
    dfsc.putFileBeingWritten(inodeId, out);
    emptyTime = Long.MAX_VALUE;
  }
}
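To make the renewal cadence concrete, here is a minimal, self-contained sketch of the idea behind LeaseRenewer (not the HDFS implementation): one daemon per client wakes up periodically and renews the lease well before the soft limit (60 s by default) expires. The interval choice and the renewLease() stand-in are assumptions for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LeaseRenewalSketch {
  // Hypothetical stand-in for DFSClient.renewLease(), which issues the
  // ClientProtocol.renewLease RPC to the NameNode.
  static void renewLease() {
    System.out.println("renewing lease at " + System.currentTimeMillis());
  }

  public static void main(String[] args) throws InterruptedException {
    long softLimitMs = 60_000;              // HDFS soft lease limit (default 60s)
    long renewIntervalMs = softLimitMs / 2; // renew well before the soft limit
    ScheduledExecutorService renewer =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "lease-renewer-sketch");
          t.setDaemon(true);                // daemon thread, like LeaseRenewer's Daemon
          return t;
        });
    renewer.scheduleAtFixedRate(LeaseRenewalSketch::renewLease,
        0, renewIntervalMs, TimeUnit.MILLISECONDS);
    // ... write data here; the daemon keeps the lease alive while files are open ...
    Thread.sleep(2_000);
    renewer.shutdownNow();
  }
}
```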
private int write1(byte b[], int off, int len) throws IOException {
  // If the write is at least as large as the local buf, bypass the buf
  // and checksum/send a full buf-length's worth directly.
  if (count == 0 && len >= buf.length) {
    // local buffer is empty and user buffer size >= local buffer size, so
    // simply checksum the user buffer and send it directly to the underlying
    // stream
    final int length = buf.length;
    writeChecksumChunks(b, off, length);
    return length;
  }

  // Otherwise copy into the local buf first; once the buf is full, flushBuffer.
  // copy user data to local buffer
  int bytesToCopy = buf.length - count;
  bytesToCopy = (len < bytesToCopy) ? len : bytesToCopy;
  System.arraycopy(b, off, buf, count, bytesToCopy);
  count += bytesToCopy;
  if (count == buf.length) {
    // local buffer is full
    flushBuffer();
  }
  return bytesToCopy;
}
_When writing data, the bytes are first copied into a local buf._ The buf's default size is this.buf = new byte[sum.getBytesPerChecksum() * BUFFER\_NUM\_CHUNKS], where BUFFER\_NUM\_CHUNKS = 9, i.e. 9 chunks of 512 bytes = 4608 bytes. Once the buf is full, checksums are generated for its data and the chunks are written into a packet.
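A quick arithmetic sketch of those defaults (values assumed from hdfs-default.xml and FSOutputSummer; adjust if your cluster overrides dfs.bytes-per-checksum):

```java
public class BufferSizeSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // dfs.bytes-per-checksum default
    int bufferNumChunks  = 9;    // FSOutputSummer.BUFFER_NUM_CHUNKS
    int localBufBytes = bytesPerChecksum * bufferNumChunks;
    System.out.println("local buf = " + localBufBytes + " bytes"); // 4608
  }
}
```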
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
flushBuffer in turn still calls writeChecksumChunks.
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  // compute the checksums
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    // write into the packet one chunk at a time
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}
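To see how this loop slices a full buffer into chunks and locates each chunk's checksum, here is a standalone sketch using the usual defaults (512-byte chunks, 4-byte CRC checksums assumed):

```java
public class ChunkOffsetSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // chunk size
    int checksumSize = 4;        // CRC32/CRC32C is 4 bytes per chunk
    int len = 4608;              // a full local buf (9 chunks)
    for (int i = 0; i < len; i += bytesPerChecksum) {
      int chunkLen = Math.min(bytesPerChecksum, len - i);
      int ckOffset = i / bytesPerChecksum * checksumSize;
      System.out.println("chunk at data offset " + i
          + ", length " + chunkLen + ", checksum offset " + ckOffset);
    }
  }
}
```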
// DFSOutputStream.class
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  ...
  // if currentPacket is null, create a new one
  if (currentPacket == null) {
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++);
    ...
  }
  // write the checksum first, then the data
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.numChunks++;
  bytesCurBlock += len;
  // If packet is full, enqueue it for transmission
  // i.e. currentPacket is full, or the bytes written into the current block
  // have reached the block size
  if (currentPacket.numChunks == currentPacket.maxChunks ||
      bytesCurBlock == blockSize) {
    ...
    // put the packet onto the dataQueue
    waitAndQueueCurrentPacket();
    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    // The last packet of a block may be smaller than a full packet, so
    // recompute the packet size here.
    if (!appendChunk) {
      int psize = Math.min((int)(blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    // i.e. once the block size is reached, send an empty packet
    if (bytesCurBlock == blockSize) {
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
      currentPacket.lastPacketInBlock = true;
      currentPacket.syncBlock = shouldSyncBlock;
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
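computePacketChunkSize is not shown above; roughly, it packs as many (data chunk + checksum) pairs as fit into the configured packet size. A hedged arithmetic sketch with the usual defaults (64 KB write packet, 512-byte chunks, 4-byte checksums):

```java
public class PacketSizeSketch {
  public static void main(String[] args) {
    int writePacketSize = 64 * 1024; // dfs.client-write-packet-size default
    int bytesPerChecksum = 512;
    int checksumSize = 4;

    int chunkSize = bytesPerChecksum + checksumSize;              // 516
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1); // 127
    int packetBodySize = chunkSize * chunksPerPacket;             // 65532 bytes

    System.out.println("chunksPerPacket = " + chunksPerPacket);
    System.out.println("packet body (data + checksums) = " + packetBodySize + " bytes");
  }
}
```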
private void waitAndQueueCurrentPacket() throws IOException {
  synchronized (dataQueue) {
    try {
      // If queue is full, then wait till we have enough space
      // dfs.client.write.max-packets-in-flight defaults to 80:
      // when dataQueue.size() + ackQueue.size() exceeds 80, wait
      while (!closed && dataQueue.size() + ackQueue.size() >
          dfsClient.getConf().writeMaxPackets) {
        try {
          dataQueue.wait();
        } catch (InterruptedException e) {
          // If we get interrupted while waiting to queue data, we still need to get rid
          // of the current packet. This is because we have an invariant that if
          // currentPacket gets full, it will get queued before the next writeChunk.
          //
          // Rather than wait around for space in the queue, we should instead try to
          // return to the caller as soon as possible, even though we slightly overrun
          // the MAX_PACKETS length.
          Thread.currentThread().interrupt();
          break;
        }
      }
      checkClosed();
      // put currentPacket onto the dataQueue
      queueCurrentPacket();
    } catch (ClosedChannelException e) {
    }
  }
}
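The dataQueue/ackQueue pair is essentially a bounded producer–consumer channel: writeChunk produces packets, DataStreamer consumes them, and the total number in flight is capped. A minimal self-contained sketch of that backpressure pattern (not HDFS code; the cap of 80 mirrors dfs.client.write.max-packets-in-flight):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BackpressureSketch {
  private final Deque<byte[]> dataQueue = new ArrayDeque<>();
  private final Deque<byte[]> ackQueue = new ArrayDeque<>();
  private final int maxPacketsInFlight = 80;

  // Producer side: block while too many packets are queued or awaiting acks.
  synchronized void enqueue(byte[] packet) throws InterruptedException {
    while (dataQueue.size() + ackQueue.size() >= maxPacketsInFlight) {
      wait();
    }
    dataQueue.addLast(packet);
    notifyAll();
  }

  // Consumer side: take a packet to send and park it on the ack queue.
  synchronized byte[] dequeueForSend() throws InterruptedException {
    while (dataQueue.isEmpty()) {
      wait();
    }
    byte[] packet = dataQueue.removeFirst();
    ackQueue.addLast(packet);
    return packet;
  }

  // Ack side: drop the acked packet and wake any blocked producer.
  synchronized void ack() {
    ackQueue.removeFirst();
    notifyAll();
  }
}
```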
// DataStreamer.run
public void run() {
  long lastPacket = Time.now();
  TraceScope traceScope = null;
  if (traceSpan != null) {
    traceScope = Trace.continueSpan(traceSpan);
  }
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    Packet one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
        doSleep = processDatanodeError();
      }
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.now();
        // if the dataQueue is empty and we have not been idle too long, wait
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
             stage == BlockConstructionStage.DATA_STREAMING &&
             now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
          long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
              timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.now();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        // if the dataQueue is empty, send a heartbeat packet instead
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          one = dataQueue.getFirst(); // regular data packet
        }
      }
      assert one != null;

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        ...
        // set up the pipeline
        setPipeline(nextBlockOutputStream());
        // start the ResponseProcessor thread and move the DataStreamer
        // into the DATA_STREAMING stage
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        ...
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }
      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      ...
      // if this packet is the last one of the block, wait until all
      // previously sent packets have been acked
      if (one.lastPacketInBlock) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      // move the packet from dataQueue to ackQueue before sending it
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }
      ...
      // write out data to remote datanode
      try {
        // write the packet into the pipeline
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      }
      lastPacket = Time.now();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }
if (streamerClosed || hasError || !dfsClient.clientRunning) { continue; }
      // Is this block full?
      // After the current packet has been sent (i.e. moved to the ackQueue),
      // if it was the last packet of the block, keep waiting for its ack
      // and then endBlock.
      if (one.lastPacketInBlock) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }
      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException)e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    }
  }
  if (traceScope != null) {
    traceScope.close();
  }
  closeInternal();
}
  //
  // Connect to first DataNode in the list.
  // Establish a connection with the first datanode in nodes; the write
  // request is sent downstream by a Sender.
  //
  success = createBlockOutputStream(nodes, storageTypes, 0L, false);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
  ...
  Status pipelineStatus = SUCCESS;
  String firstBadLink = "";
  boolean checkRestart = false;
  ...
  // persist blocks on namenode on next flush
  persistBlocks.set(true);
  int refetchEncryptionKey = 1;
  while (true) {
    boolean result = false;
    DataOutputStream out = null;
    try {
      assert null == s : "Previous socket unclosed";
      assert null == blockReplyStream : "Previous blockReplyStream unclosed";
      // establish the socket connection
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
      long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);

      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      InputStream unbufIn = NetUtils.getInputStream(s);
      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
          unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      blockReplyStream = new DataInputStream(unbufIn);
      ...
      // send the request
      // send the write request to the datanode; it is received by a DataXceiver
      new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
          dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
          nodes.length, block.getNumBytes(), bytesSent, newGS,
          checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
      // receive ack for connect
      BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
          PBHelper.vintPrefixed(blockReplyStream));
      pipelineStatus = resp.getStatus();
      firstBadLink = resp.getFirstBadLink();

      // Got a restart OOB ack.
      // If a node is already restarting, this status is not likely from
      // the same node. If it is from a different node, it is not
      // from the local datanode. Thus it is safe to treat this as a
      // regular node error.
      if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
          restartingNodeIndex == -1) {
        checkRestart = true;
        throw new IOException("A datanode is restarting.");
      }
      if (pipelineStatus != SUCCESS) {
        if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
          throw new InvalidBlockTokenException(
              "Got access token error for connect ack with firstBadLink as "
              + firstBadLink);
        } else {
          throw new IOException("Bad connect ack with firstBadLink as "
              + firstBadLink);
        }
      }
      assert null == blockStream : "Previous blockStream unclosed";
      blockStream = out;
      result = true; // success
      restartingNodeIndex = -1;
      hasError = false;
    } catch (IOException ie) {
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
      }
      if (ie instanceof InvalidEncryptionKeyException &&
          refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to "
            + nodes[0] + " : " + ie);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
        // Don't close the socket/exclude this node just yet. Try again with
        // a new encryption key.
        continue;
      }
      // find the datanode that matches
      if (firstBadLink.length() != 0) {
        for (int i = 0; i < nodes.length; i++) {
          // NB: Unconditionally using the xfer addr w/o hostname
          if (firstBadLink.equals(nodes[i].getXferAddr())) {
            errorIndex = i;
            break;
          }
        }
      } else {
        assert checkRestart == false;
        errorIndex = 0;
      }
      // Check whether there is a restart worth waiting for.
      if (checkRestart && shouldWaitForRestart(errorIndex)) {
        restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
            Time.now();
        restartingNodeIndex = errorIndex;
        errorIndex = -1;
        DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
            nodes[restartingNodeIndex]);
      }
      hasError = true;
      setLastException(ie);
      result = false; // error
    } finally {
      if (!result) {
        IOUtils.closeSocket(s);
        s = null;
        IOUtils.closeStream(out);
        out = null;
        IOUtils.closeStream(blockReplyStream);
        blockReplyStream = null;
      }
    }
    return result;
  }
}
PacketHeader header = new PacketHeader(
    pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);

// If checksumPos is not equal to dataStart, slide the checksums forward so
// they sit immediately before the data, leaving just enough room for the header.
if (checksumPos != dataStart) {
  // Move the checksum to cover the gap. This can happen for the last
  // packet or during an hflush/hsync call.
  System.arraycopy(buf, checksumStart, buf,
      dataStart - checksumLen, checksumLen);
  checksumPos = dataStart;
  checksumStart = checksumPos - checksumLen;
}

final int headerStart = checksumStart - header.getSerializedSize();
assert checksumStart + 1 >= header.getSerializedSize();
assert checksumPos == dataStart;
assert headerStart >= 0;
assert headerStart + header.getSerializedSize() == checksumStart;

// Copy the header data into the buffer immediately preceding the checksum
// data.
// Copying the header into the packet's buf completes the packet.
System.arraycopy(header.getBytes(), 0, buf, headerStart,
    header.getSerializedSize());

// corrupt the data for testing.
if (DFSClientFaultInjector.get().corruptPacket()) {
  buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}

// Write the now contiguous full packet to the output stream.
// i.e. write buf to the output stream
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
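Putting the numbers together, the on-wire packet is [header | checksums | data], written with a single stm.write. A hedged sketch of the sizes for a full default packet (127 chunks; the header length below is an assumption, since PacketHeader's serialized size depends on its protobuf encoding):

```java
public class PacketLayoutSketch {
  public static void main(String[] args) {
    int chunksPerPacket = 127;
    int bytesPerChecksum = 512;
    int checksumSize = 4;

    int dataLen = chunksPerPacket * bytesPerChecksum; // 65024 bytes
    int checksumLen = chunksPerPacket * checksumSize; // 508 bytes
    int headerLen = 33; // assumed; the actual PacketHeader size varies slightly

    System.out.println("wire bytes per full packet = "
        + (headerLen + checksumLen + dataLen));       // ~65565
  }
}
```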
public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    ...
    // We process requests in a loop, and stay around for a short timeout.
    // This optimistic behaviour allows the other end to reuse connections.
    // Setting keepalive timeout to 0 disable this behavior.
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        ...
        // read the op code
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (IOException err) {
        ...
      }
      ...
      // process the op code
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ...
  } finally {
    ...
  }
}
// Receiver.class
protected final void processOp(Op op) throws IOException {
  switch(op) {
  case READ_BLOCK:
    opReadBlock();
    break;
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  case REPLACE_BLOCK:
    opReplaceBlock(in);
    break;
  case COPY_BLOCK:
    opCopyBlock(in);
    break;
  case BLOCK_CHECKSUM:
    opBlockChecksum(in);
    break;
  case TRANSFER_BLOCK:
    opTransferBlock(in);
    break;
  case REQUEST_SHORT_CIRCUIT_FDS:
    opRequestShortCircuitFds(in);
    break;
  case RELEASE_SHORT_CIRCUIT_FDS:
    opReleaseShortCircuitFds(in);
    break;
  case REQUEST_SHORT_CIRCUIT_SHM:
    opRequestShortCircuitShm(in);
    break;
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}
// DataXceiver.class
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  previousOpClientName = clientname;
  updateCurrentThreadName("Receiving block " + block);
  // if clientname is non-empty, isDatanode is false and isClient is true
  final boolean isDatanode = clientname.length() == 0;
  final boolean isClient = !isDatanode;
  final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
      || stage == BlockConstructionStage.TRANSFER_FINALIZED;
  ...
  // We later mutate block's generation stamp and length, but we need to
  // forward the original version of the block to downstream mirrors, so
  // make a copy here.
  final ExtendedBlock originalBlock = new ExtendedBlock(block);
  if (block.getNumBytes() == 0) {
    block.setNumBytes(dataXceiverServer.estimateBlockSize);
  }
  ...
  // reply to upstream datanode or client
  // open an output stream back to the upstream node or the client, used for acks
  final DataOutputStream replyOut = new DataOutputStream(
      new BufferedOutputStream(
          getOutputStream(),
          HdfsConstants.SMALL_BUFFER_SIZE));
  ...
  DataOutputStream mirrorOut = null;  // stream to next target
  DataInputStream mirrorIn = null;    // reply from next target
  Socket mirrorSock = null;           // socket to next target
  String mirrorNode = null;           // the name:port of next target
  String firstBadLink = "";           // first datanode that failed in connection setup
  Status mirrorInStatus = SUCCESS;
  final String storageUuid;
  try {
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // open a block receiver
      // instantiate the blockReceiver that will receive the packets
      blockReceiver = new BlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist);
      // Do not propagate allowLazyPersist to downstream DataNodes.
      // send the write request downstream
      new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
          blockToken, clientname, targets, targetStorageTypes, srcDataNode,
          stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
          latestGenerationStamp, requestedChecksum, cachingStrategy, false);
mirrorOut.flush();
      // read connect ack (only for clients, not for replication req)
      // i.e. read the connect-ack from the downstream datanode
      if (isClient) {
        BlockOpResponseProto connectAck =
            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
        mirrorInStatus = connectAck.getStatus();
        firstBadLink = connectAck.getFirstBadLink();
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
              " got response for connect ack " +
              " from downstream datanode with firstbadlink as " +
              firstBadLink);
        }
      }
    // this catch handles exceptions raised during pipeline setup,
    // i.e. while this datanode was connecting to its downstream node
    } catch (IOException e) {
      if (isClient) {
        BlockOpResponseProto.newBuilder()
          .setStatus(ERROR)
           // NB: Unconditionally using the xfer addr w/o hostname
          .setFirstBadLink(targets[0].getXferAddr())
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }
      IOUtils.closeStream(mirrorOut);
      mirrorOut = null;
      IOUtils.closeStream(mirrorIn);
      mirrorIn = null;
      IOUtils.closeSocket(mirrorSock);
      mirrorSock = null;
      if (isClient) {
        LOG.error(datanode + ":Exception transfering block " +
            block + " to mirror " + mirrorNode + ": " + e);
        throw e;
      } else {
        LOG.info(datanode + ":Exception transfering " +
            block + " to mirror " + mirrorNode +
            "- continuing without the mirror", e);
      }
    }
  } // end of the if block that checked for a downstream datanode and set up the connection
  // send connect-ack to source for clients and not transfer-RBW/Finalized
  if (isClient && !isTransfer) {
    ...
    // send the connect-ack upstream
    BlockOpResponseProto.newBuilder()
      .setStatus(mirrorInStatus)
      .setFirstBadLink(firstBadLink)
      .build()
      .writeDelimitedTo(replyOut);
    replyOut.flush();
  }

  // after sending the connect-ack upstream, get ready to receive the block's packets
  // receive the block and mirror to the next target
  if (blockReceiver != null) {
    String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
    // receive the block
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
        mirrorAddr, null, targets, false);
    ...
  }
  // update its generation stamp
  if (isClient &&
      stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    block.setGenerationStamp(latestGenerationStamp);
    block.setNumBytes(minBytesRcvd);
  }

  // if this write is for a replication request or recovering
  // a failed close for client, then confirm block. For other client-writes,
  // the block is finalized in the PacketResponder.
  if (isDatanode ||
      stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
    LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
        + localAddress + " of size " + block.getNumBytes());
  }
} catch (IOException ioe) {
  LOG.info("opWriteBlock " + block + " received exception " + ioe);
  throw ioe;
} finally {
  // close all opened streams
  ...
  blockReceiver = null;
}
...
}
try {
  if (isClient && !isTransfer) {
    responder = new Daemon(datanode.threadGroup,
        new PacketResponder(replyOut, mirrIn, downstreams));
    responder.start(); // start thread to processes responses
  }

  // this is an (almost) empty loop: keep calling receivePacket()
  // until every packet of the block has been received
  while (receivePacket() >= 0) { /* Receive until the last packet */ }
  // wait for all outstanding packet responses. And then
  // indicate responder to gracefully shutdown.
  // Mark that responder has been closed for future processing
  if (responder != null) {
    ((PacketResponder)responder.getRunnable()).close();
    responderClosed = true;
  }
  // If this write is for a replication or transfer-RBW/Finalized,
  // then finalize block or convert temporary to RBW.
  // For client-writes, the block is finalized in the PacketResponder.
  if (isDatanode || isTransfer) {
    // close the block/crc files
    close();
    block.setNumBytes(replicaInfo.getNumBytes());
    if (stage == BlockConstructionStage.TRANSFER_RBW) {
      // for TRANSFER_RBW, convert temporary to RBW
      datanode.data.convertTemporaryToRbw(block);
    } else {
      // for isDatanode or TRANSFER_FINALIZED
      // Finalize the block.
      datanode.data.finalizeBlock(block);
    }
    datanode.metrics.incrBlocksWritten();
  }
private int receivePacket() throws IOException {
  // read the next packet from the input stream
  packetReceiver.receiveNextPacket(in);
  PacketHeader header = packetReceiver.getHeader();
  ...
  // Sanity check the header
  ...
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();
  // avoid double sync'ing on close
  if (syncBlock && lastPacketInBlock) {
    this.syncOnClose = false;
  }
  // update received bytes
  final long firstByteInBlock = offsetInBlock;
  offsetInBlock += len;
  if (replicaInfo.getNumBytes() < offsetInBlock) {
    replicaInfo.setNumBytes(offsetInBlock);
  }

  // put in queue for pending acks, unless sync was requested
  // i.e. the packet is enqueued on the ackQueue before it is forwarded downstream
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }
  //First write the packet to the mirror:
  if (mirrorOut != null && !mirrorError) {
    try {
      long begin = Time.monotonicNow();
      // forward the packet downstream
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long duration = Time.monotonicNow() - begin;
      if (duration > datanodeSlowLogThresholdMs) {
        LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
      }
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }

  // buffers holding the packet's data and checksums for the local write
  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

  if (lastPacketInBlock || len == 0) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Receiving an empty packet or the end of the block " + block);
    }
    // sync block if requested
    if (syncBlock) {
      flushOrSync(true);
    }
  } else {
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();
    if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
      throw new IOException("Invalid checksum length: received length is "
          + checksumReceivedLen + " but expected length is " + checksumLen);
    }
    if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
      try {
        // verify the checksums
        verifyChunks(dataBuf, checksumBuf);
      } catch (IOException ioe) {
        // checksum error detected locally. there is no reason to continue.
        if (responder != null) {
          try {
            ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                lastPacketInBlock, offsetInBlock,
                Status.ERROR_CHECKSUM);
            // Wait until the responder sends back the response
            // and interrupt this thread.
            Thread.sleep(3000);
          } catch (InterruptedException e) { }
        }
        throw new IOException("Terminating due to a checksum error." + ioe);
      }
      if (needsChecksumTranslation) {
        // overwrite the checksums in the packet buffer with the
        // appropriate polynomial for the disk storage.
        translateChunks(dataBuf, checksumBuf);
      }
    }

    // if no checksum arrived with the packet, recompute it here
    if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
      // checksum is missing, need to calculate it
      checksumBuf = ByteBuffer.allocate(checksumLen);
      diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }

    // by this point, the data in the buffer uses the disk checksum
    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();
    try {
      // get the current length of the block on disk
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        //finally write to the disk :
        // if the bytes already on disk are not a multiple of the chunk size,
        // the last checksum will have to be overwritten
        if (onDiskLen % bytesPerChecksum != 0) {
          // prepare to overwrite last checksum
          adjustCrcFilePosition();
        }

        // If this is a partial chunk, then read in pre-existing checksum
        Checksum partialCrc = null;
        if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("receivePacket for " + block
                + ": bytesPerChecksum=" + bytesPerChecksum
                + " does not divide firstByteInBlock=" + firstByteInBlock);
          }
          long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
              onDiskLen / bytesPerChecksum * checksumSize;
          partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
        }
        int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();

        // length of the data to write
        int numBytesToDisk = (int)(offsetInBlock-onDiskLen);

        // Write data to disk.
        long begin = Time.monotonicNow();
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
        long duration = Time.monotonicNow() - begin;
        ...

        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) {
          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (len > bytesPerChecksum) {
            throw new IOException("Unexpected packet data length for "
                + block + " from " + inAddr + ": a partial chunk must be "
                + " sent in an individual packet (data length = " + len
                + " > bytesPerChecksum = " + bytesPerChecksum + ")");
          }
          partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
          byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
          lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
          checksumOut.write(buf);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Writing out partial crc for data len " + len);
          }
          partialCrc = null;
        } else {
          // write checksum
          final int offset = checksumBuf.arrayOffset() +
              checksumBuf.position();
          final int end = offset + checksumLen;
          lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
              end);
          // write the checksums to disk
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }
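The partial-chunk branch exists because a CRC covers a whole chunk: if the replica on disk ends mid-chunk, the stored CRC for that last chunk cannot simply be appended to; it has to be recomputed over the old partial bytes plus the new bytes. A minimal standalone illustration using java.util.zip.CRC32 (HDFS actually uses its own CRC32/CRC32C implementations, and the sizes here are made up):

```java
import java.util.zip.CRC32;

public class PartialChunkCrcSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;
    byte[] chunk = new byte[bytesPerChecksum];

    // Suppose 200 bytes of this chunk were already on disk...
    int onDiskInChunk = 200;
    CRC32 partialCrc = new CRC32();
    partialCrc.update(chunk, 0, onDiskInChunk);
    long crcOfPartial = partialCrc.getValue();

    // ...and a new packet delivers the remaining 312 bytes of the chunk.
    // The stored CRC must be replaced: extend the CRC over the new bytes
    // (this is why adjustCrcFilePosition() rewinds over the last checksum).
    partialCrc.update(chunk, onDiskInChunk, bytesPerChecksum - onDiskInChunk);
    long crcOfFullChunk = partialCrc.getValue();

    System.out.println("partial-chunk crc = " + crcOfPartial
        + ", full-chunk crc = " + crcOfFullChunk);
  }
}
```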
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          // read an ack from downstream datanode
          ack.readFields(downstreamIn);
          ackRecvNanoTime = System.nanoTime();
          if (LOG.isDebugEnabled()) {
            LOG.debug(myString + " got " + ack);
          }
          // Process an OOB ACK.
          Status oobStatus = ack.getOOBStatus();
          if (oobStatus != null) {
            LOG.info("Relaying an out of band ack of type " + oobStatus);
            sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
                Status.SUCCESS);
            continue;
          }
          // seqno of the newly received ack
          seqno = ack.getSeqno();
        }
        // got an ack
        if (seqno != PipelineAck.UNKOWN_SEQNO
            || type == PacketResponderType.LAST_IN_PIPELINE) {
          // acks are expected in the same order the packets were sent
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          expected = pkt.seqno;
          ...
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        isInterrupted = true;
      } catch (IOException ioe) {
        if (Thread.interrupted()) {
          isInterrupted = true;
        } else {
          // continue to run even if can not read from mirror
          // notify client of the error
          // and wait for the client to shut down the pipeline
          mirrorError = true;
          LOG.info(myString, ioe);
        }
      }
      if (Thread.interrupted() || isInterrupted) {
        /*
         * The receiver thread cancelled this thread. We could also check
         * any other status updates from the receiver thread (e.g. if it is
         * ok to write to replyOut). It is prudent to not send any more
         * status back to the client because this datanode has a problem.
         * The upstream datanode will detect that this datanode is bad, and
         * rightly so.
         *
         * The receiver thread can also interrupt this thread for sending
         * an out-of-band response upstream.
         */
        LOG.info(myString + ": Thread is interrupted.");
        running = false;
        continue;
      }
      if (lastPacketInBlock) {
        // Finalize the block and close the block file
        finalizeBlock(startTime);
      }

      // send the ack upstream
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          (pkt != null ? pkt.ackStatus : Status.SUCCESS));
      if (pkt != null) {
        // remove the packet from the ack queue
        removeAckHead();
      }
    } catch (IOException e) {
      LOG.warn("IOException in BlockReceiver.run(): ", e);
      if (running) {
        datanode.checkDiskErrorAsync();
        LOG.info(myString, e);
        running = false;
        if (!Thread.interrupted()) { // failure not caused by interruption
          receiverThread.interrupt();
        }
      }
    } catch (Throwable e) {
      if (running) {
        LOG.info(myString, e);
        running = false;
        receiverThread.interrupt();
      }
    }
  }
  LOG.info(myString + " terminating");
}
public void run() {
  ...
  PipelineAck ack = new PipelineAck();

  while (!responderClosed && dfsClient.clientRunning
      && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      ack.readFields(blockReplyStream);
      ...
      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      // the replies are checked starting from the last datanode in the pipeline
      for (int i = ack.getNumOfReplies()-1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = ack.getReply(i);
        ...
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          // the throw breaks out of the for loop and is handled by the catch below
          throw new IOException("Bad response " + reply +
              " for block " + block +
              " from datanode " +
              targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO :
          "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
        continue;
      }
      // a success ack for a data packet
      Packet one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      // if the seqno read from the stream does not match the seqno at the
      // head of the ackQueue, throw an exception
      if (one.seqno != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno " +
            " for block " + block +
            one.seqno + " but received " + seqno);
      }
      isLastPacketInBlock = one.lastPacketInBlock;
      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() &&
          isLastPacketInBlock) {
        failPacket = true;
        throw new IOException(
            "Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      // once the ack has been received, remove the packet from the ackQueue
      synchronized (dataQueue) {
        lastAckedSeqno = seqno;
        ackQueue.removeFirst();
        dataQueue.notifyAll();
        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException)e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary
        // node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex == -1) {
          DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
              + " for block " + block, e);
        }
        responderClosed = true;
      }
    }
  }
}
private boolean processDatanodeError() throws IOException {
  if (response != null) {
    DFSClient.LOG.info("Error Recovery for " + block +
        " waiting for responder to exit. ");
    return true;
  }
  // close the pipeline streams
  closeStream();

  // move packets from ack queue to front of the data queue
  synchronized (dataQueue) {
    dataQueue.addAll(0, ackQueue);
    ackQueue.clear();
  }
  // Record the new pipeline failure recovery.
  // a new pipeline can then be set up and the write retried
  if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
    lastAckedSeqnoBeforeFailure = lastAckedSeqno;
    pipelineRecoveryCount = 1;
  } else {
    // If we had to recover the pipeline five times in a row for the
    // same packet, this client likely has corrupt data or corrupting
    // during transmission.
    if (++pipelineRecoveryCount > 5) {
      DFSClient.LOG.warn("Error recovering pipeline for writing " +
          block + ". Already retried 5 times for the same packet.");
      lastException.set(new IOException("Failing write. Tried pipeline " +
          "recovery 5 times without success."));
      streamerClosed = true;
      return false;
    }
  }
  // rebuild the pipeline
  boolean doSleep = setupPipelineForAppendOrRecovery();

  if (!streamerClosed && dfsClient.clientRunning) {
    if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
      // If we had an error while closing the pipeline, we go through a fast-path
      // where the BlockReceiver does not run. Instead, the DataNode just finalizes
      // the block immediately during the 'connect ack' process. So, we want to pull
      // the end-of-block packet from the dataQueue, since we don't actually have
      // a true pipeline to send it over.
      //
      // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
      // a client waiting on close() will be aware that the flush finished.
      synchronized (dataQueue) {
        Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
        assert endOfBlockPacket.lastPacketInBlock;
        assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
        lastAckedSeqno = endOfBlockPacket.seqno;
        dataQueue.notifyAll();
      }
      endBlock();
    } else {
      // start the ResponseProcessor thread
      initDataStreaming();
    }
  }
  return doSleep;
}
private boolean setupPipelineForAppendOrRecovery() throws IOException {
  // check number of datanodes
  ...
  boolean success = false;
  long newGS = 0L;
  while (!success && !streamerClosed && dfsClient.clientRunning) {
    // Sleep before reconnect if a dn is restarting.
    // This process will be repeated until the deadline or the datanode
    // starts back up.
    if (restartingNodeIndex >= 0) {
      // 4 seconds or the configured deadline period, whichever is shorter.
      // This is the retry interval and recovery will be retried in this
      // interval until timeout or success.
      long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
          4000L);
      try {
        Thread.sleep(delay);
      } catch (InterruptedException ie) {
        lastException.set(new IOException("Interrupted while waiting for " +
            "datanode to restart. " + nodes[restartingNodeIndex]));
        streamerClosed = true;
        return false;
      }
    }
    boolean isRecovery = hasError;
    // remove bad datanode from list of datanodes.
    // If errorIndex was not set (i.e. appends), then do not remove
    // any datanodes
    //
    if (errorIndex >= 0) {
      ...
      if (nodes.length <= 1) {
        lastException.set(new IOException("All datanodes " + pipelineMsg +
            " are bad. Aborting..."));
        streamerClosed = true;
        return false;
      }
      // add the bad datanode to the failed list
      failed.add(nodes[errorIndex]);

      DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
      // copy the healthy datanodes into the new array newnodes
      arraycopy(nodes, newnodes, errorIndex);

      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
      arraycopy(storageTypes, newStorageTypes, errorIndex);

      final String[] newStorageIDs = new String[newnodes.length];
      arraycopy(storageIDs, newStorageIDs, errorIndex);

      // reset the pipeline to the remaining nodes in newnodes
      setPipeline(newnodes, newStorageTypes, newStorageIDs);
      // Just took care of a node error while waiting for a node restart
      if (restartingNodeIndex >= 0) {
        // If the error came from a node further away than the restarting
        // node, the restart must have been complete.
        if (errorIndex > restartingNodeIndex) {
          restartingNodeIndex = -1;
        } else if (errorIndex < restartingNodeIndex) {
          // the node index has shifted.
          restartingNodeIndex--;
        } else {
          // this shouldn't happen...
          assert false;
        }
      }
    // Check if replace-datanode policy is satisfied.
    if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
        nodes, isAppend, isHflushed)) {
      try {
        // bring the pipeline back up to the required number of datanodes
        // (see the configuration sketch after this method)
        addDatanode2ExistingPipeline();
      } catch(IOException ioe) {
        ...
      }
    }
    // get a new generation stamp and an access token
    LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block,
        dfsClient.clientName);
    newGS = lb.getBlock().getGenerationStamp();
    accessToken = lb.getBlockToken();

    // set up the pipeline again with the remaining nodes
    if (failPacket) { // for testing
      success = createBlockOutputStream(nodes, storageTypes, newGS,
          isRecovery);
      failPacket = false;
      try {
        // Give DNs time to send in bad reports. In real situations,
        // good reports should follow bad ones, if client committed
        // with those nodes.
        Thread.sleep(2000);
      } catch (InterruptedException ie) {}
    } else {
      // connect to the first datanode of the new pipeline
      success = createBlockOutputStream(nodes, storageTypes, newGS,
          isRecovery);
    }
    ...
  } // while

  // update the block's generation stamp on the namenode
  if (success) {
    // update pipeline at the namenode
    ExtendedBlock newBlock = new ExtendedBlock(
        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(),
        newGS);
    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
        nodes, storageIDs);
    // update client side generation stamp
    block = newBlock;
  }
  return false; // do not sleep, continue processing
}
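Whether addDatanode2ExistingPipeline is invoked at all is governed by the replace-datanode-on-failure policy checked above. A hedged client-side configuration sketch (these keys exist in hdfs-default.xml; DEFAULT is the usual policy, which roughly only replaces nodes for larger replication factors or once about half the pipeline has failed):

```java
import org.apache.hadoop.conf.Configuration;

public class ReplaceDatanodePolicyConfig {
  public static Configuration clientConf() {
    Configuration conf = new Configuration();
    // Enable or disable the replace-datanode-on-failure feature.
    conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
    // Policy: NEVER, DEFAULT, or ALWAYS.
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    return conf;
  }
}
```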
private void addDatanode2ExistingPipeline() throws IOException {
  ...
  /*
   * Is data transfer necessary? We have the following cases.
   *
   * Case 1: Failure in Pipeline Setup
   * - Append
   *    + Transfer the stored replica, which may be a RBW or a finalized.
   * - Create
   *    + If no data, then no transfer is required.
   *    + If there are data written, transfer RBW. This case may happens
   *      when there are streaming failure earlier in this pipeline.
   *
   * Case 2: Failure in Streaming
   * - Append/Create:
   *    + transfer RBW
   *
   * Case 3: Failure in Close
   * - Append/Create:
   *    + no transfer, let NameNode replicates the block.
   */
  ...
  //get a new datanode
  final DatanodeInfo[] original = nodes;
  // ask the namenode for an additional datanode
  final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
      src, fileId, block, nodes, storageIDs,
      failed.toArray(new DatanodeInfo[failed.size()]),
      1, dfsClient.clientName);
  // update the pipeline
  setPipeline(lb);
  //find the new datanode
  final int d = findNewDatanode(original);

  //transfer replica
  final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
  // the transfer target is the newly added datanode
  final DatanodeInfo[] targets = {nodes[d]};
  final StorageType[] targetStorageTypes = {storageTypes[d]};
  // transfer the replica from src to the new node
  transfer(src, targets, targetStorageTypes, lb.getBlockToken());
}