001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.master; 021 022import java.io.IOException; 023import java.text.DecimalFormat; 024import java.util.ArrayList; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Random; 031import java.util.Scanner; 032import java.util.Set; 033import java.util.TreeMap; 034import org.apache.commons.lang3.StringUtils; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.hbase.ClusterMetrics.Option; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.ClusterConnection; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; 048import 
org.apache.hadoop.hbase.favored.FavoredNodesPlan; 049import org.apache.hadoop.hbase.util.FSUtils; 050import org.apache.hadoop.hbase.util.MunkresAssignment; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 057import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 058import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 059import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 060import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 061 062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 067 068/** 069 * A tool that is used for manipulating and viewing favored nodes information 070 * for regions. Run with -h to get a list of the options 071 */ 072@InterfaceAudience.Private 073// TODO: Remove? Unused. Partially implemented only. 074public class RegionPlacementMaintainer { 075 private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class 076 .getName()); 077 //The cost of a placement that should never be assigned. 078 private static final float MAX_COST = Float.POSITIVE_INFINITY; 079 080 // The cost of a placement that is undesirable but acceptable. 081 private static final float AVOID_COST = 100000f; 082 083 // The amount by which the cost of a placement is increased if it is the 084 // last slot of the server. This is done to more evenly distribute the slop 085 // amongst servers. 
private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // Selects how secondaries and tertiaries are placed: true uses the Munkres solver,
  // false uses FavoredNodeAssignmentHelper.placeSecondaryAndTertiaryWithRestrictions.
  // NOTE(review): non-final, presumably so a command-line option can flip it — confirm.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  private Configuration conf;
  private final boolean enforceLocality;
  private final boolean enforceMinAssignmentMove;
  private RackManager rackManager;
  private Set<TableName> targetTableSet;
  private final Connection connection;

  /**
   * Creates a maintainer that enforces both locality and minimal assignment movement.
   * @param conf configuration used to connect to the cluster
   */
  public RegionPlacementMaintainer(Configuration conf) {
    this(conf, true, true);
  }

  /**
   * @param conf configuration used to connect to the cluster
   * @param enforceLocality whether HDFS block locality is taken into account
   * @param enforceMinAssignmentMove whether placements away from a region's current host
   *        are penalized
   * @throws RuntimeException wrapping the IOException if the cluster connection cannot be
   *         created
   */
  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
      boolean enforceMinAssignmentMove) {
    this.conf = conf;
    this.enforceLocality = enforceLocality;
    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
    this.targetTableSet = new HashSet<>();
    this.rackManager = new RackManager(conf);
    try {
      this.connection = ConnectionFactory.createConnection(this.conf);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private static void printHelp(Options opt) {
    new HelpFormatter().printHelp(
        "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
        "-diff>" +
        " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
        " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
  }

  /**
   * Restricts subsequent operations to the given tables. Names are added to the current
   * target set; a null array is ignored.
   * @param tableNames table names to add to the target set; may be null
   */
  public void setTargetTableName(String[] tableNames) {
    if (tableNames == null) {
      return;
    }
    for (String table : tableNames) {
      this.targetTableSet.add(TableName.valueOf(table));
    }
  }

  /**
   * Takes a fresh snapshot of the region assignment read from hbase:meta.
   * @return the new RegionAssignmentSnapshot
   * @throws IOException if the snapshot cannot be initialized
   */
  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
      throws IOException {
    // Reuse the connection owned by this instance. Creating a new Connection on every call
    // leaked it, because nothing ever closed it.
    SnapshotOfRegionAssignmentFromMeta currentAssignmentSnapshot =
        new SnapshotOfRegionAssignmentFromMeta(this.connection);
    currentAssignmentSnapshot.initialize();
    return currentAssignmentSnapshot;
  }

  /**
   * Verify the region placement is consistent with the assignment plan, printing one
   * report per table (only the target tables, if any were set).
   * @param isDetailMode whether each report is printed in detail
   * @return the per-table verification reports
   * @throws IOException if the hbase:meta snapshot or the locality map cannot be read
   */
  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
      throws IOException {
    System.out.println("Start to verify the region assignment and " +
        "generate the verification report");
    // Get the region assignment snapshot
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();

    // Get all the tables
    Set<TableName> tables = snapshot.getTableSet();

    // The locality map is only computed when locality is enforced; otherwise it stays null.
    Map<String, Map<String, Float>> regionLocalityMap = null;
    if (this.enforceLocality) {
      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
    }
    List<AssignmentVerificationReport> reports = new ArrayList<>();
    // Iterate all the tables to fill up the verification report
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() &&
          !this.targetTableSet.contains(table)) {
        continue;
      }
      AssignmentVerificationReport report = new AssignmentVerificationReport();
      report.fillUp(table, snapshot, regionLocalityMap);
      report.print(isDetailMode);
      reports.add(report);
    }
    return reports;
  }

  /**
   * Generates the favored nodes (primary, secondary, tertiary) for every region of a table
   * and records them in {@code plan}.
   *
   * Primaries are chosen with the Munkres assignment algorithm over a cost matrix with one
   * column per server slot. The costs take into account HDFS locality (when enforced) and a
   * penalty for leaving the current host (when enforced).
   *
   * @param tableName the table whose regions are placed
   * @param assignmentSnapshot snapshot of hbase:meta supplying the table's regions and their
   *        current hosts
   * @param regionLocalityMap locality keyed by encoded region name, then host name; may be
   *        null, in which case locality is not used
   * @param plan the plan that receives the generated favored nodes
   * @param munkresForSecondaryAndTertiary if true, secondaries and tertiaries are also chosen
   *        with the Munkres algorithm; otherwise
   *        {@code FavoredNodeAssignmentHelper.placeSecondaryAndTertiaryWithRestrictions} is used
   * @throws IOException if the live region servers cannot be determined
   */
  private void
genAssignmentPlan(TableName tableName, 195 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot, 196 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan, 197 boolean munkresForSecondaryAndTertiary) throws IOException { 198 // Get the all the regions for the current table 199 List<RegionInfo> regions = 200 assignmentSnapshot.getTableToRegionMap().get(tableName); 201 int numRegions = regions.size(); 202 203 // Get the current assignment map 204 Map<RegionInfo, ServerName> currentAssignmentMap = 205 assignmentSnapshot.getRegionToRegionServerMap(); 206 207 // Get the all the region servers 208 List<ServerName> servers = new ArrayList<>(); 209 try (Admin admin = this.connection.getAdmin()) { 210 servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 211 .getLiveServerMetrics().keySet()); 212 } 213 214 LOG.info("Start to generate assignment plan for " + numRegions + 215 " regions from table " + tableName + " with " + 216 servers.size() + " region servers"); 217 218 int slotsPerServer = (int) Math.ceil((float) numRegions / 219 servers.size()); 220 int regionSlots = slotsPerServer * servers.size(); 221 222 // Compute the primary, secondary and tertiary costs for each region/server 223 // pair. These costs are based only on node locality and rack locality, and 224 // will be modified later. 225 float[][] primaryCost = new float[numRegions][regionSlots]; 226 float[][] secondaryCost = new float[numRegions][regionSlots]; 227 float[][] tertiaryCost = new float[numRegions][regionSlots]; 228 229 if (this.enforceLocality && regionLocalityMap != null) { 230 // Transform the locality mapping into a 2D array, assuming that any 231 // unspecified locality value is 0. 
232 float[][] localityPerServer = new float[numRegions][regionSlots]; 233 for (int i = 0; i < numRegions; i++) { 234 Map<String, Float> serverLocalityMap = 235 regionLocalityMap.get(regions.get(i).getEncodedName()); 236 if (serverLocalityMap == null) { 237 continue; 238 } 239 for (int j = 0; j < servers.size(); j++) { 240 String serverName = servers.get(j).getHostname(); 241 if (serverName == null) { 242 continue; 243 } 244 Float locality = serverLocalityMap.get(serverName); 245 if (locality == null) { 246 continue; 247 } 248 for (int k = 0; k < slotsPerServer; k++) { 249 // If we can't find the locality of a region to a server, which occurs 250 // because locality is only reported for servers which have some 251 // blocks of a region local, then the locality for that pair is 0. 252 localityPerServer[i][j * slotsPerServer + k] = locality.floatValue(); 253 } 254 } 255 } 256 257 // Compute the total rack locality for each region in each rack. The total 258 // rack locality is the sum of the localities of a region on all servers in 259 // a rack. 260 Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>(); 261 for (int i = 0; i < numRegions; i++) { 262 RegionInfo region = regions.get(i); 263 for (int j = 0; j < regionSlots; j += slotsPerServer) { 264 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 265 Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack); 266 if (rackLocality == null) { 267 rackLocality = new HashMap<>(); 268 rackRegionLocality.put(rack, rackLocality); 269 } 270 Float localityObj = rackLocality.get(region); 271 float locality = localityObj == null ? 
0 : localityObj.floatValue(); 272 locality += localityPerServer[i][j]; 273 rackLocality.put(region, locality); 274 } 275 } 276 for (int i = 0; i < numRegions; i++) { 277 for (int j = 0; j < regionSlots; j++) { 278 String rack = rackManager.getRack(servers.get(j / slotsPerServer)); 279 Float totalRackLocalityObj = 280 rackRegionLocality.get(rack).get(regions.get(i)); 281 float totalRackLocality = totalRackLocalityObj == null ? 282 0 : totalRackLocalityObj.floatValue(); 283 284 // Primary cost aims to favor servers with high node locality and low 285 // rack locality, so that secondaries and tertiaries can be chosen for 286 // nodes with high rack locality. This might give primaries with 287 // slightly less locality at first compared to a cost which only 288 // considers the node locality, but should be better in the long run. 289 primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - 290 totalRackLocality); 291 292 // Secondary cost aims to favor servers with high node locality and high 293 // rack locality since the tertiary will be chosen from the same rack as 294 // the secondary. This could be negative, but that is okay. 295 secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality); 296 297 // Tertiary cost is only concerned with the node locality. It will later 298 // be restricted to only hosts on the same rack as the secondary. 299 tertiaryCost[i][j] = 1 - localityPerServer[i][j]; 300 } 301 } 302 } 303 304 if (this.enforceMinAssignmentMove && currentAssignmentMap != null) { 305 // We want to minimize the number of regions which move as the result of a 306 // new assignment. Therefore, slightly penalize any placement which is for 307 // a host that is not currently serving the region. 
308 for (int i = 0; i < numRegions; i++) { 309 for (int j = 0; j < servers.size(); j++) { 310 ServerName currentAddress = currentAssignmentMap.get(regions.get(i)); 311 if (currentAddress != null && 312 !currentAddress.equals(servers.get(j))) { 313 for (int k = 0; k < slotsPerServer; k++) { 314 primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY; 315 } 316 } 317 } 318 } 319 } 320 321 // Artificially increase cost of last slot of each server to evenly 322 // distribute the slop, otherwise there will be a few servers with too few 323 // regions and many servers with the max number of regions. 324 for (int i = 0; i < numRegions; i++) { 325 for (int j = 0; j < regionSlots; j += slotsPerServer) { 326 primaryCost[i][j] += LAST_SLOT_COST_PENALTY; 327 secondaryCost[i][j] += LAST_SLOT_COST_PENALTY; 328 tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY; 329 } 330 } 331 332 RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, 333 regionSlots); 334 primaryCost = randomizedMatrix.transform(primaryCost); 335 int[] primaryAssignment = new MunkresAssignment(primaryCost).solve(); 336 primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment); 337 338 // Modify the secondary and tertiary costs for each region/server pair to 339 // prevent a region from being assigned to the same rack for both primary 340 // and either one of secondary or tertiary. 341 for (int i = 0; i < numRegions; i++) { 342 int slot = primaryAssignment[i]; 343 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 344 for (int k = 0; k < servers.size(); k++) { 345 if (!rackManager.getRack(servers.get(k)).equals(rack)) { 346 continue; 347 } 348 if (k == slot / slotsPerServer) { 349 // Same node, do not place secondary or tertiary here ever. 
350 for (int m = 0; m < slotsPerServer; m++) { 351 secondaryCost[i][k * slotsPerServer + m] = MAX_COST; 352 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 353 } 354 } else { 355 // Same rack, do not place secondary or tertiary here if possible. 356 for (int m = 0; m < slotsPerServer; m++) { 357 secondaryCost[i][k * slotsPerServer + m] = AVOID_COST; 358 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 359 } 360 } 361 } 362 } 363 if (munkresForSecondaryAndTertiary) { 364 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 365 secondaryCost = randomizedMatrix.transform(secondaryCost); 366 int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve(); 367 secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment); 368 369 // Modify the tertiary costs for each region/server pair to ensure that a 370 // region is assigned to a tertiary server on the same rack as its secondary 371 // server, but not the same server in that rack. 372 for (int i = 0; i < numRegions; i++) { 373 int slot = secondaryAssignment[i]; 374 String rack = rackManager.getRack(servers.get(slot / slotsPerServer)); 375 for (int k = 0; k < servers.size(); k++) { 376 if (k == slot / slotsPerServer) { 377 // Same node, do not place tertiary here ever. 378 for (int m = 0; m < slotsPerServer; m++) { 379 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; 380 } 381 } else { 382 if (rackManager.getRack(servers.get(k)).equals(rack)) { 383 continue; 384 } 385 // Different rack, do not place tertiary here if possible. 
386 for (int m = 0; m < slotsPerServer; m++) { 387 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; 388 } 389 } 390 } 391 } 392 393 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); 394 tertiaryCost = randomizedMatrix.transform(tertiaryCost); 395 int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve(); 396 tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment); 397 398 for (int i = 0; i < numRegions; i++) { 399 List<ServerName> favoredServers 400 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 401 ServerName s = servers.get(primaryAssignment[i] / slotsPerServer); 402 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 403 ServerName.NON_STARTCODE)); 404 405 s = servers.get(secondaryAssignment[i] / slotsPerServer); 406 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 407 ServerName.NON_STARTCODE)); 408 409 s = servers.get(tertiaryAssignment[i] / slotsPerServer); 410 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 411 ServerName.NON_STARTCODE)); 412 // Update the assignment plan 413 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 414 } 415 LOG.info("Generated the assignment plan for " + numRegions + 416 " regions from table " + tableName + " with " + 417 servers.size() + " region servers"); 418 LOG.info("Assignment plan for secondary and tertiary generated " + 419 "using MunkresAssignment"); 420 } else { 421 Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(); 422 for (int i = 0; i < numRegions; i++) { 423 primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer)); 424 } 425 FavoredNodeAssignmentHelper favoredNodeHelper = 426 new FavoredNodeAssignmentHelper(servers, conf); 427 favoredNodeHelper.initialize(); 428 Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = 429 favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap); 430 for (int i = 0; i < numRegions; i++) { 431 
List<ServerName> favoredServers 432 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM); 433 RegionInfo currentRegion = regions.get(i); 434 ServerName s = primaryRSMap.get(currentRegion); 435 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 436 ServerName.NON_STARTCODE)); 437 438 ServerName[] secondaryAndTertiary = 439 secondaryAndTertiaryMap.get(currentRegion); 440 s = secondaryAndTertiary[0]; 441 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 442 ServerName.NON_STARTCODE)); 443 444 s = secondaryAndTertiary[1]; 445 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(), 446 ServerName.NON_STARTCODE)); 447 // Update the assignment plan 448 plan.updateFavoredNodesMap(regions.get(i), favoredServers); 449 } 450 LOG.info("Generated the assignment plan for " + numRegions + 451 " regions from table " + tableName + " with " + 452 servers.size() + " region servers"); 453 LOG.info("Assignment plan for secondary and tertiary generated " + 454 "using placeSecondaryAndTertiaryWithRestrictions method"); 455 } 456 } 457 458 public FavoredNodesPlan getNewAssignmentPlan() throws IOException { 459 // Get the current region assignment snapshot by scanning from the META 460 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = 461 this.getRegionAssignmentSnapshot(); 462 463 // Get the region locality map 464 Map<String, Map<String, Float>> regionLocalityMap = null; 465 if (this.enforceLocality) { 466 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 467 } 468 // Initialize the assignment plan 469 FavoredNodesPlan plan = new FavoredNodesPlan(); 470 471 // Get the table to region mapping 472 Map<TableName, List<RegionInfo>> tableToRegionMap = 473 assignmentSnapshot.getTableToRegionMap(); 474 LOG.info("Start to generate the new assignment plan for the " + 475 + tableToRegionMap.keySet().size() + " tables" ); 476 for (TableName table : tableToRegionMap.keySet()) { 477 try { 478 if 
(!this.targetTableSet.isEmpty() && 479 !this.targetTableSet.contains(table)) { 480 continue; 481 } 482 // TODO: maybe run the placement in parallel for each table 483 genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan, 484 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY); 485 } catch (Exception e) { 486 LOG.error("Get some exceptions for placing primary region server" + 487 "for table " + table + " because " + e); 488 } 489 } 490 LOG.info("Finish to generate the new assignment plan for the " + 491 + tableToRegionMap.keySet().size() + " tables" ); 492 return plan; 493 } 494 495 /** 496 * Some algorithms for solving the assignment problem may traverse workers or 497 * jobs in linear order which may result in skewing the assignments of the 498 * first jobs in the matrix toward the last workers in the matrix if the 499 * costs are uniform. To avoid this kind of clumping, we can randomize the 500 * rows and columns of the cost matrix in a reversible way, such that the 501 * solution to the assignment problem can be interpreted in terms of the 502 * original untransformed cost matrix. Rows and columns are transformed 503 * independently such that the elements contained in any row of the input 504 * matrix are the same as the elements in the corresponding output matrix, 505 * and each row has its elements transformed in the same way. Similarly for 506 * columns. 507 */ 508 protected static class RandomizedMatrix { 509 private final int rows; 510 private final int cols; 511 private final int[] rowTransform; 512 private final int[] rowInverse; 513 private final int[] colTransform; 514 private final int[] colInverse; 515 516 /** 517 * Create a randomization scheme for a matrix of a given size. 
518 * @param rows the number of rows in the matrix 519 * @param cols the number of columns in the matrix 520 */ 521 public RandomizedMatrix(int rows, int cols) { 522 this.rows = rows; 523 this.cols = cols; 524 Random random = new Random(); 525 rowTransform = new int[rows]; 526 rowInverse = new int[rows]; 527 for (int i = 0; i < rows; i++) { 528 rowTransform[i] = i; 529 } 530 // Shuffle the row indices. 531 for (int i = rows - 1; i >= 0; i--) { 532 int r = random.nextInt(i + 1); 533 int temp = rowTransform[r]; 534 rowTransform[r] = rowTransform[i]; 535 rowTransform[i] = temp; 536 } 537 // Generate the inverse row indices. 538 for (int i = 0; i < rows; i++) { 539 rowInverse[rowTransform[i]] = i; 540 } 541 542 colTransform = new int[cols]; 543 colInverse = new int[cols]; 544 for (int i = 0; i < cols; i++) { 545 colTransform[i] = i; 546 } 547 // Shuffle the column indices. 548 for (int i = cols - 1; i >= 0; i--) { 549 int r = random.nextInt(i + 1); 550 int temp = colTransform[r]; 551 colTransform[r] = colTransform[i]; 552 colTransform[i] = temp; 553 } 554 // Generate the inverse column indices. 555 for (int i = 0; i < cols; i++) { 556 colInverse[colTransform[i]] = i; 557 } 558 } 559 560 /** 561 * Copy a given matrix into a new matrix, transforming each row index and 562 * each column index according to the randomization scheme that was created 563 * at construction time. 564 * @param matrix the cost matrix to transform 565 * @return a new matrix with row and column indices transformed 566 */ 567 public float[][] transform(float[][] matrix) { 568 float[][] result = new float[rows][cols]; 569 for (int i = 0; i < rows; i++) { 570 for (int j = 0; j < cols; j++) { 571 result[rowTransform[i]][colTransform[j]] = matrix[i][j]; 572 } 573 } 574 return result; 575 } 576 577 /** 578 * Copy a given matrix into a new matrix, transforming each row index and 579 * each column index according to the inverse of the randomization scheme 580 * that was created at construction time. 
581 * @param matrix the cost matrix to be inverted 582 * @return a new matrix with row and column indices inverted 583 */ 584 public float[][] invert(float[][] matrix) { 585 float[][] result = new float[rows][cols]; 586 for (int i = 0; i < rows; i++) { 587 for (int j = 0; j < cols; j++) { 588 result[rowInverse[i]][colInverse[j]] = matrix[i][j]; 589 } 590 } 591 return result; 592 } 593 594 /** 595 * Given an array where each element {@code indices[i]} represents the 596 * randomized column index corresponding to randomized row index {@code i}, 597 * create a new array with the corresponding inverted indices. 598 * @param indices an array of transformed indices to be inverted 599 * @return an array of inverted indices 600 */ 601 public int[] invertIndices(int[] indices) { 602 int[] result = new int[indices.length]; 603 for (int i = 0; i < indices.length; i++) { 604 result[rowInverse[i]] = colInverse[indices[i]]; 605 } 606 return result; 607 } 608 } 609 610 /** 611 * Print the assignment plan to the system output stream 612 * @param plan 613 */ 614 public static void printAssignmentPlan(FavoredNodesPlan plan) { 615 if (plan == null) return; 616 LOG.info("========== Start to print the assignment plan ================"); 617 // sort the map based on region info 618 Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap()); 619 620 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 621 622 String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); 623 String regionName = entry.getKey(); 624 LOG.info("Region: " + regionName ); 625 LOG.info("Its favored nodes: " + serverList); 626 } 627 LOG.info("========== Finish to print the assignment plan ================"); 628 } 629 630 /** 631 * Update the assignment plan into hbase:meta 632 * @param plan the assignments plan to be updated into hbase:meta 633 * @throws IOException if cannot update assignment plan in hbase:meta 634 */ 635 public void 
updateAssignmentPlanToMeta(FavoredNodesPlan plan) 636 throws IOException { 637 try { 638 LOG.info("Start to update the hbase:meta with the new assignment plan"); 639 Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap(); 640 Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size()); 641 Map<String, RegionInfo> regionToRegionInfoMap = 642 getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap(); 643 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) { 644 planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue()); 645 } 646 647 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf); 648 LOG.info("Updated the hbase:meta with the new assignment plan"); 649 } catch (Exception e) { 650 LOG.error("Failed to update hbase:meta with the new assignment" + 651 "plan because " + e.getMessage()); 652 } 653 } 654 655 /** 656 * Update the assignment plan to all the region servers 657 * @param plan 658 * @throws IOException 659 */ 660 private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) 661 throws IOException{ 662 LOG.info("Start to update the region servers with the new assignment plan"); 663 // Get the region to region server map 664 Map<ServerName, List<RegionInfo>> currentAssignment = 665 this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); 666 667 // track of the failed and succeeded updates 668 int succeededNum = 0; 669 Map<ServerName, Exception> failedUpdateMap = new HashMap<>(); 670 671 for (Map.Entry<ServerName, List<RegionInfo>> entry : 672 currentAssignment.entrySet()) { 673 List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>(); 674 try { 675 // Keep track of the favored updates for the current region server 676 FavoredNodesPlan singleServerPlan = null; 677 // Find out all the updates for the current region server 678 for (RegionInfo region : entry.getValue()) { 679 List<ServerName> favoredServerList = 
plan.getFavoredNodes(region); 680 if (favoredServerList != null && 681 favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { 682 // Create the single server plan if necessary 683 if (singleServerPlan == null) { 684 singleServerPlan = new FavoredNodesPlan(); 685 } 686 // Update the single server update 687 singleServerPlan.updateFavoredNodesMap(region, favoredServerList); 688 regionUpdateInfos.add(new Pair<>(region, favoredServerList)); 689 } 690 } 691 if (singleServerPlan != null) { 692 // Update the current region server with its updated favored nodes 693 BlockingInterface currentRegionServer = 694 ((ClusterConnection)this.connection).getAdmin(entry.getKey()); 695 UpdateFavoredNodesRequest request = 696 RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); 697 698 UpdateFavoredNodesResponse updateFavoredNodesResponse = 699 currentRegionServer.updateFavoredNodes(null, request); 700 LOG.info("Region server " + 701 ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() + 702 " has updated " + updateFavoredNodesResponse.getResponse() + " / " + 703 singleServerPlan.getAssignmentMap().size() + 704 " regions with the assignment plan"); 705 succeededNum ++; 706 } 707 } catch (Exception e) { 708 failedUpdateMap.put(entry.getKey(), e); 709 } 710 } 711 // log the succeeded updates 712 LOG.info("Updated " + succeededNum + " region servers with " + 713 "the new assignment plan"); 714 715 // log the failed updates 716 int failedNum = failedUpdateMap.size(); 717 if (failedNum != 0) { 718 LOG.error("Failed to update the following + " + failedNum + 719 " region servers with its corresponding favored nodes"); 720 for (Map.Entry<ServerName, Exception> entry : 721 failedUpdateMap.entrySet() ) { 722 LOG.error("Failed to update " + entry.getKey().getHostAndPort() + 723 " because of " + entry.getValue().getMessage()); 724 } 725 } 726 } 727 728 public void updateAssignmentPlan(FavoredNodesPlan plan) 729 throws IOException { 730 
LOG.info("Start to update the new assignment plan for the hbase:meta table and" + 731 " the region servers"); 732 // Update the new assignment plan to META 733 updateAssignmentPlanToMeta(plan); 734 // Update the new assignment plan to Region Servers 735 updateAssignmentPlanToRegionServers(plan); 736 LOG.info("Finish to update the new assignment plan for the hbase:meta table and" + 737 " the region servers"); 738 } 739 740 /** 741 * Return how many regions will move per table since their primary RS will 742 * change 743 * 744 * @param newPlan - new AssignmentPlan 745 * @return how many primaries will move per table 746 */ 747 public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) 748 throws IOException { 749 Map<TableName, Integer> movesPerTable = new HashMap<>(); 750 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 751 Map<TableName, List<RegionInfo>> tableToRegions = snapshot 752 .getTableToRegionMap(); 753 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan(); 754 Set<TableName> tables = snapshot.getTableSet(); 755 for (TableName table : tables) { 756 int movedPrimaries = 0; 757 if (!this.targetTableSet.isEmpty() 758 && !this.targetTableSet.contains(table)) { 759 continue; 760 } 761 List<RegionInfo> regions = tableToRegions.get(table); 762 for (RegionInfo region : regions) { 763 List<ServerName> oldServers = oldPlan.getFavoredNodes(region); 764 List<ServerName> newServers = newPlan.getFavoredNodes(region); 765 if (oldServers != null && newServers != null) { 766 ServerName oldPrimary = oldServers.get(0); 767 ServerName newPrimary = newServers.get(0); 768 if (oldPrimary.compareTo(newPrimary) != 0) { 769 movedPrimaries++; 770 } 771 } 772 } 773 movesPerTable.put(table, movedPrimaries); 774 } 775 return movesPerTable; 776 } 777 778 /** 779 * Compares two plans and check whether the locality dropped or increased 780 * (prints the information as a string) also prints the baseline locality 781 * 782 * @param 
movesPerTable - how many primary regions will move per table 783 * @param regionLocalityMap - locality map from FS 784 * @param newPlan - new assignment plan 785 * @throws IOException 786 */ 787 public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable, 788 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan) 789 throws IOException { 790 // localities for primary, secondary and tertiary 791 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 792 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan(); 793 Set<TableName> tables = snapshot.getTableSet(); 794 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap(); 795 for (TableName table : tables) { 796 float[] deltaLocality = new float[3]; 797 float[] locality = new float[3]; 798 if (!this.targetTableSet.isEmpty() 799 && !this.targetTableSet.contains(table)) { 800 continue; 801 } 802 List<RegionInfo> regions = tableToRegionsMap.get(table); 803 System.out.println("=================================================="); 804 System.out.println("Assignment Plan Projection Report For Table: " + table); 805 System.out.println("\t Total regions: " + regions.size()); 806 System.out.println("\t" + movesPerTable.get(table) 807 + " primaries will move due to their primary has changed"); 808 for (RegionInfo currentRegion : regions) { 809 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion 810 .getEncodedName()); 811 if (regionLocality == null) { 812 continue; 813 } 814 List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion); 815 List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion); 816 if (newServers != null && oldServers != null) { 817 int i=0; 818 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) { 819 ServerName newServer = newServers.get(p.ordinal()); 820 ServerName oldServer = oldServers.get(p.ordinal()); 821 Float oldLocality = 0f; 822 if 
(oldServers != null) { 823 oldLocality = regionLocality.get(oldServer.getHostname()); 824 if (oldLocality == null) { 825 oldLocality = 0f; 826 } 827 locality[i] += oldLocality; 828 } 829 Float newLocality = regionLocality.get(newServer.getHostname()); 830 if (newLocality == null) { 831 newLocality = 0f; 832 } 833 deltaLocality[i] += newLocality - oldLocality; 834 i++; 835 } 836 } 837 } 838 DecimalFormat df = new java.text.DecimalFormat( "#.##"); 839 for (int i = 0; i < deltaLocality.length; i++) { 840 System.out.print("\t\t Baseline locality for "); 841 if (i == 0) { 842 System.out.print("primary "); 843 } else if (i == 1) { 844 System.out.print("secondary "); 845 } else if (i == 2) { 846 System.out.print("tertiary "); 847 } 848 System.out.println(df.format(100 * locality[i] / regions.size()) + "%"); 849 System.out.print("\t\t Locality will change with the new plan: "); 850 System.out.println(df.format(100 * deltaLocality[i] / regions.size()) 851 + "%"); 852 } 853 System.out.println("\t Baseline dispersion"); 854 printDispersionScores(table, snapshot, regions.size(), null, true); 855 System.out.println("\t Projected dispersion"); 856 printDispersionScores(table, snapshot, regions.size(), newPlan, true); 857 } 858 } 859 860 public void printDispersionScores(TableName table, 861 SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan, 862 boolean simplePrint) { 863 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { 864 return; 865 } 866 AssignmentVerificationReport report = new AssignmentVerificationReport(); 867 report.fillUpDispersion(table, snapshot, newPlan); 868 List<Float> dispersion = report.getDispersionInformation(); 869 if (simplePrint) { 870 DecimalFormat df = new java.text.DecimalFormat("#.##"); 871 System.out.println("\tAvg dispersion score: " 872 + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: " 873 + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: " 874 + 
df.format(dispersion.get(2)) + " hosts;"); 875 } else { 876 LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions 877 + " ; The average dispersion score is " + dispersion.get(0)); 878 } 879 } 880 881 public void printLocalityAndDispersionForCurrentPlan( 882 Map<String, Map<String, Float>> regionLocalityMap) throws IOException { 883 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 884 FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan(); 885 Set<TableName> tables = snapshot.getTableSet(); 886 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot 887 .getTableToRegionMap(); 888 for (TableName table : tables) { 889 float[] locality = new float[3]; 890 if (!this.targetTableSet.isEmpty() 891 && !this.targetTableSet.contains(table)) { 892 continue; 893 } 894 List<RegionInfo> regions = tableToRegionsMap.get(table); 895 for (RegionInfo currentRegion : regions) { 896 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion 897 .getEncodedName()); 898 if (regionLocality == null) { 899 continue; 900 } 901 List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion); 902 if (servers != null) { 903 int i = 0; 904 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) { 905 ServerName server = servers.get(p.ordinal()); 906 Float currentLocality = 0f; 907 if (servers != null) { 908 currentLocality = regionLocality.get(server.getHostname()); 909 if (currentLocality == null) { 910 currentLocality = 0f; 911 } 912 locality[i] += currentLocality; 913 } 914 i++; 915 } 916 } 917 } 918 for (int i = 0; i < locality.length; i++) { 919 String copy = null; 920 if (i == 0) { 921 copy = "primary"; 922 } else if (i == 1) { 923 copy = "secondary"; 924 } else if (i == 2) { 925 copy = "tertiary" ; 926 } 927 float avgLocality = 100 * locality[i] / regions.size(); 928 LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size() 929 + " ; The average locality for " + copy+ " 
is " + avgLocality + " %"); 930 } 931 printDispersionScores(table, snapshot, regions.size(), null, false); 932 } 933 } 934 935 /** 936 * @param favoredNodesStr The String of favored nodes 937 * @return the list of ServerName for the byte array of favored nodes. 938 */ 939 public static List<ServerName> getFavoredNodeList(String favoredNodesStr) { 940 String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ","); 941 if (favoredNodesArray == null) 942 return null; 943 944 List<ServerName> serverList = new ArrayList<>(); 945 for (String hostNameAndPort : favoredNodesArray) { 946 serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE)); 947 } 948 return serverList; 949 } 950 951 public static void main(String args[]) throws IOException { 952 Options opt = new Options(); 953 opt.addOption("w", "write", false, "write the assignments to hbase:meta only"); 954 opt.addOption("u", "update", false, 955 "update the assignments to hbase:meta and RegionServers together"); 956 opt.addOption("n", "dry-run", false, "do not write assignments to META"); 957 opt.addOption("v", "verify", false, "verify current assignments against META"); 958 opt.addOption("p", "print", false, "print the current assignment plan in META"); 959 opt.addOption("h", "help", false, "print usage"); 960 opt.addOption("d", "verification-details", false, 961 "print the details of verification report"); 962 963 opt.addOption("zk", true, "to set the zookeeper quorum"); 964 opt.addOption("fs", true, "to set HDFS"); 965 opt.addOption("hbase_root", true, "to set hbase_root directory"); 966 967 opt.addOption("overwrite", false, 968 "overwrite the favored nodes for a single region," + 969 "for example: -update -r regionName -f server1:port,server2:port,server3:port"); 970 opt.addOption("r", true, "The region name that needs to be updated"); 971 opt.addOption("f", true, "The new favored nodes"); 972 973 opt.addOption("tables", true, 974 "The list of table names splitted by ',' ;" + 975 "For 
example: -tables: t1,t2,...,tn"); 976 opt.addOption("l", "locality", true, "enforce the maximum locality"); 977 opt.addOption("m", "min-move", true, "enforce minimum assignment move"); 978 opt.addOption("diff", false, "calculate difference between assignment plans"); 979 opt.addOption("munkres", false, 980 "use munkres to place secondaries and tertiaries"); 981 opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " + 982 "information for current plan"); 983 try { 984 CommandLine cmd = new GnuParser().parse(opt, args); 985 Configuration conf = HBaseConfiguration.create(); 986 987 boolean enforceMinAssignmentMove = true; 988 boolean enforceLocality = true; 989 boolean verificationDetails = false; 990 991 // Read all the options 992 if ((cmd.hasOption("l") && 993 cmd.getOptionValue("l").equalsIgnoreCase("false")) || 994 (cmd.hasOption("locality") && 995 cmd.getOptionValue("locality").equalsIgnoreCase("false"))) { 996 enforceLocality = false; 997 } 998 999 if ((cmd.hasOption("m") && 1000 cmd.getOptionValue("m").equalsIgnoreCase("false")) || 1001 (cmd.hasOption("min-move") && 1002 cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) { 1003 enforceMinAssignmentMove = false; 1004 } 1005 1006 if (cmd.hasOption("zk")) { 1007 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk")); 1008 LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM)); 1009 } 1010 1011 if (cmd.hasOption("fs")) { 1012 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs")); 1013 LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); 1014 } 1015 1016 if (cmd.hasOption("hbase_root")) { 1017 conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root")); 1018 LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR)); 1019 } 1020 1021 // Create the region placement obj 1022 RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality, 1023 enforceMinAssignmentMove); 
1024 1025 if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { 1026 verificationDetails = true; 1027 } 1028 1029 if (cmd.hasOption("tables")) { 1030 String tableNameListStr = cmd.getOptionValue("tables"); 1031 String[] tableNames = StringUtils.split(tableNameListStr, ","); 1032 rp.setTargetTableName(tableNames); 1033 } 1034 1035 if (cmd.hasOption("munkres")) { 1036 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; 1037 } 1038 1039 // Read all the modes 1040 if (cmd.hasOption("v") || cmd.hasOption("verify")) { 1041 // Verify the region placement. 1042 rp.verifyRegionPlacement(verificationDetails); 1043 } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { 1044 // Generate the assignment plan only without updating the hbase:meta and RS 1045 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1046 printAssignmentPlan(plan); 1047 } else if (cmd.hasOption("w") || cmd.hasOption("write")) { 1048 // Generate the new assignment plan 1049 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1050 // Print the new assignment plan 1051 printAssignmentPlan(plan); 1052 // Write the new assignment plan to META 1053 rp.updateAssignmentPlanToMeta(plan); 1054 } else if (cmd.hasOption("u") || cmd.hasOption("update")) { 1055 // Generate the new assignment plan 1056 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 1057 // Print the new assignment plan 1058 printAssignmentPlan(plan); 1059 // Update the assignment to hbase:meta and Region Servers 1060 rp.updateAssignmentPlan(plan); 1061 } else if (cmd.hasOption("diff")) { 1062 FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); 1063 Map<String, Map<String, Float>> locality = FSUtils 1064 .getRegionDegreeLocalityMappingFromFS(conf); 1065 Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan); 1066 rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); 1067 System.out.println("Do you want to update the assignment plan? 
[y/n]"); 1068 Scanner s = new Scanner(System.in); 1069 String input = s.nextLine().trim(); 1070 if (input.equals("y")) { 1071 System.out.println("Updating assignment plan..."); 1072 rp.updateAssignmentPlan(newPlan); 1073 } 1074 s.close(); 1075 } else if (cmd.hasOption("ld")) { 1076 Map<String, Map<String, Float>> locality = FSUtils 1077 .getRegionDegreeLocalityMappingFromFS(conf); 1078 rp.printLocalityAndDispersionForCurrentPlan(locality); 1079 } else if (cmd.hasOption("p") || cmd.hasOption("print")) { 1080 FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); 1081 printAssignmentPlan(plan); 1082 } else if (cmd.hasOption("overwrite")) { 1083 if (!cmd.hasOption("f") || !cmd.hasOption("r")) { 1084 throw new IllegalArgumentException("Please specify: " + 1085 " -update -r regionName -f server1:port,server2:port,server3:port"); 1086 } 1087 1088 String regionName = cmd.getOptionValue("r"); 1089 String favoredNodesStr = cmd.getOptionValue("f"); 1090 LOG.info("Going to update the region " + regionName + " with the new favored nodes " + 1091 favoredNodesStr); 1092 List<ServerName> favoredNodes = null; 1093 RegionInfo regionInfo = 1094 rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); 1095 if (regionInfo == null) { 1096 LOG.error("Cannot find the region " + regionName + " from the META"); 1097 } else { 1098 try { 1099 favoredNodes = getFavoredNodeList(favoredNodesStr); 1100 } catch (IllegalArgumentException e) { 1101 LOG.error("Cannot parse the invalid favored nodes because " + e); 1102 } 1103 FavoredNodesPlan newPlan = new FavoredNodesPlan(); 1104 newPlan.updateFavoredNodesMap(regionInfo, favoredNodes); 1105 rp.updateAssignmentPlan(newPlan); 1106 } 1107 } else { 1108 printHelp(opt); 1109 } 1110 } catch (ParseException e) { 1111 printHelp(opt); 1112 } 1113 } 1114}