001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.balancer; 020 021import edu.umd.cs.findbugs.annotations.NonNull; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Comparator; 028import java.util.Deque; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Map.Entry; 034import java.util.NavigableMap; 035import java.util.Random; 036import java.util.Set; 037import java.util.TreeMap; 038import java.util.function.Predicate; 039import java.util.stream.Collectors; 040import org.apache.commons.lang3.NotImplementedException; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.ClusterMetrics; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HBaseIOException; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HDFSBlocksDistribution; 047import org.apache.hadoop.hbase.ServerMetrics; 048import org.apache.hadoop.hbase.ServerName; 049import 
org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionReplicaUtil; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.master.LoadBalancer; 054import org.apache.hadoop.hbase.master.MasterServices; 055import org.apache.hadoop.hbase.master.RackManager; 056import org.apache.hadoop.hbase.master.RegionPlan; 057import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 058import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 064import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 065import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 066import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 067 068/** 069 * The base class for load balancers. It provides the the functions used to by 070 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions 071 * in the edge cases. It doesn't provide an implementation of the 072 * actual balancing algorithm. 
 *
 */
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {

  /** Config key toggling whether balancer decisions are captured into the ring buffer. */
  public static final String BALANCER_DECISION_BUFFER_ENABLED =
    "hbase.master.balancer.decision.buffer.enabled";
  public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;

  /** With fewer servers than this there is nothing useful to balance. */
  protected static final int MIN_SERVER_BALANCE = 2;
  private volatile boolean stopped = false;

  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

  /** Matches servers that currently carry no regions at all. */
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Locality lookup helper; only created when useRegionFinder is enabled (see createRegionFinder).
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;
  protected boolean isByTable = false;

  /**
   * Use to add balancer decision history to ring-buffer
   */
  protected NamedQueueRecorder namedQueueRecorder;

  /** Fallback RackManager that places every server in the same UNKNOWN_RACK. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * The constructor that uses the basic MetricsBalancer
   */
  protected BaseLoadBalancer() {
    metricsBalancer = new MetricsBalancer();
    createRegionFinder();
  }

  /**
   * This Constructor accepts an instance of MetricsBalancer,
   * which will be used instead of creating a new one
   */
  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
    createRegionFinder();
  }

  // Locality-aware placement is optional; skip building the (expensive) finder when disabled.
  private void createRegionFinder() {
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }

  /**
   * An efficient array based implementation similar to ClusterState for keeping
   * the status of the cluster in terms of region assignment and distribution.
   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
   * hundreds of thousands of hashmap manipulations are very costly, which is why this
   * class uses mostly indexes and arrays.
   *
   * Cluster tracks a list of unassigned regions, region assignments, and the server
   * topology in terms of server names, hostnames and racks.
   */
  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;
    RegionInfo[] regions;
    Deque<BalancerRegionLoad>[] regionLoads;
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[] serverIndexToHostIndex; //serverIndex -> host index
    int[] serverIndexToRackIndex; //serverIndex -> rack index

    int[][] regionsPerServer; //serverIndex -> region list
    int[] serverIndexToRegionsOffset; //serverIndex -> offset of region list
    int[][] regionsPerHost; //hostIndex -> list of regions
    int[][] regionsPerRack; //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost; //hostIndex -> list of server indexes
    int[][] serversPerRack; //rackIndex -> list of server indexes
    int[] regionIndexToServerIndex; //regionIndex -> serverIndex
    int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
    int[] regionIndexToTableIndex; //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable;
