001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master;
021
022import java.io.IOException;
023import java.text.DecimalFormat;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Random;
030import java.util.Scanner;
031import java.util.Set;
032import java.util.TreeMap;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.ClusterConnection;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
046import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
047import org.apache.hadoop.hbase.util.FSUtils;
048import org.apache.hadoop.hbase.util.MunkresAssignment;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
055import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
058import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
065
066/**
067 * A tool that is used for manipulating and viewing favored nodes information
068 * for regions. Run with -h to get a list of the options
069 */
070@InterfaceAudience.Private
071// TODO: Remove? Unused. Partially implemented only.
072public class RegionPlacementMaintainer {
  private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
      .getName());
  // The cost of a placement that should never be assigned.
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased if it is the
  // last slot of the server. This is done to more evenly distribute the slop
  // amongst servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // Passed to genAssignmentPlan() by getNewAssignmentPlan(): when true the
  // secondary and tertiary favored nodes are placed with MunkresAssignment,
  // otherwise with FavoredNodeAssignmentHelper. Deliberately not final.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  // Configuration used to open connections and to read locality information.
  private Configuration conf;
  // When true, region locality read from the file system feeds the placement costs.
  private final boolean enforceLocality;
  // When true, primaries that are not on the currently serving host are penalized.
  private final boolean enforceMinAssignmentMove;
  // Resolves the rack of each region server.
  private RackManager rackManager;
  // Tables to work on; an empty set means that no table is filtered out.
  private Set<TableName> targetTableSet;
  // Cluster connection created in the constructor.
  private final Connection connection;
098
  /**
   * Create a maintainer that enforces both locality and minimal assignment moves.
   * @param conf the configuration handed to the three-argument constructor
   */
  public RegionPlacementMaintainer(Configuration conf) {
    // Both enforceLocality and enforceMinAssignmentMove default to true.
    this(conf, true, true);
  }
