001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.master; 021 022import java.io.IOException; 023import java.text.DecimalFormat; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Random; 030import java.util.Scanner; 031import java.util.Set; 032import java.util.TreeMap; 033import org.apache.commons.lang3.StringUtils; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.ClusterConnection; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; 046import org.apache.hadoop.hbase.favored.FavoredNodesPlan; 047import org.apache.hadoop.hbase.util.FSUtils; 048import org.apache.hadoop.hbase.util.MunkresAssignment; 049import org.apache.hadoop.hbase.util.Pair; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 055import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 056import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 057import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 058import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 065 066/** 067 * A tool that is used for manipulating and viewing favored nodes information 068 * for regions. Run with -h to get a list of the options 069 */ 070@InterfaceAudience.Private 071// TODO: Remove? Unused. Partially implemented only. 072public class RegionPlacementMaintainer { 073 private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class 074 .getName()); 075 //The cost of a placement that should never be assigned. 076 private static final float MAX_COST = Float.POSITIVE_INFINITY; 077 078 // The cost of a placement that is undesirable but acceptable. 079 private static final float AVOID_COST = 100000f; 080 081 // The amount by which the cost of a placement is increased if it is the 082 // last slot of the server. This is done to more evenly distribute the slop 083 // amongst servers. 084 private static final float LAST_SLOT_COST_PENALTY = 0.5f; 085 086 // The amount by which the cost of a primary placement is penalized if it is 087 // not the host currently serving the region. This is done to minimize moves. 088 private static final float NOT_CURRENT_HOST_PENALTY = 0.1f; 089 090 private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false; 091 092 private Configuration conf; 093 private final boolean enforceLocality; 094 private final boolean enforceMinAssignmentMove; 095 private RackManager rackManager; 096 private Set<TableName> targetTableSet; 097 private final Connection connection; 098 099 public RegionPlacementMaintainer(Configuration conf) { 100 this(conf, true, true); 101 } 102 103 public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality, 104 boolean enforceMinAssignmentMove) { 105 this.conf = conf; 106 this.enforceLocality = enforceLocality; 107 this.enforceMinAssignmentMove = enforceMinAssignmentMove; 108 this.targetTableSet = new HashSet<>(); 109 this.rackManager = new RackManager(conf); 110 try { 111 this.connection = ConnectionFactory.createConnection(this.conf); 112 } catch (IOException e) { 113 throw new RuntimeException(e); 114 } 115 } 116 117 private static void printHelp(Options opt) { 118 new HelpFormatter().printHelp( 119 "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " + 120 "-diff>" + 121 " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" + 122 " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt); 123 } 124 125 public void setTargetTableName(String[] tableNames) { 126 if (tableNames != null) { 127 for (String table : tableNames) 128 this.targetTableSet.add(TableName.valueOf(table)); 129 } 130 } 131 132 /** 133 * @return the new RegionAssignmentSnapshot 134 * @throws IOException 135 */ 136 public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() 137 throws IOException { 138 SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = 139 new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf)); 140 currentAssignmentShapshot.initialize(); 141 return currentAssignmentShapshot; 142 } 143 144 /** 145 * Verify the region placement is consistent with the assignment plan 146 * @param isDetailMode 147 * @return reports 148 * @throws IOException 149 */ 150 public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode) 151 throws IOException { 152 System.out.println("Start to verify the region assignment and " + 153 "generate the verification report"); 154 // Get the region assignment snapshot 155 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 156 157 // Get all the tables 158 Set<TableName> tables = snapshot.getTableSet(); 159 160 // Get the region locality map 161 Map<String, Map<String, Float>> regionLocalityMap = null; 162 if (this.enforceLocality == true) { 163 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 164 } 165 List<AssignmentVerificationReport> reports = new ArrayList<>(); 166 // Iterate all the tables to fill up the verification report 167 for (TableName table : tables) { 168 if (!this.targetTableSet.isEmpty() && 169 !this.targetTableSet.contains(table)) { 170 continue; 171 } 172 AssignmentVerificationReport report = new AssignmentVerificationReport(); 173 report.fillUp(table, snapshot, regionLocalityMap); 174 report.print(isDetailMode); 175 reports.add(report); 176 } 177 return reports; 178 } 179 180 /** 181 * Generate the assignment plan for the existing table 182 * 183 * @param tableName 184 * @param assignmentSnapshot 185 * @param regionLocalityMap 186 * @param plan 187 * @param munkresForSecondaryAndTertiary if set on true the assignment plan 188 * for the tertiary and secondary will be generated with Munkres algorithm, 189 * otherwise will be generated using placeSecondaryAndTertiaryRS 190 * @throws IOException 191 */ 192 private void genAssignmentPlan(TableName tableName, 193 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot, 194 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan, 195 boolean munkresForSecondaryAndTertiary) throws IOException { 196 // Get the all the regions for the current table 197 List<RegionInfo> regions = 198 assignmentSnapshot.getTableToRegionMap().get(tableName); 199 int numRegions = regions.size(); 200 201 // Get the current assignment map 202 Map<RegionInfo, ServerName> currentAssignmentMap = 203 assignmentSnapshot.getRegionToRegionServerMap(); 204 205 // Get the all the region servers 206 List<ServerName> servers = new ArrayList<>(); 207 try (Admin admin = this.connection.getAdmin()) { 208 servers.addAll(admin.getRegionServers()); 209 } 210 211 LOG.info("Start to generate assignment plan for " + numRegions + 212 " regions from table " + tableName + " with " + 213 servers.size() + " region servers"); 214 215 int slotsPerServer = (int) Math.ceil((float) numRegions / 216 servers.size()); 217 int regionSlots = slotsPerServer * servers.size(); 218 219 // Compute the primary, secondary and tertiary costs for each region/server 220 // pair. These costs are based only on node locality and rack locality, and 221 // will be modified later. 222 float[][] primaryCost = new float[numRegions][regionSlots]; 223 float[][] secondaryCost = new float[numRegions][regionSlots]; 224 float[][] tertiaryCost = new float[numRegions][regionSlots]; 225 226 if (this.enforceLocality && regionLocalityMap != null) { 227 // Transform the locality mapping into a 2D array, assuming that any 228 // unspecified locality value is 0. 229 float[][] localityPerServer = new float[numRegions][regionSlots]; 230 for (int i = 0; i < numRegions; i++) { 231 Map<String, Float> serverLocalityMap = 232 regionLocalityMap.get(regions.get(i).getEncodedName()); 233 if (serverLocalityMap == null) { 234 continue; 235 } 236 for (int j = 0; j < servers.size(); j++) { 237 String serverName = servers.get(j).getHostname(); 238 if (serverName == null) { 239 continue; 240 } 241 Float locality = serverLocalityMap.get(serverName); 242 if (locality == null) { 243 continue; 244 } 245 for (int k = 0; k < slotsPerServer; k++) { 246 // If we can't find the locality of a region to a server, which occurs 247 // because locality is only reported for servers which have some 248 // blocks of a region local, then the locality for that pair is 0. 249 localityPerServer[i][j * slotsPerServer + k] = locality.floatValue(); 250 } 251 } 252 } 253 254 // Compute the total rack locality for each region in each rack. The total 255 // rack locality is the sum of the localities of a region on all servers in 256 // a rack. 257 Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>(); 258 for (int i = 0; i < numRegions; i++) { 259 RegionInfo region = regions.get(i); 260 for (int j = 0; j < regionSlots; j += slotsPerServer) { 261 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 262 Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack); 263 if (rackLocality == null) { 264 rackLocality = new HashMap<>(); 265 rackRegionLocality.put(rack, rackLocality); 266 } 267 Float localityObj = rackLocality.get(region); 268 float locality = localityObj == null ? 0 : localityObj.floatValue(); 269 locality += localityPerServer[i][j]; 270 rackLocality.put(region, locality); 271 } 272 } 273 for (int i = 0; i < numRegions; i++) { 274 for (int j = 0; j < regionSlots; j++) { 275 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 276 Float totalRackLocalityObj = 277 rackRegionLocality.get(rack).get(regions.get(i)); 278 float totalRackLocality = totalRackLocalityObj == null ? 279 0 : totalRackLocalityObj.floatValue(); 280 281 // Primary cost aims to favor servers with high node locality and low 282 // rack locality, so that secondaries and tertiaries can be chosen for 283 // nodes with high rack locality. This might give primaries with 284 // slightly less locality at first compared to a cost which only 285 // considers the node locality, but should be better in the long run. 286 primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - 287 totalRackLocality); 288 289 // Secondary cost aims to favor servers with high node locality and high 290 // rack locality since the tertiary will be chosen from the same rack as 291 // the secondary. This could be negative, but that is okay. 292 secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality); 293 294 // Tertiary cost is only concerned with the node locality. It will later 295 // be restricted to only hosts on the same rack as the secondary. 296 tertiaryCost[i][j] = 1 - localityPerServer[i][j]; 297 } 298 } 299 } 300 301 if (this.enforceMinAssignmentMove && currentAssignmentMap != null) { 302 // We want to minimize the number of regions which move as the result of a 303 // new assignment. Therefore, slightly penalize any placement which is for 304 // a host that is not currently serving the region. 305 for (int i = 0; i < numRegions; i++) { 306 for (int j = 0; j < servers.size(); j++) { 307 ServerName currentAddress = currentAssignmentMap.get(regions.get(i)); 308 if (currentAddress != null && 309 !currentAddress.equals(servers.get(j))) { 310 for (int k = 0; k < slotsPerServer; k++) { 311 primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY; 312 } 313 } 314 } 315 } 316 } 317 318 // Artificially increase cost of last slot of each server to evenly 319 // distribute the slop, otherwise there will be a few servers with too few 320 // regions and many servers with the max number of regions. 321 for (int i = 0; i < numRegions; i++) { 322 for (int j = 0; j < regionSlots; j += slotsPerServer) { 323 primaryCost[i][j] += LAST_SLOT_COST_PENALTY; 324 secondaryCost[i][j] += LAST_SLOT_COST_PENALTY; 325 tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY; 326 } 327 } 328 329 RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, 330 regionSlots); 331 primaryCost = randomizedMatrix.transform(primaryCost); 332 int[] primaryAssignment = new MunkresAssignment(primaryCost).solve(); 333 primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment); 334 335 // Modify the secondary and tertiary costs for each region/server pair to 336 // prevent a region from being assigned to the same rack for both primary 337 // and either one of secondary or tertiary. 338 for (int i = 0; i < numRegions; i++) { 339 int slot = primaryAssignment[i]; 340 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 341 for (int k = 0; k < servers.size(); k++) { 342 if (!rackManager.getRack(servers.get(k)).equals(rack)) { 343 continue; 344 } 345 if (k == slot / slotsPerServer) { 346 // Same node, do not place secondary or tertiary here ever. 347 for (int m = 0; m < slotsPerServer; m++) { 348 secondaryCost[i][k * slotsPerServer + m] = MAX_COST; 349 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 350 } 351 } else { 352 // Same rack, do not place secondary or tertiary here if possible. 353 for (int m = 0; m < slotsPerServer; m++) { 354 secondaryCost[i][k * slotsPerServer + m] = AVOID_COST; 355 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 356 } 357 } 358 } 359 } 360 if (munkresForSecondaryAndTertiary) { 361 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 362 secondaryCost = randomizedMatrix.transform(secondaryCost); 363 int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve(); 364 secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment); 365 366 // Modify the tertiary costs for each region/server pair to ensure that a 367 // region is assigned to a tertiary server on the same rack as its secondary 368 // server, but not the same server in that rack. 369 for (int i = 0; i < numRegions; i++) { 370 int slot = secondaryAssignment[i]; 371 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 372 for (int k = 0; k < servers.size(); k++) { 373 if (k == slot / slotsPerServer) { 374 // Same node, do not place tertiary here ever. 375 for (int m = 0; m < slotsPerServer; m++) { 376 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 377 } 378 } else { 379 if (rackManager.getRack(servers.get(k)).equals(rack)) { 380 continue; 381 } 382 // Different rack, do not place tertiary here if possible. 383 for (int m = 0; m < slotsPerServer; m++) { 384 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 385 } 386 } 387 } 388 } 389 390 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 391 tertiaryCost = randomizedMatrix.transform(tertiaryCost); 392 int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve(); 393 tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment); 394 395 for (int i = 0; i < numRegions; i++) { 396 List<ServerName> favoredServers 397 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 398 ServerName s = servers.get(primaryAssignment[i] / slotsPerServer); 399 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 400 ServerName.NON_STARTCODE)); 401 402 s = servers.get(secondaryAssignment[i] / slotsPerServer); 403 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 404 ServerName.NON_STARTCODE)); 405 406 s = servers.get(tertiaryAssignment[i] / slotsPerServer); 407 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 408 ServerName.NON_STARTCODE)); 409 // Update the assignment plan 410 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 411 } 412 LOG.info("Generated the assignment plan for " + numRegions + 413 " regions from table " + tableName + " with " + 414 servers.size() + " region servers"); 415 LOG.info("Assignment plan for secondary and tertiary generated " + 416 "using MunkresAssignment"); 417 } else { 418 Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(); 419 for (int i = 0; i < numRegions; i++) { 420 primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer)); 421 } 422 FavoredNodeAssignmentHelper favoredNodeHelper = 423 new FavoredNodeAssignmentHelper(servers, conf); 424 favoredNodeHelper.initialize(); 425 Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = 426 favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap); 427 for (int i = 0; i < numRegions; i++) { 428 List<ServerName> favoredServers 429 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 430 RegionInfo currentRegion = regions.get(i); 431 ServerName s = primaryRSMap.get(currentRegion); 432 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 433 ServerName.NON_STARTCODE)); 434 435 ServerName[] secondaryAndTertiary = 436 secondaryAndTertiaryMap.get(currentRegion); 437 s = secondaryAndTertiary[0]; 438 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 439 ServerName.NON_STARTCODE)); 440 441 s = secondaryAndTertiary[1]; 442 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 443 ServerName.NON_STARTCODE)); 444 // Update the assignment plan 445 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 446 } 447 LOG.info("Generated the assignment plan for " + numRegions + 448 " regions from table " + tableName + " with " + 449 servers.size() + " region servers"); 450 LOG.info("Assignment plan for secondary and tertiary generated " + 451 "using placeSecondaryAndTertiaryWithRestrictions method"); 452 } 453 } 454 455 public FavoredNodesPlan getNewAssignmentPlan() throws IOException { 456 // Get the current region assignment snapshot by scanning from the META 457 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = 458 this.getRegionAssignmentSnapshot(); 459 460 // Get the region locality map 461 Map<String, Map<String, Float>> regionLocalityMap = null; 462 if (this.enforceLocality) { 463 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 464 } 465 // Initialize the assignment plan 466 FavoredNodesPlan plan = new FavoredNodesPlan(); 467 468 // Get the table to region mapping 469 Map<TableName, List<RegionInfo>> tableToRegionMap = 470 assignmentSnapshot.getTableToRegionMap(); 471 LOG.info("Start to generate the new assignment plan for the " + 472 + tableToRegionMap.keySet().size() + " tables" ); 473 for (TableName table : tableToRegionMap.keySet()) { 474 try { 475 if (!this.targetTableSet.isEmpty() && 476 !this.targetTableSet.contains(table)) { 477 continue; 478 } 479 // TODO: maybe run the placement in parallel for each table 480 genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan, 481 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY); 482 } catch (Exception e) { 483 LOG.error("Get some exceptions for placing primary region server" + 484 "for table " + table + " because " + e); 485 } 486 } 487 LOG.info("Finish to generate the new assignment plan for the " + 488 + tableToRegionMap.keySet().size() + " tables" ); 489 return plan; 490 } 491 492 /** 493 * Some algorithms for solving the assignment problem may traverse workers or 494 * jobs in linear order which may result in skewing the assignments of the 495 * first jobs in the matrix toward the last workers in the matrix if the 496 * costs are uniform. To avoid this kind of clumping, we can randomize the 497 * rows and columns of the cost matrix in a reversible way, such that the 498 * solution to the assignment problem can be interpreted in terms of the 499 * original untransformed cost matrix. Rows and columns are transformed 500 * independently such that the elements contained in any row of the input 501 * matrix are the same as the elements in the corresponding output matrix, 502 * and each row has its elements transformed in the same way. Similarly for 503 * columns. 504 */ 505 protected static class RandomizedMatrix { 506 private final int rows; 507 private final int cols; 508 private final int[] rowTransform; 509 private final int[] rowInverse; 510 private final int[] colTransform; 511 private final int[] colInverse; 512 513 /** 514 * Create a randomization scheme for a matrix of a given size. 515 * @param rows the number of rows in the matrix 516 * @param cols the number of columns in the matrix 517 */ 518 public RandomizedMatrix(int rows, int cols) { 519 this.rows = rows; 520 this.cols = cols; 521 Random random = new Random(); 522 rowTransform = new int[rows]; 523 rowInverse = new int[rows]; 524 for (int i = 0; i < rows; i++) { 525 rowTransform[i] = i; 526 } 527 // Shuffle the row indices. 528 for (int i = rows - 1; i >= 0; i--) { 529 int r = random.nextInt(i + 1); 530 int temp = rowTransform[r]; 531 rowTransform[r] = rowTransform[i]; 532 rowTransform[i] = temp; 533 } 534 // Generate the inverse row indices. 535 for (int i = 0; i < rows; i++) { 536 rowInverse[rowTransform[i]] = i; 537 } 538 539 colTransform = new int[cols]; 540 colInverse = new int[cols]; 541 for (int i = 0; i < cols; i++) { 542 colTransform[i] = i; 543 } 544 // Shuffle the column indices. 545 for (int i = cols - 1; i >= 0; i--) { 546 int r = random.nextInt(i + 1); 547 int temp = colTransform[r]; 548 colTransform[r] = colTransform[i]; 549 colTransform[i] = temp; 550 } 551 // Generate the inverse column indices. 552 for (int i = 0; i < cols; i++) { 553 colInverse[colTransform[i]] = i; 554 } 555 } 556 557 /** 558 * Copy a given matrix into a new matrix, transforming each row index and 559 * each column index according to the randomization scheme that was created 560 * at construction time. 561 * @param matrix the cost matrix to transform 562 * @return a new matrix with row and column indices transformed 563 */ 564 public float[][] transform(float[][] matrix) { 565 float[][] result = new float[rows][cols]; 566 for (int i = 0; i < rows; i++) { 567 for (int j = 0; j < cols; j++) { 568 result[rowTransform[i]][colTransform[j]] = matrix[i][j]; 569 } 570 } 571 return result; 572 } 573 574 /** 575 * Copy a given matrix into a new matrix, transforming each row index and 576 * each column index according to the inverse of the randomization scheme 577 * that was created at construction time. 578 * @param matrix the cost matrix to be inverted 579 * @return a new matrix with row and column indices inverted 580 */ 581 public float[][] invert(float[][] matrix) { 582 float[][] result = new float[rows][cols]; 583 for (int i = 0; i < rows; i++) { 584 for (int j = 0; j < cols; j++) { 585 result[rowInverse[i]][colInverse[j]] = matrix[i][j]; 586 } 587 } 588 return result; 589 } 590 591 /** 592 * Given an array where each element {@code indices[i]} represents the 593 * randomized column index corresponding to randomized row index {@code i}, 594 * create a new array with the corresponding inverted indices. 595 * @param indices an array of transformed indices to be inverted 596 * @return an array of inverted indices 597 */ 598 public int[] invertIndices(int[] indices) { 599 int[] result = new int[indices.length]; 600 for (int i = 0; i < indices.length; i++) { 601 result[rowInverse[i]] = colInverse[indices[i]]; 602 } 603 return result; 604 } 605 } 606 607 /** 608 * Print the assignment plan to the system output stream 609 * @param plan 610 */ 611 public static void printAssignmentPlan(FavoredNodesPlan plan) { 612 if (plan == null) return; 613 LOG.info("========== Start to print the assignment plan ================"); 614 // sort the map based on region info 615 Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap()); 616 617 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 618 619 String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); 620 String regionName = entry.getKey(); 621 LOG.info("Region: " + regionName ); 622 LOG.info("Its favored nodes: " + serverList); 623 } 624 LOG.info("========== Finish to print the assignment plan ================"); 625 } 626 627 /** 628 * Update the assignment plan into hbase:meta 629 * @param plan the assignments plan to be updated into hbase:meta 630 * @throws IOException if cannot update assignment plan in hbase:meta 631 */ 632 public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) 633 throws IOException { 634 try { 635 LOG.info("Start to update the hbase:meta with the new assignment plan"); 636 Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap(); 637 Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size()); 638 Map<String, RegionInfo> regionToRegionInfoMap = 639 getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap(); 640 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 641 planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue()); 642 } 643 644 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf); 645 LOG.info("Updated the hbase:meta with the new assignment plan"); 646 } catch (Exception e) { 647 LOG.error("Failed to update hbase:meta with the new assignment" + 648 "plan because " + e.getMessage()); 649 } 650 } 651 652 /** 653 * Update the assignment plan to all the region servers 654 * @param plan 655 * @throws IOException 656 */ 657 private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) 658 throws IOException{ 659 LOG.info("Start to update the region servers with the new assignment plan"); 660 // Get the region to region server map 661 Map<ServerName, List<RegionInfo>> currentAssignment = 662 this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); 663 664 // track of the failed and succeeded updates 665 int succeededNum = 0; 666 Map<ServerName, Exception> failedUpdateMap = new HashMap<>(); 667 668 for (Map.Entry<ServerName, List<RegionInfo>> entry : 669 currentAssignment.entrySet()) { 670 List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>(); 671 try { 672 // Keep track of the favored updates for the current region server 673 FavoredNodesPlan singleServerPlan = null; 674 // Find out all the updates for the current region server 675 for (RegionInfo region : entry.getValue()) { 676 List<ServerName> favoredServerList = plan.getFavoredNodes(region); 677 if (favoredServerList != null && 678 favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { 679 // Create the single server plan if necessary 680 if (singleServerPlan == null) { 681 singleServerPlan = new FavoredNodesPlan(); 682 } 683 // Update the single server update 684 singleServerPlan.updateFavoredNodesMap(region, favoredServerList); 685 regionUpdateInfos.add(new Pair<>(region, favoredServerList)); 686 } 687 } 688 if (singleServerPlan != null) { 689 // Update the current region server with its updated favored nodes 690 BlockingInterface currentRegionServer = 691 ((ClusterConnection)this.connection).getAdmin(entry.getKey()); 692 UpdateFavoredNodesRequest request = 693 RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); 694 695 UpdateFavoredNodesResponse updateFavoredNodesResponse = 696 currentRegionServer.updateFavoredNodes(null, request); 697 LOG.info("Region server " + 698 ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + 699 " has updated " + updateFavoredNodesResponse.getResponse() + " / " + 700 singleServerPlan.getAssignmentMap().size() + 701 " regions with the assignment plan"); 702 succeededNum ++; 703 } 704 } catch (Exception e) { 705 failedUpdateMap.put(entry.getKey(), e); 706 } 707 } 708 // log the succeeded updates 709 LOG.info("Updated " + succeededNum + " region servers with " + 710 "the new assignment plan"); 711 712 // log the failed updates 713 int failedNum = failedUpdateMap.size(); 714 if (failedNum != 0) { 715 LOG.error("Failed to update the following + " + failedNum + 716 " region servers with its corresponding favored nodes"); 717 for (Map.Entry<ServerName, Exception> entry : 718 failedUpdateMap.entrySet() ) { 719 LOG.error("Failed to update " + entry.getKey().getHostAndPort() + 720 " because of " + entry.getValue().getMessage()); 721 } 722 } 723 } 724 725 public void updateAssignmentPlan(FavoredNodesPlan plan) 726 throws IOException { 727 LOG.info("Start to update the new assignment plan for the hbase:meta table and" + 728 " the region servers"); 729 // Update the new assignment plan to META 730 updateAssignmentPlanToMeta(plan); 731 // Update the new assignment plan to Region Servers 732 updateAssignmentPlanToRegionServers(plan); 733 LOG.info("Finish to update the new assignment plan for the hbase:meta table and" + 734 " the region servers"); 735 } 736 737 /** 738 * Return how many regions will move per table since their primary RS will 739 * change 740 * 741 * @param newPlan - new AssignmentPlan 742 * @return how many primaries will move per table 743 */ 744 public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) 745 throws IOException { 746 Map<TableName, Integer> movesPerTable = new HashMap<>(); 747 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 748 Map<TableName, List<RegionInfo>> tableToRegions = snapshot 749 .getTableToRegionMap(); 750 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan(); 751 Set<TableName> tables = snapshot.getTableSet(); 752 for (TableName table : tables) { 753 int movedPrimaries = 0; 754 if (!this.targetTableSet.isEmpty() 755 && !this.targetTableSet.contains(table)) { 756 continue; 757 } 758 List<RegionInfo> regions = tableToRegions.get(table); 759 for (RegionInfo region : regions) { 760 List<ServerName> oldServers = oldPlan.getFavoredNodes(region); 761 List<ServerName> newServers = newPlan.getFavoredNodes(region); 762 if (oldServers != null && newServers != null) { 763 ServerName oldPrimary = oldServers.get(0); 764 ServerName newPrimary = newServers.get(0); 765 if (oldPrimary.compareTo(newPrimary) != 0) { 766 movedPrimaries++; 767 } 768 } 769 } 770 movesPerTable.put(table, movedPrimaries); 771 } 772 return movesPerTable; 773 } 774 775 /** 776 * Compares two plans and check whether the locality dropped or increased 777 * (prints the information as a string) also prints the baseline locality 778 * 779 * @param movesPerTable - how many primary regions will move per table 780 * @param regionLocalityMap - locality map from FS 781 * @param newPlan - new assignment plan 782 * @throws IOException 783 */ 784 public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable, 785 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan) 786 throws IOException { 787 // localities for primary, secondary and tertiary 788 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 789 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan(); 790 Set<TableName> tables = snapshot.getTableSet(); 791 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap(); 792 for (TableName table : tables) { 793 float[] deltaLocality = new float[3]; 794 float[] locality = new float[3]; 795 if (!this.targetTableSet.isEmpty() 796 && !this.targetTableSet.contains(table)) { 797 continue; 798 } 799 List<RegionInfo> regions = tableToRegionsMap.get(table); 800 System.out.println("=================================================="); 801 System.out.println("Assignment Plan Projection Report For Table: " + table); 802 System.out.println("\t Total regions: " + regions.size()); 803 System.out.println("\t" + movesPerTable.get(table) 804 + " primaries will move due to their primary has changed"); 805 for (RegionInfo currentRegion : regions) { 806 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion 807 .getEncodedName()); 808 if (regionLocality == null) { 809 continue; 810 } 811 List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion); 812 List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion); 813 if (newServers != null && oldServers != null) { 814 int i=0; 815 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) { 816 ServerName newServer = newServers.get(p.ordinal()); 817 ServerName oldServer = oldServers.get(p.ordinal()); 818 Float oldLocality = 0f; 819 if (oldServers != null) { 820 oldLocality = regionLocality.get(oldServer.getHostname()); 821 if (oldLocality == null) { 822 oldLocality = 0f; 823 } 824 locality[i] += oldLocality; 825 } 826 Float newLocality = regionLocality.get(newServer.getHostname()); 827 if (newLocality == null) { 828 newLocality = 0f; 829 } 830 deltaLocality[i] += newLocality - oldLocality; 831 i++; 832 } 833 } 834 } 835 DecimalFormat df = new java.text.DecimalFormat( "#.##"); 836 for (int i = 0; i < deltaLocality.length; i++) { 837 System.out.print("\t\t Baseline locality for "); 838 if (i == 0) { 839 System.out.print("primary "); 840 } else if (i == 1) { 841 System.out.print("secondary "); 842 } else if (i == 2) { 843 System.out.print("tertiary "); 844 } 845 System.out.println(df.format(100 * locality[i] / regions.size()) + "%"); 846 System.out.print("\t\t Locality will change with the new plan: "); 847 System.out.println(df.format(100 * deltaLocality[i] / regions.size()) 848 + "%"); 849 } 850 System.out.println("\t Baseline dispersion"); 851 printDispersionScores(table, snapshot, regions.size(), null, true); 852 System.out.println("\t Projected dispersion"); 853 printDispersionScores(table, snapshot, regions.size(), newPlan, true); 854 } 855 } 856 857 public void printDispersionScores(TableName table, 858 SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan, 859 boolean simplePrint) { 860 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { 861 return; 862 } 863 AssignmentVerificationReport report = new AssignmentVerificationReport(); 864 report.fillUpDispersion(table, snapshot, newPlan); 865 List<Float> dispersion = report.getDispersionInformation(); 866 if (simplePrint) { 867 DecimalFormat df = new java.text.DecimalFormat("#.##"); 868 System.out.println("\tAvg dispersion score: " 869 + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: " 870 + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: " 871 + df.format(dispersion.get(2)) + " hosts;"); 872 } else { 873 LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions 874 + " ; The average dispersion score is " + dispersion.get(0)); 875 } 876 } 877 878 public void printLocalityAndDispersionForCurrentPlan( 879 Map<String, Map<String, Float>> regionLocalityMap) throws IOException { 880 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 881 FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan(); 882 Set<TableName> tables = snapshot.getTableSet(); 883 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot 884 .getTableToRegionMap(); 885 for (TableName table : tables) { 886 float[] locality = new float[3]; 887 if (!this.targetTableSet.isEmpty() 888 && !this.targetTableSet.contains(table)) { 889 continue; 890 } 891 List<RegionInfo> regions = tableToRegionsMap.get(table); 892 for (RegionInfo currentRegion : regions) { 893 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion 894 .getEncodedName()); 895 if (regionLocality == null) { 896 continue; 897 } 898 List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion); 899 if (servers != null) { 900 int i = 0; 901 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) { 902 ServerName server = servers.get(p.ordinal()); 903 Float currentLocality = 0f; 904 if (servers != null) { 905 currentLocality = regionLocality.get(server.getHostname()); 906 if (currentLocality == null) { 907 currentLocality = 0f; 908 } 909 locality[i] += currentLocality; 910 } 911 i++; 912 } 913 } 914 } 915 for (int i = 0; i < locality.length; i++) { 916 String copy = null; 917 if (i == 0) { 918 copy = "primary"; 919 } else if (i == 1) { 920 copy = "secondary"; 921 } else if (i == 2) { 922 copy = "tertiary" ; 923 } 924 float avgLocality = 100 * locality[i] / regions.size(); 925 LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size() 926 + " ; The average locality for " + copy+ " is " + avgLocality + " %"); 927 } 928 printDispersionScores(table, snapshot, regions.size(), null, false); 929 } 930 } 931 932 /** 933 * @param favoredNodesStr The String of favored nodes 934 * @return the list of ServerName for the byte array of favored nodes. 935 */ 936 public static List<ServerName> getFavoredNodeList(String favoredNodesStr) { 937 String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ","); 938 if (favoredNodesArray == null) 939 return null; 940 941 List<ServerName> serverList = new ArrayList<>(); 942 for (String hostNameAndPort : favoredNodesArray) { 943 serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE)); 944 } 945 return serverList; 946 } 947 948 public static void main(String args[]) throws IOException { 949 Options opt = new Options(); 950 opt.addOption("w", "write", false, "write the assignments to hbase:meta only"); 951 opt.addOption("u", "update", false, 952 "update the assignments to hbase:meta and RegionServers together"); 953 opt.addOption("n", "dry-run", false, "do not write assignments to META"); 954 opt.addOption("v", "verify", false, "verify current assignments against META"); 955 opt.addOption("p", "print", false, "print the current assignment plan in META"); 956 opt.addOption("h", "help", false, "print usage"); 957 opt.addOption("d", "verification-details", false, 958 "print the details of verification report"); 959 960 opt.addOption("zk", true, "to set the zookeeper quorum"); 961 opt.addOption("fs", true, "to set HDFS"); 962 opt.addOption("hbase_root", true, "to set hbase_root directory"); 963 964 opt.addOption("overwrite", false, 965 "overwrite the favored nodes for a single region," + 966 "for example: -update -r regionName -f server1:port,server2:port,server3:port"); 967 opt.addOption("r", true, "The region name that needs to be updated"); 968 opt.addOption("f", true, "The new favored nodes"); 969 970 opt.addOption("tables", true, 971 "The list of table names splitted by ',' ;" + 972 "For example: -tables: t1,t2,...,tn"); 973 opt.addOption("l", "locality", true, "enforce the maximum locality"); 974 opt.addOption("m", "min-move", true, "enforce minimum assignment move"); 975 opt.addOption("diff", false, "calculate difference between assignment plans"); 976 opt.addOption("munkres", false, 977 "use munkres to place secondaries and tertiaries"); 978 opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " + 979 "information for current plan"); 980 try { 981 CommandLine cmd = new GnuParser().parse(opt, args); 982 Configuration conf = HBaseConfiguration.create(); 983 984 boolean enforceMinAssignmentMove = true; 985 boolean enforceLocality = true; 986 boolean verificationDetails = false; 987 988 // Read all the options 989 if ((cmd.hasOption("l") && 990 cmd.getOptionValue("l").equalsIgnoreCase("false")) || 991 (cmd.hasOption("locality") && 992 cmd.getOptionValue("locality").equalsIgnoreCase("false"))) { 993 enforceLocality = false; 994 } 995 996 if ((cmd.hasOption("m") && 997 cmd.getOptionValue("m").equalsIgnoreCase("false")) || 998 (cmd.hasOption("min-move") && 999 cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) { 1000 enforceMinAssignmentMove = false; 1001 } 1002 1003 if (cmd.hasOption("zk")) { 1004 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk")); 1005 LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM)); 1006 } 1007 1008 if (cmd.hasOption("fs")) { 1009 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs")); 1010 LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); 1011 } 1012 1013 if (cmd.hasOption("hbase_root")) { 1014 conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root")); 1015 LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR)); 1016 } 1017 1018 // Create the region placement obj 1019 RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality, 1020 enforceMinAssignmentMove); 1021 1022 if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { 1023 verificationDetails = true; 1024 } 1025 1026 if (cmd.hasOption("tables")) { 1027 String tableNameListStr = cmd.getOptionValue("tables"); 1028 String[] tableNames = StringUtils.split(tableNameListStr, ","); 1029 rp.setTargetTableName(tableNames); 1030 } 1031 1032 if (cmd.hasOption("munkres")) { 1033 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; 1034 } 1035 1036 // Read all the modes 1037 if (cmd.hasOption("v") || cmd.hasOption("verify")) { 1038 // Verify the region placement. 1039 rp.verifyRegionPlacement(verificationDetails); 1040 } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { 1041 // Generate the assignment plan only without updating the hbase:meta and RS 1042 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1043 printAssignmentPlan(plan); 1044 } else if (cmd.hasOption("w") || cmd.hasOption("write")) { 1045 // Generate the new assignment plan 1046 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1047 // Print the new assignment plan 1048 printAssignmentPlan(plan); 1049 // Write the new assignment plan to META 1050 rp.updateAssignmentPlanToMeta(plan); 1051 } else if (cmd.hasOption("u") || cmd.hasOption("update")) { 1052 // Generate the new assignment plan 1053 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1054 // Print the new assignment plan 1055 printAssignmentPlan(plan); 1056 // Update the assignment to hbase:meta and Region Servers 1057 rp.updateAssignmentPlan(plan); 1058 } else if (cmd.hasOption("diff")) { 1059 FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); 1060 Map<String, Map<String, Float>> locality = FSUtils 1061 .getRegionDegreeLocalityMappingFromFS(conf); 1062 Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan); 1063 rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); 1064 System.out.println("Do you want to update the assignment plan? [y/n]"); 1065 Scanner s = new Scanner(System.in); 1066 String input = s.nextLine().trim(); 1067 if (input.equals("y")) { 1068 System.out.println("Updating assignment plan..."); 1069 rp.updateAssignmentPlan(newPlan); 1070 } 1071 s.close(); 1072 } else if (cmd.hasOption("ld")) { 1073 Map<String, Map<String, Float>> locality = FSUtils 1074 .getRegionDegreeLocalityMappingFromFS(conf); 1075 rp.printLocalityAndDispersionForCurrentPlan(locality); 1076 } else if (cmd.hasOption("p") || cmd.hasOption("print")) { 1077 FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); 1078 printAssignmentPlan(plan); 1079 } else if (cmd.hasOption("overwrite")) { 1080 if (!cmd.hasOption("f") || !cmd.hasOption("r")) { 1081 throw new IllegalArgumentException("Please specify: " + 1082 " -update -r regionName -f server1:port,server2:port,server3:port"); 1083 } 1084 1085 String regionName = cmd.getOptionValue("r"); 1086 String favoredNodesStr = cmd.getOptionValue("f"); 1087 LOG.info("Going to update the region " + regionName + " with the new favored nodes " + 1088 favoredNodesStr); 1089 List<ServerName> favoredNodes = null; 1090 RegionInfo regionInfo = 1091 rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); 1092 if (regionInfo == null) { 1093 LOG.error("Cannot find the region " + regionName + " from the META"); 1094 } else { 1095 try { 1096 favoredNodes = getFavoredNodeList(favoredNodesStr); 1097 } catch (IllegalArgumentException e) { 1098 LOG.error("Cannot parse the invalid favored nodes because " + e); 1099 } 1100 FavoredNodesPlan newPlan = new FavoredNodesPlan(); 1101 newPlan.updateFavoredNodesMap(regionInfo, favoredNodes); 1102 rp.updateAssignmentPlan(newPlan); 1103 } 1104 } else { 1105 printHelp(opt); 1106 } 1107 } catch (ParseException e) { 1108 printHelp(opt); 1109 } 1110 } 1111}