001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.text.DecimalFormat;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Random;
029import java.util.Scanner;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.concurrent.ThreadLocalRandom;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.AsyncClusterConnection;
041import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
042import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
046import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
047import org.apache.hadoop.hbase.security.User;
048import org.apache.hadoop.hbase.util.FSUtils;
049import org.apache.hadoop.hbase.util.FutureUtils;
050import org.apache.hadoop.hbase.util.MunkresAssignment;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
058import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
059import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
060import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
061import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
062
063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
066
067/**
068 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with
069 * -h to get a list of the options
070 */
071@InterfaceAudience.Private
072// TODO: Remove? Unused. Partially implemented only.
073public class RegionPlacementMaintainer implements Closeable {
private static final Logger LOG =
  LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName());
// The cost of a placement that should never be assigned (used e.g. for a secondary or tertiary
// slot on the same server as the region's primary).
private static final float MAX_COST = Float.POSITIVE_INFINITY;

// The cost of a placement that is undesirable but acceptable (used e.g. for a secondary or
// tertiary slot on the same rack as the primary). Far larger than the locality-derived costs.
private static final float AVOID_COST = 100000f;

// The amount by which the cost of a placement is increased if it is the
// last slot of the server. This is done to more evenly distribute the slop
// amongst servers.
// NOTE(review): genAssignmentPlan actually adds it at slot index j * slotsPerServer (the first
// slot of each server); all slots of a server otherwise carry identical costs, so the effect is
// the same.
private static final float LAST_SLOT_COST_PENALTY = 0.5f;

// The amount by which the cost of a primary placement is penalized if it is
// not the host currently serving the region. This is done to minimize moves.
private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

// Selects how getNewAssignmentPlan places secondaries and tertiaries: true uses
// MunkresAssignment, false uses FavoredNodeAssignmentHelper.
// NOTE(review): deliberately non-final; presumably toggled by a command-line option elsewhere
// in this file — confirm before making it final.
private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

private Configuration conf;
// Whether file-system locality of region data is factored into placement costs.
private final boolean enforceLocality;
// Whether placing a primary away from the region's current host is penalized.
private final boolean enforceMinAssignmentMove;
private RackManager rackManager;
// Tables to restrict verification / plan generation to; an empty set means all tables.
private Set<TableName> targetTableSet;
// Lazily created by getConnection() and released by close().
private AsyncClusterConnection connection;
099
/**
 * Creates a maintainer that enforces both data locality and minimal region movement.
 * @param conf cluster configuration
 */
public RegionPlacementMaintainer(Configuration conf) throws IOException {
  // Both enforcement switches default to on.
  this(conf, true, true);
}

/**
 * Creates a maintainer with explicit enforcement settings.
 * @param conf                     cluster configuration, also used to build the rack manager
 * @param enforceLocality          whether file-system locality is used when computing costs
 * @param enforceMinAssignmentMove whether placing a primary away from its current host is
 *                                 penalized
 */