102
103  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
104      boolean enforceMinAssignmentMove) {
105    this.conf = conf;
106    this.enforceLocality = enforceLocality;
107    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
108    this.targetTableSet = new HashSet<>();
109    this.rackManager = new RackManager(conf);
110    try {
111      this.connection = ConnectionFactory.createConnection(this.conf);
112    } catch (IOException e) {
113      throw new RuntimeException(e);
114    }
115  }
116
117  private static void printHelp(Options opt) {
118    new HelpFormatter().printHelp(
119        "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
120        "-diff>" +
121        " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
122        " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
123  }
124
125  public void setTargetTableName(String[] tableNames) {
126    if (tableNames != null) {
127      for (String table : tableNames)
128        this.targetTableSet.add(TableName.valueOf(table));
129    }
130  }
131
132  /**
133   * @return the new RegionAssignmentSnapshot
134   * @throws IOException
135   */
136  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
137  throws IOException {
138    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
139      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
140    currentAssignmentShapshot.initialize();
141    return currentAssignmentShapshot;
142  }
143
144  /**
145   * Verify the region placement is consistent with the assignment plan
146   * @param isDetailMode
147   * @return reports
148   * @throws IOException
149   */
150  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
151      throws IOException {
152    System.out.println("Start to verify the region assignment and " +
153        "generate the verification report");
154    // Get the region assignment snapshot
155    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
156
157    // Get all the tables
158    Set<TableName> tables = snapshot.getTableSet();
159
160    // Get the region locality map
161    Map<String, Map<String, Float>> regionLocalityMap = null;
162    if (this.enforceLocality == true) {
163      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
164    }
165    List<AssignmentVerificationReport> reports = new ArrayList<>();
166    // Iterate all the tables to fill up the verification report
167    for (TableName table : tables) {
168      if (!this.targetTableSet.isEmpty() &&
169          !this.targetTableSet.contains(table)) {
170        continue;
171      }
172      AssignmentVerificationReport report = new AssignmentVerificationReport();
173      report.fillUp(table, snapshot, regionLocalityMap);
174      report.print(isDetailMode);
175      reports.add(report);
176    }
177    return reports;
178  }
179
180  /**
181   * Generate the assignment plan for the existing table
182   *
183   * @param tableName
184   * @param assignmentSnapshot
185   * @param regionLocalityMap
186   * @param plan
187   * @param munkresForSecondaryAndTertiary if set on true the assignment plan
188   * for the tertiary and secondary will be generated with Munkres algorithm,
189   * otherwise will be generated using placeSecondaryAndTertiaryRS
190   * @throws IOException
191   */
192  private void genAssignmentPlan(TableName tableName,
193    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
194    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
195    boolean munkresForSecondaryAndTertiary) throws IOException {
196    // Get the all the regions for the current table
197    List<RegionInfo> regions =
198      assignmentSnapshot.getTableToRegionMap().get(tableName);
199    int numRegions = regions.size();
200
201    // Get the current assignment map
202    Map<RegionInfo, ServerName> currentAssignmentMap =
203      assignmentSnapshot.getRegionToRegionServerMap();
204
205    // Get the all the region servers
206    List<ServerName> servers = new ArrayList<>();
207    try (Admin admin = this.connection.getAdmin()) {
208      servers.addAll(admin.getRegionServers());
209    }
210
211    LOG.info("Start to generate assignment plan for " + numRegions +
212        " regions from table " + tableName + " with " +
213        servers.size() + " region servers");
214
215    int slotsPerServer = (int) Math.ceil((float) numRegions /
216        servers.size());
217    int regionSlots = slotsPerServer * servers.size();
218
219    // Compute the primary, secondary and tertiary costs for each region/server
220    // pair. These costs are based only on node locality and rack locality, and
221    // will be modified later.
222    float[][] primaryCost = new float[numRegions][regionSlots];
223    float[][] secondaryCost = new float[numRegions][regionSlots];
224    float[][] tertiaryCost = new float[numRegions][regionSlots];
225
226    if (this.enforceLocality && regionLocalityMap != null) {
227      // Transform the locality mapping into a 2D array, assuming that any
228      // unspecified locality value is 0.
229      float[][] localityPerServer = new float[numRegions][regionSlots];
230      for (int i = 0; i < numRegions; i++) {
231        Map<String, Float> serverLocalityMap =
232            regionLocalityMap.get(regions.get(i).getEncodedName());
233        if (serverLocalityMap == null) {
234          continue;
235        }
236        for (int j = 0; j < servers.size(); j++) {
237          String serverName = servers.get(j).getHostname();
238          if (serverName == null) {
239            continue;
240          }
241          Float locality = serverLocalityMap.get(serverName);
242          if (locality == null) {
243            continue;
244          }
245          for (int k = 0; k < slotsPerServer; k++) {
246            // If we can't find the locality of a region to a server, which occurs
247            // because locality is only reported for servers which have some
248            // blocks of a region local, then the locality for that pair is 0.
249            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
250          }
251        }
252      }
253
254      // Compute the total rack locality for each region in each rack. The total
255      // rack locality is the sum of the localities of a region on all servers in
256      // a rack.
257      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
258      for (int i = 0; i < numRegions; i++) {
259        RegionInfo region = regions.get(i);
260        for (int j = 0; j < regionSlots; j += slotsPerServer) {
261          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
262          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
263          if (rackLocality == null) {
264            rackLocality = new HashMap<>();
265            rackRegionLocality.put(rack, rackLocality);
266          }
267          Float localityObj = rackLocality.get(region);
268          float locality = localityObj == null ? 0 : localityObj.floatValue();
269          locality += localityPerServer[i][j];
270          rackLocality.put(region, locality);
271        }
272      }
273      for (int i = 0; i < numRegions; i++) {
274        for (int j = 0; j < regionSlots; j++) {
275          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
276          Float totalRackLocalityObj =
277              rackRegionLocality.get(rack).get(regions.get(i));
278          float totalRackLocality = totalRackLocalityObj == null ?
279              0 : totalRackLocalityObj.floatValue();
280
281          // Primary cost aims to favor servers with high node locality and low
282          // rack locality, so that secondaries and tertiaries can be chosen for
283          // nodes with high rack locality. This might give primaries with
284          // slightly less locality at first compared to a cost which only
285          // considers the node locality, but should be better in the long run.
286          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
287              totalRackLocality);
288
289          // Secondary cost aims to favor servers with high node locality and high
290          // rack locality since the tertiary will be chosen from the same rack as
291          // the secondary. This could be negative, but that is okay.
292          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
293
294          // Tertiary cost is only concerned with the node locality. It will later
295          // be restricted to only hosts on the same rack as the secondary.
296          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
297        }
298      }
299    }
300
301    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
302      // We want to minimize the number of regions which move as the result of a
303      // new assignment. Therefore, slightly penalize any placement which is for
304      // a host that is not currently serving the region.
305      for (int i = 0; i < numRegions; i++) {
306        for (int j = 0; j < servers.size(); j++) {
307          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
308          if (currentAddress != null &&
309              !currentAddress.equals(servers.get(j))) {
310            for (int k = 0; k < slotsPerServer; k++) {
311              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
312            }
313          }
314        }
315      }
316    }
317
318    // Artificially increase cost of last slot of each server to evenly
319    // distribute the slop, otherwise there will be a few servers with too few
320    // regions and many servers with the max number of regions.
321    for (int i = 0; i < numRegions; i++) {
322      for (int j = 0; j < regionSlots; j += slotsPerServer) {
323        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
324        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
325        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
326      }
327    }
328
329    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
330        regionSlots);
331    primaryCost = randomizedMatrix.transform(primaryCost);
332    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
333    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
334
335    // Modify the secondary and tertiary costs for each region/server pair to
336    // prevent a region from being assigned to the same rack for both primary
337    // and either one of secondary or tertiary.
338    for (int i = 0; i < numRegions; i++) {
339      int slot = primaryAssignment[i];
340      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
341      for (int k = 0; k < servers.size(); k++) {
342        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
343          continue;
344        }
345        if (k == slot / slotsPerServer) {
346          // Same node, do not place secondary or tertiary here ever.
347          for (int m = 0; m < slotsPerServer; m++) {
348            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
349            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
350          }
351        } else {
352          // Same rack, do not place secondary or tertiary here if possible.
353          for (int m = 0; m < slotsPerServer; m++) {
354            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
355            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
356          }
357        }
358      }
359    }
360    if (munkresForSecondaryAndTertiary) {
361      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
362      secondaryCost = randomizedMatrix.transform(secondaryCost);
363      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
364      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
365
366      // Modify the tertiary costs for each region/server pair to ensure that a
367      // region is assigned to a tertiary server on the same rack as its secondary
368      // server, but not the same server in that rack.
369      for (int i = 0; i < numRegions; i++) {
370        int slot = secondaryAssignment[i];
371        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
372        for (int k = 0; k < servers.size(); k++) {
373          if (k == slot / slotsPerServer) {
374            // Same node, do not place tertiary here ever.
375            for (int m = 0; m < slotsPerServer; m++) {
376              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
377            }
378          } else {
379            if (rackManager.getRack(servers.get(k)).equals(rack)) {
380              continue;
381            }
382            // Different rack, do not place tertiary here if possible.
383            for (int m = 0; m < slotsPerServer; m++) {
384              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
385            }
386          }
387        }
388      }
389
390      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
391      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
392      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
393      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
394
395      for (int i = 0; i < numRegions; i++) {
396        List<ServerName> favoredServers
397          = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
398        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
399        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
400            ServerName.NON_STARTCODE));
401
402        s = servers.get(secondaryAssignment[i] / slotsPerServer);
403        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
404            ServerName.NON_STARTCODE));
405
406        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
407        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
408            ServerName.NON_STARTCODE));
409        // Update the assignment plan
410        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
411      }
412      LOG.info("Generated the assignment plan for " + numRegions +
413          " regions from table " + tableName + " with " +
414          servers.size() + " region servers");
415      LOG.info("Assignment plan for secondary and tertiary generated " +
416          "using MunkresAssignment");
417    } else {
418      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
419      for (int i = 0; i < numRegions; i++) {
420        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
421      }
422      FavoredNodeAssignmentHelper favoredNodeHelper =
423          new FavoredNodeAssignmentHelper(servers, conf);
424      favoredNodeHelper.initialize();
425      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
426          favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
427      for (int i = 0; i < numRegions; i++) {
428        List<ServerName> favoredServers
429          = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
430        RegionInfo currentRegion = regions.get(i);
431        ServerName s = primaryRSMap.get(currentRegion);
432        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
433            ServerName.NON_STARTCODE));
434
435        ServerName[] secondaryAndTertiary =
436            secondaryAndTertiaryMap.get(currentRegion);
437        s = secondaryAndTertiary[0];
438        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
439            ServerName.NON_STARTCODE));
440
441        s = secondaryAndTertiary[1];
442        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
443            ServerName.NON_STARTCODE));
444        // Update the assignment plan
445        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
446      }
447      LOG.info("Generated the assignment plan for " + numRegions +
448          " regions from table " + tableName + " with " +
449          servers.size() + " region servers");
450      LOG.info("Assignment plan for secondary and tertiary generated " +
451          "using placeSecondaryAndTertiaryWithRestrictions method");
452    }
453  }
454
455  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
456    // Get the current region assignment snapshot by scanning from the META
457    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
458      this.getRegionAssignmentSnapshot();
459
460    // Get the region locality map
461    Map<String, Map<String, Float>> regionLocalityMap = null;
462    if (this.enforceLocality) {
463      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
464    }
465    // Initialize the assignment plan
466    FavoredNodesPlan plan = new FavoredNodesPlan();
467
468    // Get the table to region mapping
469    Map<TableName, List<RegionInfo>> tableToRegionMap =
470      assignmentSnapshot.getTableToRegionMap();
471    LOG.info("Start to generate the new assignment plan for the " +
472         + tableToRegionMap.keySet().size() + " tables" );
473    for (TableName table : tableToRegionMap.keySet()) {
474      try {
475        if (!this.targetTableSet.isEmpty() &&
476            !this.targetTableSet.contains(table)) {
477          continue;
478        }
479        // TODO: maybe run the placement in parallel for each table
480        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
481            USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
482      } catch (Exception e) {
483        LOG.error("Get some exceptions for placing primary region server" +
484            "for table " + table + " because " + e);
485      }
486    }
487    LOG.info("Finish to generate the new assignment plan for the " +
488        + tableToRegionMap.keySet().size() + " tables" );
489    return plan;
490  }
491
492  /**
493   * Some algorithms for solving the assignment problem may traverse workers or
494   * jobs in linear order which may result in skewing the assignments of the
495   * first jobs in the matrix toward the last workers in the matrix if the
496   * costs are uniform. To avoid this kind of clumping, we can randomize the
497   * rows and columns of the cost matrix in a reversible way, such that the
498   * solution to the assignment problem can be interpreted in terms of the
499   * original untransformed cost matrix. Rows and columns are transformed
500   * independently such that the elements contained in any row of the input
501   * matrix are the same as the elements in the corresponding output matrix,
502   * and each row has its elements transformed in the same way. Similarly for
503   * columns.
504   */
505  protected static class RandomizedMatrix {
506    private final int rows;
507    private final int cols;
508    private final int[] rowTransform;
509    private final int[] rowInverse;
510    private final int[] colTransform;
511    private final int[] colInverse;
512
513    /**
514     * Create a randomization scheme for a matrix of a given size.
515     * @param rows the number of rows in the matrix
516     * @param cols the number of columns in the matrix
517     */
518    public RandomizedMatrix(int rows, int cols) {
519      this.rows = rows;
520      this.cols = cols;
521      Random random = new Random();
522      rowTransform = new int[rows];
523      rowInverse = new int[rows];
524      for (int i = 0; i < rows; i++) {
525        rowTransform[i] = i;
526      }
527      // Shuffle the row indices.
528      for (int i = rows - 1; i >= 0; i--) {
529        int r = random.nextInt(i + 1);
530        int temp = rowTransform[r];
531        rowTransform[r] = rowTransform[i];
532        rowTransform[i] = temp;
533      }
534      // Generate the inverse row indices.
535      for (int i = 0; i < rows; i++) {
536        rowInverse[rowTransform[i]] = i;
537      }
538
539      colTransform = new int[cols];
540      colInverse = new int[cols];
541      for (int i = 0; i < cols; i++) {
542        colTransform[i] = i;
543      }
544      // Shuffle the column indices.
545      for (int i = cols - 1; i >= 0; i--) {
546        int r = random.nextInt(i + 1);
547        int temp = colTransform[r];
548        colTransform[r] = colTransform[i];
549        colTransform[i] = temp;
550      }
551      // Generate the inverse column indices.
552      for (int i = 0; i < cols; i++) {
553        colInverse[colTransform[i]] = i;
554      }
555    }
556
557    /**
558     * Copy a given matrix into a new matrix, transforming each row index and
559     * each column index according to the randomization scheme that was created
560     * at construction time.
561     * @param matrix the cost matrix to transform
562     * @return a new matrix with row and column indices transformed
563     */
564    public float[][] transform(float[][] matrix) {
565      float[][] result = new float[rows][cols];
566      for (int i = 0; i < rows; i++) {
567        for (int j = 0; j < cols; j++) {
568          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
569        }
570      }
571      return result;
572    }
573
574    /**
575     * Copy a given matrix into a new matrix, transforming each row index and
576     * each column index according to the inverse of the randomization scheme
577     * that was created at construction time.
578     * @param matrix the cost matrix to be inverted
579     * @return a new matrix with row and column indices inverted
580     */
581    public float[][] invert(float[][] matrix) {
582      float[][] result = new float[rows][cols];
583      for (int i = 0; i < rows; i++) {
584        for (int j = 0; j < cols; j++) {
585          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
586        }
587      }
588      return result;
589    }
590
591    /**
592     * Given an array where each element {@code indices[i]} represents the
593     * randomized column index corresponding to randomized row index {@code i},
594     * create a new array with the corresponding inverted indices.
595     * @param indices an array of transformed indices to be inverted
596     * @return an array of inverted indices
597     */
598    public int[] invertIndices(int[] indices) {
599      int[] result = new int[indices.length];
600      for (int i = 0; i < indices.length; i++) {
601        result[rowInverse[i]] = colInverse[indices[i]];
602      }
603      return result;
604    }
605  }
606
607  /**
608   * Print the assignment plan to the system output stream
609   * @param plan
610   */
611  public static void printAssignmentPlan(FavoredNodesPlan plan) {
612    if (plan == null) return;
613    LOG.info("========== Start to print the assignment plan ================");
614    // sort the map based on region info
615    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
616
617    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
618
619      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
620      String regionName = entry.getKey();
621      LOG.info("Region: " + regionName );
622      LOG.info("Its favored nodes: " + serverList);
623    }
624    LOG.info("========== Finish to print the assignment plan ================");
625  }
626
627  /**
628   * Update the assignment plan into hbase:meta
629   * @param plan the assignments plan to be updated into hbase:meta
630   * @throws IOException if cannot update assignment plan in hbase:meta
631   */
632  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
633  throws IOException {
634    try {
635      LOG.info("Start to update the hbase:meta with the new assignment plan");
636      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
637      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
638      Map<String, RegionInfo> regionToRegionInfoMap =
639        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
640      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
641        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
642      }
643
644      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
645      LOG.info("Updated the hbase:meta with the new assignment plan");
646    } catch (Exception e) {
647      LOG.error("Failed to update hbase:meta with the new assignment" +
648          "plan because " + e.getMessage());
649    }
650  }
651
652  /**
653   * Update the assignment plan to all the region servers
654   * @param plan
655   * @throws IOException
656   */
657  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
658  throws IOException{
659    LOG.info("Start to update the region servers with the new assignment plan");
660    // Get the region to region server map
661    Map<ServerName, List<RegionInfo>> currentAssignment =
662      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
663
664    // track of the failed and succeeded updates
665    int succeededNum = 0;
666    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
667
668    for (Map.Entry<ServerName, List<RegionInfo>> entry :
669      currentAssignment.entrySet()) {
670      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
671      try {
672        // Keep track of the favored updates for the current region server
673        FavoredNodesPlan singleServerPlan = null;
674        // Find out all the updates for the current region server
675        for (RegionInfo region : entry.getValue()) {
676          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
677          if (favoredServerList != null &&
678              favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
679            // Create the single server plan if necessary
680            if (singleServerPlan == null) {
681              singleServerPlan = new FavoredNodesPlan();
682            }
683            // Update the single server update
684            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
685            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
686          }
687        }
688        if (singleServerPlan != null) {
689          // Update the current region server with its updated favored nodes
690          BlockingInterface currentRegionServer =
691            ((ClusterConnection)this.connection).getAdmin(entry.getKey());
692          UpdateFavoredNodesRequest request =
693              RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
694
695          UpdateFavoredNodesResponse updateFavoredNodesResponse =
696              currentRegionServer.updateFavoredNodes(null, request);
697          LOG.info("Region server " +
698              ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
699              " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
700              singleServerPlan.size() +
701              " regions with the assignment plan");
702          succeededNum ++;
703        }
704      } catch (Exception e) {
705        failedUpdateMap.put(entry.getKey(), e);
706      }
707    }
708    // log the succeeded updates
709    LOG.info("Updated " + succeededNum + " region servers with " +
710            "the new assignment plan");
711
712    // log the failed updates
713    int failedNum = failedUpdateMap.size();
714    if (failedNum != 0) {
715      LOG.error("Failed to update the following + " + failedNum +
716          " region servers with its corresponding favored nodes");
717      for (Map.Entry<ServerName, Exception> entry :
718        failedUpdateMap.entrySet() ) {
719        LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
720            " because of " + entry.getValue().getMessage());
721      }
722    }
723  }
724
725  public void updateAssignmentPlan(FavoredNodesPlan plan)
726      throws IOException {
727    LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
728        " the region servers");
729    // Update the new assignment plan to META
730    updateAssignmentPlanToMeta(plan);
731    // Update the new assignment plan to Region Servers
732    updateAssignmentPlanToRegionServers(plan);
733    LOG.info("Finish to update the new assignment plan for the hbase:meta table and" +
734        " the region servers");
735  }
736
737  /**
738   * Return how many regions will move per table since their primary RS will
739   * change
740   *
741   * @param newPlan - new AssignmentPlan
742   * @return how many primaries will move per table
743   */
744  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
745      throws IOException {
746    Map<TableName, Integer> movesPerTable = new HashMap<>();
747    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
748    Map<TableName, List<RegionInfo>> tableToRegions = snapshot
749        .getTableToRegionMap();
750    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
751    Set<TableName> tables = snapshot.getTableSet();
752    for (TableName table : tables) {
753      int movedPrimaries = 0;
754      if (!this.targetTableSet.isEmpty()
755          && !this.targetTableSet.contains(table)) {
756        continue;
757      }
758      List<RegionInfo> regions = tableToRegions.get(table);
759      for (RegionInfo region : regions) {
760        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
761        List<ServerName> newServers = newPlan.getFavoredNodes(region);
762        if (oldServers != null && newServers != null) {
763          ServerName oldPrimary = oldServers.get(0);
764          ServerName newPrimary = newServers.get(0);
765          if (oldPrimary.compareTo(newPrimary) != 0) {
766            movedPrimaries++;
767          }
768        }
769      }
770      movesPerTable.put(table, movedPrimaries);
771    }
772    return movesPerTable;
773  }
774
775  /**
776   * Compares two plans and check whether the locality dropped or increased
777   * (prints the information as a string) also prints the baseline locality
778   *
779   * @param movesPerTable - how many primary regions will move per table
780   * @param regionLocalityMap - locality map from FS
781   * @param newPlan - new assignment plan
782   * @throws IOException
783   */
784  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
785      Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
786          throws IOException {
787    // localities for primary, secondary and tertiary
788    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
789    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
790    Set<TableName> tables = snapshot.getTableSet();
791    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
792    for (TableName table : tables) {
793      float[] deltaLocality = new float[3];
794      float[] locality = new float[3];
795      if (!this.targetTableSet.isEmpty()
796          && !this.targetTableSet.contains(table)) {
797        continue;
798      }
799      List<RegionInfo> regions = tableToRegionsMap.get(table);
800      System.out.println("==================================================");
801      System.out.println("Assignment Plan Projection Report For Table: " + table);
802      System.out.println("\t Total regions: " + regions.size());
803      System.out.println("\t" + movesPerTable.get(table)
804          + " primaries will move due to their primary has changed");
805      for (RegionInfo currentRegion : regions) {
806        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
807            .getEncodedName());
808        if (regionLocality == null) {
809          continue;
810        }
811        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
812        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
813        if (newServers != null && oldServers != null) {
814          int i=0;
815          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
816            ServerName newServer = newServers.get(p.ordinal());
817            ServerName oldServer = oldServers.get(p.ordinal());
818            Float oldLocality = 0f;
819            if (oldServers != null) {
820              oldLocality = regionLocality.get(oldServer.getHostname());
821              if (oldLocality == null) {
822                oldLocality = 0f;
823              }
824              locality[i] += oldLocality;
825            }
826            Float newLocality = regionLocality.get(newServer.getHostname());
827            if (newLocality == null) {
828              newLocality = 0f;
829            }
830            deltaLocality[i] += newLocality - oldLocality;
831            i++;
832          }
833        }
834      }
835      DecimalFormat df = new java.text.DecimalFormat( "#.##");
836      for (int i = 0; i < deltaLocality.length; i++) {
837        System.out.print("\t\t Baseline locality for ");
838        if (i == 0) {
839          System.out.print("primary ");
840        } else if (i == 1) {
841          System.out.print("secondary ");
842        } else if (i == 2) {
843          System.out.print("tertiary ");
844        }
845        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
846        System.out.print("\t\t Locality will change with the new plan: ");
847        System.out.println(df.format(100 * deltaLocality[i] / regions.size())
848            + "%");
849      }
850      System.out.println("\t Baseline dispersion");
851      printDispersionScores(table, snapshot, regions.size(), null, true);
852      System.out.println("\t Projected dispersion");
853      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
854    }
855  }
856
857  public void printDispersionScores(TableName table,
858      SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
859      boolean simplePrint) {
860    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
861      return;
862    }
863    AssignmentVerificationReport report = new AssignmentVerificationReport();
864    report.fillUpDispersion(table, snapshot, newPlan);
865    List<Float> dispersion = report.getDispersionInformation();
866    if (simplePrint) {
867      DecimalFormat df = new java.text.DecimalFormat("#.##");
868      System.out.println("\tAvg dispersion score: "
869          + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
870          + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
871          + df.format(dispersion.get(2)) + " hosts;");
872    } else {
873      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
874          + " ; The average dispersion score is " + dispersion.get(0));
875    }
876  }
877
878  public void printLocalityAndDispersionForCurrentPlan(
879      Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
880    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
881    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
882    Set<TableName> tables = snapshot.getTableSet();
883    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot
884        .getTableToRegionMap();
885    for (TableName table : tables) {
886      float[] locality = new float[3];
887      if (!this.targetTableSet.isEmpty()
888          && !this.targetTableSet.contains(table)) {
889        continue;
890      }
891      List<RegionInfo> regions = tableToRegionsMap.get(table);
892      for (RegionInfo currentRegion : regions) {
893        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
894            .getEncodedName());
895        if (regionLocality == null) {
896          continue;
897        }
898        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
899        if (servers != null) {
900          int i = 0;
901          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
902            ServerName server = servers.get(p.ordinal());
903            Float currentLocality = 0f;
904            if (servers != null) {
905              currentLocality = regionLocality.get(server.getHostname());
906              if (currentLocality == null) {
907                currentLocality = 0f;
908              }
909              locality[i] += currentLocality;
910            }
911            i++;
912          }
913        }
914      }
915      for (int i = 0; i < locality.length; i++) {
916        String copy =  null;
917        if (i == 0) {
918          copy = "primary";
919        } else if (i == 1) {
920          copy = "secondary";
921        } else if (i == 2) {
922          copy = "tertiary" ;
923        }
924        float avgLocality = 100 * locality[i] / regions.size();
925        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
926            + " ; The average locality for " + copy+ " is " + avgLocality + " %");
927      }
928      printDispersionScores(table, snapshot, regions.size(), null, false);
929    }
930  }
931
932  /**
933   * @param favoredNodesStr The String of favored nodes
934   * @return the list of ServerName for the byte array of favored nodes.
935   */
936  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
937    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
938    if (favoredNodesArray == null)
939      return null;
940
941    List<ServerName> serverList = new ArrayList<>();
942    for (String hostNameAndPort : favoredNodesArray) {
943      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
944    }
945    return serverList;
946  }
947
948  public static void main(String args[]) throws IOException {
949    Options opt = new Options();
950    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
951    opt.addOption("u", "update", false,
952        "update the assignments to hbase:meta and RegionServers together");
953    opt.addOption("n", "dry-run", false, "do not write assignments to META");
954    opt.addOption("v", "verify", false, "verify current assignments against META");
955    opt.addOption("p", "print", false, "print the current assignment plan in META");
956    opt.addOption("h", "help", false, "print usage");
957    opt.addOption("d", "verification-details", false,
958        "print the details of verification report");
959
960    opt.addOption("zk", true, "to set the zookeeper quorum");
961    opt.addOption("fs", true, "to set HDFS");
962    opt.addOption("hbase_root", true, "to set hbase_root directory");
963
964    opt.addOption("overwrite", false,
965        "overwrite the favored nodes for a single region," +
966        "for example: -update -r regionName -f server1:port,server2:port,server3:port");
967    opt.addOption("r", true, "The region name that needs to be updated");
968    opt.addOption("f", true, "The new favored nodes");
969
970    opt.addOption("tables", true,
971        "The list of table names splitted by ',' ;" +
972        "For example: -tables: t1,t2,...,tn");
973    opt.addOption("l", "locality", true, "enforce the maximum locality");
974    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
975    opt.addOption("diff", false, "calculate difference between assignment plans");
976    opt.addOption("munkres", false,
977        "use munkres to place secondaries and tertiaries");
978    opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
979        "information for current plan");
980    try {
981      CommandLine cmd = new GnuParser().parse(opt, args);
982      Configuration conf = HBaseConfiguration.create();
983
984      boolean enforceMinAssignmentMove = true;
985      boolean enforceLocality = true;
986      boolean verificationDetails = false;
987
988      // Read all the options
989      if ((cmd.hasOption("l") &&
990          cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
991          (cmd.hasOption("locality") &&
992              cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
993        enforceLocality = false;
994      }
995
996      if ((cmd.hasOption("m") &&
997          cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
998          (cmd.hasOption("min-move") &&
999              cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
1000        enforceMinAssignmentMove = false;
1001      }
1002
1003      if (cmd.hasOption("zk")) {
1004        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
1005        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
1006      }
1007
1008      if (cmd.hasOption("fs")) {
1009        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
1010        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
1011      }
1012
1013      if (cmd.hasOption("hbase_root")) {
1014        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
1015        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
1016      }
1017
1018      // Create the region placement obj
1019      RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
1020          enforceMinAssignmentMove);
1021
1022      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
1023        verificationDetails = true;
1024      }
1025
1026      if (cmd.hasOption("tables")) {
1027        String tableNameListStr = cmd.getOptionValue("tables");
1028        String[] tableNames = StringUtils.split(tableNameListStr, ",");
1029        rp.setTargetTableName(tableNames);
1030      }
1031
1032      if (cmd.hasOption("munkres")) {
1033        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
1034      }
1035
1036      // Read all the modes
1037      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
1038        // Verify the region placement.
1039        rp.verifyRegionPlacement(verificationDetails);
1040      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
1041        // Generate the assignment plan only without updating the hbase:meta and RS
1042        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1043        printAssignmentPlan(plan);
1044      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
1045        // Generate the new assignment plan
1046        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1047        // Print the new assignment plan
1048        printAssignmentPlan(plan);
1049        // Write the new assignment plan to META
1050        rp.updateAssignmentPlanToMeta(plan);
1051      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
1052        // Generate the new assignment plan
1053        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1054        // Print the new assignment plan
1055        printAssignmentPlan(plan);
1056        // Update the assignment to hbase:meta and Region Servers
1057        rp.updateAssignmentPlan(plan);
1058      } else if (cmd.hasOption("diff")) {
1059        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1060        Map<String, Map<String, Float>> locality = FSUtils
1061            .getRegionDegreeLocalityMappingFromFS(conf);
1062        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1063        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1064        System.out.println("Do you want to update the assignment plan? [y/n]");
1065        Scanner s = new Scanner(System.in);
1066        String input = s.nextLine().trim();
1067        if (input.equals("y")) {
1068          System.out.println("Updating assignment plan...");
1069          rp.updateAssignmentPlan(newPlan);
1070        }
1071        s.close();
1072      } else if (cmd.hasOption("ld")) {
1073        Map<String, Map<String, Float>> locality = FSUtils
1074            .getRegionDegreeLocalityMappingFromFS(conf);
1075        rp.printLocalityAndDispersionForCurrentPlan(locality);
1076      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1077        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1078        printAssignmentPlan(plan);
1079      } else if (cmd.hasOption("overwrite")) {
1080        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1081          throw new IllegalArgumentException("Please specify: " +
1082              " -update -r regionName -f server1:port,server2:port,server3:port");
1083        }
1084
1085        String regionName = cmd.getOptionValue("r");
1086        String favoredNodesStr = cmd.getOptionValue("f");
1087        LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
1088            favoredNodesStr);
1089        List<ServerName> favoredNodes = null;
1090        RegionInfo regionInfo =
1091            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1092        if (regionInfo == null) {
1093          LOG.error("Cannot find the region " + regionName + " from the META");
1094        } else {
1095          try {
1096            favoredNodes = getFavoredNodeList(favoredNodesStr);
1097          } catch (IllegalArgumentException e) {
1098            LOG.error("Cannot parse the invalid favored nodes because " + e);
1099          }
1100          FavoredNodesPlan newPlan = new FavoredNodesPlan();
1101          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1102          rp.updateAssignmentPlan(newPlan);
1103        }
1104      } else {
1105        printHelp(opt);
1106      }
1107    } catch (ParseException e) {
1108      printHelp(opt);
1109    }
1110  }
1111}