001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master;
021
022import java.io.IOException;
023import java.text.DecimalFormat;
024import java.util.ArrayList;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import java.util.Scanner;
032import java.util.Set;
033import java.util.TreeMap;
034import org.apache.commons.lang3.StringUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.hbase.ClusterMetrics.Option;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.ClusterConnection;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
048import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
049import org.apache.hadoop.hbase.util.FSUtils;
050import org.apache.hadoop.hbase.util.MunkresAssignment;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
058import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
059import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
060import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
061
062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
067
068/**
069 * A tool that is used for manipulating and viewing favored nodes information
070 * for regions. Run with -h to get a list of the options
071 */
072@InterfaceAudience.Private
073// TODO: Remove? Unused. Partially implemented only.
074public class RegionPlacementMaintainer {
  private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
      .getName());
  // The cost of a placement that should never be assigned (positive infinity,
  // so the assignment solver treats it as forbidden).
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased for one slot of
  // each server. This is done to more evenly distribute the slop amongst
  // servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // Chooses how secondary and tertiary favored nodes are placed by
  // genAssignmentPlan: true = Munkres assignment, false =
  // FavoredNodeAssignmentHelper.placeSecondaryAndTertiaryWithRestrictions.
  // NOTE(review): non-final static, but no assignment to it is visible in this
  // part of the file.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  // Configuration used to create connections, the RackManager and to read the
  // region locality map from the filesystem.
  private Configuration conf;
  // When true, HDFS block locality is read and folded into placement costs.
  private final boolean enforceLocality;
  // When true, primary placements away from the current host are penalized.
  private final boolean enforceMinAssignmentMove;
  // Maps each server to its rack.
  private RackManager rackManager;
  // Tables to operate on; an empty set means "all tables".
  private Set<TableName> targetTableSet;
  // Connection opened in the constructor and used for Admin calls.
  private final Connection connection;
