View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.text.DecimalFormat;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Random;
30  import java.util.Scanner;
31  import java.util.Set;
32  import java.util.TreeMap;
33  
34  import org.apache.commons.cli.CommandLine;
35  import org.apache.commons.cli.GnuParser;
36  import org.apache.commons.cli.HelpFormatter;
37  import org.apache.commons.cli.Options;
38  import org.apache.commons.cli.ParseException;
39  import org.apache.commons.lang.StringUtils;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.HRegionInfo;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.client.Admin;
51  import org.apache.hadoop.hbase.client.ClusterConnection;
52  import org.apache.hadoop.hbase.client.Connection;
53  import org.apache.hadoop.hbase.client.ConnectionFactory;
54  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
55  import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
56  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
57  import org.apache.hadoop.hbase.protobuf.RequestConverter;
58  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
59  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
60  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
61  import org.apache.hadoop.hbase.util.FSUtils;
62  import org.apache.hadoop.hbase.util.MunkresAssignment;
63  import org.apache.hadoop.hbase.util.Pair;
64  import org.apache.log4j.Level;
65  import org.apache.log4j.Logger;
66  
67  /**
68   * A tool that is used for manipulating and viewing favored nodes information
69   * for regions. Run with -h to get a list of the options
70   */
71  @InterfaceAudience.Private
72  // TODO: Remove? Unused. Partially implemented only.
73  public class RegionPlacementMaintainer {
  // Logger for this class, looked up by the fully qualified class name.
  private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class
      .getName());
  // The cost of a placement that should never be assigned.
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased for one slot of
  // each server. This is done to more evenly distribute the slop amongst
  // servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  // Selects how secondary and tertiary favored nodes are placed when a new plan
  // is generated: Munkres when true, FavoredNodeAssignmentHelper otherwise.
  // NOTE(review): named like a constant but not final; presumably toggled
  // elsewhere in this file - confirm before making it final.
  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  // Configuration used to create the connection and to read locality from the FS.
  private Configuration conf;
  // When true, region locality is loaded and used to compute placement costs.
  private final boolean enforceLocality;
  // When true, primary placements away from the current host are penalized.
  private final boolean enforceMinAssignmentMove;
  // Resolves the rack of a region server.
  private RackManager rackManager;
  // Tables to restrict operations to; an empty set means all tables.
  private Set<TableName> targetTableSet;
  // Cluster connection created in the constructor.
  private final Connection connection;
