The [previous post](http://bigdatadecode.top/HDFS read解析.html) covered how HDFS read opens a file stream; this post walks through how data is actually read from that opened stream.

HDFS Read: reading from the file stream

Data is read from the file stream through the read method of the FSDataInputStream object, which ultimately calls DFSInputStream.read (you can confirm this by writing a small HDFS read-file demo and stepping through it with a debugger; a sketch of such a demo follows).
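
A minimal sketch of such a demo (the fs.defaultFS address and file path below are hypothetical; adjust them to your environment):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical cluster address and file path; change to match your setup.
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    FileSystem fs = FileSystem.get(conf);
    // open() returns an FSDataInputStream wrapping a DFSInputStream.
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) {
      byte[] buf = new byte[4096];
      int n;
      // Each call lands in DFSInputStream.read(byte[], int, int) -- a good
      // place for a breakpoint when debugging the read path.
      while ((n = in.read(buf, 0, buf.length)) > 0) {
        System.out.write(buf, 0, n);
      }
    }
    fs.close();
  }
}

Every in.read(...) call in such a demo ends up in DFSInputStream.read(byte[], int, int), which looks like this: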

public synchronized int read(final byte buf[], int off, int len) throws IOException {
// ReaderStrategy wraps the destination buffer (a byte[] here);
// the actual reading of bytes is done by the BlockReader object
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
return readWithStrategy(byteArrayReader, off, len);
}

private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
...
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
= new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
failures = 0;
if (pos < getFileLength()) {
int retries = 2;
while (retries > 0) {
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
// pos and blockEnd are assigned in blockSeekTo -> getBlockAt
if (pos > blockEnd || currentNode == null) {
// find the block that contains the current position
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
}
// read into the buffer
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);

if (result >= 0) {
pos += result;
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
return result;
// A ChecksumException is rethrown to the caller; checksum-level retries are handled inside readBuffer
} catch (ChecksumException ce) {
throw ce;
// On an IOException, decrement retries and go around the loop again
} catch (IOException e) {
if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e);
}
blockEnd = -1;
if (currentNode != null) { addToDeadNodes(currentNode); }
if (--retries == 0) {
throw e;
}
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap,
currentLocatedBlock.getLocations().length);
}
}
}
return -1;
}

readWithStrategy retries the read up to 2 times. _A ChecksumException is simply rethrown to the caller (checksum-level retries are handled inside readBuffer, not here)_, while an IOException triggers a retry: the retry count is decremented, a different datanode is selected for the next attempt, and the datanode that hit the IO error is added to the deadNodes map. Is there one deadNodes per block???

deadNodes is a field of DFSInputStream, so there should be one deadNodes per DFSInputStream. But is deadNodes cleared after a block has been read?? It is cleared after a read failure (see chooseDataNode below).

readWithStrategy mainly relies on two methods, blockSeekTo and readBuffer. blockSeekTo locates the block that contains the current pos; its code is as follows:

private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
...
//
// Connect to best DataNode for desired Block, with potential offset
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
boolean connectFailedOnce = false;
while (true) {
//
// Compute desired block
//get the datanode locations of the block that contains target
LocatedBlock targetBlock = getBlockAt(target, true);
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
// choose one datanode from the location set
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
try {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
// build a blockReader using the Builder pattern
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(cachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk);
}
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
// Fetch a block from namenode and cache it ????
fetchBlockAt(target);
} else {
connectFailedOnce = true;
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
}
}
}

blockSeekTo mainly involves two methods plus the instantiation of a BlockReader. The first method, getBlockAt, obtains the datanode locations of the block containing the current offset; the second, chooseDataNode, picks one datanode from the location list returned by getBlockAt. The logic of getBlockAt is as follows:

private synchronized LocatedBlock getBlockAt(long offset,
boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";

final LocatedBlock blk;

//check offset
if (offset < 0 || offset >= getFileLength()) {
throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+ offset
+ ", updatePosition=" + updatePosition
+ ", locatedBlocks=" + locatedBlocks);
}
else if (offset >= locatedBlocks.getFileLength()) {
// offset to the portion of the last block,
// which is not known to the name-node yet;
// getting the last block
blk = locatedBlocks.getLastLocatedBlock();
}
else {
// search cached blocks first
// binary-search the cached blocks for the block containing this offset
int targetBlockIdx = locatedBlocks.findBlock(offset);
// not found in the cache
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks
// fetch blocks again, starting from the current offset
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
assert (newBlocks != null) : "Could not find target position " + offset;
// insert the new blocks into the cache at targetBlockIdx
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
// the block containing the offset; targetBlockIdx is its index in the cache
blk = locatedBlocks.get(targetBlockIdx);
}

// update current position
// update the read start position pos and end position blockEnd
if (updatePosition) {
pos = offset;
blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
currentLocatedBlock = blk;
}
return blk;
}

getBlockAt finds the block that contains the given offset. It first binary-searches the cache (whose size is controlled by _dfs.client.read.prefetch.size_) and returns the index if the block is found; otherwise it issues another RPC to fetch blocks again, __and that fetch starts from the block containing the offset__. The findBlock code is shown below:

public int findBlock(long offset) {
// create fake block of size 0 as a key
// create a LocatedBlock object so it can be compared against the blocks in LocatedBlocks
LocatedBlock key = new LocatedBlock(
new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
key.setStartOffset(offset);
key.getBlock().setNumBytes(1);
// custom comparator
Comparator<LocatedBlock> comp =
new Comparator<LocatedBlock>() {
// Returns 0 iff a is inside b or b is inside a
@Override
public int compare(LocatedBlock a, LocatedBlock b) {
long aBeg = a.getStartOffset();
long bBeg = b.getStartOffset();
long aEnd = aBeg + a.getBlockSize();
long bEnd = bBeg + b.getBlockSize();
if(aBeg <= bBeg && bEnd <= aEnd
|| bBeg <= aBeg && aEnd <= bEnd)
return 0; // one of the blocks is inside the other
if(aBeg < bBeg)
return -1; // a's left bound is to the left of the b's
return 1;
}
};
// binary search via Collections.binarySearch
return Collections.binarySearch(blocks, key, comp);
}

findBlock uses Collections.binarySearch to locate the block containing the offset; it builds a fake LocatedBlock as the search key and supplies a custom Comparator for the comparison (a short note on the binarySearch return-value convention follows below). If the block is not found, dfsClient.getLocatedBlocks is called to fetch another fixed-length batch of blocks starting from the block containing the current offset, and the newBlocks are inserted into the cached blocks.
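
Why does targetBlockIdx < 0 mean "not cached"? Collections.binarySearch returns -(insertionPoint) - 1 when no element matches, and LocatedBlocks.getInsertIndex essentially recovers that insertion point. A minimal stand-alone sketch of just this convention, using plain start offsets instead of LocatedBlock objects (so it does not reproduce findBlock's range comparator):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FindBlockSketch {
  // Recover the insertion point from binarySearch's negative return value;
  // this mirrors what LocatedBlocks.getInsertIndex does for a failed search.
  static int getInsertIndex(int binSearchResult) {
    return -(binSearchResult + 1);
  }

  public static void main(String[] args) {
    // Start offsets of the cached blocks (128 MB blocks in this made-up example).
    List<Long> cachedStartOffsets = Arrays.asList(0L, 134217728L, 268435456L);

    // No cached entry equals this offset, so binarySearch returns -(insertionPoint) - 1.
    long offset = 200000000L;
    int idx = Collections.binarySearch(cachedStartOffsets, offset);
    System.out.println(idx);                  // -3: not found
    System.out.println(getInsertIndex(idx));  // 2: where newly fetched blocks would be inserted
  }
}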

The blocks are fetched through the RPC dfsClient.getLocatedBlocks, which obtains the block list from the FSNamesystem; the result is then merged into the cache by locatedBlocks.insertRange, whose code is:

public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) {
// blockIdx is the insertion point (the 'low' index) returned when the cache lookup failed
int oldIdx = blockIdx;
int insStart = 0, insEnd = 0;
// If even the last cached block starts before the target offset (i.e. the insertion point equals blocks.size()),
// the for loop below is skipped entirely;
// otherwise, merge newBlocks into the cached blocks, replacing entries with equal start offsets
for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size();
newIdx++) {
long newOff = newBlocks.get(newIdx).getStartOffset();
long oldOff = blocks.get(oldIdx).getStartOffset();
// newOff < oldOff: this new block precedes the cached block at oldIdx, queue it for insertion
if(newOff < oldOff) {
insEnd++;
} else if(newOff == oldOff) {
// replace old cached block by the new one
blocks.set(oldIdx, newBlocks.get(newIdx));
if(insStart < insEnd) { // insert new blocks
blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
oldIdx += insEnd - insStart;
}
insStart = insEnd = newIdx+1;
oldIdx++;
} else { // newOff > oldOff
assert false : "List of LocatedBlock must be sorted by startOffset";
}
}
insEnd = newBlocks.size();
// insert the remaining new blocks (those beyond the cached range) into the cache
if(insStart < insEnd) { // insert new blocks
blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
}
}

insertRange inserts the blocks from newBlocks into blocks in segments, according to where they fall in the existing order. Each segment is delimited by insStart and insEnd; the for loop adjusts insStart and insEnd, and the segments are added to blocks batch by batch (a worked example follows).
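
To make the segmenting concrete, here is a small illustrative sketch of the same merge idea, using plain start offsets instead of LocatedBlock objects (all offsets below are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InsertRangeSketch {
  // Merge newOffs into cached starting at position blockIdx, mirroring the
  // insStart/insEnd segmenting of LocatedBlocks.insertRange.
  static void insertRange(List<Long> cached, int blockIdx, List<Long> newOffs) {
    int oldIdx = blockIdx;
    int insStart = 0, insEnd = 0;
    for (int newIdx = 0; newIdx < newOffs.size() && oldIdx < cached.size(); newIdx++) {
      long newOff = newOffs.get(newIdx);
      long oldOff = cached.get(oldIdx);
      if (newOff < oldOff) {
        insEnd++;                                // queue this new block for insertion
      } else if (newOff == oldOff) {
        cached.set(oldIdx, newOffs.get(newIdx)); // replace the cached entry
        if (insStart < insEnd) {                 // flush the queued segment
          cached.addAll(oldIdx, newOffs.subList(insStart, insEnd));
          oldIdx += insEnd - insStart;
        }
        insStart = insEnd = newIdx + 1;
        oldIdx++;
      }
    }
    insEnd = newOffs.size();
    if (insStart < insEnd) {                     // append whatever is left
      cached.addAll(oldIdx, newOffs.subList(insStart, insEnd));
    }
  }

  public static void main(String[] args) {
    List<Long> cached = new ArrayList<>(Arrays.asList(0L, 100L, 200L));
    // New batch fetched from offset 200 onwards; 200 is already cached, 300/400 are not.
    insertRange(cached, 2, Arrays.asList(200L, 300L, 400L));
    System.out.println(cached); // [0, 100, 200, 300, 400]
  }
}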

At this point the locations of the target block are available, and chooseDataNode picks a datanode from them; the code is:

private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
try {
return getBestNodeDNAddrPair(block, ignoredNodes);
} catch (IOException ie) {
// After catching the exception thrown in getBestNodeDNAddrPair when chosenNode is null,
// clear deadNodes and re-fetch this block's locations;
// after 3 failed attempts, throw
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
// dfs.client.max.block.acquire.failures defaults to 3
// this limits how many times the block's locations are (re)acquired, which is different from trying 3 datanodes
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
throw new BlockMissingException(src, description,
block.getStartOffset());
}

DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + ie + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
// The wait time is dependent on # of failures and a random factor.
// At the first time of getting a BlockMissingException, the wait time
// is a random number between 0..3000 ms. If the first retry
// still fails, we will wait 3000 ms grace period before the 2nd retry.
// Also at the second retry, the waiting window is expanded to 6000 ms
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
final int timeWindow = dfsClient.getConf().timeWindow;
double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
Thread.sleep((long)waitTime);
} catch (InterruptedException iex) {
}
// No suitable datanode was found among the block's replicas: clear deadNodes and re-fetch the block's locations
// one deadNodes per block (it is cleared here)
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo();
block = getBlockAt(block.getStartOffset(), false);
failures++;
continue;
}
}
}

private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
StorageType storageType = null;
if (nodes != null) {
// iterate and pick the first node that is not in deadNodes
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
// index to get storage type.
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
// went through all replicas without finding a usable datanode
if (chosenNode == null) {
throw new IOException("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes);
}
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType);
}

chooseDataNode calls getBestNodeDNAddrPair to pick a datanode from the block's locations. If no node can be chosen, an IOException is thrown and caught back in chooseDataNode; the catch block checks whether the number of failures has reached dfs.client.max.block.acquire.failures, and if it has, a BlockMissingException is thrown. Otherwise deadNodes is cleared, the client waits a randomized backoff (sketched below) and fetches the block's locations again.
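
The wait before each re-acquisition follows the formula in the code above: timeWindow * failures plus a random value in [0, timeWindow * (failures + 1)), where timeWindow is dfsClient.getConf().timeWindow (3000 ms by default). A tiny sketch of how the window grows:

import java.util.Random;

public class RetryBackoffSketch {
  public static void main(String[] args) {
    final int timeWindow = 3000; // default time window, in ms
    Random random = new Random();
    for (int failures = 0; failures < 3; failures++) {
      // Same formula as in chooseDataNode above:
      // a fixed grace period plus an expanding random window.
      double waitTime = timeWindow * failures
          + timeWindow * (failures + 1) * random.nextDouble();
      System.out.printf("attempt %d: wait %.0f ms (range %d..%d ms)%n",
          failures + 1,
          waitTime,
          timeWindow * failures,
          timeWindow * failures + timeWindow * (failures + 1));
    }
    // attempt 1: 0..3000 ms, attempt 2: 3000..9000 ms, attempt 3: 6000..15000 ms
  }
}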

After chooseDataNode returns, control goes back to blockSeekTo, where a BlockReader is created by BlockReaderFactory; depending on whether the read is a short-circuit local read or a remote read, a different BlockReader implementation is built. BlockReaderFactory uses the Builder design pattern; the code is:

public BlockReader build() throws IOException {
BlockReader reader = null;

Preconditions.checkNotNull(configuration);
// check whether short circuit local read is enabled
// short circuit local read itself relies on Unix domain sockets,
// so why is conf.domainSocketDataTraffic still kept as a fallback?
if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal();
if (reader != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": returning new legacy block reader local.");
}
return reader;
}
} else {
reader = getBlockReaderLocal();
if (reader != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": returning new block reader local.");
}
return reader;
}
}
}
// get a remote block reader over a UNIX domain socket
// how is this different from short circuit local read????
if (conf.domainSocketDataTraffic) {
reader = getRemoteBlockReaderFromDomain();
if (reader != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": returning new remote block reader using " +
"UNIX domain socket on " + pathInfo.getPath());
}
return reader;
}
}
Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
"TCP reads were disabled for testing, but we failed to " +
"do a non-TCP read.");
// fall back to a remote BlockReader over TCP
return getRemoteBlockReaderFromTcp();
}

With the BlockReader in hand, the code returns to readWithStrategy and calls readBuffer, whose code is:

// corruptedBlockMap is instantiated in readWithStrategy
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
IOException ioe;

/* we retry current node only once. So this is set to true only here.
* Intention is to handle one common case of an error that is not a
* failure on datanode or client : when DataNode closes the connection
* since client is idle. If there are other cases of "non-errors" then
* then a datanode might be retried by setting this to true again.
*/
boolean retryCurrentNode = true;

while (true) {
// retry as many times as seekToNewSource allows.
try {
return reader.doRead(blockReader, off, len, readStatistics);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
+ " at " + ce.getPos());
ioe = ce;
retryCurrentNode = false;
// we want to remember which block replicas we have tried
addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
corruptedBlockMap);
} catch ( IOException e ) {
if (!retryCurrentNode) {
DFSClient.LOG.warn("Exception while reading from "
+ getCurrentBlock() + " of " + src + " from "
+ currentNode, e);
}
ioe = e;
}
boolean sourceFound = false;
if (retryCurrentNode) {
/* possibly retry the same node so that transient errors don't
* result in application level failures (e.g. Datanode could have
* closed the connection because the client is idle for too long).
*/
sourceFound = seekToBlockSource(pos);
} else {
addToDeadNodes(currentNode);
sourceFound = seekToNewSource(pos);
}
if (!sourceFound) {
throw ioe;
}
retryCurrentNode = false;
}
}

readBuffer reads by calling the doRead method overridden by ByteArrayStrategy, and ByteArrayStrategy.doRead in turn calls BlockReader.read to do the actual reading.
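
Roughly, the strategy only decides where the bytes go, while the BlockReader decides how they are fetched. An abridged sketch of these DFSInputStream inner classes (not standalone code; anything not shown in the article, such as the statistics update, is paraphrased and may differ between versions):

// Abridged sketch of DFSInputStream's reader strategies.
private interface ReaderStrategy {
  int doRead(BlockReader blockReader, int off, int len,
      ReadStatistics readStatistics) throws IOException;
}

private static class ByteArrayStrategy implements ReaderStrategy {
  final byte[] buf;

  public ByteArrayStrategy(byte[] buf) {
    this.buf = buf;
  }

  @Override
  public int doRead(BlockReader blockReader, int off, int len,
      ReadStatistics readStatistics) throws IOException {
    // Delegate the actual read to the BlockReader (local or remote);
    // the real implementation also updates readStatistics here.
    int nRead = blockReader.read(buf, off, len);
    return nRead;
  }
}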

While doRead runs, readBuffer catches ChecksumException and IOException:

  • On a _ChecksumException_, retryCurrentNode becomes false, the current node is added to deadNodes, and seekToNewSource is called.
  • On an _IOException_ with retryCurrentNode still true, seekToBlockSource is called (so the same node may be retried);
    if retryCurrentNode is false, the current node is added to deadNodes and seekToNewSource is called.

retryCurrentNode indicates whether the current node should be retried after a failed read: a ChecksumException disables the retry, while an IOException allows one retry, and only one. _The rationale is that an IOException may simply mean the datanode closed the connection because the client was idle for too long, so a retry is worthwhile._ __Note that "retrying" here does not mean the same datanode is necessarily re-read right away; it only means the node is not marked dead, so the subsequent chooseDataNode call may pick it again.__

seekToNewSource chooses the node for the next read attempt:

public synchronized boolean seekToNewSource(long targetPos) throws IOException {
boolean markedDead = deadNodes.containsKey(currentNode);
addToDeadNodes(currentNode);
DatanodeInfo oldNode = currentNode;
DatanodeInfo newNode = blockSeekTo(targetPos);
if (!markedDead) {
/* remove it from deadNodes. blockSeekTo could have cleared
* deadNodes and added currentNode again. Thats ok. */
deadNodes.remove(oldNode);
}
if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
currentNode = newNode;
return true;
} else {
return false;
}
}

The actual read is BlockReader.read. The BlockReader was instantiated in blockSeekTo; the case followed here is the _remote read_, implemented by RemoteBlockReader2.read:

// RemoteBlockReader2.read
public synchronized int read(byte[] buf, int off, int len)
throws IOException {

UUID randomId = null;
...
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
...
if (curDataSlice.remaining() == 0) {
// we're at EOF now
return -1;
}
// if len is smaller than what remains in curDataSlice, how is the rest read? (the next read() call continues from the same curDataSlice)
int nRead = Math.min(curDataSlice.remaining(), len);
// copy nRead bytes from curDataSlice into buf
curDataSlice.get(buf, off, nRead);

return nRead;
}

private void readNextPacket() throws IOException {
//Read packet headers.
packetReceiver.receiveNextPacket(in);

PacketHeader curHeader = packetReceiver.getHeader();
curDataSlice = packetReceiver.getDataSlice();
assert curDataSlice.capacity() == curHeader.getDataLen();
...
if (curHeader.getDataLen() > 0) {
// bytesPerChecksum: how many data bytes each checksum covers
// curHeader.getDataLen() is the packet's data length, from which the number of chunks is computed
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
// total length of the checksums
int checksumsLen = chunks * checksumSize;
...
lastSeqNo = curHeader.getSeqno();
if (verifyChecksum && curDataSlice.remaining() > 0) {
// N.B.: the checksum error offset reported here is actually
// relative to the start of the block, not the start of the file.
// This is slightly misleading, but preserves the behavior from
// the older BlockReader.
// verify the data against its checksums
checksum.verifyChunkedSums(curDataSlice,
packetReceiver.getChecksumSlice(),
filename, curHeader.getOffsetInBlock());
}
bytesNeededToFinish -= curHeader.getDataLen();
}

// First packet will include some data prior to the first byte
// the user requested. Skip it.
if (curHeader.getOffsetInBlock() < startOffset) {
int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
curDataSlice.position(newPos);
}

// If we've now satisfied the whole client read, read one last packet
// header, which should be empty
// bytesNeededToFinish is how many bytes still need to be read;
// where is it assigned? how does the reader know how many bytes remain?
if (bytesNeededToFinish <= 0) {
// the read is finished
readTrailingEmptyPacket();
if (verifyChecksum) {
sendReadResult(Status.CHECKSUM_OK);
} else {
sendReadResult(Status.SUCCESS);
}
}
}

During a read, the packet is the basic transfer unit. Each packet (64 KB by default) consists of several chunks, and each chunk has a corresponding checksum.

curDataSlice holds the data to be handed to the caller. On the first read curDataSlice is null, so readNextPacket is called to read a packet and assign curDataSlice (_in other words, the content is first read into a packet, and curDataSlice essentially represents that packet_), and checksum.verifyChunkedSums is invoked on the packet's data to verify its checksums.

Finally, curDataSlice.get(buf, off, nRead) copies nRead bytes from curDataSlice into buf.

That completes one call to read. A single read only takes some bytes out of the current packet; the next call to read keeps getting bytes from curDataSlice until curDataSlice.remaining() == 0 && bytesNeededToFinish > 0, i.e. the current packet has been consumed, at which point readNextPacket is called again to keep reading the block.

A quick look at the packet layout:

// Each packet looks like:
// PLEN HLEN HEADER CHECKSUMS DATA
// 32-bit 16-bit <protobuf> <variable length>
//
// PLEN: Payload length
// = length(PLEN) + length(CHECKSUMS) + length(DATA)
// This length includes its own encoded length in
// the sum for historical reasons.
//
// HLEN: Header length
// = length(HEADER)
//
// HEADER: the actual packet header fields, encoded in protobuf
// CHECKSUMS: the crcs for the data chunk. May be missing if
// checksums were not requested
// DATA the actual block data

An HDFS file consists of multiple blocks. HDFS reads and writes blocks in units of packets (64 KB each by default). Each packet is made up of several chunks (512 bytes by default). The chunk is the basic unit of data verification: a checksum (4 bytes by default) is generated and stored for every chunk. When a block is read, the basic unit of data transfer is the packet, and each packet consists of several chunks (a small arithmetic sketch follows).
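
With those defaults, the per-packet arithmetic from readNextPacket works out as follows (a minimal sketch; 64 KB / 512 B / 4 B are the usual default values):

public class PacketArithmeticSketch {
  public static void main(String[] args) {
    int dataLen = 64 * 1024;      // default packet data size: 64 KB
    int bytesPerChecksum = 512;   // default chunk size
    int checksumSize = 4;         // CRC checksum size

    // Same formula as readNextPacket:
    int chunks = 1 + (dataLen - 1) / bytesPerChecksum; // = 128 chunks per packet
    int checksumsLen = chunks * checksumSize;          // = 512 bytes of checksums per packet

    System.out.println("chunks per packet     = " + chunks);
    System.out.println("checksum bytes/packet = " + checksumsLen);
  }
}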

Additional notes

Walking through getRemoteBlockReaderFromTcp

getRemoteBlockReaderFromTcp obtains a remote BlockReader:

private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
...
BlockReader blockReader = null;
while (true) {
BlockReaderPeer curPeer = null;
Peer peer = null;
try {
//Get the next TCP-based peer-- either from the cache or by creating it.
curPeer = nextTcpPeer();
if (curPeer == null) break;
if (curPeer.fromCache) remainingCacheTries--;
peer = curPeer.peer;
// obtain a blockReader for the block through this peer
blockReader = getRemoteBlockReader(peer);
return blockReader;
} catch (IOException ioe) {
...
} finally {
if (blockReader == null) {
IOUtils.cleanup(LOG, peer);
}
}
}
return null;
}

private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
if (conf.useLegacyBlockReader) {
return RemoteBlockReader.newBlockReader(fileName,
block, token, startOffset, length, conf.ioBufferSize,
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy);
} else {
return RemoteBlockReader2.newBlockReader(
fileName, block, token, startOffset, length,
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy);
}
}
// RemoteBlockReader2
public static BlockReader newBlockReader(String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
// build the output stream on top of the socket
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
// send the read request to the DataNode
// how does this readBlock relate to DataXceiver's readBlock? is this an RPC call??
// Sender is the client side, DataXceiver is the server side???
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum, cachingStrategy);

//
// Get bytes in block
//
DataInputStream in = new DataInputStream(peer.getInputStream());

BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
//Warning when we get CHECKSUM_NULL?

// Read the first chunk offset.
long firstChunkOffset = checksumInfo.getChunkOffset();

if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
}

return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
datanodeID, peerCache);
}

Random reads

HDFS reads come in two flavors: the most commonly used sequential read, and random (positioned) reads. MapReduce tasks usually involve random reads: when an MR job is submitted, the files, offsets and lengths to be read by each map and reduce are already determined, so each task only reads its own split. A small usage sketch of the positioned-read API is shown below.
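
A minimal sketch of a positioned read, assuming the same hypothetical fs.defaultFS address and file path as the earlier demo:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsPreadDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000"); // hypothetical address
    FileSystem fs = FileSystem.get(conf);
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) {
      byte[] buf = new byte[1024];
      // Positioned read: read up to 1024 bytes starting at file offset 4096,
      // without moving the stream's current position. This lands in
      // DFSInputStream.read(long, byte[], int, int) shown below.
      int n = in.read(4096L, buf, 0, buf.length);
      System.out.println("read " + n + " bytes at offset 4096");
    }
    fs.close();
  }
}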

The entry point for a random read is again in FSDataInputStream:

// FSDataInputStream.read
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
return ((PositionedReadable)in).read(position, buffer, offset, length);
}
//DFSInputStream.read
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
// sanity checks
dfsClient.checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
failures = 0;
long filelen = getFileLength();
if ((position < 0) || (position >= filelen)) {
return -1;
}
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
}

// determine the block and byte range within the block
// corresponding to position and realLen
List<LocatedBlock> blockRange = getBlockRange(position, realLen);
int remaining = realLen;
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
= new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
- 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
// corrupted.
reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
}

remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(realLen);
}
return realLen;
}

Summary

These two posts have walked through the HDFS file-read flow, which overall looks like this:

  1. Obtain a file system instance via getFileSystem.
  2. Open a file input stream: open issues an RPC for the specified path and returns an FSDataInputStream. While the stream is being initialized, the locations of some of the blocks (10 by default) are read into an in-memory cache, and the stream instance is returned to the client. The cached blocks are already sorted by each datanode's distance to the client.
  3. Once the input stream is instantiated, call read to read data (reads are either sequential or random):
    1. read can use one of several ReaderStrategy implementations; this post follows ByteArrayStrategy.
    2. Read according to off and len: use off to find the corresponding block A (binary search). If block A is already in the cache, it is returned directly; otherwise another batch of blocks is requested from the namenode, the newly fetched blocks are inserted into the cache according to their position relative to the cached ones, and block A for that off is then obtained.
    3. From block A, choose the datanode closest to the client. *If an IOException occurs while obtaining a datanode (it is thrown when no suitable datanode can be chosen from the locations)*, wait for a while and retry, 3 times by default.
    4. With a datanode chosen, read the data through a BlockReader; different BlockReader implementations are instantiated depending on whether the read is short-circuit local or remote.
    5. The errors that can occur while reading data are ChecksumException and IOException. On a ChecksumException the datanode is put into deadNodes and the data is re-read from another datanode; on an IOException (the connection may simply have been dropped) the read is retried directly, without putting the datanode into deadNodes.
    6. This post covers RemoteBlockReader2: files are read packet by packet; a packet is read into memory and its chunks are checksum-verified, then len bytes are consumed from memory, and when a packet is used up the next one is read. After the read completes, an empty trailing packet is exchanged. Is that the last packet of the block, or ...?
    7. Reading a file also involves some work on the datanode side, mainly in the DataXceiver class, which will be analyzed in a later post.

Open questions

  • Under what circumstances can a block be lost?
  • What about slow reads, or a read failing on a particular node?
  • How is it decided whether to create a remote BlockReader or a local one (is there still such a thing as a local read, or does everything go through remote)?
    Is setRemotePeerFactory alone what decides between local and remote reads?
    allowShortCircuitLocalReads is false while the current file is under construction
  • What should happen when reading a block from a datanode fails?
    It depends on where it fails: while getting the block's datanodes, or while reading after a datanode has been chosen.
    If getting the block's datanodes fails 3 times in a row, an exception is thrown.
    If the read fails after a datanode has been obtained, there are two cases: on a checksum error, switch to another datanode and re-read; on an IOException, simply retry.
  • How is the data actually sent??? Is there a send step during a read?
  • When reading a block from a datanode fails, where is that datanode recorded? And what if the datanode is dead...
    It is added to deadNodes.
  • Who maintains this deadNodes? Does it only cover the current read?
    One deadNodes per block read.
  • In HDFS, why do corrupted blocks happen?
    dfs.datanode.scan.period.hours
  • What is the difference between Sender(out).readBlock and DataXceiver's readBlock? Is it an RPC call??
    Sender is the client side, DataXceiver is the server side???

Related configuration properties

  • io.file.buffer.size(4096)
    The size of buffer for use in sequence files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.

  • dfs.client.read.prefetch.size (see the sketch after this list)

prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);

  • dfs.client.cache.drop.behind.reads (vs dfs.datanode.drop.cache.behind.reads)

    Just like dfs.datanode.drop.cache.behind.reads, this setting causes the page cache to be dropped behind HDFS reads, potentially freeing up more memory for other uses. Unlike dfs.datanode.drop.cache.behind.reads, this is a client-side setting rather than a setting for the entire datanode. If present, this setting will override the DataNode default. If the native libraries are not available to the DataNode, this configuration has no effect.

    In some workloads, the data read from HDFS is known to be significantly large enough that it is unlikely to be useful to cache it in the operating system buffer cache. In this case, the DataNode may be configured to automatically purge all data from the buffer cache after it is delivered to the client. This behavior is automatically disabled for workloads which read only short sections of a block (e.g HBase random-IO workloads). This may improve performance for some workloads by freeing buffer cache space usage for more cacheable data. If the Hadoop native libraries are not available, this configuration has no effect.

  • dfs.client.cache.drop.behind.writes

  • dfs.client.cache.readahead (vs dfs.datanode.readahead.bytes)

    When using remote reads, this setting causes the datanode to read ahead in the block file using posix_fadvise, potentially decreasing I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side setting rather than a setting for the entire datanode. If present, this setting will override the DataNode default. When using local reads, this setting determines how much readahead we do in BlockReaderLocal. If the native libraries are not available to the DataNode, this configuration has no effect.
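
Coming back to dfs.client.read.prefetch.size: the snippet above shows that the prefetch window defaults to 10 * defaultBlockSize. A minimal sketch of overriding it on the client side (the value below is an arbitrary example):

import org.apache.hadoop.conf.Configuration;

public class PrefetchConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Prefetch locations for roughly 20 blocks of 128 MB instead of the
    // default 10 * defaultBlockSize (arbitrary example value).
    conf.setLong("dfs.client.read.prefetch.size", 20L * 128 * 1024 * 1024);
    System.out.println(conf.getLong("dfs.client.read.prefetch.size", -1));
  }
}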