001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.balancer; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.Comparator; 027import java.util.Deque; 028import java.util.HashMap; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableMap; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeMap; 037import java.util.function.Predicate; 038import java.util.stream.Collectors; 039 040import org.apache.commons.lang3.NotImplementedException; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.ClusterMetrics; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HBaseIOException; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HDFSBlocksDistribution; 047import org.apache.hadoop.hbase.ServerMetrics; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.TableName; 050import 
org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The base class for load balancers. It provides the functions used by
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
 * in the edge cases. It doesn't provide an implementation of the
 * actual balancing algorithm.
 *
 */
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
  // Below this many servers there is nothing to balance between.
  protected static final int MIN_SERVER_BALANCE = 2;
  private volatile boolean stopped = false;

  // Shared immutable empty list used when a caller passes null unassigned regions.
  private static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

  // Matches servers that currently host no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Used to look up HDFS block locality for regions; only populated when
  // "hbase.master.balancer.uselocality" is true (see createRegionFinder()).
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;

  /** Fallback RackManager that reports every server as being in UNKNOWN_RACK. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * The constructor that uses the basic MetricsBalancer
   */
  protected BaseLoadBalancer() {
    // NOTE(review): metricsBalancer is a field declared elsewhere in this file.
    metricsBalancer = new MetricsBalancer();
    createRegionFinder();
  }

  /**
   * This Constructor accepts an instance of MetricsBalancer,
   * which will be used instead of creating a new one
   */
  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
    createRegionFinder();
  }

  // Instantiates the locality-aware RegionLocationFinder unless disabled by config.
  // NOTE(review): 'config' is a field declared elsewhere in this file — presumably
  // set before construction completes; verify initialization order against the full file.
  private void createRegionFinder() {
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }

  /**
   * An efficient array based implementation similar to ClusterState for keeping
   * the status of the cluster in terms of region assignment and distribution.
   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
   * hundreds of thousands of hashmap manipulations are very costly, which is why this
   * class uses mostly indexes and arrays.
   *
   * Cluster tracks a list of unassigned regions, region assignments, and the server
   * topology in terms of server names, hostnames and racks.
 */
  protected static class Cluster {
    ServerName[] servers;
    String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host
    String[] racks;
    boolean multiServersPerHost = false; // whether or not any host has more than one server

    // Table names in index order; index assigned on first sighting in registerRegion().
    ArrayList<String> tables;
    RegionInfo[] regions;
    // Per-region history of load metrics; may be null for a region with no reported load.
    Deque<BalancerRegionLoad>[] regionLoads;
    private RegionLocationFinder regionFinder;

    int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality

    int[] serverIndexToHostIndex; //serverIndex -> host index
    int[] serverIndexToRackIndex; //serverIndex -> rack index

    int[][] regionsPerServer; //serverIndex -> region list
    int[] serverIndexToRegionsOffset; //serverIndex -> offset of region list
    int[][] regionsPerHost; //hostIndex -> list of regions
    int[][] regionsPerRack; //rackIndex -> region list
    int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index
    int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index

    int[][] serversPerHost; //hostIndex -> list of server indexes
    int[][] serversPerRack; //rackIndex -> list of server indexes
    int[] regionIndexToServerIndex; //regionIndex -> serverIndex
    int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
    int[] regionIndexToTableIndex; //regionIndex -> tableIndex
    int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
    int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
    int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
    boolean hasRegionReplicas = false; //whether there is regions with replicas

    // Boxed Integer arrays so they can be sorted with a Comparator (see sortServersByRegionCount).
    Integer[] serverIndicesSortedByRegionCount;
    Integer[] serverIndicesSortedByLocality;

    // Reverse lookups from name/object to the dense array indexes above.
    Map<String, Integer> serversToIndex;
    Map<String, Integer> hostsToIndex;
    Map<String, Integer> racksToIndex;
    Map<String, Integer> tablesToIndex;
    Map<RegionInfo, Integer> regionsToIndex;
    float[] localityPerServer;

    int numServers;
    int numHosts;
    int numRacks;
    int numTables;
    int numRegions;

    int numMovedRegions = 0; //num moved regions from the initial configuration
    Map<ServerName, List<RegionInfo>> clusterState;

    protected final RackManager rackManager;
    // Maps region -> rackIndex -> locality of region on rack
    private float[][] rackLocalities;
    // Maps localityType -> region -> [server|rack]Index with highest locality
    private int[][] regionsToMostLocalEntities;

    // Convenience constructor when there are no unassigned regions to account for.
    protected Cluster(
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {
      this(null, clusterState, loads, regionFinder, rackManager);
    }

    // Builds the full indexed snapshot of the cluster: assigns dense indexes to
    // servers/hosts/racks/tables/regions and fills all the cross-reference arrays.
    @SuppressWarnings("unchecked")
    protected Cluster(
        Collection<RegionInfo> unassignedRegions,
        Map<ServerName, List<RegionInfo>> clusterState,
        Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder,
        RackManager rackManager) {

      if (unassignedRegions == null) {
        unassignedRegions = EMPTY_REGION_LIST;
      }

      serversToIndex = new HashMap<>();
      hostsToIndex = new HashMap<>();
      racksToIndex = new HashMap<>();
      tablesToIndex = new HashMap<>();

      //TODO: We should get the list of tables from master
      tables = new ArrayList<>();
      // Fall back to a rack manager that reports a single unknown rack.
      this.rackManager = rackManager != null ?
rackManager : new DefaultRackManager();

      numRegions = 0;

      List<List<Integer>> serversPerHostList = new ArrayList<>();
      List<List<Integer>> serversPerRackList = new ArrayList<>();
      this.clusterState = clusterState;
      this.regionFinder = regionFinder;

      // Use servername and port as there can be dead servers in this list. We want everything with
      // a matching hostname and port to have the same index.
      for (ServerName sn : clusterState.keySet()) {
        if (sn == null) {
          LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " +
              "skipping; unassigned regions?");
          if (LOG.isTraceEnabled()) {
            LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
          }
          continue;
        }
        // First sighting of this host:port gets the next dense server index.
        if (serversToIndex.get(sn.getAddress().toString()) == null) {
          serversToIndex.put(sn.getHostAndPort(), numServers++);
        }
        if (!hostsToIndex.containsKey(sn.getHostname())) {
          hostsToIndex.put(sn.getHostname(), numHosts++);
          serversPerHostList.add(new ArrayList<>(1));
        }

        int serverIndex = serversToIndex.get(sn.getHostAndPort());
        int hostIndex = hostsToIndex.get(sn.getHostname());
        serversPerHostList.get(hostIndex).add(serverIndex);

        String rack = this.rackManager.getRack(sn);
        if (!racksToIndex.containsKey(rack)) {
          racksToIndex.put(rack, numRacks++);
          serversPerRackList.add(new ArrayList<>());
        }
        int rackIndex = racksToIndex.get(rack);
        serversPerRackList.get(rackIndex).add(serverIndex);
      }

      // Count how many regions there are.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate every dense array now that the counts are final.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the name->index maps into index->name arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Second pass: register every assigned region and fill per-server region lists.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        // Resume at the stored offset so two ServerNames sharing host:port append, not overwrite.
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions get serverIndex -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      // Materialize host -> servers lists and detect multi-server hosts.
      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      // Count regions per (server, table); unassigned regions (index < 0) are skipped.
      for (int i = 0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // For each table, remember the max region count hosted by any single server.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map every region to its primary replica's index; a primary maps to itself.
      // A replica whose primary is not in this snapshot maps to -1.
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        // First pass: size the per-host arrays.
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        // Second pass: fill them from the per-server lists.
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }

    /** Helper for Cluster constructor to handle a region */
    private void registerRegion(RegionInfo region, int regionIndex,
        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder) {
      // Assign the table an index on first sighting.
      String tableName = region.getTable().getNameAsString();
      if (!tablesToIndex.containsKey(tableName)) {
        tables.add(tableName);
        tablesToIndex.put(tableName, tablesToIndex.size());
      }
      int tableIndex = tablesToIndex.get(tableName);

      regionsToIndex.put(region, regionIndex);
      regions[regionIndex] = region;
      regionIndexToServerIndex[regionIndex] = serverIndex;
      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
      regionIndexToTableIndex[regionIndex] = tableIndex;

      // region load
      if (loads != null) {
        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That could have failed if the RegionLoad is using the other regionName
        if (rl == null) {
          // Try getting the region load using encoded name.
          rl = loads.get(region.getEncodedName());
        }
        regionLoads[regionIndex] = rl;
      }

      if (regionFinder != null) {
        // region location
        // Record each top block location as a server index, or -1 if the
        // location is null or not a server in this snapshot.
        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
        regionLocations[regionIndex] = new int[loc.size()];
        for (int i = 0; i < loc.size(); i++) {
          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ?
-1
              : serversToIndex.get(loc.get(i).getHostAndPort()));
        }
      }
    }

    /**
     * Returns true iff a given server has fewer regions than the balanced amount
     */
    public boolean serverHasTooFewRegions(int server) {
      // Integer division: the floor of the perfectly balanced per-server load.
      int minLoad = this.numRegions / numServers;
      // NOTE(review): this local intentionally shadows the numRegions field.
      int numRegions = getNumRegions(server);
      return numRegions < minLoad;
    }

    /**
     * Retrieves and lazily initializes a field storing the locality of
     * every region/server combination
     */
    public float[][] getOrComputeRackLocalities() {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return rackLocalities;
    }

    /**
     * Lazily initializes and retrieves a mapping of region -> server for which region has
     * the highest locality
     */
    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return regionsToMostLocalEntities[type.ordinal()];
    }

    /**
     * Looks up locality from cache of localities. Will create cache if it does
     * not already exist.
     */
    public float getOrComputeLocality(int region, int entity, LocalityType type) {
      switch (type) {
        case SERVER:
          return getLocalityOfRegion(region, entity);
        case RACK:
          return getOrComputeRackLocalities()[region][entity];
        default:
          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
      }
    }

    /**
     * Returns locality weighted by region size in MB. Will create locality cache
     * if it does not already exist.
     */
    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
    }

    /**
     * Returns the size in MB from the most recent RegionLoad for region
     */
    public int getRegionSizeMB(int region) {
      Deque<BalancerRegionLoad> load = regionLoads[region];
      // This means regions have no actual data on disk
      if (load == null) {
        return 0;
      }
      // NOTE(review): getLast() throws if the deque is empty — presumably loads
      // are never stored empty; confirm against the code that populates regionLoads.
      return regionLoads[region].getLast().getStorefileSizeMB();
    }

    /**
     * Computes and caches the locality for each region/rack combinations,
     * as well as storing a mapping of region -> server and region -> rack such that server
     * and rack have the highest locality for region
     */
    private void computeCachedLocalities() {
      rackLocalities = new float[numRegions][numRacks];
      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];

      // Compute localities and find most local server per region
      for (int region = 0; region < numRegions; region++) {
        int serverWithBestLocality = 0;
        float bestLocalityForRegion = 0;
        for (int server = 0; server < numServers; server++) {
          // Aggregate per-rack locality
          // A rack's locality is the average of its servers' localities for this region.
          float locality = getLocalityOfRegion(region, server);
          int rack = serverIndexToRackIndex[server];
          int numServersInRack = serversPerRack[rack].length;
          rackLocalities[region][rack] += locality / numServersInRack;

          if (locality > bestLocalityForRegion) {
            serverWithBestLocality = server;
            bestLocalityForRegion = locality;
          }
        }
        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;

        // Find most local rack per region
        int rackWithBestLocality = 0;
        float bestRackLocalityForRegion = 0.0f;
        for (int rack = 0; rack < numRacks; rack++) {
          float rackLocality = rackLocalities[region][rack];
          if (rackLocality > bestRackLocalityForRegion) {
bestRackLocalityForRegion = rackLocality;
            rackWithBestLocality = rack;
          }
        }
        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
      }

    }

    /**
     * Maps region index to rack index
     */
    public int getRackForRegion(int region) {
      // Rack of the server currently hosting the region.
      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
    }

    // Which kind of entity a locality value refers to; ordinal() indexes
    // into regionsToMostLocalEntities.
    enum LocalityType {
      SERVER,
      RACK
    }

    /** An action to move or swap a region */
    public static class Action {
      public enum Type {
        ASSIGN_REGION,
        MOVE_REGION,
        SWAP_REGIONS,
        NULL,
      }

      public Type type;
      public Action (Type type) {this.type = type;}
      /** Returns an Action which would undo this action */
      public Action undoAction() { return this; }
      @Override
      public String toString() { return type + ":";}
    }

    /** Assigns a currently unassigned region to a server. */
    public static class AssignRegionAction extends Action {
      public int region;
      public int server;
      public AssignRegionAction(int region, int server) {
        super(Type.ASSIGN_REGION);
        this.region = region;
        this.server = server;
      }
      @Override
      public Action undoAction() {
        // TODO implement this. This action is not being used by the StochasticLB for now
        // in case it uses it, we should implement this function.
        throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
      }
      @Override
      public String toString() {
        return type + ": " + region + ":" + server;
      }
    }

    /** Moves a region from one server to another. */
    public static class MoveRegionAction extends Action {
      public int region;
      public int fromServer;
      public int toServer;

      public MoveRegionAction(int region, int fromServer, int toServer) {
        super(Type.MOVE_REGION);
        this.fromServer = fromServer;
        this.region = region;
        this.toServer = toServer;
      }
      @Override
      public Action undoAction() {
        // Undo is simply the move in the opposite direction.
        return new MoveRegionAction (region, toServer, fromServer);
      }
      @Override
      public String toString() {
        return type + ": " + region + ":" + fromServer + " -> " + toServer;
      }
    }

    /** Exchanges one region on each of two servers. */
    public static class SwapRegionsAction extends Action {
      public int fromServer;
      public int fromRegion;
      public int toServer;
      public int toRegion;
      public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
        super(Type.SWAP_REGIONS);
        this.fromServer = fromServer;
        this.fromRegion = fromRegion;
        this.toServer = toServer;
        this.toRegion = toRegion;
      }
      @Override
      public Action undoAction() {
        // Undo swaps the (now exchanged) regions back.
        return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
      }
      @Override
      public String toString() {
        return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
      }
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
        justification="Mistake.
