This post documents the HDFS file-write path. The flow resembles writing an ordinary file: first create an output stream (OutputStream), then write data through it. The basic unit of data transfer in HDFS is the packet (64 KB by default); each packet consists of many chunks, and the chunk is the unit of checksumming, with every chunk carrying its own checksum. The chunk is also the unit in which data is written into a packet, and once a packet is full it is sent into the pipeline.
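
As a rough sanity check of these numbers, the sketch below (assuming the common defaults of a 64 KB packet, 512-byte chunks and 4-byte CRC checksums, and ignoring the small packet header that the real computePacketChunkSize also accounts for) estimates how many chunks fit into one packet:

```java
public class PacketSizingSketch {
    public static void main(String[] args) {
        int writePacketSize = 64 * 1024; // dfs.client-write-packet-size default
        int bytesPerChecksum = 512;      // dfs.bytes-per-checksum default
        int checksumSize = 4;            // a CRC32/CRC32C checksum is 4 bytes

        // Each chunk travels together with its checksum.
        int chunkOnWire = bytesPerChecksum + checksumSize;   // 516 bytes
        int chunksPerPacket = writePacketSize / chunkOnWire; // roughly 127
        System.out.println(chunksPerPacket + " chunks per packet, about "
                + chunksPerPacket * bytesPerChecksum + " data bytes per packet");
    }
}
```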

The rest of this post walks through the write path at the code level. The overall flow is shown below:
![HDFS write flow](/blogimgs/HDFS write解析/hdfs-write-flow.png "HDFS write flow")

Creating an output stream

Writing a file in HDFS is similar to writing a file in Java: a file stream has to be opened first. In HDFS the stream is opened through a FileSystem object: FileSystem.get(conf) returns a FileSystem, and calling create(Path) or append(Path) on it opens an FSDataOutputStream.
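
For reference, a minimal client program built on this public API might look like the sketch below (the hdfs:// URI and the path are made up for illustration):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The hdfs:// scheme is what selects DistributedFileSystem as the implementation.
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/hello.txt"))) {
            out.write("hello hdfs".getBytes("UTF-8"));
            out.hflush(); // make the written bytes visible to readers before close
        }
    }
}
```

The single-argument create on FileSystem itself is just a thin wrapper: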

public FSDataOutputStream create(Path f) throws IOException {
return create(f, true); // overwrite = true
}

FileSystem is an abstract class; the concrete create behaviour is left to subclasses such as DistributedFileSystem and WebHdfsFileSystem, because different file systems open files differently. Taking DistributedFileSystem as the example, its create implementation is:

public FSDataOutputStream create(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize, final Progressable progress,
final ChecksumOpt checksumOpt) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication, blockSize, progress, bufferSize,
checksumOpt);
return dfs.createWrappedOutputStream(dfsos, statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission, cflags, bufferSize,
replication, blockSize, progress, checksumOpt);
}
}.resolve(this, absF);
}

DistributedFileSystem.create() creates an anonymous FileSystemLinkResolver; resolve() calls the doCall() overridden in that anonymous class, and if doCall() throws UnresolvedLinkException, resolve() catches it and calls next() to retry the open. (This mirrors how open() produces an FSDataInputStream.)

doCall() calls dfs.create, which returns a DFSOutputStream; dfs.createWrappedOutputStream then wraps it into an HdfsDataOutputStream, returned to the caller as an FSDataOutputStream (FSDataOutputStream is the parent class of HdfsDataOutputStream). This is how FileSystem.create(path) opens a file stream.

The dfs used in doCall() is DistributedFileSystem's DFSClient field; its create method produces the DFSOutputStream instance:

public DFSOutputStream create(String src, 
FsPermission permission,
EnumSet<CreateFlag> flag,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
String[] favoredNodeStrs = null;
if (favoredNodes != null) {
favoredNodeStrs = new String[favoredNodes.length];
for (int i = 0; i < favoredNodes.length; i++) {
favoredNodeStrs[i] =
favoredNodes[i].getHostName() + ":"
+ favoredNodes[i].getPort();
}
}
// Note: obtaining the output stream does not check an existing lease here;
// the lease is only added when the file is created on the NameNode (see startFileInternal).
// Once the file has been created, a DFSOutputStream is constructed for it.
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
favoredNodeStrs);
// Holding the output stream means holding the file's lease; a thread must then renew it periodically.
beginFileLease(result.getFileId(), result);
return result;
}

If the file's parent directory does not exist, dfs.create creates it automatically; createParent is passed as true.

dfs.create obtains its DFSOutputStream instance through the static method newStreamForCreate. HDFS uses a lease as the write permit for a file, so once write access to the file has been obtained, the lease is stored and renewed periodically.

Obtaining the DFSOutputStream instance

DFSOutputStream's constructors are private, so instances are obtained through the static method newStreamForCreate:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException {
HdfsFileStatus stat = null;

// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
// CREATE_RETRY_COUNT is 10
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
// RPC call to the NameNode
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS);
break;
} catch (RemoteException re) {
...
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
// start the DataStreamer thread
out.start();
return out;
}

newStreamForCreate first asks the NameNode over RPC to create the file, then opens an output stream on it.

The create RPC is wrapped in a retry loop (up to 10 attempts by default); stat = dfsClient.namenode.create issues the call and breaks out of the while loop once it succeeds. On the server side the remote call flows through NameNodeRpcServer.create -> namesystem.startFile -> startFileInt -> startFileInternal. In startFileInternal, newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine); adds the file to the namespace, and leaseManager.addLease(newNode.getFileUnderConstructionFeature().getClientName(), src); records the client's lease on src. For more on leases, see the article on HDFS leases.

Here is the startFileInternal code:

// FSNamesystem.java
private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
String src, PermissionStatus permissions, String holder,
String clientMachine, boolean create, boolean overwrite,
boolean createParent, short replication, long blockSize,
boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
EncryptedKeyVersion edek, boolean logRetryEntry)
throws FileAlreadyExistsException, AccessControlException,
UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, RetryStartFileException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INodesInPath iip = dir.getINodesInPath4Write(src);
final INode inode = iip.getLastINode();
// fail if the path already exists as a directory
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException(src +
" already exists as a directory");
}
...
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
...
try {
BlocksMapUpdateInfo toRemoveBlocks = null;
// the file does not exist: create it
if (myFile == null) {
if (!create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
}
} else {
// the file already exists: overwrite it
if (overwrite) {
toRemoveBlocks = new BlocksMapUpdateInfo();
List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
if (ret >= 0) {
incrDeletedFileCount(ret);
removePathAndBlocks(src, null, toRemoveINodes, true);
}
// (author's question: how does execution reach this point?)
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
// checkFsObjectLimit enforces the cluster-wide cap on objects (inodes + blocks);
// it is usually left unset. (If you are ever asked what the "block limit" in HDFS is,
// this is probably it.) It is controlled by dfs.namenode.max.objects.
// A separate property caps the number of blocks per file, default 1024*1024,
// controlled by dfs.namenode.fs-limits.max-blocks-per-file.
checkFsObjectLimit();
INodeFile newNode = null;

// Always do an implicit mkdirs for parent directory tree.
Path parent = new Path(src).getParent();
// recursively create the parent directories
if (parent != null && mkdirsRecursively(parent.toString(),
permissions, true, now())) {
// add the file to the namespace
newNode = dir.addFile(src, permissions, replication, blockSize,
holder, clientMachine);
}

if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
}
// register clientName's lease on this file with the LeaseManager
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
.getClientName(), src);
...
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return toRemoveBlocks;
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
ie.getMessage());
throw ie;
}
}

Note that obtaining the FSDataOutputStream did not involve a lease check on the client side. So suppose client1 has obtained an FSDataOutputStream for fileA and its lease is in the LeaseManager, and client2 now also asks for an output stream for fileA: does client2 get a stream too, with its own lease added, so that two clients hold a lease on the same file? Looking at startFileInternal above, the answer appears to be no: for an existing file without overwrite, recoverLeaseInternal is called, and while fileA is still under construction by client1 (and the lease soft limit has not expired) it throws AlreadyBeingCreatedException; otherwise the FileAlreadyExistsException right after it is thrown. Either way the second create fails rather than handing out a second lease.

synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
// this client holds no lease yet: create one
lease = new Lease(holder);
leases.put(holder, lease);
sortedLeases.add(lease);
} else {
// the client already holds a lease: just renew it
renewLease(lease);
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
return lease;
}

Back in newStreamForCreate, a stream is constructed for the file the NameNode just created: final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes):

private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
// packets default to 64 KB;
// compute how many chunks fit into a packet, and the resulting packet size
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
streamer = new DataStreamer(stat, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
}

DFSOutputStream owns an important thread, DataStreamer, which is responsible for sending packets to the DataNodes in the pipeline.

Back in newStreamForCreate, out.start() is called to start the DataStreamer thread.

With the DFSOutputStream created, control returns to DFSClient.create, which calls beginFileLease(result.getFileId(), result) to start the periodic lease-renewal machinery, LeaseRenewer:

/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out)
throws IOException {
getLeaseRenewer().put(inodeId, out, this);
}
// LeaseRenewer is the client-side thread that checks whether leases need renewing
// A thread per namenode per user
synchronized void put(final long inodeId, final DFSOutputStream out,
final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
//start a new deamon with a new id.
final int id = ++currentId;
daemon = new Daemon(new Runnable() {
@Override
public void run() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " started");
}
// calls LeaseRenewer.run(final int id),
// which invokes renew() to renew the leases
LeaseRenewer.this.run(id);
} catch(InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+ " is interrupted.", e);
}
} finally {
synchronized(LeaseRenewer.this) {
Factory.INSTANCE.remove(LeaseRenewer.this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " exited");
}
}
}

@Override
public String toString() {
return String.valueOf(LeaseRenewer.this);
}
});
daemon.start();
}
dfsc.putFileBeingWritten(inodeId, out);
emptyTime = Long.MAX_VALUE;
}
}

Back in doCall(), dfs.createWrappedOutputStream(dfsos, statistics) wraps the DFSOutputStream into an HdfsDataOutputStream (a subclass of FSDataOutputStream) and returns it as the FSDataOutputStream. At this point the output stream is fully created, and the next step is to call write to push data into it.

Writing bytes into the output stream

Writes go through the FSDataOutputStream object out via write(byte[]); the call chain is out.write(bytes) -> FilterOutputStream.write -> DataOutputStream.write -> out.write(byte[], off, len) -> FSOutputSummer.write(byte b[], int off, int len):

public synchronized void write(byte b[], int off, int len)
throws IOException {
checkClosed();
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
}
}

write() keeps calling write1 until all len bytes have been consumed:

private int write1(byte b[], int off, int len) throws IOException {
// when the local buffer is empty and the incoming length is at least one buffer long, write buf.length bytes directly
if(count==0 && len>=buf.length) {
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length;
writeChecksumChunks(b, off, length);
return length;
}
// when len is smaller than the local buffer, copy into buf first and flushBuffer() once buf is full
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
// local buffer is full
flushBuffer();
}
return bytesToCopy;
}

_Written data first goes into a local buffer_ whose default size is this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS], with BUFFER_NUM_CHUNKS = 9, i.e. nine chunks, 9 * 512 = 4608 bytes. Once buf is full, checksums are generated for its contents and the data is written into a packet.

When the incoming length is at least the buffer size (and the buffer is empty), the data bypasses buf: writeChecksumChunks checksums a buffer-sized slice and writes it straight into the packet.
When the incoming length is smaller than the buffer, the data is copied into buf, and flushBuffer is called once buf fills up. For example, a single write of 10,000 bytes results in two direct 4,608-byte writes followed by 784 bytes left sitting in buf.

protected synchronized void flushBuffer() throws IOException {
flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep,
boolean flushPartial) throws IOException {
int bufLen = count;
int partialLen = bufLen % sum.getBytesPerChecksum();
int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
if (lenToFlush != 0) {
writeChecksumChunks(buf, 0, lenToFlush);
if (!flushPartial || keep) {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0;
}
}

// total bytes left minus unflushed bytes left
return count - (bufLen - lenToFlush);
}

flushBuffer likewise ends up calling writeChecksumChunks:

private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
// compute the checksums
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
// write into the packet one chunk at a time
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}

writeChecksumChunks first computes the data's checksums with calculateChunkedSums, then calls writeChunk to write each chunk into the packet:

// DFSOutputStream.class
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
...
// no current packet yet: create one
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
...
}
// write the checksum first, then the data
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;

// If packet is full, enqueue it for transmission
// the packet is full, or the block has reached its full size
if (currentPacket.numChunks == currentPacket.maxChunks ||
bytesCurBlock == blockSize) {
...
// put the packet on the dataQueue
waitAndQueueCurrentPacket();

// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumBufSize();
}
// the last packet of a block may be smaller, so recompute the packet size
if (!appendChunk) {
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
// once the block size is reached, send an empty packet
if (bytesCurBlock == blockSize) {
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
}
}
}

writeChunk writes the chunk into currentPacket; when currentPacket is full it calls waitAndQueueCurrentPacket, which puts the packet on the dataQueue to wait for the DataStreamer thread to write it into the pipeline. _After the whole block has been sent, an empty packet is sent to mark the end of the block_.

The logic for putting the packet onto dataQueue is as follows:

private void waitAndQueueCurrentPacket() throws IOException {
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
// dfs.client.write.max-packets-in-flight defaults to 80;
// wait while dataQueue.size() + ackQueue.size() exceeds it
while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
try {
dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
checkClosed();
// put currentPacket on dataQueue
queueCurrentPacket();
} catch (ClosedChannelException e) {
}
}
}

private void queueCurrentPacket() {
synchronized (dataQueue) {
if (currentPacket == null) return;
dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.seqno;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
}
currentPacket = null;
// notify the DataStreamer thread to consume it
dataQueue.notifyAll();
}
}

DFSOutputStream.DataStreamer: sending packets

DFSOutputStream maintains two queues, dataQueue and ackQueue; their combined size must not exceed _dfs.client.write.max-packets-in-flight_.
currentPacket is placed on dataQueue and the DataStreamer thread is notified to consume it.
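
Before reading the real code, here is a deliberately simplified, self-contained sketch of the same producer/consumer pattern (this is not HDFS code; the class and method names are invented for illustration): the writing thread blocks while too many packets are in flight, the streamer moves each packet from dataQueue to ackQueue before sending it, and the ack receiver frees the slot.

```java
import java.util.LinkedList;

public class FlowControlSketch {
    private final LinkedList<byte[]> dataQueue = new LinkedList<>();
    private final LinkedList<byte[]> ackQueue = new LinkedList<>();
    private final int maxPacketsInFlight = 80; // mirrors dfs.client.write.max-packets-in-flight

    /** Writer thread: analogue of waitAndQueueCurrentPacket. */
    public void enqueue(byte[] packet) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.size() + ackQueue.size() > maxPacketsInFlight) {
                dataQueue.wait(); // back-pressure: too many packets in flight
            }
            dataQueue.addLast(packet);
            dataQueue.notifyAll(); // wake the streamer
        }
    }

    /** Streamer thread: analogue of one DataStreamer.run iteration. */
    public void streamOnce() throws InterruptedException {
        byte[] packet;
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait(1000); // the real code sends a heartbeat packet on timeout
            }
            packet = dataQueue.removeFirst();
            ackQueue.addLast(packet); // keep it until the ack arrives
            dataQueue.notifyAll();
        }
        // writing `packet` to the pipeline would happen here, outside the lock
    }

    /** Ack thread: analogue of ResponseProcessor removing an acked packet. */
    public void ackOne() {
        synchronized (dataQueue) {
            if (!ackQueue.isEmpty()) {
                ackQueue.removeFirst();
                dataQueue.notifyAll(); // frees a slot for the writer
            }
        }
    }
}
```

The real DataStreamer.run is below: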

// DataStreamer.run
public void run() {
long lastPacket = Time.now();
TraceScope traceScope = null;
if (traceSpan != null) {
traceScope = Trace.continueSpan(traceSpan);
}
while (!streamerClosed && dfsClient.clientRunning) {

// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}

Packet one;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
}

synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.now();
// dataQueue is empty and the heartbeat interval has not elapsed: wait
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.now();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// get packet to be sent.
// pick the packet to send; if dataQueue is empty, send a heartbeat
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
one = dataQueue.getFirst(); // regular data packet
}
}
assert one != null;
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
...
// set up the pipeline
setPipeline(nextBlockOutputStream());
// start the ResponseProcessor thread and move the DataStreamer stage to DATA_STREAMING
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
...
setupPipelineForAppendOrRecovery();
initDataStreaming();
}

long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
...
// this is the block's last packet: wait until all previous packets are acked
if (one.lastPacketInBlock) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}

// send the packet
// move the packet from dataQueue to ackQueue before sending it
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
...
// write out data to remote datanode
try {
// write the packet into the pipeline
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
}
lastPacket = Time.now();

// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}

if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}

// Is this block full?
// the packet has just been sent (and now sits on ackQueue);
// if it was the last packet of the block, also wait for its ack,
// then endBlock()
if (one.lastPacketInBlock) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}

endBlock();
}
if (progress != null) { progress.progress(); }

// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DataStreamer Exception", e);
}
if (e instanceof IOException) {
setLastException((IOException)e);
} else {
setLastException(new IOException("DataStreamer Exception: ",e));
}
hasError = true;
if (errorIndex == -1 && restartingNodeIndex == -1) {
// Not a datanode issue
streamerClosed = true;
}
}
}
if (traceScope != null) {
traceScope.close();
}
closeInternal();
}

The DataStreamer thread's main job is to take packets off dataQueue and send them down the pipeline. The pipeline is created with setPipeline(nextBlockOutputStream()); nextBlockOutputStream opens a DataOutputStream:

private LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
do {
hasError = false;
lastException.set(null);
errorIndex = -1;
success = false;

long startTime = Time.now();
DatanodeInfo[] excluded =
excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet()
.toArray(new DatanodeInfo[0]);
block = oldBlock;
// send an addBlock request to the NameNode;
// addBlock checks the lease (in analyzeFileState)
lb = locateFollowingBlock(startTime,
excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();

//
// Connect to first DataNode in the list.
// connect to the first DataNode in nodes and
// send the write request downstream via Sender
success = createBlockOutputStream(nodes, storageTypes, 0L, false);

if (!success) {
DFSClient.LOG.info("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, fileId, src,
dfsClient.clientName);
block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
}
} while (!success && --count >= 0);

if (!success) {
throw new IOException("Unable to create new block.");
}
return lb;
}

private boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
...
Status pipelineStatus = SUCCESS;
String firstBadLink = "";
boolean checkRestart = false;
...
// persist blocks on namenode on next flush
persistBlocks.set(true);

int refetchEncryptionKey = 1;
while (true) {
boolean result = false;
DataOutputStream out = null;
try {
assert null == s : "Previous socket unclosed";
assert null == blockReplyStream : "Previous blockReplyStream unclosed";
// open the socket connection
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);

OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn);
...
// send the request
// send the write request to the DataNode; it is received by DataXceiver
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);

// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(blockReplyStream));
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();

// Got an restart OOB ack.
// If a node is already restarting, this status is not likely from
// the same node. If it is from a different node, it is not
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
restartingNodeIndex == -1) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
restartingNodeIndex = -1;
hasError = false;
} catch (IOException ie) {
if (restartingNodeIndex == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ nodes[0] + " : " + ie);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
// Don't close the socket/exclude this node just yet. Try again with
// a new encryption key.
continue;
}

// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorIndex = i;
break;
}
}
} else {
assert checkRestart == false;
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
restartingNodeIndex = errorIndex;
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex]);
}
hasError = true;
setLastException(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}
return result;
}
}

nextBlockOutputStream obtains the block's locations from the NameNode via locateFollowingBlock, and createBlockOutputStream establishes a socket connection to the first DataNode in nodes (setting up an output stream for sending packets downstream and an input stream for receiving acks from downstream) and sends the writeBlock request. Finally, nextBlockOutputStream returns the node list, and setPipeline records it as the pipeline for the current block:

private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
String[] storageIDs) {
this.nodes = nodes;
this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
}

Once the pipeline is established, a new thread, ResponseProcessor, is started to receive packet acks; it is started in initDataStreaming, which also moves the DataStreamer's stage to DATA_STREAMING:

private void initDataStreaming() {
this.setName("DataStreamer for file " + src +
" block " + block);
response = new ResponseProcessor(nodes);
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}

With the setup done, the packet is sent by calling one.writeTo(blockStream); note that this writes the packet only to the first DataNode of the pipeline:

void writeTo(DataOutputStream stm) throws IOException {
final int dataLen = dataPos - dataStart;
final int checksumLen = checksumPos - checksumStart;
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;

PacketHeader header = new PacketHeader(
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
// when checksumPos != dataStart, slide the checksums up against the data,
// leaving exactly enough room in front for the header
if (checksumPos != dataStart) {
// Move the checksum to cover the gap. This can happen for the last
// packet or during an hflush/hsync call.
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
checksumPos = dataStart;
checksumStart = checksumPos - checksumLen;
}

final int headerStart = checksumStart - header.getSerializedSize();
assert checksumStart + 1 >= header.getSerializedSize();
assert checksumPos == dataStart;
assert headerStart >= 0;
assert headerStart + header.getSerializedSize() == checksumStart;

// Copy the header data into the buffer immediately preceding the checksum
// data.
// copy the header into the packet buffer, forming one complete packet
System.arraycopy(header.getBytes(), 0, buf, headerStart,
header.getSerializedSize());

// corrupt the data for testing.
if (DFSClientFaultInjector.get().corruptPacket()) {
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}

// Write the now contiguous full packet to the output stream.
// write the buffer to the output stream
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);

// undo corruption.
if (DFSClientFaultInjector.get().uncorruptPacket()) {
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}
}

DataXceiver: writing the data on the DataNode

Everything above is the client side: the client ships the data to a DataNode, which is responsible for writing each packet to its local disk and forwarding it to the next DataNode. Two thread types are involved: DataXceiverServer and DataXceiver. DataXceiverServer acts as the listener and DataXceiver as the handler: DataXceiverServer accepts socket connections from clients (or upstream DataNodes) and spawns a DataXceiver thread per connection, and the DataXceiver does the actual writing.
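
As a mental model (a plain-Java analogy only; the real DataNode code uses its own Peer/Daemon abstractions, and the port and limit below are just the usual defaults), the acceptor/handler split looks roughly like this:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

public class AcceptorSketch {
    private static final int MAX_XCEIVERS = 4096; // analogue of dfs.datanode.max.transfer.threads
    private static final AtomicInteger active = new AtomicInteger();

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(50010)) { // illustrative data-transfer port
            while (true) {
                Socket peer = server.accept();      // DataXceiverServer: wait for a connection
                if (active.get() >= MAX_XCEIVERS) { // the real code throws and logs when over the limit
                    peer.close();
                    continue;
                }
                active.incrementAndGet();
                new Thread(() -> {                  // one handler (DataXceiver) per connection
                    try (Socket s = peer) {
                        // readOp() / processOp() would run here
                    } catch (IOException ignored) {
                        // connection-level errors end this handler only
                    } finally {
                        active.decrementAndGet();
                    }
                }).start();
            }
        }
    }
}
```

Now the real DataXceiverServer.run: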

public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
// accept a socket connection from the client
peer = peerServer.accept();

// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
// check the concurrent xceiver limit, controlled by dfs.datanode.max.transfer.threads
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
// start a new DataXceiver thread
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored)
...
}
}

The actual handling happens in the DataXceiver thread; its run method:

public void run() {
int opsProcessed = 0;
Op op = null;

try {
...
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

try {
...
// read the opcode
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
...
}
...
// dispatch on the opcode
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
...
} finally {
...
}
}

DataXceiver.run mainly reads the opcode (readOp) and dispatches on it (processOp):

// Receiver.class
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}

Since this is a write, processOp calls opWriteBlock, which in turn calls writeBlock; DataXceiver overrides writeBlock as follows:

// DataXceiver.class
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
// a non-empty clientname means isDatanode is false and isClient is true
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
...
// We later mutate block's generation stamp and length, but we need to
// forward the original version of the block to downstream mirrors, so
// make a copy here.
final ExtendedBlock originalBlock = new ExtendedBlock(block);
if (block.getNumBytes() == 0) {
block.setNumBytes(dataXceiverServer.estimateBlockSize);
}
...
// reply to upstream datanode or client
// open an output stream back to the upstream DataNode or client, used for sending acks
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
getOutputStream(),
HdfsConstants.SMALL_BUFFER_SIZE));
...
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
final String storageUuid;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
// instantiate the BlockReceiver that will receive the packets
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist);

storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}

//
// Connect to downstream machine, if appropriate
//
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
// the downstream DataNode
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
...
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
// connect to it
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);

OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);

// Do not propagate allowLazyPersist to downstream DataNodes.
// forward the write request downstream
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy, false);

mirrorOut.flush();

// read connect ack (only for clients, not for replication req)
// read the connect-ack from downstream
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
firstBadLink);
}
}
// this catch handles exceptions raised while setting up the pipeline,
// i.e. while this DataNode connects to its downstream node
} catch (IOException e) {
if (isClient) {
BlockOpResponseProto.newBuilder()
.setStatus(ERROR)
// NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
LOG.error(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode + ": " + e);
throw e;
} else {
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
}
}
}// end of the block that connects to the downstream DataNode, if there is one

// send connect-ack to source for clients and not transfer-RBW/Finalized
if (isClient && !isTransfer) {
...
// send the connect-ack upstream
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// after sending the connect-ack upstream, get ready to receive the block's packets
// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
// receive the block
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);
...
}

// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}

// if this write is for a replication request or recovering
// a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes());
}


} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
// close all opened streams
...
blockReceiver = null;
}
...
}

writeBlock instantiates a BlockReceiver, and blockReceiver.receiveBlock receives the packets, forwarding them downstream and writing them to local disk. Before receiving any packets, the connection to the downstream DataNode is established and the write request is forwarded to it. Here is receiveBlock:

void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {

syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
throttler = throttlerArg;

this.replyOut = replyOut;
this.isReplaceBlock = isReplaceBlock;

try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to processes responses
}
// a loop with an empty body:
// keep calling receivePacket until the whole block has been received
while (receivePacket() >= 0) { /* Receive until the last packet */ }

// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}

// If this write is for a replication or transfer-RBW/Finalized,
// then finalize block or convert temporary to RBW.
// For client-writes, the block is finalized in the PacketResponder.
if (isDatanode || isTransfer) {
// close the block/crc files
close();
block.setNumBytes(replicaInfo.getNumBytes());

if (stage == BlockConstructionStage.TRANSFER_RBW) {
// for TRANSFER_RBW, convert temporary to RBW
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatnode or TRANSFER_FINALIZED
// Finalize the block.
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();
}

} catch (IOException ioe) {
...
} finally {
...
}
}

receiveBlock starts a PacketResponder thread to receive packet acks from downstream and to send packet acks upstream. PacketResponder also keeps an ackQueue (not to be confused with the dataQueue and ackQueue inside DFSOutputStream): receivePacket puts each received packet on this ackQueue, and PacketResponder removes it once the corresponding ack has been handled. receivePacket:

private int receivePacket() throws IOException {
// read the next packet
// read the next packet from the stream
packetReceiver.receiveNextPacket(in);

PacketHeader header = packetReceiver.getHeader();
...
// Sanity check the header
...
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();

// avoid double sync'ing on close
if (syncBlock && lastPacketInBlock) {
this.syncOnClose = false;
}

// update received bytes
final long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}

// put in queue for pending acks, unless sync was requested
// before forwarding the packet downstream, put it on the ackQueue
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}

//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
long begin = Time.monotonicNow();
// forward the packet downstream
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
} catch (IOException e) {
handleMirrorOutError(e);
}
}
// the data portion of the packet, about to be written locally
ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
// sync block if requested
if (syncBlock) {
flushOrSync(true);
}
} else {
final int checksumLen = diskChecksum.getChecksumSize(len);
final int checksumReceivedLen = checksumBuf.capacity();

if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
throw new IOException("Invalid checksum length: received length is "
+ checksumReceivedLen + " but expected length is " + checksumLen);
}

if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
try {
// verify the checksums
verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) {
// checksum error detected locally. there is no reason to continue.
if (responder != null) {
try {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock,
Status.ERROR_CHECKSUM);
// Wait until the responder sends back the response
// and interrupt this thread.
Thread.sleep(3000);
} catch (InterruptedException e) { }
}
throw new IOException("Terminating due to a checksum error." + ioe);
}

if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
translateChunks(dataBuf, checksumBuf);
}
}
// if the checksum was lost in transit, recompute it
if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
// checksum is missing, need to calculate it
checksumBuf = ByteBuffer.allocate(checksumLen);
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}

// by this point, the data in the buffer uses the disk checksum

final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try {
// current length of this block on disk
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
//finally write to the disk :
// if the on-disk length is not a whole number of chunks, the last checksum will be rewritten
if (onDiskLen % bytesPerChecksum != 0) {
// prepare to overwrite last checksum
adjustCrcFilePosition();
}

// If this is a partial chunk, then read in pre-existing checksum
Checksum partialCrc = null;
if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("receivePacket for " + block
+ ": bytesPerChecksum=" + bytesPerChecksum
+ " does not divide firstByteInBlock=" + firstByteInBlock);
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
}

int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
// length of the data to write
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);

// Write data to disk.
long begin = Time.monotonicNow();
// write the data to disk
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
...
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
} else if (partialCrc != null) {
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (len > bytesPerChecksum) {
throw new IOException("Unexpected packet data length for "
+ block + " from " + inAddr + ": a partial chunk must be "
+ " sent in an individual packet (data length = " + len
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
}
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
checksumOut.write(buf);
if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len);
}
partialCrc = null;
} else {
// write checksum
final int offset = checksumBuf.arrayOffset() +
checksumBuf.position();
final int end = offset + checksumLen;
lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
end);
// write the checksums to disk
checksumOut.write(checksumBuf.array(), offset, checksumLen);
}

/// flush entire packet, sync if requested
// flush the block data and metadata to disk
flushOrSync(syncBlock);
replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
datanode.metrics.incrBytesWritten(len);
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskErrorAsync();
throw iex;
}
}
...
return lastPacketInBlock?-1:len;
}

Sending and receiving ACKs

receivePacket reads a packet from the stream; before forwarding it downstream it first places the packet on the ackQueue, where the PacketResponder thread will wait for its ack, then forwards the packet downstream, and finally writes the packet to local disk.

Now the PacketResponder thread's run method:

public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
// read an ack from downstream datanode
// read an ack from downstream
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
// Process an OOB ACK.
Status oobStatus = ack.getOOBStatus();
if (oobStatus != null) {
LOG.info("Relaying an out of band ack of type " + oobStatus);
sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
Status.SUCCESS);
continue;
}
// seqno carried by the newly received ack
seqno = ack.getSeqno();
}
// we have an ack to match (or this is the last node in the pipeline)
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
// acks are matched against packets in the order the packets were sent
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
expected = pkt.seqno;
...
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
}

if (Thread.interrupted() || isInterrupted) {
/*
* The receiver thread cancelled this thread. We could also check
* any other status updates from the receiver thread (e.g. if it is
* ok to write to replyOut). It is prudent to not send any more
* status back to the client because this datanode has a problem.
* The upstream datanode will detect that this datanode is bad, and
* rightly so.
*
* The receiver thread can also interrupt this thread for sending
* an out-of-band response upstream.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}

if (lastPacketInBlock) {
// Finalize the block and close the block file
finalizeBlock(startTime);
}
// send the ack upstream
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
datanode.checkDiskErrorAsync();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
}
}
} catch (Throwable e) {
if (running) {
LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
LOG.info(myString + " terminating");
}

The PacketResponder thread receives and sends packet acks, and it sends them in the order the packets were written.

If the responder's type is LAST_IN_PIPELINE, it skips if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) and goes straight into if (seqno != PipelineAck.UNKOWN_SEQNO || type == PacketResponderType.LAST_IN_PIPELINE): it takes the first packet off the ackQueue, sends an ack upstream with sendAckUpstream, and then removes the packet from the ackQueue. Because the ackQueue holds packets in write order, the acks come out ordered as well.

If the type is HAS_DOWNSTREAM_IN_PIPELINE, it enters if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) and reads an ack from the downstream input stream, then enters if (seqno != PipelineAck.UNKOWN_SEQNO || type == PacketResponderType.LAST_IN_PIPELINE) and takes the expected packet off the ackQueue; after a series of checks it sends an ack upstream with sendAckUpstream (this ack now combines its own status with those from downstream), and finally removes the packet from the ackQueue. For a three-node pipeline DN1 -> DN2 -> DN3, for example, DN3 acks directly, DN2 waits for DN3's ack and adds its own status, and DN1 does the same, so the client ends up receiving a single ack carrying all three statuses.

PacketResponder handles sending and receiving packet acks on the DataNode; on the client side, packet acks are received by the ResponseProcessor thread. Its run method:

public void run() {
...
PipelineAck ack = new PipelineAck();
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
long begin = Time.monotonicNow();
// read an ack
ack.readFields(blockReplyStream);
...
long seqno = ack.getSeqno();
// processes response status from datanodes.
// check the replies starting from the last DataNode in the pipeline
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = ack.getReply(i);
...
// node error
if (reply != SUCCESS) {
setErrorIndex(i); // first bad datanode
// the throw breaks out of the for loop and is handled by the catch block
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
targets[i]);
}
}

assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}

// a success ack for a data packet
Packet one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
// if the seqno read from the stream differs from the head of the ackQueue, throw
if (one.seqno != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;

// Fail the packet write for testing in order to force a
// pipeline recovery.
if (DFSClientFaultInjector.get().failPacket() &&
isLastPacketInBlock) {
failPacket = true;
throw new IOException(
"Failing the last packet for testing.");
}

// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
// after the ack is received, remove the packet from the ackQueue
synchronized (dataQueue) {
lastAckedSeqno = seqno;
ackQueue.removeFirst();
dataQueue.notifyAll();

one.releaseBuffer(byteArrayManager);
}
} catch (Exception e) {
if (!responderClosed) {
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
// If no explicit error report was received, mark the primary
// node as failed.
tryMarkPrimaryDatanodeFailed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (restartingNodeIndex == -1) {
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
+ " for block " + block, e);
}
responderClosed = true;
}
}
}
}

The ResponseProcessor thread reads each ack and checks the packet status starting from the last dn in the pipeline. If it finds an error, it sets errorIndex to the index of that dn and, in the catch block, sets hasError = true; if there is no error, it removes the packet from the ackQueue.
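
One detail worth calling out is the dataQueue.notifyAll() in the success path: the thread that calls write() blocks when too many packets are outstanding, and every acked packet frees a slot. The following is a simplified sketch of that interaction, not DFSOutputStream itself; the limit of 80 packets and the config key named in the comment are assumptions.

import java.util.LinkedList;

// Simplified sketch of the back-pressure between the writer and ResponseProcessor.
class OutstandingPacketSketch {
  private final LinkedList<Object> dataQueue = new LinkedList<>();
  private final LinkedList<Object> ackQueue = new LinkedList<>();
  // Assumed in-flight limit (cf. dfs.client.write.max-packets-in-flight).
  private final int maxPackets = 80;

  // Writer thread: queue a full packet, waiting while too many are in flight.
  void queuePacket(Object packet) throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.size() + ackQueue.size() >= maxPackets) {
        dataQueue.wait();            // woken up by onAck() below
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll();         // wake the streamer that sends packets
    }
  }

  // ResponseProcessor thread: a successful ack removes the head of ackQueue
  // and wakes any writer blocked on the in-flight limit.
  // (The streamer thread, omitted here, moves packets from dataQueue to ackQueue.)
  void onAck() {
    synchronized (dataQueue) {
      if (!ackQueue.isEmpty()) {
        ackQueue.removeFirst();
      }
      dataQueue.notifyAll();
    }
  }
}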

Write errors in the pipeline

What happens if one of the DataNodes in the pipeline fails while data is being written, and what internal handling does that require? Let's look for the answer in the code.

When the ResponseProcessor thread finds an error in a received ack, it sets errorIndex to the index of the failed node and sets hasError to true. The DataStreamer thread notices these changes and performs the error handling; here is the relevant part of DataStreamer's run method:

public void run() {
...
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
Packet one;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
// handle the datanode error
doSleep = processDatanodeError();
}
...
} catch (Throwable e) {
...
}
}
...
}

After detecting the error, the DataStreamer thread calls processDatanodeError:

private boolean processDatanodeError() throws IOException {
if (response != null) {
DFSClient.LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
}
// close the streams of the current pipeline
closeStream();
// the un-acked packets in the ackQueue are moved back so they will be resent
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}

// Record the new pipeline failure recovery.
// a new pipeline will be set up for the retry; track how many attempts were made for the same packet
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success."));
streamerClosed = true;
return false;
}
}
// rebuild the pipeline
boolean doSleep = setupPipelineForAppendOrRecovery();

if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {

// If we had an error while closing the pipeline, we go through a fast-path
// where the BlockReceiver does not run. Instead, the DataNode just finalizes
// the block immediately during the 'connect ack' process. So, we want to pull
// the end-of-block packet from the dataQueue, since we don't actually have
// a true pipeline to send it over.
//
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
assert endOfBlockPacket.lastPacketInBlock;
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
lastAckedSeqno = endOfBlockPacket.seqno;
dataQueue.notifyAll();
}
endBlock();
} else {
// restart the ResponseProcessor thread
initDataStreaming();
}
}
return doSleep;
}

The core logic of processDatanodeError is setupPipelineForAppendOrRecovery, which builds a new pipeline:

private boolean setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
...
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
// Sleep before reconnect if a dn is restarting.
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex >= 0) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException("Interrupted while waiting for " +
"datanode to restart. " + nodes[restartingNodeIndex]));
streamerClosed = true;
return false;
}
}
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
if (errorIndex >= 0) {
...
if (nodes.length <= 1) {
lastException.set(new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting..."));
streamerClosed = true;
return false;
}
// add the failed dn to the failed list
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
// copy the healthy dns into the new array newnodes
arraycopy(nodes, newnodes, errorIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, errorIndex);
// set up the pipeline with newnodes
setPipeline(newnodes, newStorageTypes, newStorageIDs);

// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (errorIndex > restartingNodeIndex) {
restartingNodeIndex = -1;
} else if (errorIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
// this shouldn't happen...
assert false;
}
}

if (restartingNodeIndex == -1) {
hasError = false;
}
lastException.set(null);
errorIndex = -1;
}

// Check if replace-datanode policy is satisfied.
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
try {
// add a datanode to bring the pipeline back to full size
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
...
}
}

// get a new generation stamp and an access token
// obtain a new generation stamp
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();

// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
// connect to the first dn of the pipeline
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}
...
} // while
// update the block's generation stamp
if (success) {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;
}
return false; // do not sleep, continue processing
}

setupPipelineForAppendOrRecovery keeps the healthy dns from the original pipeline and removes the failed one, then calls addDatanode2ExistingPipeline to add a new dn that, together with the remaining healthy dns, forms a new pipeline (during this step the data that was already written successfully is transferred to the new dn). It also obtains a new generation stamp and access token and calls createBlockOutputStream to set up the socket connection to the new pipeline. Updating the block with the new generation stamp lets the namenode later delete the stale replica on the node where the error occurred.
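
The arraycopy(nodes, newnodes, errorIndex) calls in the code above use a small private helper of DFSOutputStream that is not shown; it is roughly equivalent to the sketch below, which copies every element except the one at the failed index.

// Sketch of the helper used above: copy everything except the element at skipIndex,
// so the new array is one entry shorter and the failed datanode is gone.
static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
  // dsts.length is expected to be srcs.length - 1
  System.arraycopy(srcs, 0, dsts, 0, skipIndex);
  System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
}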

Now let's look at the addDatanode2ExistingPipeline code:

private void addDatanode2ExistingPipeline() throws IOException {
...
/*
* Is data transfer necessary? We have the following cases.
*
* Case 1: Failure in Pipeline Setup
* - Append
* + Transfer the stored replica, which may be a RBW or a finalized.
* - Create
* + If no data, then no transfer is required.
* + If there are data written, transfer RBW. This case may happens
* when there are streaming failure earlier in this pipeline.
*
* Case 2: Failure in Streaming
* - Append/Create:
* + transfer RBW
*
* Case 3: Failure in Close
* - Append/Create:
* + no transfer, let NameNode replicates the block.
*/
...
//get a new datanode
final DatanodeInfo[] original = nodes;
// ask the namenode for an additional dn
final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
src, fileId, block, nodes, storageIDs,
failed.toArray(new DatanodeInfo[failed.size()]),
1, dfsClient.clientName);
// update the pipeline
setPipeline(lb);

//find the new datanode
final int d = findNewDatanode(original);
//transfer replica
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
// the transfer target, i.e. the newly added dn
final DatanodeInfo[] targets = {nodes[d]};
final StorageType[] targetStorageTypes = {storageTypes[d]};
// transfer the existing replica data to the new dn
transfer(src, targets, targetStorageTypes, lb.getBlockToken());
}

Summary

The HDFS write flow is:

  • FileSystem.create opens an FSDataOutputStream. During this step the client adds a file record on the namenode via RPC and obtains the lease for the file, starts a DataStreamer thread (which maintains the dataQueue and ackQueue), and keeps renewing the lease.
  • Once the FSDataOutputStream is ready, FSDataOutputStream.write can be called to write data. Data is first written into a local buffer on the client, which by default holds 9 chunks (if the data to be written is longer than the local buffer, a buffer-sized slice is written directly into currentPacket). When the buffer is full, the checksum of the data is computed and written into currentPacket; when currentPacket is full it is placed into the dataQueue and the DataStreamer thread is notified to consume it. (Data goes into the local buffer first and then into a packet; the blockId is only requested from the namenode once a packet has been filled.) A rough calculation of the chunk and packet sizes is sketched after this list.
  • The DataStreamer thread takes a packet from the dataQueue. If the streamer's stage is PIPELINE_SETUP_CREATE, the pipeline for the current block has not been built yet, so it asks the namenode for a blockId and block locations, forms a pipeline from those locations, opens a socket connection to the first dn and has Sender issue the write request. It then starts a ResponseProcessor thread to receive the packet acks returned by the dns, and switches the DataStreamer stage from PIPELINE_SETUP_CREATE to DATA_STREAMING.
  • The packet to be sent is moved from the dataQueue to the ackQueue and then sent into the pipeline.
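
As a back-of-the-envelope check of the sizes involved (the defaults below and the assumed 33-byte packet header are approximations, not authoritative values): with 512-byte chunks, a 4-byte CRC per chunk and a 64 KB packet, each packet carries on the order of 126 chunks.

// Back-of-the-envelope sketch of how many chunks fit into one packet.
public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;        // dfs.bytes-per-checksum default
    int checksumSize = 4;              // CRC32/CRC32C is 4 bytes per chunk
    int writePacketSize = 64 * 1024;   // dfs.client-write-packet-size default
    int packetHeaderLen = 33;          // assumed upper bound for the packet header

    int chunkSize = bytesPerChecksum + checksumSize;  // 516 bytes on the wire
    int chunksPerPacket = Math.max((writePacketSize - packetHeaderLen) / chunkSize, 1);
    int bodySize = chunksPerPacket * chunkSize;

    System.out.println("chunks per packet = " + chunksPerPacket);  // 126
    System.out.println("packet body bytes = " + bodySize);         // 65016
  }
}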

The steps above happen on the client side; the steps below happen on the dn side.

  • When the client sets up the pipeline, the dn receives the client's socket connection through DataXceiverServer and creates a DataXceiver thread, which handles the client's write request.
  • The DataXceiver thread instantiates a BlockReceiver object and checks whether there is a downstream node; if so, it opens a socket to the downstream node and forwards the write request.
  • Once the connection to the downstream node is established, blockReceiver.receiveBlock calls receivePacket in a loop to receive packets. Before forwarding a packet downstream it puts the packet into the ackQueue (which is maintained by the PacketResponder thread), and then writes the data and checksum to disk.
  • blockReceiver.receiveBlock also starts a PacketResponder thread, which receives the packet acks sent by the downstream node, removes the packet from the ackQueue after the checks succeed, and sends its own ack together with the downstream acks upstream.
  • Eventually all acks converge on the ResponseProcessor thread. If an ack carries no error, the packet is removed from the ackQueue. If there is an error, the packets in the ackQueue are moved back to the dataQueue, the failed dn is removed from the pipeline, a new dn is requested from the namenode to form a new pipeline with the dns that did not fail, addDatanode2ExistingPipeline decides whether the packets that were already sent need to be transferred from one of the healthy dns to the newly added dn, and the block's generation stamp is updated, so the block data on the failed DataNode will be deleted once that node recovers. The client-side settings that control this replacement are sketched after this list.
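
Whether a replacement dn is added at all is controlled by client-side settings (the dtpReplaceDatanodeOnFailure check in the code above). The sketch below only reads the relevant keys; the key names are the Hadoop 2.x ones and the defaults shown are assumptions.

import org.apache.hadoop.conf.Configuration;

// Sketch: read the client-side settings that decide whether a failed datanode
// is replaced (i.e. whether addDatanode2ExistingPipeline is attempted).
public class ReplaceDatanodePolicyCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean enabled = conf.getBoolean(
        "dfs.client.block.write.replace-datanode-on-failure.enable", true);
    // Typical values: NEVER, DEFAULT, ALWAYS
    String policy = conf.get(
        "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    System.out.println("replace-datanode-on-failure: enabled=" + enabled
        + ", policy=" + policy);
  }
}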

Open questions

During pipeline replication, if a DataNode hits an error, for example a received packet is corrupt, its BlockReceiver simply ends its thread and does not send an ack upstream. The upstream node then never receives the ack, so it assumes that this DataNode and every DataNode after it failed while receiving the block's packets, and it reports this to its own upstream node.

If several nodes in the pipeline fail during the write, the write is still considered successful as long as the number of successfully written replicas reaches dfs.replication.min (default 1); the NameNode then asynchronously replicates the block to other nodes until the number of replicas reaches the value configured by dfs.replication.
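
For reference, a minimal sketch of reading the two replication settings mentioned above; dfs.namenode.replication.min is the Hadoop 2.x name of what older releases called dfs.replication.min, and the defaults here are assumptions.

import org.apache.hadoop.conf.Configuration;

// Sketch: a write is considered successful once the minimum number of replicas
// is finalized; the NameNode later re-replicates up to the full replication factor.
public class MinReplicationCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int replication = conf.getInt("dfs.replication", 3);
    int minReplication = conf.getInt("dfs.namenode.replication.min", 1);
    System.out.println("min replicas for a successful write = " + minReplication
        + ", target replication = " + replication);
  }
}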