public synchronized int read(final byte buf[], int off, int len) throws IOException {
  // ReaderStrategy wraps the different BlockReader implementations;
  // the actual data reading is done by the BlockReader object
  ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
  return readWithStrategy(byteArrayReader, off, len);
}
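For context, a minimal usage sketch (not taken from the code being analyzed) showing how an application read reaches this method: when the FileSystem is a DistributedFileSystem, open() returns an FSDataInputStream that wraps a DFSInputStream, so the read(byte[], int, int) call below lands in the method above. The path is hypothetical and assumes a reachable HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // open() returns an FSDataInputStream; for HDFS it wraps a DFSInputStream,
        // so this read(byte[], int, int) is the entry point shown above
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
            byte[] buf = new byte[8192];
            int n = in.read(buf, 0, buf.length);
            System.out.println("read " + n + " bytes");
        } finally {
            fs.close();
        }
    }
}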
private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
  ...
  Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
      new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
  failures = 0;
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        // currentNode can be left as null if previous read had a checksum
        // error on the same block. See HDFS-3067
        // pos and blockEnd are assigned in blockSeekTo -> getBlockAt
        if (pos > blockEnd || currentNode == null) {
          // seek to the block containing the current position
          currentNode = blockSeekTo(pos);
        }
        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
        if (locatedBlocks.isLastBlockComplete()) {
          realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
        }
        // read into the buffer
        int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
        if (result >= 0) {
          pos += result;
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (dfsClient.stats != null) {
          dfsClient.stats.incrementBytesRead(result);
        }
        return result;
        // a ChecksumException is rethrown to the caller as-is
      } catch (ChecksumException ce) {
        throw ce;
        // on an IOException, decrement retries and go around the loop again
      } catch (IOException e) {
        if (retries == 1) {
          DFSClient.LOG.warn("DFS Read", e);
        }
        blockEnd = -1;
        if (currentNode != null) {
          addToDeadNodes(currentNode);
        }
        if (--retries == 0) {
          throw e;
        }
      } finally {
        // Check if need to report block replicas corruption either read
        // was successful or ChecksumException occured.
        reportCheckSumFailure(corruptedBlockMap,
            currentLocatedBlock.getLocations().length);
      }
    }
  }
  return -1;
}
public int findBlock(long offset) {
  // create fake block of size 0 as a key
  // build a LocatedBlock so it can be compared against the blocks held in LocatedBlocks
  LocatedBlock key = new LocatedBlock(
      new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
  key.setStartOffset(offset);
  key.getBlock().setNumBytes(1);
  // custom comparator
  Comparator<LocatedBlock> comp =
      new Comparator<LocatedBlock>() {
        // Returns 0 iff a is inside b or b is inside a
        @Override
        public int compare(LocatedBlock a, LocatedBlock b) {
          long aBeg = a.getStartOffset();
          long bBeg = b.getStartOffset();
          long aEnd = aBeg + a.getBlockSize();
          long bEnd = bBeg + b.getBlockSize();
          if (aBeg <= bBeg && bEnd <= aEnd
              || bBeg <= aBeg && aEnd <= bEnd)
            return 0; // one of the blocks is inside the other
          if (aBeg < bBeg)
            return -1; // a's left bound is to the left of the b's
          return 1;
        }
      };
  // binary search via Collections.binarySearch
  return Collections.binarySearch(blocks, key, comp);
}
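The comparator treats containment as equality, which is what lets Collections.binarySearch return the index of the block that covers a given file offset. A minimal, self-contained sketch of the same idea, using hypothetical {startOffset, size} pairs in place of LocatedBlock:

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ContainmentSearchDemo {
    public static void main(String[] args) {
        // Three fake 128 MB "blocks" as {startOffset, size}, sorted by start offset
        List<long[]> blocks = Arrays.asList(
            new long[]{0L, 128L << 20},
            new long[]{128L << 20, 128L << 20},
            new long[]{256L << 20, 128L << 20});

        long offset = 200L << 20;               // an offset inside the second block
        long[] key = new long[]{offset, 1L};    // 1-byte key, like setNumBytes(1) above

        Comparator<long[]> comp = new Comparator<long[]>() {
            @Override
            public int compare(long[] a, long[] b) {
                long aEnd = a[0] + a[1], bEnd = b[0] + b[1];
                // containment in either direction counts as "equal"
                if ((a[0] <= b[0] && bEnd <= aEnd) || (b[0] <= a[0] && aEnd <= bEnd)) {
                    return 0;
                }
                return a[0] < b[0] ? -1 : 1;
            }
        };

        // prints 1: the index of the block that contains the offset
        System.out.println(Collections.binarySearch(blocks, key, comp));
    }
}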
      DatanodeInfo[] nodes = block.getLocations();
      if (nodes == null || nodes.length == 0) {
        DFSClient.LOG.info("No node available for " + blockInfo);
      }
      DFSClient.LOG.info("Could not obtain " + block.getBlock()
          + " from any node: " + ie + errMsg
          + ". Will get new block locations from namenode and retry...");
      try {
        // Introducing a random factor to the wait time before another retry.
        // The wait time is dependent on # of failures and a random factor.
        // At the first time of getting a BlockMissingException, the wait time
        // is a random number between 0..3000 ms. If the first retry
        // still fails, we will wait 3000 ms grace period before the 2nd retry.
        // Also at the second retry, the waiting window is expanded to 6000 ms
        // alleviating the request rate from the server. Similarly the 3rd retry
        // will wait 6000ms grace period before retry and the waiting window is
        // expanded to 9000ms.
        final int timeWindow = dfsClient.getConf().timeWindow;
        double waitTime = timeWindow * failures +       // grace period for the last round of attempt
            timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
        DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1)
            + " IOException, will wait for " + waitTime + " msec.");
        Thread.sleep((long)waitTime);
      } catch (InterruptedException iex) {
      }
      // No suitable datanode was found among this block's replicas, so clear
      // deadNodes and refetch this block's locations from the namenode
      // (deadNodes is effectively reset for each block attempt)
      deadNodes.clear(); //2nd option is to remove only nodes[blockId]
      openInfo();
      block = getBlockAt(block.getStartOffset(), false);
      failures++;
      continue;
    }
  }
}
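To make the backoff formula concrete, here is a small standalone calculation; the 3000 ms value is an assumption matching the default of dfs.client.retry.window.base:

import java.util.Random;

public class RetryBackoffDemo {
    public static void main(String[] args) {
        final int timeWindow = 3000;  // assumed default of dfs.client.retry.window.base
        Random rnd = new Random();
        for (int failures = 0; failures < 3; failures++) {
            // The grace period grows linearly and the random window expands each round:
            // failures=0 -> 0..3000 ms, failures=1 -> 3000..9000 ms,
            // failures=2 -> 6000..15000 ms
            double waitTime = timeWindow * failures
                + timeWindow * (failures + 1) * rnd.nextDouble();
            System.out.printf("failures=%d -> wait %.0f ms%n", failures, waitTime);
        }
    }
}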
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
    Collection<DatanodeInfo> ignoredNodes) throws IOException {
  DatanodeInfo[] nodes = block.getLocations();
  StorageType[] storageTypes = block.getStorageTypes();
  DatanodeInfo chosenNode = null;
  StorageType storageType = null;
  if (nodes != null) {
    // iterate over the replicas and pick the first node that is neither dead nor ignored
    for (int i = 0; i < nodes.length; i++) {
      if (!deadNodes.containsKey(nodes[i])
          && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
        chosenNode = nodes[i];
        // Storage types are ordered to correspond with nodes, so use the same
        // index to get storage type.
        if (storageTypes != null && i < storageTypes.length) {
          storageType = storageTypes[i];
        }
        break;
      }
    }
  }
  // no suitable datanode was found after going through all of them
  if (chosenNode == null) {
    throw new IOException("No live nodes contain block " + block.getBlock() +
        " after checking nodes = " + Arrays.toString(nodes) +
        ", ignoredNodes = " + ignoredNodes);
  }
  final String dnAddr =
      chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
  }
  InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
  return new DNAddrPair(chosenNode, targetAddr, storageType);
}
When chooseDataNode finishes, control returns to blockSeekTo, where a BlockReaderFactory creates the BlockReader object. The BlockReader created here differs depending on whether the read is a short-circuit local read or a remote read. BlockReaderFactory uses the Builder design pattern, as sketched below.
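A sketch of the builder invocation inside blockSeekTo, paraphrased and abridged from Hadoop 2.x; the setter names and the exact set of parameters vary between releases, so treat this as an illustration rather than the definitive source. build() is where the factory decides whether to return a short-circuit BlockReaderLocal or a RemoteBlockReader2.

// Abridged sketch (Hadoop 2.x): targetAddr, chosenNode, storageType, blk,
// accessToken and offsetIntoBlock are computed earlier in blockSeekTo
blockReader = new BlockReaderFactory(dfsClient.getConf()).
    setInetSocketAddress(targetAddr).
    setRemotePeerFactory(dfsClient).
    setDatanodeInfo(chosenNode).
    setStorageType(storageType).
    setFileName(src).
    setBlock(blk).
    setBlockToken(accessToken).
    setStartOffset(offsetIntoBlock).
    setVerifyChecksum(verifyChecksum).
    setClientName(dfsClient.clientName).
    setLength(blk.getNumBytes() - offsetIntoBlock).
    setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
    setClientCacheContext(dfsClient.getClientContext()).
    build();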
// corruptedBlockMap is instantiated in readWithStrategy
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  IOException ioe;
  /* we retry current node only once. So this is set to true only here.
   * Intention is to handle one common case of an error that is not a
   * failure on datanode or client : when DataNode closes the connection
   * since client is idle. If there are other cases of "non-errors" then
   * a datanode might be retried by setting this to true again.
   */
  boolean retryCurrentNode = true;
  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return reader.doRead(blockReader, off, len, readStatistics);
    } catch ( ChecksumException ce ) {
      DFSClient.LOG.warn("Found Checksum error for "
          + getCurrentBlock() + " from " + currentNode
          + " at " + ce.getPos());
      ioe = ce;
      retryCurrentNode = false;
      // we want to remember which block replicas we have tried
      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
          corruptedBlockMap);
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        DFSClient.LOG.warn("Exception while reading from "
            + getCurrentBlock() + " of " + src + " from "
            + currentNode, e);
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode);
      sourceFound = seekToNewSource(pos);
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}
  PacketHeader curHeader = packetReceiver.getHeader();
  curDataSlice = packetReceiver.getDataSlice();
  assert curDataSlice.capacity() == curHeader.getDataLen();
  ...
  if (curHeader.getDataLen() > 0) {
    // bytesPerChecksum: how many data bytes each checksum covers
    // curHeader.getDataLen() gives the length of the data, from which the
    // number of chunks is derived
    int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
    // total length of the checksums
    int checksumsLen = chunks * checksumSize;
    ...
    lastSeqNo = curHeader.getSeqno();
    if (verifyChecksum && curDataSlice.remaining() > 0) {
      // N.B.: the checksum error offset reported here is actually
      // relative to the start of the block, not the start of the file.
      // This is slightly misleading, but preserves the behavior from
      // the older BlockReader.
      // verify the data against its checksums
      checksum.verifyChunkedSums(curDataSlice,
          packetReceiver.getChecksumSlice(),
          filename, curHeader.getOffsetInBlock());
    }
    bytesNeededToFinish -= curHeader.getDataLen();
  }

  // First packet will include some data prior to the first byte
  // the user requested. Skip it.
  if (curHeader.getOffsetInBlock() < startOffset) {
    int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
    curDataSlice.position(newPos);
  }
  // If we've now satisfied the whole client read, read one last packet
  // header, which should be empty
  // bytesNeededToFinish tracks how many bytes still have to be read; it is
  // initialized in the RemoteBlockReader2 constructor from the length the
  // client requested for this block, and decremented above as each packet's
  // data arrives
  if (bytesNeededToFinish <= 0) {
    // the read is complete
    readTrailingEmptyPacket();
    if (verifyChecksum) {
      sendReadResult(Status.CHECKSUM_OK);
    } else {
      sendReadResult(Status.SUCCESS);
    }
  }
}
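As a quick sanity check of the chunk arithmetic in readNextPacket, here is a standalone calculation assuming the common defaults of 512 bytes per checksum (dfs.bytes-per-checksum) and a 4-byte CRC32C checksum per chunk:

public class ChecksumMathDemo {
    public static void main(String[] args) {
        int bytesPerChecksum = 512;   // assumed dfs.bytes-per-checksum default
        int checksumSize = 4;         // a CRC32C checksum is 4 bytes per chunk
        int dataLen = 64 * 1024;      // a 64 KB packet payload

        // same formula as readNextPacket: ceil(dataLen / bytesPerChecksum)
        int chunks = 1 + (dataLen - 1) / bytesPerChecksum;
        int checksumsLen = chunks * checksumSize;

        // prints: 128 chunks, 512 checksum bytes
        System.out.println(chunks + " chunks, " + checksumsLen + " checksum bytes");
    }
}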
// Each packet looks like: // PLEN HLEN HEADER CHECKSUMS DATA // 32-bit 16-bit <protobuf> <variable length> // // PLEN: Payload length // = length(PLEN) + length(CHECKSUMS) + length(DATA) // This length includes its own encoded length in // the sum for historical reasons. // // HLEN: Header length // = length(HEADER) // // HEADER: the actual packet header fields, encoded in protobuf // CHECKSUMS: the crcs for the data chunk. May be missing if // checksums were not requested // DATA the actual block data
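To make the framing concrete, the following self-contained sketch writes and re-parses a fake frame using the layout above. The 10-byte header here is only a placeholder for the real protobuf-encoded header (which is what actually carries dataLen, offsetInBlock and seqno), so the split between CHECKSUMS and DATA is not recomputed; the point is the PLEN/HLEN bookkeeping.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PacketFramingDemo {
    public static void main(String[] args) throws IOException {
        byte[] header = new byte[10];     // placeholder for the protobuf header
        byte[] checksums = new byte[4];   // one fake 4-byte checksum
        byte[] data = new byte[16];       // 16 bytes of fake block data

        // Write the frame: PLEN, HLEN, HEADER, CHECKSUMS, DATA
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(4 + checksums.length + data.length); // PLEN includes its own 4 bytes
        out.writeShort(header.length);                    // HLEN
        out.write(header);
        out.write(checksums);
        out.write(data);

        // Read it back the way the comment above describes
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray()));
        int plen = in.readInt();
        short hlen = in.readShort();
        in.skipBytes(hlen);                   // skip the (placeholder) header
        int checksumsPlusData = plen - 4;     // PLEN minus its own encoded length
        // prints: header=10 bytes, checksums+data=20 bytes
        System.out.println("header=" + hlen + " bytes, checksums+data="
            + checksumsPlusData + " bytes");
    }
}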
io.file.buffer.size (default: 4096): The size of buffer for use in sequence files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.
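A hedged illustration of how this setting might be applied from client code; the path is hypothetical and the value is just an example. The buffer size can also be passed per stream to open():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BufferSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Raise the copy buffer to 128 KB, a multiple of the 4 KB page size
        conf.setInt("io.file.buffer.size", 128 * 1024);

        FileSystem fs = FileSystem.get(conf);
        // The buffer size can also be supplied per stream via open(path, bufferSize)
        FSDataInputStream in = fs.open(new Path("/tmp/example.txt"), 128 * 1024);
        in.close();
        fs.close();
    }
}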
dfs.client.cache.drop.behind.reads: Just like dfs.datanode.drop.cache.behind.reads, this setting causes the page cache to be dropped behind HDFS reads, potentially freeing up more memory for other uses. Unlike dfs.datanode.drop.cache.behind.reads, this is a client-side setting rather than a setting for the entire datanode. If present, this setting will override the DataNode default. If the native libraries are not available to the DataNode, this configuration has no effect.
dfs.datanode.drop.cache.behind.reads: In some workloads, the data read from HDFS is known to be large enough that it is unlikely to be useful to cache it in the operating system buffer cache. In this case, the DataNode may be configured to automatically purge all data from the buffer cache after it is delivered to the client. This behavior is automatically disabled for workloads which read only short sections of a block (e.g. HBase random-IO workloads). This may improve performance for some workloads by freeing buffer cache space for more cacheable data. If the Hadoop native libraries are not available, this configuration has no effect.
dfs.client.cache.readahead: When using remote reads, this setting causes the datanode to read ahead in the block file using posix_fadvise, potentially decreasing I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side setting rather than a setting for the entire datanode. If present, this setting will override the DataNode default. When using local reads, this setting determines how much readahead we do in BlockReaderLocal. If the native libraries are not available to the DataNode, this configuration has no effect.
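A sketch of how the two client-side settings above (dfs.client.cache.drop.behind.reads and dfs.client.cache.readahead) might be set programmatically; the values are arbitrary examples, the settings can equally be placed in the client's hdfs-site.xml, and dfs.datanode.drop.cache.behind.reads is configured on the datanode side instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ClientCachingSettingsExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Drop the OS page cache behind this client's reads, overriding the
        // datanode-wide dfs.datanode.drop.cache.behind.reads default
        conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
        // Ask for 4 MB of readahead: posix_fadvise on the datanode for remote
        // reads, or BlockReaderLocal readahead for short-circuit reads
        conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);

        FileSystem fs = FileSystem.get(conf);
        System.out.println("connected to " + fs.getUri());
        fs.close();
    }
}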