/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.master.balancer.DistributeReplicasConditional.getReplicaKey;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CandidateGenerator to distribute colocated replicas across different servers.
 * <p>
 * A replica is treated as colocated when another region with an equal {@link ReplicaKey} has
 * already been seen on the same server during the scan.
 */
@InterfaceAudience.Private
final class DistributeReplicasCandidateGenerator extends RegionPlanConditionalCandidateGenerator {

  private static final Logger LOG =
    LoggerFactory.getLogger(DistributeReplicasCandidateGenerator.class);

  /**
   * Soft cap on the number of moves collected per call. It is only checked before each source
   * server is scanned, so a single server may push the batch slightly past this value.
   */
  private static final int BATCH_SIZE = 100_000;

  DistributeReplicasCandidateGenerator(BalancerConditionals balancerConditionals) {
    super(balancerConditionals);
  }

  /**
   * Generates a candidate without forcing; delegates to
   * {@link #generateCandidate(BalancerClusterState, boolean, boolean)} with {@code isForced}
   * set to {@code false}.
   */
  @Override
  BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
    return generateCandidate(cluster, isWeighing, false);
  }

  /**
   * Scans the servers in shuffled order looking for regions whose replica key was already seen on
   * the same server, and proposes moving them to another server.
   * <ul>
   * <li>When {@code isWeighing} is true, the first colocated replica found is returned immediately
   * as a move to a randomly picked other server.</li>
   * <li>Otherwise, for every colocated replica up to {@code cluster.numServers} random
   * destinations are tried. An accepted move is applied to {@code cluster} and collected; the
   * collected moves are returned through {@code batchMovesAndResetClusterState}.</li>
   * <li>When {@code isForced} is true, the first move whose destination differs from the source is
   * returned without consulting {@code willBeAccepted}.</li>
   * </ul>
   * If colocated replicas exist but no move was accepted, the search is retried exactly once in
   * forced mode. A forced pass that also fails yields {@link BalanceAction#NULL_ACTION} instead of
   * retrying again.
   * @param cluster    cluster state to inspect; mutated via {@code doAction} while collecting
   *                   moves
   * @param isWeighing whether the caller only needs a quick, actionable candidate
   * @param isForced   whether to skip the acceptance check and return the first possible move
   * @return a move (or batch of moves), or {@link BalanceAction#NULL_ACTION} when nothing is to be
   *         done or nothing can be done
   */
  BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing,
    boolean isForced) {
    // NOTE(review): this guard compares the max replica count against the rack count, while the
    // message below speaks of too few racks - confirm the comparison direction is intended.
    if (cluster.getMaxReplicas() < cluster.numRacks) {
      LOG.trace("Skipping replica distribution as there are not enough racks to distribute them.");
      return BalanceAction.NULL_ACTION;
    }

    // Iterate through shuffled servers to find colocated replicas
    boolean foundColocatedReplicas = false;
    List<MoveRegionAction> moveRegionActions = new ArrayList<>();
    List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices();
    for (int sourceIndex : shuffledServerIndices) {
      if (moveRegionActions.size() >= BATCH_SIZE || cluster.isStopRequested()) {
        break;
      }
      // Snapshot of the server's regions; doAction below may replace the array held by the cluster
      int[] serverRegions = cluster.regionsPerServer[sourceIndex];
      Set<ReplicaKey> replicaKeys = new HashSet<>(serverRegions.length);
      for (int regionIndex : serverRegions) {
        ReplicaKey replicaKey = getReplicaKey(cluster.regions[regionIndex]);
        if (replicaKeys.add(replicaKey)) {
          // First region with this key on the server, nothing to distribute yet
          continue;
        }
        foundColocatedReplicas = true;
        if (isWeighing) {
          // If weighing, fast exit with an actionable move
          return getAction(sourceIndex, regionIndex, pickOtherRandomServer(cluster, sourceIndex),
            -1);
        }
        // If not weighing, pick a good move
        for (int i = 0; i < cluster.numServers; i++) {
          // Randomize destination ordering so we aren't overloading one destination
          int destinationIndex = pickOtherRandomServer(cluster, sourceIndex);
          if (destinationIndex == sourceIndex) {
            continue;
          }
          MoveRegionAction possibleAction =
            new MoveRegionAction(regionIndex, sourceIndex, destinationIndex);
          if (isForced) {
            return possibleAction;
          }
          if (willBeAccepted(cluster, possibleAction)) {
            cluster.doAction(possibleAction); // Update cluster state to reflect move
            moveRegionActions.add(possibleAction);
            break;
          }
        }
      }
    }

    if (!moveRegionActions.isEmpty()) {
      return batchMovesAndResetClusterState(cluster, moveRegionActions);
    }
    if (foundColocatedReplicas) {
      if (isForced) {
        // The forced pass found no usable destination either. Retrying with the same arguments
        // could recurse without bound, so give up for this round.
        LOG.warn("Could not find a destination for a colocated replica even with a forced move.");
        return BalanceAction.NULL_ACTION;
      }
      LOG.warn("Could not find a place to put a colocated replica! We will force a move.");
      return generateCandidate(cluster, isWeighing, true);
    }
    // No colocated replicas were found, so there is nothing to do
    LOG.trace("No colocated replicas found. No balancing action required.");
    return BalanceAction.NULL_ACTION;
  }
}