public FSDataInputStream open(Path f)throws IOException { // io.file.buffer.size The size of buffer for use in sequence files. return open(f, getConf().getInt("io.file.buffer.size", 4096)); }
synchronizedvoidopenInfo()throws IOException, UnresolvedLinkException { // fetchLocatedBlocksAndGetLastBlockLength有两个功能, // 一个是对locatedBlocks赋值 // 另一个是返回最后一个未构造完成的block的长度 lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength; // 如果获取失败,则进行重试,默认是3次 // dfs.client.retry.times.get-last-block-length 设置重试次数 // 当集群重启时,dn可能没来及汇报block,此时可能存在部分block的location无法从nn上读出 while (retriesForLastBlockLength > 0) { // Getting last block length as -1 is a special case. When cluster // restarts, DNs may not report immediately. At this time partial block // locations will not be available with NN for getting the length. Lets // retry for 3 times to get the length. if (lastBlockBeingWrittenLength == -1) { DFSClient.LOG.warn("Last block locations not available. " + "Datanodes might not have reported blocks completely." + " Will retry for " + retriesForLastBlockLength + " times"); // 等待dfs.client.retry.interval-ms.get-last-block-length ms之后重试 // 默认是4000ms waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); } else { break; } retriesForLastBlockLength--; } if (retriesForLastBlockLength == 0) { thrownew IOException("Could not obtain the last block locations."); } }
/**
 * Fetches the file's block locations from the namenode, installs them in
 * {@code locatedBlocks}, and returns the length of the last block when it
 * is still under construction (0 if no data written yet, -1 on failure).
 *
 * NOTE(review): this excerpt is truncated — part of the body is elided
 * ("...") and the method's tail (final return) is not visible here.
 */
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
  // DFSClient obtains (a prefix of) the file's block locations via an RPC
  // to the namenode (FSNamesystem.getLocatedBlocks).
  final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
  ... // (code elided in this excerpt)
  if (locatedBlocks != null) {
    // Sanity check: the new block list must match the old one block-for-block.
    Iterator<LocatedBlock> oldIter =
        locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  // If this file has a block under construction, return its current length;
  // otherwise 0. -1 signals that the length could not be determined.
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          // if the length is zero, then no data has been written to
          // datanode. So no need to wait for the locations.
          return 0;
        }
        // Locations unknown and some data exists — report failure.
        return -1;
      }
      // Ask a datanode for the in-progress block's current length.
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }
// file 的某个block privatefinal ExtendedBlock b; // block中第一个字节在file中的偏移量 privatelong offset; // block的副本所在dn数组 privatefinal DatanodeInfo[] locs; // 每个副本所在的磁盘id privatefinal String[] storageIDs; // 每个副本所在磁盘的类型 privatefinal StorageType[] storageTypes; // corrupt flag is true if all of the replicas of a block are corrupt. // else false. If block has few corrupt replicas, they are filtered and // their locations are not part of this object privateboolean corrupt; private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>(); /** * List of cached datanode locations */ private DatanodeInfo[] cachedLocs;
// lastBlock is not part of getLocatedBlocks(), might need to sort it too LocatedBlock lastBlock = blocks.getLastLocatedBlock(); if (lastBlock != null) { ArrayList<LocatedBlock> lastBlockList = Lists.newArrayListWithCapacity(1); lastBlockList.add(lastBlock); blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, lastBlockList); } } // 将按照网络拓扑排好序的block返回给client端 return blocks; }
/**
 * Resolves {@code srcArg} to an INodeFile and returns the located blocks
 * for the byte range [offset, offset + length), updating the file's access
 * time when the access-time precision has elapsed.
 *
 * Runs in up to two passes: the first under the read lock; if the access
 * time must be updated, the whole operation restarts under the write lock.
 *
 * @param srcArg        path of the file
 * @param offset        starting byte offset within the file
 * @param length        number of bytes of block locations to return
 * @param doAccessTime  whether the access time may be updated
 * @param needBlockToken whether access tokens should be attached to blocks
 * @return the located blocks (never null in practice; see final return)
 */
