001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.text.DecimalFormat; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Random; 029import java.util.Scanner; 030import java.util.Set; 031import java.util.TreeMap; 032import java.util.concurrent.ThreadLocalRandom; 033import org.apache.commons.lang3.StringUtils; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.AsyncClusterConnection; 041import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; 042import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import 
org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; 046import org.apache.hadoop.hbase.favored.FavoredNodesPlan; 047import org.apache.hadoop.hbase.security.User; 048import org.apache.hadoop.hbase.util.FSUtils; 049import org.apache.hadoop.hbase.util.FutureUtils; 050import org.apache.hadoop.hbase.util.MunkresAssignment; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 057import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 058import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 059import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 060import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 061import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 062 063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 066 067/** 068 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with 069 * -h to get a list of the options 070 */ 071@InterfaceAudience.Private 072// TODO: Remove? Unused. Partially implemented only. 073public class RegionPlacementMaintainer implements Closeable { 074 private static final Logger LOG = 075 LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName()); 076 // The cost of a placement that should never be assigned. 077 private static final float MAX_COST = Float.POSITIVE_INFINITY; 078 079 // The cost of a placement that is undesirable but acceptable. 080 private static final float AVOID_COST = 100000f; 081 082 // The amount by which the cost of a placement is increased if it is the 083 // last slot of the server. 
This is done to more evenly distribute the slop 084 // amongst servers. 085 private static final float LAST_SLOT_COST_PENALTY = 0.5f; 086 087 // The amount by which the cost of a primary placement is penalized if it is 088 // not the host currently serving the region. This is done to minimize moves. 089 private static final float NOT_CURRENT_HOST_PENALTY = 0.1f; 090 091 private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false; 092 093 private Configuration conf; 094 private final boolean enforceLocality; 095 private final boolean enforceMinAssignmentMove; 096 private RackManager rackManager; 097 private Set<TableName> targetTableSet; 098 private AsyncClusterConnection connection; 099 100 public RegionPlacementMaintainer(Configuration conf) throws IOException { 101 this(conf, true, true); 102 } 103 104 public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality, 105 boolean enforceMinAssignmentMove) { 106 this.conf = conf; 107 this.enforceLocality = enforceLocality; 108 this.enforceMinAssignmentMove = enforceMinAssignmentMove; 109 this.targetTableSet = new HashSet<>(); 110 this.rackManager = new RackManager(conf); 111 } 112 113 private static void printHelp(Options opt) { 114 new HelpFormatter().printHelp( 115 "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " 116 + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" 117 + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", 118 opt); 119 } 120 121 private AsyncClusterConnection getConnection() throws IOException { 122 if (connection == null) { 123 connection = 124 ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent()); 125 } 126 return connection; 127 } 128 129 public void setTargetTableName(String[] tableNames) { 130 if (tableNames != null) { 131 for (String table : tableNames) 132 this.targetTableSet.add(TableName.valueOf(table)); 133 } 134 } 135 136 /** 137 * @return the 
new RegionAssignmentSnapshot 138 */ 139 public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException { 140 SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = 141 new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); 142 currentAssignmentShapshot.initialize(); 143 return currentAssignmentShapshot; 144 } 145 146 /** 147 * Verify the region placement is consistent with the assignment plan 148 */ 149 public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode) 150 throws IOException { 151 System.out 152 .println("Start to verify the region assignment and " + "generate the verification report"); 153 // Get the region assignment snapshot 154 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 155 156 // Get all the tables 157 Set<TableName> tables = snapshot.getTableSet(); 158 159 // Get the region locality map 160 Map<String, Map<String, Float>> regionLocalityMap = null; 161 if (this.enforceLocality == true) { 162 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 163 } 164 List<AssignmentVerificationReport> reports = new ArrayList<>(); 165 // Iterate all the tables to fill up the verification report 166 for (TableName table : tables) { 167 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { 168 continue; 169 } 170 AssignmentVerificationReport report = new AssignmentVerificationReport(); 171 report.fillUp(table, snapshot, regionLocalityMap); 172 report.print(isDetailMode); 173 reports.add(report); 174 } 175 return reports; 176 } 177 178 /** 179 * Generate the assignment plan for the existing table nnnn * @param 180 * munkresForSecondaryAndTertiary if set on true the assignment plan for the tertiary and 181 * secondary will be generated with Munkres algorithm, otherwise will be generated using 182 * placeSecondaryAndTertiaryRS n 183 */ 184 private void genAssignmentPlan(TableName tableName, 185 
SnapshotOfRegionAssignmentFromMeta assignmentSnapshot, 186 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan, 187 boolean munkresForSecondaryAndTertiary) throws IOException { 188 // Get the all the regions for the current table 189 List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName); 190 int numRegions = regions.size(); 191 192 // Get the current assignment map 193 Map<RegionInfo, ServerName> currentAssignmentMap = 194 assignmentSnapshot.getRegionToRegionServerMap(); 195 196 // Get the all the region servers 197 List<ServerName> servers = new ArrayList<>(); 198 servers.addAll(FutureUtils.get(getConnection().getAdmin().getRegionServers())); 199 200 LOG.info("Start to generate assignment plan for " + numRegions + " regions from table " 201 + tableName + " with " + servers.size() + " region servers"); 202 203 int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size()); 204 int regionSlots = slotsPerServer * servers.size(); 205 206 // Compute the primary, secondary and tertiary costs for each region/server 207 // pair. These costs are based only on node locality and rack locality, and 208 // will be modified later. 209 float[][] primaryCost = new float[numRegions][regionSlots]; 210 float[][] secondaryCost = new float[numRegions][regionSlots]; 211 float[][] tertiaryCost = new float[numRegions][regionSlots]; 212 213 if (this.enforceLocality && regionLocalityMap != null) { 214 // Transform the locality mapping into a 2D array, assuming that any 215 // unspecified locality value is 0. 
216 float[][] localityPerServer = new float[numRegions][regionSlots]; 217 for (int i = 0; i < numRegions; i++) { 218 Map<String, Float> serverLocalityMap = 219 regionLocalityMap.get(regions.get(i).getEncodedName()); 220 if (serverLocalityMap == null) { 221 continue; 222 } 223 for (int j = 0; j < servers.size(); j++) { 224 String serverName = servers.get(j).getHostname(); 225 if (serverName == null) { 226 continue; 227 } 228 Float locality = serverLocalityMap.get(serverName); 229 if (locality == null) { 230 continue; 231 } 232 for (int k = 0; k < slotsPerServer; k++) { 233 // If we can't find the locality of a region to a server, which occurs 234 // because locality is only reported for servers which have some 235 // blocks of a region local, then the locality for that pair is 0. 236 localityPerServer[i][j * slotsPerServer + k] = locality.floatValue(); 237 } 238 } 239 } 240 241 // Compute the total rack locality for each region in each rack. The total 242 // rack locality is the sum of the localities of a region on all servers in 243 // a rack. 244 Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>(); 245 for (int i = 0; i < numRegions; i++) { 246 RegionInfo region = regions.get(i); 247 for (int j = 0; j < regionSlots; j += slotsPerServer) { 248 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 249 Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack); 250 if (rackLocality == null) { 251 rackLocality = new HashMap<>(); 252 rackRegionLocality.put(rack, rackLocality); 253 } 254 Float localityObj = rackLocality.get(region); 255 float locality = localityObj == null ? 
0 : localityObj.floatValue(); 256 locality += localityPerServer[i][j]; 257 rackLocality.put(region, locality); 258 } 259 } 260 for (int i = 0; i < numRegions; i++) { 261 for (int j = 0; j < regionSlots; j++) { 262 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 263 Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i)); 264 float totalRackLocality = 265 totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue(); 266 267 // Primary cost aims to favor servers with high node locality and low 268 // rack locality, so that secondaries and tertiaries can be chosen for 269 // nodes with high rack locality. This might give primaries with 270 // slightly less locality at first compared to a cost which only 271 // considers the node locality, but should be better in the long run. 272 primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality); 273 274 // Secondary cost aims to favor servers with high node locality and high 275 // rack locality since the tertiary will be chosen from the same rack as 276 // the secondary. This could be negative, but that is okay. 277 secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality); 278 279 // Tertiary cost is only concerned with the node locality. It will later 280 // be restricted to only hosts on the same rack as the secondary. 281 tertiaryCost[i][j] = 1 - localityPerServer[i][j]; 282 } 283 } 284 } 285 286 if (this.enforceMinAssignmentMove && currentAssignmentMap != null) { 287 // We want to minimize the number of regions which move as the result of a 288 // new assignment. Therefore, slightly penalize any placement which is for 289 // a host that is not currently serving the region. 
290 for (int i = 0; i < numRegions; i++) { 291 for (int j = 0; j < servers.size(); j++) { 292 ServerName currentAddress = currentAssignmentMap.get(regions.get(i)); 293 if (currentAddress != null && !currentAddress.equals(servers.get(j))) { 294 for (int k = 0; k < slotsPerServer; k++) { 295 primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY; 296 } 297 } 298 } 299 } 300 } 301 302 // Artificially increase cost of last slot of each server to evenly 303 // distribute the slop, otherwise there will be a few servers with too few 304 // regions and many servers with the max number of regions. 305 for (int i = 0; i < numRegions; i++) { 306 for (int j = 0; j < regionSlots; j += slotsPerServer) { 307 primaryCost[i][j] += LAST_SLOT_COST_PENALTY; 308 secondaryCost[i][j] += LAST_SLOT_COST_PENALTY; 309 tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY; 310 } 311 } 312 313 RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 314 primaryCost = randomizedMatrix.transform(primaryCost); 315 int[] primaryAssignment = new MunkresAssignment(primaryCost).solve(); 316 primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment); 317 318 // Modify the secondary and tertiary costs for each region/server pair to 319 // prevent a region from being assigned to the same rack for both primary 320 // and either one of secondary or tertiary. 321 for (int i = 0; i < numRegions; i++) { 322 int slot = primaryAssignment[i]; 323 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 324 for (int k = 0; k < servers.size(); k++) { 325 if (!rackManager.getRack(servers.get(k)).equals(rack)) { 326 continue; 327 } 328 if (k == slot / slotsPerServer) { 329 // Same node, do not place secondary or tertiary here ever. 
330 for (int m = 0; m < slotsPerServer; m++) { 331 secondaryCost[i][k * slotsPerServer + m] = MAX_COST; 332 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 333 } 334 } else { 335 // Same rack, do not place secondary or tertiary here if possible. 336 for (int m = 0; m < slotsPerServer; m++) { 337 secondaryCost[i][k * slotsPerServer + m] = AVOID_COST; 338 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 339 } 340 } 341 } 342 } 343 if (munkresForSecondaryAndTertiary) { 344 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 345 secondaryCost = randomizedMatrix.transform(secondaryCost); 346 int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve(); 347 secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment); 348 349 // Modify the tertiary costs for each region/server pair to ensure that a 350 // region is assigned to a tertiary server on the same rack as its secondary 351 // server, but not the same server in that rack. 352 for (int i = 0; i < numRegions; i++) { 353 int slot = secondaryAssignment[i]; 354 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 355 for (int k = 0; k < servers.size(); k++) { 356 if (k == slot / slotsPerServer) { 357 // Same node, do not place tertiary here ever. 358 for (int m = 0; m < slotsPerServer; m++) { 359 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 360 } 361 } else { 362 if (rackManager.getRack(servers.get(k)).equals(rack)) { 363 continue; 364 } 365 // Different rack, do not place tertiary here if possible. 
366 for (int m = 0; m < slotsPerServer; m++) { 367 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 368 } 369 } 370 } 371 } 372 373 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 374 tertiaryCost = randomizedMatrix.transform(tertiaryCost); 375 int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve(); 376 tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment); 377 378 for (int i = 0; i < numRegions; i++) { 379 List<ServerName> favoredServers = 380 new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 381 ServerName s = servers.get(primaryAssignment[i] / slotsPerServer); 382 favoredServers 383 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 384 385 s = servers.get(secondaryAssignment[i] / slotsPerServer); 386 favoredServers 387 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 388 389 s = servers.get(tertiaryAssignment[i] / slotsPerServer); 390 favoredServers 391 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 392 // Update the assignment plan 393 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 394 } 395 LOG.info("Generated the assignment plan for " + numRegions + " regions from table " 396 + tableName + " with " + servers.size() + " region servers"); 397 LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment"); 398 } else { 399 Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(); 400 for (int i = 0; i < numRegions; i++) { 401 primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer)); 402 } 403 FavoredNodeAssignmentHelper favoredNodeHelper = 404 new FavoredNodeAssignmentHelper(servers, conf); 405 favoredNodeHelper.initialize(); 406 Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = 407 favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap); 408 for (int i = 0; i < numRegions; i++) { 409 
List<ServerName> favoredServers = 410 new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 411 RegionInfo currentRegion = regions.get(i); 412 ServerName s = primaryRSMap.get(currentRegion); 413 favoredServers 414 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 415 416 ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion); 417 s = secondaryAndTertiary[0]; 418 favoredServers 419 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 420 421 s = secondaryAndTertiary[1]; 422 favoredServers 423 .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE)); 424 // Update the assignment plan 425 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 426 } 427 LOG.info("Generated the assignment plan for " + numRegions + " regions from table " 428 + tableName + " with " + servers.size() + " region servers"); 429 LOG.info("Assignment plan for secondary and tertiary generated " 430 + "using placeSecondaryAndTertiaryWithRestrictions method"); 431 } 432 } 433 434 public FavoredNodesPlan getNewAssignmentPlan() throws IOException { 435 // Get the current region assignment snapshot by scanning from the META 436 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot(); 437 438 // Get the region locality map 439 Map<String, Map<String, Float>> regionLocalityMap = null; 440 if (this.enforceLocality) { 441 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 442 } 443 // Initialize the assignment plan 444 FavoredNodesPlan plan = new FavoredNodesPlan(); 445 446 // Get the table to region mapping 447 Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap(); 448 LOG.info("Start to generate the new assignment plan for the " 449 + +tableToRegionMap.keySet().size() + " tables"); 450 for (TableName table : tableToRegionMap.keySet()) { 451 try { 452 if (!this.targetTableSet.isEmpty() && 
!this.targetTableSet.contains(table)) { 453 continue; 454 } 455 // TODO: maybe run the placement in parallel for each table 456 genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan, 457 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY); 458 } catch (Exception e) { 459 LOG.error("Get some exceptions for placing primary region server" + "for table " + table 460 + " because " + e); 461 } 462 } 463 LOG.info("Finish to generate the new assignment plan for the " 464 + +tableToRegionMap.keySet().size() + " tables"); 465 return plan; 466 } 467 468 @Override 469 public void close() throws IOException { 470 Closeables.close(connection, true); 471 } 472 473 /** 474 * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order 475 * which may result in skewing the assignments of the first jobs in the matrix toward the last 476 * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can 477 * randomize the rows and columns of the cost matrix in a reversible way, such that the solution 478 * to the assignment problem can be interpreted in terms of the original untransformed cost 479 * matrix. Rows and columns are transformed independently such that the elements contained in any 480 * row of the input matrix are the same as the elements in the corresponding output matrix, and 481 * each row has its elements transformed in the same way. Similarly for columns. 482 */ 483 protected static class RandomizedMatrix { 484 private final int rows; 485 private final int cols; 486 private final int[] rowTransform; 487 private final int[] rowInverse; 488 private final int[] colTransform; 489 private final int[] colInverse; 490 491 /** 492 * Create a randomization scheme for a matrix of a given size. 
493 * @param rows the number of rows in the matrix 494 * @param cols the number of columns in the matrix 495 */ 496 public RandomizedMatrix(int rows, int cols) { 497 this.rows = rows; 498 this.cols = cols; 499 Random random = ThreadLocalRandom.current(); 500 rowTransform = new int[rows]; 501 rowInverse = new int[rows]; 502 for (int i = 0; i < rows; i++) { 503 rowTransform[i] = i; 504 } 505 // Shuffle the row indices. 506 for (int i = rows - 1; i >= 0; i--) { 507 int r = random.nextInt(i + 1); 508 int temp = rowTransform[r]; 509 rowTransform[r] = rowTransform[i]; 510 rowTransform[i] = temp; 511 } 512 // Generate the inverse row indices. 513 for (int i = 0; i < rows; i++) { 514 rowInverse[rowTransform[i]] = i; 515 } 516 517 colTransform = new int[cols]; 518 colInverse = new int[cols]; 519 for (int i = 0; i < cols; i++) { 520 colTransform[i] = i; 521 } 522 // Shuffle the column indices. 523 for (int i = cols - 1; i >= 0; i--) { 524 int r = random.nextInt(i + 1); 525 int temp = colTransform[r]; 526 colTransform[r] = colTransform[i]; 527 colTransform[i] = temp; 528 } 529 // Generate the inverse column indices. 530 for (int i = 0; i < cols; i++) { 531 colInverse[colTransform[i]] = i; 532 } 533 } 534 535 /** 536 * Copy a given matrix into a new matrix, transforming each row index and each column index 537 * according to the randomization scheme that was created at construction time. 538 * @param matrix the cost matrix to transform 539 * @return a new matrix with row and column indices transformed 540 */ 541 public float[][] transform(float[][] matrix) { 542 float[][] result = new float[rows][cols]; 543 for (int i = 0; i < rows; i++) { 544 for (int j = 0; j < cols; j++) { 545 result[rowTransform[i]][colTransform[j]] = matrix[i][j]; 546 } 547 } 548 return result; 549 } 550 551 /** 552 * Copy a given matrix into a new matrix, transforming each row index and each column index 553 * according to the inverse of the randomization scheme that was created at construction time. 
554 * @param matrix the cost matrix to be inverted 555 * @return a new matrix with row and column indices inverted 556 */ 557 public float[][] invert(float[][] matrix) { 558 float[][] result = new float[rows][cols]; 559 for (int i = 0; i < rows; i++) { 560 for (int j = 0; j < cols; j++) { 561 result[rowInverse[i]][colInverse[j]] = matrix[i][j]; 562 } 563 } 564 return result; 565 } 566 567 /** 568 * Given an array where each element {@code indices[i]} represents the randomized column index 569 * corresponding to randomized row index {@code i}, create a new array with the corresponding 570 * inverted indices. 571 * @param indices an array of transformed indices to be inverted 572 * @return an array of inverted indices 573 */ 574 public int[] invertIndices(int[] indices) { 575 int[] result = new int[indices.length]; 576 for (int i = 0; i < indices.length; i++) { 577 result[rowInverse[i]] = colInverse[indices[i]]; 578 } 579 return result; 580 } 581 } 582 583 /** 584 * Print the assignment plan to the system output stream n 585 */ 586 public static void printAssignmentPlan(FavoredNodesPlan plan) { 587 if (plan == null) return; 588 LOG.info("========== Start to print the assignment plan ================"); 589 // sort the map based on region info 590 Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap()); 591 592 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 593 594 String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); 595 String regionName = entry.getKey(); 596 LOG.info("Region: " + regionName); 597 LOG.info("Its favored nodes: " + serverList); 598 } 599 LOG.info("========== Finish to print the assignment plan ================"); 600 } 601 602 /** 603 * Update the assignment plan into hbase:meta 604 * @param plan the assignments plan to be updated into hbase:meta 605 * @throws IOException if cannot update assignment plan in hbase:meta 606 */ 607 public void 
updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException { 608 try { 609 LOG.info("Start to update the hbase:meta with the new assignment plan"); 610 Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap(); 611 Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size()); 612 Map<String, RegionInfo> regionToRegionInfoMap = 613 getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap(); 614 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 615 planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue()); 616 } 617 618 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf); 619 LOG.info("Updated the hbase:meta with the new assignment plan"); 620 } catch (Exception e) { 621 LOG.error( 622 "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage()); 623 } 624 } 625 626 /** 627 * Update the assignment plan to all the region servers nn 628 */ 629 private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException { 630 LOG.info("Start to update the region servers with the new assignment plan"); 631 // Get the region to region server map 632 Map<ServerName, List<RegionInfo>> currentAssignment = 633 this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); 634 635 // track of the failed and succeeded updates 636 int succeededNum = 0; 637 Map<ServerName, Exception> failedUpdateMap = new HashMap<>(); 638 639 for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) { 640 List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>(); 641 try { 642 // Keep track of the favored updates for the current region server 643 FavoredNodesPlan singleServerPlan = null; 644 // Find out all the updates for the current region server 645 for (RegionInfo region : entry.getValue()) { 646 List<ServerName> favoredServerList = plan.getFavoredNodes(region); 647 if ( 648 
favoredServerList != null 649 && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM 650 ) { 651 // Create the single server plan if necessary 652 if (singleServerPlan == null) { 653 singleServerPlan = new FavoredNodesPlan(); 654 } 655 // Update the single server update 656 singleServerPlan.updateFavoredNodesMap(region, favoredServerList); 657 regionUpdateInfos.add(new Pair<>(region, favoredServerList)); 658 } 659 } 660 if (singleServerPlan != null) { 661 // Update the current region server with its updated favored nodes 662 AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey()); 663 UpdateFavoredNodesRequest request = 664 RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); 665 UpdateFavoredNodesResponse updateFavoredNodesResponse = 666 FutureUtils.get(rsAdmin.updateFavoredNodes(request)); 667 LOG.info("Region server " 668 + FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest())) 669 .getServerInfo() 670 + " has updated " + updateFavoredNodesResponse.getResponse() + " / " 671 + singleServerPlan.size() + " regions with the assignment plan"); 672 succeededNum++; 673 } 674 } catch (Exception e) { 675 failedUpdateMap.put(entry.getKey(), e); 676 } 677 } 678 // log the succeeded updates 679 LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan"); 680 681 // log the failed updates 682 int failedNum = failedUpdateMap.size(); 683 if (failedNum != 0) { 684 LOG.error("Failed to update the following + " + failedNum 685 + " region servers with its corresponding favored nodes"); 686 for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) { 687 LOG.error("Failed to update " + entry.getKey().getAddress() + " because of " 688 + entry.getValue().getMessage()); 689 } 690 } 691 } 692 693 public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException { 694 LOG.info("Start to update the new assignment plan for the hbase:meta 
table and"
      + " the region servers");
    // Update the new assignment plan to META
    updateAssignmentPlanToMeta(plan);
    // Update the new assignment plan to Region Servers
    updateAssignmentPlanToRegionServers(plan);
    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
      + " the region servers");
  }

  /**
   * Counts, per table, how many regions would get a different primary favored node if
   * {@code newPlan} were applied. Only tables in the target table set are considered (all tables
   * when that set is empty). Regions that have no favored nodes in either the current plan or
   * {@code newPlan} are not counted as moved.
   * @param newPlan the candidate assignment plan
   * @return number of regions whose primary favored node changes, keyed by table
   * @throws IOException if the current assignment snapshot cannot be read
   */
  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
    Map<TableName, Integer> movesPerTable = new HashMap<>();
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      int movedPrimaries = 0;
      List<RegionInfo> regions = tableToRegions.get(table);
      for (RegionInfo region : regions) {
        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
        List<ServerName> newServers = newPlan.getFavoredNodes(region);
        // A region without favored nodes in either plan cannot be compared; skip it instead of
        // failing on get(0).
        if (
          oldServers == null || newServers == null || oldServers.isEmpty()
            || newServers.isEmpty()
        ) {
          continue;
        }
        // Position 0 is the primary favored node.
        ServerName oldPrimary = oldServers.get(0);
        ServerName newPrimary = newServers.get(0);
        if (oldPrimary.compareTo(newPrimary) != 0) {
          movedPrimaries++;
        }
      }
      movesPerTable.put(table, movedPrimaries);
    }
    return movesPerTable;
  }

  /**
   * Compares the current assignment plan with {@code newPlan} and prints, for every target table,
   * a projection report to stdout. For each favored node position the report shows the baseline
   * locality of the current plan and how much that locality would change under
   * {@code newPlan}. It also shows the baseline and projected dispersion scores.
   * @param movesPerTable     how many primary regions will move per table, as computed by
   *                          {@link #getRegionsMovement(FavoredNodesPlan)}; a missing table is
   *                          reported as 0
   * @param regionLocalityMap locality map read from the file system, keyed by encoded region
   *                          name, then by host name
   * @param newPlan           the candidate assignment plan
   * @throws IOException if the current assignment snapshot cannot be read
   */
  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
    throws IOException {
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
    FavoredNodesPlan.Position[] positions = FavoredNodesPlan.Position.values();
    DecimalFormat df = new DecimalFormat("#.##");
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      // Per favored node position: summed locality of the current plan, and summed change
      // (new - old) that the new plan would bring.
      float[] locality = new float[positions.length];
      float[] deltaLocality = new float[positions.length];
      List<RegionInfo> regions = tableToRegionsMap.get(table);
      System.out.println("==================================================");
      System.out.println("Assignment Plan Projection Report For Table: " + table);
      System.out.println("\t Total regions: " + regions.size());
      System.out.println("\t" + movesPerTable.getOrDefault(table, 0)
        + " primaries will move due to their primary has changed");
      for (RegionInfo currentRegion : regions) {
        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
        if (regionLocality == null) {
          continue;
        }
        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
        if (oldServers == null || newServers == null) {
          continue;
        }
        for (FavoredNodesPlan.Position p : positions) {
          int i = p.ordinal();
          float oldLocality = localityOnServer(regionLocality, oldServers.get(i));
          float newLocality = localityOnServer(regionLocality, newServers.get(i));
          locality[i] += oldLocality;
          deltaLocality[i] += newLocality - oldLocality;
        }
      }
      int regionCount = regions.size();
      for (int i = 0; i < positions.length; i++) {
        System.out.print("\t\t Baseline locality for ");
        System.out.print(positionLabel(i) + " ");
        System.out.println(df.format(averagePercent(locality[i], regionCount)) + "%");
        System.out.print("\t\t Locality will change with the new plan: ");
        System.out.println(df.format(averagePercent(deltaLocality[i], regionCount)) + "%");
      }
      System.out.println("\t Baseline dispersion");
      printDispersionScores(table, snapshot, regionCount, null, true);
      System.out.println("\t Projected dispersion");
      printDispersionScores(table, snapshot, regionCount, newPlan, true);
    }
  }

  /**
   * Reports the dispersion scores of {@code table}. Nothing is reported when the table is not in
   * the target table set.
   * @param table       table to report on
   * @param snapshot    snapshot of the current region assignment
   * @param numRegions  number of regions of the table; only used in the log message
   * @param newPlan     plan to evaluate; the callers in this file pass {@code null} for the
   *                    baseline (current) plan
   * @param simplePrint {@code true} to print average, max and min scores to stdout,
   *                    {@code false} to log only the average score
   */
  public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot,
    int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) {
    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
      return;
    }
    AssignmentVerificationReport report = new AssignmentVerificationReport();
    report.fillUpDispersion(table, snapshot, newPlan);
    // Index 0: average, 1: max, 2: min dispersion score.
    List<Float> dispersion = report.getDispersionInformation();
    if (simplePrint) {
      DecimalFormat df = new DecimalFormat("#.##");
      System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0))
        + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1))
        + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;");
    } else {
      LOG.info("For Table: {} ; #Total Regions: {} ; The average dispersion score is {}", table,
        numRegions, dispersion.get(0));
    }
  }

  /**
   * Logs, for every target table, the average locality of each favored node position under the
   * current assignment plan, followed by the table's dispersion score.
   * @param regionLocalityMap locality map read from the file system, keyed by encoded region
   *                          name, then by host name
   * @throws IOException if the current assignment snapshot cannot be read
   */
  public void printLocalityAndDispersionForCurrentPlan(
    Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
    FavoredNodesPlan.Position[] positions = FavoredNodesPlan.Position.values();
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      // Summed locality per favored node position over all regions of the table.
      float[] locality = new float[positions.length];
      List<RegionInfo> regions = tableToRegionsMap.get(table);
      for (RegionInfo currentRegion : regions) {
        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
        if (regionLocality == null) {
          continue;
        }
        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
        if (servers == null) {
          continue;
        }
        for (FavoredNodesPlan.Position p : positions) {
          locality[p.ordinal()] += localityOnServer(regionLocality, servers.get(p.ordinal()));
        }
      }
      int regionCount = regions.size();
      for (int i = 0; i < locality.length; i++) {
        float avgLocality = averagePercent(locality[i], regionCount);
        LOG.info("For Table: " + table + " ; #Total Regions: " + regionCount
          + " ; The average locality for " + positionLabel(i) + " is " + avgLocality + " %");
      }
      printDispersionScores(table, snapshot, regionCount, null, false);
    }
  }

  /** Returns the report label for the favored node position with the given ordinal. */
  private static String positionLabel(int ordinal) {
    switch (ordinal) {
      case 0:
        return "primary";
      case 1:
        return "secondary";
      case 2:
        return "tertiary";
      default:
        return "position " + ordinal;
    }
  }

  /**
   * Returns the locality recorded for the host of {@code server}, or 0 when that host has no
   * entry in {@code regionLocality}.
   */
  private static float localityOnServer(Map<String, Float> regionLocality, ServerName server) {
    Float value = regionLocality.get(server.getHostname());
    return value == null ? 0f : value;
  }

  /**
   * Returns {@code total / count} expressed as a percentage. Returns 0 when {@code count} is 0,
   * so that tables without regions do not produce NaN.
   */
  private static float averagePercent(float total, int count) {
    return count == 0 ? 0f : 100 * total / count;
  }

  /**
   * Parses a comma separated list of {@code host:port} entries into server names without a start
   * code. Whitespace around each entry is ignored, and blank entries are skipped.
   * @param favoredNodesStr comma separated favored nodes, e.g. {@code host1:16020,host2:16020}
   * @return the parsed servers, or {@code null} when {@code favoredNodesStr} is {@code null}
   * @throws IllegalArgumentException if {@link ServerName#valueOf(String, long)} rejects an entry
   */
  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
    if (favoredNodesArray == null) {
      return null;
    }
    List<ServerName> serverList = new ArrayList<>(favoredNodesArray.length);
    for (String entry : favoredNodesArray) {
      String hostNameAndPort = entry.trim();
      if (hostNameAndPort.isEmpty()) {
        continue;
      }
      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
    }
    return serverList;
  }

  /**
   * Command line entry point; run with {@code -h} to list the options. At most one mode is
   * executed, checked in this order: verify, dry-run, write, update, diff, locality-dispersion,
   * print, overwrite. Help is printed when no mode is given or the arguments cannot be parsed.
   * @param args command line arguments
   * @throws IOException if reading or writing assignment information fails
   */
  public static void main(String[] args) throws IOException {
    Options opt = new Options();
    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
    opt.addOption("u", "update", false,
      "update the assignments to hbase:meta and RegionServers together");
    opt.addOption("n", "dry-run", false, "do not write assignments to META");
    opt.addOption("v", "verify", false, "verify current assignments against META");
    opt.addOption("p", "print", false, "print the current assignment plan in META");
    opt.addOption("h", "help", false, "print usage");
    opt.addOption("d", "verification-details", false, "print the details of verification report");

    opt.addOption("zk", true, "to set the zookeeper quorum");
    opt.addOption("fs", true, "to set HDFS");
    opt.addOption("hbase_root", true, "to set hbase_root directory");

    // The example must use -overwrite: "-update" would select the cluster-wide update mode.
    opt.addOption("overwrite", false, "overwrite the favored nodes for a single region,"
      + "for example: -overwrite -r regionName -f server1:port,server2:port,server3:port");
    opt.addOption("r", true, "The region name that needs to be updated");
    opt.addOption("f", true, "The new favored nodes");

    opt.addOption("tables", true,
      "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn");
    opt.addOption("l", "locality", true, "enforce the maximum locality");
    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
    opt.addOption("diff", false, "calculate difference between assignment plans");
    opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries");
    opt.addOption("ld", "locality-dispersion", false,
      "print locality and dispersion " + "information for current plan");
    try {
      CommandLine cmd = new GnuParser().parse(opt, args);
      Configuration conf = HBaseConfiguration.create();

      boolean enforceMinAssignmentMove = true;
      boolean enforceLocality = true;
      boolean verificationDetails = false;

      // Read all the options
      if (
        (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false"))
          || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false"))
      ) {
        enforceLocality = false;
      }

      if (
        (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false"))
          || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false"))
      ) {
        enforceMinAssignmentMove = false;
      }

      if (cmd.hasOption("zk")) {
        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
      }

      if (cmd.hasOption("fs")) {
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
      }

      if (cmd.hasOption("hbase_root")) {
        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
      }

      // Create the region placement obj
      try (RegionPlacementMaintainer rp =
        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {

        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
          verificationDetails = true;
        }

        if (cmd.hasOption("tables")) {
          String tableNameListStr = cmd.getOptionValue("tables");
          String[] tableNames = StringUtils.split(tableNameListStr, ",");
          rp.setTargetTableName(tableNames);
        }

        if (cmd.hasOption("munkres")) {
          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
        }

        // Read all the modes
        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
          // Verify the region placement.
          rp.verifyRegionPlacement(verificationDetails);
        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
          // Generate the assignment plan only without updating the hbase:meta and RS
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          printAssignmentPlan(plan);
        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
          // Generate and print the new assignment plan, then write it to META only
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          printAssignmentPlan(plan);
          rp.updateAssignmentPlanToMeta(plan);
        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
          // Generate and print the new assignment plan, then push it to META and Region Servers
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          printAssignmentPlan(plan);
          rp.updateAssignmentPlan(plan);
        } else if (cmd.hasOption("diff")) {
          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
          Map<String, Map<String, Float>> locality =
            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
          System.out.println("Do you want to update the assignment plan? [y/n]");
          // Closing the Scanner also closes System.in, as the original code did after prompting.
          try (Scanner s = new Scanner(System.in)) {
            // Treat end of input (e.g. stdin redirected from /dev/null) as "no".
            String input = s.hasNextLine() ? s.nextLine().trim() : "";
            if (input.equals("y")) {
              System.out.println("Updating assignment plan...");
              rp.updateAssignmentPlan(newPlan);
            }
          }
        } else if (cmd.hasOption("ld")) {
          Map<String, Map<String, Float>> locality =
            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
          rp.printLocalityAndDispersionForCurrentPlan(locality);
        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
          printAssignmentPlan(plan);
        } else if (cmd.hasOption("overwrite")) {
          overwriteFavoredNodes(rp, cmd);
        } else {
          printHelp(opt);
        }
      }
    } catch (ParseException e) {
      printHelp(opt);
    }
  }

  /**
   * Handles the {@code -overwrite} mode. It replaces the favored nodes of the single region given
   * by {@code -r} with the list given by {@code -f}, and pushes that one-region plan through
   * {@link #updateAssignmentPlan(FavoredNodesPlan)}. Nothing is written when the region is
   * unknown or the favored nodes cannot be parsed.
   * @throws IllegalArgumentException if {@code -r} or {@code -f} is missing
   * @throws IOException              if reading the snapshot or updating the plan fails
   */
  private static void overwriteFavoredNodes(RegionPlacementMaintainer rp, CommandLine cmd)
    throws IOException {
    if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
      throw new IllegalArgumentException("Please specify: "
        + " -overwrite -r regionName -f server1:port,server2:port,server3:port");
    }

    String regionName = cmd.getOptionValue("r");
    String favoredNodesStr = cmd.getOptionValue("f");
    LOG.info("Going to update the region " + regionName + " with the new favored nodes "
      + favoredNodesStr);
    RegionInfo regionInfo =
      rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
    if (regionInfo == null) {
      LOG.error("Cannot find the region " + regionName + " from the META");
      return;
    }

    List<ServerName> favoredNodes;
    try {
      favoredNodes = getFavoredNodeList(favoredNodesStr);
    } catch (IllegalArgumentException e) {
      // Do not fall through: writing a plan with null favored nodes would clobber the region.
      LOG.error("Cannot parse the invalid favored nodes because " + e, e);
      return;
    }
    if (favoredNodes == null || favoredNodes.isEmpty()) {
      LOG.error("No favored nodes given for the region " + regionName + "; nothing updated");
      return;
    }

    FavoredNodesPlan newPlan = new FavoredNodesPlan();
    newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
    rp.updateAssignmentPlan(newPlan);
  }
}