/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The base class for load balancers. It provides the functions used by
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
 * in the edge cases. It doesn't provide an implementation of the
 * actual balancing algorithm.
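 * <p>
 * A minimal sketch of how a concrete balancer plugs in (the subclass shown here
 * is hypothetical):
 * <pre>{@code
 * public class CountBasedBalancer extends BaseLoadBalancer {
 *   public List<RegionPlan> balanceTable(TableName tableName,
 *       Map<ServerName, List<RegionInfo>> loadOfOneTable) {
 *     // inspect per-server load and return RegionPlans that even out region counts
 *     return Collections.emptyList();
 *   }
 * }
 * }</pre>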
072 * 073 */ 074@InterfaceAudience.Private 075public abstract class BaseLoadBalancer implements LoadBalancer { 076 protected static final int MIN_SERVER_BALANCE = 2; 077 private volatile boolean stopped = false; 078 079 private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList(); 080 081 static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR 082 = load -> load.getRegionMetrics().isEmpty(); 083 084 protected RegionLocationFinder regionFinder; 085 protected boolean useRegionFinder; 086 protected boolean isByTable = false; 087 088 private static class DefaultRackManager extends RackManager { 089 @Override 090 public String getRack(ServerName server) { 091 return UNKNOWN_RACK; 092 } 093 } 094 095 /** 096 * The constructor that uses the basic MetricsBalancer 097 */ 098 protected BaseLoadBalancer() { 099 metricsBalancer = new MetricsBalancer(); 100 createRegionFinder(); 101 } 102 103 /** 104 * This Constructor accepts an instance of MetricsBalancer, 105 * which will be used instead of creating a new one 106 */ 107 protected BaseLoadBalancer(MetricsBalancer metricsBalancer) { 108 this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer(); 109 createRegionFinder(); 110 } 111 112 private void createRegionFinder() { 113 useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true); 114 if (useRegionFinder) { 115 regionFinder = new RegionLocationFinder(); 116 } 117 } 118 119 /** 120 * An efficient array based implementation similar to ClusterState for keeping 121 * the status of the cluster in terms of region assignment and distribution. 122 * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of 123 * hundreds of thousands of hashmap manipulations are very costly, which is why this 124 * class uses mostly indexes and arrays. 125 * 126 * Cluster tracks a list of unassigned regions, region assignments, and the server 127 * topology in terms of server names, hostnames and racks. 128 */ 129 protected static class Cluster { 130 ServerName[] servers; 131 String[] hosts; // ServerName uniquely identifies a region server. 
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server; multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    ArrayList<String> tables;
    RegionInfo[] regions;
    Deque<BalancerRegionLoad>[] regionLoads;
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[] serverIndexToHostIndex; //serverIndex -> host index
    int[] serverIndexToRackIndex; //serverIndex -> rack index

    int[][] regionsPerServer; //serverIndex -> region list
    int[] serverIndexToRegionsOffset; //serverIndex -> offset of region list
    int[][] regionsPerHost; //hostIndex -> list of regions
    int[][] regionsPerRack; //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost; //hostIndex -> list of server indexes
    int[][] serversPerRack; //rackIndex -> list of server indexes
    int[] regionIndexToServerIndex; //regionIndex -> serverIndex
    int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
    int[] regionIndexToTableIndex; //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
    int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false; //whether there are regions with replicas

    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    Map<String, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;

    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }

    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername; " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i = 0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }
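
      // Map each region to the index of its primary replica: a default (primary)
      // replica maps to its own index, while a secondary replica looks up its
      // primary in regionsToIndex (-1 if the primary is not in this cluster view).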
      for (int i = 0; i < regions.length; i++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }

    /** Helper for Cluster constructor to handle a region */
    private void registerRegion(RegionInfo region, int regionIndex,
        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder) {
      String tableName = region.getTable().getNameAsString();
      if (!tablesToIndex.containsKey(tableName)) {
        tables.add(tableName);
        tablesToIndex.put(tableName, tablesToIndex.size());
      }
      int tableIndex = tablesToIndex.get(tableName);

      regionsToIndex.put(region, regionIndex);
      regions[regionIndex] = region;
      regionIndexToServerIndex[regionIndex] = serverIndex;
      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
      regionIndexToTableIndex[regionIndex] = tableIndex;

      // region load
      if (loads != null) {
        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That could have failed if the RegionLoad is using the other regionName
        if (rl == null) {
          // Try getting the region load using encoded name.
          rl = loads.get(region.getEncodedName());
        }
        regionLoads[regionIndex] = rl;
      }

      if (regionFinder != null) {
        // region location
        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
        regionLocations[regionIndex] = new int[loc.size()];
        for (int i = 0; i < loc.size(); i++) {
          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
                  : serversToIndex.get(loc.get(i).getHostAndPort()));
        }
      }
    }

    /**
     * Returns true iff a given server has fewer regions than the balanced amount
     */
    public boolean serverHasTooFewRegions(int server) {
      int minLoad = this.numRegions / numServers;
      int numRegions = getNumRegions(server);
      return numRegions < minLoad;
    }

    /**
     * Retrieves and lazily initializes a field storing the locality of
     * every region/rack combination
     */
    public float[][] getOrComputeRackLocalities() {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return rackLocalities;
    }

    /**
     * Lazily initializes and retrieves a mapping of region -> server for which region has
     * the highest locality
     */
    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return regionsToMostLocalEntities[type.ordinal()];
    }

    /**
     * Looks up locality from cache of localities. Will create cache if it does
     * not already exist.
     */
    public float getOrComputeLocality(int region, int entity, LocalityType type) {
      switch (type) {
        case SERVER:
          return getLocalityOfRegion(region, entity);
        case RACK:
          return getOrComputeRackLocalities()[region][entity];
        default:
          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
      }
    }

    /**
     * Returns locality weighted by region size in MB. Will create locality cache
     * if it does not already exist.
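     * <p>
     * For example (illustrative numbers only): a 1024 MB region with locality
     * 0.75 on the given entity yields a weighted locality of 768.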
     */
    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
    }

    /**
     * Returns the size in MB from the most recent RegionLoad for region
     */
    public int getRegionSizeMB(int region) {
      Deque<BalancerRegionLoad> load = regionLoads[region];
      // This means regions have no actual data on disk
      if (load == null) {
        return 0;
      }
      return regionLoads[region].getLast().getStorefileSizeMB();
    }

    /**
     * Computes and caches the locality for each region/rack combination,
     * as well as storing a mapping of region -> server and region -> rack such that server
     * and rack have the highest locality for region
     */
    private void computeCachedLocalities() {
      rackLocalities = new float[numRegions][numRacks];
      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];

      // Compute localities and find most local server per region
      for (int region = 0; region < numRegions; region++) {
        int serverWithBestLocality = 0;
        float bestLocalityForRegion = 0;
        for (int server = 0; server < numServers; server++) {
          // Aggregate per-rack locality
          float locality = getLocalityOfRegion(region, server);
          int rack = serverIndexToRackIndex[server];
          int numServersInRack = serversPerRack[rack].length;
          rackLocalities[region][rack] += locality / numServersInRack;

          if (locality > bestLocalityForRegion) {
            serverWithBestLocality = server;
            bestLocalityForRegion = locality;
          }
        }
        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;

        // Find most local rack per region
        int rackWithBestLocality = 0;
        float bestRackLocalityForRegion = 0.0f;
        for (int rack = 0; rack < numRacks; rack++) {
          float rackLocality = rackLocalities[region][rack];
          if (rackLocality > bestRackLocalityForRegion) {
            bestRackLocalityForRegion = rackLocality;
            rackWithBestLocality = rack;
          }
        }
        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
      }
    }

    /**
     * Maps region index to rack index
     */
    public int getRackForRegion(int region) {
      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
    }

    enum LocalityType {
      SERVER,
      RACK
    }

    /** An action to move or swap a region */
    public static class Action {
      public enum Type {
        ASSIGN_REGION,
        MOVE_REGION,
        SWAP_REGIONS,
        NULL,
      }

      public Type type;
      public Action(Type type) { this.type = type; }
      /** Returns an Action which would undo this action */
      public Action undoAction() { return this; }
      @Override
      public String toString() { return type + ":"; }
    }

    public static class AssignRegionAction extends Action {
      public int region;
      public int server;
      public AssignRegionAction(int region, int server) {
        super(Type.ASSIGN_REGION);
        this.region = region;
        this.server = server;
      }
      @Override
      public Action undoAction() {
        // TODO implement this. This action is not being used by the StochasticLB for now;
        // in case it uses it, we should implement this function.
        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
      }
      @Override
      public String toString() {
        return type + ": " + region + ":" + server;
      }
    }

    public static class MoveRegionAction extends Action {
      public int region;
      public int fromServer;
      public int toServer;

      public MoveRegionAction(int region, int fromServer, int toServer) {
        super(Type.MOVE_REGION);
        this.fromServer = fromServer;
        this.region = region;
        this.toServer = toServer;
      }
      @Override
      public Action undoAction() {
        return new MoveRegionAction(region, toServer, fromServer);
      }
      @Override
      public String toString() {
        return type + ": " + region + ":" + fromServer + " -> " + toServer;
      }
    }

    public static class SwapRegionsAction extends Action {
      public int fromServer;
      public int fromRegion;
      public int toServer;
      public int toRegion;
      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
        super(Type.SWAP_REGIONS);
        this.fromServer = fromServer;
        this.fromRegion = fromRegion;
        this.toServer = toServer;
        this.toRegion = toRegion;
      }
      @Override
      public Action undoAction() {
        return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion);
      }
      @Override
      public String toString() {
        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
      }
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake. Too disruptive to change now")
    public static final Action NullAction = new Action(Type.NULL);

    public void doAction(Action action) {
      switch (action.type) {
        case NULL: break;
        case ASSIGN_REGION:
          // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
          assert action instanceof AssignRegionAction : action.getClass();
          AssignRegionAction ar = (AssignRegionAction) action;
          regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
          regionMoved(ar.region, -1, ar.server);
          break;
        case MOVE_REGION:
          assert action instanceof MoveRegionAction : action.getClass();
          MoveRegionAction mra = (MoveRegionAction) action;
          regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
          regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
          regionMoved(mra.region, mra.fromServer, mra.toServer);
          break;
        case SWAP_REGIONS:
          assert action instanceof SwapRegionsAction : action.getClass();
          SwapRegionsAction a = (SwapRegionsAction) action;
          regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
          regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
          regionMoved(a.fromRegion, a.fromServer, a.toServer);
          regionMoved(a.toRegion, a.toServer, a.fromServer);
          break;
        default:
          throw new RuntimeException("Unknown action:" + action.type);
      }
    }
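
    // Illustrative use of the Action API (hypothetical indices): applying a move
    // and then its undo restores the previous assignment:
    //   Action move = new MoveRegionAction(4, 0, 2); // region 4: server 0 -> server 2
    //   cluster.doAction(move);
    //   cluster.doAction(move.undoAction());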
    /**
     * Return true if the placement of region on server would lower the availability
     * of the region in question
     * @return true or false
     */
    boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return false; // safeguard against race between cluster.servers and servers from LB method args
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);

      // Replicas of the same region should preferably be assigned to different servers
      for (int i : regionsPerServer[server]) {
        RegionInfo otherRegionInfo = regions[i];
        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
          return true;
        }
      }

      int primary = regionIndexToPrimaryIndex[region];
      if (primary == -1) {
        return false;
      }
      // there is a subset relation for server < host < rack
      // check server first
      if (contains(primariesOfRegionsPerServer[server], primary)) {
        // check for whether there are other servers that we can place this region
        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
            return true; // meaning there is a better server
          }
        }
        return false; // there is not a better server to place this
      }

      // check host
      if (multiServersPerHost) {
        // these arrays would only be allocated if we have more than one server per host
        int host = serverIndexToHostIndex[server];
        if (contains(primariesOfRegionsPerHost[host], primary)) {
          // check for whether there are other hosts that we can place this region
          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
              return true; // meaning there is a better host
            }
          }
          return false; // there is not a better host to place this
        }
      }

      // check rack
      if (numRacks > 1) {
        int rack = serverIndexToRackIndex[server];
        if (contains(primariesOfRegionsPerRack[rack], primary)) {
          // check for whether there are other racks that we can place this region
          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
              return true; // meaning there is a better rack
            }
          }
          return false; // there is not a better rack to place this
        }
      }

      return false;
    }

    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return;
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }

    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
            primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
          primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
                primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
                primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }
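
    // Expected behavior of the int[] helpers below on illustrative inputs:
    //   removeRegion({3, 7, 9}, 7)     -> {3, 9}
    //   addRegion({3, 9}, 7)           -> {3, 9, 7}  (appended, unsorted)
    //   addRegionSorted({3, 9}, 7)     -> {3, 7, 9}  (keeps sort order)
    //   replaceRegion({3, 7, 9}, 7, 5) -> {3, 5, 9}  (in place)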
    int[] removeRegion(int[] regions, int regionIndex) {
      // TODO: this may be costly. Consider using linked lists
      int[] newRegions = new int[regions.length - 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          break;
        }
        newRegions[i] = regions[i];
      }
      System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
      return newRegions;
    }

    int[] addRegion(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      System.arraycopy(regions, 0, newRegions, 0, regions.length);
      newRegions[newRegions.length - 1] = regionIndex;
      return newRegions;
    }

    int[] addRegionSorted(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) { // find the index to insert
        if (regions[i] > regionIndex) {
          break;
        }
      }
      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
      System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
      newRegions[i] = regionIndex;

      return newRegions;
    }

    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          regions[i] = newRegionIndex;
          break;
        }
      }
      return regions;
    }

    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }

    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }

    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }

    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }

    float getLocalityOfRegion(int region, int server) {
      if (regionFinder != null) {
        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
        return distribution.getBlockLocalityIndex(servers[server].getHostname());
      } else {
        return 0f;
      }
    }

    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }

    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
        justification="Not important but should be fixed")
    @Override
    public String toString() {
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for (ServerName sn : servers) {
        desc.append(sn.getHostAndPort()).append(", ");
      }
      desc.append("], serverIndicesSortedByRegionCount=")
          .append(Arrays.toString(serverIndicesSortedByRegionCount))
          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
          .append(", numTables=").append(numTables).append(", numMovedRegions=")
          .append(numMovedRegions).append('}');
      return desc.toString();
    }
  }

  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  protected ServerName masterServerName;
  protected MasterServices services;
  protected boolean onlySystemTablesOnMaster;
  protected boolean maintenanceMode;

  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
    setSlop(conf);
    if (slop < 0) slop = 0;
    else if (slop > 1) slop = 1;

    if (overallSlop < 0) overallSlop = 0;
    else if (overallSlop > 1) overallSlop = 1;

    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);

    this.rackManager = new RackManager(getConf());
    if (useRegionFinder) {
      regionFinder.setConf(conf);
    }
    this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
    // Print out base configs. Don't print overallSlop since it is for the simple balancer exclusively.
    LOG.info("slop={}, systemTablesOnMaster={}",
        this.slop, this.onlySystemTablesOnMaster);
  }

  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  /**
   * Check if a region belongs to some system table.
   * If so, the primary replica may be expected to be put on the master regionserver.
   */
  public boolean shouldBeOnMaster(RegionInfo region) {
    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
        && region.getTable().isSystemTable();
  }

  /**
   * Balance the regions that should be on master regionserver.
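   * Regions that should not be on the master (per {@link #shouldBeOnMaster}) are
   * planned off to other servers, and regions that should be on it but are hosted
   * elsewhere are planned back onto the master.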
   */
  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
    List<RegionPlan> plans = null;
    List<RegionInfo> regions = clusterMap.get(masterServerName);
    if (regions != null) {
      Iterator<ServerName> keyIt = null;
      for (RegionInfo region : regions) {
        if (shouldBeOnMaster(region)) continue;

        // Find a non-master regionserver to host the region
        if (keyIt == null || !keyIt.hasNext()) {
          keyIt = clusterMap.keySet().iterator();
        }
        ServerName dest = keyIt.next();
        if (masterServerName.equals(dest)) {
          if (!keyIt.hasNext()) {
            keyIt = clusterMap.keySet().iterator();
          }
          dest = keyIt.next();
        }

        // Move this region away from the master regionserver
        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    for (Map.Entry<ServerName, List<RegionInfo>> server : clusterMap.entrySet()) {
      if (masterServerName.equals(server.getKey())) continue;
      for (RegionInfo region : server.getValue()) {
        if (!shouldBeOnMaster(region)) continue;

        // Move this region to the master regionserver
        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    return plans;
  }

  /**
   * If the master is configured to carry system tables only, this is
   * where we figure out what to assign to it.
   */
  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
      Collection<RegionInfo> regions, List<ServerName> servers) {
    if (servers == null || regions == null || regions.isEmpty()) {
      return null;
    }
    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
      if (masterServerName != null && servers.contains(masterServerName)) {
        assignments.put(masterServerName, new ArrayList<>());
        for (RegionInfo region : regions) {
          if (shouldBeOnMaster(region)) {
            assignments.get(masterServerName).add(region);
          }
        }
      }
    }
    return assignments;
  }

  @Override
  public Configuration getConf() {
    return this.config;
  }

  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }

  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }

  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }

  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
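
  // Sloppiness example (illustrative numbers): with average load 10 and slop 0.2,
  // needsBalance below accepts loads in [floor(10 * 0.8), ceil(10 * 1.2)] = [8, 12]
  // and only reports true if some server falls outside that band (or replicas are
  // co-located, or an idle server exists).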
  protected boolean needsBalance(TableName tableName, Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if (areSomeRegionReplicasColocated(c)) return true;
    if (idleRegionServerExist(c)) {
      return true;
    }

    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
            "servers=" + cs.getNumServers() +
            " regions=" + cs.getNumRegions() + " average=" + average +
            " mostloaded=" + serversByLoad.lastKey().getLoad() +
            " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }

  /**
   * Subclasses should implement this to return true if the cluster has nodes that host
   * multiple replicas of the same region, or if there are multiple racks and the same
   * rack hosts replicas of the same region
   * @param c Cluster information
   * @return whether region replicas are currently co-located
   */
  protected boolean areSomeRegionReplicasColocated(Cluster c) {
    return false;
  }

  protected final boolean idleRegionServerExist(Cluster c) {
    boolean isServerExistsWithMoreRegions = false;
    boolean isServerExistsWithZeroRegions = false;
    for (int[] serverList : c.regionsPerServer) {
      if (serverList.length > 1) {
        isServerExistsWithMoreRegions = true;
      }
      if (serverList.length == 0) {
        isServerExistsWithZeroRegions = true;
      }
    }
    return isServerExistsWithMoreRegions && isServerExistsWithZeroRegions;
  }

  /**
   * Generates a bulk assignment plan to be used on cluster startup using a
   * simple round-robin assignment.
   * <p>
   * Takes a list of all the regions and all the servers in the cluster and
   * returns a map of each server to the regions that it should be assigned.
   * <p>
   * Currently implemented as a round-robin assignment. Same invariant as load
   * balancing: all servers holding floor(avg) or ceiling(avg).
   *
   * TODO: Use block locations from HDFS to place regions with their blocks
   *
   * @param regions all regions
   * @param servers all servers
   * @return map of server to the regions it should take, or null if no
   *     assignment is possible (ie. no regions or no servers)
   */
  @Override
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      if (!masterRegions.isEmpty()) {
        regions = new ArrayList<>(regions);
        regions.removeAll(masterRegions);
      }
    }
    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
      return null;
    }

    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
    // and balanced. This should also run fast with fewer number of iterations.

    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions));
      return assignments;
    }

    Cluster cluster = createCluster(servers, regions);
    roundRobinAssignment(cluster, regions, servers, assignments);
    return assignments;
  }

  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
    Map<ServerName, List<RegionInfo>> clusterState = null;
    if (!hasRegionReplica) {
      clusterState = getRegionAssignmentsByServer(regions);
    } else {
      // when region replicas are present, it is better to get the entire cluster's snapshot
      clusterState = getRegionAssignmentsByServer(null);
    }

    for (ServerName server : servers) {
      if (!clusterState.containsKey(server)) {
        clusterState.put(server, EMPTY_REGION_LIST);
      }
    }
    return new Cluster(regions, clusterState, null, this.regionFinder,
        rackManager);
  }

  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
        .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }

  /**
   * Used to assign a single region to a random server.
   */
  @Override
  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
      throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    if (servers != null && servers.contains(masterServerName)) {
      if (shouldBeOnMaster(regionInfo)) {
        return masterServerName;
      }
      if (!LoadBalancer.isTablesOnMaster(getConf())) {
        // Guarantee we do not put any regions on master
        servers = new ArrayList<>(servers);
        servers.remove(masterServerName);
      }
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to retain assignment but no servers to assign to");
      return null;
    }
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      return servers.get(0);
    }
    List<ServerName> idleServers = findIdleServers(servers);
    if (idleServers.size() == 1) {
      return idleServers.get(0);
    }
    final List<ServerName> finalServers = idleServers.isEmpty() ?
        servers : idleServers;
    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
    Cluster cluster = createCluster(finalServers, regions);
    return randomAssignment(cluster, regionInfo, finalServers);
  }

  /**
   * Generates a bulk assignment startup plan, attempting to reuse the existing
   * assignment information from META, but adjusting for the specified list of
   * available/online servers available for assignment.
   * <p>
   * Takes a map of all regions to their existing assignment from META. Also
   * takes a list of online servers for regions to be assigned to. Attempts to
   * retain all assignments, so in some instances initial assignment will not be
   * completely balanced.
   * <p>
   * Any leftover regions without an existing server to be assigned to will be
   * assigned randomly to available servers.
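   * <p>
   * Expected call shape (a hedged sketch; the variable names are hypothetical
   * and the input map comes from META):
   * <pre>{@code
   * Map<ServerName, List<RegionInfo>> plan =
   *     balancer.retainAssignment(existingAssignments, onlineServers);
   * }</pre>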
1395 * 1396 * @param regions regions and existing assignment from meta 1397 * @param servers available servers 1398 * @return map of servers and regions to be assigned to them 1399 */ 1400 @Override 1401 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 1402 List<ServerName> servers) throws HBaseIOException { 1403 // Update metrics 1404 metricsBalancer.incrMiscInvocations(); 1405 Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers); 1406 if (assignments != null && !assignments.isEmpty()) { 1407 servers = new ArrayList<>(servers); 1408 // Guarantee not to put other regions on master 1409 servers.remove(masterServerName); 1410 List<RegionInfo> masterRegions = assignments.get(masterServerName); 1411 regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey())) 1412 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); 1413 } 1414 if (this.maintenanceMode || regions.isEmpty()) { 1415 return assignments; 1416 } 1417 1418 int numServers = servers == null ? 0 : servers.size(); 1419 if (numServers == 0) { 1420 LOG.warn("Wanted to do retain assignment but no servers to assign to"); 1421 return null; 1422 } 1423 if (numServers == 1) { // Only one server, nothing fancy we can do here 1424 ServerName server = servers.get(0); 1425 assignments.put(server, new ArrayList<>(regions.keySet())); 1426 return assignments; 1427 } 1428 1429 // Group all of the old assignments by their hostname. 1430 // We can't group directly by ServerName since the servers all have 1431 // new start-codes. 1432 1433 // Group the servers by their hostname. It's possible we have multiple 1434 // servers on the same host on different ports. 1435 ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create(); 1436 for (ServerName server : servers) { 1437 assignments.put(server, new ArrayList<>()); 1438 serversByHostname.put(server.getHostnameLowerCase(), server); 1439 } 1440 1441 // Collection of the hostnames that used to have regions 1442 // assigned, but for which we no longer have any RS running 1443 // after the cluster restart. 1444 Set<String> oldHostsNoLongerPresent = Sets.newTreeSet(); 1445 1446 // If the old servers aren't present, lets assign those regions later. 1447 List<RegionInfo> randomAssignRegions = Lists.newArrayList(); 1448 1449 int numRandomAssignments = 0; 1450 int numRetainedAssigments = 0; 1451 for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) { 1452 RegionInfo region = entry.getKey(); 1453 ServerName oldServerName = entry.getValue(); 1454 List<ServerName> localServers = new ArrayList<>(); 1455 if (oldServerName != null) { 1456 localServers = serversByHostname.get(oldServerName.getHostnameLowerCase()); 1457 } 1458 if (localServers.isEmpty()) { 1459 // No servers on the new cluster match up with this hostname, assign randomly, later. 
        randomAssignRegions.add(region);
        if (oldServerName != null) {
          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
        }
      } else if (localServers.size() == 1) {
        // the usual case - one new server on same host
        ServerName target = localServers.get(0);
        assignments.get(target).add(region);
        numRetainedAssignments++;
      } else {
        // multiple new servers in the cluster on this same host
        if (localServers.contains(oldServerName)) {
          assignments.get(oldServerName).add(region);
          numRetainedAssignments++;
        } else {
          ServerName target = null;
          for (ServerName tmp : localServers) {
            if (tmp.getPort() == oldServerName.getPort()) {
              target = tmp;
              assignments.get(tmp).add(region);
              numRetainedAssignments++;
              break;
            }
          }
          if (target == null) {
            randomAssignRegions.add(region);
          }
        }
      }
    }

    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
    if (randomAssignRegions.size() > 0) {
      Cluster cluster = createCluster(servers, regions.keySet());
      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
        ServerName sn = entry.getKey();
        for (RegionInfo region : entry.getValue()) {
          cluster.doAssignRegion(region, sn);
        }
      }
      for (RegionInfo region : randomAssignRegions) {
        ServerName target = randomAssignment(cluster, region, servers);
        assignments.get(target).add(region);
        numRandomAssignments++;
      }
    }

    String randomAssignMsg = "";
    if (numRandomAssignments > 0) {
      randomAssignMsg =
          numRandomAssignments + " regions were assigned "
              + "to random hosts, since the old hosts for these regions are no "
              + "longer present in the cluster. These hosts were:\n " + Joiner.on("\n ").join(oldHostsNoLongerPresent);
    }

    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssignments
        + " retained the pre-restart assignment. " + randomAssignMsg);
    return assignments;
  }

  @Override
  public void initialize() throws HBaseIOException {
  }

  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }

  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  @Override
  public void stop(String why) {
    LOG.info("Load Balancer stop requested: " + why);
    stopped = true;
  }

  /**
   * Updates the balancer status tag reported to JMX
   */
  public void updateBalancerStatus(boolean status) {
    metricsBalancer.balancerStatus(status);
  }

  /**
   * Used to assign a single region to a random server.
   */
  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
      List<ServerName> servers) {
    int numServers = servers.size(); // servers is not null, numServers > 1
    ServerName sn = null;
    final int maxIterations = numServers * 4;
    int iterations = 0;
    List<ServerName> usedSNs = new ArrayList<>(servers.size());
    do {
      int i = RANDOM.nextInt(numServers);
      sn = servers.get(i);
      if (!usedSNs.contains(sn)) {
        usedSNs.add(sn);
      }
    } while (cluster.wouldLowerAvailability(regionInfo, sn)
        && iterations++ < maxIterations);
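    // At this point sn is the last randomly sampled candidate; if the iteration
    // cap (4 * numServers) was hit, every sampled server lowered availability and
    // the fallback scan below tries the servers that were never sampled.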
    do {
      int i = RANDOM.nextInt(numServers);
      sn = servers.get(i);
      if (!usedSNs.contains(sn)) {
        usedSNs.add(sn);
      }
    } while (cluster.wouldLowerAvailability(regionInfo, sn)
        && iterations++ < maxIterations);
    if (iterations >= maxIterations) {
      // We have reached the max; the servers we collected so far all lower the
      // availability.
      for (ServerName unusedServer : servers) {
        if (!usedSNs.contains(unusedServer)) {
          // Check if any other unused server is there for us to use.
          // If so, use it. Else we have no choice but to go with one of the used ones.
          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
            sn = unusedServer;
            break;
          }
        }
      }
    }
    cluster.doAssignRegion(regionInfo, sn);
    return sn;
  }

  /**
   * Round-robin a list of regions across a list of servers.
   */
  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
      List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
    List<RegionInfo> unassignedRegions = new ArrayList<>();
    int numServers = servers.size();
    int numRegions = regions.size();
    int max = (int) Math.ceil((float) numRegions / numServers);
    int serverIdx = 0;
    if (numServers > 1) {
      serverIdx = RANDOM.nextInt(numServers);
    }
    int regionIdx = 0;
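    // Striding scheme: starting from a random server offset, the j-th server receives
    // regions j, j + numServers, j + 2 * numServers, ... so each server gets at most
    // ceil(numRegions / numServers) regions. Regions whose placement would lower
    // availability are collected and retried below.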
    for (int j = 0; j < numServers; j++) {
      ServerName server = servers.get((j + serverIdx) % numServers);
      List<RegionInfo> serverRegions = new ArrayList<>(max);
      for (int i = regionIdx; i < numRegions; i += numServers) {
        RegionInfo region = regions.get(i % numRegions);
        if (cluster.wouldLowerAvailability(region, server)) {
          unassignedRegions.add(region);
        } else {
          serverRegions.add(region);
          cluster.doAssignRegion(region, server);
        }
      }
      assignments.put(server, serverRegions);
      regionIdx++;
    }

    List<RegionInfo> lastFewRegions = new ArrayList<>();
    // Assign the remaining regions by going through the list and trying servers one by one.
    serverIdx = RANDOM.nextInt(numServers);
    OUTER: for (RegionInfo region : unassignedRegions) {
      boolean assigned = false;
      INNER: for (int j = 0; j < numServers; j++) { // try all servers one by one
        ServerName server = servers.get((j + serverIdx) % numServers);
        if (cluster.wouldLowerAvailability(region, server)) {
          continue INNER;
        } else {
          assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
          cluster.doAssignRegion(region, server);
          serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
          assigned = true;
          break;
        }
      }
      if (!assigned) {
        lastFewRegions.add(region);
      }
    }
    // Just sprinkle the rest of the regions on random region servers. balanceCluster will
    // make it optimal later. We can end up here if numReplicas > numServers.
    for (RegionInfo region : lastFewRegions) {
      int i = RANDOM.nextInt(numServers);
      ServerName server = servers.get(i);
      assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
      cluster.doAssignRegion(region, server);
    }
  }

  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
      Collection<RegionInfo> regions) {
    if (this.services != null && this.services.getAssignmentManager() != null) {
      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
    } else {
      return new HashMap<>();
    }
  }

  private Map<ServerName, List<RegionInfo>> toEnsembleTableLoad(
      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
    for (Map<ServerName, List<RegionInfo>> serverNameListMap : loadOfAllTable.values()) {
      serverNameListMap.forEach((serverName, regionInfoList) -> {
        List<RegionInfo> regionInfos =
            returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
        regionInfos.addAll(regionInfoList);
      });
    }
    return returnMap;
  }

  @Override
  public abstract List<RegionPlan> balanceTable(TableName tableName,
      Map<ServerName, List<RegionInfo>> loadOfOneTable);

  @Override
  public List<RegionPlan>
      balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    if (isByTable) {
      List<RegionPlan> result = new ArrayList<>();
      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
        LOG.info("Start generating balance plan for table: " + tableName);
        List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
        if (partialPlans != null) {
          result.addAll(partialPlans);
        }
      });
      return result;
    } else {
      LOG.info("Start generating balance plan for cluster.");
      return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsembleTableLoad(loadOfAllTable));
    }
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
  }
}
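
// Illustrative sketch only (not part of the original source): it shows how the
// per-table load maps that balanceCluster() receives collapse into a single
// ensemble load map, mirroring the merge in toEnsembleTableLoad() above. The class
// name EnsembleLoadSketch and the sample host names are hypothetical.
final class EnsembleLoadSketch {
  public static void main(String[] args) {
    ServerName s1 = ServerName.valueOf("host-a.example.org", 16020, 1L);
    ServerName s2 = ServerName.valueOf("host-b.example.org", 16020, 1L);
    RegionInfo r1 = org.apache.hadoop.hbase.client.RegionInfoBuilder
        .newBuilder(TableName.valueOf("t1")).build();
    RegionInfo r2 = org.apache.hadoop.hbase.client.RegionInfoBuilder
        .newBuilder(TableName.valueOf("t2")).build();

    // Per-table view: each table maps servers to that table's regions only.
    Map<ServerName, List<RegionInfo>> t1Load = new HashMap<>();
    t1Load.put(s1, Arrays.asList(r1));
    Map<ServerName, List<RegionInfo>> t2Load = new HashMap<>();
    t2Load.put(s1, Arrays.asList(r2));
    t2Load.put(s2, Collections.emptyList());
    Map<TableName, Map<ServerName, List<RegionInfo>>> perTable = new HashMap<>();
    perTable.put(TableName.valueOf("t1"), t1Load);
    perTable.put(TableName.valueOf("t2"), t2Load);

    // Ensemble view: merge all tables, concatenating region lists per server.
    Map<ServerName, List<RegionInfo>> ensemble = new TreeMap<>();
    perTable.values().forEach(load -> load.forEach((server, regions) ->
        ensemble.computeIfAbsent(server, k -> new ArrayList<>()).addAll(regions)));

    // Expected output: host-a carries two regions (one per table), host-b carries none.
    ensemble.forEach((server, regions) ->
        System.out.println(server.getHostname() + " -> " + regions.size() + " regions"));
  }
}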