private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
    long offset, long length, boolean doAccessTime, boolean needBlockToken)
    throws FileNotFoundException, UnresolvedLinkException, IOException {
  String src = srcArg;
  FSPermissionChecker pc = getPermissionChecker();
  // For an ordinary path (e.g. /user/hadoop/xx), i.e. one not under the
  // reserved prefix, pathComponents is null.
  byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
  // Attempt 0 runs under the read lock. If the access time needs updating
  // (the access-time precision has elapsed), attempt 1 redoes everything
  // under the write lock, which is required to set the access time.
  for (int attempt = 0; attempt < 2; attempt++) {
    boolean isReadOp = (attempt == 0);
    if (isReadOp) { // first attempt is with readlock
      checkOperation(OperationCategory.READ);
      // reentrant lock
      readLock();
    } else { // second attempt is with write lock
      checkOperation(OperationCategory.WRITE);
      writeLock(); // writelock is needed to set accesstime
    }
    try {
      src = resolvePath(src, pathComponents);
      if (isReadOp) {
        checkOperation(OperationCategory.READ);
      } else {
        checkOperation(OperationCategory.WRITE);
      }
      if (isPermissionEnabled) {
        checkPathAccess(pc, src, FsAction.READ);
      }
      // if the namenode is in safemode, then do not update access time
      if (isInSafeMode()) {
        doAccessTime = false;
      }
      // Resolve the src path into its chain of INodes.
      final INodesInPath iip = dir.getINodesInPath(src, true);
      final INode[] inodes = iip.getINodes();
      // The final path component must resolve to an INodeFile.
      final INodeFile inode = INodeFile.valueOf(
          inodes[inodes.length - 1], src);
      if (isPermissionEnabled) {
        checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
      }
      if (!iip.isSnapshot() //snapshots are readonly, so don't update atime.
          && doAccessTime && isAccessTimeSupported()) {
        final long now = now();
        // Only update when more than one access-time-precision interval
        // has passed since the last recorded access time.
        if (now > inode.getAccessTime() + getAccessTimePrecision()) {
          // if we have to set access time but we only have the readlock, then
          // restart this entire operation with the writeLock.
          if (isReadOp) {
            continue;
          }
          boolean changed = dir.setTimes(inode, -1, now, false,
              iip.getLatestSnapshotId());
          if (changed) {
            getEditLog().logTimes(src, -1, now);
          }
        }
      }
      // For a non-snapshot path, the file size excludes the last block
      // when it is still under construction.
      final long fileSize = iip.isSnapshot() ?
          inode.computeFileSize(iip.getPathSnapshotId())
          : inode.computeFileSizeNotIncludingLastUcBlock();
      boolean isUc = inode.isUnderConstruction();
      if (iip.isSnapshot()) {
        // if src indicates a snapshot file, we need to make sure the returned
        // blocks do not exceed the size of the snapshot file.
        length = Math.min(length, fileSize - offset);
        isUc = false;
      }
      final FileEncryptionInfo feInfo = FSDirectory.isReservedRawName(srcArg)
          ? null
          : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
      // Build LocatedBlocks from the INodeFile's blocks via blockManager.
      final LocatedBlocks blocks =
          blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
              isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
      // Set caching information for the located blocks.
      for (LocatedBlock lb: blocks.getLocatedBlocks()) {
        cacheManager.setCachedLocations(lb);
      }
      return blocks;
    } finally {
      if (isReadOp) {
        readUnlock();
      } else {
        writeUnlock();
      }
    }
  }
  return null; // can never reach here
}
publicvoidsortLocatedBlocks(final String targethost, final List<LocatedBlock> locatedblocks){ //sort the blocks // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . Node client = getDatanodeByHost(targethost); if (client == null) { List<String> hosts = new ArrayList<String> (1); hosts.add(targethost); String rName = dnsToSwitchMapping.resolve(hosts).get(0); if (rName != null) client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost); } Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ? new DFSUtil.DecomStaleComparator(staleInterval) : DFSUtil.DECOM_COMPARATOR; for (LocatedBlock b : locatedblocks) { DatanodeInfo[] di = b.getLocations(); // 将下线的或者过时的dn放到最后 Arrays.sort(di, comparator); int lastActiveIndex = di.length - 1; while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) { --lastActiveIndex; } int activeLen = lastActiveIndex + 1; networktopology.sortByDistance(client, b.getLocations(), activeLen); } }
publicvoidsortByDistance(Node reader, Node[] nodes, int activeLen){ /** Sort weights for the nodes array */ int[] weights = newint[activeLen]; for (int i=0; i<activeLen; i++) { // 0 is local, 1 is same rack, 2 is off rack weights[i] = getWeight(reader, nodes[i]); } // Add weight/node pairs to a TreeMap to sort // TreeMap 按照key进行排序 TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>(); for (int i=0; i<activeLen; i++) { int weight = weights[i]; Node node = nodes[i]; List<Node> list = tree.get(weight); if (list == null) { list = Lists.newArrayListWithExpectedSize(1); tree.put(weight, list); } list.add(node); }
int idx = 0; for (List<Node> list: tree.values()) { if (list != null) { // 对TreeMap中value进行shuffle,以免产生热点 // 默认情况下list里只有一个node,当副本数增多, // 可能在同一rack上有两个副本,则此时list中就有两个node Collections.shuffle(list, r); for (Node n: list) { nodes[idx] = n; idx++; } } } Preconditions.checkState(idx == activeLen, "Sorted the wrong number of nodes!"); }