Too disruptive to change now") 705 public static final Action NullAction = new Action(Type.NULL); 706 707 public void doAction(Action action) { 708 switch (action.type) { 709 case NULL: break; 710 case ASSIGN_REGION: 711 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 712 assert action instanceof AssignRegionAction: action.getClass(); 713 AssignRegionAction ar = (AssignRegionAction) action; 714 regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); 715 regionMoved(ar.region, -1, ar.server); 716 break; 717 case MOVE_REGION: 718 assert action instanceof MoveRegionAction: action.getClass(); 719 MoveRegionAction mra = (MoveRegionAction) action; 720 regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); 721 regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); 722 regionMoved(mra.region, mra.fromServer, mra.toServer); 723 break; 724 case SWAP_REGIONS: 725 assert action instanceof SwapRegionsAction: action.getClass(); 726 SwapRegionsAction a = (SwapRegionsAction) action; 727 regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); 728 regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); 729 regionMoved(a.fromRegion, a.fromServer, a.toServer); 730 regionMoved(a.toRegion, a.toServer, a.fromServer); 731 break; 732 default: 733 throw new RuntimeException("Uknown action:" + action.type); 734 } 735 } 736 737 /** 738 * Return true if the placement of region on server would lower the availability 739 * of the region in question 740 * @return true or false 741 */ 742 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 743 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 744 return false; // safeguard against race between cluster.servers and servers from LB method args 745 } 746 int server = 
serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);

      // Region replicas for same region should better assign to different servers
      for (int i : regionsPerServer[server]) {
        RegionInfo otherRegionInfo = regions[i];
        if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
          return true;
        }
      }

      int primary = regionIndexToPrimaryIndex[region];
      if (primary == -1) {
        return false;
      }
      // there is a subset relation for server < host < rack
      // check server first
      if (contains(primariesOfRegionsPerServer[server], primary)) {
        // check for whether there are other servers that we can place this region
        for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
          if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
            return true; // meaning there is a better server
          }
        }
        return false; // there is not a better server to place this
      }

      // check host
      if (multiServersPerHost) {
        // these arrays would only be allocated if we have more than one server per host
        int host = serverIndexToHostIndex[server];
        if (contains(primariesOfRegionsPerHost[host], primary)) {
          // check for whether there are other hosts that we can place this region
          for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
            if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
              return true; // meaning there is a better host
            }
          }
          return false; // there is not a better host to place this
        }
      }

      // check rack
      if (numRacks > 1) {
        int rack = serverIndexToRackIndex[server];
        if (contains(primariesOfRegionsPerRack[rack], primary)) {
          // check for whether there are other racks that we can place this region
          for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
            if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
              return true; // meaning there is a better rack
            }
          }
          return false; // there is not a better rack to place this
        }
      }

      return false;
    }

    // Assigns regionInfo to serverName in this in-memory model; no-op when the
    // server is not part of this Cluster snapshot.
    void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
      if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
        return;
      }
      int server = serversToIndex.get(serverName.getHostAndPort());
      int region = regionsToIndex.get(regionInfo);
      doAction(new AssignRegionAction(region, server));
    }

    // Updates all derived bookkeeping after a region moved from oldServer to
    // newServer (oldServer == -1 means previously unassigned). Callers must have
    // already updated regionsPerServer.
    void regionMoved(int region, int oldServer, int newServer) {
      regionIndexToServerIndex[region] = newServer;
      if (initialRegionIndexToServerIndex[region] == newServer) {
        numMovedRegions--; //region moved back to original location
      } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
        numMovedRegions++; //region moved from original location
      }
      int tableIndex = regionIndexToTableIndex[region];
      if (oldServer >= 0) {
        numRegionsPerServerPerTable[oldServer][tableIndex]--;
      }
      numRegionsPerServerPerTable[newServer][tableIndex]++;

      //check whether this caused maxRegionsPerTable in the new Server to be updated
      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
          == numMaxRegionsPerTable[tableIndex]) {
        //recompute maxRegionsPerTable since the previous value was coming from the old server
        numMaxRegionsPerTable[tableIndex] = 0;
        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // update for servers
      int primary = regionIndexToPrimaryIndex[region];
      if (oldServer >= 0) {
        primariesOfRegionsPerServer[oldServer] = removeRegion(
          primariesOfRegionsPerServer[oldServer], primary);
      }
      primariesOfRegionsPerServer[newServer] = addRegionSorted(
        primariesOfRegionsPerServer[newServer], primary);

      // update for hosts
      if (multiServersPerHost) {
        int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        // Only touch host arrays when the region actually changed host.
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        // Only touch rack arrays when the region actually changed rack.
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }

    // Returns a copy of 'regions' with the first occurrence of regionIndex removed.
    int[] removeRegion(int[] regions, int regionIndex) {
      //TODO: this maybe costly.
Consider using linked lists 883 int[] newRegions = new int[regions.length - 1]; 884 int i = 0; 885 for (i = 0; i < regions.length; i++) { 886 if (regions[i] == regionIndex) { 887 break; 888 } 889 newRegions[i] = regions[i]; 890 } 891 System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i); 892 return newRegions; 893 } 894 895 int[] addRegion(int[] regions, int regionIndex) { 896 int[] newRegions = new int[regions.length + 1]; 897 System.arraycopy(regions, 0, newRegions, 0, regions.length); 898 newRegions[newRegions.length - 1] = regionIndex; 899 return newRegions; 900 } 901 902 int[] addRegionSorted(int[] regions, int regionIndex) { 903 int[] newRegions = new int[regions.length + 1]; 904 int i = 0; 905 for (i = 0; i < regions.length; i++) { // find the index to insert 906 if (regions[i] > regionIndex) { 907 break; 908 } 909 } 910 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 911 System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half 912 newRegions[i] = regionIndex; 913 914 return newRegions; 915 } 916 917 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 918 int i = 0; 919 for (i = 0; i < regions.length; i++) { 920 if (regions[i] == regionIndex) { 921 regions[i] = newRegionIndex; 922 break; 923 } 924 } 925 return regions; 926 } 927 928 void sortServersByRegionCount() { 929 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 930 } 931 932 int getNumRegions(int server) { 933 return regionsPerServer[server].length; 934 } 935 936 boolean contains(int[] arr, int val) { 937 return Arrays.binarySearch(arr, val) >= 0; 938 } 939 940 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 941 942 int getLowestLocalityRegionOnServer(int serverIndex) { 943 if (regionFinder != null) { 944 float lowestLocality = 1.0f; 945 int lowestLocalityRegionIndex = -1; 946 if (regionsPerServer[serverIndex].length == 0) { 947 // No regions on 
          // that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          // Strict '<' against an initial 1.0f means a region with perfect locality
          // is never selected; if every region has locality 1.0 we return -1.
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }

    /**
     * HDFS block locality of {@code region} as seen from {@code server}'s host,
     * or 0 when no region finder is configured.
     */
    float getLocalityOfRegion(int region, int server) {
      if (regionFinder != null) {
        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
        return distribution.getBlockLocalityIndex(servers[server].getHostname());
      } else {
        return 0f;
      }
    }

    // Test hook: overrides the derived region count.
    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }

    // Test hook: overrides the moved-region counter.
    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
        justification="Not important but should be fixed")
    @Override
    public String toString() {
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for (ServerName sn : servers) {
        desc.append(sn.getHostAndPort()).append(", ");
      }
      desc.append("], serverIndicesSortedByRegionCount=")
          .append(Arrays.toString(serverIndicesSortedByRegionCount))
          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
          .append(", numTables=").append(numTables).append(", numMovedRegions=")
          .append(numMovedRegions).append('}');
      return desc.toString();
    }
  }

  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  // Server name of the active master; used to keep/route system regions there.
  protected ServerName masterServerName;
  protected MasterServices services;
  // True when only system tables may be placed on the master region server.
  protected boolean onlySystemTablesOnMaster;
  protected boolean maintenanceMode;

  /**
   * Reads balancer configuration: clamps slop and overallSlop into [0, 1],
   * refreshes the rack manager and (when enabled) the region finder.
   */
  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
    setSlop(conf);
    if (slop < 0) slop = 0;
    else if (slop > 1) slop = 1;

    if (overallSlop < 0) overallSlop = 0;
    else if (overallSlop > 1) overallSlop = 1;

    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);

    this.rackManager = new RackManager(getConf());
    if (useRegionFinder) {
      regionFinder.setConf(conf);
    }
    // Print out base configs. Don't print overallSlop since it is for the simple balancer
    // exclusively.
    LOG.info("slop={}, systemTablesOnMaster={}",
      this.slop, this.onlySystemTablesOnMaster);
  }

  /** Reads the slop settings; overallSlop defaults to slop when unset. */
  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  /**
   * Check if a region belongs to some system table.
   * If so, the primary replica may be expected to be put on the master regionserver.
   */
  public boolean shouldBeOnMaster(RegionInfo region) {
    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
        && region.getTable().isSystemTable();
  }

  /**
   * Balance the regions that should be on master regionserver.
   * Moves non-system regions off the master (rotating through the other servers)
   * and system regions onto it. Returns null when there is nothing to do.
   */
  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
    List<RegionPlan> plans = null;
    List<RegionInfo> regions = clusterMap.get(masterServerName);
    if (regions != null) {
      Iterator<ServerName> keyIt = null;
      for (RegionInfo region : regions) {
        if (shouldBeOnMaster(region)) continue;

        // Find a non-master regionserver to host the region
        if (keyIt == null || !keyIt.hasNext()) {
          keyIt = clusterMap.keySet().iterator();
        }
        ServerName dest = keyIt.next();
        if (masterServerName.equals(dest)) {
          // Skip over the master itself; wrap the iterator if it is exhausted.
          if (!keyIt.hasNext()) {
            keyIt = clusterMap.keySet().iterator();
          }
          dest = keyIt.next();
        }

        // Move this region away from the master regionserver
        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    for (Map.Entry<ServerName, List<RegionInfo>> server : clusterMap.entrySet()) {
      if (masterServerName.equals(server.getKey())) continue;
      for (RegionInfo region : server.getValue()) {
        if
(!shouldBeOnMaster(region)) continue; 1105 1106 // Move this region to the master regionserver 1107 RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName); 1108 if (plans == null) { 1109 plans = new ArrayList<>(); 1110 } 1111 plans.add(plan); 1112 } 1113 } 1114 return plans; 1115 } 1116 1117 /** 1118 * If master is configured to carry system tables only, in here is 1119 * where we figure what to assign it. 1120 */ 1121 protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions( 1122 Collection<RegionInfo> regions, List<ServerName> servers) { 1123 if (servers == null || regions == null || regions.isEmpty()) { 1124 return null; 1125 } 1126 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 1127 if (this.maintenanceMode || this.onlySystemTablesOnMaster) { 1128 if (masterServerName != null && servers.contains(masterServerName)) { 1129 assignments.put(masterServerName, new ArrayList<>()); 1130 for (RegionInfo region : regions) { 1131 if (shouldBeOnMaster(region)) { 1132 assignments.get(masterServerName).add(region); 1133 } 1134 } 1135 } 1136 } 1137 return assignments; 1138 } 1139 1140 @Override 1141 public Configuration getConf() { 1142 return this.config; 1143 } 1144 1145 @Override 1146 public synchronized void setClusterMetrics(ClusterMetrics st) { 1147 this.clusterStatus = st; 1148 if (useRegionFinder) { 1149 regionFinder.setClusterMetrics(st); 1150 } 1151 } 1152 1153 @Override 1154 public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){ 1155 1156 } 1157 1158 @Override 1159 public void setMasterServices(MasterServices masterServices) { 1160 masterServerName = masterServices.getServerName(); 1161 this.services = masterServices; 1162 if (useRegionFinder) { 1163 this.regionFinder.setServices(masterServices); 1164 } 1165 if (this.services.isInMaintenanceMode()) { 1166 this.maintenanceMode = true; 1167 } 1168 } 1169 1170 @Override 1171 public void postMasterStartupInitialize() { 1172 if 
 (services != null && regionFinder != null) {
      try {
        // Warm up the HDFS block distribution cache for all currently assigned regions.
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        // Best-effort warm-up: balancing still works without the cache.
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }

  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }

  /**
   * Decides whether a balancing run is worthwhile: requires a minimum number of
   * servers, always balances when replicas are co-located, and otherwise applies
   * the slop band around the average load (HBASE-3681).
   */
  protected boolean needsBalance(Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if (areSomeRegionReplicasColocated(c)) return true;
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
        LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }

  /**
   * Subclasses should implement this to return true if the cluster has nodes that hosts
   * multiple replicas for the same region, or, if there are multiple racks and the same
   * rack hosts replicas of the same region
   * @param c Cluster information
   * @return whether region replicas are currently co-located
   */
  protected boolean areSomeRegionReplicasColocated(Cluster c) {
    // Base implementation is replica-unaware; replica-aware balancers override this.
    return false;
  }

  /**
   * Generates a bulk assignment plan to be used on cluster startup using a
   * simple round-robin assignment.
   * <p>
   * Takes a list of all the regions and all the servers in the cluster and
   * returns a map of each server to the regions that it should be assigned.
   * <p>
   * Currently implemented as a round-robin assignment. Same invariant as load
   * balancing, all servers holding floor(avg) or ceiling(avg).
   *
   * TODO: Use block locations from HDFS to place regions with their blocks
   *
   * @param regions all regions
   * @param servers all servers
   * @return map of server to the regions it should take, or null if no
   *   assignment is possible (ie.
   *   no regions or no servers)
   */
  @Override
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    // System regions destined for the master are carved out first.
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      if (!masterRegions.isEmpty()) {
        regions = new ArrayList<>(regions);
        regions.removeAll(masterRegions);
      }
    }
    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
      return null;
    }

    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
    // and balanced. This should also run fast with fewer number of iterations.
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions));
      return assignments;
    }

    Cluster cluster = createCluster(servers, regions);
    roundRobinAssignment(cluster, regions, servers, assignments);
    return assignments;
  }

  /**
   * Builds an in-memory {@link Cluster} model for the given servers and regions.
   * When any involved table has region replication enabled, the full cluster
   * snapshot is used so existing replicas are visible to placement decisions.
   * @throws HBaseIOException if reading table descriptors fails
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
      throws HBaseIOException {
    boolean hasRegionReplica = false;
    try {
      if (services != null && services.getTableDescriptors() != null) {
        Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
        for (RegionInfo regionInfo : regions) {
          TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
          if (td != null && td.getRegionReplication() > 1) {
            hasRegionReplica = true;
            break;
          }
        }
      }
    } catch (IOException ioe) {
      // Preserve the cause for callers that unwrap HBaseIOException.
      throw new HBaseIOException(ioe);
    }

    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
1308 Map<ServerName, List<RegionInfo>> clusterState = null; 1309 if (!hasRegionReplica) { 1310 clusterState = getRegionAssignmentsByServer(regions); 1311 } else { 1312 // for the case where we have region replica it is better we get the entire cluster's snapshot 1313 clusterState = getRegionAssignmentsByServer(null); 1314 } 1315 1316 for (ServerName server : servers) { 1317 if (!clusterState.containsKey(server)) { 1318 clusterState.put(server, EMPTY_REGION_LIST); 1319 } 1320 } 1321 return new Cluster(regions, clusterState, null, this.regionFinder, 1322 rackManager); 1323 } 1324 1325 private List<ServerName> findIdleServers(List<ServerName> servers) { 1326 return this.services.getServerManager() 1327 .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR); 1328 } 1329 1330 /** 1331 * Used to assign a single region to a random server. 1332 */ 1333 @Override 1334 public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers) 1335 throws HBaseIOException { 1336 metricsBalancer.incrMiscInvocations(); 1337 if (servers != null && servers.contains(masterServerName)) { 1338 if (shouldBeOnMaster(regionInfo)) { 1339 return masterServerName; 1340 } 1341 if (!LoadBalancer.isTablesOnMaster(getConf())) { 1342 // Guarantee we do not put any regions on master 1343 servers = new ArrayList<>(servers); 1344 servers.remove(masterServerName); 1345 } 1346 } 1347 1348 int numServers = servers == null ? 0 : servers.size(); 1349 if (numServers == 0) { 1350 LOG.warn("Wanted to retain assignment but no servers to assign to"); 1351 return null; 1352 } 1353 if (numServers == 1) { // Only one server, nothing fancy we can do here 1354 return servers.get(0); 1355 } 1356 List<ServerName> idleServers = findIdleServers(servers); 1357 if (idleServers.size() == 1) { 1358 return idleServers.get(0); 1359 } 1360 final List<ServerName> finalServers = idleServers.isEmpty() ? 
        servers : idleServers;
    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
    Cluster cluster = createCluster(finalServers, regions);
    return randomAssignment(cluster, regionInfo, finalServers);
  }

  /**
   * Generates a bulk assignment startup plan, attempting to reuse the existing
   * assignment information from META, but adjusting for the specified list of
   * available/online servers available for assignment.
   * <p>
   * Takes a map of all regions to their existing assignment from META. Also
   * takes a list of online servers for regions to be assigned to. Attempts to
   * retain all assignment, so in some instances initial assignment will not be
   * completely balanced.
   * <p>
   * Any leftover regions without an existing server to be assigned to will be
   * assigned randomly to available servers.
   *
   * @param regions regions and existing assignment from meta
   * @param servers available servers
   * @return map of servers and regions to be assigned to them
   */
  @Override
  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
      List<ServerName> servers) throws HBaseIOException {
    // Update metrics
    metricsBalancer.incrMiscInvocations();
    // Master-bound system regions are assigned first and excluded from retention.
    Map<ServerName, List<RegionInfo>> assignments =
        assignMasterSystemRegions(regions.keySet(), servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    if (this.maintenanceMode || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ?
 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do retain assignment but no servers to assign to");
      return null;
    }
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions.keySet()));
      return assignments;
    }

    // Group all of the old assignments by their hostname.
    // We can't group directly by ServerName since the servers all have
    // new start-codes.

    // Group the servers by their hostname. It's possible we have multiple
    // servers on the same host on different ports.
    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
    for (ServerName server : servers) {
      assignments.put(server, new ArrayList<>());
      serversByHostname.put(server.getHostnameLowerCase(), server);
    }

    // Collection of the hostnames that used to have regions
    // assigned, but for which we no longer have any RS running
    // after the cluster restart.
    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();

    // If the old servers aren't present, lets assign those regions later.
    List<RegionInfo> randomAssignRegions = Lists.newArrayList();

    int numRandomAssignments = 0;
    int numRetainedAssigments = 0;
    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
      RegionInfo region = entry.getKey();
      ServerName oldServerName = entry.getValue();
      // Candidate servers on the same host as the region's previous server.
      List<ServerName> localServers = new ArrayList<>();
      if (oldServerName != null) {
        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
      }
      if (localServers.isEmpty()) {
        // No servers on the new cluster match up with this hostname, assign randomly, later.
        randomAssignRegions.add(region);
        if (oldServerName != null) {
          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
        }
      } else if (localServers.size() == 1) {
        // the usual case - one new server on same host
        ServerName target = localServers.get(0);
        assignments.get(target).add(region);
        numRetainedAssigments++;
      } else {
        // multiple new servers in the cluster on this same host
        if (localServers.contains(oldServerName)) {
          assignments.get(oldServerName).add(region);
          numRetainedAssigments++;
        } else {
          // Fall back to matching the old port, otherwise assign randomly later.
          ServerName target = null;
          for (ServerName tmp : localServers) {
            if (tmp.getPort() == oldServerName.getPort()) {
              target = tmp;
              assignments.get(tmp).add(region);
              numRetainedAssigments++;
              break;
            }
          }
          if (target == null) {
            randomAssignRegions.add(region);
          }
        }
      }
    }

    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
    if (randomAssignRegions.size() > 0) {
      // Seed the cluster model with the retained assignments so the random
      // placement below can account for them.
      Cluster cluster = createCluster(servers, regions.keySet());
      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
        ServerName sn = entry.getKey();
        for (RegionInfo region : entry.getValue()) {
          cluster.doAssignRegion(region, sn);
        }
      }
      for (RegionInfo region : randomAssignRegions) {
        ServerName target = randomAssignment(cluster, region, servers);
        assignments.get(target).add(region);
        numRandomAssignments++;
      }
    }

    String randomAssignMsg = "";
    if (numRandomAssignments > 0) {
      randomAssignMsg =
          numRandomAssignments + " regions were assigned "
              + "to random hosts, since the old hosts for these regions are no "
              + "longer present in the cluster. These hosts were:\n " + Joiner.on("\n ").join(oldHostsNoLongerPresent);
    }

    LOG.info("Reassigned " + regions.size() + " regions.
" + numRetainedAssigments 1501 + " retained the pre-restart assignment. " + randomAssignMsg); 1502 return assignments; 1503 } 1504 1505 @Override 1506 public void initialize() throws HBaseIOException{ 1507 } 1508 1509 @Override 1510 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 1511 } 1512 1513 @Override 1514 public void regionOffline(RegionInfo regionInfo) { 1515 } 1516 1517 @Override 1518 public boolean isStopped() { 1519 return stopped; 1520 } 1521 1522 @Override 1523 public void stop(String why) { 1524 LOG.info("Load Balancer stop requested: "+why); 1525 stopped = true; 1526 } 1527 1528 /** 1529 * Updates the balancer status tag reported to JMX 1530 */ 1531 public void updateBalancerStatus(boolean status) { 1532 metricsBalancer.balancerStatus(status); 1533 } 1534 1535 /** 1536 * Used to assign a single region to a random server. 1537 */ 1538 private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo, 1539 List<ServerName> servers) { 1540 int numServers = servers.size(); // servers is not null, numServers > 1 1541 ServerName sn = null; 1542 final int maxIterations = numServers * 4; 1543 int iterations = 0; 1544 List<ServerName> usedSNs = new ArrayList<>(servers.size()); 1545 do { 1546 int i = RANDOM.nextInt(numServers); 1547 sn = servers.get(i); 1548 if (!usedSNs.contains(sn)) { 1549 usedSNs.add(sn); 1550 } 1551 } while (cluster.wouldLowerAvailability(regionInfo, sn) 1552 && iterations++ < maxIterations); 1553 if (iterations >= maxIterations) { 1554 // We have reached the max. Means the servers that we collected is still lowering the 1555 // availability 1556 for (ServerName unusedServer : servers) { 1557 if (!usedSNs.contains(unusedServer)) { 1558 // check if any other unused server is there for us to use. 1559 // If so use it. 
          // Else we have no other choice but to go with one of the sampled ones.
          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
            sn = unusedServer;
            break;
          }
        }
      }
    }
    cluster.doAssignRegion(regionInfo, sn);
    return sn;
  }

  /**
   * Round robin a list of regions to a list of servers. Starts at a random
   * server offset; regions whose placement would lower replica availability are
   * retried against every server and finally sprinkled at random.
   */
  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
      List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
    List<RegionInfo> unassignedRegions = new ArrayList<>();
    int numServers = servers.size();
    int numRegions = regions.size();
    // Upper bound of regions per server: ceiling(numRegions / numServers).
    int max = (int) Math.ceil((float) numRegions / numServers);
    int serverIdx = 0;
    if (numServers > 1) {
      serverIdx = RANDOM.nextInt(numServers);
    }
    int regionIdx = 0;
    for (int j = 0; j < numServers; j++) {
      ServerName server = servers.get((j + serverIdx) % numServers);
      List<RegionInfo> serverRegions = new ArrayList<>(max);
      // Stride through the region list so each server gets every numServers-th region.
      for (int i = regionIdx; i < numRegions; i += numServers) {
        RegionInfo region = regions.get(i % numRegions);
        if (cluster.wouldLowerAvailability(region, server)) {
          unassignedRegions.add(region);
        } else {
          serverRegions.add(region);
          cluster.doAssignRegion(region, server);
        }
      }
      assignments.put(server, serverRegions);
      regionIdx++;
    }


    List<RegionInfo> lastFewRegions = new ArrayList<>();
    // assign the remaining by going through the list and try to assign to servers one-by-one
    serverIdx = RANDOM.nextInt(numServers);
    OUTER : for (RegionInfo region : unassignedRegions) {
      boolean assigned = false;
      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
        ServerName server = servers.get((j + serverIdx) % numServers);
        if (cluster.wouldLowerAvailability(region, server)) {
          continue INNER;
        } else {
          assignments.computeIfAbsent(server, k -> new
ArrayList<>()).add(region); 1613 cluster.doAssignRegion(region, server); 1614 serverIdx = (j + serverIdx + 1) % numServers; //remain from next server 1615 assigned = true; 1616 break; 1617 } 1618 } 1619 if (!assigned) { 1620 lastFewRegions.add(region); 1621 } 1622 } 1623 // just sprinkle the rest of the regions on random regionservers. The balanceCluster will 1624 // make it optimal later. we can end up with this if numReplicas > numServers. 1625 for (RegionInfo region : lastFewRegions) { 1626 int i = RANDOM.nextInt(numServers); 1627 ServerName server = servers.get(i); 1628 assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region); 1629 cluster.doAssignRegion(region, server); 1630 } 1631 } 1632 1633 protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer( 1634 Collection<RegionInfo> regions) { 1635 if (this.services != null && this.services.getAssignmentManager() != null) { 1636 return this.services.getAssignmentManager().getSnapShotOfAssignment(regions); 1637 } else { 1638 return new HashMap<>(); 1639 } 1640 } 1641 1642 @Override 1643 public void onConfigurationChange(Configuration conf) { 1644 } 1645}