/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * The {@link RegionSplitter} class provides several utilities to help in the
 * administration lifecycle for developers who choose to manually split regions
 * instead of having HBase handle that automatically. The most useful utilities
 * are:
 * <p>
 * <ul>
 * <li>Create a table with a specified number of pre-split regions
 * <li>Execute a rolling split of all regions on an existing table
 * </ul>
 * <p>
 * Both operations can be safely done on a live server.
 * <p>
 * <b>Question:</b> How do I turn off automatic splitting? <br>
 * <b>Answer:</b> Automatic splitting is determined by the configuration value
 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
 * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
 * is 100GB, which would result in &gt; 1hr major compactions if reached.
 * <p>
 * <b>Question:</b> Why did the original authors decide to manually split? <br>
 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
 * to benefit from a manual split system.
 * <p>
 * <ul>
 * <li>Data (~1k) that would grow instead of being replaced
 * <li>Data growth was roughly uniform across all regions
 * <li>OLTP workload. Data loss is a big deal.
 * </ul>
 * <p>
 * <b>Question:</b> Why is manual splitting good for this workload? <br>
 * <b>Answer:</b> Although automated splitting is not a bad option, there are
 * benefits to manual splitting.
 * <p>
 * <ul>
 * <li>With growing amounts of data, splits will continually be needed. Since
 * you always know exactly what regions you have, long-term debugging and
 * profiling is much easier with manual splits. It is hard to trace the logs to
 * understand region level problems if it keeps splitting and getting renamed.
 * <li>Data offlining bugs + unknown number of split regions == oh crap! If a
 * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
 * you notice it a day or so later, you can be assured that the regions
 * specified in these files are the same as the current regions and you have
 * fewer headaches trying to restore/replay your data.
 * <li>You can finely tune your compaction algorithm. With roughly uniform data
 * growth, it's easy to cause split / compaction storms as the regions all
 * roughly hit the same data size at the same time. With manual splits, you can
 * let staggered, time-based major compactions spread out your network IO load.
 * </ul>
 * <p>
 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
 * <b>Answer:</b> Mileage will vary depending upon your application.
 * <p>
 * The short answer for our application is that we started with 10 pre-split
 * regions / server and watched our data growth over time. It's better to err on
 * the side of too few regions and perform rolling splits later.
 * <p>
 * The more complicated answer is that this depends upon the largest storefile
 * in your region. With a growing data size, this will get larger over time. You
 * want the largest region to be just big enough that the
 * {@link org.apache.hadoop.hbase.regionserver.HStore} compact
 * selection algorithm only compacts it due to a timed major. If you don't, your
 * cluster can be prone to compaction storms as the algorithm decides to run
 * major compactions on a large series of regions all at once. Note that
 * compaction storms are due to the uniform data growth, not the manual split
 * decision.
 * <p>
 * If you pre-split your regions too thin, you can increase the major compaction
 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
 * grows too large, use this script to perform a network IO safe rolling split
 * of all regions.
 */
@InterfaceAudience.Private
public class RegionSplitter {
  private static final Log LOG = LogFactory.getLog(RegionSplitter.class);
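
  /*
   * A configuration sketch for the Q&A in the class comment above: raising the
   * max region size to discourage automatic splits and stretching the timed
   * major compaction interval. The keys are real HConstants values; the
   * numbers are illustrative only, not recommendations beyond the 100GB
   * guidance above.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // ~100GB max region size before an automatic split would trigger
   *   conf.setLong(HConstants.HREGION_MAX_FILESIZE, 100L * 1024 * 1024 * 1024);
   *   // timed major compactions at most once a week
   *   conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 7L * 24 * 60 * 60 * 1000);
   */
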
  /**
   * A generic interface for the RegionSplitter code to use for all of its
   * functionality. Note that the original authors of this code used
   * {@link HexStringSplit} to partition their table and set it as the default,
   * but provided this interface for your custom algorithm. To use, create a new
   * derived class from this interface and pass its class name (as the
   * SPLITALGORITHM command-line argument, or to
   * {@link RegionSplitter#newSplitAlgoInstance}); the resulting instance is
   * then handed to {@link RegionSplitter#createPresplitTable} or
   * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration).
   */
  public interface SplitAlgorithm {
    /**
     * Split a pre-existing region into 2 regions.
     *
     * @param start
     *          first row (inclusive)
     * @param end
     *          last row (exclusive)
     * @return the split row to use
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Split an entire table.
     *
     * @param numRegions
     *          number of regions to split the table into
     *
     * @throws RuntimeException
     *           user input is validated at this time. May throw a runtime
     *           exception in response to a parse failure
     * @return array of split keys for the initial regions of the table. The
     *         length of the returned array should be numRegions-1.
     */
    byte[][] split(int numRegions);

    /**
     * In HBase, the first row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed firstRow() instead of empty array.
     *
     * @return your representation of your first row
     */
    byte[] firstRow();

    /**
     * In HBase, the last row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed lastRow() instead of empty array.
     *
     * @return your representation of your last row
     */
    byte[] lastRow();

    /**
     * In HBase, the first row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the first
     * region.
     *
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setFirstRow(String userInput);

    /**
     * In HBase, the last row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the last
     * region. Note that this last row is inclusive for all rows sharing the
     * same prefix.
     *
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setLastRow(String userInput);

    /**
     * @param input
     *          user or file input for row
     * @return byte array representation of this row for HBase
     */
    byte[] strToRow(String input);

    /**
     * @param row
     *          byte array representing a row in HBase
     * @return String to use for debug &amp; file printing
     */
    String rowToStr(byte[] row);

    /**
     * @return the separator character to use when storing / printing the row
     */
    String separator();

    /**
     * Set the first row.
     * @param userInput byte array of the row key.
     */
    void setFirstRow(byte[] userInput);

    /**
     * Set the last row.
     * @param userInput byte array of the row key.
     */
    void setLastRow(byte[] userInput);
  }
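
  /*
   * A sketch of a custom SplitAlgorithm for fixed-width decimal-string keys
   * ("00000000" through "99999999"). The class and its key format are
   * hypothetical; only the two split methods are shown, and the remaining
   * methods would follow the same string/byte[] round-trip pattern as
   * HexStringSplit below.
   *
   *   public class DecimalStringSplit implements RegionSplitter.SplitAlgorithm {
   *     private long first = 0L, last = 99999999L;
   *
   *     public byte[] split(byte[] start, byte[] end) {
   *       long s = Long.parseLong(Bytes.toString(start));
   *       long e = Long.parseLong(Bytes.toString(end));
   *       return Bytes.toBytes(String.format("%08d", (s + e) / 2));
   *     }
   *
   *     public byte[][] split(int numRegions) {
   *       byte[][] splits = new byte[numRegions - 1][];
   *       long range = last - first + 1;
   *       for (int i = 1; i < numRegions; i++) {
   *         splits[i - 1] =
   *             Bytes.toBytes(String.format("%08d", first + i * (range / numRegions)));
   *       }
   *       return splits;
   *     }
   *     // firstRow(), lastRow(), setters, strToRow(), rowToStr(), separator() omitted
   *   }
   */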

  /**
   * The main function for the RegionSplitter application. Common uses:
   * <p>
   * <ul>
   * <li>create a table named 'myTable' with 60 pre-split regions containing 2
   * column families 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs
   * myTable HexStringSplit
   * </ul>
   * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), with 2
   * outstanding splits at a time, assuming keys are uniformly distributed
   * bytes:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
   * UniformSplit
   * </ul>
   * </ul>
   *
   * There are two SplitAlgorithms built into RegionSplitter, HexStringSplit
   * and UniformSplit. These are different strategies for choosing region
   * boundaries. See their source code for details.
   *
   * @param args
   *          Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt;
   *          &lt;-c &lt;# regions&gt; -f &lt;family:family:...&gt; | -r
   *          [-o &lt;# outstanding splits&gt;]&gt;
   *          [-D &lt;conf.param=value&gt;]
   * @throws IOException
   *           HBase IO problem
   * @throws InterruptedException
   *           user requested exit
   * @throws ParseException
   *           problem parsing user input
   */
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws IOException,
      InterruptedException, ParseException {
    Configuration conf = HBaseConfiguration.create();

    // parse user input
    Options opt = new Options();
    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
        .withDescription("Override HBase Configuration Settings").create("D"));
    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
        .withDescription(
            "Create a new table with a pre-split number of regions")
        .create("c"));
    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
        .withDescription(
            "Column Families to create with new table.  Required with -c")
        .create("f"));
    opt.addOption("h", false, "Print this usage help");
    opt.addOption("r", false, "Perform a rolling split of an existing region");
    opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
        "Max outstanding splits that have unfinished major compactions")
        .create("o"));
    opt.addOption(null, "firstrow", true,
        "First Row in Table for Split Algorithm");
    opt.addOption(null, "lastrow", true,
        "Last Row in Table for Split Algorithm");
    opt.addOption(null, "risky", false,
        "Skip verification steps to complete quickly. "
            + "STRONGLY DISCOURAGED for production systems.");
    CommandLine cmd = new GnuParser().parse(opt, args);

    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.set(kv[0], kv[1]);
          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }

    if (cmd.hasOption("risky")) {
      conf.setBoolean("split.verify", false);
    }

    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
    boolean rollingSplit = cmd.hasOption("r");
    boolean oneOperOnly = createTable ^ rollingSplit;

    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
      new HelpFormatter().printHelp("RegionSplitter <TABLE> <SPLITALGORITHM>\n"+
          "SPLITALGORITHM is a java class name of a class implementing " +
          "SplitAlgorithm, or one of the special strings HexStringSplit " +
          "or UniformSplit, which are built-in split algorithms. " +
          "HexStringSplit treats keys as hexadecimal ASCII, and " +
          "UniformSplit treats keys as arbitrary bytes.", opt);
      return;
    }
    TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
    String splitClass = cmd.getArgs()[1];
    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);

    if (cmd.hasOption("firstrow")) {
      splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
    }
    if (cmd.hasOption("lastrow")) {
      splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
    }

    if (createTable) {
      conf.set("split.count", cmd.getOptionValue("c"));
      createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
    }

    if (rollingSplit) {
      if (cmd.hasOption("o")) {
        conf.set("split.outstanding", cmd.getOptionValue("o"));
      }
      rollingSplit(tableName, splitAlgo, conf);
    }
  }
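
  /*
   * Composed command-line invocations of the options parsed above (table,
   * family, and override values are illustrative):
   *
   *   # create 'myTable' with 60 pre-split regions and one column family 'cf'
   *   bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f cf myTable HexStringSplit
   *
   *   # rolling split with at most 2 outstanding splits, plus a -D override
   *   bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 \
   *       -D split.verify=true myTable UniformSplit
   */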

  static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
          String[] columnFamilies, Configuration conf)
  throws IOException, InterruptedException {
    final int splitCount = conf.getInt("split.count", 0);
    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");

    Preconditions.checkArgument(columnFamilies.length > 0,
        "Must specify at least one column family.");
    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
        + " column families.  Presplitting to " + splitCount + " regions");

    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (String cf : columnFamilies) {
      desc.addFamily(new HColumnDescriptor(Bytes.toBytes(cf)));
    }
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      try (Admin admin = connection.getAdmin()) {
        Preconditions.checkArgument(!admin.tableExists(tableName),
          "Table already exists: " + tableName);
        admin.createTable(desc, splitAlgo.split(splitCount));
      }
      LOG.debug("Table created!  Waiting for regions to show online in META...");
      // skipped when -risky sets split.verify=false
      if (conf.getBoolean("split.verify", true)) {
        // NOTE: createTable is synchronous on the table, but not on the regions
        int onlineRegions = 0;
        while (onlineRegions < splitCount) {
          onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
          LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
          if (onlineRegions < splitCount) {
            Thread.sleep(10 * 1000); // sleep
          }
        }
      }
      LOG.debug("Finished creating table with " + splitCount + " regions");
    }
  }
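
  /*
   * In-process use of createPresplitTable, a sketch (the method is
   * package-private, so the caller must live in org.apache.hadoop.hbase.util;
   * table and family names are illustrative):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("split.count", 60);
   *   createPresplitTable(TableName.valueOf("myTable"),
   *       new HexStringSplit(), new String[] { "cf1", "cf2" }, conf);
   */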

  /**
   * Alternative to getCurrentNrHRS, which is no longer available.
   * @param connection the cluster connection to query
   * @return Rough count of regionservers out on the cluster.
   * @throws IOException if a remote or network exception occurs
   */
  private static int getRegionServerCount(final Connection connection) throws IOException {
    try (Admin admin = connection.getAdmin()) {
      ClusterStatus status = admin.getClusterStatus();
      Collection<ServerName> servers = status.getServers();
      return servers == null || servers.isEmpty() ? 0 : servers.size();
    }
  }

  private static byte[] readFile(final FileSystem fs, final Path path) throws IOException {
    FSDataInputStream tmpIn = fs.open(path);
    try {
      byte[] rawData = new byte[tmpIn.available()];
      tmpIn.readFully(rawData);
      return rawData;
    } finally {
      tmpIn.close();
    }
  }

  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
  throws IOException, InterruptedException {
    final int minOS = conf.getInt("split.outstanding", 2);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Max outstanding splits. default == 50% of servers
      final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);

      Pair<Path, Path> tableDirAndSplitFile = getTableDirAndSplitFile(conf, tableName);
      Path tableDir = tableDirAndSplitFile.getFirst();
      Path splitFile = tableDirAndSplitFile.getSecond();
      // use the filesystem that actually hosts the table dir
      FileSystem fs = tableDir.getFileSystem(conf);

      // Get a list of daughter regions to create
      LinkedList<Pair<byte[], byte[]>> tmpRegionSet =
          getSplits(connection, tableName, splitAlgo);
      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
      int splitCount = 0;
      final int origCount = tmpRegionSet.size();

      // All splits must compact & we have 1 compact thread, so 2 split
      // requests to the same RS can stall the outstanding split queue.
      // To fix, group the regions into an RS pool and round-robin through it.
      LOG.debug("Bucketing regions by regionserver...");
      TreeMap<String, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
          Maps.newTreeMap();
      // Get a regionLocator. Need it below.
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        for (Pair<byte[], byte[]> dr : tmpRegionSet) {
          String rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getHostnamePort();
          if (!daughterRegions.containsKey(rsLocation)) {
            LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
            daughterRegions.put(rsLocation, entry);
          }
          daughterRegions.get(rsLocation).add(dr);
        }
        LOG.debug("Done with bucketing.  Split time!");
        long startTime = System.currentTimeMillis();

        // Open the split file and modify it as splits finish
        byte[] rawData = readFile(fs, splitFile);

        FSDataOutputStream splitOut = fs.create(splitFile);
        try {
          splitOut.write(rawData);

          try {
            // *** split code ***
            while (!daughterRegions.isEmpty()) {
              LOG.debug(daughterRegions.size() + " RS have regions to split.");

              // Get hostname:port to region count mapping. Key on the same
              // hostname:port strings used to bucket daughterRegions above.
              final TreeMap<String, Integer> rsSizes = Maps.newTreeMap();
              List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
              for (HRegionLocation hrl : hrls) {
                String host = hrl.getHostnamePort();
                if (rsSizes.containsKey(host)) {
                  rsSizes.put(host, rsSizes.get(host) + 1);
                } else {
                  rsSizes.put(host, 1);
                }
              }

              // Sort the servers by the number of regions they currently host
              List<String> serversLeft = Lists.newArrayList(daughterRegions.keySet());
              Collections.sort(serversLeft, new Comparator<String>() {
                public int compare(String o1, String o2) {
                  // a bucketed server may no longer host any region;
                  // treat it as hosting zero regions
                  Integer s1 = rsSizes.get(o1);
                  Integer s2 = rsSizes.get(o2);
                  return Integer.compare(s1 == null ? 0 : s1, s2 == null ? 0 : s2);
                }
              });

              // Round-robin through the server list. Choose the lightest-loaded
              // servers first to keep the master from load-balancing regions as we split.
              for (String rsLoc : serversLeft) {
                Pair<byte[], byte[]> dr = null;

                // Find a region in this server's list that hasn't been moved
                LOG.debug("Finding a region on " + rsLoc);
                LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions.get(rsLoc);
                while (!regionList.isEmpty()) {
                  dr = regionList.pop();

                  // get current region info
                  byte[] split = dr.getSecond();
                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);

                  // if this region moved locations
                  String newRs = regionLoc.getHostnamePort();
                  if (newRs.compareTo(rsLoc) != 0) {
                    LOG.debug("Region with " + splitAlgo.rowToStr(split)
                        + " moved to " + newRs + ". Relocating...");
                    // relocate it, don't use it right now
                    if (!daughterRegions.containsKey(newRs)) {
                      LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
                      daughterRegions.put(newRs, entry);
                    }
                    daughterRegions.get(newRs).add(dr);
                    dr = null;
                    continue;
                  }

                  // make sure this region wasn't already split
                  byte[] sk = regionLoc.getRegionInfo().getStartKey();
                  if (sk.length != 0) {
                    if (Bytes.equals(split, sk)) {
                      LOG.debug("Region already split on "
                          + splitAlgo.rowToStr(split) + ".  Skipping this region...");
                      ++splitCount;
                      dr = null;
                      continue;
                    }
                    byte[] start = dr.getFirst();
                    Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
                        .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                  }

                  // passed all checks! found a good region
                  break;
                }
                if (regionList.isEmpty()) {
                  daughterRegions.remove(rsLoc);
                }
                if (dr == null) {
                  continue;
                }

                // we have a good region, time to split!
                byte[] split = dr.getSecond();
                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
                try (Admin admin = connection.getAdmin()) {
                  admin.split(tableName, split);
                }

                LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
                LinkedList<Pair<byte[], byte[]>> localFinished = Lists.newLinkedList();
                if (conf.getBoolean("split.verify", true)) {
                  // we need to verify and rate-limit our splits
                  outstanding.addLast(dr);
                  // with too many outstanding splits, wait for some to finish
                  while (outstanding.size() >= MAX_OUTSTANDING) {
                    LOG.debug("Wait for outstanding splits " + outstanding.size());
                    localFinished = splitScan(outstanding, connection, tableName, splitAlgo);
                    if (localFinished.isEmpty()) {
                      Thread.sleep(30 * 1000);
                    } else {
                      finished.addAll(localFinished);
                      outstanding.removeAll(localFinished);
                      LOG.debug(localFinished.size() + " outstanding splits finished");
                    }
                  }
                } else {
                  finished.add(dr);
                }

                // mark each finished region as successfully split. Use the
                // algorithm's separator so the writer and the replay parser in
                // getSplits agree on the record format.
                for (Pair<byte[], byte[]> region : finished) {
                  splitOut.writeChars("-" + splitAlgo.separator()
                      + splitAlgo.rowToStr(region.getFirst()) + splitAlgo.separator()
                      + splitAlgo.rowToStr(region.getSecond()) + "\n");
                  splitCount++;
                  if (splitCount % 10 == 0) {
                    long tDiff = (System.currentTimeMillis() - startTime)
                        / splitCount;
                    LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
                        + ". Avg Time / Split = "
                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                  }
                }
              }
            }
            if (conf.getBoolean("split.verify", true)) {
              while (!outstanding.isEmpty()) {
                LOG.debug("Finally waiting for outstanding splits " + outstanding.size());
                LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
                    connection, tableName, splitAlgo);
                if (finished.isEmpty()) {
                  Thread.sleep(30 * 1000);
                } else {
                  outstanding.removeAll(finished);
                  for (Pair<byte[], byte[]> region : finished) {
                    splitOut.writeChars("-" + splitAlgo.separator()
                        + splitAlgo.rowToStr(region.getFirst()) + splitAlgo.separator()
                        + splitAlgo.rowToStr(region.getSecond()) + "\n");
                    splitCount++;
                  }
                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
                }
              }
            }
            LOG.debug("All regions have been successfully split!");
          } finally {
            long tDiff = System.currentTimeMillis() - startTime;
            LOG.debug("TOTAL TIME = "
                + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            LOG.debug("Splits = " + splitCount);
            if (0 < splitCount) {
              LOG.debug("Avg Time / Split = "
                  + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
            }
          }
        } finally {
          splitOut.close();
          fs.delete(splitFile, false);
        }
      }
    }
  }
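
  /*
   * In-process use of rollingSplit, a sketch mirroring what main() does for
   * the -r / -o flags (the method is package-private; values are illustrative):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("split.outstanding", 2); // rate-limit concurrent splits
   *   rollingSplit(TableName.valueOf("myTable"), new UniformSplit(), conf);
   */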

  /**
   * @throws IOException if the specified SplitAlgorithm class couldn't be
   * instantiated
   */
  public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
          String splitClassName) throws IOException {
    Class<?> splitClass;

    // For split algorithms builtin to RegionSplitter, the user can specify
    // their simple class name instead of a fully qualified class name.
    if (splitClassName.equals(HexStringSplit.class.getSimpleName())) {
      splitClass = HexStringSplit.class;
    } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
      splitClass = UniformSplit.class;
    } else {
      try {
        splitClass = conf.getClassByName(splitClassName);
      } catch (ClassNotFoundException e) {
        throw new IOException("Couldn't load split class " + splitClassName, e);
      }
      if (splitClass == null) {
        throw new IOException("Failed loading split class " + splitClassName);
      }
      if (!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
        throw new IOException(
                "Specified split class doesn't implement SplitAlgorithm");
      }
    }
    try {
      return splitClass.asSubclass(SplitAlgorithm.class).newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
  }
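
  /*
   * Name resolution performed by newSplitAlgoInstance, for illustration
   * (com.example.MySplit is a hypothetical custom class):
   *
   *   // built-in algorithms may be named by their simple class name
   *   SplitAlgorithm hex = newSplitAlgoInstance(conf, "HexStringSplit");
   *   // custom algorithms need a fully qualified name and a no-arg constructor
   *   SplitAlgorithm custom = newSplitAlgoInstance(conf, "com.example.MySplit");
   */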

  static LinkedList<Pair<byte[], byte[]>> splitScan(
      LinkedList<Pair<byte[], byte[]>> regionList,
      final Connection connection,
      final TableName tableName,
      SplitAlgorithm splitAlgo)
      throws IOException, InterruptedException {
    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();

    // Get table info
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
    // Clear the cache to forcibly refresh region information
    ((ClusterConnection)connection).clearRegionCache();
    HTableDescriptor htd = null;
    try (Table table = connection.getTable(tableName)) {
      htd = table.getTableDescriptor();
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

      // for every region that hasn't been verified as a finished split
      for (Pair<byte[], byte[]> region : regionList) {
        byte[] start = region.getFirst();
        byte[] split = region.getSecond();

        // see if the new split daughter region has come online
        try {
          HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
          if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
            logicalSplitting.add(region);
            continue;
          }
        } catch (NoServerForRegionException nsfre) {
          // NSFRE will occur if the old hbase:meta entry has no server assigned
          LOG.info(nsfre);
          logicalSplitting.add(region);
          continue;
        }

        try {
          // when a daughter region is opened, a compaction is triggered
          // wait until compaction completes for both daughter regions
          LinkedList<HRegionInfo> check = Lists.newLinkedList();
          check.add(regionLocator.getRegionLocation(start).getRegionInfo());
          check.add(regionLocator.getRegionLocation(split).getRegionInfo());
          for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
            HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
                connection.getConfiguration(), fs, tableDir, hri, true);

            // Check every column family of that region: the post-split
            // compaction is done once no store still holds reference files.
            boolean refFound = false;
            for (HColumnDescriptor c : htd.getFamilies()) {
              if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
                break;
              }
            }

            // compaction is completed when all reference files are gone
            if (!refFound) {
              check.remove(hri);
            }
          }
          if (check.isEmpty()) {
            finished.add(region);
          } else {
            physicalSplitting.add(region);
          }
        } catch (NoServerForRegionException nsfre) {
          LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
          physicalSplitting.add(region);
          ((ClusterConnection)connection).clearRegionCache();
        }
      }

      LOG.debug("Split Scan: " + finished.size() + " finished / "
          + logicalSplitting.size() + " split wait / "
          + physicalSplitting.size() + " reference wait");

      return finished;
    }
  }

  /**
   * @param conf the HBase configuration used to locate the filesystem root
   * @param tableName table whose directory and split file to locate
   * @return A Pair where first item is table dir and second is the split file.
   * @throws IOException if the filesystem root cannot be determined
   */
  private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
      final TableName tableName)
  throws IOException {
    Path hbDir = FSUtils.getRootDir(conf);
    Path tableDir = FSUtils.getTableDir(hbDir, tableName);
    Path splitFile = new Path(tableDir, "_balancedSplit");
    return new Pair<Path, Path>(tableDir, splitFile);
  }

  static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
      TableName tableName, SplitAlgorithm splitAlgo)
  throws IOException {
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    Path splitFile = tableDirAndSplitFile.getSecond();

    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());

    // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // Does a split file exist?
    if (!fs.exists(splitFile)) {
      // NO = fresh start. calculate splits to make
      LOG.debug("No " + splitFile.getName() + " file. Calculating splits...");

      // Query meta for all regions in the table
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = null;
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        tmp = regionLocator.getStartEndKeys();
      }
      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
          "Start and end key arrays must be the same length");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0) {
          start = splitAlgo.firstRow();
        }
        if (end.length == 0) {
          end = splitAlgo.lastRow();
        }
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");

      // prepare the split file
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , "
            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
        // use the algorithm's separator throughout so the replay parser below
        // can tokenize this record for any SplitAlgorithm
        tmpOut.writeChars("+" + splitAlgo.separator() + startStr
            + splitAlgo.separator() + splitStr + "\n");
      }
      tmpOut.close();
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      FSUtils.getInstance(fs, connection.getConfiguration())
        .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);

      // parse split file and process remaining splits
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(cmd.length == 3, "Invalid split record: " + line);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"),
              "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r),
              "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
          .strToRow(r.getSecond())));
    }
    return ret;
  }
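
  /*
   * The _balancedSplit journal replayed above holds one record per line; the
   * writer and the replay parser both use SplitAlgorithm#separator() as the
   * field delimiter. With HexStringSplit (separator " ") it looks like
   * (keys illustrative):
   *
   *   + 20000000 30000000     region starting at 20000000 will split at 30000000
   *   - 20000000 30000000     that split has since completed and been journaled
   */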

  /**
   * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region
   * boundaries. The format of a HexStringSplit region boundary is the ASCII
   * representation of an MD5 checksum, or any other uniformly distributed
   * hexadecimal value. Rows are hex-encoded long values in the range
   * <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the
   * same order lexicographically as if they were binary.
   *
   * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
   * write in the shell but takes up more space and may be non-intuitive.
   */
  public static class HexStringSplit implements SplitAlgorithm {
    final static String DEFAULT_MIN_HEX = "00000000";
    final static String DEFAULT_MAX_HEX = "FFFFFFFF";

    String firstRow = DEFAULT_MIN_HEX;
    BigInteger firstRowInt = BigInteger.ZERO;
    String lastRow = DEFAULT_MAX_HEX;
    BigInteger lastRowInt = new BigInteger(lastRow, 16);
    int rowComparisonLength = lastRow.length();

    @Override
    public byte[] split(byte[] start, byte[] end) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);
      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
      return convertToByte(split2(s, e));
    }

    @Override
    public byte[][] split(int n) {
      Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
          "last row (%s) is configured less than first row (%s)", lastRow,
          firstRow);
      // +1 to range because the last row is inclusive
      BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
          "split granularity (%s) is greater than the range (%s)", n, range);

      BigInteger[] splits = new BigInteger[n - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
      for (int i = 1; i < n; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
            .valueOf(i)));
      }
      return convertToBytes(splits);
    }

    @Override
    public byte[] firstRow() {
      return convertToByte(firstRowInt);
    }

    @Override
    public byte[] lastRow() {
      return convertToByte(lastRowInt);
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRow = userInput;
      firstRowInt = new BigInteger(firstRow, 16);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRow = userInput;
      lastRowInt = new BigInteger(lastRow, 16);
      // Precondition: lastRow > firstRow, so last's length is the greater
      rowComparisonLength = lastRow.length();
    }

    @Override
    public byte[] strToRow(String in) {
      return convertToByte(new BigInteger(in, 16));
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return " ";
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      // keep the cached BigInteger form in sync with the string form
      firstRow = Bytes.toString(userInput);
      firstRowInt = new BigInteger(firstRow, 16);
    }

    @Override
    public void setLastRow(byte[] userInput) {
      // keep the cached BigInteger form in sync with the string form
      lastRow = Bytes.toString(userInput);
      lastRowInt = new BigInteger(lastRow, 16);
      rowComparisonLength = lastRow.length();
    }

    /**
     * Returns the midpoint of two numbers (used by the split algorithm).
     *
     * @param a number #1
     * @param b number #2
     * @return the midpoint of the 2 numbers
     */
    public BigInteger split2(BigInteger a, BigInteger b) {
      return a.add(b).divide(BigInteger.valueOf(2)).abs();
    }

    /**
     * Returns an array of byte arrays corresponding to an array of BigIntegers
     *
     * @param bigIntegers numbers to convert
     * @return bytes corresponding to the bigIntegers
     */
    public byte[][] convertToBytes(BigInteger[] bigIntegers) {
      byte[][] returnBytes = new byte[bigIntegers.length][];
      for (int i = 0; i < bigIntegers.length; i++) {
        returnBytes[i] = convertToByte(bigIntegers[i]);
      }
      return returnBytes;
    }

    /**
     * Returns the bytes corresponding to the BigInteger, left-padded with
     * zeros to the given width
     *
     * @param bigInteger number to convert
     * @param pad padding length
     * @return byte array corresponding to the input BigInteger
     */
    public static byte[] convertToByte(BigInteger bigInteger, int pad) {
      String bigIntegerString = bigInteger.toString(16);
      bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
      return Bytes.toBytes(bigIntegerString);
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     *
     * @param bigInteger number to convert
     * @return corresponding bytes
     */
    public byte[] convertToByte(BigInteger bigInteger) {
      return convertToByte(bigInteger, rowComparisonLength);
    }

    /**
     * Returns the BigInteger represented by the byte array
     *
     * @param row byte array representing row
     * @return the corresponding BigInteger
     */
    public BigInteger convertToBigInteger(byte[] row) {
      return (row.length > 0) ? new BigInteger(Bytes.toString(row), 16)
          : BigInteger.ZERO;
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }
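
  /*
   * Worked example for HexStringSplit: with the default "00000000"-"FFFFFFFF"
   * key space, split(4) divides the range into four equal slices, yielding the
   * boundaries
   *
   *   "40000000", "80000000", "c0000000"
   *
   * (BigInteger#toString(16) emits lowercase hex, hence "c0000000"). Likewise
   * split(start, end) returns the midpoint: splitting "00000000" and
   * "20000000" gives "10000000".
   */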

  /**
   * A SplitAlgorithm that divides the space of possible keys evenly. Useful
   * when the keys are approximately uniform random bytes (e.g. hashes). Rows
   * are raw byte values in the range <b>00 =&gt; FF</b> and are right-padded with
   * zeros to keep the same memcmp() order. This is the natural algorithm to use
   * for a byte[] environment and saves space, but is not necessarily the
   * easiest for readability.
   */
  public static class UniformSplit implements SplitAlgorithm {
    static final byte xFF = (byte) 0xFF;
    byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
    byte[] lastRowBytes =
            new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};

    @Override
    public byte[] split(byte[] start, byte[] end) {
      return Bytes.split(start, end, 1)[1];
    }

    @Override
    public byte[][] split(int numRegions) {
      Preconditions.checkArgument(
          Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
          "last row (%s) is configured less than first row (%s)",
          Bytes.toStringBinary(lastRowBytes),
          Bytes.toStringBinary(firstRowBytes));

      byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
          numRegions - 1);
      Preconditions.checkState(splits != null,
          "Could not split region with given user input: " + this);

      // remove endpoints, which are included in the splits list
      return Arrays.copyOfRange(splits, 1, splits.length - 1);
    }

    @Override
    public byte[] firstRow() {
      return firstRowBytes;
    }

    @Override
    public byte[] lastRow() {
      return lastRowBytes;
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRowBytes = userInput;
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRowBytes = userInput;
    }

    @Override
    public byte[] strToRow(String input) {
      return Bytes.toBytesBinary(input);
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return ",";
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }
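
  /*
   * Worked example for UniformSplit: with the default first/last rows, split(2)
   * returns the single boundary { 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF },
   * the (approximate) midpoint of the empty first row and the eight-0xFF last
   * row, so uniformly random byte keys land in each daughter with roughly
   * equal probability.
   */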
}