001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.io.IOException;
021import java.text.DecimalFormat;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Random;
028import java.util.Scanner;
029import java.util.Set;
030import java.util.TreeMap;
031import java.util.concurrent.ThreadLocalRandom;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ClusterConnection;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
045import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
046import org.apache.hadoop.hbase.util.FSUtils;
047import org.apache.hadoop.hbase.util.MunkresAssignment;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
054import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
055import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
058
059import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
060import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
064
065/**
066 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with
067 * -h to get a list of the options
068 */
069@InterfaceAudience.Private
070// TODO: Remove? Unused. Partially implemented only.
071public class RegionPlacementMaintainer {
072  private static final Logger LOG =
073    LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName());
074  // The cost of a placement that should never be assigned.
075  private static final float MAX_COST = Float.POSITIVE_INFINITY;
076
077  // The cost of a placement that is undesirable but acceptable.
078  private static final float AVOID_COST = 100000f;
079
080  // The amount by which the cost of a placement is increased if it is the
081  // last slot of the server. This is done to more evenly distribute the slop
082  // amongst servers.
083  private static final float LAST_SLOT_COST_PENALTY = 0.5f;
084
085  // The amount by which the cost of a primary placement is penalized if it is
086  // not the host currently serving the region. This is done to minimize moves.
087  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;
088
089  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;
090
091  private Configuration conf;
092  private final boolean enforceLocality;
093  private final boolean enforceMinAssignmentMove;
094  private RackManager rackManager;
095  private Set<TableName> targetTableSet;
096  private final Connection connection;
097
098  public RegionPlacementMaintainer(Configuration conf) {
099    this(conf, true, true);
100  }
101
102  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
103    boolean enforceMinAssignmentMove) {
104    this.conf = conf;
105    this.enforceLocality = enforceLocality;
106    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
107    this.targetTableSet = new HashSet<>();
108    this.rackManager = new RackManager(conf);
109    try {
110      this.connection = ConnectionFactory.createConnection(this.conf);
111    } catch (IOException e) {
112      throw new RuntimeException(e);
113    }
114  }
115
116  private static void printHelp(Options opt) {
117    new HelpFormatter().printHelp(
118      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes "
119        + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]"
120        + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]",
121      opt);
122  }
123
124  public void setTargetTableName(String[] tableNames) {
125    if (tableNames != null) {
126      for (String table : tableNames)
127        this.targetTableSet.add(TableName.valueOf(table));
128    }
129  }
130
131  /** Returns the new RegionAssignmentSnapshot n */
132  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
133    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
134      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
135    currentAssignmentShapshot.initialize();
136    return currentAssignmentShapshot;
137  }
138
139  /**
140   * Verify the region placement is consistent with the assignment plan nnn
141   */
142  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
143    throws IOException {
144    System.out
145      .println("Start to verify the region assignment and " + "generate the verification report");
146    // Get the region assignment snapshot
147    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
148
149    // Get all the tables
150    Set<TableName> tables = snapshot.getTableSet();
151
152    // Get the region locality map
153    Map<String, Map<String, Float>> regionLocalityMap = null;
154    if (this.enforceLocality == true) {
155      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
156    }
157    List<AssignmentVerificationReport> reports = new ArrayList<>();
158    // Iterate all the tables to fill up the verification report
159    for (TableName table : tables) {
160      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
161        continue;
162      }
163      AssignmentVerificationReport report = new AssignmentVerificationReport();
164      report.fillUp(table, snapshot, regionLocalityMap);
165      report.print(isDetailMode);
166      reports.add(report);
167    }
168    return reports;
169  }
170
171  /**
172   * Generate the assignment plan for the existing table nnnn * @param
173   * munkresForSecondaryAndTertiary if set on true the assignment plan for the tertiary and
174   * secondary will be generated with Munkres algorithm, otherwise will be generated using
175   * placeSecondaryAndTertiaryRS n
176   */
177  private void genAssignmentPlan(TableName tableName,
178    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
179    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
180    boolean munkresForSecondaryAndTertiary) throws IOException {
181    // Get the all the regions for the current table
182    List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName);
183    int numRegions = regions.size();
184
185    // Get the current assignment map
186    Map<RegionInfo, ServerName> currentAssignmentMap =
187      assignmentSnapshot.getRegionToRegionServerMap();
188
189    // Get the all the region servers
190    List<ServerName> servers = new ArrayList<>();
191    try (Admin admin = this.connection.getAdmin()) {
192      servers.addAll(admin.getRegionServers());
193    }
194
195    LOG.info("Start to generate assignment plan for " + numRegions + " regions from table "
196      + tableName + " with " + servers.size() + " region servers");
197
198    int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size());
199    int regionSlots = slotsPerServer * servers.size();
200
201    // Compute the primary, secondary and tertiary costs for each region/server
202    // pair. These costs are based only on node locality and rack locality, and
203    // will be modified later.
204    float[][] primaryCost = new float[numRegions][regionSlots];
205    float[][] secondaryCost = new float[numRegions][regionSlots];
206    float[][] tertiaryCost = new float[numRegions][regionSlots];
207
208    if (this.enforceLocality && regionLocalityMap != null) {
209      // Transform the locality mapping into a 2D array, assuming that any
210      // unspecified locality value is 0.
211      float[][] localityPerServer = new float[numRegions][regionSlots];
212      for (int i = 0; i < numRegions; i++) {
213        Map<String, Float> serverLocalityMap =
214          regionLocalityMap.get(regions.get(i).getEncodedName());
215        if (serverLocalityMap == null) {
216          continue;
217        }
218        for (int j = 0; j < servers.size(); j++) {
219          String serverName = servers.get(j).getHostname();
220          if (serverName == null) {
221            continue;
222          }
223          Float locality = serverLocalityMap.get(serverName);
224          if (locality == null) {
225            continue;
226          }
227          for (int k = 0; k < slotsPerServer; k++) {
228            // If we can't find the locality of a region to a server, which occurs
229            // because locality is only reported for servers which have some
230            // blocks of a region local, then the locality for that pair is 0.
231            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
232          }
233        }
234      }
235
236      // Compute the total rack locality for each region in each rack. The total
237      // rack locality is the sum of the localities of a region on all servers in
238      // a rack.
239      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
240      for (int i = 0; i < numRegions; i++) {
241        RegionInfo region = regions.get(i);
242        for (int j = 0; j < regionSlots; j += slotsPerServer) {
243          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
244          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
245          if (rackLocality == null) {
246            rackLocality = new HashMap<>();
247            rackRegionLocality.put(rack, rackLocality);
248          }
249          Float localityObj = rackLocality.get(region);
250          float locality = localityObj == null ? 0 : localityObj.floatValue();
251          locality += localityPerServer[i][j];
252          rackLocality.put(region, locality);
253        }
254      }
255      for (int i = 0; i < numRegions; i++) {
256        for (int j = 0; j < regionSlots; j++) {
257          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
258          Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i));
259          float totalRackLocality =
260            totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue();
261
262          // Primary cost aims to favor servers with high node locality and low
263          // rack locality, so that secondaries and tertiaries can be chosen for
264          // nodes with high rack locality. This might give primaries with
265          // slightly less locality at first compared to a cost which only
266          // considers the node locality, but should be better in the long run.
267          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);
268
269          // Secondary cost aims to favor servers with high node locality and high
270          // rack locality since the tertiary will be chosen from the same rack as
271          // the secondary. This could be negative, but that is okay.
272          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
273
274          // Tertiary cost is only concerned with the node locality. It will later
275          // be restricted to only hosts on the same rack as the secondary.
276          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
277        }
278      }
279    }
280
281    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
282      // We want to minimize the number of regions which move as the result of a
283      // new assignment. Therefore, slightly penalize any placement which is for
284      // a host that is not currently serving the region.
285      for (int i = 0; i < numRegions; i++) {
286        for (int j = 0; j < servers.size(); j++) {
287          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
288          if (currentAddress != null && !currentAddress.equals(servers.get(j))) {
289            for (int k = 0; k < slotsPerServer; k++) {
290              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
291            }
292          }
293        }
294      }
295    }
296
297    // Artificially increase cost of last slot of each server to evenly
298    // distribute the slop, otherwise there will be a few servers with too few
299    // regions and many servers with the max number of regions.
300    for (int i = 0; i < numRegions; i++) {
301      for (int j = 0; j < regionSlots; j += slotsPerServer) {
302        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
303        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
304        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
305      }
306    }
307
308    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
309    primaryCost = randomizedMatrix.transform(primaryCost);
310    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
311    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
312
313    // Modify the secondary and tertiary costs for each region/server pair to
314    // prevent a region from being assigned to the same rack for both primary
315    // and either one of secondary or tertiary.
316    for (int i = 0; i < numRegions; i++) {
317      int slot = primaryAssignment[i];
318      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
319      for (int k = 0; k < servers.size(); k++) {
320        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
321          continue;
322        }
323        if (k == slot / slotsPerServer) {
324          // Same node, do not place secondary or tertiary here ever.
325          for (int m = 0; m < slotsPerServer; m++) {
326            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
327            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
328          }
329        } else {
330          // Same rack, do not place secondary or tertiary here if possible.
331          for (int m = 0; m < slotsPerServer; m++) {
332            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
333            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
334          }
335        }
336      }
337    }
338    if (munkresForSecondaryAndTertiary) {
339      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
340      secondaryCost = randomizedMatrix.transform(secondaryCost);
341      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
342      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
343
344      // Modify the tertiary costs for each region/server pair to ensure that a
345      // region is assigned to a tertiary server on the same rack as its secondary
346      // server, but not the same server in that rack.
347      for (int i = 0; i < numRegions; i++) {
348        int slot = secondaryAssignment[i];
349        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
350        for (int k = 0; k < servers.size(); k++) {
351          if (k == slot / slotsPerServer) {
352            // Same node, do not place tertiary here ever.
353            for (int m = 0; m < slotsPerServer; m++) {
354              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
355            }
356          } else {
357            if (rackManager.getRack(servers.get(k)).equals(rack)) {
358              continue;
359            }
360            // Different rack, do not place tertiary here if possible.
361            for (int m = 0; m < slotsPerServer; m++) {
362              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
363            }
364          }
365        }
366      }
367
368      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
369      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
370      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
371      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
372
373      for (int i = 0; i < numRegions; i++) {
374        List<ServerName> favoredServers =
375          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
376        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
377        favoredServers
378          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
379
380        s = servers.get(secondaryAssignment[i] / slotsPerServer);
381        favoredServers
382          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
383
384        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
385        favoredServers
386          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
387        // Update the assignment plan
388        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
389      }
390      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
391        + tableName + " with " + servers.size() + " region servers");
392      LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment");
393    } else {
394      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
395      for (int i = 0; i < numRegions; i++) {
396        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
397      }
398      FavoredNodeAssignmentHelper favoredNodeHelper =
399        new FavoredNodeAssignmentHelper(servers, conf);
400      favoredNodeHelper.initialize();
401      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
402        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
403      for (int i = 0; i < numRegions; i++) {
404        List<ServerName> favoredServers =
405          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
406        RegionInfo currentRegion = regions.get(i);
407        ServerName s = primaryRSMap.get(currentRegion);
408        favoredServers
409          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
410
411        ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion);
412        s = secondaryAndTertiary[0];
413        favoredServers
414          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
415
416        s = secondaryAndTertiary[1];
417        favoredServers
418          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
419        // Update the assignment plan
420        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
421      }
422      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
423        + tableName + " with " + servers.size() + " region servers");
424      LOG.info("Assignment plan for secondary and tertiary generated "
425        + "using placeSecondaryAndTertiaryWithRestrictions method");
426    }
427  }
428
429  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
430    // Get the current region assignment snapshot by scanning from the META
431    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot();
432
433    // Get the region locality map
434    Map<String, Map<String, Float>> regionLocalityMap = null;
435    if (this.enforceLocality) {
436      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
437    }
438    // Initialize the assignment plan
439    FavoredNodesPlan plan = new FavoredNodesPlan();
440
441    // Get the table to region mapping
442    Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap();
443    LOG.info("Start to generate the new assignment plan for the "
444      + +tableToRegionMap.keySet().size() + " tables");
445    for (TableName table : tableToRegionMap.keySet()) {
446      try {
447        if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
448          continue;
449        }
450        // TODO: maybe run the placement in parallel for each table
451        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
452          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
453      } catch (Exception e) {
454        LOG.error("Get some exceptions for placing primary region server" + "for table " + table
455          + " because " + e);
456      }
457    }
458    LOG.info("Finish to generate the new assignment plan for the "
459      + +tableToRegionMap.keySet().size() + " tables");
460    return plan;
461  }
462
463  /**
464   * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order
465   * which may result in skewing the assignments of the first jobs in the matrix toward the last
466   * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can
467   * randomize the rows and columns of the cost matrix in a reversible way, such that the solution
468   * to the assignment problem can be interpreted in terms of the original untransformed cost
469   * matrix. Rows and columns are transformed independently such that the elements contained in any
470   * row of the input matrix are the same as the elements in the corresponding output matrix, and
471   * each row has its elements transformed in the same way. Similarly for columns.
472   */
473  protected static class RandomizedMatrix {
474    private final int rows;
475    private final int cols;
476    private final int[] rowTransform;
477    private final int[] rowInverse;
478    private final int[] colTransform;
479    private final int[] colInverse;
480
481    /**
482     * Create a randomization scheme for a matrix of a given size.
483     * @param rows the number of rows in the matrix
484     * @param cols the number of columns in the matrix
485     */
486    public RandomizedMatrix(int rows, int cols) {
487      this.rows = rows;
488      this.cols = cols;
489      Random random = ThreadLocalRandom.current();
490      rowTransform = new int[rows];
491      rowInverse = new int[rows];
492      for (int i = 0; i < rows; i++) {
493        rowTransform[i] = i;
494      }
495      // Shuffle the row indices.
496      for (int i = rows - 1; i >= 0; i--) {
497        int r = random.nextInt(i + 1);
498        int temp = rowTransform[r];
499        rowTransform[r] = rowTransform[i];
500        rowTransform[i] = temp;
501      }
502      // Generate the inverse row indices.
503      for (int i = 0; i < rows; i++) {
504        rowInverse[rowTransform[i]] = i;
505      }
506
507      colTransform = new int[cols];
508      colInverse = new int[cols];
509      for (int i = 0; i < cols; i++) {
510        colTransform[i] = i;
511      }
512      // Shuffle the column indices.
513      for (int i = cols - 1; i >= 0; i--) {
514        int r = random.nextInt(i + 1);
515        int temp = colTransform[r];
516        colTransform[r] = colTransform[i];
517        colTransform[i] = temp;
518      }
519      // Generate the inverse column indices.
520      for (int i = 0; i < cols; i++) {
521        colInverse[colTransform[i]] = i;
522      }
523    }
524
525    /**
526     * Copy a given matrix into a new matrix, transforming each row index and each column index
527     * according to the randomization scheme that was created at construction time.
528     * @param matrix the cost matrix to transform
529     * @return a new matrix with row and column indices transformed
530     */
531    public float[][] transform(float[][] matrix) {
532      float[][] result = new float[rows][cols];
533      for (int i = 0; i < rows; i++) {
534        for (int j = 0; j < cols; j++) {
535          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
536        }
537      }
538      return result;
539    }
540
541    /**
542     * Copy a given matrix into a new matrix, transforming each row index and each column index
543     * according to the inverse of the randomization scheme that was created at construction time.
544     * @param matrix the cost matrix to be inverted
545     * @return a new matrix with row and column indices inverted
546     */
547    public float[][] invert(float[][] matrix) {
548      float[][] result = new float[rows][cols];
549      for (int i = 0; i < rows; i++) {
550        for (int j = 0; j < cols; j++) {
551          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
552        }
553      }
554      return result;
555    }
556
557    /**
558     * Given an array where each element {@code indices[i]} represents the randomized column index
559     * corresponding to randomized row index {@code i}, create a new array with the corresponding
560     * inverted indices.
561     * @param indices an array of transformed indices to be inverted
562     * @return an array of inverted indices
563     */
564    public int[] invertIndices(int[] indices) {
565      int[] result = new int[indices.length];
566      for (int i = 0; i < indices.length; i++) {
567        result[rowInverse[i]] = colInverse[indices[i]];
568      }
569      return result;
570    }
571  }
572
573  /**
574   * Print the assignment plan to the system output stream n
575   */
576  public static void printAssignmentPlan(FavoredNodesPlan plan) {
577    if (plan == null) return;
578    LOG.info("========== Start to print the assignment plan ================");
579    // sort the map based on region info
580    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
581
582    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
583
584      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
585      String regionName = entry.getKey();
586      LOG.info("Region: " + regionName);
587      LOG.info("Its favored nodes: " + serverList);
588    }
589    LOG.info("========== Finish to print the assignment plan ================");
590  }
591
592  /**
593   * Update the assignment plan into hbase:meta
594   * @param plan the assignments plan to be updated into hbase:meta
595   * @throws IOException if cannot update assignment plan in hbase:meta
596   */
597  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException {
598    try {
599      LOG.info("Start to update the hbase:meta with the new assignment plan");
600      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
601      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
602      Map<String, RegionInfo> regionToRegionInfoMap =
603        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
604      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
605        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
606      }
607
608      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
609      LOG.info("Updated the hbase:meta with the new assignment plan");
610    } catch (Exception e) {
611      LOG.error(
612        "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage());
613    }
614  }
615
616  /**
617   * Update the assignment plan to all the region servers nn
618   */
619  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException {
620    LOG.info("Start to update the region servers with the new assignment plan");
621    // Get the region to region server map
622    Map<ServerName, List<RegionInfo>> currentAssignment =
623      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
624
625    // track of the failed and succeeded updates
626    int succeededNum = 0;
627    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
628
629    for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) {
630      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
631      try {
632        // Keep track of the favored updates for the current region server
633        FavoredNodesPlan singleServerPlan = null;
634        // Find out all the updates for the current region server
635        for (RegionInfo region : entry.getValue()) {
636          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
637          if (
638            favoredServerList != null
639              && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
640          ) {
641            // Create the single server plan if necessary
642            if (singleServerPlan == null) {
643              singleServerPlan = new FavoredNodesPlan();
644            }
645            // Update the single server update
646            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
647            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
648          }
649        }
650        if (singleServerPlan != null) {
651          // Update the current region server with its updated favored nodes
652          BlockingInterface currentRegionServer =
653            ((ClusterConnection) this.connection).getAdmin(entry.getKey());
654          UpdateFavoredNodesRequest request =
655            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
656
657          UpdateFavoredNodesResponse updateFavoredNodesResponse =
658            currentRegionServer.updateFavoredNodes(null, request);
659          LOG.info(
660            "Region server " + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName()
661              + " has updated " + updateFavoredNodesResponse.getResponse() + " / "
662              + singleServerPlan.size() + " regions with the assignment plan");
663          succeededNum++;
664        }
665      } catch (Exception e) {
666        failedUpdateMap.put(entry.getKey(), e);
667      }
668    }
669    // log the succeeded updates
670    LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan");
671
672    // log the failed updates
673    int failedNum = failedUpdateMap.size();
674    if (failedNum != 0) {
675      LOG.error("Failed to update the following + " + failedNum
676        + " region servers with its corresponding favored nodes");
677      for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) {
678        LOG.error("Failed to update " + entry.getKey().getAddress() + " because of "
679          + entry.getValue().getMessage());
680      }
681    }
682  }
683
684  public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException {
685    LOG.info("Start to update the new assignment plan for the hbase:meta table and"
686      + " the region servers");
687    // Update the new assignment plan to META
688    updateAssignmentPlanToMeta(plan);
689    // Update the new assignment plan to Region Servers
690    updateAssignmentPlanToRegionServers(plan);
691    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
692      + " the region servers");
693  }
694
695  /**
696   * Return how many regions will move per table since their primary RS will change
697   * @param newPlan - new AssignmentPlan
698   * @return how many primaries will move per table
699   */
700  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
701    Map<TableName, Integer> movesPerTable = new HashMap<>();
702    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
703    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
704    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
705    Set<TableName> tables = snapshot.getTableSet();
706    for (TableName table : tables) {
707      int movedPrimaries = 0;
708      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
709        continue;
710      }
711      List<RegionInfo> regions = tableToRegions.get(table);
712      for (RegionInfo region : regions) {
713        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
714        List<ServerName> newServers = newPlan.getFavoredNodes(region);
715        if (oldServers != null && newServers != null) {
716          ServerName oldPrimary = oldServers.get(0);
717          ServerName newPrimary = newServers.get(0);
718          if (oldPrimary.compareTo(newPrimary) != 0) {
719            movedPrimaries++;
720          }
721        }
722      }
723      movesPerTable.put(table, movedPrimaries);
724    }
725    return movesPerTable;
726  }
727
728  /**
729   * Compares two plans and check whether the locality dropped or increased (prints the information
730   * as a string) also prints the baseline locality
731   * @param movesPerTable     - how many primary regions will move per table
732   * @param regionLocalityMap - locality map from FS
733   * @param newPlan           - new assignment plan n
734   */
735  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
736    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
737    throws IOException {
738    // localities for primary, secondary and tertiary
739    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
740    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
741    Set<TableName> tables = snapshot.getTableSet();
742    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
743    for (TableName table : tables) {
744      float[] deltaLocality = new float[3];
745      float[] locality = new float[3];
746      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
747        continue;
748      }
749      List<RegionInfo> regions = tableToRegionsMap.get(table);
750      System.out.println("==================================================");
751      System.out.println("Assignment Plan Projection Report For Table: " + table);
752      System.out.println("\t Total regions: " + regions.size());
753      System.out.println(
754        "\t" + movesPerTable.get(table) + " primaries will move due to their primary has changed");
755      for (RegionInfo currentRegion : regions) {
756        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
757        if (regionLocality == null) {
758          continue;
759        }
760        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
761        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
762        if (newServers != null && oldServers != null) {
763          int i = 0;
764          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
765            ServerName newServer = newServers.get(p.ordinal());
766            ServerName oldServer = oldServers.get(p.ordinal());
767            Float oldLocality = 0f;
768            if (oldServers != null) {
769              oldLocality = regionLocality.get(oldServer.getHostname());
770              if (oldLocality == null) {
771                oldLocality = 0f;
772              }
773              locality[i] += oldLocality;
774            }
775            Float newLocality = regionLocality.get(newServer.getHostname());
776            if (newLocality == null) {
777              newLocality = 0f;
778            }
779            deltaLocality[i] += newLocality - oldLocality;
780            i++;
781          }
782        }
783      }
784      DecimalFormat df = new java.text.DecimalFormat("#.##");
785      for (int i = 0; i < deltaLocality.length; i++) {
786        System.out.print("\t\t Baseline locality for ");
787        if (i == 0) {
788          System.out.print("primary ");
789        } else if (i == 1) {
790          System.out.print("secondary ");
791        } else if (i == 2) {
792          System.out.print("tertiary ");
793        }
794        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
795        System.out.print("\t\t Locality will change with the new plan: ");
796        System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + "%");
797      }
798      System.out.println("\t Baseline dispersion");
799      printDispersionScores(table, snapshot, regions.size(), null, true);
800      System.out.println("\t Projected dispersion");
801      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
802    }
803  }
804
805  public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot,
806    int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) {
807    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
808      return;
809    }
810    AssignmentVerificationReport report = new AssignmentVerificationReport();
811    report.fillUpDispersion(table, snapshot, newPlan);
812    List<Float> dispersion = report.getDispersionInformation();
813    if (simplePrint) {
814      DecimalFormat df = new java.text.DecimalFormat("#.##");
815      System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0))
816        + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1))
817        + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;");
818    } else {
819      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
820        + " ; The average dispersion score is " + dispersion.get(0));
821    }
822  }
823
824  public void printLocalityAndDispersionForCurrentPlan(
825    Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
826    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
827    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
828    Set<TableName> tables = snapshot.getTableSet();
829    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
830    for (TableName table : tables) {
831      float[] locality = new float[3];
832      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
833        continue;
834      }
835      List<RegionInfo> regions = tableToRegionsMap.get(table);
836      for (RegionInfo currentRegion : regions) {
837        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
838        if (regionLocality == null) {
839          continue;
840        }
841        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
842        if (servers != null) {
843          int i = 0;
844          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
845            ServerName server = servers.get(p.ordinal());
846            Float currentLocality = 0f;
847            if (servers != null) {
848              currentLocality = regionLocality.get(server.getHostname());
849              if (currentLocality == null) {
850                currentLocality = 0f;
851              }
852              locality[i] += currentLocality;
853            }
854            i++;
855          }
856        }
857      }
858      for (int i = 0; i < locality.length; i++) {
859        String copy = null;
860        if (i == 0) {
861          copy = "primary";
862        } else if (i == 1) {
863          copy = "secondary";
864        } else if (i == 2) {
865          copy = "tertiary";
866        }
867        float avgLocality = 100 * locality[i] / regions.size();
868        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
869          + " ; The average locality for " + copy + " is " + avgLocality + " %");
870      }
871      printDispersionScores(table, snapshot, regions.size(), null, false);
872    }
873  }
874
875  /**
876   * @param favoredNodesStr The String of favored nodes
877   * @return the list of ServerName for the byte array of favored nodes.
878   */
879  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
880    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
881    if (favoredNodesArray == null) return null;
882
883    List<ServerName> serverList = new ArrayList<>();
884    for (String hostNameAndPort : favoredNodesArray) {
885      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
886    }
887    return serverList;
888  }
889
890  public static void main(String args[]) throws IOException {
891    Options opt = new Options();
892    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
893    opt.addOption("u", "update", false,
894      "update the assignments to hbase:meta and RegionServers together");
895    opt.addOption("n", "dry-run", false, "do not write assignments to META");
896    opt.addOption("v", "verify", false, "verify current assignments against META");
897    opt.addOption("p", "print", false, "print the current assignment plan in META");
898    opt.addOption("h", "help", false, "print usage");
899    opt.addOption("d", "verification-details", false, "print the details of verification report");
900
901    opt.addOption("zk", true, "to set the zookeeper quorum");
902    opt.addOption("fs", true, "to set HDFS");
903    opt.addOption("hbase_root", true, "to set hbase_root directory");
904
905    opt.addOption("overwrite", false, "overwrite the favored nodes for a single region,"
906      + "for example: -update -r regionName -f server1:port,server2:port,server3:port");
907    opt.addOption("r", true, "The region name that needs to be updated");
908    opt.addOption("f", true, "The new favored nodes");
909
910    opt.addOption("tables", true,
911      "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn");
912    opt.addOption("l", "locality", true, "enforce the maximum locality");
913    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
914    opt.addOption("diff", false, "calculate difference between assignment plans");
915    opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries");
916    opt.addOption("ld", "locality-dispersion", false,
917      "print locality and dispersion " + "information for current plan");
918    try {
919      CommandLine cmd = new GnuParser().parse(opt, args);
920      Configuration conf = HBaseConfiguration.create();
921
922      boolean enforceMinAssignmentMove = true;
923      boolean enforceLocality = true;
924      boolean verificationDetails = false;
925
926      // Read all the options
927      if (
928        (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false"))
929          || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false"))
930      ) {
931        enforceLocality = false;
932      }
933
934      if (
935        (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false"))
936          || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false"))
937      ) {
938        enforceMinAssignmentMove = false;
939      }
940
941      if (cmd.hasOption("zk")) {
942        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
943        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
944      }
945
946      if (cmd.hasOption("fs")) {
947        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
948        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
949      }
950
951      if (cmd.hasOption("hbase_root")) {
952        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
953        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
954      }
955
956      // Create the region placement obj
957      RegionPlacementMaintainer rp =
958        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove);
959
960      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
961        verificationDetails = true;
962      }
963
964      if (cmd.hasOption("tables")) {
965        String tableNameListStr = cmd.getOptionValue("tables");
966        String[] tableNames = StringUtils.split(tableNameListStr, ",");
967        rp.setTargetTableName(tableNames);
968      }
969
970      if (cmd.hasOption("munkres")) {
971        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
972      }
973
974      // Read all the modes
975      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
976        // Verify the region placement.
977        rp.verifyRegionPlacement(verificationDetails);
978      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
979        // Generate the assignment plan only without updating the hbase:meta and RS
980        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
981        printAssignmentPlan(plan);
982      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
983        // Generate the new assignment plan
984        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
985        // Print the new assignment plan
986        printAssignmentPlan(plan);
987        // Write the new assignment plan to META
988        rp.updateAssignmentPlanToMeta(plan);
989      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
990        // Generate the new assignment plan
991        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
992        // Print the new assignment plan
993        printAssignmentPlan(plan);
994        // Update the assignment to hbase:meta and Region Servers
995        rp.updateAssignmentPlan(plan);
996      } else if (cmd.hasOption("diff")) {
997        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
998        Map<String, Map<String, Float>> locality =
999          FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1000        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1001        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1002        System.out.println("Do you want to update the assignment plan? [y/n]");
1003        Scanner s = new Scanner(System.in);
1004        String input = s.nextLine().trim();
1005        if (input.equals("y")) {
1006          System.out.println("Updating assignment plan...");
1007          rp.updateAssignmentPlan(newPlan);
1008        }
1009        s.close();
1010      } else if (cmd.hasOption("ld")) {
1011        Map<String, Map<String, Float>> locality =
1012          FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1013        rp.printLocalityAndDispersionForCurrentPlan(locality);
1014      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1015        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1016        printAssignmentPlan(plan);
1017      } else if (cmd.hasOption("overwrite")) {
1018        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1019          throw new IllegalArgumentException("Please specify: "
1020            + " -update -r regionName -f server1:port,server2:port,server3:port");
1021        }
1022
1023        String regionName = cmd.getOptionValue("r");
1024        String favoredNodesStr = cmd.getOptionValue("f");
1025        LOG.info("Going to update the region " + regionName + " with the new favored nodes "
1026          + favoredNodesStr);
1027        List<ServerName> favoredNodes = null;
1028        RegionInfo regionInfo =
1029          rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1030        if (regionInfo == null) {
1031          LOG.error("Cannot find the region " + regionName + " from the META");
1032        } else {
1033          try {
1034            favoredNodes = getFavoredNodeList(favoredNodesStr);
1035          } catch (IllegalArgumentException e) {
1036            LOG.error("Cannot parse the invalid favored nodes because " + e);
1037          }
1038          FavoredNodesPlan newPlan = new FavoredNodesPlan();
1039          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1040          rp.updateAssignmentPlan(newPlan);
1041        }
1042      } else {
1043        printHelp(opt);
1044      }
1045    } catch (ParseException e) {
1046      printHelp(opt);
1047    }
1048  }
1049}