100
101  public RegionPlacementMaintainer(Configuration conf) {
102    this(conf, true, true);
103  }
104
105  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
106      boolean enforceMinAssignmentMove) {
107    this.conf = conf;
108    this.enforceLocality = enforceLocality;
109    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
110    this.targetTableSet = new HashSet<>();
111    this.rackManager = new RackManager(conf);
112    try {
113      this.connection = ConnectionFactory.createConnection(this.conf);
114    } catch (IOException e) {
115      throw new RuntimeException(e);
116    }
117  }
118
119  private static void printHelp(Options opt) {
120    new HelpFormatter().printHelp(
121        "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
122        "-diff>" +
123        " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
124        " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
125  }
126
127  public void setTargetTableName(String[] tableNames) {
128    if (tableNames != null) {
129      for (String table : tableNames)
130        this.targetTableSet.add(TableName.valueOf(table));
131    }
132  }
133
134  /**
135   * @return the new RegionAssignmentSnapshot
136   * @throws IOException
137   */
138  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
139  throws IOException {
140    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
141      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
142    currentAssignmentShapshot.initialize();
143    return currentAssignmentShapshot;
144  }
145
146  /**
147   * Verify the region placement is consistent with the assignment plan
148   * @param isDetailMode
149   * @return reports
150   * @throws IOException
151   */
152  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
153      throws IOException {
154    System.out.println("Start to verify the region assignment and " +
155        "generate the verification report");
156    // Get the region assignment snapshot
157    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
158
159    // Get all the tables
160    Set<TableName> tables = snapshot.getTableSet();
161
162    // Get the region locality map
163    Map<String, Map<String, Float>> regionLocalityMap = null;
164    if (this.enforceLocality == true) {
165      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
166    }
167    List<AssignmentVerificationReport> reports = new ArrayList<>();
168    // Iterate all the tables to fill up the verification report
169    for (TableName table : tables) {
170      if (!this.targetTableSet.isEmpty() &&
171          !this.targetTableSet.contains(table)) {
172        continue;
173      }
174      AssignmentVerificationReport report = new AssignmentVerificationReport();
175      report.fillUp(table, snapshot, regionLocalityMap);
176      report.print(isDetailMode);
177      reports.add(report);
178    }
179    return reports;
180  }
181
182  /**
183   * Generate the assignment plan for the existing table
184   *
185   * @param tableName
186   * @param assignmentSnapshot
187   * @param regionLocalityMap
188   * @param plan
189   * @param munkresForSecondaryAndTertiary if set on true the assignment plan
190   * for the tertiary and secondary will be generated with Munkres algorithm,
191   * otherwise will be generated using placeSecondaryAndTertiaryRS
192   * @throws IOException
193   */
194  private void genAssignmentPlan(TableName tableName,
195    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
196    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
197    boolean munkresForSecondaryAndTertiary) throws IOException {
198    // Get the all the regions for the current table
199    List<RegionInfo> regions =
200      assignmentSnapshot.getTableToRegionMap().get(tableName);
201    int numRegions = regions.size();
202
203    // Get the current assignment map
204    Map<RegionInfo, ServerName> currentAssignmentMap =
205      assignmentSnapshot.getRegionToRegionServerMap();
206
207    // Get the all the region servers
208    List<ServerName> servers = new ArrayList<>();
209    try (Admin admin = this.connection.getAdmin()) {
210      servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
211        .getLiveServerMetrics().keySet());
212    }
213
214    LOG.info("Start to generate assignment plan for " + numRegions +
215        " regions from table " + tableName + " with " +
216        servers.size() + " region servers");
217
218    int slotsPerServer = (int) Math.ceil((float) numRegions /
219        servers.size());
220    int regionSlots = slotsPerServer * servers.size();
221
222    // Compute the primary, secondary and tertiary costs for each region/server
223    // pair. These costs are based only on node locality and rack locality, and
224    // will be modified later.
225    float[][] primaryCost = new float[numRegions][regionSlots];
226    float[][] secondaryCost = new float[numRegions][regionSlots];
227    float[][] tertiaryCost = new float[numRegions][regionSlots];
228
229    if (this.enforceLocality && regionLocalityMap != null) {
230      // Transform the locality mapping into a 2D array, assuming that any
231      // unspecified locality value is 0.
232      float[][] localityPerServer = new float[numRegions][regionSlots];
233      for (int i = 0; i < numRegions; i++) {
234        Map<String, Float> serverLocalityMap =
235            regionLocalityMap.get(regions.get(i).getEncodedName());
236        if (serverLocalityMap == null) {
237          continue;
238        }
239        for (int j = 0; j < servers.size(); j++) {
240          String serverName = servers.get(j).getHostname();
241          if (serverName == null) {
242            continue;
243          }
244          Float locality = serverLocalityMap.get(serverName);
245          if (locality == null) {
246            continue;
247          }
248          for (int k = 0; k < slotsPerServer; k++) {
249            // If we can't find the locality of a region to a server, which occurs
250            // because locality is only reported for servers which have some
251            // blocks of a region local, then the locality for that pair is 0.
252            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
253          }
254        }
255      }
256
257      // Compute the total rack locality for each region in each rack. The total
258      // rack locality is the sum of the localities of a region on all servers in
259      // a rack.
260      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
261      for (int i = 0; i < numRegions; i++) {
262        RegionInfo region = regions.get(i);
263        for (int j = 0; j < regionSlots; j += slotsPerServer) {
264          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
265          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
266          if (rackLocality == null) {
267            rackLocality = new HashMap<>();
268            rackRegionLocality.put(rack, rackLocality);
269          }
270          Float localityObj = rackLocality.get(region);
271          float locality = localityObj == null ? 0 : localityObj.floatValue();
272          locality += localityPerServer[i][j];
273          rackLocality.put(region, locality);
274        }
275      }
276      for (int i = 0; i < numRegions; i++) {
277        for (int j = 0; j < regionSlots; j++) {
278          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
279          Float totalRackLocalityObj =
280              rackRegionLocality.get(rack).get(regions.get(i));
281          float totalRackLocality = totalRackLocalityObj == null ?
282              0 : totalRackLocalityObj.floatValue();
283
284          // Primary cost aims to favor servers with high node locality and low
285          // rack locality, so that secondaries and tertiaries can be chosen for
286          // nodes with high rack locality. This might give primaries with
287          // slightly less locality at first compared to a cost which only
288          // considers the node locality, but should be better in the long run.
289          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
290              totalRackLocality);
291
292          // Secondary cost aims to favor servers with high node locality and high
293          // rack locality since the tertiary will be chosen from the same rack as
294          // the secondary. This could be negative, but that is okay.
295          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
296
297          // Tertiary cost is only concerned with the node locality. It will later
298          // be restricted to only hosts on the same rack as the secondary.
299          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
300        }
301      }
302    }
303
304    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
305      // We want to minimize the number of regions which move as the result of a
306      // new assignment. Therefore, slightly penalize any placement which is for
307      // a host that is not currently serving the region.
308      for (int i = 0; i < numRegions; i++) {
309        for (int j = 0; j < servers.size(); j++) {
310          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
311          if (currentAddress != null &&
312              !currentAddress.equals(servers.get(j))) {
313            for (int k = 0; k < slotsPerServer; k++) {
314              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
315            }
316          }
317        }
318      }
319    }
320
321    // Artificially increase cost of last slot of each server to evenly
322    // distribute the slop, otherwise there will be a few servers with too few
323    // regions and many servers with the max number of regions.
324    for (int i = 0; i < numRegions; i++) {
325      for (int j = 0; j < regionSlots; j += slotsPerServer) {
326        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
327        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
328        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
329      }
330    }
331
332    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
333        regionSlots);
334    primaryCost = randomizedMatrix.transform(primaryCost);
335    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
336    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
337
338    // Modify the secondary and tertiary costs for each region/server pair to
339    // prevent a region from being assigned to the same rack for both primary
340    // and either one of secondary or tertiary.
341    for (int i = 0; i < numRegions; i++) {
342      int slot = primaryAssignment[i];
343      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
344      for (int k = 0; k < servers.size(); k++) {
345        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
346          continue;
347        }
348        if (k == slot / slotsPerServer) {
349          // Same node, do not place secondary or tertiary here ever.
350          for (int m = 0; m < slotsPerServer; m++) {
351            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
352            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
353          }
354        } else {
355          // Same rack, do not place secondary or tertiary here if possible.
356          for (int m = 0; m < slotsPerServer; m++) {
357            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
358            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
359          }
360        }
361      }
362    }
363    if (munkresForSecondaryAndTertiary) {
364      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
365      secondaryCost = randomizedMatrix.transform(secondaryCost);
366      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
367      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
368
369      // Modify the tertiary costs for each region/server pair to ensure that a
370      // region is assigned to a tertiary server on the same rack as its secondary
371      // server, but not the same server in that rack.
372      for (int i = 0; i < numRegions; i++) {
373        int slot = secondaryAssignment[i];
374        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
375        for (int k = 0; k < servers.size(); k++) {
376          if (k == slot / slotsPerServer) {
377            // Same node, do not place tertiary here ever.
378            for (int m = 0; m < slotsPerServer; m++) {
379              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
380            }
381          } else {
382            if (rackManager.getRack(servers.get(k)).equals(rack)) {
383              continue;
384            }
385            // Different rack, do not place tertiary here if possible.
386            for (int m = 0; m < slotsPerServer; m++) {
387              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
388            }
389          }
390        }
391      }
392
393      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
394      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
395      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
396      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
397
398      for (int i = 0; i < numRegions; i++) {
399        List<ServerName> favoredServers
400          = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
401        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
402        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
403            ServerName.NON_STARTCODE));
404
405        s = servers.get(secondaryAssignment[i] / slotsPerServer);
406        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
407            ServerName.NON_STARTCODE));
408
409        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
410        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
411            ServerName.NON_STARTCODE));
412        // Update the assignment plan
413        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
414      }
415      LOG.info("Generated the assignment plan for " + numRegions +
416          " regions from table " + tableName + " with " +
417          servers.size() + " region servers");
418      LOG.info("Assignment plan for secondary and tertiary generated " +
419          "using MunkresAssignment");
420    } else {
421      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
422      for (int i = 0; i < numRegions; i++) {
423        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
424      }
425      FavoredNodeAssignmentHelper favoredNodeHelper =
426          new FavoredNodeAssignmentHelper(servers, conf);
427      favoredNodeHelper.initialize();
428      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
429          favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
430      for (int i = 0; i < numRegions; i++) {
431        List<ServerName> favoredServers
432          = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
433        RegionInfo currentRegion = regions.get(i);
434        ServerName s = primaryRSMap.get(currentRegion);
435        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
436            ServerName.NON_STARTCODE));
437
438        ServerName[] secondaryAndTertiary =
439            secondaryAndTertiaryMap.get(currentRegion);
440        s = secondaryAndTertiary[0];
441        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
442            ServerName.NON_STARTCODE));
443
444        s = secondaryAndTertiary[1];
445        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
446            ServerName.NON_STARTCODE));
447        // Update the assignment plan
448        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
449      }
450      LOG.info("Generated the assignment plan for " + numRegions +
451          " regions from table " + tableName + " with " +
452          servers.size() + " region servers");
453      LOG.info("Assignment plan for secondary and tertiary generated " +
454          "using placeSecondaryAndTertiaryWithRestrictions method");
455    }
456  }
457
458  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
459    // Get the current region assignment snapshot by scanning from the META
460    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
461      this.getRegionAssignmentSnapshot();
462
463    // Get the region locality map
464    Map<String, Map<String, Float>> regionLocalityMap = null;
465    if (this.enforceLocality) {
466      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
467    }
468    // Initialize the assignment plan
469    FavoredNodesPlan plan = new FavoredNodesPlan();
470
471    // Get the table to region mapping
472    Map<TableName, List<RegionInfo>> tableToRegionMap =
473      assignmentSnapshot.getTableToRegionMap();
474    LOG.info("Start to generate the new assignment plan for the " +
475         + tableToRegionMap.keySet().size() + " tables" );
476    for (TableName table : tableToRegionMap.keySet()) {
477      try {
478        if (!this.targetTableSet.isEmpty() &&
479            !this.targetTableSet.contains(table)) {
480          continue;
481        }
482        // TODO: maybe run the placement in parallel for each table
483        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
484            USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
485      } catch (Exception e) {
486        LOG.error("Get some exceptions for placing primary region server" +
487            "for table " + table + " because " + e);
488      }
489    }
490    LOG.info("Finish to generate the new assignment plan for the " +
491        + tableToRegionMap.keySet().size() + " tables" );
492    return plan;
493  }
494
495  /**
496   * Some algorithms for solving the assignment problem may traverse workers or
497   * jobs in linear order which may result in skewing the assignments of the
498   * first jobs in the matrix toward the last workers in the matrix if the
499   * costs are uniform. To avoid this kind of clumping, we can randomize the
500   * rows and columns of the cost matrix in a reversible way, such that the
501   * solution to the assignment problem can be interpreted in terms of the
502   * original untransformed cost matrix. Rows and columns are transformed
503   * independently such that the elements contained in any row of the input
504   * matrix are the same as the elements in the corresponding output matrix,
505   * and each row has its elements transformed in the same way. Similarly for
506   * columns.
507   */
508  protected static class RandomizedMatrix {
509    private final int rows;
510    private final int cols;
511    private final int[] rowTransform;
512    private final int[] rowInverse;
513    private final int[] colTransform;
514    private final int[] colInverse;
515
516    /**
517     * Create a randomization scheme for a matrix of a given size.
518     * @param rows the number of rows in the matrix
519     * @param cols the number of columns in the matrix
520     */
521    public RandomizedMatrix(int rows, int cols) {
522      this.rows = rows;
523      this.cols = cols;
524      Random random = new Random();
525      rowTransform = new int[rows];
526      rowInverse = new int[rows];
527      for (int i = 0; i < rows; i++) {
528        rowTransform[i] = i;
529      }
530      // Shuffle the row indices.
531      for (int i = rows - 1; i >= 0; i--) {
532        int r = random.nextInt(i + 1);
533        int temp = rowTransform[r];
534        rowTransform[r] = rowTransform[i];
535        rowTransform[i] = temp;
536      }
537      // Generate the inverse row indices.
538      for (int i = 0; i < rows; i++) {
539        rowInverse[rowTransform[i]] = i;
540      }
541
542      colTransform = new int[cols];
543      colInverse = new int[cols];
544      for (int i = 0; i < cols; i++) {
545        colTransform[i] = i;
546      }
547      // Shuffle the column indices.
548      for (int i = cols - 1; i >= 0; i--) {
549        int r = random.nextInt(i + 1);
550        int temp = colTransform[r];
551        colTransform[r] = colTransform[i];
552        colTransform[i] = temp;
553      }
554      // Generate the inverse column indices.
555      for (int i = 0; i < cols; i++) {
556        colInverse[colTransform[i]] = i;
557      }
558    }
559
560    /**
561     * Copy a given matrix into a new matrix, transforming each row index and
562     * each column index according to the randomization scheme that was created
563     * at construction time.
564     * @param matrix the cost matrix to transform
565     * @return a new matrix with row and column indices transformed
566     */
567    public float[][] transform(float[][] matrix) {
568      float[][] result = new float[rows][cols];
569      for (int i = 0; i < rows; i++) {
570        for (int j = 0; j < cols; j++) {
571          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
572        }
573      }
574      return result;
575    }
576
577    /**
578     * Copy a given matrix into a new matrix, transforming each row index and
579     * each column index according to the inverse of the randomization scheme
580     * that was created at construction time.
581     * @param matrix the cost matrix to be inverted
582     * @return a new matrix with row and column indices inverted
583     */
584    public float[][] invert(float[][] matrix) {
585      float[][] result = new float[rows][cols];
586      for (int i = 0; i < rows; i++) {
587        for (int j = 0; j < cols; j++) {
588          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
589        }
590      }
591      return result;
592    }
593
594    /**
595     * Given an array where each element {@code indices[i]} represents the
596     * randomized column index corresponding to randomized row index {@code i},
597     * create a new array with the corresponding inverted indices.
598     * @param indices an array of transformed indices to be inverted
599     * @return an array of inverted indices
600     */
601    public int[] invertIndices(int[] indices) {
602      int[] result = new int[indices.length];
603      for (int i = 0; i < indices.length; i++) {
604        result[rowInverse[i]] = colInverse[indices[i]];
605      }
606      return result;
607    }
608  }
609
610  /**
611   * Print the assignment plan to the system output stream
612   * @param plan
613   */
614  public static void printAssignmentPlan(FavoredNodesPlan plan) {
615    if (plan == null) return;
616    LOG.info("========== Start to print the assignment plan ================");
617    // sort the map based on region info
618    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
619
620    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
621
622      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
623      String regionName = entry.getKey();
624      LOG.info("Region: " + regionName );
625      LOG.info("Its favored nodes: " + serverList);
626    }
627    LOG.info("========== Finish to print the assignment plan ================");
628  }
629
630  /**
631   * Update the assignment plan into hbase:meta
632   * @param plan the assignments plan to be updated into hbase:meta
633   * @throws IOException if cannot update assignment plan in hbase:meta
634   */
635  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
636  throws IOException {
637    try {
638      LOG.info("Start to update the hbase:meta with the new assignment plan");
639      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
640      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
641      Map<String, RegionInfo> regionToRegionInfoMap =
642        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
643      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
644        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
645      }
646
647      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
648      LOG.info("Updated the hbase:meta with the new assignment plan");
649    } catch (Exception e) {
650      LOG.error("Failed to update hbase:meta with the new assignment" +
651          "plan because " + e.getMessage());
652    }
653  }
654
655  /**
656   * Update the assignment plan to all the region servers
657   * @param plan
658   * @throws IOException
659   */
660  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
661  throws IOException{
662    LOG.info("Start to update the region servers with the new assignment plan");
663    // Get the region to region server map
664    Map<ServerName, List<RegionInfo>> currentAssignment =
665      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
666
667    // track of the failed and succeeded updates
668    int succeededNum = 0;
669    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
670
671    for (Map.Entry<ServerName, List<RegionInfo>> entry :
672      currentAssignment.entrySet()) {
673      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
674      try {
675        // Keep track of the favored updates for the current region server
676        FavoredNodesPlan singleServerPlan = null;
677        // Find out all the updates for the current region server
678        for (RegionInfo region : entry.getValue()) {
679          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
680          if (favoredServerList != null &&
681              favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
682            // Create the single server plan if necessary
683            if (singleServerPlan == null) {
684              singleServerPlan = new FavoredNodesPlan();
685            }
686            // Update the single server update
687            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
688            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
689          }
690        }
691        if (singleServerPlan != null) {
692          // Update the current region server with its updated favored nodes
693          BlockingInterface currentRegionServer =
694            ((ClusterConnection)this.connection).getAdmin(entry.getKey());
695          UpdateFavoredNodesRequest request =
696              RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
697
698          UpdateFavoredNodesResponse updateFavoredNodesResponse =
699              currentRegionServer.updateFavoredNodes(null, request);
700          LOG.info("Region server " +
701              ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
702              " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
703              singleServerPlan.getAssignmentMap().size() +
704              " regions with the assignment plan");
705          succeededNum ++;
706        }
707      } catch (Exception e) {
708        failedUpdateMap.put(entry.getKey(), e);
709      }
710    }
711    // log the succeeded updates
712    LOG.info("Updated " + succeededNum + " region servers with " +
713            "the new assignment plan");
714
715    // log the failed updates
716    int failedNum = failedUpdateMap.size();
717    if (failedNum != 0) {
718      LOG.error("Failed to update the following + " + failedNum +
719          " region servers with its corresponding favored nodes");
720      for (Map.Entry<ServerName, Exception> entry :
721        failedUpdateMap.entrySet() ) {
722        LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
723            " because of " + entry.getValue().getMessage());
724      }
725    }
726  }
727
728  public void updateAssignmentPlan(FavoredNodesPlan plan)
729      throws IOException {
730    LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
731        " the region servers");
732    // Update the new assignment plan to META
733    updateAssignmentPlanToMeta(plan);
734    // Update the new assignment plan to Region Servers
735    updateAssignmentPlanToRegionServers(plan);
736    LOG.info("Finish to update the new assignment plan for the hbase:meta table and" +
737        " the region servers");
738  }
739
740  /**
741   * Return how many regions will move per table since their primary RS will
742   * change
743   *
744   * @param newPlan - new AssignmentPlan
745   * @return how many primaries will move per table
746   */
747  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
748      throws IOException {
749    Map<TableName, Integer> movesPerTable = new HashMap<>();
750    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
751    Map<TableName, List<RegionInfo>> tableToRegions = snapshot
752        .getTableToRegionMap();
753    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
754    Set<TableName> tables = snapshot.getTableSet();
755    for (TableName table : tables) {
756      int movedPrimaries = 0;
757      if (!this.targetTableSet.isEmpty()
758          && !this.targetTableSet.contains(table)) {
759        continue;
760      }
761      List<RegionInfo> regions = tableToRegions.get(table);
762      for (RegionInfo region : regions) {
763        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
764        List<ServerName> newServers = newPlan.getFavoredNodes(region);
765        if (oldServers != null && newServers != null) {
766          ServerName oldPrimary = oldServers.get(0);
767          ServerName newPrimary = newServers.get(0);
768          if (oldPrimary.compareTo(newPrimary) != 0) {
769            movedPrimaries++;
770          }
771        }
772      }
773      movesPerTable.put(table, movedPrimaries);
774    }
775    return movesPerTable;
776  }
777
778  /**
779   * Compares two plans and check whether the locality dropped or increased
780   * (prints the information as a string) also prints the baseline locality
781   *
782   * @param movesPerTable - how many primary regions will move per table
783   * @param regionLocalityMap - locality map from FS
784   * @param newPlan - new assignment plan
785   * @throws IOException
786   */
787  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
788      Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
789          throws IOException {
790    // localities for primary, secondary and tertiary
791    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
792    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
793    Set<TableName> tables = snapshot.getTableSet();
794    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
795    for (TableName table : tables) {
796      float[] deltaLocality = new float[3];
797      float[] locality = new float[3];
798      if (!this.targetTableSet.isEmpty()
799          && !this.targetTableSet.contains(table)) {
800        continue;
801      }
802      List<RegionInfo> regions = tableToRegionsMap.get(table);
803      System.out.println("==================================================");
804      System.out.println("Assignment Plan Projection Report For Table: " + table);
805      System.out.println("\t Total regions: " + regions.size());
806      System.out.println("\t" + movesPerTable.get(table)
807          + " primaries will move due to their primary has changed");
808      for (RegionInfo currentRegion : regions) {
809        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
810            .getEncodedName());
811        if (regionLocality == null) {
812          continue;
813        }
814        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
815        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
816        if (newServers != null && oldServers != null) {
817          int i=0;
818          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
819            ServerName newServer = newServers.get(p.ordinal());
820            ServerName oldServer = oldServers.get(p.ordinal());
821            Float oldLocality = 0f;
822            if (oldServers != null) {
823              oldLocality = regionLocality.get(oldServer.getHostname());
824              if (oldLocality == null) {
825                oldLocality = 0f;
826              }
827              locality[i] += oldLocality;
828            }
829            Float newLocality = regionLocality.get(newServer.getHostname());
830            if (newLocality == null) {
831              newLocality = 0f;
832            }
833            deltaLocality[i] += newLocality - oldLocality;
834            i++;
835          }
836        }
837      }
838      DecimalFormat df = new java.text.DecimalFormat( "#.##");
839      for (int i = 0; i < deltaLocality.length; i++) {
840        System.out.print("\t\t Baseline locality for ");
841        if (i == 0) {
842          System.out.print("primary ");
843        } else if (i == 1) {
844          System.out.print("secondary ");
845        } else if (i == 2) {
846          System.out.print("tertiary ");
847        }
848        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
849        System.out.print("\t\t Locality will change with the new plan: ");
850        System.out.println(df.format(100 * deltaLocality[i] / regions.size())
851            + "%");
852      }
853      System.out.println("\t Baseline dispersion");
854      printDispersionScores(table, snapshot, regions.size(), null, true);
855      System.out.println("\t Projected dispersion");
856      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
857    }
858  }
859
860  public void printDispersionScores(TableName table,
861      SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
862      boolean simplePrint) {
863    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
864      return;
865    }
866    AssignmentVerificationReport report = new AssignmentVerificationReport();
867    report.fillUpDispersion(table, snapshot, newPlan);
868    List<Float> dispersion = report.getDispersionInformation();
869    if (simplePrint) {
870      DecimalFormat df = new java.text.DecimalFormat("#.##");
871      System.out.println("\tAvg dispersion score: "
872          + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
873          + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
874          + df.format(dispersion.get(2)) + " hosts;");
875    } else {
876      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
877          + " ; The average dispersion score is " + dispersion.get(0));
878    }
879  }
880
881  public void printLocalityAndDispersionForCurrentPlan(
882      Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
883    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
884    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
885    Set<TableName> tables = snapshot.getTableSet();
886    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot
887        .getTableToRegionMap();
888    for (TableName table : tables) {
889      float[] locality = new float[3];
890      if (!this.targetTableSet.isEmpty()
891          && !this.targetTableSet.contains(table)) {
892        continue;
893      }
894      List<RegionInfo> regions = tableToRegionsMap.get(table);
895      for (RegionInfo currentRegion : regions) {
896        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
897            .getEncodedName());
898        if (regionLocality == null) {
899          continue;
900        }
901        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
902        if (servers != null) {
903          int i = 0;
904          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
905            ServerName server = servers.get(p.ordinal());
906            Float currentLocality = 0f;
907            if (servers != null) {
908              currentLocality = regionLocality.get(server.getHostname());
909              if (currentLocality == null) {
910                currentLocality = 0f;
911              }
912              locality[i] += currentLocality;
913            }
914            i++;
915          }
916        }
917      }
918      for (int i = 0; i < locality.length; i++) {
919        String copy =  null;
920        if (i == 0) {
921          copy = "primary";
922        } else if (i == 1) {
923          copy = "secondary";
924        } else if (i == 2) {
925          copy = "tertiary" ;
926        }
927        float avgLocality = 100 * locality[i] / regions.size();
928        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
929            + " ; The average locality for " + copy+ " is " + avgLocality + " %");
930      }
931      printDispersionScores(table, snapshot, regions.size(), null, false);
932    }
933  }
934
935  /**
936   * @param favoredNodesStr The String of favored nodes
937   * @return the list of ServerName for the byte array of favored nodes.
938   */
939  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
940    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
941    if (favoredNodesArray == null)
942      return null;
943
944    List<ServerName> serverList = new ArrayList<>();
945    for (String hostNameAndPort : favoredNodesArray) {
946      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
947    }
948    return serverList;
949  }
950
951  public static void main(String args[]) throws IOException {
952    Options opt = new Options();
953    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
954    opt.addOption("u", "update", false,
955        "update the assignments to hbase:meta and RegionServers together");
956    opt.addOption("n", "dry-run", false, "do not write assignments to META");
957    opt.addOption("v", "verify", false, "verify current assignments against META");
958    opt.addOption("p", "print", false, "print the current assignment plan in META");
959    opt.addOption("h", "help", false, "print usage");
960    opt.addOption("d", "verification-details", false,
961        "print the details of verification report");
962
963    opt.addOption("zk", true, "to set the zookeeper quorum");
964    opt.addOption("fs", true, "to set HDFS");
965    opt.addOption("hbase_root", true, "to set hbase_root directory");
966
967    opt.addOption("overwrite", false,
968        "overwrite the favored nodes for a single region," +
969        "for example: -update -r regionName -f server1:port,server2:port,server3:port");
970    opt.addOption("r", true, "The region name that needs to be updated");
971    opt.addOption("f", true, "The new favored nodes");
972
973    opt.addOption("tables", true,
974        "The list of table names splitted by ',' ;" +
975        "For example: -tables: t1,t2,...,tn");
976    opt.addOption("l", "locality", true, "enforce the maximum locality");
977    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
978    opt.addOption("diff", false, "calculate difference between assignment plans");
979    opt.addOption("munkres", false,
980        "use munkres to place secondaries and tertiaries");
981    opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
982        "information for current plan");
983    try {
984      CommandLine cmd = new GnuParser().parse(opt, args);
985      Configuration conf = HBaseConfiguration.create();
986
987      boolean enforceMinAssignmentMove = true;
988      boolean enforceLocality = true;
989      boolean verificationDetails = false;
990
991      // Read all the options
992      if ((cmd.hasOption("l") &&
993          cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
994          (cmd.hasOption("locality") &&
995              cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
996        enforceLocality = false;
997      }
998
999      if ((cmd.hasOption("m") &&
1000          cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
1001          (cmd.hasOption("min-move") &&
1002              cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
1003        enforceMinAssignmentMove = false;
1004      }
1005
1006      if (cmd.hasOption("zk")) {
1007        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
1008        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
1009      }
1010
1011      if (cmd.hasOption("fs")) {
1012        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
1013        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
1014      }
1015
1016      if (cmd.hasOption("hbase_root")) {
1017        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
1018        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
1019      }
1020
1021      // Create the region placement obj
1022      RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
1023          enforceMinAssignmentMove);
1024
1025      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
1026        verificationDetails = true;
1027      }
1028
1029      if (cmd.hasOption("tables")) {
1030        String tableNameListStr = cmd.getOptionValue("tables");
1031        String[] tableNames = StringUtils.split(tableNameListStr, ",");
1032        rp.setTargetTableName(tableNames);
1033      }
1034
1035      if (cmd.hasOption("munkres")) {
1036        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
1037      }
1038
1039      // Read all the modes
1040      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
1041        // Verify the region placement.
1042        rp.verifyRegionPlacement(verificationDetails);
1043      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
1044        // Generate the assignment plan only without updating the hbase:meta and RS
1045        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1046        printAssignmentPlan(plan);
1047      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
1048        // Generate the new assignment plan
1049        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1050        // Print the new assignment plan
1051        printAssignmentPlan(plan);
1052        // Write the new assignment plan to META
1053        rp.updateAssignmentPlanToMeta(plan);
1054      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
1055        // Generate the new assignment plan
1056        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1057        // Print the new assignment plan
1058        printAssignmentPlan(plan);
1059        // Update the assignment to hbase:meta and Region Servers
1060        rp.updateAssignmentPlan(plan);
1061      } else if (cmd.hasOption("diff")) {
1062        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1063        Map<String, Map<String, Float>> locality = FSUtils
1064            .getRegionDegreeLocalityMappingFromFS(conf);
1065        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1066        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1067        System.out.println("Do you want to update the assignment plan? [y/n]");
1068        Scanner s = new Scanner(System.in);
1069        String input = s.nextLine().trim();
1070        if (input.equals("y")) {
1071          System.out.println("Updating assignment plan...");
1072          rp.updateAssignmentPlan(newPlan);
1073        }
1074        s.close();
1075      } else if (cmd.hasOption("ld")) {
1076        Map<String, Map<String, Float>> locality = FSUtils
1077            .getRegionDegreeLocalityMappingFromFS(conf);
1078        rp.printLocalityAndDispersionForCurrentPlan(locality);
1079      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1080        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1081        printAssignmentPlan(plan);
1082      } else if (cmd.hasOption("overwrite")) {
1083        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1084          throw new IllegalArgumentException("Please specify: " +
1085              " -update -r regionName -f server1:port,server2:port,server3:port");
1086        }
1087
1088        String regionName = cmd.getOptionValue("r");
1089        String favoredNodesStr = cmd.getOptionValue("f");
1090        LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
1091            favoredNodesStr);
1092        List<ServerName> favoredNodes = null;
1093        RegionInfo regionInfo =
1094            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1095        if (regionInfo == null) {
1096          LOG.error("Cannot find the region " + regionName + " from the META");
1097        } else {
1098          try {
1099            favoredNodes = getFavoredNodeList(favoredNodesStr);
1100          } catch (IllegalArgumentException e) {
1101            LOG.error("Cannot parse the invalid favored nodes because " + e);
1102          }
1103          FavoredNodesPlan newPlan = new FavoredNodesPlan();
1104          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1105          rp.updateAssignmentPlan(newPlan);
1106        }
1107      } else {
1108        printHelp(opt);
1109      }
1110    } catch (ParseException e) {
1111      printHelp(opt);
1112    }
1113  }
1114}