An earlier post introduced the HDFS write flow, but only roughly, from the code's point of view, without digging into the details. In this post I dig into some of those details of the write flow. My understanding is limited, so I can only dig as far as I am able, and I will keep patching this post from time to time.

The focus here is the flow from the moment data starts going into the pipeline until a block is finished and the pipeline is closed. If you first want an overview of the HDFS write flow, see [this post](http://bigdatadecode.top/HDFS write解析.html).

First, the points I want to cover:

  • Where does the lease check happen? The lease is created at createFile time — can two threads create the same file at the same time? Is the lease checked only in addBlock? Is the current holder re-checked to confirm it still holds the lease?
  • When do the client or the DNs report Replica or Block state to the NN?
  • How is the pipeline set up, and how are the target DNs passed along?
  • What happens when the pipeline fails during the setup stage? How does the client detect it, and how is it repaired?
  • What happens when the pipeline fails during the streaming stage? How does the client detect it, and how is it repaired?
  • What happens when the pipeline fails during the close stage? How does the client detect it, and how is it repaired?
  • When a DN in the pipeline fails, does it need to be replaced, and by what policy?
  • How are pipeline recovery, block recovery and lease recovery related? When does each happen, and who drives it?
  • How do Replica and Block states change, and how do Replica states affect the Block state?
  • Which pipeline failures cause an ack to be sent upstream and then the current DN's network connection to be closed? Who closes the DN's connection (the DN closes it in the finally block of DataXceiver.writeBlock) — the DN itself or the client?

I list these points up front because they probably cannot all be sorted out in one pass; the list keeps me from forgetting which points still need to be updated later.

where does the lease check happen

HDFS uses leases to prevent concurrent writes; the lease serves as a write lock.

During an HDFS write, the client requests a lease from the NN when the FSDataOutputStream is opened, and once it has the lease it keeps renewing it through the LeaseRenewer. To prevent concurrent writes, whether the client still holds the lease for the file must be checked while it writes data. So where do these lease checks actually happen?
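
For orientation, here is the user-level operation that all of the code below sits under: creating an output stream implicitly acquires the lease, and closing it releases the lease. A minimal sketch, assuming a reachable HDFS cluster configured via fs.defaultFS (the path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeaseDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();            // picks up core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);                 // DistributedFileSystem for an hdfs:// defaultFS
    Path p = new Path("/tmp/lease-demo.txt");             // hypothetical path
    try (FSDataOutputStream out = fs.create(p)) {         // NN adds the file and grants the lease here
      out.writeBytes("hello pipeline");                   // buffered into packets by DFSOutputStream
      out.hflush();                                       // push buffered packets into the pipeline
    }                                                     // close() completes the block and releases the lease
  }
}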

addFile

While following the HDFS write code I noticed that no lease check is done when the lease is requested — there is no check of whether another client already holds the lease for the file. So if two clients open an FSDataOutputStream for the same file, can both succeed, and what happens??

The code that requests the lease looks like this:

private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
String src, PermissionStatus permissions, String holder,
String clientMachine, boolean create, boolean overwrite,
boolean createParent, short replication, long blockSize,
boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
EncryptedKeyVersion edek, boolean logRetryEntry)
throws FileAlreadyExistsException, AccessControlException,
UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, RetryStartFileException, IOException {
...
try {
BlocksMapUpdateInfo toRemoveBlocks = null;
// myFile is null, i.e. src does not exist; create must be true
if (myFile == null) {
if (!create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
}
} else {
// myFile is not null, i.e. src already exists, so overwrite it
// shouldn't overwrite also check whether this client holds the file's lease?
if (overwrite) {
toRemoveBlocks = new BlocksMapUpdateInfo();
List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
if (ret >= 0) {
incrDeletedFileCount(ret);
removePathAndBlocks(src, null, toRemoveINodes, true);
}
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
// check whether the number of HDFS objects (inodes + blocks) has reached the limit
checkFsObjectLimit();
INodeFile newNode = null;

// Always do an implicit mkdirs for parent directory tree.
Path parent = new Path(src).getParent();
if (parent != null && mkdirsRecursively(parent.toString(),
permissions, true, now())) {
// addFile into the namespace; quota is checked along the way
newNode = dir.addFile(src, permissions, replication, blockSize,
holder, clientMachine);
}
...
// after the file has been added to the namespace, acquire the lease
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
.getClientName(), src);
...
} catch (IOException ie) {
...
}
}

As the code above shows, when a client opens an output stream for a file there is no lease check (none that I have found so far); it only decides whether the file is being created or overwritten, updates the namespace accordingly, and finally requests the lease.

So suppose clientA has opened an output stream for fileA and is writing data, and clientB then also requests an output stream for fileA. Since fileA already exists, the overwrite path is taken, and no lease check happens anywhere along it: clientB can wipe out the data clientA has already written to fileA (the written data is removed via dir.delete, and delete involves no lease check either), *and afterwards addLease grants clientB its own lease on fileA (while clientA still holds clientA's lease on fileA)*.

In the LeaseManager, leases are keyed by holder; clientA and clientB are different holders, so each holder's lease records fileA's path.

How does HDFS actually handle this situation? I should find a suitable example and test it when I get the chance...
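
A hedged sketch of that experiment (two writers, overwrite enabled, on one path); the path and data are made up, and the outcome is precisely what needs to be verified rather than something this sketch asserts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TwoWritersDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path p = new Path("/tmp/fileA");                           // hypothetical path
    // Two FileSystem instances simulate clientA and clientB (newInstance avoids the cached object,
    // so each gets its own DFSClient and therefore its own lease holder name).
    FileSystem clientA = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
    FileSystem clientB = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);

    FSDataOutputStream outA = clientA.create(p, true);         // clientA creates fileA and gets the lease
    outA.writeBytes("written by A");
    outA.hflush();                                             // make sure some data reaches the pipeline

    FSDataOutputStream outB = clientB.create(p, true);         // overwrite path: does this succeed? (the question above)
    outB.writeBytes("written by B");

    outB.close();
    outA.close();                                              // presumably fails with a lease error; to be confirmed
  }
}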

addBlock

The previous section looked at the lease-related behavior of addFile; now let's look at the lease check performed during addBlock.

A block is written to the DNs through a pipeline. When a new block needs to be added, the DataStreamer sets up a pipeline, and a lease check happens during that setup. The code:

// FSNamesystem.java
FileState analyzeFileState(String src,
long fileId,
String clientName,
ExtendedBlock previous,
LocatedBlock[] onRetryBlock)
throws IOException {
...
// checkLease performs the lease check
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
...
}

private INodeFile checkLease(String src, String holder, INode inode,
long fileId)
throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock();
final String ident = src + " (inode " + fileId + ")";
if (inode == null) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + ident + ": File does not exist. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
if (!inode.isFile()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + ident + ": INode is not a regular file. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
final INodeFile file = inode.asFile();
if (!file.isUnderConstruction()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + ident + ": File is not open for writing. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
// No further modification is allowed on a deleted file.
// A file is considered deleted, if it is not in the inodeMap or is marked
// as deleted in the snapshot feature.
if (isFileDeleted(file)) {
throw new FileNotFoundException(src);
}
// get the file's clientName from the INodeFile and check that the holder matches
// in the scenario above, would the file have two clientNames? surely not, but where is that handled??
String clientName = file.getFileUnderConstructionFeature().getClientName();
if (holder != null && !clientName.equals(holder)) {
throw new LeaseExpiredException("Lease mismatch on " + ident +
" owned by " + clientName + " but is accessed by " + holder);
}
return file;
}

Two questions from the list above are addressed next:

  • How is the pipeline set up, and how are the target DNs passed along?
  • What happens when the pipeline fails during the setup stage? How does the client detect it, and how is it repaired?

pipeline setup & error

pipeline setup

When HDFS writes data, the data first goes into a local buffer; when the buffer is full it is written into a packet, and when the packet is full it is put on the dataQueue and the DataStreamer is notified to consume it. At that point the DataStreamer sees that the pipeline stage is PIPELINE_SETUP_CREATE and starts setting up the pipeline: it calls addBlock on the NN, the NN returns the block's locations, and the pipeline is built from those locations.

The pipeline length defaults to 3 (the replication factor). On the client side, createBlockOutputStream establishes the connection to the first DN in the pipeline and waits for the connect-ack that the DNs send back. The code:

private boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
...
while (true) {
boolean result = false;
DataOutputStream out = null;
try {
// establish a socket connection to the first DN in the pipeline
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
// output stream for sending requests downstream
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
// input stream for receiving the ack returned from downstream
InputStream unbufIn = NetUtils.getInputStream(s);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));

blockReplyStream = new DataInputStream(unbufIn);
...
// send the write request (effectively the connect request) to the DN;
// it is received by the DN's DataXceiverServer, which spawns a DataXceiver to handle it
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);

// receive ack for connect
// after sending the write request downstream, wait for the connect-ack from downstream
// packets are sent into the pipeline only after the connect-ack has arrived
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
// OOB stands for out-of-band data
// Got an restart OOB ack.
// If a node is already restarting, this status is not likely from
// the same node. If it is from a different node, it is not
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
restartingNodeIndex == -1) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
// pipeline setup failed; throw an exception to be handled in the catch block
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
restartingNodeIndex = -1;
hasError = false;
} catch (IOException ie) {
if (restartingNodeIndex == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ nodes[0] + " : " + ie);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
// Don't close the socket/exclude this node just yet. Try again with
// a new encryption key.
continue;
}

// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorIndex = i;
break;
}
}
} else {
assert checkRestart == false;
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
restartingNodeIndex = errorIndex;
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex]);
}
hasError = true;
setLastException(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}
return result;
}
}

In the pipeline setup stage the client first sends a connect request to the first DN in the pipeline and then waits for the connect-ack sent back through the pipeline. Because sending the connect request and receiving the connect-ack are synchronous, there is no need to spawn a separate Responder thread the way packet acks are handled: after sending the connect request the client simply blocks until the connect-ack (relayed back from the last DN) arrives, and only then continues and checks whether the connect succeeded.

With the client connected to the first DN in the pipeline, let's see how the remaining DNs in the pipeline get connected. The code:

// DataXceiver.writeBlock
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
...
// reply to upstream datanode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
getOutputStream(),
HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);

DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
final String storageUuid;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist);

storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}

// connect to the downstream DN
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + mirrorNode);
}
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);

OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);

// Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy, false);

mirrorOut.flush();

// read connect ack (only for clients, not for replication req)
if (isClient) {
// read the connect-ack returned from downstream via mirrorIn
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
firstBadLink);
}
}

} catch (IOException e) {
if (isClient) {
BlockOpResponseProto.newBuilder()
.setStatus(ERROR)
// NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
LOG.error(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode + ": " + e);
throw e;
} else {
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
}
}
}

// send connect-ack to source for clients and not transfer-RBW/Finalized
if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}

// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);

// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
writeResponse(SUCCESS, null, replyOut);
}
}
...
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
// close all opened streams
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
blockReceiver = null;
}
...
}

On the DN, the thread that handles a connect request from upstream (or from the client) is a DataXceiver. If there is a downstream DN, the DataXceiver connects to it, forwards the write request, and waits for the downstream connect-ack; once it receives that ack it passes it on upstream, and then calls blockReceiver.receiveBlock to start receiving packets from the pipeline.

Once the client receives the connect-ack relayed back hop by hop through the pipeline, the pipeline is set up. Careful readers will notice that when a DataXceiver connects to its downstream DN it always takes the first element of targets (mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);). So how does targets shrink? targets is sent downstream via the Sender; the targets argument passed to Sender.writeBlock is unchanged, yet the downstream DN receives a different targets, so the value must be altered inside writeBlock. Looking at the code:

// Sender.java
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
...
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.setStorageType(PBHelper.convertStorageType(storageType))
// PBHelper.convert copies targets starting from index 1, dropping this hop's own downstream entry
.addAllTargets(PBHelper.convert(targets, 1))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist);
...
}
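
So each hop forwards only targets[1..], and the first element of whatever array a DN receives is always its own downstream neighbor. A toy, self-contained illustration of the effect (strings stand in for the DatanodeInfo entries; this is not the PBHelper code):

import java.util.Arrays;

public class TargetsShrinkDemo {
  public static void main(String[] args) {
    // Stand-ins for the three DNs returned by addBlock (hypothetical addresses).
    String[] targets = {"dn1:50010", "dn2:50010", "dn3:50010"};
    // What dn1 forwards to dn2: everything after index 0, i.e. {"dn2:50010", "dn3:50010"}.
    String[] forwarded = Arrays.copyOfRange(targets, 1, targets.length);
    System.out.println(Arrays.toString(forwarded));
  }
}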

Next, what happens when something goes wrong during pipeline setup?

errors in pipeline setup

On the client side an IOException can occur while establishing the socket connection to the DN (createSocketForPipeline) and while receiving the connect-ack from the DN (BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(blockReplyStream))).
createSocketForPipeline may throw an IOException when connecting to the DN; when the connect-ack arrives, pipelineStatus is checked against SUCCESS and, if it is not SUCCESS, a different exception is thrown depending on the status. These exceptions are caught in the catch block, which handles them as follows:

try {
// connect to the DN; failure throws IOException
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
...
// receive ack for connect
// receive the connect-ack returned by the DN
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(blockReplyStream));
// record the status from the connect-ack
pipelineStatus = resp.getStatus();
// record the first DN that failed (firstBadLink is its transfer address)
firstBadLink = resp.getFirstBadLink();
...
// pipelineStatus is not SUCCESS: throw an exception
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
...
} catch (IOException ie) {
// a failure occurred inside createBlockOutputStream
if (restartingNodeIndex == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
}
...
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
// errorIndex flags the error for the DataStreamer; processDatanodeError handles it
errorIndex = i;
break;
}
}
} else {
// firstBadLink is empty, so the error probably occurred in createBlockOutputStream itself (no connect-ack received)
assert checkRestart == false;
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
restartingNodeIndex = errorIndex;
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex]);
}
// hasError flags the error for the DataStreamer; processDatanodeError handles it
hasError = true;
setLastException(ie);
result = false; // error
} finally {
// on error, result has been set to false in the catch block
// on error, close the connections to the DN
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}

If the error occurs in createSocketForPipeline it is caught directly; firstBadLink still has its initial value (no connect-ack was received), so errorIndex is set to 0 and hasError to true.
If, after the connect-ack is received, pipelineStatus is not SUCCESS, the thrown exception is caught, the DN matching firstBadLink is located, errorIndex is set to that DN's index, and hasError is set to true.

After errorIndex and hasError have been set, the finally block closes the network connections according to result.

With errorIndex >= 0 and hasError == true, DataStreamer.run calls processDatanodeError to handle the error.

On the DN side the entry point is DataXceiver.writeBlock. writeBlock checks whether targets.length is greater than 0; if so there is a downstream DN to connect to, and the connection attempt may fail with an IOException, which is caught: the ack status is set to ERROR, returned upstream, and the related network connections are closed. Note that when a DN receives the connect-ack from its downstream DN it does not inspect the status in the ack; it simply relays it to the upstream DN or the client.

In the catch block of the targets.length > 0 section there is a piece of code worth noting (I still need to work through the exact logic):

if (isClient) {
LOG.error(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode + ": " + e);
// rethrown inside the catch block and caught by the outer catch
throw e;
} else {
// when is it acceptable to ignore the error????
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
}

When the connection to some DN fails during pipeline setup, an ERROR status is returned immediately and the connection is closed; the status is relayed hop by hop back to the client, and the client handles it by setting the pipeline up again.

recovery after pipeline setup errors

errorIndex and hasError are assigned inside createBlockOutputStream. If errorIndex equals 0, the failure happened while the client was connecting to the first DN, and the while loop is entered again to retry the connection to the first DN. If errorIndex is non-zero and equals the index of some DN in the pipeline, the loop is exited and control returns to nextBlockOutputStream, which decides, based on the result of createBlockOutputStream and the number of retries left, whether to enter its while loop again and set the pipeline up once more; at that point a new block is requested from the NN.

This is the pipeline setup recovery strategy described in the earlier post (it only covers setup errors when creating a new block):
if the pipeline was being set up to create a new block, the client simply abandons that block, asks the NN for a new one, and then builds a pipeline for the new block.

pipeline streaming & error

pipeline data streaming

Once pipeline setup succeeds, the client, in DataStreamer.run, calls initDataStreaming, which sets the BlockConstructionStage to DATA_STREAMING and starts a ResponseProcessor thread to receive the packet acks coming back from the downstream DNs.

With the ResponseProcessor thread running, packets start being sent into the pipeline:

try {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
}

The client's DataStreamer sends packets into the pipeline, and the first DN starts receiving them. The code is again in DataXceiver.writeBlock (by this point the pipeline is established, i.e. the DN has already returned the connect-ack to the client, so it begins receiving packets from the pipeline). The DN receives packets via blockReceiver.receiveBlock; errors while receiving may throw an IOException, and the connections are closed in the finally block. The receiveBlock code:

void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
...
try {
if (isClient && !isTransfer) {
// start a PacketResponder daemon thread
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to processes responses
}
// loop receiving packets from the socket
while (receivePacket() >= 0) { /* Receive until the last packet */ }
// once all packets of this block have been received, shut down the PacketResponder thread
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
...
} catch (IOException ioe) {
if (datanode.isRestarting()) {
// Do not throw if shutting down for restart. Otherwise, it will cause
// premature termination of responder.
LOG.info("Shutting down for restart (" + block + ").");
} else {
LOG.info("Exception for " + block, ioe);
throw ioe;
}
} finally {
...
}
}

In DataXceiver.writeBlock a BlockReceiver is first created (which, depending on the stage, creates the Replica in a different state), and then blockReceiver.receiveBlock is called to receive the packets from the socket.

Just as the client starts a dedicated ResponseProcessor thread to receive packet acks, the DN starts a PacketResponder daemon thread to receive acks from downstream and send acks upstream. (Note that the client and the DN use different data structures to track packets: the client puts a packet on the dataQueue first and then on the ackQueue, whereas the DN only has an ackQueue and no dataQueue, because a DN holds only one packet in memory at a time; after forwarding it downstream, the in-memory packet is put on the ackQueue.)

The DN receives data packet by packet: packetReceiver.receiveNextPacket(in) reads a packet from the socket into memory, the packet is put on the ackQueue, it is mirrored downstream via packetReceiver.mirrorPacketTo(mirrorOut), and after that the in-memory data is written to disk.
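
A toy, self-contained model of that per-packet ordering on one DN hop (class and method names are made up; this is not the real BlockReceiver/PacketResponder code):

import java.util.ArrayDeque;
import java.util.Queue;

// One DN hop: receive a packet, remember it for the responder, mirror it downstream,
// then persist it — the order described in the paragraph above.
public class DnHopModel {
  private final Queue<String> ackQueue = new ArrayDeque<>();   // the DN side has only an ackQueue

  void onPacket(String packet) {
    // 1.a: the packet has been read from the upstream socket into memory (receiveNextPacket)
    ackQueue.add(packet);                    // keep it until the downstream ack comes back
    mirrorDownstream(packet);                // 1.b: forward to the next DN (mirrorPacketTo)
    writeToDisk(packet);                     // 2:   then flush data + checksum to disk
  }

  void onDownstreamAck(String ackedPacket) {
    // 3.a: got the ack from downstream; 3.b: send an ack upstream and drop the queue head
    String head = ackQueue.poll();
    System.out.println("ack upstream for " + head + " (downstream acked " + ackedPacket + ")");
  }

  private void mirrorDownstream(String p) { System.out.println("mirror " + p); }
  private void writeToDisk(String p)      { System.out.println("persist " + p); }

  public static void main(String[] args) {
    DnHopModel dn = new DnHopModel();
    dn.onPacket("packet-0");
    dn.onDownstreamAck("packet-0");
  }
}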

That covers how a packet is sent from the client and travels through the pipeline; next, the errors that can occur during pipeline streaming.

errors in pipeline streaming

Pipeline streaming can be broken down into three stages: 1.a receive a packet from the socket into memory and 1.b send the in-memory packet to the downstream socket; 2 write the in-memory packet to disk; 3.a receive the ack from downstream and 3.b send the ack upstream. Any of these steps can fail.

1.a: receiving a packet from the socket into memory (packetReceiver.receiveNextPacket(in)) can throw an IOException, which is caught in writeBlock; the finally block closes the connections.

1.b: writing the in-memory packet to the downstream socket (packetReceiver.mirrorPacketTo(mirrorOut)) can throw an IOException, which is caught and handled by handleMirrorOutError:

/**
* While writing to mirrorOut, failure to write to mirror should not
* affect this datanode unless it is caused by interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
String bpid = block.getBlockPoolId();
LOG.info(datanode.getDNRegistrationForBP(bpid)
+ ":Exception writing " + block + " to mirror " + mirrorAddr, ioe);
if (Thread.interrupted()) { // shut down if the thread is interrupted
throw ioe;
} else { // encounter an error while writing to mirror
// continue to run even if can not write to mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
}
}

Whether or not 1.b fails, the in-memory packet is always written to disk, i.e. step 2 always runs. Step 2 writes both the checksum data and the data itself, and either write can fail.

If checksum verification (verifyChunks(dataBuf, checksumBuf)) throws, the exception is caught; the packet's status may be set to Status.ERROR_CHECKSUM and the packet put on the ackQueue (but the packet was already put on the ackQueue before being mirrored to the downstream socket — is it enqueued again? could it be duplicated? why???), and an IOException is rethrown and caught in writeBlock, whose finally block closes the connections.

Writing the data to disk can throw an IOException, which is caught in writeBlock; the finally block closes the connections.

3.a and 3.b run in a separate thread. Receiving the downstream ack (3.a) may throw an IOException, which is caught and sets mirrorError to true. Sending the ack upstream or to the client (3.b) may also throw an IOException. The PacketResponder.run code:

public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
// create an empty ack
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
// an error while mirroring a packet downstream, or while reading the downstream ack,
// can set mirrorError to true, so this if is skipped and no downstream ack is read
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
// Process an OOB ACK.
...
seqno = ack.getSeqno();
}
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
// take a packet off the ackQueue
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream
// nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos
- ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+ "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
}

if (Thread.interrupted() || isInterrupted) {
/*
* The receiver thread cancelled this thread. We could also check
* any other status updates from the receiver thread (e.g. if it is
* ok to write to replyOut). It is prudent to not send any more
* status back to the client because this datanode has a problem.
* The upstream datanode will detect that this datanode is bad, and
* rightly so.
*
* The receiver thread can also interrupt this thread for sending
* an out-of-band response upstream.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}

if (lastPacketInBlock) {
// Finalize the block and close the block file
finalizeBlock(startTime);
}
// send the ack upstream, combining the downstream ack with this DN's own status
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
datanode.checkDiskErrorAsync();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
}
}
} catch (Throwable e) {
if (running) {
LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
LOG.info(myString + " terminating");
}

There is still a lot here that I am not clear about; noting it down:

This is one of the most important parts of the pipeline, but I still have many doubts; I record them here to resolve gradually.
How are errors that occur during data streaming recorded in the ackQueue? So far the only case I can find in the code is verifyChunks throwing, which sets the packet's status to ERROR_CHECKSUM and puts it on the ackQueue — what about other cases??? And how are they reported upstream???
When forwarding a packet downstream fails, mirrorError may be set to true, and the PacketResponder then stops reading downstream acks. Does it just send an empty ack with Status.SUCCESS upstream (sendAckUpstream(ack, expected, totalAckTimeNanos,(pkt != null ? pkt.offsetInBlock : 0), (pkt != null ? pkt.ackStatus : Status.SUCCESS)))???? (Without reading the downstream ack it cannot take a packet off the ackQueue, but then why send Status.SUCCESS?)
When a packet runs into an error, the client does not find out immediately; only when that packet's ack makes its way back to the client can the client detect the error and handle it.

Those are the errors a DN can hit during pipeline streaming; next, the client-side errors during pipeline streaming.

On the client side, pipeline streaming involves the ResponseProcessor thread and the DataStreamer thread. First, the ResponseProcessor thread, which receives the acks sent back by the DNs:

public void run() {
setName("ResponseProcessor for block " + block);
PipelineAck ack = new PipelineAck();
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
long begin = Time.monotonicNow();
ack.readFields(blockReplyStream);
...
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = ack.getReply(i);
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
throw new IOException(message);
}
// node error
if (reply != SUCCESS) {
setErrorIndex(i); // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
targets[i]);
}
}

assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}

// a success ack for a data packet
Packet one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
// the received seqno does not match the head of the ackQueue: throw
if (one.seqno != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;

// Fail the packet write for testing in order to force a
// pipeline recovery.
if (DFSClientFaultInjector.get().failPacket() &&
isLastPacketInBlock) {
failPacket = true;
throw new IOException(
"Failing the last packet for testing.");
}

// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());

synchronized (dataQueue) {
lastAckedSeqno = seqno;
ackQueue.removeFirst();
dataQueue.notifyAll();

one.releaseBuffer(byteArrayManager);
}
} catch (Exception e) {
if (!responderClosed) {
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
// If no explicit error report was received, mark the primary
// node as failed.
tryMarkPrimaryDatanodeFailed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
+ " for block " + block, e);
}
responderClosed = true;
}
}
}
}

When the ResponseProcessor receives a downstream ack, if any reply in the ack has a non-SUCCESS status it calls setErrorIndex and throws an IOException; if the ack's seqno differs from the seqno taken from the ackQueue it also throws an IOException. Either exception is caught in the catch block, hasError is set to true, and the ResponseProcessor thread is shut down.

An exception can also be thrown in DataStreamer.run (an IOException from one.writeTo(blockStream)).

recovery after pipeline streaming errors

Errors during pipeline streaming are handled only once the client detects them.

If the exception happens in the ResponseProcessor there are two cases. In the first, a packet in the downstream ack has a non-SUCCESS status: errorIndex is set to the index of the first DN in the pipeline that reported the error, and the thrown IOException is caught and hasError is set to true.
In the second, the ack's seqno differs from the seqno taken from the ackQueue: the thrown IOException is caught and hasError is set to true.

In DataStreamer.run, an exception from one.writeTo(blockStream) is caught, rethrown as an IOException, caught by the outer catch, and hasError is set to true.

Once hasError and errorIndex change, DataStreamer.run notices and starts handling the failure. The relevant code:

// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}

if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
}

processDatanodeError only runs when errorIndex >= 0 or restartingNodeIndex >= 0 (so, does it only run when the ResponseProcessor receives an ack containing a packet with a non-SUCCESS status???).

Let's study the processDatanodeError code:

private boolean processDatanodeError() throws IOException {
if (response != null) {
DFSClient.LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
}
// close the streams
closeStream();
// move the packets still waiting for acks from the ackQueue back to the front of the dataQueue so they are resent through the pipeline
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}

// Record the new pipeline failure recovery.
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success."));
streamerClosed = true;
return false;
}
}
//
boolean doSleep = setupPipelineForAppendOrRecovery();

if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {

// If we had an error while closing the pipeline, we go through a fast-path
// where the BlockReceiver does not run. Instead, the DataNode just finalizes
// the block immediately during the 'connect ack' process. So, we want to pull
// the end-of-block packet from the dataQueue, since we don't actually have
// a true pipeline to send it over.
//
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
lastAckedSeqno = endOfBlockPacket.seqno;
dataQueue.notifyAll();
}
endBlock();
} else {
initDataStreaming();
}
}

return doSleep;
}

private boolean setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting...";
DFSClient.LOG.warn(msg);
setLastException(new IOException(msg));
streamerClosed = true;
return false;
}

boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
// Sleep before reconnect if a dn is restarting.
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex >= 0) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException("Interrupted while waiting for " +
"datanode to restart. " + nodes[restartingNodeIndex]));
streamerClosed = true;
return false;
}
}
// at this point hasError is true
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
if (errorIndex >= 0) {
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
pipelineMsg.append(nodes[j]);
if (j < nodes.length - 1) {
pipelineMsg.append(", ");
}
}
if (nodes.length <= 1) {
lastException.set(new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting..."));
streamerClosed = true;
return false;
}
DFSClient.LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex]);
failed.add(nodes[errorIndex]);

DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
arraycopy(nodes, newnodes, errorIndex);

final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);

final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, errorIndex);
// form a new pipeline from the remaining DNs
setPipeline(newnodes, newStorageTypes, newStorageIDs);

// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (errorIndex > restartingNodeIndex) {
restartingNodeIndex = -1;
} else if (errorIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
// this shouldn't happen...
assert false;
}
}

if (restartingNodeIndex == -1) {
hasError = false;
}
lastException.set(null);
errorIndex = -1;
}
// check whether the failed DN should be replaced to form the new pipeline;
// the policy is controlled by dfs.client.block.write.replace-datanode-on-failure.policy
// Check if replace-datanode policy is satisfied.
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
try {
// add a new DN to the pipeline
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
DFSClient.LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
}
// get a new generation stamp and an access token
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}

if (restartingNodeIndex >= 0) {
assert hasError == true;
// check errorIndex set above
if (errorIndex == restartingNodeIndex) {
// ignore, if came from the restarting node
errorIndex = -1;
}
// still within the deadline
if (Time.now() < restartDeadline) {
continue; // with in the deadline
}
// expired. declare the restarting node dead
restartDeadline = 0;
int expiredNodeIndex = restartingNodeIndex;
restartingNodeIndex = -1;
DFSClient.LOG.warn("Datanode did not restart in time: " +
nodes[expiredNodeIndex]);
// Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be
// overwritten/dropped. In this case, the restarting node will get
// excluded in the following attempt, if it still does not come up.
if (errorIndex == -1) {
errorIndex = expiredNodeIndex;
}
// From this point on, normal pipeline recovery applies.
}
} // while

if (success) {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;
}
return false; // do not sleep, continue processing
}

Whether a failed DN in the pipeline is replaced, or the remaining DNs simply form the new pipeline on their own, is determined by the configured replace policy. The default is the DEFAULT policy, controlled by dfs.client.block.write.replace-datanode-on-failure.policy:

/**
* DEFAULT condition:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*/
public boolean satisfy(final short replication,
final DatanodeInfo[] existings, final int n, final boolean isAppend,
final boolean isHflushed) {
if (replication < 3) {
return false;
} else {
if (n <= (replication/2)) {
return true;
} else {
return isAppend || isHflushed;
}
}
}

So in a 3-DN pipeline, a single DN failure does not trigger a replacement? (With r = 3 and one DN failed there are n = 2 DNs left; n <= r/2 means 2 <= 1, which is false, so a new DN is added only if the stream has been hflushed or this is an append.) And when would two DNs have failed — the ResponseProcessor only finds the first failed DN and sets errorIndex to its index, so where are multiple DN failures detected???
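
These replace-datanode-on-failure knobs are client-side configuration. A hedged example of setting them programmatically (the policy key appears above; the best-effort key name is inferred from the DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY constant referenced in the code, and NEVER/ALWAYS are, to my knowledge, the other accepted policy values alongside DEFAULT):

import org.apache.hadoop.conf.Configuration;

public class ReplacePolicyConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Policy used when a pipeline DN fails: DEFAULT (the satisfy() shown above), or NEVER / ALWAYS.
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    // Best-effort: if adding a replacement DN fails, keep writing with the remaining DNs
    // instead of failing the write (the isBestEffort() branch in setupPipelineForAppendOrRecovery).
    conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.best-effort", false);
    System.out.println(conf.get("dfs.client.block.write.replace-datanode-on-failure.policy"));
  }
}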

That is the pipeline streaming part; some of it is still not entirely clear to me, and I will keep deepening my understanding as I work through the code and update this post.

pipeline close & error

pipeline close

On the client side, DataStreamer.run first checks whether the current packet is the lastPacketInBlock; if so, it blocks until the client has received the acks for all packets still on the ackQueue, and then sets the pipeline stage to BlockConstructionStage.PIPELINE_CLOSE.

It then sends the current packet, i.e. the last packet of the block, into the pipeline, and once the ack for that packet arrives it calls endBlock() to close the connections and reset the pipeline to its initial state.

On the DN side, DataXceiver.writeBlock calls BlockReceiver.receivePacket, which checks lastPacketInBlock || len == 0; if **syncBlock** (what exactly is it for??) is also true, flushOrSync(true) is called to write the data and metadata to disk.
Then, in the PacketResponder thread, after the downstream ack is received, the packet taken off the ackQueue is checked for lastPacketInBlock; if it is the last packet, finalizeBlock(startTime) is called, and then the ack is sent upstream.

errors in pipeline close

On the client side, the place an error can occur during close is sending the last packet into the pipeline, which may throw an exception and set hasError to true.

On the DN side, an error during close can occur in BlockReceiver.receivePacket: when the packet is the last one and syncBlock is true, flushOrSync(true) is called, and it may throw (that IOException propagates all the way out of DataXceiver.writeBlock).

recovery after pipeline close errors

When the client hits an error and sets hasError to true, does processDatanodeError get executed???
It would seem not — and yet processDatanodeError contains this code:

if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
// If we had an error while closing the pipeline, we go through a fast-path
// where the BlockReceiver does not run. Instead, the DataNode just finalizes
// the block immediately during the 'connect ack' process. So, we want to pull
// the end-of-block packet from the dataQueue, since we don't actually have
// a true pipeline to send it over.
//
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
// if an error happens during pipeline close there is a fast path: no BlockReceiver is created on the DN;
// instead the DN finalizes the block immediately while the pipeline is re-established, so the last packet
// is simply pulled off the dataQueue and never needs to be sent through a real pipeline.
synchronized (dataQueue) {
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
lastAckedSeqno = endOfBlockPacket.seqno;
dataQueue.notifyAll();
}
endBlock();
} else {
initDataStreaming();
}
}

Or does it run after all?? But errorIndex might still be -1...

By the close stage, the acks for the earlier packets have already been returned to the client, and only then does the client send the last packet (an empty one) into the pipeline; so DN-side errors here should be similar to the DN-side errors during the streaming stage. I have not fully worked this out yet????


Consider a client that crashes while writing a file. Until the lease soft limit expires, no other client can write that file; once the soft limit has expired, another client may write it. Before writing, that client first checks whether the file was left open; if so, it goes through lease recovery and block recovery, whose goal is to bring all replicas of the file's last block to a consistent state — the replicas are written through a pipeline, so it is perfectly normal for them to be inconsistent after a failure.
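
A hedged sketch of how another client can explicitly trigger that lease recovery instead of waiting for the soft limit, using DistributedFileSystem.recoverLease (the path is made up; to my understanding recoverLease returns true once the file has been closed and its last block made consistent):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/tmp/fileA");                      // hypothetical path left open by a dead writer
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      boolean recovered = dfs.recoverLease(p);            // asks the NN to start lease/block recovery
      while (!recovered) {                                // poll until the file is closed
        Thread.sleep(1000);
        recovered = dfs.recoverLease(p);
      }
      System.out.println("lease recovered, file closed: " + p);
    }
  }
}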