//serverIndex -> tableIndex -> # regions 170 int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS 171 int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary 172 boolean hasRegionReplicas = false; //whether there is regions with replicas 173 174 Integer[] serverIndicesSortedByRegionCount; 175 Integer[] serverIndicesSortedByLocality; 176 177 Map<String, Integer> serversToIndex; 178 Map<String, Integer> hostsToIndex; 179 Map<String, Integer> racksToIndex; 180 Map<String, Integer> tablesToIndex; 181 Map<RegionInfo, Integer> regionsToIndex; 182 float[] localityPerServer; 183 184 int numServers; 185 int numHosts; 186 int numRacks; 187 int numTables; 188 int numRegions; 189 190 int numMovedRegions = 0; //num moved regions from the initial configuration 191 Map<ServerName, List<RegionInfo>> clusterState; 192 193 protected final RackManager rackManager; 194 // Maps region -> rackIndex -> locality of region on rack 195 private float[][] rackLocalities; 196 // Maps localityType -> region -> [server|rack]Index with highest locality 197 private int[][] regionsToMostLocalEntities; 198 199 protected Cluster( 200 Map<ServerName, List<RegionInfo>> clusterState, 201 Map<String, Deque<BalancerRegionLoad>> loads, 202 RegionLocationFinder regionFinder, 203 RackManager rackManager) { 204 this(null, clusterState, loads, regionFinder, rackManager); 205 } 206 207 @SuppressWarnings("unchecked") 208 protected Cluster( 209 Collection<RegionInfo> unassignedRegions, 210 Map<ServerName, List<RegionInfo>> clusterState, 211 Map<String, Deque<BalancerRegionLoad>> loads, 212 RegionLocationFinder regionFinder, 213 RackManager rackManager) { 214 215 if (unassignedRegions == null) { 216 unassignedRegions = EMPTY_REGION_LIST; 217 } 218 219 serversToIndex = new HashMap<>(); 220 hostsToIndex = new HashMap<>(); 221 racksToIndex = new HashMap<>(); 222 tablesToIndex = new HashMap<>(); 223 224 //TODO: We should get the list of tables from master 225 tables = 
new ArrayList<>(); 226 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 227 228 numRegions = 0; 229 230 List<List<Integer>> serversPerHostList = new ArrayList<>(); 231 List<List<Integer>> serversPerRackList = new ArrayList<>(); 232 this.clusterState = clusterState; 233 this.regionFinder = regionFinder; 234 235 // Use servername and port as there can be dead servers in this list. We want everything with 236 // a matching hostname and port to have the same index. 237 for (ServerName sn : clusterState.keySet()) { 238 if (sn == null) { 239 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + 240 "skipping; unassigned regions?"); 241 if (LOG.isTraceEnabled()) { 242 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 243 } 244 continue; 245 } 246 if (serversToIndex.get(sn.getAddress().toString()) == null) { 247 serversToIndex.put(sn.getHostAndPort(), numServers++); 248 } 249 if (!hostsToIndex.containsKey(sn.getHostname())) { 250 hostsToIndex.put(sn.getHostname(), numHosts++); 251 serversPerHostList.add(new ArrayList<>(1)); 252 } 253 254 int serverIndex = serversToIndex.get(sn.getHostAndPort()); 255 int hostIndex = hostsToIndex.get(sn.getHostname()); 256 serversPerHostList.get(hostIndex).add(serverIndex); 257 258 String rack = this.rackManager.getRack(sn); 259 if (!racksToIndex.containsKey(rack)) { 260 racksToIndex.put(rack, numRacks++); 261 serversPerRackList.add(new ArrayList<>()); 262 } 263 int rackIndex = racksToIndex.get(rack); 264 serversPerRackList.get(rackIndex).add(serverIndex); 265 } 266 267 // Count how many regions there are. 
268 for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 269 numRegions += entry.getValue().size(); 270 } 271 numRegions += unassignedRegions.size(); 272 273 regionsToIndex = new HashMap<>(numRegions); 274 servers = new ServerName[numServers]; 275 serversPerHost = new int[numHosts][]; 276 serversPerRack = new int[numRacks][]; 277 regions = new RegionInfo[numRegions]; 278 regionIndexToServerIndex = new int[numRegions]; 279 initialRegionIndexToServerIndex = new int[numRegions]; 280 regionIndexToTableIndex = new int[numRegions]; 281 regionIndexToPrimaryIndex = new int[numRegions]; 282 regionLoads = new Deque[numRegions]; 283 284 regionLocations = new int[numRegions][]; 285 serverIndicesSortedByRegionCount = new Integer[numServers]; 286 serverIndicesSortedByLocality = new Integer[numServers]; 287 localityPerServer = new float[numServers]; 288 289 serverIndexToHostIndex = new int[numServers]; 290 serverIndexToRackIndex = new int[numServers]; 291 regionsPerServer = new int[numServers][]; 292 serverIndexToRegionsOffset = new int[numServers]; 293 regionsPerHost = new int[numHosts][]; 294 regionsPerRack = new int[numRacks][]; 295 primariesOfRegionsPerServer = new int[numServers][]; 296 primariesOfRegionsPerHost = new int[numHosts][]; 297 primariesOfRegionsPerRack = new int[numRacks][]; 298 299 int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; 300 301 for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 302 if (entry.getKey() == null) { 303 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 304 continue; 305 } 306 int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); 307 308 // keep the servername if this is the first server name for this hostname 309 // or this servername has the newest startcode. 
310 if (servers[serverIndex] == null || 311 servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) { 312 servers[serverIndex] = entry.getKey(); 313 } 314 315 if (regionsPerServer[serverIndex] != null) { 316 // there is another server with the same hostAndPort in ClusterState. 317 // allocate the array for the total size 318 regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 319 } else { 320 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 321 } 322 primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; 323 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 324 serverIndicesSortedByLocality[serverIndex] = serverIndex; 325 } 326 327 hosts = new String[numHosts]; 328 for (Entry<String, Integer> entry : hostsToIndex.entrySet()) { 329 hosts[entry.getValue()] = entry.getKey(); 330 } 331 racks = new String[numRacks]; 332 for (Entry<String, Integer> entry : racksToIndex.entrySet()) { 333 racks[entry.getValue()] = entry.getKey(); 334 } 335 336 for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 337 int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); 338 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 339 340 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 341 serverIndexToHostIndex[serverIndex] = hostIndex; 342 343 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 344 serverIndexToRackIndex[serverIndex] = rackIndex; 345 346 for (RegionInfo region : entry.getValue()) { 347 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 348 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 349 regionIndex++; 350 } 351 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 352 } 353 354 for (RegionInfo region : unassignedRegions) { 355 registerRegion(region, regionIndex, -1, loads, regionFinder); 356 regionIndex++; 
357 } 358 359 for (int i = 0; i < serversPerHostList.size(); i++) { 360 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 361 for (int j = 0; j < serversPerHost[i].length; j++) { 362 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 363 } 364 if (serversPerHost[i].length > 1) { 365 multiServersPerHost = true; 366 } 367 } 368 369 for (int i = 0; i < serversPerRackList.size(); i++) { 370 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 371 for (int j = 0; j < serversPerRack[i].length; j++) { 372 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 373 } 374 } 375 376 numTables = tables.size(); 377 numRegionsPerServerPerTable = new int[numServers][numTables]; 378 379 for (int i = 0; i < numServers; i++) { 380 for (int j = 0; j < numTables; j++) { 381 numRegionsPerServerPerTable[i][j] = 0; 382 } 383 } 384 385 for (int i=0; i < regionIndexToServerIndex.length; i++) { 386 if (regionIndexToServerIndex[i] >= 0) { 387 numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; 388 } 389 } 390 391 numMaxRegionsPerTable = new int[numTables]; 392 for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { 393 for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { 394 if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { 395 numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; 396 } 397 } 398 } 399 400 for (int i = 0; i < regions.length; i ++) { 401 RegionInfo info = regions[i]; 402 if (RegionReplicaUtil.isDefaultReplica(info)) { 403 regionIndexToPrimaryIndex[i] = i; 404 } else { 405 hasRegionReplicas = true; 406 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 407 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 408 } 409 } 410 411 for (int i = 0; i < regionsPerServer.length; i++) { 412 primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length]; 
413 for (int j = 0; j < regionsPerServer[i].length; j++) { 414 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 415 primariesOfRegionsPerServer[i][j] = primaryIndex; 416 } 417 // sort the regions by primaries. 418 Arrays.sort(primariesOfRegionsPerServer[i]); 419 } 420 421 // compute regionsPerHost 422 if (multiServersPerHost) { 423 for (int i = 0 ; i < serversPerHost.length; i++) { 424 int numRegionsPerHost = 0; 425 for (int j = 0; j < serversPerHost[i].length; j++) { 426 numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; 427 } 428 regionsPerHost[i] = new int[numRegionsPerHost]; 429 primariesOfRegionsPerHost[i] = new int[numRegionsPerHost]; 430 } 431 for (int i = 0 ; i < serversPerHost.length; i++) { 432 int numRegionPerHostIndex = 0; 433 for (int j = 0; j < serversPerHost[i].length; j++) { 434 for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { 435 int region = regionsPerServer[serversPerHost[i][j]][k]; 436 regionsPerHost[i][numRegionPerHostIndex] = region; 437 int primaryIndex = regionIndexToPrimaryIndex[region]; 438 primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex; 439 numRegionPerHostIndex++; 440 } 441 } 442 // sort the regions by primaries. 
443 Arrays.sort(primariesOfRegionsPerHost[i]); 444 } 445 } 446 447 // compute regionsPerRack 448 if (numRacks > 1) { 449 for (int i = 0 ; i < serversPerRack.length; i++) { 450 int numRegionsPerRack = 0; 451 for (int j = 0; j < serversPerRack[i].length; j++) { 452 numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; 453 } 454 regionsPerRack[i] = new int[numRegionsPerRack]; 455 primariesOfRegionsPerRack[i] = new int[numRegionsPerRack]; 456 } 457 458 for (int i = 0 ; i < serversPerRack.length; i++) { 459 int numRegionPerRackIndex = 0; 460 for (int j = 0; j < serversPerRack[i].length; j++) { 461 for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { 462 int region = regionsPerServer[serversPerRack[i][j]][k]; 463 regionsPerRack[i][numRegionPerRackIndex] = region; 464 int primaryIndex = regionIndexToPrimaryIndex[region]; 465 primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex; 466 numRegionPerRackIndex++; 467 } 468 } 469 // sort the regions by primaries. 
470 Arrays.sort(primariesOfRegionsPerRack[i]); 471 } 472 } 473 } 474 475 /** Helper for Cluster constructor to handle a region */ 476 private void registerRegion(RegionInfo region, int regionIndex, 477 int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads, 478 RegionLocationFinder regionFinder) { 479 String tableName = region.getTable().getNameAsString(); 480 if (!tablesToIndex.containsKey(tableName)) { 481 tables.add(tableName); 482 tablesToIndex.put(tableName, tablesToIndex.size()); 483 } 484 int tableIndex = tablesToIndex.get(tableName); 485 486 regionsToIndex.put(region, regionIndex); 487 regions[regionIndex] = region; 488 regionIndexToServerIndex[regionIndex] = serverIndex; 489 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 490 regionIndexToTableIndex[regionIndex] = tableIndex; 491 492 // region load 493 if (loads != null) { 494 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 495 // That could have failed if the RegionLoad is using the other regionName 496 if (rl == null) { 497 // Try getting the region load using encoded name. 498 rl = loads.get(region.getEncodedName()); 499 } 500 regionLoads[regionIndex] = rl; 501 } 502 503 if (regionFinder != null) { 504 // region location 505 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 506 regionLocations[regionIndex] = new int[loc.size()]; 507 for (int i = 0; i < loc.size(); i++) { 508 regionLocations[regionIndex][i] = loc.get(i) == null ? -1 509 : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? 
-1 510 : serversToIndex.get(loc.get(i).getHostAndPort())); 511 } 512 } 513 } 514 515 /** 516 * Returns true iff a given server has less regions than the balanced amount 517 */ 518 public boolean serverHasTooFewRegions(int server) { 519 int minLoad = this.numRegions / numServers; 520 int numRegions = getNumRegions(server); 521 return numRegions < minLoad; 522 } 523 524 /** 525 * Retrieves and lazily initializes a field storing the locality of 526 * every region/server combination 527 */ 528 public float[][] getOrComputeRackLocalities() { 529 if (rackLocalities == null || regionsToMostLocalEntities == null) { 530 computeCachedLocalities(); 531 } 532 return rackLocalities; 533 } 534 535 /** 536 * Lazily initializes and retrieves a mapping of region -> server for which region has 537 * the highest the locality 538 */ 539 public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) { 540 if (rackLocalities == null || regionsToMostLocalEntities == null) { 541 computeCachedLocalities(); 542 } 543 return regionsToMostLocalEntities[type.ordinal()]; 544 } 545 546 /** 547 * Looks up locality from cache of localities. Will create cache if it does 548 * not already exist. 549 */ 550 public float getOrComputeLocality(int region, int entity, LocalityType type) { 551 switch (type) { 552 case SERVER: 553 return getLocalityOfRegion(region, entity); 554 case RACK: 555 return getOrComputeRackLocalities()[region][entity]; 556 default: 557 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 558 } 559 } 560 561 /** 562 * Returns locality weighted by region size in MB. Will create locality cache 563 * if it does not already exist. 
564 */ 565 public double getOrComputeWeightedLocality(int region, int server, LocalityType type) { 566 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 567 } 568 569 /** 570 * Returns the size in MB from the most recent RegionLoad for region 571 */ 572 public int getRegionSizeMB(int region) { 573 Deque<BalancerRegionLoad> load = regionLoads[region]; 574 // This means regions have no actual data on disk 575 if (load == null) { 576 return 0; 577 } 578 return regionLoads[region].getLast().getStorefileSizeMB(); 579 } 580 581 /** 582 * Computes and caches the locality for each region/rack combinations, 583 * as well as storing a mapping of region -> server and region -> rack such that server 584 * and rack have the highest locality for region 585 */ 586 private void computeCachedLocalities() { 587 rackLocalities = new float[numRegions][numRacks]; 588 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 589 590 // Compute localities and find most local server per region 591 for (int region = 0; region < numRegions; region++) { 592 int serverWithBestLocality = 0; 593 float bestLocalityForRegion = 0; 594 for (int server = 0; server < numServers; server++) { 595 // Aggregate per-rack locality 596 float locality = getLocalityOfRegion(region, server); 597 int rack = serverIndexToRackIndex[server]; 598 int numServersInRack = serversPerRack[rack].length; 599 rackLocalities[region][rack] += locality / numServersInRack; 600 601 if (locality > bestLocalityForRegion) { 602 serverWithBestLocality = server; 603 bestLocalityForRegion = locality; 604 } 605 } 606 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 607 608 // Find most local rack per region 609 int rackWithBestLocality = 0; 610 float bestRackLocalityForRegion = 0.0f; 611 for (int rack = 0; rack < numRacks; rack++) { 612 float rackLocality = rackLocalities[region][rack]; 613 if (rackLocality > bestRackLocalityForRegion) { 614 
bestRackLocalityForRegion = rackLocality; 615 rackWithBestLocality = rack; 616 } 617 } 618 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 619 } 620 621 } 622 623 /** 624 * Maps region index to rack index 625 */ 626 public int getRackForRegion(int region) { 627 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 628 } 629 630 enum LocalityType { 631 SERVER, 632 RACK 633 } 634 635 /** An action to move or swap a region */ 636 public static class Action { 637 public enum Type { 638 ASSIGN_REGION, 639 MOVE_REGION, 640 SWAP_REGIONS, 641 NULL, 642 } 643 644 public Type type; 645 public Action (Type type) {this.type = type;} 646 /** Returns an Action which would undo this action */ 647 public Action undoAction() { return this; } 648 @Override 649 public String toString() { return type + ":";} 650 } 651 652 public static class AssignRegionAction extends Action { 653 public int region; 654 public int server; 655 public AssignRegionAction(int region, int server) { 656 super(Type.ASSIGN_REGION); 657 this.region = region; 658 this.server = server; 659 } 660 @Override 661 public Action undoAction() { 662 // TODO implement this. This action is not being used by the StochasticLB for now 663 // in case it uses it, we should implement this function. 
664 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 665 } 666 @Override 667 public String toString() { 668 return type + ": " + region + ":" + server; 669 } 670 } 671 672 public static class MoveRegionAction extends Action { 673 public int region; 674 public int fromServer; 675 public int toServer; 676 677 public MoveRegionAction(int region, int fromServer, int toServer) { 678 super(Type.MOVE_REGION); 679 this.fromServer = fromServer; 680 this.region = region; 681 this.toServer = toServer; 682 } 683 @Override 684 public Action undoAction() { 685 return new MoveRegionAction (region, toServer, fromServer); 686 } 687 @Override 688 public String toString() { 689 return type + ": " + region + ":" + fromServer + " -> " + toServer; 690 } 691 } 692 693 public static class SwapRegionsAction extends Action { 694 public int fromServer; 695 public int fromRegion; 696 public int toServer; 697 public int toRegion; 698 public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { 699 super(Type.SWAP_REGIONS); 700 this.fromServer = fromServer; 701 this.fromRegion = fromRegion; 702 this.toServer = toServer; 703 this.toRegion = toRegion; 704 } 705 @Override 706 public Action undoAction() { 707 return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); 708 } 709 @Override 710 public String toString() { 711 return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; 712 } 713 } 714 715 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION", 716 justification="Mistake. 
Too disruptive to change now") 717 public static final Action NullAction = new Action(Type.NULL); 718 719 public void doAction(Action action) { 720 switch (action.type) { 721 case NULL: break; 722 case ASSIGN_REGION: 723 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 724 assert action instanceof AssignRegionAction: action.getClass(); 725 AssignRegionAction ar = (AssignRegionAction) action; 726 regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); 727 regionMoved(ar.region, -1, ar.server); 728 break; 729 case MOVE_REGION: 730 assert action instanceof MoveRegionAction: action.getClass(); 731 MoveRegionAction mra = (MoveRegionAction) action; 732 regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); 733 regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); 734 regionMoved(mra.region, mra.fromServer, mra.toServer); 735 break; 736 case SWAP_REGIONS: 737 assert action instanceof SwapRegionsAction: action.getClass(); 738 SwapRegionsAction a = (SwapRegionsAction) action; 739 regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); 740 regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); 741 regionMoved(a.fromRegion, a.fromServer, a.toServer); 742 regionMoved(a.toRegion, a.toServer, a.fromServer); 743 break; 744 default: 745 throw new RuntimeException("Uknown action:" + action.type); 746 } 747 } 748 749 /** 750 * Return true if the placement of region on server would lower the availability 751 * of the region in question 752 * @return true or false 753 */ 754 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 755 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 756 return false; // safeguard against race between cluster.servers and servers from LB method args 757 } 758 int server = 
serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);

      // Region replicas for same region should better assign to different servers
      for (int i : regionsPerServer[server]) {
        RegionInfo otherRegionInfo = regions[i];
        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
          return true;
        }
      }

      int primary = regionIndexToPrimaryIndex[region];
      if (primary == -1) {
        return false;
      }
      // there is a subset relation for server < host < rack
      // check server first
      if (contains(primariesOfRegionsPerServer[server], primary)) {
        // check for whether there are other servers that we can place this region
        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
            return true; // meaning there is a better server
          }
        }
        return false; // there is not a better server to place this
      }

      // check host
      if (multiServersPerHost) {
        // these arrays would only be allocated if we have more than one server per host
        int host = serverIndexToHostIndex[server];
        if (contains(primariesOfRegionsPerHost[host], primary)) {
          // check for whether there are other hosts that we can place this region
          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
              return true; // meaning there is a better host
            }
          }
          return false; // there is not a better host to place this
        }
      }

      // check rack
      if (numRacks > 1) {
        int rack = serverIndexToRackIndex[server];
        if (contains(primariesOfRegionsPerRack[rack], primary)) {
          // check for whether there are other racks that we can place this region
          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
              return true; // meaning there is a better rack
            }
          }
          return false; // there is not a better rack to place this
        }
      }

      return false;
    }

    /** Applies an assignment of {@code regionInfo} to {@code serverName} to the in-memory model. */
    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return;
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }

    /**
     * Updates all bookkeeping after a region moves between servers: the moved-region
     * counter, the per-server/per-table counts (and the per-table maximum), and the
     * primary-replica index arrays kept per server, per host and per rack.
     * {@code oldServer} may be -1 when the region was previously unassigned.
     */
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }

    /**
     * Returns a new array equal to {@code regions} with the first occurrence of
     * {@code regionIndex} removed. Assumes the value is present.
     */
    int[] removeRegion(int[] regions, int regionIndex) {
      //TODO: this maybe costly. Consider using linked lists
      int[] newRegions = new int[regions.length - 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          break;
        }
        newRegions[i] = regions[i];
      }
      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
      return newRegions;
    }

    /** Returns a new array equal to {@code regions} with {@code regionIndex} appended at the end. */
    int[] addRegion(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      System.arraycopy(regions, 0, newRegions, 0, regions.length);
      newRegions[newRegions.length - 1] = regionIndex;
      return newRegions;
    }

    /**
     * Returns a new array with {@code regionIndex} inserted into the (sorted)
     * {@code regions}, keeping sorted order.
     */
    int[] addRegionSorted(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) { // find the index to insert
        if (regions[i] > regionIndex) {
          break;
        }
      }
      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
      newRegions[i] = regionIndex;

      return newRegions;
    }

    /**
     * Replaces, in place, the first occurrence of {@code regionIndex} with
     * {@code newRegionIndex} and returns the same array.
     */
    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          regions[i] = newRegionIndex;
          break;
        }
      }
      return regions;
    }

    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }

    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }

    // Relies on binarySearch, so arr must be sorted; the primariesOfRegionsPer* arrays
    // are maintained in sorted order via addRegionSorted/removeRegion.
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }

    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

    /**
     * Returns the region index with the lowest HDFS block locality on the given server,
     * skipping regions with zero unique-block weight, or -1 when locality information
     * is unavailable or no candidate is found.
     */
    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
            .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
            + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
              .getRegionNameAsString() + " with locality " + lowestLocality
            + " and its region server contains " + regionsPerServer[serverIndex].length
            + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }

    /** Returns the HDFS block locality of {@code region} on {@code server}, or 0 when unknown. */
    float getLocalityOfRegion(int region, int server) {
      if (regionFinder != null) {
        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
        return distribution.getBlockLocalityIndex(servers[server].getHostname());
      } else {
        return 0f;
      }
    }

    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }

    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
      justification="Not important but should be fixed")
    @Override
    public String toString() {
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for(ServerName sn:servers) {
        desc.append(sn.getHostAndPort()).append(", ");
      }
      desc.append("], serverIndicesSortedByRegionCount=")
        .append(Arrays.toString(serverIndicesSortedByRegionCount))
        .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
        .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
        .append(", numTables=").append(numTables).append(", numMovedRegions=")
        .append(numMovedRegions).append('}');
      return desc.toString();
    }
  }

  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  protected ServerName masterServerName;
  protected MasterServices services;

  /**
   * @deprecated since 2.4.0, will be removed in 3.0.0.
1044 * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a> 1045 */ 1046 @Deprecated 1047 protected boolean onlySystemTablesOnMaster; 1048 1049 protected boolean maintenanceMode; 1050 1051 @Override 1052 public void setConf(Configuration conf) { 1053 this.config = conf; 1054 setSlop(conf); 1055 if (slop < 0) slop = 0; 1056 else if (slop > 1) slop = 1; 1057 1058 if (overallSlop < 0) overallSlop = 0; 1059 else if (overallSlop > 1) overallSlop = 1; 1060 1061 this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config); 1062 1063 this.rackManager = new RackManager(getConf()); 1064 if (useRegionFinder) { 1065 regionFinder.setConf(conf); 1066 } 1067 this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); 1068 // Print out base configs. Don't print overallSlop since it for simple balancer exclusively. 1069 LOG.info("slop={}, systemTablesOnMaster={}", 1070 this.slop, this.onlySystemTablesOnMaster); 1071 } 1072 1073 protected void setSlop(Configuration conf) { 1074 this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); 1075 this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop); 1076 } 1077 1078 /** 1079 * Check if a region belongs to some system table. 1080 * If so, the primary replica may be expected to be put on the master regionserver. 1081 * 1082 * @deprecated since 2.4.0, will be removed in 3.0.0. 1083 * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a> 1084 */ 1085 @Deprecated 1086 public boolean shouldBeOnMaster(RegionInfo region) { 1087 return (this.maintenanceMode || this.onlySystemTablesOnMaster) 1088 && region.getTable().isSystemTable(); 1089 } 1090 1091 /** 1092 * Balance the regions that should be on master regionserver. 1093 * 1094 * @deprecated since 2.4.0, will be removed in 3.0.0. 
1095 * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a> 1096 */ 1097 @Deprecated 1098 protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) { 1099 if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null; 1100 List<RegionPlan> plans = null; 1101 List<RegionInfo> regions = clusterMap.get(masterServerName); 1102 if (regions != null) { 1103 Iterator<ServerName> keyIt = null; 1104 for (RegionInfo region: regions) { 1105 if (shouldBeOnMaster(region)) continue; 1106 1107 // Find a non-master regionserver to host the region 1108 if (keyIt == null || !keyIt.hasNext()) { 1109 keyIt = clusterMap.keySet().iterator(); 1110 } 1111 ServerName dest = keyIt.next(); 1112 if (masterServerName.equals(dest)) { 1113 if (!keyIt.hasNext()) { 1114 keyIt = clusterMap.keySet().iterator(); 1115 } 1116 dest = keyIt.next(); 1117 } 1118 1119 // Move this region away from the master regionserver 1120 RegionPlan plan = new RegionPlan(region, masterServerName, dest); 1121 if (plans == null) { 1122 plans = new ArrayList<>(); 1123 } 1124 plans.add(plan); 1125 } 1126 } 1127 for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) { 1128 if (masterServerName.equals(server.getKey())) continue; 1129 for (RegionInfo region: server.getValue()) { 1130 if (!shouldBeOnMaster(region)) continue; 1131 1132 // Move this region to the master regionserver 1133 RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName); 1134 if (plans == null) { 1135 plans = new ArrayList<>(); 1136 } 1137 plans.add(plan); 1138 } 1139 } 1140 return plans; 1141 } 1142 1143 /** 1144 * If master is configured to carry system tables only, in here is 1145 * where we figure what to assign it. 1146 * 1147 * @deprecated since 2.4.0, will be removed in 3.0.0. 
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15549">HBASE-15549</a>
   */
  @Deprecated
  @NonNull
  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
      Collection<RegionInfo> regions, List<ServerName> servers) {
    // Only populated when the master is expected to carry system regions
    // (maintenance mode or systemTablesOnMaster).
    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
      if (masterServerName != null && servers.contains(masterServerName)) {
        assignments.put(masterServerName, new ArrayList<>());
        for (RegionInfo region : regions) {
          if (shouldBeOnMaster(region)) {
            assignments.get(masterServerName).add(region);
          }
        }
      }
    }
    return assignments;
  }

  @Override
  public Configuration getConf() {
    return this.config;
  }

  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }


  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }

  @Override
  public void postMasterStartupInitialize() {
    // Warm the HDFS block-distribution cache for all currently assigned regions;
    // failures are non-fatal and only logged.
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }

  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }

  /**
   * Returns true when balancing should run: enough servers, and either replicas are
   * co-located, an idle server exists, or the load spread exceeds the slop tolerance.
   */
  protected boolean needsBalance(TableName tableName, Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if(areSomeRegionReplicasColocated(c)) return true;
    if(idleRegionServerExist(c)) {
      return true;
    }

    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }

  /**
   * Subclasses should implement this to return true if the cluster has nodes that hosts
   * multiple replicas for the same region, or, if there are multiple racks and the same
   * rack hosts replicas of the same region
   * @param c Cluster information
   * @return whether region replicas are currently co-located
   */
  protected boolean areSomeRegionReplicasColocated(Cluster c) {
    return false;
  }

  /**
   * Returns true when at least one server has no regions while another server holds
   * more than one region, i.e. a move to the idle server is possible.
   */
  protected final boolean idleRegionServerExist(Cluster c){
    boolean isServerExistsWithMoreRegions = false;
    boolean isServerExistsWithZeroRegions = false;
    for (int[] serverList: c.regionsPerServer){
      if (serverList.length > 1) {
        isServerExistsWithMoreRegions = true;
      }
      if (serverList.length == 0) {
        isServerExistsWithZeroRegions = true;
      }
    }
    return isServerExistsWithMoreRegions && isServerExistsWithZeroRegions;
  }

  /**
   * Generates a bulk assignment plan to be used on cluster startup using a
   * simple round-robin assignment.
   * <p>
   * Takes a list of all the regions and all the servers in the cluster and
   * returns a map of each server to the regions that it should be assigned.
   * <p>
   * Currently implemented as a round-robin assignment. Same invariant as load
   * balancing, all servers holding floor(avg) or ceiling(avg).
   *
   * TODO: Use block locations from HDFS to place regions with their blocks
   *
   * @param regions all regions
   * @param servers all servers
   * @return map of server to the regions it should take, or emptyMap if no
   *         assignment is possible (ie. no servers)
   */
  @Override
  @NonNull
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
    if (!assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      if (!masterRegions.isEmpty()) {
        regions = new ArrayList<>(regions);
        regions.removeAll(masterRegions);
      }
    }
    /**
     * only need assign system table
     */
    if (this.maintenanceMode || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
      return Collections.emptyMap();
    }

    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
    // and balanced. This should also run fast with fewer number of iterations.

    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions));
      return assignments;
    }

    Cluster cluster = createCluster(servers, regions);
    roundRobinAssignment(cluster, regions, servers, assignments);
    return assignments;
  }

  /**
   * Builds a {@link Cluster} model for the given servers and regions, using the full
   * cluster assignment snapshot when any involved table uses region replicas.
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
1354 Map<ServerName, List<RegionInfo>> clusterState = null; 1355 if (!hasRegionReplica) { 1356 clusterState = getRegionAssignmentsByServer(regions); 1357 } else { 1358 // for the case where we have region replica it is better we get the entire cluster's snapshot 1359 clusterState = getRegionAssignmentsByServer(null); 1360 } 1361 1362 for (ServerName server : servers) { 1363 if (!clusterState.containsKey(server)) { 1364 clusterState.put(server, EMPTY_REGION_LIST); 1365 } 1366 } 1367 return new Cluster(regions, clusterState, null, this.regionFinder, 1368 rackManager); 1369 } 1370 1371 private List<ServerName> findIdleServers(List<ServerName> servers) { 1372 return this.services.getServerManager() 1373 .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR); 1374 } 1375 1376 /** 1377 * Used to assign a single region to a random server. 1378 */ 1379 @Override 1380 public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers) 1381 throws HBaseIOException { 1382 metricsBalancer.incrMiscInvocations(); 1383 if (servers != null && servers.contains(masterServerName)) { 1384 if (shouldBeOnMaster(regionInfo)) { 1385 return masterServerName; 1386 } 1387 if (!LoadBalancer.isTablesOnMaster(getConf())) { 1388 // Guarantee we do not put any regions on master 1389 servers = new ArrayList<>(servers); 1390 servers.remove(masterServerName); 1391 } 1392 } 1393 1394 int numServers = servers == null ? 0 : servers.size(); 1395 if (numServers == 0) { 1396 LOG.warn("Wanted to retain assignment but no servers to assign to"); 1397 return null; 1398 } 1399 if (numServers == 1) { // Only one server, nothing fancy we can do here 1400 return servers.get(0); 1401 } 1402 List<ServerName> idleServers = findIdleServers(servers); 1403 if (idleServers.size() == 1) { 1404 return idleServers.get(0); 1405 } 1406 final List<ServerName> finalServers = idleServers.isEmpty() ? 
1407 servers : idleServers; 1408 List<RegionInfo> regions = Lists.newArrayList(regionInfo); 1409 Cluster cluster = createCluster(finalServers, regions); 1410 return randomAssignment(cluster, regionInfo, finalServers); 1411 } 1412 1413 /** 1414 * Generates a bulk assignment startup plan, attempting to reuse the existing 1415 * assignment information from META, but adjusting for the specified list of 1416 * available/online servers available for assignment. 1417 * <p> 1418 * Takes a map of all regions to their existing assignment from META. Also 1419 * takes a list of online servers for regions to be assigned to. Attempts to 1420 * retain all assignment, so in some instances initial assignment will not be 1421 * completely balanced. 1422 * <p> 1423 * Any leftover regions without an existing server to be assigned to will be 1424 * assigned randomly to available servers. 1425 * 1426 * @param regions regions and existing assignment from meta 1427 * @param servers available servers 1428 * @return map of servers and regions to be assigned to them, or emptyMap if no 1429 * assignment is possible (ie. 
   * no servers)
   */
  @Override
  @NonNull
  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
      List<ServerName> servers) throws HBaseIOException {
    // Update metrics
    metricsBalancer.incrMiscInvocations();
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
    if (!assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    if (this.maintenanceMode || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do retain assignment but no servers to assign to");
      return Collections.emptyMap();
    }
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions.keySet()));
      return assignments;
    }

    // Group all of the old assignments by their hostname.
    // We can't group directly by ServerName since the servers all have
    // new start-codes.

    // Group the servers by their hostname. It's possible we have multiple
    // servers on the same host on different ports.
    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
    for (ServerName server : servers) {
      assignments.put(server, new ArrayList<>());
      serversByHostname.put(server.getHostnameLowerCase(), server);
    }

    // Collection of the hostnames that used to have regions
    // assigned, but for which we no longer have any RS running
    // after the cluster restart.
    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();

    // If the old servers aren't present, lets assign those regions later.
    List<RegionInfo> randomAssignRegions = Lists.newArrayList();

    int numRandomAssignments = 0;
    int numRetainedAssigments = 0;
    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
      RegionInfo region = entry.getKey();
      ServerName oldServerName = entry.getValue();
      List<ServerName> localServers = new ArrayList<>();
      if (oldServerName != null) {
        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
      }
      if (localServers.isEmpty()) {
        // No servers on the new cluster match up with this hostname, assign randomly, later.
        randomAssignRegions.add(region);
        if (oldServerName != null) {
          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
        }
      } else if (localServers.size() == 1) {
        // the usual case - one new server on same host
        ServerName target = localServers.get(0);
        assignments.get(target).add(region);
        numRetainedAssigments++;
      } else {
        // multiple new servers in the cluster on this same host
        if (localServers.contains(oldServerName)) {
          assignments.get(oldServerName).add(region);
          numRetainedAssigments++;
        } else {
          // prefer the new server with the same port as the old one
          ServerName target = null;
          for (ServerName tmp : localServers) {
            if (tmp.getPort() == oldServerName.getPort()) {
              target = tmp;
              assignments.get(tmp).add(region);
              numRetainedAssigments++;
              break;
            }
          }
          if (target == null) {
            randomAssignRegions.add(region);
          }
        }
      }
    }

    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
    if (randomAssignRegions.size() > 0) {
      // Seed the cluster model with the retained assignments made above so the random
      // placements below account for them.
      Cluster cluster = createCluster(servers, regions.keySet());
      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
        ServerName sn = entry.getKey();
        for (RegionInfo region : entry.getValue()) {
          cluster.doAssignRegion(region, sn);
        }
      }
      for (RegionInfo region : randomAssignRegions) {
        ServerName target = randomAssignment(cluster, region, servers);
        assignments.get(target).add(region);
        numRandomAssignments++;
      }
    }

    String randomAssignMsg = "";
    if (numRandomAssignments > 0) {
      randomAssignMsg =
          numRandomAssignments + " regions were assigned "
              + "to random hosts, since the old hosts for these regions are no "
              + "longer present in the cluster. These hosts were:\n " + Joiner.on("\n ").join(oldHostsNoLongerPresent);
    }

    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
        + " retained the pre-restart assignment. " + randomAssignMsg);
    return assignments;
  }

  @Override
  public void initialize() throws HBaseIOException{
  }

  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }

  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  @Override
  public void stop(String why) {
    LOG.info("Load Balancer stop requested: "+why);
    stopped = true;
  }

  /**
   * Updates the balancer status tag reported to JMX
   */
  public void updateBalancerStatus(boolean status) {
    metricsBalancer.balancerStatus(status);
  }

  /**
   * Used to assign a single region to a random server.
   */
  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
      List<ServerName> servers) {
    int numServers = servers.size(); // servers is not null, numServers > 1
    ServerName sn = null;
    final int maxIterations = numServers * 4;
    int iterations = 0;
    List<ServerName> usedSNs = new ArrayList<>(servers.size());
    do {
      int i = RANDOM.nextInt(numServers);
      sn = servers.get(i);
      if (!usedSNs.contains(sn)) {
        usedSNs.add(sn);
      }
    } while (cluster.wouldLowerAvailability(regionInfo, sn)
        && iterations++ < maxIterations);
    if (iterations >= maxIterations) {
      // We have reached the max. Means the servers that we collected is still lowering the
      // availability
      for (ServerName unusedServer : servers) {
        if (!usedSNs.contains(unusedServer)) {
          // check if any other unused server is there for us to use.
          // If so use it.
Else we have not other go but to go with one of them 1608 if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) { 1609 sn = unusedServer; 1610 break; 1611 } 1612 } 1613 } 1614 } 1615 cluster.doAssignRegion(regionInfo, sn); 1616 return sn; 1617 } 1618 1619 /** 1620 * Round robin a list of regions to a list of servers 1621 */ 1622 private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions, 1623 List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) { 1624 List<RegionInfo> unassignedRegions = new ArrayList<>(); 1625 int numServers = servers.size(); 1626 int numRegions = regions.size(); 1627 int max = (int) Math.ceil((float) numRegions / numServers); 1628 int serverIdx = 0; 1629 if (numServers > 1) { 1630 serverIdx = RANDOM.nextInt(numServers); 1631 } 1632 int regionIdx = 0; 1633 for (int j = 0; j < numServers; j++) { 1634 ServerName server = servers.get((j + serverIdx) % numServers); 1635 List<RegionInfo> serverRegions = new ArrayList<>(max); 1636 for (int i = regionIdx; i < numRegions; i += numServers) { 1637 RegionInfo region = regions.get(i % numRegions); 1638 if (cluster.wouldLowerAvailability(region, server)) { 1639 unassignedRegions.add(region); 1640 } else { 1641 serverRegions.add(region); 1642 cluster.doAssignRegion(region, server); 1643 } 1644 } 1645 assignments.put(server, serverRegions); 1646 regionIdx++; 1647 } 1648 1649 1650 List<RegionInfo> lastFewRegions = new ArrayList<>(); 1651 // assign the remaining by going through the list and try to assign to servers one-by-one 1652 serverIdx = RANDOM.nextInt(numServers); 1653 OUTER : for (RegionInfo region : unassignedRegions) { 1654 boolean assigned = false; 1655 INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one 1656 ServerName server = servers.get((j + serverIdx) % numServers); 1657 if (cluster.wouldLowerAvailability(region, server)) { 1658 continue INNER; 1659 } else { 1660 assignments.computeIfAbsent(server, k -> new 
ArrayList<>()).add(region); 1661 cluster.doAssignRegion(region, server); 1662 serverIdx = (j + serverIdx + 1) % numServers; //remain from next server 1663 assigned = true; 1664 break; 1665 } 1666 } 1667 if (!assigned) { 1668 lastFewRegions.add(region); 1669 } 1670 } 1671 // just sprinkle the rest of the regions on random regionservers. The balanceCluster will 1672 // make it optimal later. we can end up with this if numReplicas > numServers. 1673 for (RegionInfo region : lastFewRegions) { 1674 int i = RANDOM.nextInt(numServers); 1675 ServerName server = servers.get(i); 1676 assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region); 1677 cluster.doAssignRegion(region, server); 1678 } 1679 } 1680 1681 protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer( 1682 Collection<RegionInfo> regions) { 1683 if (this.services != null && this.services.getAssignmentManager() != null) { 1684 return this.services.getAssignmentManager().getSnapShotOfAssignment(regions); 1685 } else { 1686 return new HashMap<>(); 1687 } 1688 } 1689 1690 private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad( 1691 Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) { 1692 Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>(); 1693 for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) { 1694 serverNameListMap.forEach((serverName, regionInfoList) -> { 1695 List<RegionInfo> regionInfos = 1696 returnMap.computeIfAbsent(serverName, k -> new ArrayList<>()); 1697 regionInfos.addAll(regionInfoList); 1698 }); 1699 } 1700 return returnMap; 1701 } 1702 1703 @Override 1704 public abstract List<RegionPlan> balanceTable(TableName tableName, 1705 Map<ServerName, List<RegionInfo>> loadOfOneTable); 1706 1707 @Override 1708 public List<RegionPlan> 1709 balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 1710 if (isByTable) { 1711 List<RegionPlan> result = new ArrayList<>(); 1712 
loadOfAllTable.forEach((tableName, loadOfOneTable) -> { 1713 LOG.info("Start Generate Balance plan for table: " + tableName); 1714 List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable); 1715 if (partialPlans != null) { 1716 result.addAll(partialPlans); 1717 } 1718 }); 1719 return result; 1720 } else { 1721 LOG.info("Start Generate Balance plan for cluster."); 1722 return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable)); 1723 } 1724 } 1725 1726 @Override 1727 public void onConfigurationChange(Configuration conf) { 1728 } 1729}