001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.balancer; 020 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Map.Entry; 031import java.util.NavigableMap; 032import java.util.Random; 033import java.util.Set; 034import java.util.TreeMap; 035import java.util.function.Predicate; 036import java.util.stream.Collectors; 037 038import org.apache.commons.lang3.NotImplementedException; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.ClusterMetrics; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseIOException; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HDFSBlocksDistribution; 045import org.apache.hadoop.hbase.ServerMetrics; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import 
org.apache.hadoop.hbase.client.RegionReplicaUtil; 050import org.apache.hadoop.hbase.master.LoadBalancer; 051import org.apache.hadoop.hbase.master.MasterServices; 052import org.apache.hadoop.hbase.master.RackManager; 053import org.apache.hadoop.hbase.master.RegionPlan; 054import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 055import org.apache.hadoop.hbase.master.assignment.RegionStates; 056import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 057import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 058import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 059import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 060import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 061import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * The base class for load balancers. It provides the the functions used to by 068 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions 069 * in the edge cases. It doesn't provide an implementation of the 070 * actual balancing algorithm. 
 *
 */
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
  // Balancing is skipped for clusters with fewer active servers than this.
  protected static final int MIN_SERVER_BALANCE = 2;
  private volatile boolean stopped = false;

  // Shared immutable empty list used when a caller passes null unassigned regions.
  private static final List<RegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);

  // A server is "idle" when it carries no regions at all.
  static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
    = load -> load.getRegionMetrics().isEmpty();

  // Supplies HDFS-block locality of regions; only populated when
  // "hbase.master.balancer.uselocality" is true (see createRegionFinder()).
  protected RegionLocationFinder regionFinder;
  protected boolean useRegionFinder;

  /** Fallback RackManager that places every server in the single unknown rack. */
  private static class DefaultRackManager extends RackManager {
    @Override
    public String getRack(ServerName server) {
      return UNKNOWN_RACK;
    }
  }

  /**
   * The constructor that uses the basic MetricsBalancer
   */
  protected BaseLoadBalancer() {
    // NOTE(review): metricsBalancer is a field declared later in this file (not
    // visible in this chunk).
    metricsBalancer = new MetricsBalancer();
    createRegionFinder();
  }

  /**
   * This Constructor accepts an instance of MetricsBalancer,
   * which will be used instead of creating a new one
   */
  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
    createRegionFinder();
  }

  // Enables the locality-aware RegionLocationFinder unless explicitly disabled.
  // NOTE(review): reads the 'config' field declared elsewhere in this file —
  // assumes it is initialized before any constructor runs; confirm in full file.
  private void createRegionFinder() {
    useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
    if (useRegionFinder) {
      regionFinder = new RegionLocationFinder();
    }
  }

  /**
   * An efficient array based implementation similar to ClusterState for keeping
   * the status of the cluster in terms of region assignment and distribution.
   * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of
   * hundreds of thousands of hashmap manipulations are very costly, which is why this
   * class uses mostly indexes and arrays.
   *
   * Cluster tracks a list of unassigned regions, region assignments, and the server
   * topology in terms of server names, hostnames and racks.
126 */ 127 protected static class Cluster { 128 ServerName[] servers; 129 String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host 130 String[] racks; 131 boolean multiServersPerHost = false; // whether or not any host has more than one server 132 133 ArrayList<String> tables; 134 RegionInfo[] regions; 135 Deque<BalancerRegionLoad>[] regionLoads; 136 private RegionLocationFinder regionFinder; 137 138 int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality 139 140 int[] serverIndexToHostIndex; //serverIndex -> host index 141 int[] serverIndexToRackIndex; //serverIndex -> rack index 142 143 int[][] regionsPerServer; //serverIndex -> region list 144 int[] serverIndexToRegionsOffset; //serverIndex -> offset of region list 145 int[][] regionsPerHost; //hostIndex -> list of regions 146 int[][] regionsPerRack; //rackIndex -> region list 147 int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index 148 int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index 149 int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index 150 151 int[][] serversPerHost; //hostIndex -> list of server indexes 152 int[][] serversPerRack; //rackIndex -> list of server indexes 153 int[] regionIndexToServerIndex; //regionIndex -> serverIndex 154 int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) 155 int[] regionIndexToTableIndex; //regionIndex -> tableIndex 156 int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions 157 int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS 158 int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary 159 boolean hasRegionReplicas = false; //whether there is regions with replicas 160 161 Integer[] serverIndicesSortedByRegionCount; 162 Integer[] 
serverIndicesSortedByLocality; 163 164 Map<String, Integer> serversToIndex; 165 Map<String, Integer> hostsToIndex; 166 Map<String, Integer> racksToIndex; 167 Map<String, Integer> tablesToIndex; 168 Map<RegionInfo, Integer> regionsToIndex; 169 float[] localityPerServer; 170 171 int numServers; 172 int numHosts; 173 int numRacks; 174 int numTables; 175 int numRegions; 176 177 int numMovedRegions = 0; //num moved regions from the initial configuration 178 Map<ServerName, List<RegionInfo>> clusterState; 179 180 protected final RackManager rackManager; 181 // Maps region -> rackIndex -> locality of region on rack 182 private float[][] rackLocalities; 183 // Maps localityType -> region -> [server|rack]Index with highest locality 184 private int[][] regionsToMostLocalEntities; 185 186 protected Cluster( 187 Map<ServerName, List<RegionInfo>> clusterState, 188 Map<String, Deque<BalancerRegionLoad>> loads, 189 RegionLocationFinder regionFinder, 190 RackManager rackManager) { 191 this(null, clusterState, loads, regionFinder, rackManager); 192 } 193 194 @SuppressWarnings("unchecked") 195 protected Cluster( 196 Collection<RegionInfo> unassignedRegions, 197 Map<ServerName, List<RegionInfo>> clusterState, 198 Map<String, Deque<BalancerRegionLoad>> loads, 199 RegionLocationFinder regionFinder, 200 RackManager rackManager) { 201 202 if (unassignedRegions == null) { 203 unassignedRegions = EMPTY_REGION_LIST; 204 } 205 206 serversToIndex = new HashMap<>(); 207 hostsToIndex = new HashMap<>(); 208 racksToIndex = new HashMap<>(); 209 tablesToIndex = new HashMap<>(); 210 211 //TODO: We should get the list of tables from master 212 tables = new ArrayList<>(); 213 this.rackManager = rackManager != null ? 
rackManager : new DefaultRackManager(); 214 215 numRegions = 0; 216 217 List<List<Integer>> serversPerHostList = new ArrayList<>(); 218 List<List<Integer>> serversPerRackList = new ArrayList<>(); 219 this.clusterState = clusterState; 220 this.regionFinder = regionFinder; 221 222 // Use servername and port as there can be dead servers in this list. We want everything with 223 // a matching hostname and port to have the same index. 224 for (ServerName sn : clusterState.keySet()) { 225 if (sn == null) { 226 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + 227 "skipping; unassigned regions?"); 228 if (LOG.isTraceEnabled()) { 229 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 230 } 231 continue; 232 } 233 if (serversToIndex.get(sn.getAddress().toString()) == null) { 234 serversToIndex.put(sn.getHostAndPort(), numServers++); 235 } 236 if (!hostsToIndex.containsKey(sn.getHostname())) { 237 hostsToIndex.put(sn.getHostname(), numHosts++); 238 serversPerHostList.add(new ArrayList<>(1)); 239 } 240 241 int serverIndex = serversToIndex.get(sn.getHostAndPort()); 242 int hostIndex = hostsToIndex.get(sn.getHostname()); 243 serversPerHostList.get(hostIndex).add(serverIndex); 244 245 String rack = this.rackManager.getRack(sn); 246 if (!racksToIndex.containsKey(rack)) { 247 racksToIndex.put(rack, numRacks++); 248 serversPerRackList.add(new ArrayList<>()); 249 } 250 int rackIndex = racksToIndex.get(rack); 251 serversPerRackList.get(rackIndex).add(serverIndex); 252 } 253 254 // Count how many regions there are. 
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        numRegions += entry.getValue().size();
      }
      numRegions += unassignedRegions.size();

      // Allocate every index array now that server/host/rack/region counts are known.
      regionsToIndex = new HashMap<>(numRegions);
      servers = new ServerName[numServers];
      serversPerHost = new int[numHosts][];
      serversPerRack = new int[numRacks][];
      regions = new RegionInfo[numRegions];
      regionIndexToServerIndex = new int[numRegions];
      initialRegionIndexToServerIndex = new int[numRegions];
      regionIndexToTableIndex = new int[numRegions];
      regionIndexToPrimaryIndex = new int[numRegions];
      regionLoads = new Deque[numRegions];

      regionLocations = new int[numRegions][];
      serverIndicesSortedByRegionCount = new Integer[numServers];
      serverIndicesSortedByLocality = new Integer[numServers];
      localityPerServer = new float[numServers];

      serverIndexToHostIndex = new int[numServers];
      serverIndexToRackIndex = new int[numServers];
      regionsPerServer = new int[numServers][];
      serverIndexToRegionsOffset = new int[numServers];
      regionsPerHost = new int[numHosts][];
      regionsPerRack = new int[numRacks][];
      primariesOfRegionsPerServer = new int[numServers][];
      primariesOfRegionsPerHost = new int[numHosts][];
      primariesOfRegionsPerRack = new int[numRacks][];

      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;

      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        if (entry.getKey() == null) {
          LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
          continue;
        }
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());

        // keep the servername if this is the first server name for this hostname
        // or this servername has the newest startcode.
        if (servers[serverIndex] == null ||
            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
          servers[serverIndex] = entry.getKey();
        }

        if (regionsPerServer[serverIndex] != null) {
          // there is another server with the same hostAndPort in ClusterState.
          // allocate the array for the total size
          regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
        } else {
          regionsPerServer[serverIndex] = new int[entry.getValue().size()];
        }
        primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
        serverIndicesSortedByLocality[serverIndex] = serverIndex;
      }

      // Invert the name->index maps into index->name arrays.
      hosts = new String[numHosts];
      for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
        hosts[entry.getValue()] = entry.getKey();
      }
      racks = new String[numRacks];
      for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
        racks[entry.getValue()] = entry.getKey();
      }

      // Register every assigned region; serverIndexToRegionsOffset carries the fill
      // cursor across multiple entries that map to the same server index.
      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
        int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
        regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];

        int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
        serverIndexToHostIndex[serverIndex] = hostIndex;

        int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
        serverIndexToRackIndex[serverIndex] = rackIndex;

        for (RegionInfo region : entry.getValue()) {
          registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
          regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
          regionIndex++;
        }
        serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
      }

      // Unassigned regions are registered with server index -1.
      for (RegionInfo region : unassignedRegions) {
        registerRegion(region, regionIndex, -1, loads, regionFinder);
        regionIndex++;
      }

      for (int i = 0; i < serversPerHostList.size(); i++) {
        serversPerHost[i] = new int[serversPerHostList.get(i).size()];
        for (int j = 0; j < serversPerHost[i].length; j++) {
          serversPerHost[i][j] = serversPerHostList.get(i).get(j);
        }
        if (serversPerHost[i].length > 1) {
          multiServersPerHost = true;
        }
      }

      for (int i = 0; i < serversPerRackList.size(); i++) {
        serversPerRack[i] = new int[serversPerRackList.get(i).size()];
        for (int j = 0; j < serversPerRack[i].length; j++) {
          serversPerRack[i][j] = serversPerRackList.get(i).get(j);
        }
      }

      numTables = tables.size();
      numRegionsPerServerPerTable = new int[numServers][numTables];

      for (int i = 0; i < numServers; i++) {
        for (int j = 0; j < numTables; j++) {
          numRegionsPerServerPerTable[i][j] = 0;
        }
      }

      for (int i=0; i < regionIndexToServerIndex.length; i++) {
        if (regionIndexToServerIndex[i] >= 0) {
          numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
        }
      }

      // Per table, track the largest region count any single server carries.
      numMaxRegionsPerTable = new int[numTables];
      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
          }
        }
      }

      // Map each region to its primary replica; a primary maps to itself, a
      // secondary maps to its primary's index (-1 if the primary is not in view).
      for (int i = 0; i < regions.length; i ++) {
        RegionInfo info = regions[i];
        if (RegionReplicaUtil.isDefaultReplica(info)) {
          regionIndexToPrimaryIndex[i] = i;
        } else {
          hasRegionReplicas = true;
          RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
          regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
        }
      }

      for (int i = 0; i < regionsPerServer.length; i++) {
        primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
        for (int j = 0; j < regionsPerServer[i].length; j++) {
          int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
          primariesOfRegionsPerServer[i][j] = primaryIndex;
        }
        // sort the regions by primaries.
        Arrays.sort(primariesOfRegionsPerServer[i]);
      }

      // compute regionsPerHost
      if (multiServersPerHost) {
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionsPerHost = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
          }
          regionsPerHost[i] = new int[numRegionsPerHost];
          primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
        }
        for (int i = 0 ; i < serversPerHost.length; i++) {
          int numRegionPerHostIndex = 0;
          for (int j = 0; j < serversPerHost[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
              int region = regionsPerServer[serversPerHost[i][j]][k];
              regionsPerHost[i][numRegionPerHostIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
              numRegionPerHostIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerHost[i]);
        }
      }

      // compute regionsPerRack
      if (numRacks > 1) {
        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionsPerRack = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
          }
          regionsPerRack[i] = new int[numRegionsPerRack];
          primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
        }

        for (int i = 0 ; i < serversPerRack.length; i++) {
          int numRegionPerRackIndex = 0;
          for (int j = 0; j < serversPerRack[i].length; j++) {
            for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
              int region = regionsPerServer[serversPerRack[i][j]][k];
              regionsPerRack[i][numRegionPerRackIndex] = region;
              int primaryIndex = regionIndexToPrimaryIndex[region];
              primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
              numRegionPerRackIndex++;
            }
          }
          // sort the regions by primaries.
          Arrays.sort(primariesOfRegionsPerRack[i]);
        }
      }
    }

    /**
     * Helper for Cluster constructor to handle a region.
     * Records the region in every per-region index array, resolving its table
     * index (adding the table on first sight), its load history, and — when a
     * regionFinder is supplied — the server indexes of its top block locations
     * (entries that do not match a known server become -1).
     */
    private void registerRegion(RegionInfo region, int regionIndex,
        int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
        RegionLocationFinder regionFinder) {
      String tableName = region.getTable().getNameAsString();
      if (!tablesToIndex.containsKey(tableName)) {
        tables.add(tableName);
        tablesToIndex.put(tableName, tablesToIndex.size());
      }
      int tableIndex = tablesToIndex.get(tableName);

      regionsToIndex.put(region, regionIndex);
      regions[regionIndex] = region;
      regionIndexToServerIndex[regionIndex] = serverIndex;
      initialRegionIndexToServerIndex[regionIndex] = serverIndex;
      regionIndexToTableIndex[regionIndex] = tableIndex;

      // region load
      if (loads != null) {
        Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
        // That could have failed if the RegionLoad is using the other regionName
        if (rl == null) {
          // Try getting the region load using encoded name.
          rl = loads.get(region.getEncodedName());
        }
        regionLoads[regionIndex] = rl;
      }

      if (regionFinder != null) {
        // region location
        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
        regionLocations[regionIndex] = new int[loc.size()];
        for (int i = 0; i < loc.size(); i++) {
          // -1 when the location is null or not a live server in this snapshot.
          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
                  : serversToIndex.get(loc.get(i).getHostAndPort()));
        }
      }
    }

    /**
     * Returns true iff a given server has less regions than the balanced amount
     */
    public boolean serverHasTooFewRegions(int server) {
      // Integer division: the floor of the perfectly-balanced per-server load.
      int minLoad = this.numRegions / numServers;
      int numRegions = getNumRegions(server);
      return numRegions < minLoad;
    }

    /**
     * Retrieves and lazily initializes a field storing the locality of
     * every region/server combination
     */
    public float[][] getOrComputeRackLocalities() {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return rackLocalities;
    }

    /**
     * Lazily initializes and retrieves a mapping of region -> server for which region has
     * the highest locality
     */
    public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
      if (rackLocalities == null || regionsToMostLocalEntities == null) {
        computeCachedLocalities();
      }
      return regionsToMostLocalEntities[type.ordinal()];
    }

    /**
     * Looks up locality from cache of localities. Will create cache if it does
     * not already exist.
     */
    public float getOrComputeLocality(int region, int entity, LocalityType type) {
      switch (type) {
        case SERVER:
          return getLocalityOfRegion(region, entity);
        case RACK:
          return getOrComputeRackLocalities()[region][entity];
        default:
          throw new IllegalArgumentException("Unsupported LocalityType: " + type);
      }
    }

    /**
     * Returns locality weighted by region size in MB. Will create locality cache
     * if it does not already exist.
551 */ 552 public double getOrComputeWeightedLocality(int region, int server, LocalityType type) { 553 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 554 } 555 556 /** 557 * Returns the size in MB from the most recent RegionLoad for region 558 */ 559 public int getRegionSizeMB(int region) { 560 Deque<BalancerRegionLoad> load = regionLoads[region]; 561 // This means regions have no actual data on disk 562 if (load == null) { 563 return 0; 564 } 565 return regionLoads[region].getLast().getStorefileSizeMB(); 566 } 567 568 /** 569 * Computes and caches the locality for each region/rack combinations, 570 * as well as storing a mapping of region -> server and region -> rack such that server 571 * and rack have the highest locality for region 572 */ 573 private void computeCachedLocalities() { 574 rackLocalities = new float[numRegions][numRacks]; 575 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 576 577 // Compute localities and find most local server per region 578 for (int region = 0; region < numRegions; region++) { 579 int serverWithBestLocality = 0; 580 float bestLocalityForRegion = 0; 581 for (int server = 0; server < numServers; server++) { 582 // Aggregate per-rack locality 583 float locality = getLocalityOfRegion(region, server); 584 int rack = serverIndexToRackIndex[server]; 585 int numServersInRack = serversPerRack[rack].length; 586 rackLocalities[region][rack] += locality / numServersInRack; 587 588 if (locality > bestLocalityForRegion) { 589 serverWithBestLocality = server; 590 bestLocalityForRegion = locality; 591 } 592 } 593 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 594 595 // Find most local rack per region 596 int rackWithBestLocality = 0; 597 float bestRackLocalityForRegion = 0.0f; 598 for (int rack = 0; rack < numRacks; rack++) { 599 float rackLocality = rackLocalities[region][rack]; 600 if (rackLocality > bestRackLocalityForRegion) { 601 
bestRackLocalityForRegion = rackLocality; 602 rackWithBestLocality = rack; 603 } 604 } 605 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 606 } 607 608 } 609 610 /** 611 * Maps region index to rack index 612 */ 613 public int getRackForRegion(int region) { 614 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 615 } 616 617 enum LocalityType { 618 SERVER, 619 RACK 620 } 621 622 /** An action to move or swap a region */ 623 public static class Action { 624 public enum Type { 625 ASSIGN_REGION, 626 MOVE_REGION, 627 SWAP_REGIONS, 628 NULL, 629 } 630 631 public Type type; 632 public Action (Type type) {this.type = type;} 633 /** Returns an Action which would undo this action */ 634 public Action undoAction() { return this; } 635 @Override 636 public String toString() { return type + ":";} 637 } 638 639 public static class AssignRegionAction extends Action { 640 public int region; 641 public int server; 642 public AssignRegionAction(int region, int server) { 643 super(Type.ASSIGN_REGION); 644 this.region = region; 645 this.server = server; 646 } 647 @Override 648 public Action undoAction() { 649 // TODO implement this. This action is not being used by the StochasticLB for now 650 // in case it uses it, we should implement this function. 
651 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 652 } 653 @Override 654 public String toString() { 655 return type + ": " + region + ":" + server; 656 } 657 } 658 659 public static class MoveRegionAction extends Action { 660 public int region; 661 public int fromServer; 662 public int toServer; 663 664 public MoveRegionAction(int region, int fromServer, int toServer) { 665 super(Type.MOVE_REGION); 666 this.fromServer = fromServer; 667 this.region = region; 668 this.toServer = toServer; 669 } 670 @Override 671 public Action undoAction() { 672 return new MoveRegionAction (region, toServer, fromServer); 673 } 674 @Override 675 public String toString() { 676 return type + ": " + region + ":" + fromServer + " -> " + toServer; 677 } 678 } 679 680 public static class SwapRegionsAction extends Action { 681 public int fromServer; 682 public int fromRegion; 683 public int toServer; 684 public int toRegion; 685 public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { 686 super(Type.SWAP_REGIONS); 687 this.fromServer = fromServer; 688 this.fromRegion = fromRegion; 689 this.toServer = toServer; 690 this.toRegion = toRegion; 691 } 692 @Override 693 public Action undoAction() { 694 return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); 695 } 696 @Override 697 public String toString() { 698 return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; 699 } 700 } 701 702 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION", 703 justification="Mistake. 
Too disruptive to change now") 704 public static final Action NullAction = new Action(Type.NULL); 705 706 public void doAction(Action action) { 707 switch (action.type) { 708 case NULL: break; 709 case ASSIGN_REGION: 710 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 711 assert action instanceof AssignRegionAction: action.getClass(); 712 AssignRegionAction ar = (AssignRegionAction) action; 713 regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); 714 regionMoved(ar.region, -1, ar.server); 715 break; 716 case MOVE_REGION: 717 assert action instanceof MoveRegionAction: action.getClass(); 718 MoveRegionAction mra = (MoveRegionAction) action; 719 regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); 720 regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); 721 regionMoved(mra.region, mra.fromServer, mra.toServer); 722 break; 723 case SWAP_REGIONS: 724 assert action instanceof SwapRegionsAction: action.getClass(); 725 SwapRegionsAction a = (SwapRegionsAction) action; 726 regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); 727 regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); 728 regionMoved(a.fromRegion, a.fromServer, a.toServer); 729 regionMoved(a.toRegion, a.toServer, a.fromServer); 730 break; 731 default: 732 throw new RuntimeException("Uknown action:" + action.type); 733 } 734 } 735 736 /** 737 * Return true if the placement of region on server would lower the availability 738 * of the region in question 739 * @return true or false 740 */ 741 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 742 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 743 return false; // safeguard against race between cluster.servers and servers from LB method args 744 } 745 int server = 
serversToIndex.get(serverName.getHostAndPort()); 746 int region = regionsToIndex.get(regionInfo); 747 748 int primary = regionIndexToPrimaryIndex[region]; 749 // there is a subset relation for server < host < rack 750 // check server first 751 752 if (contains(primariesOfRegionsPerServer[server], primary)) { 753 // check for whether there are other servers that we can place this region 754 for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { 755 if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { 756 return true; // meaning there is a better server 757 } 758 } 759 return false; // there is not a better server to place this 760 } 761 762 // check host 763 if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host 764 int host = serverIndexToHostIndex[server]; 765 if (contains(primariesOfRegionsPerHost[host], primary)) { 766 // check for whether there are other hosts that we can place this region 767 for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { 768 if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { 769 return true; // meaning there is a better host 770 } 771 } 772 return false; // there is not a better host to place this 773 } 774 } 775 776 // check rack 777 if (numRacks > 1) { 778 int rack = serverIndexToRackIndex[server]; 779 if (contains(primariesOfRegionsPerRack[rack], primary)) { 780 // check for whether there are other racks that we can place this region 781 for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { 782 if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { 783 return true; // meaning there is a better rack 784 } 785 } 786 return false; // there is not a better rack to place this 787 } 788 } 789 return false; 790 } 791 792 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 793 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { 794 return; 795 } 796 int server = 
serversToIndex.get(serverName.getHostAndPort()); 797 int region = regionsToIndex.get(regionInfo); 798 doAction(new AssignRegionAction(region, server)); 799 } 800 801 void regionMoved(int region, int oldServer, int newServer) { 802 regionIndexToServerIndex[region] = newServer; 803 if (initialRegionIndexToServerIndex[region] == newServer) { 804 numMovedRegions--; //region moved back to original location 805 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 806 numMovedRegions++; //region moved from original location 807 } 808 int tableIndex = regionIndexToTableIndex[region]; 809 if (oldServer >= 0) { 810 numRegionsPerServerPerTable[oldServer][tableIndex]--; 811 } 812 numRegionsPerServerPerTable[newServer][tableIndex]++; 813 814 //check whether this caused maxRegionsPerTable in the new Server to be updated 815 if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { 816 numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; 817 } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) 818 == numMaxRegionsPerTable[tableIndex]) { 819 //recompute maxRegionsPerTable since the previous value was coming from the old server 820 numMaxRegionsPerTable[tableIndex] = 0; 821 for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { 822 if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { 823 numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; 824 } 825 } 826 } 827 828 // update for servers 829 int primary = regionIndexToPrimaryIndex[region]; 830 if (oldServer >= 0) { 831 primariesOfRegionsPerServer[oldServer] = removeRegion( 832 primariesOfRegionsPerServer[oldServer], primary); 833 } 834 primariesOfRegionsPerServer[newServer] = addRegionSorted( 835 primariesOfRegionsPerServer[newServer], primary); 836 837 // update for hosts 838 if (multiServersPerHost) { 839 int oldHost = 
oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
        int newHost = serverIndexToHostIndex[newServer];
        if (newHost != oldHost) {
          regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
          primariesOfRegionsPerHost[newHost] =
            addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
          if (oldHost >= 0) {
            regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
            primariesOfRegionsPerHost[oldHost] = removeRegion(
              primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
          }
        }
      }

      // update for racks
      if (numRacks > 1) {
        int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
        int newRack = serverIndexToRackIndex[newServer];
        if (newRack != oldRack) {
          regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
          primariesOfRegionsPerRack[newRack] =
            addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
          if (oldRack >= 0) {
            regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
            primariesOfRegionsPerRack[oldRack] = removeRegion(
              primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
          }
        }
      }
    }

    /**
     * Returns a new array equal to {@code regions} with the first occurrence of
     * {@code regionIndex} removed. Relative order of the remaining elements is
     * preserved. Assumes the value is present (callers only remove indexes they
     * previously added).
     */
    int[] removeRegion(int[] regions, int regionIndex) {
      // TODO: this maybe costly. Consider using linked lists
      int[] newRegions = new int[regions.length - 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          break;
        }
        newRegions[i] = regions[i];
      }
      System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
      return newRegions;
    }

    /** Returns a new array equal to {@code regions} with {@code regionIndex} appended. */
    int[] addRegion(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      System.arraycopy(regions, 0, newRegions, 0, regions.length);
      newRegions[newRegions.length - 1] = regionIndex;
      return newRegions;
    }

    /**
     * Returns a new array with {@code regionIndex} inserted into the sorted array
     * {@code regions} at its sorted position, so the result stays usable with
     * {@link #contains(int[], int)} (binary search).
     */
    int[] addRegionSorted(int[] regions, int regionIndex) {
      int[] newRegions = new int[regions.length + 1];
      int i = 0;
      for (i = 0; i < regions.length; i++) { // find the index to insert
        if (regions[i] > regionIndex) {
          break;
        }
      }
      System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
      System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
      newRegions[i] = regionIndex;

      return newRegions;
    }

    /**
     * Replaces the first occurrence of {@code regionIndex} in {@code regions} with
     * {@code newRegionIndex}, in place, and returns the same array.
     */
    int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
      int i = 0;
      for (i = 0; i < regions.length; i++) {
        if (regions[i] == regionIndex) {
          regions[i] = newRegionIndex;
          break;
        }
      }
      return regions;
    }

    void sortServersByRegionCount() {
      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
    }

    int getNumRegions(int server) {
      return regionsPerServer[server].length;
    }

    // NOTE: Arrays.binarySearch requires arr to be sorted; the primaries arrays
    // queried through this helper are maintained sorted via addRegionSorted/removeRegion.
    boolean contains(int[] arr, int val) {
      return Arrays.binarySearch(arr, val) >= 0;
    }

    private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);

    /**
     * Returns the index of the region with the lowest HDFS block locality on the
     * given server, skipping regions with no blocks; returns -1 when there is no
     * region finder, the server hosts no regions, or all its regions are empty.
     */
    int getLowestLocalityRegionOnServer(int serverIndex) {
      if (regionFinder != null) {
        float lowestLocality = 1.0f;
        int lowestLocalityRegionIndex = -1;
        if (regionsPerServer[serverIndex].length == 0) {
          // No regions on that region server
          return -1;
        }
        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
          int regionIndex = regionsPerServer[serverIndex][j];
          HDFSBlocksDistribution distribution = regionFinder
              .getBlockDistribution(regions[regionIndex]);
          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
          // skip empty region
          if (distribution.getUniqueBlocksTotalWeight() == 0) {
            continue;
          }
          if (locality < lowestLocality) {
            lowestLocality = locality;
            lowestLocalityRegionIndex = j;
          }
        }
        if (lowestLocalityRegionIndex == -1) {
          return -1;
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Lowest locality region is "
              + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
                  .getRegionNameAsString() + " with locality " + lowestLocality
              + " and its region server contains " + regionsPerServer[serverIndex].length
              + " regions");
        }
        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
      } else {
        return -1;
      }
    }

    /**
     * Returns the HDFS block locality index of {@code region} on {@code server},
     * or 0 when no region finder is available.
     */
    float getLocalityOfRegion(int region, int server) {
      if (regionFinder != null) {
        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
        return distribution.getBlockLocalityIndex(servers[server].getHostname());
      } else {
        return 0f;
      }
    }

    @VisibleForTesting
    protected void setNumRegions(int numRegions) {
      this.numRegions = numRegions;
    }

    @VisibleForTesting
    protected void setNumMovedRegions(int numMovedRegions) {
      this.numMovedRegions = numMovedRegions;
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
        justification="Not important but should be fixed")
    @Override
    public String toString() {
      StringBuilder desc = new StringBuilder("Cluster={servers=[");
      for(ServerName sn:servers) {
        desc.append(sn.getHostAndPort()).append(", ");
      }
desc.append("], serverIndicesSortedByRegionCount=")
          .append(Arrays.toString(serverIndicesSortedByRegionCount))
          .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
          .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
          .append(", numTables=").append(numTables).append(", numMovedRegions=")
          .append(numMovedRegions).append('}');
      return desc.toString();
    }
  }

  // slop for regions
  protected float slop;
  // overallSlop to control simpleLoadBalancer's cluster level threshold
  protected float overallSlop;
  protected Configuration config = HBaseConfiguration.create();
  protected RackManager rackManager;
  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
  protected MetricsBalancer metricsBalancer = null;
  protected ClusterMetrics clusterStatus = null;
  protected ServerName masterServerName;
  protected MasterServices services;
  // whether only system tables may be placed on the master regionserver
  protected boolean onlySystemTablesOnMaster;
  protected boolean maintenanceMode;

  /**
   * Reads the balancer configuration. Slop values are clamped into [0, 1].
   */
  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
    setSlop(conf);
    // clamp slop and overallSlop into the valid [0, 1] range
    if (slop < 0) slop = 0;
    else if (slop > 1) slop = 1;

    if (overallSlop < 0) overallSlop = 0;
    else if (overallSlop > 1) overallSlop = 1;

    this.onlySystemTablesOnMaster = LoadBalancer.isSystemTablesOnlyOnMaster(this.config);

    this.rackManager = new RackManager(getConf());
    if (useRegionFinder) {
      regionFinder.setConf(conf);
    }
    // Print out base configs. Don't print overallSlop since it is for simple balancer exclusively.
    LOG.info("slop={}, systemTablesOnMaster={}",
      this.slop, this.onlySystemTablesOnMaster);
  }

  protected void setSlop(Configuration conf) {
    this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
    // overallSlop falls back to slop when not configured explicitly
    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
  }

  /**
   * Check if a region belongs to some system table.
   * If so, the primary replica may be expected to be put on the master regionserver.
   */
  public boolean shouldBeOnMaster(RegionInfo region) {
    return (this.maintenanceMode || this.onlySystemTablesOnMaster)
        && region.getTable().isSystemTable();
  }

  /**
   * Balance the regions that should be on master regionserver.
   * @return plans moving non-system regions off the master and system regions onto
   *         it, or null when there is nothing to move
   */
  protected List<RegionPlan> balanceMasterRegions(Map<ServerName, List<RegionInfo>> clusterMap) {
    if (masterServerName == null || clusterMap == null || clusterMap.size() <= 1) return null;
    List<RegionPlan> plans = null;
    List<RegionInfo> regions = clusterMap.get(masterServerName);
    if (regions != null) {
      Iterator<ServerName> keyIt = null;
      for (RegionInfo region: regions) {
        if (shouldBeOnMaster(region)) continue;

        // Find a non-master regionserver to host the region
        if (keyIt == null || !keyIt.hasNext()) {
          keyIt = clusterMap.keySet().iterator();
        }
        ServerName dest = keyIt.next();
        if (masterServerName.equals(dest)) {
          if (!keyIt.hasNext()) {
            keyIt = clusterMap.keySet().iterator();
          }
          dest = keyIt.next();
        }

        // Move this region away from the master regionserver
        RegionPlan plan = new RegionPlan(region, masterServerName, dest);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) {
      if (masterServerName.equals(server.getKey())) continue;
      for (RegionInfo region: server.getValue()) {
        if
(!shouldBeOnMaster(region)) continue;

        // Move this region to the master regionserver
        RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
        if (plans == null) {
          plans = new ArrayList<>();
        }
        plans.add(plan);
      }
    }
    return plans;
  }

  /**
   * If master is configured to carry system tables only, in here is
   * where we figure what to assign it.
   */
  protected Map<ServerName, List<RegionInfo>> assignMasterSystemRegions(
      Collection<RegionInfo> regions, List<ServerName> servers) {
    if (servers == null || regions == null || regions.isEmpty()) {
      return null;
    }
    Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
    if (this.maintenanceMode || this.onlySystemTablesOnMaster) {
      if (masterServerName != null && servers.contains(masterServerName)) {
        assignments.put(masterServerName, new ArrayList<>());
        for (RegionInfo region : regions) {
          if (shouldBeOnMaster(region)) {
            assignments.get(masterServerName).add(region);
          }
        }
      }
    }
    return assignments;
  }

  @Override
  public Configuration getConf() {
    return this.config;
  }

  @Override
  public synchronized void setClusterMetrics(ClusterMetrics st) {
    this.clusterStatus = st;
    if (useRegionFinder) {
      regionFinder.setClusterMetrics(st);
    }
  }

  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){

  }

  @Override
  public void setMasterServices(MasterServices masterServices) {
    masterServerName = masterServices.getServerName();
    this.services = masterServices;
    if (useRegionFinder) {
      this.regionFinder.setServices(masterServices);
    }
    if (this.services.isInMaintenanceMode()) {
      this.maintenanceMode = true;
    }
  }

  @Override
  public void postMasterStartupInitialize() {
    if (services != null && regionFinder != null) {
      try {
        Set<RegionInfo> regions =
            services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
        regionFinder.refreshAndWait(regions);
      } catch (Exception e) {
        // best effort: locality information is an optimization, not a requirement
        LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
      }
    }
  }

  public void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }

  /**
   * Returns true when the cluster is unbalanced enough — per the slop thresholds
   * around the average load — that running the balancer is worthwhile, or when
   * region replicas are co-located.
   */
  protected boolean needsBalance(Cluster c) {
    ClusterLoadState cs = new ClusterLoadState(c.clusterState);
    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not running balancer because only " + cs.getNumServers()
            + " active regionserver(s)");
      }
      return false;
    }
    if(areSomeRegionReplicasColocated(c)) return true;
    // Check if we even need to do any load balancing
    // HBASE-3681 check sloppiness first
    float average = cs.getLoadAverage(); // for logging
    int floor = (int) Math.floor(average * (1 - slop));
    int ceiling = (int) Math.ceil(average * (1 + slop));
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
      NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
      if (LOG.isTraceEnabled()) {
        // If nothing to balance, then don't say anything unless trace-level logging.
LOG.trace("Skipping load balancing because balanced cluster; " +
          "servers=" + cs.getNumServers() +
          " regions=" + cs.getNumRegions() + " average=" + average +
          " mostloaded=" + serversByLoad.lastKey().getLoad() +
          " leastloaded=" + serversByLoad.firstKey().getLoad());
      }
      return false;
    }
    return true;
  }

  /**
   * Subclasses should implement this to return true if the cluster has nodes that hosts
   * multiple replicas for the same region, or, if there are multiple racks and the same
   * rack hosts replicas of the same region
   * @param c Cluster information
   * @return whether region replicas are currently co-located
   */
  protected boolean areSomeRegionReplicasColocated(Cluster c) {
    return false;
  }

  /**
   * Generates a bulk assignment plan to be used on cluster startup using a
   * simple round-robin assignment.
   * <p>
   * Takes a list of all the regions and all the servers in the cluster and
   * returns a map of each server to the regions that it should be assigned.
   * <p>
   * Currently implemented as a round-robin assignment. Same invariant as load
   * balancing, all servers holding floor(avg) or ceiling(avg).
   *
   * TODO: Use block locations from HDFS to place regions with their blocks
   *
   * @param regions all regions
   * @param servers all servers
   * @return map of server to the regions it should take, or null if no
   *         assignment is possible (ie. no regions or no servers)
   */
  @Override
  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    // First carve out the system regions that belong on the master, if so configured.
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions, servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      if (!masterRegions.isEmpty()) {
        regions = new ArrayList<>(regions);
        regions.removeAll(masterRegions);
      }
    }
    if (this.maintenanceMode || regions == null || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do round robin assignment but no servers to assign to");
      return null;
    }

    // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
    // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
    // generator for AssignRegionAction. The LB will ensure the regions are mostly local
    // and balanced. This should also run fast with fewer number of iterations.

    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions));
      return assignments;
    }

    Cluster cluster = createCluster(servers, regions, false);
    List<RegionInfo> unassignedRegions = new ArrayList<>();

    // First pass: plain round-robin; regions whose placement would lower
    // availability are collected into unassignedRegions.
    roundRobinAssignment(cluster, regions, unassignedRegions,
      servers, assignments);

    List<RegionInfo> lastFewRegions = new ArrayList<>();
    // assign the remaining by going through the list and try to assign to servers one-by-one
    int serverIdx = RANDOM.nextInt(numServers);
    OUTER : for (RegionInfo region : unassignedRegions) {
      boolean assigned = false;
      INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
        ServerName serverName = servers.get((j + serverIdx) % numServers);
        if (!cluster.wouldLowerAvailability(region, serverName)) {
          List<RegionInfo> serverRegions =
              assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
          if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
            // if the region is not a default replica
            // check if the assignments map has the other replica region on this server
            for (RegionInfo hri : serverRegions) {
              if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("Skipping the server, " + serverName
                      + " , got the same server for the region " + region);
                }
                // do not allow this case. The unassignedRegions we got because the
                // replica region in this list was not assigned because of lower availablity issue.
                // So when we assign here we should ensure that as far as possible the server being
                // selected does not have the server where the replica region was not assigned.
                continue INNER; // continue the inner loop, ie go to the next server
              }
            }
          }
          serverRegions.add(region);
          cluster.doAssignRegion(region, serverName);
          serverIdx = (j + serverIdx + 1) % numServers; // remain from next server
          assigned = true;
          break;
        }
      }
      if (!assigned) {
        lastFewRegions.add(region);
      }
    }
    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
    // make it optimal later. we can end up with this if numReplicas > numServers.
    for (RegionInfo region : lastFewRegions) {
      int i = RANDOM.nextInt(numServers);
      ServerName server = servers.get(i);
      List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
      serverRegions.add(region);
      cluster.doAssignRegion(region, server);
    }
    return assignments;
  }

  /**
   * Builds a {@link Cluster} model from the current assignment snapshot of the
   * given regions. When {@code hasRegionReplica} is set, the snapshot of the whole
   * cluster is taken so existing replica placements are visible.
   */
  protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
      boolean hasRegionReplica) {
    // Get the snapshot of the current assignments for the regions in question, and then create
    // a cluster out of it. Note that we might have replicas already assigned to some servers
    // earlier. So we want to get the snapshot to see those assignments, but this will only contain
    // replicas of the regions that are passed (for performance).
    Map<ServerName, List<RegionInfo>> clusterState = null;
    if (!hasRegionReplica) {
      clusterState = getRegionAssignmentsByServer(regions);
    } else {
      // for the case where we have region replica it is better we get the entire cluster's snapshot
      clusterState = getRegionAssignmentsByServer(null);
    }

    // make sure every candidate server appears in the model, even if currently empty
    for (ServerName server : servers) {
      if (!clusterState.containsKey(server)) {
        clusterState.put(server, EMPTY_REGION_LIST);
      }
    }
    return new Cluster(regions, clusterState, null, this.regionFinder,
        rackManager);
  }

  private List<ServerName> findIdleServers(List<ServerName> servers) {
    return this.services.getServerManager()
        .getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
  }

  /**
   * Used to assign a single region to a random server.
   */
  @Override
  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
      throws HBaseIOException {
    metricsBalancer.incrMiscInvocations();
    if (servers != null && servers.contains(masterServerName)) {
      if (shouldBeOnMaster(regionInfo)) {
        return masterServerName;
      }
      if (!LoadBalancer.isTablesOnMaster(getConf())) {
        // Guarantee we do not put any regions on master
        servers = new ArrayList<>(servers);
        servers.remove(masterServerName);
      }
    }

    int numServers = servers == null ? 0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to retain assignment but no servers to assign to");
      return null;
    }
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      return servers.get(0);
    }
    // prefer idle servers as destinations when any exist
    List<ServerName> idleServers = findIdleServers(servers);
    if (idleServers.size() == 1) {
      return idleServers.get(0);
    }
    final List<ServerName> finalServers = idleServers.isEmpty() ?
        servers : idleServers;
    List<RegionInfo> regions = Lists.newArrayList(regionInfo);
    Cluster cluster = createCluster(finalServers, regions, false);
    return randomAssignment(cluster, regionInfo, finalServers);
  }

  /**
   * Generates a bulk assignment startup plan, attempting to reuse the existing
   * assignment information from META, but adjusting for the specified list of
   * available/online servers available for assignment.
   * <p>
   * Takes a map of all regions to their existing assignment from META. Also
   * takes a list of online servers for regions to be assigned to. Attempts to
   * retain all assignment, so in some instances initial assignment will not be
   * completely balanced.
   * <p>
   * Any leftover regions without an existing server to be assigned to will be
   * assigned randomly to available servers.
   *
   * @param regions regions and existing assignment from meta
   * @param servers available servers
   * @return map of servers and regions to be assigned to them
   */
  @Override
  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
      List<ServerName> servers) throws HBaseIOException {
    // Update metrics
    metricsBalancer.incrMiscInvocations();
    Map<ServerName, List<RegionInfo>> assignments = assignMasterSystemRegions(regions.keySet(), servers);
    if (assignments != null && !assignments.isEmpty()) {
      servers = new ArrayList<>(servers);
      // Guarantee not to put other regions on master
      servers.remove(masterServerName);
      List<RegionInfo> masterRegions = assignments.get(masterServerName);
      regions = regions.entrySet().stream().filter(e -> !masterRegions.contains(e.getKey()))
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    if (this.maintenanceMode || regions.isEmpty()) {
      return assignments;
    }

    int numServers = servers == null ?
0 : servers.size();
    if (numServers == 0) {
      LOG.warn("Wanted to do retain assignment but no servers to assign to");
      return null;
    }
    if (numServers == 1) { // Only one server, nothing fancy we can do here
      ServerName server = servers.get(0);
      assignments.put(server, new ArrayList<>(regions.keySet()));
      return assignments;
    }

    // Group all of the old assignments by their hostname.
    // We can't group directly by ServerName since the servers all have
    // new start-codes.

    // Group the servers by their hostname. It's possible we have multiple
    // servers on the same host on different ports.
    ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
    for (ServerName server : servers) {
      assignments.put(server, new ArrayList<>());
      serversByHostname.put(server.getHostnameLowerCase(), server);
    }

    // Collection of the hostnames that used to have regions
    // assigned, but for which we no longer have any RS running
    // after the cluster restart.
    Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();

    // If the old servers aren't present, lets assign those regions later.
    List<RegionInfo> randomAssignRegions = Lists.newArrayList();

    int numRandomAssignments = 0;
    int numRetainedAssigments = 0;
    boolean hasRegionReplica = false;
    for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
      RegionInfo region = entry.getKey();
      ServerName oldServerName = entry.getValue();
      // In the current set of regions even if one has region replica let us go with
      // getting the entire snapshot
      if (this.services != null) { // for tests
        AssignmentManager am = this.services.getAssignmentManager();
        if (am != null) {
          RegionStates rss = am.getRegionStates();
          if (!hasRegionReplica && rss.isReplicaAvailableForRegion(region)) {
            hasRegionReplica = true;
          }
        }
      }
      List<ServerName> localServers = new ArrayList<>();
      if (oldServerName != null) {
        localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
      }
      if (localServers.isEmpty()) {
        // No servers on the new cluster match up with this hostname, assign randomly, later.
        randomAssignRegions.add(region);
        if (oldServerName != null) {
          oldHostsNoLongerPresent.add(oldServerName.getHostnameLowerCase());
        }
      } else if (localServers.size() == 1) {
        // the usual case - one new server on same host
        ServerName target = localServers.get(0);
        assignments.get(target).add(region);
        numRetainedAssigments++;
      } else {
        // multiple new servers in the cluster on this same host
        if (localServers.contains(oldServerName)) {
          assignments.get(oldServerName).add(region);
          numRetainedAssigments++;
        } else {
          // prefer the new server instance listening on the same port as the old one
          ServerName target = null;
          for (ServerName tmp : localServers) {
            if (tmp.getPort() == oldServerName.getPort()) {
              target = tmp;
              assignments.get(tmp).add(region);
              numRetainedAssigments++;
              break;
            }
          }
          if (target == null) {
            randomAssignRegions.add(region);
          }
        }
      }
    }

    // If servers from prior assignment aren't present, then lets do randomAssignment on regions.
    if (randomAssignRegions.size() > 0) {
      Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
      // seed the cluster model with the retained assignments decided above
      for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
        ServerName sn = entry.getKey();
        for (RegionInfo region : entry.getValue()) {
          cluster.doAssignRegion(region, sn);
        }
      }
      for (RegionInfo region : randomAssignRegions) {
        ServerName target = randomAssignment(cluster, region, servers);
        assignments.get(target).add(region);
        numRandomAssignments++;
      }
    }

    String randomAssignMsg = "";
    if (numRandomAssignments > 0) {
      randomAssignMsg =
          numRandomAssignments + " regions were assigned "
              + "to random hosts, since the old hosts for these regions are no "
              + "longer present in the cluster. These hosts were:\n "
              + Joiner.on("\n ").join(oldHostsNoLongerPresent);
    }

    LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
        + " retained the pre-restart assignment. " + randomAssignMsg);
    return assignments;
  }

  @Override
  public void initialize() throws HBaseIOException{
  }

  @Override
  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
  }

  @Override
  public void regionOffline(RegionInfo regionInfo) {
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }

  @Override
  public void stop(String why) {
    LOG.info("Load Balancer stop requested: "+why);
    stopped = true;
  }

  /**
   * Updates the balancer status tag reported to JMX
   */
  public void updateBalancerStatus(boolean status) {
    metricsBalancer.balancerStatus(status);
  }

  /**
   * Used to assign a single region to a random server. Picks random candidates
   * until one does not lower availability, falling back to an untried server (or
   * the last candidate) once maxIterations is exhausted.
   */
  private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
      List<ServerName> servers) {
    int numServers = servers.size(); // servers is not null, numServers > 1
    ServerName sn = null;
    final int maxIterations = numServers * 4;
    int iterations = 0;
    List<ServerName> usedSNs = new ArrayList<>(servers.size());
    do {
      int i = RANDOM.nextInt(numServers);
      sn = servers.get(i);
      if (!usedSNs.contains(sn)) {
        usedSNs.add(sn);
      }
    } while (cluster.wouldLowerAvailability(regionInfo, sn)
        && iterations++ < maxIterations);
    if (iterations >= maxIterations) {
      // We have reached the max. Means the servers that we collected is still lowering the
      // availability
      for (ServerName unusedServer : servers) {
        if (!usedSNs.contains(unusedServer)) {
          // check if any other unused server is there for us to use.
          // If so use it. Else we have no other choice but to go with one of the tried ones.
          if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) {
            sn = unusedServer;
            break;
          }
        }
      }
    }
    cluster.doAssignRegion(regionInfo, sn);
    return sn;
  }

  /**
   * Round robin a list of regions to a list of servers
   */
  private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
      List<RegionInfo> unassignedRegions, List<ServerName> servers,
      Map<ServerName, List<RegionInfo>> assignments) {

    int numServers = servers.size();
    int numRegions = regions.size();
    int max = (int) Math.ceil((float) numRegions / numServers);
    int serverIdx = 0;
    if (numServers > 1) {
      serverIdx = RANDOM.nextInt(numServers);
    }
    int regionIdx = 0;

    for (int j = 0; j < numServers; j++) {
      ServerName server = servers.get((j + serverIdx) % numServers);
      List<RegionInfo> serverRegions = new ArrayList<>(max);
      // stride through the region list so each server gets every numServers-th region
      for (int i = regionIdx; i < numRegions; i += numServers) {
        RegionInfo region = regions.get(i % numRegions);
        if (cluster.wouldLowerAvailability(region, server)) {
          unassignedRegions.add(region);
        } else {
          serverRegions.add(region);
          cluster.doAssignRegion(region, server);
        }
      }
      assignments.put(server, serverRegions);
      regionIdx++;
    }
  }

  protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
      Collection<RegionInfo> regions) {
    if (this.services != null && this.services.getAssignmentManager() != null) {
      return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
    } else {
      // for tests / before master services are wired up
      return new HashMap<>();
    }
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
  }
}