001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.text.DecimalFormat;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Random;
029import java.util.Scanner;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.concurrent.ThreadLocalRandom;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.AsyncClusterConnection;
041import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
042import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
046import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
047import org.apache.hadoop.hbase.security.User;
048import org.apache.hadoop.hbase.util.FSUtils;
049import org.apache.hadoop.hbase.util.FutureUtils;
050import org.apache.hadoop.hbase.util.MunkresAssignment;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
058import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
059import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
060import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
061import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
062
063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
066
067/**
068 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with
069 * -h to get a list of the options
070 */
071@InterfaceAudience.Private
072// TODO: Remove? Unused. Partially implemented only.
073public class RegionPlacementMaintainer implements Closeable {
  private static final Logger LOG =
    LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName());
  // The cost of a placement that should never be assigned.
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased if it is the
  // last slot of the server. This is done to more evenly distribute the slop
  // amongst servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // Selects how genAssignmentPlan places secondaries/tertiaries: true uses the
  // Munkres algorithm, false uses placeSecondaryAndTertiaryWithRestrictions.
  // NOTE(review): mutable static flag; never reassigned in this part of the file.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  // Cluster configuration, used for connections, rack lookups and locality scans.
  private Configuration conf;
  // When true, HDFS block locality is factored into the placement cost matrices.
  private final boolean enforceLocality;
  // When true, placements that would move a region off its current host are penalized.
  private final boolean enforceMinAssignmentMove;
  // Resolves the rack for each region server.
  private RackManager rackManager;
  // When non-empty, operations are restricted to these tables only.
  private Set<TableName> targetTableSet;
  // Lazily created by getConnection(); released by close().
  private AsyncClusterConnection connection;
099
  /**
   * Create a maintainer that enforces both data locality and minimal assignment moves.
   * @param conf the cluster configuration
   * @throws IOException declared for API compatibility; not thrown by this constructor
   */
  public RegionPlacementMaintainer(Configuration conf) throws IOException {
    this(conf, true, true);
  }
103
  /**
   * Create a maintainer.
   * @param conf the cluster configuration
   * @param enforceLocality whether to factor HDFS block locality into placement costs
   * @param enforceMinAssignmentMove whether to penalize placements that would move a region off
   *          the server currently hosting it
   */
  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
    boolean enforceMinAssignmentMove) {
    this.conf = conf;
    this.enforceLocality = enforceLocality;
    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
    this.targetTableSet = new HashSet<>();
    this.rackManager = new RackManager(conf);
  }
112
113  private static void printHelp(Options opt) {
114    new HelpFormatter().printHelp(
115      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes "
116        + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]"
117        + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]",
118      opt);
119  }
120
  /**
   * Lazily create and cache the cluster connection. The lazy initialization is not
   * synchronized, so this is only safe for the tool's single-threaded usage.
   * @return the shared {@link AsyncClusterConnection}
   * @throws IOException if the connection cannot be created
   */
  private AsyncClusterConnection getConnection() throws IOException {
    if (connection == null) {
      connection =
        ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
    }
    return connection;
  }
