/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * The base class for load balancers. It provides the functions used by
 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
 * in the edge cases. It doesn't provide an implementation of the
 * actual balancing algorithm.
 *
 */
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
  // Presumably the minimum number of servers for balancing to be worthwhile;
  // usage is outside this view — TODO confirm against the rest of the class.
  protected static final int MIN_SERVER_BALANCE = 2;
  private volatile boolean stopped = false;

  // Shared immutable-by-convention empty list substituted when callers pass no
  // unassigned regions (see Cluster constructor).
  private static final List<RegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);

  // A server is considered "idle" when it carries no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Locality lookup helper; only allocated when useRegionFinder is true.
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;

  /** Fallback RackManager that reports every server as being in the unknown rack. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * The constructor that uses the basic MetricsBalancer
   */
  protected BaseLoadBalancer() {
    // NOTE(review): metricsBalancer is a field declared elsewhere in this class
    // (not visible in this chunk).
    metricsBalancer = new MetricsBalancer();
    createRegionFinder();
  }

  /**
   * This Constructor accepts an instance of MetricsBalancer,
   * which will be used instead of creating a new one
   */
  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
    createRegionFinder();
  }

  /**
   * Allocates the RegionLocationFinder when locality-aware balancing is enabled
   * via {@code hbase.master.balancer.uselocality} (default: true).
   */
  private void createRegionFinder() {
    // NOTE(review): 'config' is a Configuration field declared elsewhere in this
    // class (not visible in this chunk).
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }

  /**
   * An efficient array based implementation similar to ClusterState for keeping
   * the status of the cluster in terms of region assignment and distribution.
   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
   * hundreds of thousands of hashmap manipulations are very costly, which is why this
   * class uses mostly indexes and arrays.
   *
   * Cluster tracks a list of unassigned regions, region assignments, and the server
   * topology in terms of server names, hostnames and racks.
124 */ 125 protected static class Cluster { 126 ServerName[] servers; 127 String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host 128 String[] racks; 129 boolean multiServersPerHost = false; // whether or not any host has more than one server 130 131 ArrayList<String> tables; 132 RegionInfo[] regions; 133 Deque<BalancerRegionLoad>[] regionLoads; 134 private RegionLocationFinder regionFinder; 135 136 int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality 137 138 int[] serverIndexToHostIndex; //serverIndex -> host index 139 int[] serverIndexToRackIndex; //serverIndex -> rack index 140 141 int[][] regionsPerServer; //serverIndex -> region list 142 int[][] regionsPerHost; //hostIndex -> list of regions 143 int[][] regionsPerRack; //rackIndex -> region list 144 int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index 145 int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index 146 int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index 147 148 int[][] serversPerHost; //hostIndex -> list of server indexes 149 int[][] serversPerRack; //rackIndex -> list of server indexes 150 int[] regionIndexToServerIndex; //regionIndex -> serverIndex 151 int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) 152 int[] regionIndexToTableIndex; //regionIndex -> tableIndex 153 int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions 154 int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS 155 int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary 156 boolean hasRegionReplicas = false; //whether there is regions with replicas 157 158 Integer[] serverIndicesSortedByRegionCount; 159 Integer[] serverIndicesSortedByLocality; 160 161 Map<String, Integer> serversToIndex; 162 Map<String, Integer> 
hostsToIndex; 163 Map<String, Integer> racksToIndex; 164 Map<String, Integer> tablesToIndex; 165 Map<RegionInfo, Integer> regionsToIndex; 166 float[] localityPerServer; 167 168 int numServers; 169 int numHosts; 170 int numRacks; 171 int numTables; 172 int numRegions; 173 174 int numMovedRegions = 0; //num moved regions from the initial configuration 175 Map<ServerName, List<RegionInfo>> clusterState; 176 177 protected final RackManager rackManager; 178 // Maps region -> rackIndex -> locality of region on rack 179 private float[][] rackLocalities; 180 // Maps localityType -> region -> [server|rack]Index with highest locality 181 private int[][] regionsToMostLocalEntities; 182 183 protected Cluster( 184 Map<ServerName, List<RegionInfo>> clusterState, 185 Map<String, Deque<BalancerRegionLoad>> loads, 186 RegionLocationFinder regionFinder, 187 RackManager rackManager) { 188 this(null, clusterState, loads, regionFinder, rackManager); 189 } 190 191 @SuppressWarnings("unchecked") 192 protected Cluster( 193 Collection<RegionInfo> unassignedRegions, 194 Map<ServerName, List<RegionInfo>> clusterState, 195 Map<String, Deque<BalancerRegionLoad>> loads, 196 RegionLocationFinder regionFinder, 197 RackManager rackManager) { 198 199 if (unassignedRegions == null) { 200 unassignedRegions = EMPTY_REGION_LIST; 201 } 202 203 serversToIndex = new HashMap<>(); 204 hostsToIndex = new HashMap<>(); 205 racksToIndex = new HashMap<>(); 206 tablesToIndex = new HashMap<>(); 207 208 //TODO: We should get the list of tables from master 209 tables = new ArrayList<>(); 210 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 211 212 numRegions = 0; 213 214 List<List<Integer>> serversPerHostList = new ArrayList<>(); 215 List<List<Integer>> serversPerRackList = new ArrayList<>(); 216 this.clusterState = clusterState; 217 this.regionFinder = regionFinder; 218 219 // Use servername and port as there can be dead servers in this list. 
We want everything with 220 // a matching hostname and port to have the same index. 221 for (ServerName sn : clusterState.keySet()) { 222 if (sn == null) { 223 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + 224 "skipping; unassigned regions?"); 225 if (LOG.isTraceEnabled()) { 226 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 227 } 228 continue; 229 } 230 if (serversToIndex.get(sn.getAddress().toString()) == null) { 231 serversToIndex.put(sn.getHostAndPort(), numServers++); 232 } 233 if (!hostsToIndex.containsKey(sn.getHostname())) { 234 hostsToIndex.put(sn.getHostname(), numHosts++); 235 serversPerHostList.add(new ArrayList<>(1)); 236 } 237 238 int serverIndex = serversToIndex.get(sn.getHostAndPort()); 239 int hostIndex = hostsToIndex.get(sn.getHostname()); 240 serversPerHostList.get(hostIndex).add(serverIndex); 241 242 String rack = this.rackManager.getRack(sn); 243 if (!racksToIndex.containsKey(rack)) { 244 racksToIndex.put(rack, numRacks++); 245 serversPerRackList.add(new ArrayList<>()); 246 } 247 int rackIndex = racksToIndex.get(rack); 248 serversPerRackList.get(rackIndex).add(serverIndex); 249 } 250 251 // Count how many regions there are. 
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate all index arrays now that server/host/rack/region counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      // First pass over servers: pick the live ServerName per host:port and size
      // the per-server region arrays.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the host/rack index maps into dense arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Second pass: register every assigned region and wire server -> host/rack links.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = 0;

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
      }

      // Unassigned regions get serverIndex -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // Per table, the max region count observed on any single server.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Resolve each region to the index of its primary replica; -1 if the
      // primary is not present in this snapshot.
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost; only needed when a host runs more than one server,
      // otherwise host-level data would duplicate server-level data.
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack; only meaningful with more than one rack.
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }

    /** Helper for Cluster constructor to handle a region: records its table,
     * index mappings, historical load, and (optionally) block locations. */
    private void registerRegion(RegionInfo region, int regionIndex,
        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder) {
      String tableName = region.getTable().getNameAsString();
      if (!tablesToIndex.containsKey(tableName)) {
        tables.add(tableName);
        tablesToIndex.put(tableName, tablesToIndex.size());
      }
      int tableIndex = tablesToIndex.get(tableName);

      regionsToIndex.put(region, regionIndex);
      regions[regionIndex] = region;
      regionIndexToServerIndex[regionIndex] = serverIndex;
      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
      regionIndexToTableIndex[regionIndex] = tableIndex;

      // region load
      if (loads != null) {
        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That could have failed if the RegionLoad is using the other regionName
        if (rl == null) {
          // Try getting the region load using encoded name.
          rl = loads.get(region.getEncodedName());
        }
        regionLoads[regionIndex] = rl;
      }

      if (regionFinder != null) {
        // region location: map each top block location to a server index, or -1
        // when the location is unknown or not a server in this snapshot.
        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
        regionLocations[regionIndex] = new int[loc.size()];
        for (int i = 0; i < loc.size(); i++) {
          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
                  : serversToIndex.get(loc.get(i).getHostAndPort()));
        }
      }
    }

    /**
     * Returns true iff a given server has less regions than the balanced amount
     */
    public boolean serverHasTooFewRegions(int server) {
      int minLoad = this.numRegions / numServers;
      int numRegions = getNumRegions(server);
      return numRegions < minLoad;
    }

    /**
     * Retrieves and lazily initializes a field storing the locality of
     * every region/server combination
     */
    public float[][] getOrComputeRackLocalities() {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return rackLocalities;
    }

    /**
     * Lazily initializes and retrieves a mapping of region -> server for which region has
     * the highest locality
     */
    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return regionsToMostLocalEntities[type.ordinal()];
    }

    /**
     * Looks up locality from cache of localities. Will create cache if it does
     * not already exist.
     */
    public float getOrComputeLocality(int region, int entity, LocalityType type) {
      switch (type) {
        case SERVER:
          return getLocalityOfRegion(region, entity);
        case RACK:
          return getOrComputeRackLocalities()[region][entity];
        default:
          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
      }
    }

    /**
     * Returns locality weighted by region size in MB. Will create locality cache
     * if it does not already exist.
     */
    public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
      return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
    }

    /**
     * Returns the size in MB from the most recent RegionLoad for region
     */
    public int getRegionSizeMB(int region) {
      Deque<BalancerRegionLoad> load = regionLoads[region];
      // This means regions have no actual data on disk
      if (load == null) {
        return 0;
      }
      return regionLoads[region].getLast().getStorefileSizeMB();
    }

    /**
     * Computes and caches the locality for each region/rack combinations,
     * as well as storing a mapping of region -> server and region -> rack such that server
     * and rack have the highest locality for region
     */
    private void computeCachedLocalities() {
      rackLocalities = new float[numRegions][numRacks];
      regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];

      // Compute localities and find most local server per region
      for (int region = 0; region < numRegions; region++) {
        int serverWithBestLocality = 0;
        float bestLocalityForRegion = 0;
        for (int server = 0; server < numServers; server++) {
          // Aggregate per-rack locality: each server contributes an equal share
          // of its rack's average.
          float locality = getLocalityOfRegion(region, server);
          int rack = serverIndexToRackIndex[server];
          int numServersInRack = serversPerRack[rack].length;
          rackLocalities[region][rack] += locality / numServersInRack;

          if (locality > bestLocalityForRegion) {
            serverWithBestLocality = server;
            bestLocalityForRegion = locality;
          }
        }
        regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;

        // Find most local rack per region
        int rackWithBestLocality = 0;
        float bestRackLocalityForRegion = 0.0f;
        for (int rack = 0; rack < numRacks; rack++) {
          float rackLocality = rackLocalities[region][rack];
          if (rackLocality > bestRackLocalityForRegion) {
            bestRackLocalityForRegion = rackLocality;
            rackWithBestLocality = rack;
          }
        }
        regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
      }

    }

    /**
     * Maps region index to rack index
     */
    public int getRackForRegion(int region) {
      return serverIndexToRackIndex[regionIndexToServerIndex[region]];
    }

    // The two granularities at which locality is tracked.
    enum LocalityType {
      SERVER,
      RACK
    }

    /** An action to move or swap a region */
    public static class Action {
      public enum Type {
        ASSIGN_REGION,
        MOVE_REGION,
        SWAP_REGIONS,
        NULL,
      }

      public Type type;
      public Action (Type type) {this.type = type;}
      /** Returns an Action which would undo this action */
      public Action undoAction() { return this; }
      @Override
      public String toString() { return type + ":";}
    }

    /** Assigns a currently-unassigned region to a server. */
    public static class AssignRegionAction extends Action {
      public int region;
      public int server;
      public AssignRegionAction(int region, int server) {
        super(Type.ASSIGN_REGION);
        this.region = region;
        this.server = server;
      }
      @Override
      public Action undoAction() {
        // TODO implement this. This action is not being used by the StochasticLB for now
        // in case it uses it, we should implement this function.
646 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 647 } 648 @Override 649 public String toString() { 650 return type + ": " + region + ":" + server; 651 } 652 } 653 654 public static class MoveRegionAction extends Action { 655 public int region; 656 public int fromServer; 657 public int toServer; 658 659 public MoveRegionAction(int region, int fromServer, int toServer) { 660 super(Type.MOVE_REGION); 661 this.fromServer = fromServer; 662 this.region = region; 663 this.toServer = toServer; 664 } 665 @Override 666 public Action undoAction() { 667 return new MoveRegionAction (region, toServer, fromServer); 668 } 669 @Override 670 public String toString() { 671 return type + ": " + region + ":" + fromServer + " -> " + toServer; 672 } 673 } 674 675 public static class SwapRegionsAction extends Action { 676 public int fromServer; 677 public int fromRegion; 678 public int toServer; 679 public int toRegion; 680 public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { 681 super(Type.SWAP_REGIONS); 682 this.fromServer = fromServer; 683 this.fromRegion = fromRegion; 684 this.toServer = toServer; 685 this.toRegion = toRegion; 686 } 687 @Override 688 public Action undoAction() { 689 return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); 690 } 691 @Override 692 public String toString() { 693 return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; 694 } 695 } 696 697 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION", 698 justification="Mistake. 
Too disruptive to change now") 699 public static final Action NullAction = new Action(Type.NULL); 700 701 public void doAction(Action action) { 702 switch (action.type) { 703 case NULL: break; 704 case ASSIGN_REGION: 705 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 706 assert action instanceof AssignRegionAction: action.getClass(); 707 AssignRegionAction ar = (AssignRegionAction) action; 708 regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); 709 regionMoved(ar.region, -1, ar.server); 710 break; 711 case MOVE_REGION: 712 assert action instanceof MoveRegionAction: action.getClass(); 713 MoveRegionAction mra = (MoveRegionAction) action; 714 regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); 715 regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); 716 regionMoved(mra.region, mra.fromServer, mra.toServer); 717 break; 718 case SWAP_REGIONS: 719 assert action instanceof SwapRegionsAction: action.getClass(); 720 SwapRegionsAction a = (SwapRegionsAction) action; 721 regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); 722 regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); 723 regionMoved(a.fromRegion, a.fromServer, a.toServer); 724 regionMoved(a.toRegion, a.toServer, a.fromServer); 725 break; 726 default: 727 throw new RuntimeException("Uknown action:" + action.type); 728 } 729 } 730 731 /** 732 * Return true if the placement of region on server would lower the availability 733 * of the region in question 734 * @return true or false 735 */ 736 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 737 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 738 return false; // safeguard against race between cluster.servers and servers from LB method args 739 } 740 int server = 
serversToIndex.get(serverName.getHostAndPort()); 741 int region = regionsToIndex.get(regionInfo); 742 743 int primary = regionIndexToPrimaryIndex[region]; 744 745 // there is a subset relation for server < host < rack 746 // check server first 747 748 if (contains(primariesOfRegionsPerServer[server], primary)) { 749 // check for whether there are other servers that we can place this region 750 for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { 751 if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { 752 return true; // meaning there is a better server 753 } 754 } 755 return false; // there is not a better server to place this 756 } 757 758 // check host 759 if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host 760 int host = serverIndexToHostIndex[server]; 761 if (contains(primariesOfRegionsPerHost[host], primary)) { 762 // check for whether there are other hosts that we can place this region 763 for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { 764 if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { 765 return true; // meaning there is a better host 766 } 767 } 768 return false; // there is not a better host to place this 769 } 770 } 771 772 // check rack 773 if (numRacks > 1) { 774 int rack = serverIndexToRackIndex[server]; 775 if (contains(primariesOfRegionsPerRack[rack], primary)) { 776 // check for whether there are other racks that we can place this region 777 for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { 778 if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { 779 return true; // meaning there is a better rack 780 } 781 } 782 return false; // there is not a better rack to place this 783 } 784 } 785 return false; 786 } 787 788 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 789 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 790 return; 791 } 792 int server = 
serversToIndex.get(serverName.getHostAndPort()); 793 int region = regionsToIndex.get(regionInfo); 794 doAction(new AssignRegionAction(region, server)); 795 } 796 797 void regionMoved(int region, int oldServer, int newServer) { 798 regionIndexToServerIndex[region] = newServer; 799 if (initialRegionIndexToServerIndex[region] == newServer) { 800 numMovedRegions--; //region moved back to original location 801 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 802 numMovedRegions++; //region moved from original location 803 } 804 int tableIndex = regionIndexToTableIndex[region]; 805 if (oldServer >= 0) { 806 numRegionsPerServerPerTable[oldServer][tableIndex]--; 807 } 808 numRegionsPerServerPerTable[newServer][tableIndex]++; 809 810 //check whether this caused maxRegionsPerTable in the new Server to be updated 811 if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { 812 numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; 813 } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) 814 == numMaxRegionsPerTable[tableIndex]) { 815 //recompute maxRegionsPerTable since the previous value was coming from the old server 816 numMaxRegionsPerTable[tableIndex] = 0; 817 for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { 818 if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { 819 numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; 820 } 821 } 822 } 823 824 // update for servers 825 int primary = regionIndexToPrimaryIndex[region]; 826 if (oldServer >= 0) { 827 primariesOfRegionsPerServer[oldServer] = removeRegion( 828 primariesOfRegionsPerServer[oldServer], primary); 829 } 830 primariesOfRegionsPerServer[newServer] = addRegionSorted( 831 primariesOfRegionsPerServer[newServer], primary); 832 833 // update for hosts 834 if (multiServersPerHost) { 835 int oldHost = 
oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; 836 int newHost = serverIndexToHostIndex[newServer]; 837 if (newHost != oldHost) { 838 regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); 839 primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary); 840 if (oldHost >= 0) { 841 regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); 842 primariesOfRegionsPerHost[oldHost] = removeRegion( 843 primariesOfRegionsPerHost[oldHost], primary); // will still be sorted 844 } 845 } 846 } 847 848 // update for racks 849 if (numRacks > 1) { 850 int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1; 851 int newRack = serverIndexToRackIndex[newServer]; 852 if (newRack != oldRack) { 853 regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); 854 primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary); 855 if (oldRack >= 0) { 856 regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); 857 primariesOfRegionsPerRack[oldRack] = removeRegion( 858 primariesOfRegionsPerRack[oldRack], primary); // will still be sorted 859 } 860 } 861 } 862 } 863 864 int[] removeRegion(int[] regions, int regionIndex) { 865 //TODO: this maybe costly. 
Consider using linked lists 866 int[] newRegions = new int[regions.length - 1]; 867 int i = 0; 868 for (i = 0; i < regions.length; i++) { 869 if (regions[i] == regionIndex) { 870 break; 871 } 872 newRegions[i] = regions[i]; 873 } 874 System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i); 875 return newRegions; 876 } 877 878 int[] addRegion(int[] regions, int regionIndex) { 879 int[] newRegions = new int[regions.length + 1]; 880 System.arraycopy(regions, 0, newRegions, 0, regions.length); 881 newRegions[newRegions.length - 1] = regionIndex; 882 return newRegions; 883 } 884 885 int[] addRegionSorted(int[] regions, int regionIndex) { 886 int[] newRegions = new int[regions.length + 1]; 887 int i = 0; 888 for (i = 0; i < regions.length; i++) { // find the index to insert 889 if (regions[i] > regionIndex) { 890 break; 891 } 892 } 893 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 894 System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half 895 newRegions[i] = regionIndex; 896 897 return newRegions; 898 } 899 900 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 901 int i = 0; 902 for (i = 0; i < regions.length; i++) { 903 if (regions[i] == regionIndex) { 904 regions[i] = newRegionIndex; 905 break; 906 } 907 } 908 return regions; 909 } 910 911 void sortServersByRegionCount() { 912 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 913 } 914 915 int getNumRegions(int server) { 916 return regionsPerServer[server].length; 917 } 918 919 boolean contains(int[] arr, int val) { 920 return Arrays.binarySearch(arr, val) >= 0; 921 } 922 923 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 924 925 int getLowestLocalityRegionOnServer(int serverIndex) { 926 if (regionFinder != null) { 927 float lowestLocality = 1.0f; 928 int lowestLocalityRegionIndex = -1; 929 if (regionsPerServer[serverIndex].length == 0) { 930 // No regions on 
that region server 931 return -1; 932 } 933 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 934 int regionIndex = regionsPerServer[serverIndex][j]; 935 HDFSBlocksDistribution distribution = regionFinder 936 .getBlockDistribution(regions[regionIndex]); 937 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 938 // skip empty region 939 if (distribution.getUniqueBlocksTotalWeight() == 0) { 940 continue; 941 } 942 if (locality < lowestLocality) { 943 lowestLocality = locality; 944 lowestLocalityRegionIndex = j; 945 } 946 } 947 if (lowestLocalityRegionIndex == -1) { 948 return -1; 949 } 950 if (LOG.isTraceEnabled()) { 951 LOG.trace("Lowest locality region is " 952 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 953 .getRegionNameAsString() + " with locality " + lowestLocality 954 + " and its region server contains " + regionsPerServer[serverIndex].length 955 + " regions"); 956 } 957 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 958 } else { 959 return -1; 960 } 961 } 962 963 float getLocalityOfRegion(int region, int server) { 964 if (regionFinder != null) { 965 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 966 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 967 } else { 968 return 0f; 969 } 970 } 971 972 @VisibleForTesting 973 protected void setNumRegions(int numRegions) { 974 this.numRegions = numRegions; 975 } 976 977 @VisibleForTesting 978 protected void setNumMovedRegions(int numMovedRegions) { 979 this.numMovedRegions = numMovedRegions; 980 } 981 982 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION", 983 justification="Not important but should be fixed") 984 @Override 985 public String toString() { 986 StringBuilder desc = new StringBuilder("Cluster={servers=["); 987 for(ServerName sn:servers) { 988 desc.append(sn.getHostAndPort()).append(", "); 989 } 990 
desc.append("], serverIndicesSortedByRegionCount=") 991 .append(Arrays.toString(serverIndicesSortedByRegionCount)) 992 .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer)); 993 994 desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable)) 995 .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 996 .append(", numTables=").append(numTables).append(", numMovedRegions=") 997 .append(numMovedRegions).append('}'); 998 return desc.toString(); 999 } 1000 } 1001 1002 // slop for regions 1003 protected float slop; 1004 // overallSlop to control simpleLoadBalancer's cluster level threshold 1005 protected float overallSlop; 1006 protected Configuration config = HBaseConfiguration.create(); 1007 protected RackManager rackManager; 1008 private static final Random RANDOM = new Random(System.currentTimeMillis()); 1009 private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class); 1010 protected MetricsBalancer metricsBalancer = null; 1011 protected ClusterMetrics clusterStatus = null; 1012 protected ServerName masterServerName; 1013 protected MasterServices services; 1014 protected boolean onlySystemTablesOnMaster; 1015 protected boolean maintenanceMode; 1016 1017 @Override 1018 public void setConf(Configuration conf) { 1019 this.config = conf; 1020 setSlop(conf); 1021 if (slop < 0) slop = 0; 1022 else if (slop > 1) slop = 1; 1023 1024 if (overallSlop < 0) overallSlop = 0; 1025 else if (overallSlop > 1) overallSlop = 1; 1026 1027 this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config); 1028 1029 this.rackManager = new RackManager(getConf()); 1030 if (useRegionFinder) { 1031 regionFinder.setConf(conf); 1032 } 1033 // Print out base configs. Don't print overallSlop since it for simple balancer exclusively. 
1034 LOG.info("slop={}, systemTablesOnMaster={}", 1035 this.slop, this.onlySystemTablesOnMaster); 1036 } 1037 1038 protected void setSlop(Configuration conf) { 1039 this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2); 1040 this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop); 1041 } 1042 1043 /** 1044 * Check if a region belongs to some system table. 1045 * If so, the primary replica may be expected to be put on the master regionserver. 1046 */ 1047 public boolean shouldBeOnMaster(RegionInfo region) { 1048 return (this.maintenanceMode || this.onlySystemTablesOnMaster) 1049 && region.getTable().isSystemTable(); 1050 } 1051 1052 /** 1053 * Balance the regions that should be on master regionserver. 1054 */ 1055 protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) { 1056 if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null; 1057 List<RegionPlan> plans = null; 1058 List<RegionInfo> regions = clusterMap.get(masterServerName); 1059 if (regions != null) { 1060 Iterator<ServerName> keyIt = null; 1061 for (RegionInfo region: regions) { 1062 if (shouldBeOnMaster(region)) continue; 1063 1064 // Find a non-master regionserver to host the region 1065 if (keyIt == null || !keyIt.hasNext()) { 1066 keyIt = clusterMap.keySet().iterator(); 1067 } 1068 ServerName dest = keyIt.next(); 1069 if (masterServerName.equals(dest)) { 1070 if (!keyIt.hasNext()) { 1071 keyIt = clusterMap.keySet().iterator(); 1072 } 1073 dest = keyIt.next(); 1074 } 1075 1076 // Move this region away from the master regionserver 1077 RegionPlan plan = new RegionPlan(region, masterServerName, dest); 1078 if (plans == null) { 1079 plans = new ArrayList<>(); 1080 } 1081 plans.add(plan); 1082 } 1083 } 1084 for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) { 1085 if (masterServerName.equals(server.getKey())) continue; 1086 for (RegionInfo region: server.getValue()) { 1087 if 
(!shouldBeOnMaster(region)) continue; 1088 1089 // Move this region to the master regionserver 1090 RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName); 1091 if (plans == null) { 1092 plans = new ArrayList<>(); 1093 } 1094 plans.add(plan); 1095 } 1096 } 1097 return plans; 1098 } 1099 1100 /** 1101 * If master is configured to carry system tables only, in here is 1102 * where we figure what to assign it. 1103 */ 1104 protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions( 1105 Collection<RegionInfo> regions, List<ServerName> servers) { 1106 if (servers == null || regions == null || regions.isEmpty()) { 1107 return null; 1108 } 1109 Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); 1110 if (this.maintenanceMode || this.onlySystemTablesOnMaster) { 1111 if (masterServerName != null && servers.contains(masterServerName)) { 1112 assignments.put(masterServerName, new ArrayList<>()); 1113 for (RegionInfo region : regions) { 1114 if (shouldBeOnMaster(region)) { 1115 assignments.get(masterServerName).add(region); 1116 } 1117 } 1118 } 1119 } 1120 return assignments; 1121 } 1122 1123 @Override 1124 public Configuration getConf() { 1125 return this.config; 1126 } 1127 1128 @Override 1129 public synchronized void setClusterMetrics(ClusterMetrics st) { 1130 this.clusterStatus = st; 1131 if (useRegionFinder) { 1132 regionFinder.setClusterMetrics(st); 1133 } 1134 } 1135 1136 @Override 1137 public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){ 1138 1139 } 1140 1141 @Override 1142 public void setMasterServices(MasterServices masterServices) { 1143 masterServerName = masterServices.getServerName(); 1144 this.services = masterServices; 1145 if (useRegionFinder) { 1146 this.regionFinder.setServices(masterServices); 1147 } 1148 if (this.services.isInMaintenanceMode()) { 1149 this.maintenanceMode = true; 1150 } 1151 } 1152 1153 @Override 1154 public void postMasterStartupInitialize() { 1155 if 
(services != null && regionFinder != null) { 1156 try { 1157 Set<RegionInfo> regions = 1158 services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet(); 1159 regionFinder.refreshAndWait(regions); 1160 } catch (Exception e) { 1161 LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e); 1162 } 1163 } 1164 } 1165 1166 public void setRackManager(RackManager rackManager) { 1167 this.rackManager = rackManager; 1168 } 1169 1170 protected boolean needsBalance(Cluster c) { 1171 ClusterLoadState cs = new ClusterLoadState(c.clusterState); 1172 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 1173 if (LOG.isDebugEnabled()) { 1174 LOG.debug("Not running balancer because only " + cs.getNumServers() 1175 + " active regionserver(s)"); 1176 } 1177 return false; 1178 } 1179 if(areSomeRegionReplicasColocated(c)) return true; 1180 // Check if we even need to do any load balancing 1181 // HBASE-3681 check sloppiness first 1182 float average = cs.getLoadAverage(); // for logging 1183 int floor = (int) Math.floor(average * (1 - slop)); 1184 int ceiling = (int) Math.ceil(average * (1 + slop)); 1185 if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) { 1186 NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad(); 1187 if (LOG.isTraceEnabled()) { 1188 // If nothing to balance, then don't say anything unless trace-level logging. 
1189 LOG.trace("Skipping load balancing because balanced cluster; " + 1190 "servers=" + cs.getNumServers() + 1191 " regions=" + cs.getNumRegions() + " average=" + average + 1192 " mostloaded=" + serversByLoad.lastKey().getLoad() + 1193 " leastloaded=" + serversByLoad.firstKey().getLoad()); 1194 } 1195 return false; 1196 } 1197 return true; 1198 } 1199 1200 /** 1201 * Subclasses should implement this to return true if the cluster has nodes that hosts 1202 * multiple replicas for the same region, or, if there are multiple racks and the same 1203 * rack hosts replicas of the same region 1204 * @param c Cluster information 1205 * @return whether region replicas are currently co-located 1206 */ 1207 protected boolean areSomeRegionReplicasColocated(Cluster c) { 1208 return false; 1209 } 1210 1211 /** 1212 * Generates a bulk assignment plan to be used on cluster startup using a 1213 * simple round-robin assignment. 1214 * <p> 1215 * Takes a list of all the regions and all the servers in the cluster and 1216 * returns a map of each server to the regions that it should be assigned. 1217 * <p> 1218 * Currently implemented as a round-robin assignment. Same invariant as load 1219 * balancing, all servers holding floor(avg) or ceiling(avg). 1220 * 1221 * TODO: Use block locations from HDFS to place regions with their blocks 1222 * 1223 * @param regions all regions 1224 * @param servers all servers 1225 * @return map of server to the regions it should take, or null if no 1226 * assignment is possible (ie. 
no regions or no servers) 1227 */ 1228 @Override 1229 public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, 1230 List<ServerName> servers) throws HBaseIOException { 1231 metricsBalancer.incrMiscInvocations(); 1232 Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers); 1233 if (assignments != null && !assignments.isEmpty()) { 1234 servers = new ArrayList<>(servers); 1235 // Guarantee not to put other regions on master 1236 servers.remove(masterServerName); 1237 List<RegionInfo> masterRegions = assignments.get(masterServerName); 1238 if (!masterRegions.isEmpty()) { 1239 regions = new ArrayList<>(regions); 1240 regions.removeAll(masterRegions); 1241 } 1242 } 1243 if (this.maintenanceMode || regions == null || regions.isEmpty()) { 1244 return assignments; 1245 } 1246 1247 int numServers = servers == null ? 0 : servers.size(); 1248 if (numServers == 0) { 1249 LOG.warn("Wanted to do round robin assignment but no servers to assign to"); 1250 return null; 1251 } 1252 1253 // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the 1254 // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate 1255 // generator for AssignRegionAction. The LB will ensure the regions are mostly local 1256 // and balanced. This should also run fast with fewer number of iterations. 
1257 1258 if (numServers == 1) { // Only one server, nothing fancy we can do here 1259 ServerName server = servers.get(0); 1260 assignments.put(server, new ArrayList<>(regions)); 1261 return assignments; 1262 } 1263 1264 Cluster cluster = createCluster(servers, regions); 1265 List<RegionInfo> unassignedRegions = new ArrayList<>(); 1266 1267 roundRobinAssignment(cluster, regions, unassignedRegions, 1268 servers, assignments); 1269 1270 List<RegionInfo> lastFewRegions = new ArrayList<>(); 1271 // assign the remaining by going through the list and try to assign to servers one-by-one 1272 int serverIdx = RANDOM.nextInt(numServers); 1273 for (RegionInfo region : unassignedRegions) { 1274 boolean assigned = false; 1275 for (int j = 0; j < numServers; j++) { // try all servers one by one 1276 ServerName serverName = servers.get((j + serverIdx) % numServers); 1277 if (!cluster.wouldLowerAvailability(region, serverName)) { 1278 List<RegionInfo> serverRegions = 1279 assignments.computeIfAbsent(serverName, k -> new ArrayList<>()); 1280 serverRegions.add(region); 1281 cluster.doAssignRegion(region, serverName); 1282 serverIdx = (j + serverIdx + 1) % numServers; //remain from next server 1283 assigned = true; 1284 break; 1285 } 1286 } 1287 if (!assigned) { 1288 lastFewRegions.add(region); 1289 } 1290 } 1291 // just sprinkle the rest of the regions on random regionservers. The balanceCluster will 1292 // make it optimal later. we can end up with this if numReplicas > numServers. 
1293 for (RegionInfo region : lastFewRegions) { 1294 int i = RANDOM.nextInt(numServers); 1295 ServerName server = servers.get(i); 1296 List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>()); 1297 serverRegions.add(region); 1298 cluster.doAssignRegion(region, server); 1299 } 1300 return assignments; 1301 } 1302 1303 protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions) { 1304 // Get the snapshot of the current assignments for the regions in question, and then create 1305 // a cluster out of it. Note that we might have replicas already assigned to some servers 1306 // earlier. So we want to get the snapshot to see those assignments, but this will only contain 1307 // replicas of the regions that are passed (for performance). 1308 Map<ServerName, List<RegionInfo>> clusterState = getRegionAssignmentsByServer(regions); 1309 1310 for (ServerName server : servers) { 1311 if (!clusterState.containsKey(server)) { 1312 clusterState.put(server, EMPTY_REGION_LIST); 1313 } 1314 } 1315 return new Cluster(regions, clusterState, null, this.regionFinder, 1316 rackManager); 1317 } 1318 1319 private List<ServerName> findIdleServers(List<ServerName> servers) { 1320 return this.services.getServerManager() 1321 .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR); 1322 } 1323 1324 /** 1325 * Used to assign a single region to a random server. 
1326 */ 1327 @Override 1328 public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers) 1329 throws HBaseIOException { 1330 metricsBalancer.incrMiscInvocations(); 1331 if (servers != null && servers.contains(masterServerName)) { 1332 if (shouldBeOnMaster(regionInfo)) { 1333 return masterServerName; 1334 } 1335 if (!LoadBalancer.isTablesOnMaster(getConf())) { 1336 // Guarantee we do not put any regions on master 1337 servers = new ArrayList<>(servers); 1338 servers.remove(masterServerName); 1339 } 1340 } 1341 1342 int numServers = servers == null ? 0 : servers.size(); 1343 if (numServers == 0) { 1344 LOG.warn("Wanted to retain assignment but no servers to assign to"); 1345 return null; 1346 } 1347 if (numServers == 1) { // Only one server, nothing fancy we can do here 1348 return servers.get(0); 1349 } 1350 List<ServerName> idleServers = findIdleServers(servers); 1351 if (idleServers.size() == 1) { 1352 return idleServers.get(0); 1353 } 1354 final List<ServerName> finalServers = idleServers.isEmpty() ? 1355 servers : idleServers; 1356 List<RegionInfo> regions = Lists.newArrayList(regionInfo); 1357 Cluster cluster = createCluster(finalServers, regions); 1358 return randomAssignment(cluster, regionInfo, finalServers); 1359 } 1360 1361 /** 1362 * Generates a bulk assignment startup plan, attempting to reuse the existing 1363 * assignment information from META, but adjusting for the specified list of 1364 * available/online servers available for assignment. 1365 * <p> 1366 * Takes a map of all regions to their existing assignment from META. Also 1367 * takes a list of online servers for regions to be assigned to. Attempts to 1368 * retain all assignment, so in some instances initial assignment will not be 1369 * completely balanced. 1370 * <p> 1371 * Any leftover regions without an existing server to be assigned to will be 1372 * assigned randomly to available servers. 
1373 * 1374 * @param regions regions and existing assignment from meta 1375 * @param servers available servers 1376 * @return map of servers and regions to be assigned to them 1377 */ 1378 @Override 1379 public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, 1380 List<ServerName> servers) throws HBaseIOException { 1381 // Update metrics 1382 metricsBalancer.incrMiscInvocations(); 1383 Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers); 1384 if (assignments != null && !assignments.isEmpty()) { 1385 servers = new ArrayList<>(servers); 1386 // Guarantee not to put other regions on master 1387 servers.remove(masterServerName); 1388 List<RegionInfo> masterRegions = assignments.get(masterServerName); 1389 regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey())) 1390 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); 1391 } 1392 if (this.maintenanceMode || regions.isEmpty()) { 1393 return assignments; 1394 } 1395 1396 int numServers = servers == null ? 0 : servers.size(); 1397 if (numServers == 0) { 1398 LOG.warn("Wanted to do retain assignment but no servers to assign to"); 1399 return null; 1400 } 1401 if (numServers == 1) { // Only one server, nothing fancy we can do here 1402 ServerName server = servers.get(0); 1403 assignments.put(server, new ArrayList<>(regions.keySet())); 1404 return assignments; 1405 } 1406 1407 // Group all of the old assignments by their hostname. 1408 // We can't group directly by ServerName since the servers all have 1409 // new start-codes. 1410 1411 // Group the servers by their hostname. It's possible we have multiple 1412 // servers on the same host on different ports. 
1413 ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create(); 1414 for (ServerName server : servers) { 1415 assignments.put(server, new ArrayList<>()); 1416 serversByHostname.put(server.getHostnameLowerCase(), server); 1417 } 1418 1419 // Collection of the hostnames that used to have regions 1420 // assigned, but for which we no longer have any RS running 1421 // after the cluster restart. 1422 Set<String> oldHostsNoLongerPresent = Sets.newTreeSet(); 1423 1424 int numRandomAssignments = 0; 1425 int numRetainedAssigments = 0; 1426 1427 Cluster cluster = createCluster(servers, regions.keySet()); 1428 1429 for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) { 1430 RegionInfo region = entry.getKey(); 1431 ServerName oldServerName = entry.getValue(); 1432 List<ServerName> localServers = new ArrayList<>(); 1433 if (oldServerName != null) { 1434 localServers = serversByHostname.get(oldServerName.getHostnameLowerCase()); 1435 } 1436 if (localServers.isEmpty()) { 1437 // No servers on the new cluster match up with this hostname, 1438 // assign randomly. 
1439 ServerName randomServer = randomAssignment(cluster, region, servers); 1440 assignments.get(randomServer).add(region); 1441 numRandomAssignments++; 1442 if (oldServerName != null) { 1443 oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase()); 1444 } 1445 } else if (localServers.size() == 1) { 1446 // the usual case - one new server on same host 1447 ServerName target = localServers.get(0); 1448 assignments.get(target).add(region); 1449 cluster.doAssignRegion(region, target); 1450 numRetainedAssigments++; 1451 } else { 1452 // multiple new servers in the cluster on this same host 1453 if (localServers.contains(oldServerName)) { 1454 assignments.get(oldServerName).add(region); 1455 cluster.doAssignRegion(region, oldServerName); 1456 } else { 1457 ServerName target = null; 1458 for (ServerName tmp: localServers) { 1459 if (tmp.getPort() == oldServerName.getPort()) { 1460 target = tmp; 1461 break; 1462 } 1463 } 1464 if (target == null) { 1465 target = randomAssignment(cluster, region, localServers); 1466 } 1467 assignments.get(target).add(region); 1468 } 1469 numRetainedAssigments++; 1470 } 1471 } 1472 1473 String randomAssignMsg = ""; 1474 if (numRandomAssignments > 0) { 1475 randomAssignMsg = 1476 numRandomAssignments + " regions were assigned " 1477 + "to random hosts, since the old hosts for these regions are no " 1478 + "longer present in the cluster. These hosts were:\n " 1479 + Joiner.on("\n ").join(oldHostsNoLongerPresent); 1480 } 1481 1482 LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments 1483 + " retained the pre-restart assignment. 
" + randomAssignMsg); 1484 return assignments; 1485 } 1486 1487 @Override 1488 public void initialize() throws HBaseIOException{ 1489 } 1490 1491 @Override 1492 public void regionOnline(RegionInfo regionInfo, ServerName sn) { 1493 } 1494 1495 @Override 1496 public void regionOffline(RegionInfo regionInfo) { 1497 } 1498 1499 @Override 1500 public boolean isStopped() { 1501 return stopped; 1502 } 1503 1504 @Override 1505 public void stop(String why) { 1506 LOG.info("Load Balancer stop requested: "+why); 1507 stopped = true; 1508 } 1509 1510 /** 1511 * Used to assign a single region to a random server. 1512 */ 1513 private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo, 1514 List<ServerName> servers) { 1515 int numServers = servers.size(); // servers is not null, numServers > 1 1516 ServerName sn = null; 1517 final int maxIterations = numServers * 4; 1518 int iterations = 0; 1519 1520 do { 1521 int i = RANDOM.nextInt(numServers); 1522 sn = servers.get(i); 1523 } while (cluster.wouldLowerAvailability(regionInfo, sn) 1524 && iterations++ < maxIterations); 1525 cluster.doAssignRegion(regionInfo, sn); 1526 return sn; 1527 } 1528 1529 /** 1530 * Round robin a list of regions to a list of servers 1531 */ 1532 private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions, 1533 List<RegionInfo> unassignedRegions, List<ServerName> servers, 1534 Map<ServerName, List<RegionInfo>> assignments) { 1535 1536 int numServers = servers.size(); 1537 int numRegions = regions.size(); 1538 int max = (int) Math.ceil((float) numRegions / numServers); 1539 int serverIdx = 0; 1540 if (numServers > 1) { 1541 serverIdx = RANDOM.nextInt(numServers); 1542 } 1543 int regionIdx = 0; 1544 1545 for (int j = 0; j < numServers; j++) { 1546 ServerName server = servers.get((j + serverIdx) % numServers); 1547 List<RegionInfo> serverRegions = new ArrayList<>(max); 1548 for (int i = regionIdx; i < numRegions; i += numServers) { 1549 RegionInfo region = regions.get(i % 
numRegions); 1550 if (cluster.wouldLowerAvailability(region, server)) { 1551 unassignedRegions.add(region); 1552 } else { 1553 serverRegions.add(region); 1554 cluster.doAssignRegion(region, server); 1555 } 1556 } 1557 assignments.put(server, serverRegions); 1558 regionIdx++; 1559 } 1560 } 1561 1562 protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer( 1563 Collection<RegionInfo> regions) { 1564 if (this.services != null && this.services.getAssignmentManager() != null) { 1565 return this.services.getAssignmentManager().getSnapShotOfAssignment(regions); 1566 } else { 1567 return new HashMap<>(); 1568 } 1569 } 1570 1571 @Override 1572 public void onConfigurationChange(Configuration conf) { 1573 } 1574}