public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
  boolean enforceMinAssignmentMove) {
  this.enforceLocality = enforceLocality;
  this.enforceMinAssignmentMove = enforceMinAssignmentMove;
  this.conf = conf;
  this.targetTableSet = new HashSet<>();
  this.rackManager = new RackManager(conf);
}
112
113  private static void printHelp(Options opt) {
114    new HelpFormatter().printHelp(
115      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes "
116        + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]"
117        + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]",
118      opt);
119  }
120
121  private AsyncClusterConnection getConnection() throws IOException {
122    if (connection == null) {
123      connection =
124        ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
125    }
126    return connection;
127  }
128
129  public void setTargetTableName(String[] tableNames) {
130    if (tableNames != null) {
131      for (String table : tableNames)
132        this.targetTableSet.add(TableName.valueOf(table));
133    }
134  }
135
136  /** Returns the new RegionAssignmentSnapshot */
137  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
138    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
139      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
140    currentAssignmentShapshot.initialize();
141    return currentAssignmentShapshot;
142  }
143
144  /**
145   * Verify the region placement is consistent with the assignment plan
146   */
147  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
148    throws IOException {
149    System.out
150      .println("Start to verify the region assignment and " + "generate the verification report");
151    // Get the region assignment snapshot
152    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
153
154    // Get all the tables
155    Set<TableName> tables = snapshot.getTableSet();
156
157    // Get the region locality map
158    Map<String, Map<String, Float>> regionLocalityMap = null;
159    if (this.enforceLocality == true) {
160      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
161    }
162    List<AssignmentVerificationReport> reports = new ArrayList<>();
163    // Iterate all the tables to fill up the verification report
164    for (TableName table : tables) {
165      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
166        continue;
167      }
168      AssignmentVerificationReport report = new AssignmentVerificationReport();
169      report.fillUp(table, snapshot, regionLocalityMap);
170      report.print(isDetailMode);
171      reports.add(report);
172    }
173    return reports;
174  }
175
176  /**
177   * Generate the assignment plan for the existing table
178   * @param munkresForSecondaryAndTertiary if set on true the assignment plan for the tertiary and
179   *                                       secondary will be generated with Munkres algorithm,
180   *                                       otherwise will be generated using
181   *                                       placeSecondaryAndTertiaryRS
182   */
183  private void genAssignmentPlan(TableName tableName,
184    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
185    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
186    boolean munkresForSecondaryAndTertiary) throws IOException {
187    // Get the all the regions for the current table
188    List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName);
189    int numRegions = regions.size();
190
191    // Get the current assignment map
192    Map<RegionInfo, ServerName> currentAssignmentMap =
193      assignmentSnapshot.getRegionToRegionServerMap();
194
195    // Get the all the region servers
196    List<ServerName> servers = new ArrayList<>();
197    servers.addAll(FutureUtils.get(getConnection().getAdmin().getRegionServers()));
198
199    LOG.info("Start to generate assignment plan for " + numRegions + " regions from table "
200      + tableName + " with " + servers.size() + " region servers");
201
202    int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size());
203    int regionSlots = slotsPerServer * servers.size();
204
205    // Compute the primary, secondary and tertiary costs for each region/server
206    // pair. These costs are based only on node locality and rack locality, and
207    // will be modified later.
208    float[][] primaryCost = new float[numRegions][regionSlots];
209    float[][] secondaryCost = new float[numRegions][regionSlots];
210    float[][] tertiaryCost = new float[numRegions][regionSlots];
211
212    if (this.enforceLocality && regionLocalityMap != null) {
213      // Transform the locality mapping into a 2D array, assuming that any
214      // unspecified locality value is 0.
215      float[][] localityPerServer = new float[numRegions][regionSlots];
216      for (int i = 0; i < numRegions; i++) {
217        Map<String, Float> serverLocalityMap =
218          regionLocalityMap.get(regions.get(i).getEncodedName());
219        if (serverLocalityMap == null) {
220          continue;
221        }
222        for (int j = 0; j < servers.size(); j++) {
223          String serverName = servers.get(j).getHostname();
224          if (serverName == null) {
225            continue;
226          }
227          Float locality = serverLocalityMap.get(serverName);
228          if (locality == null) {
229            continue;
230          }
231          for (int k = 0; k < slotsPerServer; k++) {
232            // If we can't find the locality of a region to a server, which occurs
233            // because locality is only reported for servers which have some
234            // blocks of a region local, then the locality for that pair is 0.
235            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
236          }
237        }
238      }
239
240      // Compute the total rack locality for each region in each rack. The total
241      // rack locality is the sum of the localities of a region on all servers in
242      // a rack.
243      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
244      for (int i = 0; i < numRegions; i++) {
245        RegionInfo region = regions.get(i);
246        for (int j = 0; j < regionSlots; j += slotsPerServer) {
247          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
248          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
249          if (rackLocality == null) {
250            rackLocality = new HashMap<>();
251            rackRegionLocality.put(rack, rackLocality);
252          }
253          Float localityObj = rackLocality.get(region);
254          float locality = localityObj == null ? 0 : localityObj.floatValue();
255          locality += localityPerServer[i][j];
256          rackLocality.put(region, locality);
257        }
258      }
259      for (int i = 0; i < numRegions; i++) {
260        for (int j = 0; j < regionSlots; j++) {
261          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
262          Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i));
263          float totalRackLocality =
264            totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue();
265
266          // Primary cost aims to favor servers with high node locality and low
267          // rack locality, so that secondaries and tertiaries can be chosen for
268          // nodes with high rack locality. This might give primaries with
269          // slightly less locality at first compared to a cost which only
270          // considers the node locality, but should be better in the long run.
271          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);
272
273          // Secondary cost aims to favor servers with high node locality and high
274          // rack locality since the tertiary will be chosen from the same rack as
275          // the secondary. This could be negative, but that is okay.
276          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
277
278          // Tertiary cost is only concerned with the node locality. It will later
279          // be restricted to only hosts on the same rack as the secondary.
280          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
281        }
282      }
283    }
284
285    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
286      // We want to minimize the number of regions which move as the result of a
287      // new assignment. Therefore, slightly penalize any placement which is for
288      // a host that is not currently serving the region.
289      for (int i = 0; i < numRegions; i++) {
290        for (int j = 0; j < servers.size(); j++) {
291          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
292          if (currentAddress != null && !currentAddress.equals(servers.get(j))) {
293            for (int k = 0; k < slotsPerServer; k++) {
294              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
295            }
296          }
297        }
298      }
299    }
300
301    // Artificially increase cost of last slot of each server to evenly
302    // distribute the slop, otherwise there will be a few servers with too few
303    // regions and many servers with the max number of regions.
304    for (int i = 0; i < numRegions; i++) {
305      for (int j = 0; j < regionSlots; j += slotsPerServer) {
306        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
307        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
308        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
309      }
310    }
311
312    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
313    primaryCost = randomizedMatrix.transform(primaryCost);
314    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
315    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
316
317    // Modify the secondary and tertiary costs for each region/server pair to
318    // prevent a region from being assigned to the same rack for both primary
319    // and either one of secondary or tertiary.
320    for (int i = 0; i < numRegions; i++) {
321      int slot = primaryAssignment[i];
322      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
323      for (int k = 0; k < servers.size(); k++) {
324        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
325          continue;
326        }
327        if (k == slot / slotsPerServer) {
328          // Same node, do not place secondary or tertiary here ever.
329          for (int m = 0; m < slotsPerServer; m++) {
330            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
331            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
332          }
333        } else {
334          // Same rack, do not place secondary or tertiary here if possible.
335          for (int m = 0; m < slotsPerServer; m++) {
336            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
337            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
338          }
339        }
340      }
341    }
342    if (munkresForSecondaryAndTertiary) {
343      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
344      secondaryCost = randomizedMatrix.transform(secondaryCost);
345      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
346      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
347
348      // Modify the tertiary costs for each region/server pair to ensure that a
349      // region is assigned to a tertiary server on the same rack as its secondary
350      // server, but not the same server in that rack.
351      for (int i = 0; i < numRegions; i++) {
352        int slot = secondaryAssignment[i];
353        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
354        for (int k = 0; k < servers.size(); k++) {
355          if (k == slot / slotsPerServer) {
356            // Same node, do not place tertiary here ever.
357            for (int m = 0; m < slotsPerServer; m++) {
358              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
359            }
360          } else {
361            if (rackManager.getRack(servers.get(k)).equals(rack)) {
362              continue;
363            }
364            // Different rack, do not place tertiary here if possible.
365            for (int m = 0; m < slotsPerServer; m++) {
366              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
367            }
368          }
369        }
370      }
371
372      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
373      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
374      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
375      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
376
377      for (int i = 0; i < numRegions; i++) {
378        List<ServerName> favoredServers =
379          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
380        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
381        favoredServers
382          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
383
384        s = servers.get(secondaryAssignment[i] / slotsPerServer);
385        favoredServers
386          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
387
388        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
389        favoredServers
390          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
391        // Update the assignment plan
392        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
393      }
394      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
395        + tableName + " with " + servers.size() + " region servers");
396      LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment");
397    } else {
398      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
399      for (int i = 0; i < numRegions; i++) {
400        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
401      }
402      FavoredNodeAssignmentHelper favoredNodeHelper =
403        new FavoredNodeAssignmentHelper(servers, conf);
404      favoredNodeHelper.initialize();
405      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
406        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
407      for (int i = 0; i < numRegions; i++) {
408        List<ServerName> favoredServers =
409          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
410        RegionInfo currentRegion = regions.get(i);
411        ServerName s = primaryRSMap.get(currentRegion);
412        favoredServers
413          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
414
415        ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion);
416        s = secondaryAndTertiary[0];
417        favoredServers
418          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
419
420        s = secondaryAndTertiary[1];
421        favoredServers
422          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
423        // Update the assignment plan
424        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
425      }
426      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
427        + tableName + " with " + servers.size() + " region servers");
428      LOG.info("Assignment plan for secondary and tertiary generated "
429        + "using placeSecondaryAndTertiaryWithRestrictions method");
430    }
431  }
432
433  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
434    // Get the current region assignment snapshot by scanning from the META
435    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot();
436
437    // Get the region locality map
438    Map<String, Map<String, Float>> regionLocalityMap = null;
439    if (this.enforceLocality) {
440      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
441    }
442    // Initialize the assignment plan
443    FavoredNodesPlan plan = new FavoredNodesPlan();
444
445    // Get the table to region mapping
446    Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap();
447    LOG.info("Start to generate the new assignment plan for the "
448      + +tableToRegionMap.keySet().size() + " tables");
449    for (TableName table : tableToRegionMap.keySet()) {
450      try {
451        if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
452          continue;
453        }
454        // TODO: maybe run the placement in parallel for each table
455        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
456          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
457      } catch (Exception e) {
458        LOG.error("Get some exceptions for placing primary region server" + "for table " + table
459          + " because " + e);
460      }
461    }
462    LOG.info("Finish to generate the new assignment plan for the "
463      + +tableToRegionMap.keySet().size() + " tables");
464    return plan;
465  }
466
467  @Override
468  public void close() throws IOException {
469    Closeables.close(connection, true);
470  }
471
472  /**
473   * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order
474   * which may result in skewing the assignments of the first jobs in the matrix toward the last
475   * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can
476   * randomize the rows and columns of the cost matrix in a reversible way, such that the solution
477   * to the assignment problem can be interpreted in terms of the original untransformed cost
478   * matrix. Rows and columns are transformed independently such that the elements contained in any
479   * row of the input matrix are the same as the elements in the corresponding output matrix, and
480   * each row has its elements transformed in the same way. Similarly for columns.
481   */
482  protected static class RandomizedMatrix {
483    private final int rows;
484    private final int cols;
485    private final int[] rowTransform;
486    private final int[] rowInverse;
487    private final int[] colTransform;
488    private final int[] colInverse;
489
490    /**
491     * Create a randomization scheme for a matrix of a given size.
492     * @param rows the number of rows in the matrix
493     * @param cols the number of columns in the matrix
494     */
495    public RandomizedMatrix(int rows, int cols) {
496      this.rows = rows;
497      this.cols = cols;
498      Random random = ThreadLocalRandom.current();
499      rowTransform = new int[rows];
500      rowInverse = new int[rows];
501      for (int i = 0; i < rows; i++) {
502        rowTransform[i] = i;
503      }
504      // Shuffle the row indices.
505      for (int i = rows - 1; i >= 0; i--) {
506        int r = random.nextInt(i + 1);
507        int temp = rowTransform[r];
508        rowTransform[r] = rowTransform[i];
509        rowTransform[i] = temp;
510      }
511      // Generate the inverse row indices.
512      for (int i = 0; i < rows; i++) {
513        rowInverse[rowTransform[i]] = i;
514      }
515
516      colTransform = new int[cols];
517      colInverse = new int[cols];
518      for (int i = 0; i < cols; i++) {
519        colTransform[i] = i;
520      }
521      // Shuffle the column indices.
522      for (int i = cols - 1; i >= 0; i--) {
523        int r = random.nextInt(i + 1);
524        int temp = colTransform[r];
525        colTransform[r] = colTransform[i];
526        colTransform[i] = temp;
527      }
528      // Generate the inverse column indices.
529      for (int i = 0; i < cols; i++) {
530        colInverse[colTransform[i]] = i;
531      }
532    }
533
534    /**
535     * Copy a given matrix into a new matrix, transforming each row index and each column index
536     * according to the randomization scheme that was created at construction time.
537     * @param matrix the cost matrix to transform
538     * @return a new matrix with row and column indices transformed
539     */
540    public float[][] transform(float[][] matrix) {
541      float[][] result = new float[rows][cols];
542      for (int i = 0; i < rows; i++) {
543        for (int j = 0; j < cols; j++) {
544          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
545        }
546      }
547      return result;
548    }
549
550    /**
551     * Copy a given matrix into a new matrix, transforming each row index and each column index
552     * according to the inverse of the randomization scheme that was created at construction time.
553     * @param matrix the cost matrix to be inverted
554     * @return a new matrix with row and column indices inverted
555     */
556    public float[][] invert(float[][] matrix) {
557      float[][] result = new float[rows][cols];
558      for (int i = 0; i < rows; i++) {
559        for (int j = 0; j < cols; j++) {
560          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
561        }
562      }
563      return result;
564    }
565
566    /**
567     * Given an array where each element {@code indices[i]} represents the randomized column index
568     * corresponding to randomized row index {@code i}, create a new array with the corresponding
569     * inverted indices.
570     * @param indices an array of transformed indices to be inverted
571     * @return an array of inverted indices
572     */
573    public int[] invertIndices(int[] indices) {
574      int[] result = new int[indices.length];
575      for (int i = 0; i < indices.length; i++) {
576        result[rowInverse[i]] = colInverse[indices[i]];
577      }
578      return result;
579    }
580  }
581
582  /**
583   * Print the assignment plan to the system output stream
584   */
585  public static void printAssignmentPlan(FavoredNodesPlan plan) {
586    if (plan == null) return;
587    LOG.info("========== Start to print the assignment plan ================");
588    // sort the map based on region info
589    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
590
591    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
592
593      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
594      String regionName = entry.getKey();
595      LOG.info("Region: " + regionName);
596      LOG.info("Its favored nodes: " + serverList);
597    }
598    LOG.info("========== Finish to print the assignment plan ================");
599  }
600
601  /**
602   * Update the assignment plan into hbase:meta
603   * @param plan the assignments plan to be updated into hbase:meta
604   * @throws IOException if cannot update assignment plan in hbase:meta
605   */
606  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException {
607    try {
608      LOG.info("Start to update the hbase:meta with the new assignment plan");
609      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
610      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
611      Map<String, RegionInfo> regionToRegionInfoMap =
612        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
613      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
614        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
615      }
616
617      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
618      LOG.info("Updated the hbase:meta with the new assignment plan");
619    } catch (Exception e) {
620      LOG.error(
621        "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage());
622    }
623  }
624
625  /**
626   * Update the assignment plan to all the region servers
627   */
628  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException {
629    LOG.info("Start to update the region servers with the new assignment plan");
630    // Get the region to region server map
631    Map<ServerName, List<RegionInfo>> currentAssignment =
632      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
633
634    // track of the failed and succeeded updates
635    int succeededNum = 0;
636    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
637
638    for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) {
639      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
640      try {
641        // Keep track of the favored updates for the current region server
642        FavoredNodesPlan singleServerPlan = null;
643        // Find out all the updates for the current region server
644        for (RegionInfo region : entry.getValue()) {
645          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
646          if (
647            favoredServerList != null
648              && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
649          ) {
650            // Create the single server plan if necessary
651            if (singleServerPlan == null) {
652              singleServerPlan = new FavoredNodesPlan();
653            }
654            // Update the single server update
655            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
656            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
657          }
658        }
659        if (singleServerPlan != null) {
660          // Update the current region server with its updated favored nodes
661          AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
662          UpdateFavoredNodesRequest request =
663            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
664          UpdateFavoredNodesResponse updateFavoredNodesResponse =
665            FutureUtils.get(rsAdmin.updateFavoredNodes(request));
666          LOG.info("Region server "
667            + FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
668              .getServerInfo()
669            + " has updated " + updateFavoredNodesResponse.getResponse() + " / "
670            + singleServerPlan.size() + " regions with the assignment plan");
671          succeededNum++;
672        }
673      } catch (Exception e) {
674        failedUpdateMap.put(entry.getKey(), e);
675      }
676    }
677    // log the succeeded updates
678    LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan");
679
680    // log the failed updates
681    int failedNum = failedUpdateMap.size();
682    if (failedNum != 0) {
683      LOG.error("Failed to update the following + " + failedNum
684        + " region servers with its corresponding favored nodes");
685      for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) {
686        LOG.error("Failed to update " + entry.getKey().getAddress() + " because of "
687          + entry.getValue().getMessage());
688      }
689    }
690  }
691
692  public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException {
693    LOG.info("Start to update the new assignment plan for the hbase:meta table and"
694      + " the region servers");
695    // Update the new assignment plan to META
696    updateAssignmentPlanToMeta(plan);
697    // Update the new assignment plan to Region Servers
698    updateAssignmentPlanToRegionServers(plan);
699    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
700      + " the region servers");
701  }
702
703  /**
704   * Return how many regions will move per table since their primary RS will change
705   * @param newPlan - new AssignmentPlan
706   * @return how many primaries will move per table
707   */
708  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
709    Map<TableName, Integer> movesPerTable = new HashMap<>();
710    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
711    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
712    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
713    Set<TableName> tables = snapshot.getTableSet();
714    for (TableName table : tables) {
715      int movedPrimaries = 0;
716      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
717        continue;
718      }
719      List<RegionInfo> regions = tableToRegions.get(table);
720      for (RegionInfo region : regions) {
721        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
722        List<ServerName> newServers = newPlan.getFavoredNodes(region);
723        if (oldServers != null && newServers != null) {
724          ServerName oldPrimary = oldServers.get(0);
725          ServerName newPrimary = newServers.get(0);
726          if (oldPrimary.compareTo(newPrimary) != 0) {
727            movedPrimaries++;
728          }
729        }
730      }
731      movesPerTable.put(table, movedPrimaries);
732    }
733    return movesPerTable;
734  }
735
736  /**
737   * Compares two plans and check whether the locality dropped or increased (prints the information
738   * as a string) also prints the baseline locality
739   * @param movesPerTable     - how many primary regions will move per table
740   * @param regionLocalityMap - locality map from FS
741   * @param newPlan           - new assignment plan
742   */
743  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
744    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
745    throws IOException {
746    // localities for primary, secondary and tertiary
747    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
748    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
749    Set<TableName> tables = snapshot.getTableSet();
750    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
751    for (TableName table : tables) {
752      float[] deltaLocality = new float[3];
753      float[] locality = new float[3];
754      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
755        continue;
756      }
757      List<RegionInfo> regions = tableToRegionsMap.get(table);
758      System.out.println("==================================================");
759      System.out.println("Assignment Plan Projection Report For Table: " + table);
760      System.out.println("\t Total regions: " + regions.size());
761      System.out.println(
762        "\t" + movesPerTable.get(table) + " primaries will move due to their primary has changed");
763      for (RegionInfo currentRegion : regions) {
764        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
765        if (regionLocality == null) {
766          continue;
767        }
768        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
769        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
770        if (newServers != null && oldServers != null) {
771          int i = 0;
772          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
773            ServerName newServer = newServers.get(p.ordinal());
774            ServerName oldServer = oldServers.get(p.ordinal());
775            Float oldLocality = 0f;
776            if (oldServers != null) {
777              oldLocality = regionLocality.get(oldServer.getHostname());
778              if (oldLocality == null) {
779                oldLocality = 0f;
780              }
781              locality[i] += oldLocality;
782            }
783            Float newLocality = regionLocality.get(newServer.getHostname());
784            if (newLocality == null) {
785              newLocality = 0f;
786            }
787            deltaLocality[i] += newLocality - oldLocality;
788            i++;
789          }
790        }
791      }
792      DecimalFormat df = new java.text.DecimalFormat("#.##");
793      for (int i = 0; i < deltaLocality.length; i++) {
794        System.out.print("\t\t Baseline locality for ");
795        if (i == 0) {
796          System.out.print("primary ");
797        } else if (i == 1) {
798          System.out.print("secondary ");
799        } else if (i == 2) {
800          System.out.print("tertiary ");
801        }
802        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
803        System.out.print("\t\t Locality will change with the new plan: ");
804        System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + "%");
805      }
806      System.out.println("\t Baseline dispersion");
807      printDispersionScores(table, snapshot, regions.size(), null, true);
808      System.out.println("\t Projected dispersion");
809      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
810    }
811  }
812
813  public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot,
814    int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) {
815    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
816      return;
817    }
818    AssignmentVerificationReport report = new AssignmentVerificationReport();
819    report.fillUpDispersion(table, snapshot, newPlan);
820    List<Float> dispersion = report.getDispersionInformation();
821    if (simplePrint) {
822      DecimalFormat df = new java.text.DecimalFormat("#.##");
823      System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0))
824        + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1))
825        + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;");
826    } else {
827      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
828        + " ; The average dispersion score is " + dispersion.get(0));
829    }
830  }
831
832  public void printLocalityAndDispersionForCurrentPlan(
833    Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
834    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
835    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
836    Set<TableName> tables = snapshot.getTableSet();
837    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
838    for (TableName table : tables) {
839      float[] locality = new float[3];
840      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
841        continue;
842      }
843      List<RegionInfo> regions = tableToRegionsMap.get(table);
844      for (RegionInfo currentRegion : regions) {
845        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
846        if (regionLocality == null) {
847          continue;
848        }
849        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
850        if (servers != null) {
851          int i = 0;
852          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
853            ServerName server = servers.get(p.ordinal());
854            Float currentLocality = 0f;
855            if (servers != null) {
856              currentLocality = regionLocality.get(server.getHostname());
857              if (currentLocality == null) {
858                currentLocality = 0f;
859              }
860              locality[i] += currentLocality;
861            }
862            i++;
863          }
864        }
865      }
866      for (int i = 0; i < locality.length; i++) {
867        String copy = null;
868        if (i == 0) {
869          copy = "primary";
870        } else if (i == 1) {
871          copy = "secondary";
872        } else if (i == 2) {
873          copy = "tertiary";
874        }
875        float avgLocality = 100 * locality[i] / regions.size();
876        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
877          + " ; The average locality for " + copy + " is " + avgLocality + " %");
878      }
879      printDispersionScores(table, snapshot, regions.size(), null, false);
880    }
881  }
882
883  /**
884   * @param favoredNodesStr The String of favored nodes
885   * @return the list of ServerName for the byte array of favored nodes.
886   */
887  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
888    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
889    if (favoredNodesArray == null) return null;
890
891    List<ServerName> serverList = new ArrayList<>();
892    for (String hostNameAndPort : favoredNodesArray) {
893      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
894    }
895    return serverList;
896  }
897
898  public static void main(String args[]) throws IOException {
899    Options opt = new Options();
900    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
901    opt.addOption("u", "update", false,
902      "update the assignments to hbase:meta and RegionServers together");
903    opt.addOption("n", "dry-run", false, "do not write assignments to META");
904    opt.addOption("v", "verify", false, "verify current assignments against META");
905    opt.addOption("p", "print", false, "print the current assignment plan in META");
906    opt.addOption("h", "help", false, "print usage");
907    opt.addOption("d", "verification-details", false, "print the details of verification report");
908
909    opt.addOption("zk", true, "to set the zookeeper quorum");
910    opt.addOption("fs", true, "to set HDFS");
911    opt.addOption("hbase_root", true, "to set hbase_root directory");
912
913    opt.addOption("overwrite", false, "overwrite the favored nodes for a single region,"
914      + "for example: -update -r regionName -f server1:port,server2:port,server3:port");
915    opt.addOption("r", true, "The region name that needs to be updated");
916    opt.addOption("f", true, "The new favored nodes");
917
918    opt.addOption("tables", true,
919      "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn");
920    opt.addOption("l", "locality", true, "enforce the maximum locality");
921    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
922    opt.addOption("diff", false, "calculate difference between assignment plans");
923    opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries");
924    opt.addOption("ld", "locality-dispersion", false,
925      "print locality and dispersion " + "information for current plan");
926    try {
927      CommandLine cmd = new GnuParser().parse(opt, args);
928      Configuration conf = HBaseConfiguration.create();
929
930      boolean enforceMinAssignmentMove = true;
931      boolean enforceLocality = true;
932      boolean verificationDetails = false;
933
934      // Read all the options
935      if (
936        (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false"))
937          || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false"))
938      ) {
939        enforceLocality = false;
940      }
941
942      if (
943        (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false"))
944          || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false"))
945      ) {
946        enforceMinAssignmentMove = false;
947      }
948
949      if (cmd.hasOption("zk")) {
950        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
951        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
952      }
953
954      if (cmd.hasOption("fs")) {
955        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
956        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
957      }
958
959      if (cmd.hasOption("hbase_root")) {
960        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
961        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
962      }
963
964      // Create the region placement obj
965      try (RegionPlacementMaintainer rp =
966        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
967
968        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
969          verificationDetails = true;
970        }
971
972        if (cmd.hasOption("tables")) {
973          String tableNameListStr = cmd.getOptionValue("tables");
974          String[] tableNames = StringUtils.split(tableNameListStr, ",");
975          rp.setTargetTableName(tableNames);
976        }
977
978        if (cmd.hasOption("munkres")) {
979          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
980        }
981
982        // Read all the modes
983        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
984          // Verify the region placement.
985          rp.verifyRegionPlacement(verificationDetails);
986        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
987          // Generate the assignment plan only without updating the hbase:meta and RS
988          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
989          printAssignmentPlan(plan);
990        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
991          // Generate the new assignment plan
992          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
993          // Print the new assignment plan
994          printAssignmentPlan(plan);
995          // Write the new assignment plan to META
996          rp.updateAssignmentPlanToMeta(plan);
997        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
998          // Generate the new assignment plan
999          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1000          // Print the new assignment plan
1001          printAssignmentPlan(plan);
1002          // Update the assignment to hbase:meta and Region Servers
1003          rp.updateAssignmentPlan(plan);
1004        } else if (cmd.hasOption("diff")) {
1005          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1006          Map<String, Map<String, Float>> locality =
1007            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1008          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1009          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1010          System.out.println("Do you want to update the assignment plan? [y/n]");
1011          Scanner s = new Scanner(System.in);
1012          String input = s.nextLine().trim();
1013          if (input.equals("y")) {
1014            System.out.println("Updating assignment plan...");
1015            rp.updateAssignmentPlan(newPlan);
1016          }
1017          s.close();
1018        } else if (cmd.hasOption("ld")) {
1019          Map<String, Map<String, Float>> locality =
1020            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1021          rp.printLocalityAndDispersionForCurrentPlan(locality);
1022        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1023          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1024          printAssignmentPlan(plan);
1025        } else if (cmd.hasOption("overwrite")) {
1026          if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1027            throw new IllegalArgumentException("Please specify: "
1028              + " -update -r regionName -f server1:port,server2:port,server3:port");
1029          }
1030
1031          String regionName = cmd.getOptionValue("r");
1032          String favoredNodesStr = cmd.getOptionValue("f");
1033          LOG.info("Going to update the region " + regionName + " with the new favored nodes "
1034            + favoredNodesStr);
1035          List<ServerName> favoredNodes = null;
1036          RegionInfo regionInfo =
1037            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1038          if (regionInfo == null) {
1039            LOG.error("Cannot find the region " + regionName + " from the META");
1040          } else {
1041            try {
1042              favoredNodes = getFavoredNodeList(favoredNodesStr);
1043            } catch (IllegalArgumentException e) {
1044              LOG.error("Cannot parse the invalid favored nodes because " + e);
1045            }
1046            FavoredNodesPlan newPlan = new FavoredNodesPlan();
1047            newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1048            rp.updateAssignmentPlan(newPlan);
1049          }
1050        } else {
1051          printHelp(opt);
1052        }
1053      }
1054    } catch (ParseException e) {
1055      printHelp(opt);
1056    }
1057  }
1058}