128
129  public void setTargetTableName(String[] tableNames) {
130    if (tableNames != null) {
131      for (String table : tableNames)
132        this.targetTableSet.add(TableName.valueOf(table));
133    }
134  }
135
136  /**
137   * @return the new RegionAssignmentSnapshot
138   */
139  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
140    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
141      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
142    currentAssignmentShapshot.initialize();
143    return currentAssignmentShapshot;
144  }
145
146  /**
147   * Verify the region placement is consistent with the assignment plan
148   */
149  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
150    throws IOException {
151    System.out
152      .println("Start to verify the region assignment and " + "generate the verification report");
153    // Get the region assignment snapshot
154    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
155
156    // Get all the tables
157    Set<TableName> tables = snapshot.getTableSet();
158
159    // Get the region locality map
160    Map<String, Map<String, Float>> regionLocalityMap = null;
161    if (this.enforceLocality == true) {
162      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
163    }
164    List<AssignmentVerificationReport> reports = new ArrayList<>();
165    // Iterate all the tables to fill up the verification report
166    for (TableName table : tables) {
167      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
168        continue;
169      }
170      AssignmentVerificationReport report = new AssignmentVerificationReport();
171      report.fillUp(table, snapshot, regionLocalityMap);
172      report.print(isDetailMode);
173      reports.add(report);
174    }
175    return reports;
176  }
177
178  /**
179   * Generate the assignment plan for the existing table nnnn * @param
180   * munkresForSecondaryAndTertiary if set on true the assignment plan for the tertiary and
181   * secondary will be generated with Munkres algorithm, otherwise will be generated using
182   * placeSecondaryAndTertiaryRS n
183   */
184  private void genAssignmentPlan(TableName tableName,
185    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
186    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
187    boolean munkresForSecondaryAndTertiary) throws IOException {
188    // Get the all the regions for the current table
189    List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName);
190    int numRegions = regions.size();
191
192    // Get the current assignment map
193    Map<RegionInfo, ServerName> currentAssignmentMap =
194      assignmentSnapshot.getRegionToRegionServerMap();
195
196    // Get the all the region servers
197    List<ServerName> servers = new ArrayList<>();
198    servers.addAll(FutureUtils.get(getConnection().getAdmin().getRegionServers()));
199
200    LOG.info("Start to generate assignment plan for " + numRegions + " regions from table "
201      + tableName + " with " + servers.size() + " region servers");
202
203    int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size());
204    int regionSlots = slotsPerServer * servers.size();
205
206    // Compute the primary, secondary and tertiary costs for each region/server
207    // pair. These costs are based only on node locality and rack locality, and
208    // will be modified later.
209    float[][] primaryCost = new float[numRegions][regionSlots];
210    float[][] secondaryCost = new float[numRegions][regionSlots];
211    float[][] tertiaryCost = new float[numRegions][regionSlots];
212
213    if (this.enforceLocality && regionLocalityMap != null) {
214      // Transform the locality mapping into a 2D array, assuming that any
215      // unspecified locality value is 0.
216      float[][] localityPerServer = new float[numRegions][regionSlots];
217      for (int i = 0; i < numRegions; i++) {
218        Map<String, Float> serverLocalityMap =
219          regionLocalityMap.get(regions.get(i).getEncodedName());
220        if (serverLocalityMap == null) {
221          continue;
222        }
223        for (int j = 0; j < servers.size(); j++) {
224          String serverName = servers.get(j).getHostname();
225          if (serverName == null) {
226            continue;
227          }
228          Float locality = serverLocalityMap.get(serverName);
229          if (locality == null) {
230            continue;
231          }
232          for (int k = 0; k < slotsPerServer; k++) {
233            // If we can't find the locality of a region to a server, which occurs
234            // because locality is only reported for servers which have some
235            // blocks of a region local, then the locality for that pair is 0.
236            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
237          }
238        }
239      }
240
241      // Compute the total rack locality for each region in each rack. The total
242      // rack locality is the sum of the localities of a region on all servers in
243      // a rack.
244      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
245      for (int i = 0; i < numRegions; i++) {
246        RegionInfo region = regions.get(i);
247        for (int j = 0; j < regionSlots; j += slotsPerServer) {
248          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
249          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
250          if (rackLocality == null) {
251            rackLocality = new HashMap<>();
252            rackRegionLocality.put(rack, rackLocality);
253          }
254          Float localityObj = rackLocality.get(region);
255          float locality = localityObj == null ? 0 : localityObj.floatValue();
256          locality += localityPerServer[i][j];
257          rackLocality.put(region, locality);
258        }
259      }
260      for (int i = 0; i < numRegions; i++) {
261        for (int j = 0; j < regionSlots; j++) {
262          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
263          Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i));
264          float totalRackLocality =
265            totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue();
266
267          // Primary cost aims to favor servers with high node locality and low
268          // rack locality, so that secondaries and tertiaries can be chosen for
269          // nodes with high rack locality. This might give primaries with
270          // slightly less locality at first compared to a cost which only
271          // considers the node locality, but should be better in the long run.
272          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);
273
274          // Secondary cost aims to favor servers with high node locality and high
275          // rack locality since the tertiary will be chosen from the same rack as
276          // the secondary. This could be negative, but that is okay.
277          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
278
279          // Tertiary cost is only concerned with the node locality. It will later
280          // be restricted to only hosts on the same rack as the secondary.
281          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
282        }
283      }
284    }
285
286    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
287      // We want to minimize the number of regions which move as the result of a
288      // new assignment. Therefore, slightly penalize any placement which is for
289      // a host that is not currently serving the region.
290      for (int i = 0; i < numRegions; i++) {
291        for (int j = 0; j < servers.size(); j++) {
292          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
293          if (currentAddress != null && !currentAddress.equals(servers.get(j))) {
294            for (int k = 0; k < slotsPerServer; k++) {
295              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
296            }
297          }
298        }
299      }
300    }
301
302    // Artificially increase cost of last slot of each server to evenly
303    // distribute the slop, otherwise there will be a few servers with too few
304    // regions and many servers with the max number of regions.
305    for (int i = 0; i < numRegions; i++) {
306      for (int j = 0; j < regionSlots; j += slotsPerServer) {
307        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
308        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
309        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
310      }
311    }
312
313    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
314    primaryCost = randomizedMatrix.transform(primaryCost);
315    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
316    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
317
318    // Modify the secondary and tertiary costs for each region/server pair to
319    // prevent a region from being assigned to the same rack for both primary
320    // and either one of secondary or tertiary.
321    for (int i = 0; i < numRegions; i++) {
322      int slot = primaryAssignment[i];
323      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
324      for (int k = 0; k < servers.size(); k++) {
325        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
326          continue;
327        }
328        if (k == slot / slotsPerServer) {
329          // Same node, do not place secondary or tertiary here ever.
330          for (int m = 0; m < slotsPerServer; m++) {
331            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
332            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
333          }
334        } else {
335          // Same rack, do not place secondary or tertiary here if possible.
336          for (int m = 0; m < slotsPerServer; m++) {
337            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
338            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
339          }
340        }
341      }
342    }
343    if (munkresForSecondaryAndTertiary) {
344      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
345      secondaryCost = randomizedMatrix.transform(secondaryCost);
346      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
347      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
348
349      // Modify the tertiary costs for each region/server pair to ensure that a
350      // region is assigned to a tertiary server on the same rack as its secondary
351      // server, but not the same server in that rack.
352      for (int i = 0; i < numRegions; i++) {
353        int slot = secondaryAssignment[i];
354        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
355        for (int k = 0; k < servers.size(); k++) {
356          if (k == slot / slotsPerServer) {
357            // Same node, do not place tertiary here ever.
358            for (int m = 0; m < slotsPerServer; m++) {
359              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
360            }
361          } else {
362            if (rackManager.getRack(servers.get(k)).equals(rack)) {
363              continue;
364            }
365            // Different rack, do not place tertiary here if possible.
366            for (int m = 0; m < slotsPerServer; m++) {
367              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
368            }
369          }
370        }
371      }
372
373      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
374      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
375      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
376      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
377
378      for (int i = 0; i < numRegions; i++) {
379        List<ServerName> favoredServers =
380          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
381        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
382        favoredServers
383          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
384
385        s = servers.get(secondaryAssignment[i] / slotsPerServer);
386        favoredServers
387          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
388
389        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
390        favoredServers
391          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
392        // Update the assignment plan
393        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
394      }
395      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
396        + tableName + " with " + servers.size() + " region servers");
397      LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment");
398    } else {
399      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
400      for (int i = 0; i < numRegions; i++) {
401        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
402      }
403      FavoredNodeAssignmentHelper favoredNodeHelper =
404        new FavoredNodeAssignmentHelper(servers, conf);
405      favoredNodeHelper.initialize();
406      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
407        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
408      for (int i = 0; i < numRegions; i++) {
409        List<ServerName> favoredServers =
410          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
411        RegionInfo currentRegion = regions.get(i);
412        ServerName s = primaryRSMap.get(currentRegion);
413        favoredServers
414          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
415
416        ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion);
417        s = secondaryAndTertiary[0];
418        favoredServers
419          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
420
421        s = secondaryAndTertiary[1];
422        favoredServers
423          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
424        // Update the assignment plan
425        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
426      }
427      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
428        + tableName + " with " + servers.size() + " region servers");
429      LOG.info("Assignment plan for secondary and tertiary generated "
430        + "using placeSecondaryAndTertiaryWithRestrictions method");
431    }
432  }
433
434  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
435    // Get the current region assignment snapshot by scanning from the META
436    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot();
437
438    // Get the region locality map
439    Map<String, Map<String, Float>> regionLocalityMap = null;
440    if (this.enforceLocality) {
441      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
442    }
443    // Initialize the assignment plan
444    FavoredNodesPlan plan = new FavoredNodesPlan();
445
446    // Get the table to region mapping
447    Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap();
448    LOG.info("Start to generate the new assignment plan for the "
449      + +tableToRegionMap.keySet().size() + " tables");
450    for (TableName table : tableToRegionMap.keySet()) {
451      try {
452        if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
453          continue;
454        }
455        // TODO: maybe run the placement in parallel for each table
456        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
457          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
458      } catch (Exception e) {
459        LOG.error("Get some exceptions for placing primary region server" + "for table " + table
460          + " because " + e);
461      }
462    }
463    LOG.info("Finish to generate the new assignment plan for the "
464      + +tableToRegionMap.keySet().size() + " tables");
465    return plan;
466  }
467
  /**
   * Release the cached cluster connection, if one was created. {@code Closeables.close} with
   * {@code swallowIOException=true} logs rather than propagates close failures, and tolerates a
   * null connection.
   */
  @Override
  public void close() throws IOException {
    Closeables.close(connection, true);
  }
