/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MunkresAssignment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;

/**
 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with
 * -h to get a list of the options
 */
@InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer {
  private static final Logger LOG =
    LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName());
  // The cost of a placement that should never be assigned.
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased if it is the
  // last slot of the server. This is done to more evenly distribute the slop
  // amongst servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // When true, secondary/tertiary placement also uses the Munkres (Hungarian)
  // assignment algorithm; otherwise FavoredNodeAssignmentHelper is used.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  private Configuration conf;
  // Whether HDFS block locality should influence placement costs.
  private final boolean enforceLocality;
  // Whether placements should be penalized for moving a region off its current host.
  private final boolean enforceMinAssignmentMove;
  private RackManager rackManager;
  // Tables to restrict operations to; empty means "all tables".
  private Set<TableName> targetTableSet;
  private final Connection connection;

  public RegionPlacementMaintainer(Configuration conf) {
    this(conf, true, true);
  }

  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
    boolean enforceMinAssignmentMove) {
    this.conf = conf;
    this.enforceLocality = enforceLocality;
    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
    this.targetTableSet = new HashSet<>();
    this.rackManager = new RackManager(conf);
    try {
      this.connection = ConnectionFactory.createConnection(this.conf);
    } catch (IOException e) {
      // NOTE(review): connection failure is surfaced as an unchecked exception
      // from the constructor; callers cannot catch IOException here.
      throw new RuntimeException(e);
    }
  }

  private static void printHelp(Options opt) {
    new HelpFormatter().printHelp(
      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes "
        + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]"
        + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]",
      opt);
  }

  /**
   * Restrict subsequent operations to the given tables.
   * @param tableNames table names to target; {@code null} leaves the target set unchanged
   */
  public void setTargetTableName(String[] tableNames) {
    if (tableNames != null) {
      for (String table : tableNames)
        this.targetTableSet.add(TableName.valueOf(table));
    }
  }

  /**
   * Returns a new, initialized snapshot of the current region assignments read from hbase:meta.
   * @throws IOException if the snapshot cannot be built
   */
  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
    currentAssignmentShapshot.initialize();
    return currentAssignmentShapshot;
  }

  /**
   * Verify that the current region placement is consistent with the assignment plan, printing a
   * verification report for each (targeted) table.
   * @param isDetailMode whether to print the detailed report
   * @return one report per verified table
   * @throws IOException if the assignment snapshot or locality map cannot be read
   */
  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
    throws IOException {
    System.out
      .println("Start to verify the region assignment and " + "generate the verification report");
    // Get the region assignment snapshot
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();

    // Get all the tables
    Set<TableName> tables = snapshot.getTableSet();

    // Get the region locality map
    Map<String, Map<String, Float>> regionLocalityMap = null;
    if (this.enforceLocality == true) {
      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
    }
    List<AssignmentVerificationReport> reports = new ArrayList<>();
    // Iterate all the tables to fill up the verification report
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      AssignmentVerificationReport report = new AssignmentVerificationReport();
      report.fillUp(table, snapshot, regionLocalityMap);
      report.print(isDetailMode);
      reports.add(report);
    }
    return reports;
  }

  /**
   * Generate the favored-nodes assignment plan for one existing table. Primary placement is always
   * solved with the Munkres (Hungarian) assignment algorithm over a region-by-slot cost matrix;
   * each server contributes {@code slotsPerServer} slots.
   * @param tableName                      the table to plan for
   * @param assignmentSnapshot             current assignment snapshot from hbase:meta
   * @param regionLocalityMap              region encoded name -> (hostname -> locality); may be
   *                                       null when locality is not enforced
   * @param plan                           the plan to be filled with favored nodes for each region
   * @param munkresForSecondaryAndTertiary if true the secondary and tertiary assignments are also
   *                                       generated with the Munkres algorithm, otherwise with
   *                                       FavoredNodeAssignmentHelper's
   *                                       placeSecondaryAndTertiaryWithRestrictions
   * @throws IOException if region servers cannot be listed
   */
  private void genAssignmentPlan(TableName tableName,
    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
    boolean munkresForSecondaryAndTertiary) throws IOException {
    // Get the all the regions for the current table
    List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName);
    int numRegions = regions.size();

    // Get the current assignment map
    Map<RegionInfo, ServerName> currentAssignmentMap =
      assignmentSnapshot.getRegionToRegionServerMap();

    // Get the all the region servers
    List<ServerName> servers = new ArrayList<>();
    try (Admin admin = this.connection.getAdmin()) {
      servers.addAll(admin.getRegionServers());
    }

    LOG.info("Start to generate assignment plan for " + numRegions + " regions from table "
      + tableName + " with " + servers.size() + " region servers");

    // Each server exposes slotsPerServer candidate slots; slot j belongs to
    // server j / slotsPerServer. regionSlots >= numRegions by construction.
    int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size());
    int regionSlots = slotsPerServer * servers.size();

    // Compute the primary, secondary and tertiary costs for each region/server
    // pair. These costs are based only on node locality and rack locality, and
    // will be modified later.
    float[][] primaryCost = new float[numRegions][regionSlots];
    float[][] secondaryCost = new float[numRegions][regionSlots];
    float[][] tertiaryCost = new float[numRegions][regionSlots];

    if (this.enforceLocality && regionLocalityMap != null) {
      // Transform the locality mapping into a 2D array, assuming that any
      // unspecified locality value is 0.
      float[][] localityPerServer = new float[numRegions][regionSlots];
      for (int i = 0; i < numRegions; i++) {
        Map<String, Float> serverLocalityMap =
          regionLocalityMap.get(regions.get(i).getEncodedName());
        if (serverLocalityMap == null) {
          continue;
        }
        for (int j = 0; j < servers.size(); j++) {
          String serverName = servers.get(j).getHostname();
          if (serverName == null) {
            continue;
          }
          Float locality = serverLocalityMap.get(serverName);
          if (locality == null) {
            continue;
          }
          for (int k = 0; k < slotsPerServer; k++) {
            // If we can't find the locality of a region to a server, which occurs
            // because locality is only reported for servers which have some
            // blocks of a region local, then the locality for that pair is 0.
            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
          }
        }
      }

      // Compute the total rack locality for each region in each rack. The total
      // rack locality is the sum of the localities of a region on all servers in
      // a rack.
      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
      for (int i = 0; i < numRegions; i++) {
        RegionInfo region = regions.get(i);
        // Step by slotsPerServer so each server is visited exactly once.
        for (int j = 0; j < regionSlots; j += slotsPerServer) {
          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
          if (rackLocality == null) {
            rackLocality = new HashMap<>();
            rackRegionLocality.put(rack, rackLocality);
          }
          Float localityObj = rackLocality.get(region);
          float locality = localityObj == null ? 0 : localityObj.floatValue();
          locality += localityPerServer[i][j];
          rackLocality.put(region, locality);
        }
      }
      for (int i = 0; i < numRegions; i++) {
        for (int j = 0; j < regionSlots; j++) {
          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
          Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i));
          float totalRackLocality =
            totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue();

          // Primary cost aims to favor servers with high node locality and low
          // rack locality, so that secondaries and tertiaries can be chosen for
          // nodes with high rack locality. This might give primaries with
          // slightly less locality at first compared to a cost which only
          // considers the node locality, but should be better in the long run.
          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);

          // Secondary cost aims to favor servers with high node locality and high
          // rack locality since the tertiary will be chosen from the same rack as
          // the secondary. This could be negative, but that is okay.
          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);

          // Tertiary cost is only concerned with the node locality. It will later
          // be restricted to only hosts on the same rack as the secondary.
          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
        }
      }
    }

    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
      // We want to minimize the number of regions which move as the result of a
      // new assignment. Therefore, slightly penalize any placement which is for
      // a host that is not currently serving the region.
      for (int i = 0; i < numRegions; i++) {
        for (int j = 0; j < servers.size(); j++) {
          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
          if (currentAddress != null && !currentAddress.equals(servers.get(j))) {
            for (int k = 0; k < slotsPerServer; k++) {
              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
            }
          }
        }
      }
    }

    // Artificially increase cost of last slot of each server to evenly
    // distribute the slop, otherwise there will be a few servers with too few
    // regions and many servers with the max number of regions.
    // NOTE(review): the penalized index (j, the first slot of each server's
    // range) looks like the FIRST slot rather than the last — presumably the
    // solver's slot order makes this equivalent; confirm against MunkresAssignment.
    for (int i = 0; i < numRegions; i++) {
      for (int j = 0; j < regionSlots; j += slotsPerServer) {
        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
      }
    }

    // Randomize rows/columns to avoid clumping when costs are uniform, solve
    // the primary assignment, then map slot indices back to the original order.
    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
    primaryCost = randomizedMatrix.transform(primaryCost);
    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);

    // Modify the secondary and tertiary costs for each region/server pair to
    // prevent a region from being assigned to the same rack for both primary
    // and either one of secondary or tertiary.
    for (int i = 0; i < numRegions; i++) {
      int slot = primaryAssignment[i];
      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
      for (int k = 0; k < servers.size(); k++) {
        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
          continue;
        }
        if (k == slot / slotsPerServer) {
          // Same node, do not place secondary or tertiary here ever.
          for (int m = 0; m < slotsPerServer; m++) {
            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
          }
        } else {
          // Same rack, do not place secondary or tertiary here if possible.
          for (int m = 0; m < slotsPerServer; m++) {
            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
          }
        }
      }
    }
    if (munkresForSecondaryAndTertiary) {
      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
      secondaryCost = randomizedMatrix.transform(secondaryCost);
      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);

      // Modify the tertiary costs for each region/server pair to ensure that a
      // region is assigned to a tertiary server on the same rack as its secondary
      // server, but not the same server in that rack.
      for (int i = 0; i < numRegions; i++) {
        int slot = secondaryAssignment[i];
        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
        for (int k = 0; k < servers.size(); k++) {
          if (k == slot / slotsPerServer) {
            // Same node, do not place tertiary here ever.
            for (int m = 0; m < slotsPerServer; m++) {
              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
            }
          } else {
            if (rackManager.getRack(servers.get(k)).equals(rack)) {
              continue;
            }
            // Different rack, do not place tertiary here if possible.
            for (int m = 0; m < slotsPerServer; m++) {
              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
            }
          }
        }
      }

      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);

      for (int i = 0; i < numRegions; i++) {
        List<ServerName> favoredServers =
          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
        // Favored nodes are recorded host:port only (NON_STARTCODE strips the startcode).
        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));

        s = servers.get(secondaryAssignment[i] / slotsPerServer);
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));

        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
        // Update the assignment plan
        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
      }
      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
        + tableName + " with " + servers.size() + " region servers");
      LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment");
    } else {
      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
      for (int i = 0; i < numRegions; i++) {
        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
      }
      FavoredNodeAssignmentHelper favoredNodeHelper =
        new FavoredNodeAssignmentHelper(servers, conf);
      favoredNodeHelper.initialize();
      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
      for (int i = 0; i < numRegions; i++) {
        List<ServerName> favoredServers =
          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
        RegionInfo currentRegion = regions.get(i);
        ServerName s = primaryRSMap.get(currentRegion);
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));

        ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion);
        s = secondaryAndTertiary[0];
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));

        s = secondaryAndTertiary[1];
        favoredServers
          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
        // Update the assignment plan
        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
      }
      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
        + tableName + " with " + servers.size() + " region servers");
      LOG.info("Assignment plan for secondary and tertiary generated "
        + "using placeSecondaryAndTertiaryWithRestrictions method");
    }
  }

  /**
   * Generate a new favored-nodes assignment plan for every targeted table in the cluster.
   * @return the generated plan (tables whose planning failed are logged and skipped)
   * @throws IOException if the assignment snapshot or locality map cannot be read
   */
  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
    // Get the current region assignment snapshot by scanning from the META
    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot();

    // Get the region locality map
    Map<String, Map<String, Float>> regionLocalityMap = null;
    if (this.enforceLocality) {
      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
    }
    // Initialize the assignment plan
    FavoredNodesPlan plan = new FavoredNodesPlan();

    // Get the table to region mapping
    Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap();
    // NOTE(review): the "+ +" below is a harmless stray unary plus.
    LOG.info("Start to generate the new assignment plan for the "
      + +tableToRegionMap.keySet().size() + " tables");
    for (TableName table : tableToRegionMap.keySet()) {
      try {
        if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
          continue;
        }
        // TODO: maybe run the placement in parallel for each table
        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
      } catch (Exception e) {
        // NOTE(review): failure is swallowed per table by design (other tables
        // still get planned), but string-concatenating `e` loses the stack
        // trace — consider LOG.error(msg, e).
        LOG.error("Get some exceptions for placing primary region server" + "for table " + table
          + " because " + e);
      }
    }
    LOG.info("Finish to generate the new assignment plan for the "
      + +tableToRegionMap.keySet().size() + " tables");
    return plan;
  }

  /**
   * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order
   * which may result in skewing the assignments of the first jobs in the matrix toward the last
   * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can
   * randomize the rows and columns of the cost matrix in a reversible way, such that the solution
   * to the assignment problem can be interpreted in terms of the original untransformed cost
   * matrix. Rows and columns are transformed independently such that the elements contained in any
   * row of the input matrix are the same as the elements in the corresponding output matrix, and
   * each row has its elements transformed in the same way. Similarly for columns.
   */
  protected static class RandomizedMatrix {
    private final int rows;
    private final int cols;
    // rowTransform[i] = randomized row index for original row i; rowInverse is its inverse.
    private final int[] rowTransform;
    private final int[] rowInverse;
    private final int[] colTransform;
    private final int[] colInverse;

    /**
     * Create a randomization scheme for a matrix of a given size.
     * @param rows the number of rows in the matrix
     * @param cols the number of columns in the matrix
     */
    public RandomizedMatrix(int rows, int cols) {
      this.rows = rows;
      this.cols = cols;
      Random random = ThreadLocalRandom.current();
      rowTransform = new int[rows];
      rowInverse = new int[rows];
      for (int i = 0; i < rows; i++) {
        rowTransform[i] = i;
      }
      // Shuffle the row indices (Fisher-Yates).
      for (int i = rows - 1; i >= 0; i--) {
        int r = random.nextInt(i + 1);
        int temp = rowTransform[r];
        rowTransform[r] = rowTransform[i];
        rowTransform[i] = temp;
      }
      // Generate the inverse row indices.
      for (int i = 0; i < rows; i++) {
        rowInverse[rowTransform[i]] = i;
      }

      colTransform = new int[cols];
      colInverse = new int[cols];
      for (int i = 0; i < cols; i++) {
        colTransform[i] = i;
      }
      // Shuffle the column indices (Fisher-Yates).
      for (int i = cols - 1; i >= 0; i--) {
        int r = random.nextInt(i + 1);
        int temp = colTransform[r];
        colTransform[r] = colTransform[i];
        colTransform[i] = temp;
      }
      // Generate the inverse column indices.
      for (int i = 0; i < cols; i++) {
        colInverse[colTransform[i]] = i;
      }
    }

    /**
     * Copy a given matrix into a new matrix, transforming each row index and each column index
     * according to the randomization scheme that was created at construction time.
     * @param matrix the cost matrix to transform
     * @return a new matrix with row and column indices transformed
     */
    public float[][] transform(float[][] matrix) {
      float[][] result = new float[rows][cols];
      for (int i = 0; i < rows; i++) {
        for (int j = 0; j < cols; j++) {
          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
        }
      }
      return result;
    }

    /**
     * Copy a given matrix into a new matrix, transforming each row index and each column index
     * according to the inverse of the randomization scheme that was created at construction time.
     * @param matrix the cost matrix to be inverted
     * @return a new matrix with row and column indices inverted
     */
    public float[][] invert(float[][] matrix) {
      float[][] result = new float[rows][cols];
      for (int i = 0; i < rows; i++) {
        for (int j = 0; j < cols; j++) {
          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
        }
      }
      return result;
    }

    /**
     * Given an array where each element {@code indices[i]} represents the randomized column index
     * corresponding to randomized row index {@code i}, create a new array with the corresponding
     * inverted indices.
     * @param indices an array of transformed indices to be inverted
     * @return an array of inverted indices
     */
    public int[] invertIndices(int[] indices) {
      int[] result = new int[indices.length];
      for (int i = 0; i < indices.length; i++) {
        result[rowInverse[i]] = colInverse[indices[i]];
      }
      return result;
    }
  }

  /**
   * Print the assignment plan to the log (regions sorted by name).
   * @param plan the plan to print; a null plan is silently ignored
   */
  public static void printAssignmentPlan(FavoredNodesPlan plan) {
    if (plan == null) return;
    LOG.info("========== Start to print the assignment plan ================");
    // sort the map based on region info
    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());

    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {

      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
      String regionName = entry.getKey();
      LOG.info("Region: " + regionName);
      LOG.info("Its favored nodes: " + serverList);
    }
    LOG.info("========== Finish to print the assignment plan ================");
  }

  /**
   * Update the assignment plan into hbase:meta
   * @param plan the assignments plan to be updated into hbase:meta
   * @throws IOException if cannot update assignment plan in hbase:meta
   */
  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException {
    try {
      LOG.info("Start to update the hbase:meta with the new assignment plan");
      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
      // Re-key the plan from region name to RegionInfo, as required by the helper.
      Map<String, RegionInfo> regionToRegionInfoMap =
        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
      }

      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
      LOG.info("Updated the hbase:meta with the new assignment plan");
    } catch (Exception e) {
      // NOTE(review): despite the @throws, failures are swallowed here — the
      // declared IOException is never propagated and the stack trace is lost.
      LOG.error(
        "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage());
    }
  }

  /**
   * Push the assignment plan to every region server that currently hosts affected regions, via the
   * updateFavoredNodes admin RPC. Per-server failures are collected and logged; they do not abort
   * the remaining updates.
   * @param plan the assignment plan to distribute
   * @throws IOException if the current assignment snapshot cannot be read
   */
  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException {
    LOG.info("Start to update the region servers with the new assignment plan");
    // Get the region to region server map
    Map<ServerName, List<RegionInfo>> currentAssignment =
      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();

    // track of the failed and succeeded updates
    int succeededNum = 0;
    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();

    for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) {
      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
      try {
        // Keep track of the favored updates for the current region server
        FavoredNodesPlan singleServerPlan = null;
        // Find out all the updates for the current region server
        for (RegionInfo region : entry.getValue()) {
          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
          // Only complete favored-node lists (exactly FAVORED_NODES_NUM entries) are pushed.
          if (
            favoredServerList != null
              && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
          ) {
            // Create the single server plan if necessary
            if (singleServerPlan == null) {
              singleServerPlan = new FavoredNodesPlan();
            }
            // Update the single server update
            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
          }
        }
        if (singleServerPlan != null) {
          // Update the current region server with its updated favored nodes
          BlockingInterface currentRegionServer =
            ((ClusterConnection) this.connection).getAdmin(entry.getKey());
          UpdateFavoredNodesRequest request =
            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);

          UpdateFavoredNodesResponse updateFavoredNodesResponse =
            currentRegionServer.updateFavoredNodes(null, request);
          LOG.info(
            "Region server " + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName()
              + " has updated " + updateFavoredNodesResponse.getResponse() + " / "
              + singleServerPlan.size() + " regions with the assignment plan");
          succeededNum++;
        }
      } catch (Exception e) {
        // Remember the failure; it is reported in aggregate below.
        failedUpdateMap.put(entry.getKey(), e);
      }
    }
    // log the succeeded updates
    LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan");

    // log the failed updates
    int failedNum = failedUpdateMap.size();
    if (failedNum != 0) {
      LOG.error("Failed to update the following + " + failedNum
        + " region servers with its corresponding favored nodes");
      for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) {
        LOG.error("Failed to update " + entry.getKey().getAddress() + " because of "
          + entry.getValue().getMessage());
      }
    }
  }

  /**
   * Apply the assignment plan cluster-wide: first persist it to hbase:meta, then push it to the
   * region servers.
   * @param plan the assignment plan to apply
   * @throws IOException if reading the assignment snapshot fails
   */
  public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException {
    LOG.info("Start to update the new assignment plan for the hbase:meta table and"
      + " the region servers");
    // Update the new assignment plan to META
    updateAssignmentPlanToMeta(plan);
    // Update the new assignment plan to Region Servers
    updateAssignmentPlanToRegionServers(plan);
    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
      + " the region servers");
  }

  /**
   * Return how many regions will move per table since their primary RS will change
   * @param newPlan - new AssignmentPlan
   * @return how many primaries will move per table
   */
  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
    Map<TableName, Integer> movesPerTable = new HashMap<>();
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    for (TableName table : tables) {
      int movedPrimaries = 0;
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      List<RegionInfo> regions = tableToRegions.get(table);
      for (RegionInfo region : regions) {
        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
        List<ServerName> newServers = newPlan.getFavoredNodes(region);
        if (oldServers != null && newServers != null) {
          // Position 0 in the favored-nodes list is the primary.
          ServerName oldPrimary = oldServers.get(0);
          ServerName newPrimary = newServers.get(0);
          if (oldPrimary.compareTo(newPrimary) != 0) {
            movedPrimaries++;
          }
        }
      }
      movesPerTable.put(table, movedPrimaries);
    }
    return movesPerTable;
  }

  /**
   * Compares two plans and check whether the locality dropped or increased (prints the information
   * as a string) also prints the baseline locality
   * @param movesPerTable     - how many primary regions will move per table
   * @param regionLocalityMap - locality map from FS
   * @param newPlan           - new assignment plan
   * @throws IOException if the assignment snapshot cannot be read
   */
  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
    throws IOException {
    // localities for primary, secondary and tertiary
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
    for (TableName table : tables) {
      float[] deltaLocality = new float[3];
      float[] locality = new float[3];
      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
        continue;
      }
      List<RegionInfo> regions = tableToRegionsMap.get(table);
      System.out.println("==================================================");
      System.out.println("Assignment Plan Projection Report For Table: " + table);
      System.out.println("\t Total regions: " + regions.size());
      System.out.println(
        "\t" + movesPerTable.get(table) + " primaries will move due to their primary has changed");
      for (RegionInfo currentRegion : regions) {
        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
        if (regionLocality == null) {
          continue;
        }
        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
        if (newServers != null && oldServers != null) {
          int i = 0;
          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
            ServerName newServer = newServers.get(p.ordinal());
            ServerName oldServer = oldServers.get(p.ordinal());
            Float oldLocality = 0f;
            if (oldServers != null) {
              oldLocality = regionLocality.get(oldServer.getHostname());
              if (oldLocality == null) {
                oldLocality = 0f;
              }
773 locality[i] += oldLocality; 774 } 775 Float newLocality = regionLocality.get(newServer.getHostname()); 776 if (newLocality == null) { 777 newLocality = 0f; 778 } 779 deltaLocality[i] += newLocality - oldLocality; 780 i++; 781 } 782 } 783 } 784 DecimalFormat df = new java.text.DecimalFormat("#.##"); 785 for (int i = 0; i < deltaLocality.length; i++) { 786 System.out.print("\t\t Baseline locality for "); 787 if (i == 0) { 788 System.out.print("primary "); 789 } else if (i == 1) { 790 System.out.print("secondary "); 791 } else if (i == 2) { 792 System.out.print("tertiary "); 793 } 794 System.out.println(df.format(100 * locality[i] / regions.size()) + "%"); 795 System.out.print("\t\t Locality will change with the new plan: "); 796 System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + "%"); 797 } 798 System.out.println("\t Baseline dispersion"); 799 printDispersionScores(table, snapshot, regions.size(), null, true); 800 System.out.println("\t Projected dispersion"); 801 printDispersionScores(table, snapshot, regions.size(), newPlan, true); 802 } 803 } 804 805 public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot, 806 int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) { 807 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { 808 return; 809 } 810 AssignmentVerificationReport report = new AssignmentVerificationReport(); 811 report.fillUpDispersion(table, snapshot, newPlan); 812 List<Float> dispersion = report.getDispersionInformation(); 813 if (simplePrint) { 814 DecimalFormat df = new java.text.DecimalFormat("#.##"); 815 System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0)) 816 + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1)) 817 + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;"); 818 } else { 819 LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions 820 + " ; The average dispersion score 
is " + dispersion.get(0)); 821 } 822 } 823 824 public void printLocalityAndDispersionForCurrentPlan( 825 Map<String, Map<String, Float>> regionLocalityMap) throws IOException { 826 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot(); 827 FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan(); 828 Set<TableName> tables = snapshot.getTableSet(); 829 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap(); 830 for (TableName table : tables) { 831 float[] locality = new float[3]; 832 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { 833 continue; 834 } 835 List<RegionInfo> regions = tableToRegionsMap.get(table); 836 for (RegionInfo currentRegion : regions) { 837 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName()); 838 if (regionLocality == null) { 839 continue; 840 } 841 List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion); 842 if (servers != null) { 843 int i = 0; 844 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) { 845 ServerName server = servers.get(p.ordinal()); 846 Float currentLocality = 0f; 847 if (servers != null) { 848 currentLocality = regionLocality.get(server.getHostname()); 849 if (currentLocality == null) { 850 currentLocality = 0f; 851 } 852 locality[i] += currentLocality; 853 } 854 i++; 855 } 856 } 857 } 858 for (int i = 0; i < locality.length; i++) { 859 String copy = null; 860 if (i == 0) { 861 copy = "primary"; 862 } else if (i == 1) { 863 copy = "secondary"; 864 } else if (i == 2) { 865 copy = "tertiary"; 866 } 867 float avgLocality = 100 * locality[i] / regions.size(); 868 LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size() 869 + " ; The average locality for " + copy + " is " + avgLocality + " %"); 870 } 871 printDispersionScores(table, snapshot, regions.size(), null, false); 872 } 873 } 874 875 /** 876 * @param favoredNodesStr The String of favored 
nodes 877 * @return the list of ServerName for the byte array of favored nodes. 878 */ 879 public static List<ServerName> getFavoredNodeList(String favoredNodesStr) { 880 String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ","); 881 if (favoredNodesArray == null) return null; 882 883 List<ServerName> serverList = new ArrayList<>(); 884 for (String hostNameAndPort : favoredNodesArray) { 885 serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE)); 886 } 887 return serverList; 888 } 889 890 public static void main(String args[]) throws IOException { 891 Options opt = new Options(); 892 opt.addOption("w", "write", false, "write the assignments to hbase:meta only"); 893 opt.addOption("u", "update", false, 894 "update the assignments to hbase:meta and RegionServers together"); 895 opt.addOption("n", "dry-run", false, "do not write assignments to META"); 896 opt.addOption("v", "verify", false, "verify current assignments against META"); 897 opt.addOption("p", "print", false, "print the current assignment plan in META"); 898 opt.addOption("h", "help", false, "print usage"); 899 opt.addOption("d", "verification-details", false, "print the details of verification report"); 900 901 opt.addOption("zk", true, "to set the zookeeper quorum"); 902 opt.addOption("fs", true, "to set HDFS"); 903 opt.addOption("hbase_root", true, "to set hbase_root directory"); 904 905 opt.addOption("overwrite", false, "overwrite the favored nodes for a single region," 906 + "for example: -update -r regionName -f server1:port,server2:port,server3:port"); 907 opt.addOption("r", true, "The region name that needs to be updated"); 908 opt.addOption("f", true, "The new favored nodes"); 909 910 opt.addOption("tables", true, 911 "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn"); 912 opt.addOption("l", "locality", true, "enforce the maximum locality"); 913 opt.addOption("m", "min-move", true, "enforce minimum assignment move"); 914 
opt.addOption("diff", false, "calculate difference between assignment plans"); 915 opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries"); 916 opt.addOption("ld", "locality-dispersion", false, 917 "print locality and dispersion " + "information for current plan"); 918 try { 919 CommandLine cmd = new GnuParser().parse(opt, args); 920 Configuration conf = HBaseConfiguration.create(); 921 922 boolean enforceMinAssignmentMove = true; 923 boolean enforceLocality = true; 924 boolean verificationDetails = false; 925 926 // Read all the options 927 if ( 928 (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false")) 929 || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false")) 930 ) { 931 enforceLocality = false; 932 } 933 934 if ( 935 (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false")) 936 || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false")) 937 ) { 938 enforceMinAssignmentMove = false; 939 } 940 941 if (cmd.hasOption("zk")) { 942 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk")); 943 LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM)); 944 } 945 946 if (cmd.hasOption("fs")) { 947 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs")); 948 LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); 949 } 950 951 if (cmd.hasOption("hbase_root")) { 952 conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root")); 953 LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR)); 954 } 955 956 // Create the region placement obj 957 RegionPlacementMaintainer rp = 958 new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove); 959 960 if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { 961 verificationDetails = true; 962 } 963 964 if (cmd.hasOption("tables")) { 965 String tableNameListStr = cmd.getOptionValue("tables"); 966 
String[] tableNames = StringUtils.split(tableNameListStr, ","); 967 rp.setTargetTableName(tableNames); 968 } 969 970 if (cmd.hasOption("munkres")) { 971 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; 972 } 973 974 // Read all the modes 975 if (cmd.hasOption("v") || cmd.hasOption("verify")) { 976 // Verify the region placement. 977 rp.verifyRegionPlacement(verificationDetails); 978 } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { 979 // Generate the assignment plan only without updating the hbase:meta and RS 980 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 981 printAssignmentPlan(plan); 982 } else if (cmd.hasOption("w") || cmd.hasOption("write")) { 983 // Generate the new assignment plan 984 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 985 // Print the new assignment plan 986 printAssignmentPlan(plan); 987 // Write the new assignment plan to META 988 rp.updateAssignmentPlanToMeta(plan); 989 } else if (cmd.hasOption("u") || cmd.hasOption("update")) { 990 // Generate the new assignment plan 991 FavoredNodesPlan plan = rp.getNewAssignmentPlan(); 992 // Print the new assignment plan 993 printAssignmentPlan(plan); 994 // Update the assignment to hbase:meta and Region Servers 995 rp.updateAssignmentPlan(plan); 996 } else if (cmd.hasOption("diff")) { 997 FavoredNodesPlan newPlan = rp.getNewAssignmentPlan(); 998 Map<String, Map<String, Float>> locality = 999 FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 1000 Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan); 1001 rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); 1002 System.out.println("Do you want to update the assignment plan? 
[y/n]"); 1003 Scanner s = new Scanner(System.in); 1004 String input = s.nextLine().trim(); 1005 if (input.equals("y")) { 1006 System.out.println("Updating assignment plan..."); 1007 rp.updateAssignmentPlan(newPlan); 1008 } 1009 s.close(); 1010 } else if (cmd.hasOption("ld")) { 1011 Map<String, Map<String, Float>> locality = 1012 FSUtils.getRegionDegreeLocalityMappingFromFS(conf); 1013 rp.printLocalityAndDispersionForCurrentPlan(locality); 1014 } else if (cmd.hasOption("p") || cmd.hasOption("print")) { 1015 FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan(); 1016 printAssignmentPlan(plan); 1017 } else if (cmd.hasOption("overwrite")) { 1018 if (!cmd.hasOption("f") || !cmd.hasOption("r")) { 1019 throw new IllegalArgumentException("Please specify: " 1020 + " -update -r regionName -f server1:port,server2:port,server3:port"); 1021 } 1022 1023 String regionName = cmd.getOptionValue("r"); 1024 String favoredNodesStr = cmd.getOptionValue("f"); 1025 LOG.info("Going to update the region " + regionName + " with the new favored nodes " 1026 + favoredNodesStr); 1027 List<ServerName> favoredNodes = null; 1028 RegionInfo regionInfo = 1029 rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); 1030 if (regionInfo == null) { 1031 LOG.error("Cannot find the region " + regionName + " from the META"); 1032 } else { 1033 try { 1034 favoredNodes = getFavoredNodeList(favoredNodesStr); 1035 } catch (IllegalArgumentException e) { 1036 LOG.error("Cannot parse the invalid favored nodes because " + e); 1037 } 1038 FavoredNodesPlan newPlan = new FavoredNodesPlan(); 1039 newPlan.updateFavoredNodesMap(regionInfo, favoredNodes); 1040 rp.updateAssignmentPlan(newPlan); 1041 } 1042 } else { 1043 printHelp(opt); 1044 } 1045 } catch (ParseException e) { 1046 printHelp(opt); 1047 } 1048 } 1049}