99  
100   public RegionPlacementMaintainer(Configuration conf) {
101     this(conf, true, true);
102   }
103 
104   public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
105       boolean enforceMinAssignmentMove) {
106     this.conf = conf;
107     this.enforceLocality = enforceLocality;
108     this.enforceMinAssignmentMove = enforceMinAssignmentMove;
109     this.targetTableSet = new HashSet<TableName>();
110     this.rackManager = new RackManager(conf);
111     try {
112       this.connection = ConnectionFactory.createConnection(this.conf);
113     } catch (IOException e) {
114       throw new RuntimeException(e);
115     }
116   }
117 
118   private static void printHelp(Options opt) {
119     new HelpFormatter().printHelp(
120         "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
121         "-diff>" +
122         " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
123         " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
124   }
125 
126   public void setTargetTableName(String[] tableNames) {
127     if (tableNames != null) {
128       for (String table : tableNames)
129         this.targetTableSet.add(TableName.valueOf(table));
130     }
131   }
132 
133   /**
134    * @return the new RegionAssignmentSnapshot
135    * @throws IOException
136    */
137   public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
138   throws IOException {
139     SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
140       new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
141     currentAssignmentShapshot.initialize();
142     return currentAssignmentShapshot;
143   }
144 
145   /**
146    * Verify the region placement is consistent with the assignment plan
147    * @param isDetailMode
148    * @return reports
149    * @throws IOException
150    */
151   public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
152       throws IOException {
153     System.out.println("Start to verify the region assignment and " +
154         "generate the verification report");
155     // Get the region assignment snapshot
156     SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
157 
158     // Get all the tables
159     Set<TableName> tables = snapshot.getTableSet();
160 
161     // Get the region locality map
162     Map<String, Map<String, Float>> regionLocalityMap = null;
163     if (this.enforceLocality == true) {
164       regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
165     }
166     List<AssignmentVerificationReport> reports = new ArrayList<AssignmentVerificationReport>();
167     // Iterate all the tables to fill up the verification report
168     for (TableName table : tables) {
169       if (!this.targetTableSet.isEmpty() &&
170           !this.targetTableSet.contains(table)) {
171         continue;
172       }
173       AssignmentVerificationReport report = new AssignmentVerificationReport();
174       report.fillUp(table, snapshot, regionLocalityMap);
175       report.print(isDetailMode);
176       reports.add(report);
177     }
178     return reports;
179   }
180 
181   /**
182    * Generate the assignment plan for the existing table
183    *
184    * @param tableName
185    * @param assignmentSnapshot
186    * @param regionLocalityMap
187    * @param plan
188    * @param munkresForSecondaryAndTertiary if set on true the assignment plan
189    * for the tertiary and secondary will be generated with Munkres algorithm,
190    * otherwise will be generated using placeSecondaryAndTertiaryRS
191    * @throws IOException
192    */
193   private void genAssignmentPlan(TableName tableName,
194       SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
195       Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
196       boolean munkresForSecondaryAndTertiary) throws IOException {
197       // Get the all the regions for the current table
198       List<HRegionInfo> regions =
199         assignmentSnapshot.getTableToRegionMap().get(tableName);
200       int numRegions = regions.size();
201 
202       // Get the current assignment map
203       Map<HRegionInfo, ServerName> currentAssignmentMap =
204         assignmentSnapshot.getRegionToRegionServerMap();
205 
206       // Get the all the region servers
207       List<ServerName> servers = new ArrayList<ServerName>();
208       try (Admin admin = this.connection.getAdmin()) {
209         servers.addAll(admin.getClusterStatus().getServers());
210       }
211 
212       LOG.info("Start to generate assignment plan for " + numRegions +
213           " regions from table " + tableName + " with " +
214           servers.size() + " region servers");
215 
216       int slotsPerServer = (int) Math.ceil((float) numRegions /
217           servers.size());
218       int regionSlots = slotsPerServer * servers.size();
219 
220       // Compute the primary, secondary and tertiary costs for each region/server
221       // pair. These costs are based only on node locality and rack locality, and
222       // will be modified later.
223       float[][] primaryCost = new float[numRegions][regionSlots];
224       float[][] secondaryCost = new float[numRegions][regionSlots];
225       float[][] tertiaryCost = new float[numRegions][regionSlots];
226 
227       if (this.enforceLocality && regionLocalityMap != null) {
228         // Transform the locality mapping into a 2D array, assuming that any
229         // unspecified locality value is 0.
230         float[][] localityPerServer = new float[numRegions][regionSlots];
231         for (int i = 0; i < numRegions; i++) {
232           Map<String, Float> serverLocalityMap =
233               regionLocalityMap.get(regions.get(i).getEncodedName());
234           if (serverLocalityMap == null) {
235             continue;
236           }
237           for (int j = 0; j < servers.size(); j++) {
238             String serverName = servers.get(j).getHostname();
239             if (serverName == null) {
240               continue;
241             }
242             Float locality = serverLocalityMap.get(serverName);
243             if (locality == null) {
244               continue;
245             }
246             for (int k = 0; k < slotsPerServer; k++) {
247               // If we can't find the locality of a region to a server, which occurs
248               // because locality is only reported for servers which have some
249               // blocks of a region local, then the locality for that pair is 0.
250               localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
251             }
252           }
253         }
254 
255         // Compute the total rack locality for each region in each rack. The total
256         // rack locality is the sum of the localities of a region on all servers in
257         // a rack.
258         Map<String, Map<HRegionInfo, Float>> rackRegionLocality =
259             new HashMap<String, Map<HRegionInfo, Float>>();
260         for (int i = 0; i < numRegions; i++) {
261           HRegionInfo region = regions.get(i);
262           for (int j = 0; j < regionSlots; j += slotsPerServer) {
263             String rack = rackManager.getRack(servers.get(j / slotsPerServer));
264             Map<HRegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
265             if (rackLocality == null) {
266               rackLocality = new HashMap<HRegionInfo, Float>();
267               rackRegionLocality.put(rack, rackLocality);
268             }
269             Float localityObj = rackLocality.get(region);
270             float locality = localityObj == null ? 0 : localityObj.floatValue();
271             locality += localityPerServer[i][j];
272             rackLocality.put(region, locality);
273           }
274         }
275         for (int i = 0; i < numRegions; i++) {
276           for (int j = 0; j < regionSlots; j++) {
277             String rack = rackManager.getRack(servers.get(j / slotsPerServer));
278             Float totalRackLocalityObj =
279                 rackRegionLocality.get(rack).get(regions.get(i));
280             float totalRackLocality = totalRackLocalityObj == null ?
281                 0 : totalRackLocalityObj.floatValue();
282 
283             // Primary cost aims to favor servers with high node locality and low
284             // rack locality, so that secondaries and tertiaries can be chosen for
285             // nodes with high rack locality. This might give primaries with
286             // slightly less locality at first compared to a cost which only
287             // considers the node locality, but should be better in the long run.
288             primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
289                 totalRackLocality);
290 
291             // Secondary cost aims to favor servers with high node locality and high
292             // rack locality since the tertiary will be chosen from the same rack as
293             // the secondary. This could be negative, but that is okay.
294             secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
295 
296             // Tertiary cost is only concerned with the node locality. It will later
297             // be restricted to only hosts on the same rack as the secondary.
298             tertiaryCost[i][j] = 1 - localityPerServer[i][j];
299           }
300         }
301       }
302 
303       if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
304         // We want to minimize the number of regions which move as the result of a
305         // new assignment. Therefore, slightly penalize any placement which is for
306         // a host that is not currently serving the region.
307         for (int i = 0; i < numRegions; i++) {
308           for (int j = 0; j < servers.size(); j++) {
309             ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
310             if (currentAddress != null &&
311                 !currentAddress.equals(servers.get(j))) {
312               for (int k = 0; k < slotsPerServer; k++) {
313                 primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
314               }
315             }
316           }
317         }
318       }
319 
320       // Artificially increase cost of last slot of each server to evenly
321       // distribute the slop, otherwise there will be a few servers with too few
322       // regions and many servers with the max number of regions.
323       for (int i = 0; i < numRegions; i++) {
324         for (int j = 0; j < regionSlots; j += slotsPerServer) {
325           primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
326           secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
327           tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
328         }
329       }
330 
331       RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
332           regionSlots);
333       primaryCost = randomizedMatrix.transform(primaryCost);
334       int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
335       primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
336 
337       // Modify the secondary and tertiary costs for each region/server pair to
338       // prevent a region from being assigned to the same rack for both primary
339       // and either one of secondary or tertiary.
340       for (int i = 0; i < numRegions; i++) {
341         int slot = primaryAssignment[i];
342         String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
343         for (int k = 0; k < servers.size(); k++) {
344           if (!rackManager.getRack(servers.get(k)).equals(rack)) {
345             continue;
346           }
347           if (k == slot / slotsPerServer) {
348             // Same node, do not place secondary or tertiary here ever.
349             for (int m = 0; m < slotsPerServer; m++) {
350               secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
351               tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
352             }
353           } else {
354             // Same rack, do not place secondary or tertiary here if possible.
355             for (int m = 0; m < slotsPerServer; m++) {
356               secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
357               tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
358             }
359           }
360         }
361       }
362       if (munkresForSecondaryAndTertiary) {
363         randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
364         secondaryCost = randomizedMatrix.transform(secondaryCost);
365         int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
366         secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
367 
368         // Modify the tertiary costs for each region/server pair to ensure that a
369         // region is assigned to a tertiary server on the same rack as its secondary
370         // server, but not the same server in that rack.
371         for (int i = 0; i < numRegions; i++) {
372           int slot = secondaryAssignment[i];
373           String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
374           for (int k = 0; k < servers.size(); k++) {
375             if (k == slot / slotsPerServer) {
376               // Same node, do not place tertiary here ever.
377               for (int m = 0; m < slotsPerServer; m++) {
378                 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
379               }
380             } else {
381               if (rackManager.getRack(servers.get(k)).equals(rack)) {
382                 continue;
383               }
384               // Different rack, do not place tertiary here if possible.
385               for (int m = 0; m < slotsPerServer; m++) {
386                 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
387               }
388             }
389           }
390         }
391 
392         randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
393         tertiaryCost = randomizedMatrix.transform(tertiaryCost);
394         int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
395         tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
396 
397         for (int i = 0; i < numRegions; i++) {
398           List<ServerName> favoredServers =
399             new ArrayList<ServerName>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
400           ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
401           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
402               ServerName.NON_STARTCODE));
403 
404           s = servers.get(secondaryAssignment[i] / slotsPerServer);
405           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
406               ServerName.NON_STARTCODE));
407 
408           s = servers.get(tertiaryAssignment[i] / slotsPerServer);
409           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
410               ServerName.NON_STARTCODE));
411           // Update the assignment plan
412           plan.updateAssignmentPlan(regions.get(i), favoredServers);
413         }
414         LOG.info("Generated the assignment plan for " + numRegions +
415             " regions from table " + tableName + " with " +
416             servers.size() + " region servers");
417         LOG.info("Assignment plan for secondary and tertiary generated " +
418             "using MunkresAssignment");
419       } else {
420         Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<HRegionInfo, ServerName>();
421         for (int i = 0; i < numRegions; i++) {
422           primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
423         }
424         FavoredNodeAssignmentHelper favoredNodeHelper =
425             new FavoredNodeAssignmentHelper(servers, conf);
426         favoredNodeHelper.initialize();
427         Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap =
428             favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
429         for (int i = 0; i < numRegions; i++) {
430           List<ServerName> favoredServers =
431             new ArrayList<ServerName>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
432           HRegionInfo currentRegion = regions.get(i);
433           ServerName s = primaryRSMap.get(currentRegion);
434           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
435               ServerName.NON_STARTCODE));
436 
437           ServerName[] secondaryAndTertiary =
438               secondaryAndTertiaryMap.get(currentRegion);
439           s = secondaryAndTertiary[0];
440           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
441               ServerName.NON_STARTCODE));
442 
443           s = secondaryAndTertiary[1];
444           favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
445               ServerName.NON_STARTCODE));
446           // Update the assignment plan
447           plan.updateAssignmentPlan(regions.get(i), favoredServers);
448         }
449         LOG.info("Generated the assignment plan for " + numRegions +
450             " regions from table " + tableName + " with " +
451             servers.size() + " region servers");
452         LOG.info("Assignment plan for secondary and tertiary generated " +
453             "using placeSecondaryAndTertiaryWithRestrictions method");
454       }
455     }
456 
457   public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
458     // Get the current region assignment snapshot by scanning from the META
459     SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
460       this.getRegionAssignmentSnapshot();
461 
462     // Get the region locality map
463     Map<String, Map<String, Float>> regionLocalityMap = null;
464     if (this.enforceLocality) {
465       regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
466     }
467     // Initialize the assignment plan
468     FavoredNodesPlan plan = new FavoredNodesPlan();
469 
470     // Get the table to region mapping
471     Map<TableName, List<HRegionInfo>> tableToRegionMap =
472       assignmentSnapshot.getTableToRegionMap();
473     LOG.info("Start to generate the new assignment plan for the " +
474          + tableToRegionMap.keySet().size() + " tables" );
475     for (TableName table : tableToRegionMap.keySet()) {
476       try {
477         if (!this.targetTableSet.isEmpty() &&
478             !this.targetTableSet.contains(table)) {
479           continue;
480         }
481         // TODO: maybe run the placement in parallel for each table
482         genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
483             USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
484       } catch (Exception e) {
485         LOG.error("Get some exceptions for placing primary region server" +
486             "for table " + table + " because " + e);
487       }
488     }
489     LOG.info("Finish to generate the new assignment plan for the " +
490         + tableToRegionMap.keySet().size() + " tables" );
491     return plan;
492   }
493 
494   /**
495    * Some algorithms for solving the assignment problem may traverse workers or
496    * jobs in linear order which may result in skewing the assignments of the
497    * first jobs in the matrix toward the last workers in the matrix if the
498    * costs are uniform. To avoid this kind of clumping, we can randomize the
499    * rows and columns of the cost matrix in a reversible way, such that the
500    * solution to the assignment problem can be interpreted in terms of the
501    * original untransformed cost matrix. Rows and columns are transformed
502    * independently such that the elements contained in any row of the input
503    * matrix are the same as the elements in the corresponding output matrix,
504    * and each row has its elements transformed in the same way. Similarly for
505    * columns.
506    */
507   protected static class RandomizedMatrix {
508     private final int rows;
509     private final int cols;
510     private final int[] rowTransform;
511     private final int[] rowInverse;
512     private final int[] colTransform;
513     private final int[] colInverse;
514 
515     /**
516      * Create a randomization scheme for a matrix of a given size.
517      * @param rows the number of rows in the matrix
518      * @param cols the number of columns in the matrix
519      */
520     public RandomizedMatrix(int rows, int cols) {
521       this.rows = rows;
522       this.cols = cols;
523       Random random = new Random();
524       rowTransform = new int[rows];
525       rowInverse = new int[rows];
526       for (int i = 0; i < rows; i++) {
527         rowTransform[i] = i;
528       }
529       // Shuffle the row indices.
530       for (int i = rows - 1; i >= 0; i--) {
531         int r = random.nextInt(i + 1);
532         int temp = rowTransform[r];
533         rowTransform[r] = rowTransform[i];
534         rowTransform[i] = temp;
535       }
536       // Generate the inverse row indices.
537       for (int i = 0; i < rows; i++) {
538         rowInverse[rowTransform[i]] = i;
539       }
540 
541       colTransform = new int[cols];
542       colInverse = new int[cols];
543       for (int i = 0; i < cols; i++) {
544         colTransform[i] = i;
545       }
546       // Shuffle the column indices.
547       for (int i = cols - 1; i >= 0; i--) {
548         int r = random.nextInt(i + 1);
549         int temp = colTransform[r];
550         colTransform[r] = colTransform[i];
551         colTransform[i] = temp;
552       }
553       // Generate the inverse column indices.
554       for (int i = 0; i < cols; i++) {
555         colInverse[colTransform[i]] = i;
556       }
557     }
558 
559     /**
560      * Copy a given matrix into a new matrix, transforming each row index and
561      * each column index according to the randomization scheme that was created
562      * at construction time.
563      * @param matrix the cost matrix to transform
564      * @return a new matrix with row and column indices transformed
565      */
566     public float[][] transform(float[][] matrix) {
567       float[][] result = new float[rows][cols];
568       for (int i = 0; i < rows; i++) {
569         for (int j = 0; j < cols; j++) {
570           result[rowTransform[i]][colTransform[j]] = matrix[i][j];
571         }
572       }
573       return result;
574     }
575 
576     /**
577      * Copy a given matrix into a new matrix, transforming each row index and
578      * each column index according to the inverse of the randomization scheme
579      * that was created at construction time.
580      * @param matrix the cost matrix to be inverted
581      * @return a new matrix with row and column indices inverted
582      */
583     public float[][] invert(float[][] matrix) {
584       float[][] result = new float[rows][cols];
585       for (int i = 0; i < rows; i++) {
586         for (int j = 0; j < cols; j++) {
587           result[rowInverse[i]][colInverse[j]] = matrix[i][j];
588         }
589       }
590       return result;
591     }
592 
593     /**
594      * Given an array where each element {@code indices[i]} represents the
595      * randomized column index corresponding to randomized row index {@code i},
596      * create a new array with the corresponding inverted indices.
597      * @param indices an array of transformed indices to be inverted
598      * @return an array of inverted indices
599      */
600     public int[] invertIndices(int[] indices) {
601       int[] result = new int[indices.length];
602       for (int i = 0; i < indices.length; i++) {
603         result[rowInverse[i]] = colInverse[indices[i]];
604       }
605       return result;
606     }
607   }
608 
609   /**
610    * Print the assignment plan to the system output stream
611    * @param plan
612    */
613   public static void printAssignmentPlan(FavoredNodesPlan plan) {
614     if (plan == null) return;
615     LOG.info("========== Start to print the assignment plan ================");
616     // sort the map based on region info
617     Map<HRegionInfo, List<ServerName>> assignmentMap =
618       new TreeMap<HRegionInfo, List<ServerName>>(plan.getAssignmentMap());
619 
620     for (Map.Entry<HRegionInfo, List<ServerName>> entry : assignmentMap.entrySet()) {
621 
622       String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
623       String regionName = entry.getKey().getRegionNameAsString();
624       LOG.info("Region: " + regionName );
625       LOG.info("Its favored nodes: " + serverList);
626     }
627     LOG.info("========== Finish to print the assignment plan ================");
628   }
629 
630   /**
631    * Update the assignment plan into hbase:meta
632    * @param plan the assignments plan to be updated into hbase:meta
633    * @throws IOException if cannot update assignment plan in hbase:meta
634    */
635   public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
636   throws IOException {
637     try {
638       LOG.info("Start to update the hbase:meta with the new assignment plan");
639       Map<HRegionInfo, List<ServerName>> assignmentMap =
640         plan.getAssignmentMap();
641       FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(assignmentMap, conf);
642       LOG.info("Updated the hbase:meta with the new assignment plan");
643     } catch (Exception e) {
644       LOG.error("Failed to update hbase:meta with the new assignment" +
645           "plan because " + e.getMessage());
646     }
647   }
648 
649   /**
650    * Update the assignment plan to all the region servers
651    * @param plan
652    * @throws IOException
653    */
654   private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
655   throws IOException{
656     LOG.info("Start to update the region servers with the new assignment plan");
657     // Get the region to region server map
658     Map<ServerName, List<HRegionInfo>> currentAssignment =
659       this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
660 
661     // track of the failed and succeeded updates
662     int succeededNum = 0;
663     Map<ServerName, Exception> failedUpdateMap =
664       new HashMap<ServerName, Exception>();
665 
666     for (Map.Entry<ServerName, List<HRegionInfo>> entry :
667       currentAssignment.entrySet()) {
668       List<Pair<HRegionInfo, List<ServerName>>> regionUpdateInfos =
669           new ArrayList<Pair<HRegionInfo, List<ServerName>>>();
670       try {
671         // Keep track of the favored updates for the current region server
672         FavoredNodesPlan singleServerPlan = null;
673         // Find out all the updates for the current region server
674         for (HRegionInfo region : entry.getValue()) {
675           List<ServerName> favoredServerList = plan.getFavoredNodes(region);
676           if (favoredServerList != null &&
677               favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
678             // Create the single server plan if necessary
679             if (singleServerPlan == null) {
680               singleServerPlan = new FavoredNodesPlan();
681             }
682             // Update the single server update
683             singleServerPlan.updateAssignmentPlan(region, favoredServerList);
684             regionUpdateInfos.add(
685               new Pair<HRegionInfo, List<ServerName>>(region, favoredServerList));
686           }
687         }
688         if (singleServerPlan != null) {
689           // Update the current region server with its updated favored nodes
690           BlockingInterface currentRegionServer =
691             ((ClusterConnection)this.connection).getAdmin(entry.getKey());
692           UpdateFavoredNodesRequest request =
693               RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
694 
695           UpdateFavoredNodesResponse updateFavoredNodesResponse =
696               currentRegionServer.updateFavoredNodes(null, request);
697           LOG.info("Region server " +
698               ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
699               " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
700               singleServerPlan.getAssignmentMap().size() +
701               " regions with the assignment plan");
702           succeededNum ++;
703         }
704       } catch (Exception e) {
705         failedUpdateMap.put(entry.getKey(), e);
706       }
707     }
708     // log the succeeded updates
709     LOG.info("Updated " + succeededNum + " region servers with " +
710             "the new assignment plan");
711 
712     // log the failed updates
713     int failedNum = failedUpdateMap.size();
714     if (failedNum != 0) {
715       LOG.error("Failed to update the following + " + failedNum +
716           " region servers with its corresponding favored nodes");
717       for (Map.Entry<ServerName, Exception> entry :
718         failedUpdateMap.entrySet() ) {
719         LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
720             " because of " + entry.getValue().getMessage());
721       }
722     }
723   }
724 
725   public void updateAssignmentPlan(FavoredNodesPlan plan)
726       throws IOException {
727     LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
728         " the region servers");
729     // Update the new assignment plan to META
730     updateAssignmentPlanToMeta(plan);
731     // Update the new assignment plan to Region Servers
732     updateAssignmentPlanToRegionServers(plan);
733     LOG.info("Finish to update the new assignment plan for the hbase:meta table and" +
734         " the region servers");
735   }
736 
737   /**
738    * Return how many regions will move per table since their primary RS will
739    * change
740    *
741    * @param newPlan - new AssignmentPlan
742    * @return how many primaries will move per table
743    */
744   public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
745       throws IOException {
746     Map<TableName, Integer> movesPerTable = new HashMap<TableName, Integer>();
747     SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
748     Map<TableName, List<HRegionInfo>> tableToRegions = snapshot
749         .getTableToRegionMap();
750     FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
751     Set<TableName> tables = snapshot.getTableSet();
752     for (TableName table : tables) {
753       int movedPrimaries = 0;
754       if (!this.targetTableSet.isEmpty()
755           && !this.targetTableSet.contains(table)) {
756         continue;
757       }
758       List<HRegionInfo> regions = tableToRegions.get(table);
759       for (HRegionInfo region : regions) {
760         List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
761         List<ServerName> newServers = newPlan.getFavoredNodes(region);
762         if (oldServers != null && newServers != null) {
763           ServerName oldPrimary = oldServers.get(0);
764           ServerName newPrimary = newServers.get(0);
765           if (oldPrimary.compareTo(newPrimary) != 0) {
766             movedPrimaries++;
767           }
768         }
769       }
770       movesPerTable.put(table, movedPrimaries);
771     }
772     return movesPerTable;
773   }
774 
775   /**
776    * Compares two plans and check whether the locality dropped or increased
777    * (prints the information as a string) also prints the baseline locality
778    *
779    * @param movesPerTable - how many primary regions will move per table
780    * @param regionLocalityMap - locality map from FS
781    * @param newPlan - new assignment plan
782    * @throws IOException
783    */
784   public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
785       Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
786           throws IOException {
787     // localities for primary, secondary and tertiary
788     SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
789     FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
790     Set<TableName> tables = snapshot.getTableSet();
791     Map<TableName, List<HRegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
792     for (TableName table : tables) {
793       float[] deltaLocality = new float[3];
794       float[] locality = new float[3];
795       if (!this.targetTableSet.isEmpty()
796           && !this.targetTableSet.contains(table)) {
797         continue;
798       }
799       List<HRegionInfo> regions = tableToRegionsMap.get(table);
800       System.out.println("==================================================");
801       System.out.println("Assignment Plan Projection Report For Table: " + table);
802       System.out.println("\t Total regions: " + regions.size());
803       System.out.println("\t" + movesPerTable.get(table)
804           + " primaries will move due to their primary has changed");
805       for (HRegionInfo currentRegion : regions) {
806         Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
807             .getEncodedName());
808         if (regionLocality == null) {
809           continue;
810         }
811         List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
812         List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
813         if (newServers != null && oldServers != null) {
814           int i=0;
815           for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
816             ServerName newServer = newServers.get(p.ordinal());
817             ServerName oldServer = oldServers.get(p.ordinal());
818             Float oldLocality = 0f;
819             if (oldServers != null) {
820               oldLocality = regionLocality.get(oldServer.getHostname());
821               if (oldLocality == null) {
822                 oldLocality = 0f;
823               }
824               locality[i] += oldLocality;
825             }
826             Float newLocality = regionLocality.get(newServer.getHostname());
827             if (newLocality == null) {
828               newLocality = 0f;
829             }
830             deltaLocality[i] += newLocality - oldLocality;
831             i++;
832           }
833         }
834       }
835       DecimalFormat df = new java.text.DecimalFormat( "#.##");
836       for (int i = 0; i < deltaLocality.length; i++) {
837         System.out.print("\t\t Baseline locality for ");
838         if (i == 0) {
839           System.out.print("primary ");
840         } else if (i == 1) {
841           System.out.print("secondary ");
842         } else if (i == 2) {
843           System.out.print("tertiary ");
844         }
845         System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
846         System.out.print("\t\t Locality will change with the new plan: ");
847         System.out.println(df.format(100 * deltaLocality[i] / regions.size())
848             + "%");
849       }
850       System.out.println("\t Baseline dispersion");
851       printDispersionScores(table, snapshot, regions.size(), null, true);
852       System.out.println("\t Projected dispersion");
853       printDispersionScores(table, snapshot, regions.size(), newPlan, true);
854     }
855   }
856 
857   public void printDispersionScores(TableName table,
858       SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
859       boolean simplePrint) {
860     if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
861       return;
862     }
863     AssignmentVerificationReport report = new AssignmentVerificationReport();
864     report.fillUpDispersion(table, snapshot, newPlan);
865     List<Float> dispersion = report.getDispersionInformation();
866     if (simplePrint) {
867       DecimalFormat df = new java.text.DecimalFormat("#.##");
868       System.out.println("\tAvg dispersion score: "
869           + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
870           + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
871           + df.format(dispersion.get(2)) + " hosts;");
872     } else {
873       LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
874           + " ; The average dispersion score is " + dispersion.get(0));
875     }
876   }
877 
878   public void printLocalityAndDispersionForCurrentPlan(
879       Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
880     SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
881     FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
882     Set<TableName> tables = snapshot.getTableSet();
883     Map<TableName, List<HRegionInfo>> tableToRegionsMap = snapshot
884         .getTableToRegionMap();
885     for (TableName table : tables) {
886       float[] locality = new float[3];
887       if (!this.targetTableSet.isEmpty()
888           && !this.targetTableSet.contains(table)) {
889         continue;
890       }
891       List<HRegionInfo> regions = tableToRegionsMap.get(table);
892       for (HRegionInfo currentRegion : regions) {
893         Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
894             .getEncodedName());
895         if (regionLocality == null) {
896           continue;
897         }
898         List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
899         if (servers != null) {
900           int i = 0;
901           for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
902             ServerName server = servers.get(p.ordinal());
903             Float currentLocality = 0f;
904             if (servers != null) {
905               currentLocality = regionLocality.get(server.getHostname());
906               if (currentLocality == null) {
907                 currentLocality = 0f;
908               }
909               locality[i] += currentLocality;
910             }
911             i++;
912           }
913         }
914       }
915       for (int i = 0; i < locality.length; i++) {
916         String copy =  null;
917         if (i == 0) {
918           copy = "primary";
919         } else if (i == 1) {
920           copy = "secondary";
921         } else if (i == 2) {
922           copy = "tertiary" ;
923         }
924         float avgLocality = 100 * locality[i] / regions.size();
925         LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
926             + " ; The average locality for " + copy+ " is " + avgLocality + " %");
927       }
928       printDispersionScores(table, snapshot, regions.size(), null, false);
929     }
930   }
931 
932   /**
933    * @param favoredNodesStr The String of favored nodes
934    * @return the list of ServerName for the byte array of favored nodes.
935    */
936   public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
937     String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
938     if (favoredNodesArray == null)
939       return null;
940 
941     List<ServerName> serverList = new ArrayList<ServerName>();
942     for (String hostNameAndPort : favoredNodesArray) {
943       serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
944     }
945     return serverList;
946   }
947 
948   public static void main(String args[]) throws IOException {
949     Options opt = new Options();
950     opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
951     opt.addOption("u", "update", false,
952         "update the assignments to hbase:meta and RegionServers together");
953     opt.addOption("n", "dry-run", false, "do not write assignments to META");
954     opt.addOption("v", "verify", false, "verify current assignments against META");
955     opt.addOption("p", "print", false, "print the current assignment plan in META");
956     opt.addOption("h", "help", false, "print usage");
957     opt.addOption("d", "verification-details", false,
958         "print the details of verification report");
959 
960     opt.addOption("zk", true, "to set the zookeeper quorum");
961     opt.addOption("fs", true, "to set HDFS");
962     opt.addOption("hbase_root", true, "to set hbase_root directory");
963 
964     opt.addOption("overwrite", false,
965         "overwrite the favored nodes for a single region," +
966         "for example: -update -r regionName -f server1:port,server2:port,server3:port");
967     opt.addOption("r", true, "The region name that needs to be updated");
968     opt.addOption("f", true, "The new favored nodes");
969 
970     opt.addOption("tables", true,
971         "The list of table names splitted by ',' ;" +
972         "For example: -tables: t1,t2,...,tn");
973     opt.addOption("l", "locality", true, "enforce the maxium locality");
974     opt.addOption("m", "min-move", true, "enforce minium assignment move");
975     opt.addOption("diff", false, "calculate difference between assignment plans");
976     opt.addOption("munkres", false,
977         "use munkres to place secondaries and tertiaries");
978     opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
979         "information for current plan");
980     try {
981       // Set the log4j
982       Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
983       Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.ERROR);
984       Logger.getLogger("org.apache.hadoop.hbase.master.RegionPlacementMaintainer")
985       .setLevel(Level.INFO);
986 
987       CommandLine cmd = new GnuParser().parse(opt, args);
988       Configuration conf = HBaseConfiguration.create();
989 
990       boolean enforceMinAssignmentMove = true;
991       boolean enforceLocality = true;
992       boolean verificationDetails = false;
993 
994       // Read all the options
995       if ((cmd.hasOption("l") &&
996           cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
997           (cmd.hasOption("locality") &&
998               cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
999         enforceLocality = false;
1000       }
1001 
1002       if ((cmd.hasOption("m") &&
1003           cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
1004           (cmd.hasOption("min-move") &&
1005               cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
1006         enforceMinAssignmentMove = false;
1007       }
1008 
1009       if (cmd.hasOption("zk")) {
1010         conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
1011         LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
1012       }
1013 
1014       if (cmd.hasOption("fs")) {
1015         conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
1016         LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
1017       }
1018 
1019       if (cmd.hasOption("hbase_root")) {
1020         conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
1021         LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
1022       }
1023 
1024       // Create the region placement obj
1025       RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
1026           enforceMinAssignmentMove);
1027 
1028       if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
1029         verificationDetails = true;
1030       }
1031 
1032       if (cmd.hasOption("tables")) {
1033         String tableNameListStr = cmd.getOptionValue("tables");
1034         String[] tableNames = StringUtils.split(tableNameListStr, ",");
1035         rp.setTargetTableName(tableNames);
1036       }
1037 
1038       if (cmd.hasOption("munkres")) {
1039         USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
1040       }
1041 
1042       // Read all the modes
1043       if (cmd.hasOption("v") || cmd.hasOption("verify")) {
1044         // Verify the region placement.
1045         rp.verifyRegionPlacement(verificationDetails);
1046       } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
1047         // Generate the assignment plan only without updating the hbase:meta and RS
1048         FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1049         printAssignmentPlan(plan);
1050       } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
1051         // Generate the new assignment plan
1052         FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1053         // Print the new assignment plan
1054         printAssignmentPlan(plan);
1055         // Write the new assignment plan to META
1056         rp.updateAssignmentPlanToMeta(plan);
1057       } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
1058         // Generate the new assignment plan
1059         FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1060         // Print the new assignment plan
1061         printAssignmentPlan(plan);
1062         // Update the assignment to hbase:meta and Region Servers
1063         rp.updateAssignmentPlan(plan);
1064       } else if (cmd.hasOption("diff")) {
1065         FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1066         Map<String, Map<String, Float>> locality = FSUtils
1067             .getRegionDegreeLocalityMappingFromFS(conf);
1068         Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1069         rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1070         System.out.println("Do you want to update the assignment plan? [y/n]");
1071         Scanner s = new Scanner(System.in);
1072         String input = s.nextLine().trim();
1073         if (input.equals("y")) {
1074           System.out.println("Updating assignment plan...");
1075           rp.updateAssignmentPlan(newPlan);
1076         }
1077         s.close();
1078       } else if (cmd.hasOption("ld")) {
1079         Map<String, Map<String, Float>> locality = FSUtils
1080             .getRegionDegreeLocalityMappingFromFS(conf);
1081         rp.printLocalityAndDispersionForCurrentPlan(locality);
1082       } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1083         FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1084         printAssignmentPlan(plan);
1085       } else if (cmd.hasOption("overwrite")) {
1086         if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1087           throw new IllegalArgumentException("Please specify: " +
1088               " -update -r regionName -f server1:port,server2:port,server3:port");
1089         }
1090 
1091         String regionName = cmd.getOptionValue("r");
1092         String favoredNodesStr = cmd.getOptionValue("f");
1093         LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
1094             favoredNodesStr);
1095         List<ServerName> favoredNodes = null;
1096         HRegionInfo regionInfo =
1097             rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1098         if (regionInfo == null) {
1099           LOG.error("Cannot find the region " + regionName + " from the META");
1100         } else {
1101           try {
1102             favoredNodes = getFavoredNodeList(favoredNodesStr);
1103           } catch (IllegalArgumentException e) {
1104             LOG.error("Cannot parse the invalid favored nodes because " + e);
1105           }
1106           FavoredNodesPlan newPlan = new FavoredNodesPlan();
1107           newPlan.updateAssignmentPlan(regionInfo, favoredNodes);
1108           rp.updateAssignmentPlan(newPlan);
1109         }
1110       } else {
1111         printHelp(opt);
1112       }
1113     } catch (ParseException e) {
1114       printHelp(opt);
1115     }
1116   }
1117 }