472
473  /**
474   * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order
475   * which may result in skewing the assignments of the first jobs in the matrix toward the last
476   * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can
477   * randomize the rows and columns of the cost matrix in a reversible way, such that the solution
478   * to the assignment problem can be interpreted in terms of the original untransformed cost
479   * matrix. Rows and columns are transformed independently such that the elements contained in any
480   * row of the input matrix are the same as the elements in the corresponding output matrix, and
481   * each row has its elements transformed in the same way. Similarly for columns.
482   */
483  protected static class RandomizedMatrix {
484    private final int rows;
485    private final int cols;
486    private final int[] rowTransform;
487    private final int[] rowInverse;
488    private final int[] colTransform;
489    private final int[] colInverse;
490
491    /**
492     * Create a randomization scheme for a matrix of a given size.
493     * @param rows the number of rows in the matrix
494     * @param cols the number of columns in the matrix
495     */
496    public RandomizedMatrix(int rows, int cols) {
497      this.rows = rows;
498      this.cols = cols;
499      Random random = ThreadLocalRandom.current();
500      rowTransform = new int[rows];
501      rowInverse = new int[rows];
502      for (int i = 0; i < rows; i++) {
503        rowTransform[i] = i;
504      }
505      // Shuffle the row indices.
506      for (int i = rows - 1; i >= 0; i--) {
507        int r = random.nextInt(i + 1);
508        int temp = rowTransform[r];
509        rowTransform[r] = rowTransform[i];
510        rowTransform[i] = temp;
511      }
512      // Generate the inverse row indices.
513      for (int i = 0; i < rows; i++) {
514        rowInverse[rowTransform[i]] = i;
515      }
516
517      colTransform = new int[cols];
518      colInverse = new int[cols];
519      for (int i = 0; i < cols; i++) {
520        colTransform[i] = i;
521      }
522      // Shuffle the column indices.
523      for (int i = cols - 1; i >= 0; i--) {
524        int r = random.nextInt(i + 1);
525        int temp = colTransform[r];
526        colTransform[r] = colTransform[i];
527        colTransform[i] = temp;
528      }
529      // Generate the inverse column indices.
530      for (int i = 0; i < cols; i++) {
531        colInverse[colTransform[i]] = i;
532      }
533    }
534
535    /**
536     * Copy a given matrix into a new matrix, transforming each row index and each column index
537     * according to the randomization scheme that was created at construction time.
538     * @param matrix the cost matrix to transform
539     * @return a new matrix with row and column indices transformed
540     */
541    public float[][] transform(float[][] matrix) {
542      float[][] result = new float[rows][cols];
543      for (int i = 0; i < rows; i++) {
544        for (int j = 0; j < cols; j++) {
545          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
546        }
547      }
548      return result;
549    }
550
551    /**
552     * Copy a given matrix into a new matrix, transforming each row index and each column index
553     * according to the inverse of the randomization scheme that was created at construction time.
554     * @param matrix the cost matrix to be inverted
555     * @return a new matrix with row and column indices inverted
556     */
557    public float[][] invert(float[][] matrix) {
558      float[][] result = new float[rows][cols];
559      for (int i = 0; i < rows; i++) {
560        for (int j = 0; j < cols; j++) {
561          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
562        }
563      }
564      return result;
565    }
566
567    /**
568     * Given an array where each element {@code indices[i]} represents the randomized column index
569     * corresponding to randomized row index {@code i}, create a new array with the corresponding
570     * inverted indices.
571     * @param indices an array of transformed indices to be inverted
572     * @return an array of inverted indices
573     */
574    public int[] invertIndices(int[] indices) {
575      int[] result = new int[indices.length];
576      for (int i = 0; i < indices.length; i++) {
577        result[rowInverse[i]] = colInverse[indices[i]];
578      }
579      return result;
580    }
581  }
582
583  /**
584   * Print the assignment plan to the system output stream n
585   */
586  public static void printAssignmentPlan(FavoredNodesPlan plan) {
587    if (plan == null) return;
588    LOG.info("========== Start to print the assignment plan ================");
589    // sort the map based on region info
590    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
591
592    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
593
594      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
595      String regionName = entry.getKey();
596      LOG.info("Region: " + regionName);
597      LOG.info("Its favored nodes: " + serverList);
598    }
599    LOG.info("========== Finish to print the assignment plan ================");
600  }
601
602  /**
603   * Update the assignment plan into hbase:meta
604   * @param plan the assignments plan to be updated into hbase:meta
605   * @throws IOException if cannot update assignment plan in hbase:meta
606   */
607  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException {
608    try {
609      LOG.info("Start to update the hbase:meta with the new assignment plan");
610      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
611      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
612      Map<String, RegionInfo> regionToRegionInfoMap =
613        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
614      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
615        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
616      }
617
618      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
619      LOG.info("Updated the hbase:meta with the new assignment plan");
620    } catch (Exception e) {
621      LOG.error(
622        "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage());
623    }
624  }
625
626  /**
627   * Update the assignment plan to all the region servers nn
628   */
629  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException {
630    LOG.info("Start to update the region servers with the new assignment plan");
631    // Get the region to region server map
632    Map<ServerName, List<RegionInfo>> currentAssignment =
633      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
634
635    // track of the failed and succeeded updates
636    int succeededNum = 0;
637    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
638
639    for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) {
640      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
641      try {
642        // Keep track of the favored updates for the current region server
643        FavoredNodesPlan singleServerPlan = null;
644        // Find out all the updates for the current region server
645        for (RegionInfo region : entry.getValue()) {
646          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
647          if (
648            favoredServerList != null
649              && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
650          ) {
651            // Create the single server plan if necessary
652            if (singleServerPlan == null) {
653              singleServerPlan = new FavoredNodesPlan();
654            }
655            // Update the single server update
656            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
657            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
658          }
659        }
660        if (singleServerPlan != null) {
661          // Update the current region server with its updated favored nodes
662          AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
663          UpdateFavoredNodesRequest request =
664            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
665          UpdateFavoredNodesResponse updateFavoredNodesResponse =
666            FutureUtils.get(rsAdmin.updateFavoredNodes(request));
667          LOG.info("Region server "
668            + FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
669              .getServerInfo()
670            + " has updated " + updateFavoredNodesResponse.getResponse() + " / "
671            + singleServerPlan.size() + " regions with the assignment plan");
672          succeededNum++;
673        }
674      } catch (Exception e) {
675        failedUpdateMap.put(entry.getKey(), e);
676      }
677    }
678    // log the succeeded updates
679    LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan");
680
681    // log the failed updates
682    int failedNum = failedUpdateMap.size();
683    if (failedNum != 0) {
684      LOG.error("Failed to update the following + " + failedNum
685        + " region servers with its corresponding favored nodes");
686      for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) {
687        LOG.error("Failed to update " + entry.getKey().getAddress() + " because of "
688          + entry.getValue().getMessage());
689      }
690    }
691  }
692
693  public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException {
694    LOG.info("Start to update the new assignment plan for the hbase:meta table and"
695      + " the region servers");
696    // Update the new assignment plan to META
697    updateAssignmentPlanToMeta(plan);
698    // Update the new assignment plan to Region Servers
699    updateAssignmentPlanToRegionServers(plan);
700    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
701      + " the region servers");
702  }
703
704  /**
705   * Return how many regions will move per table since their primary RS will change
706   * @param newPlan - new AssignmentPlan
707   * @return how many primaries will move per table
708   */
709  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
710    Map<TableName, Integer> movesPerTable = new HashMap<>();
711    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
712    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
713    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
714    Set<TableName> tables = snapshot.getTableSet();
715    for (TableName table : tables) {
716      int movedPrimaries = 0;
717      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
718        continue;
719      }
720      List<RegionInfo> regions = tableToRegions.get(table);
721      for (RegionInfo region : regions) {
722        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
723        List<ServerName> newServers = newPlan.getFavoredNodes(region);
724        if (oldServers != null && newServers != null) {
725          ServerName oldPrimary = oldServers.get(0);
726          ServerName newPrimary = newServers.get(0);
727          if (oldPrimary.compareTo(newPrimary) != 0) {
728            movedPrimaries++;
729          }
730        }
731      }
732      movesPerTable.put(table, movedPrimaries);
733    }
734    return movesPerTable;
735  }
736
737  /**
738   * Compares two plans and check whether the locality dropped or increased (prints the information
739   * as a string) also prints the baseline locality
740   * @param movesPerTable     - how many primary regions will move per table
741   * @param regionLocalityMap - locality map from FS
742   * @param newPlan           - new assignment plan n
743   */
744  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
745    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
746    throws IOException {
747    // localities for primary, secondary and tertiary
748    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
749    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
750    Set<TableName> tables = snapshot.getTableSet();
751    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
752    for (TableName table : tables) {
753      float[] deltaLocality = new float[3];
754      float[] locality = new float[3];
755      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
756        continue;
757      }
758      List<RegionInfo> regions = tableToRegionsMap.get(table);
759      System.out.println("==================================================");
760      System.out.println("Assignment Plan Projection Report For Table: " + table);
761      System.out.println("\t Total regions: " + regions.size());
762      System.out.println(
763        "\t" + movesPerTable.get(table) + " primaries will move due to their primary has changed");
764      for (RegionInfo currentRegion : regions) {
765        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
766        if (regionLocality == null) {
767          continue;
768        }
769        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
770        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
771        if (newServers != null && oldServers != null) {
772          int i = 0;
773          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
774            ServerName newServer = newServers.get(p.ordinal());
775            ServerName oldServer = oldServers.get(p.ordinal());
776            Float oldLocality = 0f;
777            if (oldServers != null) {
778              oldLocality = regionLocality.get(oldServer.getHostname());
779              if (oldLocality == null) {
780                oldLocality = 0f;
781              }
782              locality[i] += oldLocality;
783            }
784            Float newLocality = regionLocality.get(newServer.getHostname());
785            if (newLocality == null) {
786              newLocality = 0f;
787            }
788            deltaLocality[i] += newLocality - oldLocality;
789            i++;
790          }
791        }
792      }
793      DecimalFormat df = new java.text.DecimalFormat("#.##");
794      for (int i = 0; i < deltaLocality.length; i++) {
795        System.out.print("\t\t Baseline locality for ");
796        if (i == 0) {
797          System.out.print("primary ");
798        } else if (i == 1) {
799          System.out.print("secondary ");
800        } else if (i == 2) {
801          System.out.print("tertiary ");
802        }
803        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
804        System.out.print("\t\t Locality will change with the new plan: ");
805        System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + "%");
806      }
807      System.out.println("\t Baseline dispersion");
808      printDispersionScores(table, snapshot, regions.size(), null, true);
809      System.out.println("\t Projected dispersion");
810      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
811    }
812  }
813
814  public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot,
815    int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) {
816    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
817      return;
818    }
819    AssignmentVerificationReport report = new AssignmentVerificationReport();
820    report.fillUpDispersion(table, snapshot, newPlan);
821    List<Float> dispersion = report.getDispersionInformation();
822    if (simplePrint) {
823      DecimalFormat df = new java.text.DecimalFormat("#.##");
824      System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0))
825        + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1))
826        + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;");
827    } else {
828      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
829        + " ; The average dispersion score is " + dispersion.get(0));
830    }
831  }
832
833  public void printLocalityAndDispersionForCurrentPlan(
834    Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
835    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
836    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
837    Set<TableName> tables = snapshot.getTableSet();
838    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
839    for (TableName table : tables) {
840      float[] locality = new float[3];
841      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
842        continue;
843      }
844      List<RegionInfo> regions = tableToRegionsMap.get(table);
845      for (RegionInfo currentRegion : regions) {
846        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
847        if (regionLocality == null) {
848          continue;
849        }
850        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
851        if (servers != null) {
852          int i = 0;
853          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
854            ServerName server = servers.get(p.ordinal());
855            Float currentLocality = 0f;
856            if (servers != null) {
857              currentLocality = regionLocality.get(server.getHostname());
858              if (currentLocality == null) {
859                currentLocality = 0f;
860              }
861              locality[i] += currentLocality;
862            }
863            i++;
864          }
865        }
866      }
867      for (int i = 0; i < locality.length; i++) {
868        String copy = null;
869        if (i == 0) {
870          copy = "primary";
871        } else if (i == 1) {
872          copy = "secondary";
873        } else if (i == 2) {
874          copy = "tertiary";
875        }
876        float avgLocality = 100 * locality[i] / regions.size();
877        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
878          + " ; The average locality for " + copy + " is " + avgLocality + " %");
879      }
880      printDispersionScores(table, snapshot, regions.size(), null, false);
881    }
882  }
883
884  /**
885   * @param favoredNodesStr The String of favored nodes
886   * @return the list of ServerName for the byte array of favored nodes.
887   */
888  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
889    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
890    if (favoredNodesArray == null) return null;
891
892    List<ServerName> serverList = new ArrayList<>();
893    for (String hostNameAndPort : favoredNodesArray) {
894      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
895    }
896    return serverList;
897  }
898
899  public static void main(String args[]) throws IOException {
900    Options opt = new Options();
901    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
902    opt.addOption("u", "update", false,
903      "update the assignments to hbase:meta and RegionServers together");
904    opt.addOption("n", "dry-run", false, "do not write assignments to META");
905    opt.addOption("v", "verify", false, "verify current assignments against META");
906    opt.addOption("p", "print", false, "print the current assignment plan in META");
907    opt.addOption("h", "help", false, "print usage");
908    opt.addOption("d", "verification-details", false, "print the details of verification report");
909
910    opt.addOption("zk", true, "to set the zookeeper quorum");
911    opt.addOption("fs", true, "to set HDFS");
912    opt.addOption("hbase_root", true, "to set hbase_root directory");
913
914    opt.addOption("overwrite", false, "overwrite the favored nodes for a single region,"
915      + "for example: -update -r regionName -f server1:port,server2:port,server3:port");
916    opt.addOption("r", true, "The region name that needs to be updated");
917    opt.addOption("f", true, "The new favored nodes");
918
919    opt.addOption("tables", true,
920      "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn");
921    opt.addOption("l", "locality", true, "enforce the maximum locality");
922    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
923    opt.addOption("diff", false, "calculate difference between assignment plans");
924    opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries");
925    opt.addOption("ld", "locality-dispersion", false,
926      "print locality and dispersion " + "information for current plan");
927    try {
928      CommandLine cmd = new GnuParser().parse(opt, args);
929      Configuration conf = HBaseConfiguration.create();
930
931      boolean enforceMinAssignmentMove = true;
932      boolean enforceLocality = true;
933      boolean verificationDetails = false;
934
935      // Read all the options
936      if (
937        (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false"))
938          || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false"))
939      ) {
940        enforceLocality = false;
941      }
942
943      if (
944        (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false"))
945          || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false"))
946      ) {
947        enforceMinAssignmentMove = false;
948      }
949
950      if (cmd.hasOption("zk")) {
951        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
952        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
953      }
954
955      if (cmd.hasOption("fs")) {
956        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
957        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
958      }
959
960      if (cmd.hasOption("hbase_root")) {
961        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
962        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
963      }
964
965      // Create the region placement obj
966      try (RegionPlacementMaintainer rp =
967        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
968
969        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
970          verificationDetails = true;
971        }
972
973        if (cmd.hasOption("tables")) {
974          String tableNameListStr = cmd.getOptionValue("tables");
975          String[] tableNames = StringUtils.split(tableNameListStr, ",");
976          rp.setTargetTableName(tableNames);
977        }
978
979        if (cmd.hasOption("munkres")) {
980          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
981        }
982
983        // Read all the modes
984        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
985          // Verify the region placement.
986          rp.verifyRegionPlacement(verificationDetails);
987        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
988          // Generate the assignment plan only without updating the hbase:meta and RS
989          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
990          printAssignmentPlan(plan);
991        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
992          // Generate the new assignment plan
993          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
994          // Print the new assignment plan
995          printAssignmentPlan(plan);
996          // Write the new assignment plan to META
997          rp.updateAssignmentPlanToMeta(plan);
998        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
999          // Generate the new assignment plan
1000          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1001          // Print the new assignment plan
1002          printAssignmentPlan(plan);
1003          // Update the assignment to hbase:meta and Region Servers
1004          rp.updateAssignmentPlan(plan);
1005        } else if (cmd.hasOption("diff")) {
1006          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1007          Map<String, Map<String, Float>> locality =
1008            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1009          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1010          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1011          System.out.println("Do you want to update the assignment plan? [y/n]");
1012          Scanner s = new Scanner(System.in);
1013          String input = s.nextLine().trim();
1014          if (input.equals("y")) {
1015            System.out.println("Updating assignment plan...");
1016            rp.updateAssignmentPlan(newPlan);
1017          }
1018          s.close();
1019        } else if (cmd.hasOption("ld")) {
1020          Map<String, Map<String, Float>> locality =
1021            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1022          rp.printLocalityAndDispersionForCurrentPlan(locality);
1023        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1024          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1025          printAssignmentPlan(plan);
1026        } else if (cmd.hasOption("overwrite")) {
1027          if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1028            throw new IllegalArgumentException("Please specify: "
1029              + " -update -r regionName -f server1:port,server2:port,server3:port");
1030          }
1031
1032          String regionName = cmd.getOptionValue("r");
1033          String favoredNodesStr = cmd.getOptionValue("f");
1034          LOG.info("Going to update the region " + regionName + " with the new favored nodes "
1035            + favoredNodesStr);
1036          List<ServerName> favoredNodes = null;
1037          RegionInfo regionInfo =
1038            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1039          if (regionInfo == null) {
1040            LOG.error("Cannot find the region " + regionName + " from the META");
1041          } else {
1042            try {
1043              favoredNodes = getFavoredNodeList(favoredNodesStr);
1044            } catch (IllegalArgumentException e) {
1045              LOG.error("Cannot parse the invalid favored nodes because " + e);
1046            }
1047            FavoredNodesPlan newPlan = new FavoredNodesPlan();
1048            newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1049            rp.updateAssignmentPlan(newPlan);
1050          }
1051        } else {
1052          printHelp(opt);
1053        }
1054      }
1055    } catch (ParseException e) {
1056      printHelp(opt);
1057    }
1058  }
1059}