The previous post covered how HDFS maintains the replica count. When a block has more replicas than expected, BlockManager.processOverReplicatedBlock picks, by a set of rules, the DataNode whose replica should be removed and puts the DataNode and block into invalidateBlocks; the ReplicationMonitor thread later takes replicas out of that structure and deletes them. When a block has fewer replicas than expected, BlockManager.processMisReplicatedBlock classifies the block by its replication priority and puts it into neededReplications; the ReplicationMonitor thread later takes blocks out of that queue and replicates them.

The ReplicationMonitor thread

ReplicationMonitor implements the Runnable interface and is started through FSNameSystem.startCommonServices -> blockManager.activate(conf) -> this.replicationThread.start(). Its main job is to compute work for the DataNodes and to move blocks whose replication requests have timed out back into the scheduling queue.

public void run() {
  while (namesystem.isRunning()) {
    try {
      // Process replication work only when active NN is out of safe mode.
      if (namesystem.isPopulatingReplQueues()) {
        computeDatanodeWork();
        processPendingReplications();
      }
      Thread.sleep(replicationRecheckInterval);
    } catch (Throwable t) {
      ...
    }
  }
}

Each iteration calls computeDatanodeWork to compute the DataNodes' work and processPendingReplications to move timed-out replication requests back into the neededReplications queue.

A DataNode's work consists of replication and deletion. Here is computeDatanodeWork:

/**
 * Compute block replication and block invalidation work that can be scheduled
 * on data-nodes. The datanode will be informed of this work at the next
 * heartbeat.
 *
 * @return number of blocks scheduled for replication or removal.
 */
int computeDatanodeWork() {
  // Blocks should not be replicated or removed if in safe mode.
  // It's OK to check safe mode here w/o holding lock, in the worst
  // case extra replications will be scheduled, and these will get
  // fixed up later.
  if (namesystem.isInSafeMode()) {
    return 0;
  }

  final int numlive = heartbeatManager.getLiveDatanodeCount();
  // dfs.namenode.replication.work.multiplier.per.iteration, default 2:
  // the total number of block transfers to schedule per heartbeat interval
  final int blocksToProcess = numlive
      * this.blocksReplWorkMultiplier;
  // how many DataNodes to hand invalidate-block work to per heartbeat
  // interval; note the unit is DataNodes, not blocks
  final int nodesToProcess = (int) Math.ceil(numlive
      * this.blocksInvalidateWorkPct);
  // compute the replication work
  int workFound = this.computeReplicationWork(blocksToProcess);

  // Update counters
  namesystem.writeLock();
  try {
    this.updateState();
    this.scheduledReplicationBlocksCount = workFound;
  } finally {
    namesystem.writeUnlock();
  }
  // compute the deletion (invalidation) work
  workFound += this.computeInvalidateWork(nodesToProcess);
  return workFound;
}

computeDatanodeWork derives from the live DataNode count (numlive) the expected number of replica copies per iteration, blocksToProcess, and the expected number of DataNodes to delete replicas on per iteration, nodesToProcess, then hands them to computeReplicationWork and computeInvalidateWork respectively.
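To make the throttling concrete, here is a minimal standalone sketch of that budget math. The default values are assumptions taken from hdfs-default.xml as I remember it (the multiplier default of 2 is stated above; blocksInvalidateWorkPct is backed by dfs.namenode.invalidate.work.pct.per.iteration, which defaults to 0.32 as far as I can tell):

public class WorkBudget {
    // defaults assumed from hdfs-default.xml, not taken from the code above
    static final int BLOCKS_REPL_WORK_MULTIPLIER = 2;
    static final float BLOCKS_INVALIDATE_WORK_PCT = 0.32f;

    public static void main(String[] args) {
        int numLive = 100; // live DataNodes reported by the HeartbeatManager
        // up to 200 block replications may be scheduled this iteration
        int blocksToProcess = numLive * BLOCKS_REPL_WORK_MULTIPLIER;
        // invalidation work goes to ceil(100 * 0.32) = 32 DataNodes
        int nodesToProcess = (int) Math.ceil(numLive * BLOCKS_INVALIDATE_WORK_PCT);
        System.out.println(blocksToProcess + " blocks, " + nodesToProcess + " nodes");
    }
}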

Replication

Let's look at the replication side first, computeReplicationWork:

int computeReplicationWork(int blocksToProcess) {
  List<List<Block>> blocksToReplicate = null;
  namesystem.writeLock();
  try {
    // Choose the blocks to be replicated
    // pick blocks out of neededReplications in priority order
    blocksToReplicate = neededReplications
        .chooseUnderReplicatedBlocks(blocksToProcess);
  } finally {
    namesystem.writeUnlock();
  }
  // schedule replication for the chosen block lists
  return computeReplicationWorkForBlocks(blocksToReplicate);
}

computeReplicationWork picks up to blocksToProcess blocks from neededReplications in priority order and passes them to computeReplicationWorkForBlocks for replication. The selection rules of neededReplications.chooseUnderReplicatedBlocks are as follows:

public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
    int blocksToProcess) {
  // initialize data structure for the return value
  List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
  for (int i = 0; i < LEVEL; i++) {
    blocksToReplicate.add(new ArrayList<Block>());
  }

  if (size() == 0) { // There are no blocks to collect.
    return blocksToReplicate;
  }

  int blockCount = 0;
  for (int priority = 0; priority < LEVEL; priority++) {
    // Go through all blocks that need replications with current priority.
    // take this priority's list out of priorityQueues as an iterator
    BlockIterator neededReplicationsIterator = iterator(priority);
    // priorityToReplIdx is a map holding, for each priority, the index in
    // the list at which replication should resume
    Integer replIndex = priorityToReplIdx.get(priority);

    // skip to the first unprocessed block, which is at replIndex
    for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
      neededReplicationsIterator.next();
    }
    blocksToProcess = Math.min(blocksToProcess, size());
    // blockCount counts how many blocks have been collected; once it
    // reaches blocksToProcess the loop ends
    if (blockCount == blocksToProcess) {
      break; // break if already expected blocks are obtained
    }

    // Loop through all remaining blocks in the list.
    // append blocks to this priority's list inside blocksToReplicate
    while (blockCount < blocksToProcess
        && neededReplicationsIterator.hasNext()) {
      Block block = neededReplicationsIterator.next();
      blocksToReplicate.get(priority).add(block);
      replIndex++;
      blockCount++;
    }
    // if the iterator is exhausted and this is priority 4, the lowest
    // one, every queue has been fully scanned, so reset priorityToReplIdx
    if (!neededReplicationsIterator.hasNext()
        && neededReplicationsIterator.getPriority() == LEVEL - 1) {
      // reset all priorities replication index to 0 because there is no
      // recently added blocks in any list.
      for (int i = 0; i < LEVEL; i++) {
        priorityToReplIdx.put(i, 0);
      }
      break;
    }
    // otherwise record where this priority stopped, then continue with
    // the next (lower) priority's list
    priorityToReplIdx.put(priority, replIndex);
  }
  return blocksToReplicate;
}

chooseUnderReplicatedBlocks is responsible for picking up to blocksToProcess blocks. It walks priorityQueues from the highest priority down, taking the block list for each priority and counting with blockCount; once blockCount reaches blocksToProcess it breaks out of the for loop and returns blocksToReplicate, otherwise it moves on to the next priority's list and keeps counting until the budget is met.
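The resume-index bookkeeping is the subtle part: each call continues where the previous one stopped, so a long high-priority queue is drained round-robin across calls instead of starving everything behind it. A self-contained toy model of that bookkeeping (hypothetical types, not the NameNode code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PriorityDrain {
    static final int LEVEL = 5;
    final List<List<String>> queues = new ArrayList<>();
    final int[] replIndex = new int[LEVEL]; // plays the role of priorityToReplIdx

    PriorityDrain() {
        for (int i = 0; i < LEVEL; i++) queues.add(new ArrayList<>());
    }

    List<List<String>> choose(int budget) {
        List<List<String>> chosen = new ArrayList<>();
        for (int i = 0; i < LEVEL; i++) chosen.add(new ArrayList<>());
        int count = 0;
        for (int p = 0; p < LEVEL && count < budget; p++) {
            List<String> q = queues.get(p);
            int idx = replIndex[p]; // resume where the previous call stopped
            while (idx < q.size() && count < budget) {
                chosen.get(p).add(q.get(idx++));
                count++;
            }
            if (p == LEVEL - 1 && idx >= q.size()) {
                // lowest priority fully scanned: restart from scratch next call
                Arrays.fill(replIndex, 0);
            } else {
                replIndex[p] = idx;
            }
        }
        return chosen;
    }
}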

With blocksToReplicate selected, computeReplicationWorkForBlocks does the actual scheduling:

int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
  int requiredReplication, numEffectiveReplicas;
  List<DatanodeDescriptor> containingNodes;
  DatanodeDescriptor srcNode;
  BlockCollection bc = null;
  int additionalReplRequired;

  int scheduledWork = 0;
  // the list of blocks for which replication work will be scheduled
  List<ReplicationWork> work = new LinkedList<ReplicationWork>();
  namesystem.writeLock();
  try {
    synchronized (neededReplications) {
      // handle the priorities in order, highest priority first
      for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
        // the block list for this priority
        for (Block block : blocksToReplicate.get(priority)) {
          // block should belong to a file
          bc = blocksMap.getBlockCollection(block);
          // abandoned block or block reopened for append
          // skip blocks that are abandoned (no owning file) or whose
          // last block is currently being appended to
          if (bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
            neededReplications.remove(block, priority); // remove from neededReplications
            neededReplications.decrementReplicationIndex(priority);
            continue;
          }
          // the expected replication factor
          requiredReplication = bc.getBlockReplication();
          // get a source data-node
          // containingNodes holds the DataNodes of every replica of this
          // block (live, corrupt and excess); filled in chooseSourceDatanode
          containingNodes = new ArrayList<DatanodeDescriptor>();
          List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
          NumberReplicas numReplicas = new NumberReplicas();
          // pick a replication source, srcNode
          srcNode = chooseSourceDatanode(
              block, containingNodes, liveReplicaNodes, numReplicas,
              priority);
          if (srcNode == null) { // block can not be replicated from any node
            LOG.debug("Block " + block + " cannot be repl from any node");
            continue;
          }

          // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
          // not included in the numReplicas.liveReplicas() count
          assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
          // do not schedule more if enough replicas is already pending
          // effective replicas = live replicas + pending (already
          // scheduled) replications
          numEffectiveReplicas = numReplicas.liveReplicas() +
              pendingReplications.getNumReplicas(block);
          // live + pending already reaches requiredReplication: skip
          if (numEffectiveReplicas >= requiredReplication) {
            if ((pendingReplications.getNumReplicas(block) > 0) ||
                (blockHasEnoughRacks(block))) {
              neededReplications.remove(block, priority); // remove from neededReplications
              neededReplications.decrementReplicationIndex(priority);
              blockLog.info("BLOCK* Removing " + block
                  + " from neededReplications as it has enough replicas");
              continue;
            }
          }
          // additionalReplRequired is the number of copies to schedule
          if (numReplicas.liveReplicas() < requiredReplication) {
            additionalReplRequired = requiredReplication
                - numEffectiveReplicas;
          } else {
            additionalReplRequired = 1; // Needed on a new rack
          }
          // wrap block, srcNode etc. into a ReplicationWork and queue it
          work.add(new ReplicationWork(block, bc, srcNode,
              containingNodes, liveReplicaNodes, additionalReplRequired,
              priority));
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }
  // the DataNodes already holding a replica, so target selection avoids
  // picking a node that has one
  final Set<Node> excludedNodes = new HashSet<Node>();
  for (ReplicationWork rw : work) {
    // Exclude all of the containing nodes from being targets.
    // This list includes decommissioning or corrupt nodes.
    excludedNodes.clear();
    for (DatanodeDescriptor dn : rw.containingNodes) {
      excludedNodes.add(dn);
    }
    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
    // It is costly to extract the filename for which chooseTargets is called,
    // so for now we pass in the block collection itself.
    rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
  }

  namesystem.writeLock();
  try {
    for (ReplicationWork rw : work) {
      final DatanodeStorageInfo[] targets = rw.targets;
      if (targets == null || targets.length == 0) {
        rw.targets = null;
        continue;
      }

      synchronized (neededReplications) {
        Block block = rw.block;
        int priority = rw.priority;
        // Recheck since global lock was released
        // block should belong to a file
        bc = blocksMap.getBlockCollection(block);
        // abandoned block or block reopened for append
        if (bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
          neededReplications.remove(block, priority); // remove from neededReplications
          rw.targets = null;
          neededReplications.decrementReplicationIndex(priority);
          continue;
        }
        requiredReplication = bc.getBlockReplication();

        // do not schedule more if enough replicas is already pending
        NumberReplicas numReplicas = countNodes(block);
        numEffectiveReplicas = numReplicas.liveReplicas() +
            pendingReplications.getNumReplicas(block);

        if (numEffectiveReplicas >= requiredReplication) {
          if ((pendingReplications.getNumReplicas(block) > 0) ||
              (blockHasEnoughRacks(block))) {
            neededReplications.remove(block, priority); // remove from neededReplications
            neededReplications.decrementReplicationIndex(priority);
            rw.targets = null;
            blockLog.info("BLOCK* Removing " + block
                + " from neededReplications as it has enough replicas");
            continue;
          }
        }

        if ((numReplicas.liveReplicas() >= requiredReplication) &&
            (!blockHasEnoughRacks(block))) {
          if (rw.srcNode.getNetworkLocation().equals(
              targets[0].getDatanodeDescriptor().getNetworkLocation())) {
            // No use continuing, unless a new rack in this case
            continue;
          }
        }

        // Add block to the to be replicated list
        // record the block on the source DataNode, so the node's count of
        // scheduled replications can be checked against the stream limits
        rw.srcNode.addBlockToBeReplicated(block, targets);
        scheduledWork++;
        DatanodeStorageInfo.incrementBlocksScheduled(targets);

        // Move the block-replication into a "pending" state.
        // The reason we use 'pending' is so we can retry
        // replications that fail after an appropriate amount of time.
        // put it into the pending queue to await scheduling
        pendingReplications.increment(block,
            DatanodeStorageInfo.toDatanodeDescriptors(targets));
        if (blockLog.isDebugEnabled()) {
          blockLog.debug(
              "BLOCK* block " + block
              + " is moved from neededReplications to pendingReplications");
        }
        // remove from neededReplications
        if (numEffectiveReplicas + targets.length >= requiredReplication) {
          neededReplications.remove(block, priority); // remove from neededReplications
          neededReplications.decrementReplicationIndex(priority);
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }
  ...
  return scheduledWork;
}

computeReplicationWorkForBlocks is long, close to 200 lines; it helps to sketch the logic on paper while reading. It consists of three large for loops: the first finds a srcNode for each block, the second finds targets for each block, and the third moves each block into the pending queue to await scheduling.
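Worth noting is the locking pattern across the three loops: the expensive target placement in the middle loop deliberately runs without the namesystem lock, which is exactly why the third loop has to re-check every block before committing. A toy illustration of that snapshot / compute-outside-lock / re-validate pattern (hypothetical code, not from HDFS):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ComputeOutsideLock {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int required = 3;  // shared state, may change while unlocked
    private int scheduled = 0;

    int schedule() {
        int snapshot;
        lock.writeLock().lock();
        try {
            snapshot = required;                 // phase 1: snapshot under the lock
        } finally {
            lock.writeLock().unlock();
        }
        int targets = expensivePlacement(snapshot); // phase 2: no lock held
        lock.writeLock().lock();
        try {
            if (required != snapshot) {          // phase 3: re-validate; the world
                return 0;                        // may have changed meanwhile
            }
            scheduled += targets;                // commit only if still valid
            return targets;
        } finally {
            lock.writeLock().unlock();
        }
    }

    private int expensivePlacement(int n) { return n; }
}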

The first for loop

The srcNode in the first loop is picked by chooseSourceDatanode. The rules are: prefer a node that is being decommissioned; otherwise randomly pick a node that has not reached its replication limit; and if the block has the highest replication priority and all nodes have reached the limit, still pick one at random. The code:

DatanodeDescriptor chooseSourceDatanode(Block block,
    List<DatanodeDescriptor> containingNodes,
    List<DatanodeStorageInfo> nodesContainingLiveReplicas,
    NumberReplicas numReplicas,
    int priority) {
  containingNodes.clear();
  nodesContainingLiveReplicas.clear();
  DatanodeDescriptor srcNode = null;
  int live = 0;
  int decommissioned = 0;
  int corrupt = 0;
  int excess = 0;

  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    LightWeightLinkedSet<Block> excessBlocks =
        excessReplicateMap.get(node.getDatanodeUuid());
    // only count the replica if its storage is in a normal state
    int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
      corrupt += countableReplica;
    else if (node.isDecommissionInProgress() || node.isDecommissioned())
      decommissioned += countableReplica;
    else if (excessBlocks != null && excessBlocks.contains(block)) {
      excess += countableReplica;
    } else {
      // READ_ONLY replicas land here too, but with countableReplica 0,
      // so `live` excludes read-only replicas while
      // nodesContainingLiveReplicas still records their storages
      nodesContainingLiveReplicas.add(storage);
      live += countableReplica;
    }
    // record every DataNode holding a replica of this block
    containingNodes.add(node);
    // Check if this replica is corrupt
    // If so, do not select the node as src node
    if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
      continue;
    // at the highest priority, the soft replication limit is ignored
    if (priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
        && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
      continue; // already reached replication limit
    }
    // but a node above replicationStreamsHardLimit is skipped no matter
    // what the priority is
    if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
      continue;
    }
    // the block must not be scheduled for removal on srcNode
    if (excessBlocks != null && excessBlocks.contains(block))
      continue;
    // never use already decommissioned nodes
    if (node.isDecommissioned())
      continue;
    // we prefer nodes that are in DECOMMISSION_INPROGRESS state
    if (node.isDecommissionInProgress() || srcNode == null) {
      srcNode = node;
      continue;
    }
    if (srcNode.isDecommissionInProgress())
      continue;
    // switch to a different node randomly
    // this to prevent from deterministically selecting the same node even
    // if the node failed to replicate the block on previous iterations
    // no decommissioning node available: pick one at random
    if (DFSUtil.getRandom().nextBoolean())
      srcNode = node;
  }
  if (numReplicas != null)
    numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
  return srcNode;
}

chooseSourceDatanode mainly selects the srcNode; the selection rules were described above. Two throttling limits deserve a closer look (a sketch of the combined check follows the list):

  1. maxReplicationStreams: the maximum number of replication streams a given node may run, not counting highest-priority replications; taken from dfs.namenode.replication.max-streams, default 2 when unset.
  2. replicationStreamsHardLimit: the maximum number of replication streams a given node may run across all priorities, including the highest; taken from dfs.namenode.replication.max-streams-hard-limit, default 4 when unset. Since it applies to every priority, it is the hard limit.
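Putting the two limits together, the gate applied to a candidate source node looks roughly like this (a hypothetical condensation of the checks above, assuming the default limits and that QUEUE_HIGHEST_PRIORITY is 0):

// Hypothetical sketch of the per-node throttling gate in chooseSourceDatanode.
static boolean canBeSource(int scheduledReplications, int priority) {
    final int maxReplicationStreams = 2;       // dfs.namenode.replication.max-streams
    final int replicationStreamsHardLimit = 4; // dfs.namenode.replication.max-streams-hard-limit
    // the hard limit applies to every priority, including the highest
    if (scheduledReplications >= replicationStreamsHardLimit) {
        return false;
    }
    // below the hard limit, only highest-priority work may exceed the soft limit
    return priority == 0 /* QUEUE_HIGHEST_PRIORITY */
        || scheduledReplications < maxReplicationStreams;
}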

The second for loop

The second for loop calls ReplicationWork.chooseTargets to select targets for each block:

private void chooseTargets(BlockPlacementPolicy blockplacement,
    BlockStoragePolicySuite storagePolicySuite,
    Set<Node> excludedNodes) {
  try {
    targets = blockplacement.chooseTarget(bc.getName(),
        additionalReplRequired, srcNode, liveReplicaStorages, false,
        excludedNodes, block.getNumBytes(),
        storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
  } finally {
    srcNode.decrementPendingReplicationWithoutTargets();
  }
}

This delegates to BlockPlacementPolicy.chooseTarget, which BlockPlacementPolicyDefault overrides as follows:

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenStorage,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy) {
  ...
  // the maximum number of replicas of the same block allowed per rack
  int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
  numOfReplicas = result[0];
  int maxNodesPerRack = result[1];
  // chosenStorage lists the storages already holding replicas (only
  // normal and read-only ones); results starts as a copy of it
  final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
  // add the nodes already holding replicas to excludedNodes so they are
  // not chosen as targets
  for (DatanodeStorageInfo storage : chosenStorage) {
    // add localMachine and related nodes to excludedNodes
    addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
  }

  boolean avoidStaleNodes = (stats != null
      && stats.isAvoidingStaleDataNodesForWrite());
  // choose targets for the replicas still needed, appending them to
  // results, and return the chosen local node
  final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
      blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
      EnumSet.noneOf(StorageType.class), results.isEmpty());
  if (!returnChosenNodes) {
    results.removeAll(chosenStorage);
  }

  // sorting nodes to form a pipeline
  return getPipeline(
      (writer != null && writer instanceof DatanodeDescriptor) ? writer
          : localNode,
      results.toArray(new DatanodeStorageInfo[results.size()]));
}

This chooseTarget in turn calls an overload of the same name to do the actual selection:

private Node chooseTarget(int numOfReplicas,
    Node writer,
    final Set<Node> excludedNodes,
    final long blocksize,
    final int maxNodesPerRack,
    final List<DatanodeStorageInfo> results,
    final boolean avoidStaleNodes,
    final BlockStoragePolicy storagePolicy,
    final EnumSet<StorageType> unavailableStorages,
    final boolean newBlock) {
  ...
  final int numOfResults = results.size();
  final int totalReplicasExpected = numOfReplicas + numOfResults;
  if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
    writer = results.get(0).getDatanodeDescriptor();
  }
  ...
  try {
    if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
      throw new NotEnoughReplicasException(
          "All required storage types are unavailable: "
          + " unavailableStorages=" + unavailableStorages
          + ", storagePolicy=" + storagePolicy);
    }
    // numOfResults == 0 means results is empty, i.e. the block has no
    // replica yet, so place the first one on local storage
    if (numOfResults == 0) {
      writer = chooseLocalStorage(writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
          .getDatanodeDescriptor();
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // the node holding the first replica in results
    final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
    // numOfResults <= 1: the block has only one replica, so put the
    // second replica on a node in a different rack
    if (numOfResults <= 1) {
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // numOfResults <= 2: the block has two replicas; check whether they
    // are in the same rack
    if (numOfResults <= 2) {
      final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
      if (clusterMap.isOnSameRack(dn0, dn1)) {
        // same rack: put the third replica on a node in a different rack
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else if (newBlock) {
        // for a new block (results was empty, so newBlock is true), put
        // the third replica in dn1's rack
        chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else {
        // different racks: put the third replica in the writer's rack
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      }
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // beyond three replicas, the remaining ones are placed randomly
    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  } catch (NotEnoughReplicasException e) {
    ...
  }
  return writer;
}

After the targets are selected, getPipeline still has to order them into a pipeline; the ordering is essentially a travelling-salesman-style problem, solved greedily:

private DatanodeStorageInfo[] getPipeline(Node writer,
    DatanodeStorageInfo[] storages) {
  ...
  synchronized (clusterMap) {
    int index = 0;
    if (writer == null || !clusterMap.contains(writer)) {
      writer = storages[0].getDatanodeDescriptor();
    }
    for (; index < storages.length; index++) {
      DatanodeStorageInfo shortestStorage = storages[index];
      int shortestDistance = clusterMap.getDistance(writer,
          shortestStorage.getDatanodeDescriptor());
      int shortestIndex = index;
      for (int i = index + 1; i < storages.length; i++) {
        int currentDistance = clusterMap.getDistance(writer,
            storages[i].getDatanodeDescriptor());
        if (shortestDistance > currentDistance) {
          shortestDistance = currentDistance;
          shortestStorage = storages[i];
          shortestIndex = i;
        }
      }
      // switch position index & shortestIndex
      if (index != shortestIndex) {
        storages[shortestIndex] = storages[index];
        storages[index] = shortestStorage;
      }
      writer = shortestStorage.getDatanodeDescriptor();
    }
  }
  return storages;
}
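The loop is a greedy nearest-first selection sort: at each position it picks the storage closest to the current end of the pipeline, then measures later distances from that new end. A toy illustration with assumed integer distances instead of the real NetworkTopology:

import java.util.Arrays;

public class PipelineOrder {
    // dist[i][j]: assumed network distance between nodes i and j; node 0 is the writer
    static int[] order(int[][] dist, int[] nodes) {
        int current = 0; // start measuring from the writer
        for (int i = 0; i < nodes.length; i++) {
            int best = i;
            for (int j = i + 1; j < nodes.length; j++) {
                if (dist[current][nodes[j]] < dist[current][nodes[best]]) {
                    best = j;
                }
            }
            int tmp = nodes[i]; nodes[i] = nodes[best]; nodes[best] = tmp;
            current = nodes[i]; // next hop is measured from the new pipeline end
        }
        return nodes;
    }

    public static void main(String[] args) {
        int[][] dist = {
            {0, 2, 4, 6},
            {2, 0, 4, 6},
            {4, 4, 0, 2},
            {6, 6, 2, 0},
        };
        // nodes 1 and 0 share a rack, as do 2 and 3; the greedy order is [1, 2, 3]
        System.out.println(Arrays.toString(order(dist, new int[]{3, 1, 2})));
    }
}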

The third for loop

The third for loop simply calls pendingReplications.increment(block, DatanodeStorageInfo.toDatanodeDescriptors(targets)), recording the block in PendingReplicationBlocks.pendingReplications.

So where does pendingReplications get consumed? From my reading of the 2.x code, it is mostly bookkeeping: the PendingReplicationMonitor thread inside PendingReplicationBlocks scans it for requests that have timed out, and processPendingReplications (called from ReplicationMonitor.run above) moves those timed-out blocks back into neededReplications; the transfer itself is triggered when the source DataNode picks up the queued work in its next heartbeat response. A paraphrased sketch of processPendingReplications (simplified from the 2.x BlockManager; treat details as unverified):
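// Paraphrased from BlockManager in the 2.x codebase (simplified): timed-out
// entries are pulled out of pendingReplications and put back into
// neededReplications so they get rescheduled on a later iteration.
private void processPendingReplications() {
  Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
  if (timedOutItems != null) {
    namesystem.writeLock();
    try {
      for (Block timedOutItem : timedOutItems) {
        NumberReplicas num = countNodes(timedOutItem);
        if (isNeededReplication(timedOutItem, getReplication(timedOutItem),
            num.liveReplicas())) {
          neededReplications.add(timedOutItem, num.liveReplicas(),
              num.decommissionedReplicas(), getReplication(timedOutItem));
        }
      }
    } finally {
      namesystem.writeUnlock();
    }
  }
}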

Deletion

Back in computeDatanodeWork: after the replication work is computed and the counters pendingReplicationBlocksCount, underReplicatedBlocksCount, corruptReplicaBlocksCount and scheduledReplicationBlocksCount are updated, the deletion work is computed by computeInvalidateWork:

int computeInvalidateWork(int nodesToProcess) {
  // the DataNodes that currently hold invalidated blocks
  final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
  Collections.shuffle(nodes);
  nodesToProcess = Math.min(nodes.size(), nodesToProcess);
  int blockCnt = 0;
  for (DatanodeInfo dnInfo : nodes) {
    // schedule deletion of the invalid blocks on this DataNode
    int blocks = invalidateWorkForOneNode(dnInfo);
    if (blocks > 0) {
      blockCnt += blocks;
      if (--nodesToProcess == 0) {
        break;
      }
    }
  }
  return blockCnt;
}

Note that computeInvalidateWork's parameter is a number of DataNodes, not a number of blocks, and deletion has no priorities: the method simply fetches every DataNode that has invalid blocks from invalidateBlocks, shuffles them, and loops until nodesToProcess nodes have been handled, accumulating the amount of scheduled work in blockCnt.

The per-node work is done in invalidateWorkForOneNode:

private int invalidateWorkForOneNode(DatanodeInfo dn) {
  final List<Block> toInvalidate;
  namesystem.writeLock();
  try {
    // blocks should not be replicated or removed if safe mode is on
    if (namesystem.isInSafeMode()) {
      LOG.debug("In safemode, not computing replication work");
      return 0;
    }
    try {
      // fetch this DataNode's block replicas from InvalidateBlocks.node2blocks
      toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));

      if (toInvalidate == null) {
        return 0;
      }
    } catch (UnregisteredNodeException une) {
      return 0;
    }
  } finally {
    namesystem.writeUnlock();
  }
  if (NameNode.stateChangeLog.isInfoEnabled()) {
    NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
        + ": ask " + dn + " to delete " + toInvalidate);
  }
  return toInvalidate.size();
}

invalidateWork looks like this:

// InvalidateBlocks.class
synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
  final long delay = getInvalidationDelay();
  if (delay > 0) {
    if (BlockManager.LOG.isDebugEnabled()) {
      BlockManager.LOG
          .debug("Block deletion is delayed during NameNode startup. "
              + "The deletion will start after " + delay + " ms.");
    }
    return null;
  }
  final LightWeightHashSet<Block> set = node2blocks.get(dn);
  if (set == null) {
    return null;
  }

  // # blocks that can be sent in one message is limited
  final int limit = blockInvalidateLimit;
  final List<Block> toInvalidate = set.pollN(limit);

  // If we send everything in this message, remove this node entry
  if (set.isEmpty()) {
    remove(dn);
  }
  // queue the blocks to be deleted on the DataNode itself
  dn.addBlocksToBeInvalidated(toInvalidate);
  numBlocks -= toInvalidate.size();
  return toInvalidate;
}

The deletion itself should happen on the DataNode: dn.addBlocksToBeInvalidated moves the replicas into DatanodeDescriptor.invalidateBlocks.

We still have not found the actual delete operation: replication at least ends with blocks queued for scheduling, whereas deletion seems to only return the number of blocks to remove. The real deletion must happen on the DataNode; I have not traced that code yet and will verify when I get the time. From memory of the 2.x code path (a sketch to double-check, not a quotation), both per-node queues are drained by DatanodeManager.handleHeartbeat, which piggybacks the work onto the heartbeat response as BlockCommands; the DataNode then performs the transfers, and deletes replica files asynchronously inside FsDatasetImpl:
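// Paraphrased sketch of the consumption side in DatanodeManager.handleHeartbeat
// (2.x codebase, simplified): queued work rides back on the heartbeat response.
List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
// drain blocks queued by addBlockToBeReplicated -> DNA_TRANSFER command
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(maxTransfers);
if (pendingList != null) {
  cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList));
}
// drain blocks queued by addBlocksToBeInvalidated -> DNA_INVALIDATE command
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
  cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks));
}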