By default every block of a file on HDFS is stored as 3 replicas: one replica on a DataNode in one rack, and the second and third replicas on two DataNodes in another rack. The NameNode keeps each block's replica count balanced. But when a block ends up with more than 3 replicas, which nodes does the NameNode pick when it deletes replicas to restore balance? And when a block has fewer than 3 replicas, which nodes does it choose as the source for re-replication? Let's dig into the code and see how the NameNode handles both cases.

Deleting excess replicas

Scenarios that produce excess replicas

  • A block has one replica on rack0 and two on rack1. The DataNode on rack0 goes down, so the NameNode re-replicates the block to another rack to get back to 3 replicas. Later the dead DataNode comes back online, and the block now has 4 replicas, so one has to be deleted.
  • Someone manually changes the replication factor with a command such as hadoop fs -setrep -R 3, leaving existing blocks with more replicas than the new target.

How excess replicas are deleted

BlockManager manages the information about where blocks are stored across the cluster. Its processOverReplicatedBlock method handles blocks that have excess replicas:

/**
 * Find out how many valid replicas the block has. If there are more than
 * needed, call chooseExcessReplicates() to pick the extras and record them
 * in excessReplicateMap.
 */
private void processOverReplicatedBlock(final Block block,
    final short replication, final DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint) {
  ...
  // storages holding normal (usable) replicas of this block
  Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
  // nodes holding corrupt replicas of this block
  // (corrupt replicas are never counted among the excess ones)
  Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
      .getNodes(block);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
    final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
    ...
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
        .getDatanodeUuid());
    if (excessBlocks == null || !excessBlocks.contains(block)) {
      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
        // exclude corrupt replicas
        if (corruptNodes == null || !corruptNodes.contains(cur)) {
          nonExcess.add(storage);
        }
      }
    }
  }
  chooseExcessReplicates(nonExcess, block, replication,
      addedNode, delNodeHint, blockplacement);
}

processOverReplicatedBlock collects the nodes holding usable replicas of the block into the nonExcess collection and then calls chooseExcessReplicates:

/**
 * We want "replication" replicates for the block, but we now have too many.
 * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
 *
 * srcNodes.size() - dstNodes.size() == replication
 *
 * We try to keep the replicas spread across racks and prefer to free the
 * nodes with the least remaining space. The rule: first pick, from a rack
 * that holds more than one replica of this block, the node with the least
 * free space. So removing such a replica won't remove a rack, i.e. the block
 * still spans the same number of racks after the deletion.
 * If there is no such node, fall back to the node with the least free space overall.
 */
private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
    Block b, short replication,
    DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint,
    BlockPlacementPolicy replicator) {
  assert namesystem.hasWriteLock();
  // first form a rack to datanodes map and
  // BlockCollection describes the file this block belongs to; INodeFile implements it
  BlockCollection bc = getBlockCollection(b);
  final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
  // storage types that are in excess under the block's storage policy;
  // replicas stored on these types are the ones eligible for deletion
  final List<StorageType> excessTypes = storagePolicy.chooseExcess(
      replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
  // rack name -> replicas (storages) of this block on that rack
  final Map<String, List<DatanodeStorageInfo>> rackMap
      = new HashMap<String, List<DatanodeStorageInfo>>();
  // storages on racks that hold more than one replica of this block
  final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
  // storages that are the only replica of this block on their rack
  final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();

  // split nodes into two sets
  // moreThanOne contains nodes on rack with more than one replica
  // exactlyOne contains the remaining nodes
  replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);

  // pick one node to delete that favors the delete hint
  // otherwise pick one with least space from priSet if it is not empty
  // otherwise one node with least space from remains
  boolean firstOne = true;
  final DatanodeStorageInfo delNodeHintStorage
      = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
  final DatanodeStorageInfo addedNodeStorage
      = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
  // keep deleting until only 'replication' (the expected count) replicas remain
  while (nonExcess.size() - replication > 0) {
    final DatanodeStorageInfo cur;
    // if a delNodeHint was given, prefer deleting from that node;
    // the hint is only considered on the first iteration
    if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
        moreThanOne, excessTypes)) {
      cur = delNodeHintStorage;
    } else { // regular excessive replica removal
      cur = replicator.chooseReplicaToDelete(bc, b, replication,
          moreThanOne, exactlyOne, excessTypes);
    }
    firstOne = false;

    // adjust rackmap, moreThanOne, and exactlyOne
    replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
        exactlyOne, cur);

    nonExcess.remove(cur);
    addToExcessReplicate(cur.getDatanodeDescriptor(), b);

    //
    // The 'excessblocks' tracks blocks until we get confirmation
    // that the datanode has deleted them; the only way we remove them
    // is when we get a "removeBlock" message.
    //
    // The 'invalidate' list is used to inform the datanode the block
    // should be deleted. Items are removed from the invalidate list
    // upon giving instructions to the namenode.
    //
    addToInvalidates(b, cur.getDatanodeDescriptor());
    blockLog.info("BLOCK* chooseExcessReplicates: "
        + "(" + cur + ", " + b + ") is added to invalidated blocks set");
  }
}

chooseExcessReplicates works as follows: it calls replicator.splitNodesWithRack to split nonExcess into two sets, then repeatedly calls replicator.chooseReplicaToDelete to pick the node whose replica will be removed, and records that choice in excessReplicateMap and invalidateBlocks, where it waits to be deleted.

The rule for splitting nodes into moreThanOne and exactlyOne lives in splitNodesWithRack:

public void splitNodesWithRack(
    final Iterable<DatanodeStorageInfo> storages,
    final Map<String, List<DatanodeStorageInfo>> rackMap,
    final List<DatanodeStorageInfo> moreThanOne,
    final List<DatanodeStorageInfo> exactlyOne) {
  // build rackMap: the key is the rack name, the value is the list of
  // replicas (storages) of this block on that rack
  for (DatanodeStorageInfo s : storages) {
    final String rackName = getRack(s.getDatanodeDescriptor());
    List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
    if (storageList == null) {
      storageList = new ArrayList<DatanodeStorageInfo>();
      rackMap.put(rackName, storageList);
    }
    storageList.add(s);
  }

  // split nodes into two sets
  // classify by how many replicas each rack holds
  for (List<DatanodeStorageInfo> storageList : rackMap.values()) {
    if (storageList.size() == 1) {
      // exactlyOne contains nodes on rack with only one replica
      exactlyOne.add(storageList.get(0));
    } else {
      // moreThanOne contains nodes on rack with more than one replica
      moreThanOne.addAll(storageList);
    }
  }
}

After splitNodesWithRack has done the split, chooseExcessReplicates loops in the while loop, deleting excess replicas until the block is back down to the expected replica count.
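To make the split concrete, here is a toy illustration of the same grouping logic, using plain strings instead of DatanodeStorageInfo (the rack and node names are made up):

import java.util.*;

public class SplitByRackDemo {
  public static void main(String[] args) {
    // Hypothetical placement: one replica on rack0, three on rack1.
    Map<String, String> replicaToRack = new LinkedHashMap<>();
    replicaToRack.put("dn1", "rack0");
    replicaToRack.put("dn2", "rack1");
    replicaToRack.put("dn3", "rack1");
    replicaToRack.put("dn4", "rack1");

    // Build rackMap just like splitNodesWithRack does: rack name -> replicas on it.
    Map<String, List<String>> rackMap = new HashMap<>();
    for (Map.Entry<String, String> e : replicaToRack.entrySet()) {
      rackMap.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }

    List<String> moreThanOne = new ArrayList<>();
    List<String> exactlyOne = new ArrayList<>();
    for (List<String> replicas : rackMap.values()) {
      if (replicas.size() == 1) {
        exactlyOne.add(replicas.get(0));   // the rack's only replica
      } else {
        moreThanOne.addAll(replicas);      // the rack keeps a replica even if one is deleted
      }
    }

    // exactlyOne = [dn1], moreThanOne = [dn2, dn3, dn4]
    System.out.println("exactlyOne  = " + exactlyOne);
    System.out.println("moreThanOne = " + moreThanOne);
  }
}

This is also what the "removing such a replica won't remove a rack" comment means: deleting any node in moreThanOne still leaves its rack with at least one replica, so the block continues to span the same number of racks.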

The node to delete from is chosen by chooseReplicaToDelete, which is overridden in BlockPlacementPolicyDefault. The selection rules are:

  • If moreThanOne is not empty, the candidates are taken from moreThanOne (otherwise from exactlyOne).
  • Among the candidates, prefer the node whose last heartbeat is the oldest, i.e. beyond the tolerated heartbeat interval.
  • If every candidate has heartbeated within the tolerated interval, pick the node with the least remaining space.

The code:
// BlockPlacementPolicyDefault.class
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
    Block block, short replicationFactor,
    Collection<DatanodeStorageInfo> first,
    Collection<DatanodeStorageInfo> second,
    final List<StorageType> excessTypes) {
  // cutoff timestamp: a node whose last heartbeat is older than this has
  // exceeded the tolerated heartbeat interval
  long oldestHeartbeat =
      now() - heartbeatInterval * tolerateHeartbeatMultiplier;
  DatanodeStorageInfo oldestHeartbeatStorage = null;
  long minSpace = Long.MAX_VALUE;
  DatanodeStorageInfo minSpaceStorage = null;
  // pickupReplicaSet returns 'first' if it is non-empty, otherwise 'second'
  for (DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
    if (!excessTypes.contains(storage.getStorageType())) {
      continue;
    }

    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    long free = node.getRemaining();
    long lastHeartbeat = node.getLastUpdate();
    // remember the node with the oldest heartbeat (only if beyond the cutoff)
    if (lastHeartbeat < oldestHeartbeat) {
      oldestHeartbeat = lastHeartbeat;
      oldestHeartbeatStorage = storage;
    }
    // remember the node with the least remaining space
    if (minSpace > free) {
      minSpace = free;
      minSpaceStorage = storage;
    }
  }
  // prefer the node with the oldest (overdue) heartbeat; if all heartbeats are
  // within the tolerated interval, fall back to the node with the least space
  final DatanodeStorageInfo storage;
  if (oldestHeartbeatStorage != null) {
    storage = oldestHeartbeatStorage;
  } else if (minSpaceStorage != null) {
    storage = minSpaceStorage;
  } else {
    return null;
  }
  excessTypes.remove(storage.getStorageType());
  return storage;
}

Once the node cur to delete from has been chosen, chooseExcessReplicates calls adjustSetsWithChosenReplica to re-adjust rackMap, moreThanOne and exactlyOne, and then calls addToExcessReplicate to record cur in excessReplicateMap (shown after the sketch below).
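adjustSetsWithChosenReplica itself is not quoted here. Roughly, it removes cur from its rack's entry in rackMap and from whichever of moreThanOne/exactlyOne contained it; and if cur's rack is left with exactly one replica, that surviving node is moved from moreThanOne to exactlyOne. A toy sketch of that bookkeeping with plain strings (hypothetical names, not the HDFS source):

import java.util.*;

public class AdjustSetsDemo {
  // Simplified model of adjustSetsWithChosenReplica; String stands in for DatanodeStorageInfo.
  static void adjust(Map<String, List<String>> rackMap,
                     List<String> moreThanOne, List<String> exactlyOne,
                     String rackOfCur, String cur) {
    List<String> sameRack = rackMap.get(rackOfCur);
    sameRack.remove(cur);              // cur's replica is about to be deleted
    if (sameRack.isEmpty()) {
      rackMap.remove(rackOfCur);
    }
    if (moreThanOne.remove(cur)) {
      // cur's rack used to hold more than one replica; if only one is left
      // there now, that survivor becomes the rack's single replica.
      if (sameRack.size() == 1) {
        String remaining = sameRack.get(0);
        moreThanOne.remove(remaining);
        exactlyOne.add(remaining);
      }
    } else {
      exactlyOne.remove(cur);
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> rackMap = new HashMap<>();
    rackMap.put("rack0", new ArrayList<>(Arrays.asList("dn1")));
    rackMap.put("rack1", new ArrayList<>(Arrays.asList("dn2", "dn3")));
    List<String> moreThanOne = new ArrayList<>(Arrays.asList("dn2", "dn3"));
    List<String> exactlyOne = new ArrayList<>(Arrays.asList("dn1"));

    adjust(rackMap, moreThanOne, exactlyOne, "rack1", "dn3");
    System.out.println("moreThanOne = " + moreThanOne);  // []
    System.out.println("exactlyOne  = " + exactlyOne);   // [dn1, dn2]
  }
}

addToExcessReplicate itself is straightforward: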

private void addToExcessReplicate(DatanodeInfo dn, Block block) {
  assert namesystem.hasWriteLock();
  LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
  if (excessBlocks == null) {
    excessBlocks = new LightWeightLinkedSet<Block>();
    excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
  }
  if (excessBlocks.add(block)) {
    excessBlocksCount.incrementAndGet();
    if (blockLog.isDebugEnabled()) {
      blockLog.debug("BLOCK* addToExcessReplicate:"
          + " (" + dn + ", " + block
          + ") is added to excessReplicateMap");
    }
  }
}

Finally, the (block, cur) pair is added to invalidateBlocks, after which the replica on cur will be deleted:

void addToInvalidates(final Block block, final DatanodeInfo datanode) {
  if (!namesystem.isPopulatingReplQueues()) {
    return;
  }
  invalidateBlocks.add(block, datanode, true);
}

Where are the entries in invalidateBlocks actually acted upon? They are drained by the BlockManager's ReplicationMonitor daemon thread.
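As a rough sketch (paraphrased from the Hadoop 2.x BlockManager, not the verbatim source; check your version), the monitor loop looks like this:

// Paraphrased sketch of BlockManager.ReplicationMonitor (Hadoop 2.x); not verbatim source.
private class ReplicationMonitor implements Runnable {
  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        if (namesystem.isPopulatingReplQueues()) {
          // schedules both re-replication work (from neededReplications)
          // and deletion work (from invalidateBlocks)
          computeDatanodeWork();
          processPendingReplications();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (InterruptedException ie) {
        break;
      }
    }
  }
}

Roughly, computeDatanodeWork calls computeInvalidateWork, which moves a batch of blocks from invalidateBlocks onto the chosen DataNodes' invalidate lists; each DataNode receives them as a DNA_INVALIDATE command in its next heartbeat reply and deletes the replicas on disk. The entry only leaves excessReplicateMap later, when the DataNode reports the block as deleted (the "removeBlock" path mentioned in the comment quoted earlier).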

Repairing under-replicated blocks

When a block on HDFS may be mis-replicated, for example its replica count is below the expected count, processMisReplicatedBlock decides what to do with it:

/**
 * Process a single possibly misreplicated block. This adds it to the
 * appropriate queues if necessary, and returns a result code indicating
 * what happened with it.
 */
private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
  BlockCollection bc = block.getBlockCollection();
  // bc == null means the block no longer belongs to any file,
  // so it is invalid and should be deleted
  if (bc == null) {
    // block does not belong to any file
    addToInvalidates(block);
    return MisReplicationResult.INVALID;
  }
  if (!block.isComplete()) {
    // Incomplete blocks are never considered mis-replicated --
    // they'll be reached when they are completed or recovered.
    return MisReplicationResult.UNDER_CONSTRUCTION;
  }
  // calculate current replication
  short expectedReplication = bc.getBlockReplication();
  // count the block's replicas, excluding corrupt ones
  NumberReplicas num = countNodes(block);
  int numCurrentReplica = num.liveReplicas();
  // add to under-replicated queue if need to be
  if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
    if (neededReplications.add(block, numCurrentReplica, num
        .decommissionedReplicas(), expectedReplication)) {
      return MisReplicationResult.UNDER_REPLICATED;
    }
  }

  if (numCurrentReplica > expectedReplication) {
    if (num.replicasOnStaleNodes() > 0) {
      // If any of the replicas of this block are on nodes that are
      // considered "stale", then these replicas may in fact have
      // already been deleted. So, we cannot safely act on the
      // over-replication until a later point in time, when
      // the "stale" nodes have block reported.
      return MisReplicationResult.POSTPONE;
    }

    // over-replicated block
    // more replicas than expectedReplication: remove the extras
    processOverReplicatedBlock(block, expectedReplication, null, null);
    return MisReplicationResult.OVER_REPLICATED;
  }

  return MisReplicationResult.OK;
}

processMisReplicatedBlock tags the block with a result code and puts it into the appropriate queue based on the block's state. It uses countNodes to count the replicas:

/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}

processMisReplicatedBlock uses isNeededReplication to decide whether the block needs additional replicas. When current < expected || !blockHasEnoughRacks(b) is true, it calls neededReplications.add. neededReplications is an instance of UnderReplicatedBlocks, one of the key data structures for block replication in HDFS.
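For reference, the check itself is essentially a one-liner in BlockManager (paraphrased here; exact parameter names vary between versions):

// Paraphrased from BlockManager: a block needs replication work if it has fewer
// replicas than expected, or if its replicas do not span enough racks.
boolean isNeededReplication(Block b, int expected, int current) {
  return current < expected || !blockHasEnoughRacks(b);
}

The add method of UnderReplicatedBlocks: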

synchronized boolean add(Block block,
                         int curReplicas,
                         int decomissionedReplicas,
                         int expectedReplicas) {
  assert curReplicas >= 0 : "Negative replicas!";
  int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
      expectedReplicas);
  if (priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
    if (NameNode.blockStateChangeLog.isDebugEnabled()) {
      NameNode.blockStateChangeLog.debug(
          "BLOCK* NameSystem.UnderReplicationBlock.add:"
          + block
          + " has only " + curReplicas
          + " replicas and need " + expectedReplicas
          + " replicas so is added to neededReplications"
          + " at priority level " + priLevel);
    }
    return true;
  }
  return false;
}

add puts the block into the under-replication queue matching its priority, which is obtained from getPriority. There are five priority levels, numbered from 0; 0 is the highest priority and 4 the lowest:

  1. QUEUE_HIGHEST_PRIORITY = 0
    Highest priority: the block is severely under-replicated. Either it has exactly one live replica, or it has zero live replicas but at least one replica on a decommissioning/decommissioned node. These blocks are the most likely to be lost, so they are replicated first.
  2. QUEUE_VERY_UNDER_REPLICATED = 1
    The block is badly under-replicated, though not as critical as above: it has more than one live replica, yet curReplicas * 3 is still less than expectedReplicas. Data loss is still a real risk, so the replication priority is high.
  3. QUEUE_UNDER_REPLICATED = 2
    The block has fewer replicas than expected, but not critically so; this is the "normal" under-replicated case.
  4. QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3
    The block has enough replicas (at least the expected count) but they violate the placement policy, i.e. they are badly distributed across racks.
  5. QUEUE_WITH_CORRUPT_BLOCKS = 4
    The block has zero live replicas and no decommissioned replicas either; all that remains are corrupt replicas.

The getPriority implementation:
private int getPriority(Block block,
                        int curReplicas,
                        int decommissionedReplicas,
                        int expectedReplicas) {
  assert curReplicas >= 0 : "Negative replicas!";
  if (curReplicas >= expectedReplicas) {
    // Block has enough copies, but not enough racks
    return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
  } else if (curReplicas == 0) {
    // If there are zero non-decommissioned replicas but there are
    // some decommissioned replicas, then assign them highest priority
    if (decommissionedReplicas > 0) {
      return QUEUE_HIGHEST_PRIORITY;
    }
    //all we have are corrupt blocks
    return QUEUE_WITH_CORRUPT_BLOCKS;
  } else if (curReplicas == 1) {
    //only on replica -risk of loss
    // highest priority
    return QUEUE_HIGHEST_PRIORITY;
  } else if ((curReplicas * 3) < expectedReplicas) {
    //there is less than a third as many blocks as requested;
    //this is considered very under-replicated
    return QUEUE_VERY_UNDER_REPLICATED;
  } else {
    //add to the normal queue for under replicated blocks
    return QUEUE_UNDER_REPLICATED;
  }
}

After getPriority returns the priority level, the block is added to the LightWeightLinkedSet at that level in the priorityQueues list. processMisReplicatedBlock then returns MisReplicationResult.UNDER_REPLICATED for this block.

With the code above, the NameNode has placed under-replicated blocks into the queues matching their replication priority, where they wait for the replication thread to copy them.
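The consumer of these priority queues is the same ReplicationMonitor loop sketched earlier: computeDatanodeWork calls computeReplicationWork, which drains a batch of blocks from neededReplications in priority order and turns each one into a copy task on a chosen source DataNode. Roughly (paraphrased from the Hadoop 2.x BlockManager, not verbatim; check your version):

// Paraphrased sketch: how the replication thread picks blocks to re-replicate.
int computeReplicationWork(int blocksToProcess) {
  List<List<Block>> blocksToReplicate = null;
  namesystem.writeLock();
  try {
    // returns up to blocksToProcess blocks, grouped by priority level,
    // highest-priority queues first
    blocksToReplicate = neededReplications.chooseUnderReplicatedBlocks(blocksToProcess);
  } finally {
    namesystem.writeUnlock();
  }
  // for each block: pick a source replica and target nodes
  // (via the placement policy), then queue the copy on the source DataNode
  return computeReplicationWorkForBlocks(blocksToReplicate);
}

The target nodes come from the placement policy described in the appendix below.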

Appendix: the default replica placement policy

The default placement policy is well known: the 3 replicas span two racks, one replica on one rack and two on the other. Here I only list the core of the implementation; the entry point is BlockPlacementPolicyDefault.chooseTarget, and only the main chooseTarget logic is shown:

private Node chooseTarget(int numOfReplicas,
                          Node writer,
                          final Set<Node> excludedNodes,
                          final long blocksize,
                          final int maxNodesPerRack,
                          final List<DatanodeStorageInfo> results,
                          final boolean avoidStaleNodes,
                          final BlockStoragePolicy storagePolicy,
                          final EnumSet<StorageType> unavailableStorages,
                          final boolean newBlock) {
  if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
    return (writer instanceof DatanodeDescriptor) ? writer : null;
  }
  final int numOfResults = results.size();
  final int totalReplicasExpected = numOfReplicas + numOfResults;
  if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
    writer = results.get(0).getDatanodeDescriptor();
  }

  // Keep a copy of original excludedNodes
  final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);

  // choose storage types; use fallbacks for unavailable storages
  final List<StorageType> requiredStorageTypes = storagePolicy
      .chooseStorageTypes((short) totalReplicasExpected,
          DatanodeStorageInfo.toStorageTypes(results),
          unavailableStorages, newBlock);
  final EnumMap<StorageType, Integer> storageTypes =
      getRequiredStorageTypes(requiredStorageTypes);
  if (LOG.isTraceEnabled()) {
    LOG.trace("storageTypes=" + storageTypes);
  }

  try {
    if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
      throw new NotEnoughReplicasException(
          "All required storage types are unavailable: "
          + " unavailableStorages=" + unavailableStorages
          + ", storagePolicy=" + storagePolicy);
    }
    // first replica: if the writer (client) runs on a DataNode, use that node,
    // otherwise chooseLocalStorage falls back to picking a node (effectively at random)
    if (numOfResults == 0) {
      writer = chooseLocalStorage(writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
              .getDatanodeDescriptor();
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // dn0 is the node holding the first replica
    final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
    // second replica: pick a node on a rack other than dn0's rack
    if (numOfResults <= 1) {
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    if (numOfResults <= 2) {
      // dn1 is the node holding the second replica
      final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
      // if dn0 and dn1 are on the same rack, pick the third replica from another rack
      if (clusterMap.isOnSameRack(dn0, dn1)) {
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else if (newBlock) {
        // for a new block (results started out empty) whose first two replicas
        // are already on different racks, pick the third replica from dn1's rack
        chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else {
        // otherwise pick a node on the same rack as the writer
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      }
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    // replication factor greater than 3: the remaining replicas go to randomly chosen nodes
    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  } catch (NotEnoughReplicasException e) {
    final String message = "Failed to place enough replicas, still in need of "
        + (totalReplicasExpected - results.size()) + " to reach "
        + totalReplicasExpected
        + " (unavailableStorages=" + unavailableStorages
        + ", storagePolicy=" + storagePolicy
        + ", newBlock=" + newBlock + ")";

    if (LOG.isTraceEnabled()) {
      LOG.trace(message, e);
    } else {
      LOG.warn(message + " " + e.getMessage());
    }

    if (avoidStaleNodes) {
      // Retry chooseTarget again, this time not avoiding stale nodes.

      // excludedNodes contains the initial excludedNodes and nodes that were
      // not chosen because they were stale, decommissioned, etc.
      // We need to additionally exclude the nodes that were added to the
      // result list in the successful calls to choose*() above.
      for (DatanodeStorageInfo resultStorage : results) {
        addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
      }
      // Set numOfReplicas, since it can get out of sync with the result list
      // if the NotEnoughReplicasException was thrown in chooseRandom().
      numOfReplicas = totalReplicasExpected - results.size();
      return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
          maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
          newBlock);
    }

    boolean retry = false;
    // simply add all the remaining types into unavailableStorages and give
    // another try. No best effort is guaranteed here.
    for (StorageType type : storageTypes.keySet()) {
      if (!unavailableStorages.contains(type)) {
        unavailableStorages.add(type);
        retry = true;
      }
    }
    if (retry) {
      for (DatanodeStorageInfo resultStorage : results) {
        addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
            oldExcludedNodes);
      }
      numOfReplicas = totalReplicasExpected - results.size();
      return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
          maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
          newBlock);
    }
  }
  return writer;
}