001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.IOException;
022import java.math.BigInteger;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.ClusterMetrics;
039import org.apache.hadoop.hbase.ClusterMetrics.Option;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionInfo;
043import org.apache.hadoop.hbase.HRegionLocation;
044import org.apache.hadoop.hbase.MetaTableAccessor;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.client.ClusterConnection;
054import org.apache.hadoop.hbase.client.Connection;
055import org.apache.hadoop.hbase.client.ConnectionFactory;
056import org.apache.hadoop.hbase.client.NoServerForRegionException;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
060
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
064import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
065import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
066import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
067import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
068import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
069import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
070import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
071import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
072import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
073
074/**
075 * The {@link RegionSplitter} class provides several utilities to help in the
076 * administration lifecycle for developers who choose to manually split regions
077 * instead of having HBase handle that automatically. The most useful utilities
078 * are:
079 * <p>
080 * <ul>
081 * <li>Create a table with a specified number of pre-split regions
082 * <li>Execute a rolling split of all regions on an existing table
083 * </ul>
084 * <p>
085 * Both operations can be safely done on a live server.
086 * <p>
087 * <b>Question:</b> How do I turn off automatic splitting? <br>
088 * <b>Answer:</b> Automatic splitting is determined by the configuration value
089 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
090 * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
091 * is 100GB, which would result in &gt; 1hr major compactions if reached.
092 * <p>
093 * <b>Question:</b> Why did the original authors decide to manually split? <br>
094 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
095 * to benefit from a manual split system.
096 * <p>
097 * <ul>
098 * <li>Data (~1k) that would grow instead of being replaced
099 * <li>Data growth was roughly uniform across all regions
100 * <li>OLTP workload. Data loss is a big deal.
101 * </ul>
102 * <p>
103 * <b>Question:</b> Why is manual splitting good for this workload? <br>
104 * <b>Answer:</b> Although automated splitting is not a bad option, there are
105 * benefits to manual splitting.
106 * <p>
107 * <ul>
108 * <li>With growing amounts of data, splits will continually be needed. Since
109 * you always know exactly what regions you have, long-term debugging and
110 * profiling is much easier with manual splits. It is hard to trace the logs to
111 * understand region level problems if it keeps splitting and getting renamed.
112 * <li>Data offlining bugs + unknown number of split regions == oh crap! If an
113 * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
114 * you notice it a day or so later, you can be assured that the regions
115 * specified in these files are the same as the current regions and you have
116 * less headaches trying to restore/replay your data.
117 * <li>You can finely tune your compaction algorithm. With roughly uniform data
118 * growth, it's easy to cause split / compaction storms as the regions all
119 * roughly hit the same data size at the same time. With manual splits, you can
120 * let staggered, time-based major compactions spread out your network IO load.
121 * </ul>
122 * <p>
123 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
124 * <b>Answer:</b> Mileage will vary depending upon your application.
125 * <p>
126 * The short answer for our application is that we started with 10 pre-split
127 * regions / server and watched our data growth over time. It's better to err on
128 * the side of too little regions and rolling split later.
129 * <p>
130 * The more complicated answer is that this depends upon the largest storefile
131 * in your region. With a growing data size, this will get larger over time. You
132 * want the largest region to be just big enough that the
133 * {@link org.apache.hadoop.hbase.regionserver.HStore} compact
134 * selection algorithm only compacts it due to a timed major. If you don't, your
135 * cluster can be prone to compaction storms as the algorithm decides to run
136 * major compactions on a large series of regions all at once. Note that
137 * compaction storms are due to the uniform data growth, not the manual split
138 * decision.
139 * <p>
140 * If you pre-split your regions too thin, you can increase the major compaction
141 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
142 * grows too large, use this script to perform a network IO safe rolling split
143 * of all regions.
144 */
145@InterfaceAudience.Private
146public class RegionSplitter {
147  private static final Logger LOG = LoggerFactory.getLogger(RegionSplitter.class);
148
149  /**
150   * A generic interface for the RegionSplitter code to use for all it's
151   * functionality. Note that the original authors of this code use
152   * {@link HexStringSplit} to partition their table and set it as default, but
153   * provided this for your custom algorithm. To use, create a new derived class
154   * from this interface and call {@link RegionSplitter#createPresplitTable} or
155   * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration) with the
156   * argument splitClassName giving the name of your class.
157   */
158  public interface SplitAlgorithm {
159    /**
160     * Split a pre-existing region into 2 regions.
161     *
162     * @param start
163     *          first row (inclusive)
164     * @param end
165     *          last row (exclusive)
166     * @return the split row to use
167     */
168    byte[] split(byte[] start, byte[] end);
169
170    /**
171     * Split an entire table.
172     *
173     * @param numRegions
174     *          number of regions to split the table into
175     *
176     * @throws RuntimeException
177     *           user input is validated at this time. may throw a runtime
178     *           exception in response to a parse failure
179     * @return array of split keys for the initial regions of the table. The
180     *         length of the returned array should be numRegions-1.
181     */
182    byte[][] split(int numRegions);
183
184    /**
185     * Some MapReduce jobs may want to run multiple mappers per region,
186     * this is intended for such usecase.
187     *
188     * @param start first row (inclusive)
189     * @param end last row (exclusive)
190     * @param numSplits number of splits to generate
191     * @param inclusive whether start and end are returned as split points
192     */
193    byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);
194
195    /**
196     * In HBase, the first row is represented by an empty byte array. This might
197     * cause problems with your split algorithm or row printing. All your APIs
198     * will be passed firstRow() instead of empty array.
199     *
200     * @return your representation of your first row
201     */
202    byte[] firstRow();
203
204    /**
205     * In HBase, the last row is represented by an empty byte array. This might
206     * cause problems with your split algorithm or row printing. All your APIs
207     * will be passed firstRow() instead of empty array.
208     *
209     * @return your representation of your last row
210     */
211    byte[] lastRow();
212
213    /**
214     * In HBase, the last row is represented by an empty byte array. Set this
215     * value to help the split code understand how to evenly divide the first
216     * region.
217     *
218     * @param userInput
219     *          raw user input (may throw RuntimeException on parse failure)
220     */
221    void setFirstRow(String userInput);
222
223    /**
224     * In HBase, the last row is represented by an empty byte array. Set this
225     * value to help the split code understand how to evenly divide the last
226     * region. Note that this last row is inclusive for all rows sharing the
227     * same prefix.
228     *
229     * @param userInput
230     *          raw user input (may throw RuntimeException on parse failure)
231     */
232    void setLastRow(String userInput);
233
234    /**
235     * @param input
236     *          user or file input for row
237     * @return byte array representation of this row for HBase
238     */
239    byte[] strToRow(String input);
240
241    /**
242     * @param row
243     *          byte array representing a row in HBase
244     * @return String to use for debug &amp; file printing
245     */
246    String rowToStr(byte[] row);
247
248    /**
249     * @return the separator character to use when storing / printing the row
250     */
251    String separator();
252
253    /**
254     * Set the first row
255     * @param userInput byte array of the row key.
256     */
257    void setFirstRow(byte[] userInput);
258
259    /**
260     * Set the last row
261     * @param userInput byte array of the row key.
262     */
263    void setLastRow(byte[] userInput);
264  }
265
266  /**
267   * The main function for the RegionSplitter application. Common uses:
268   * <p>
269   * <ul>
270   * <li>create a table named 'myTable' with 60 pre-split regions containing 2
271   * column families 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
272   * <ul>
273   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs
274   * myTable HexStringSplit
275   * </ul>
276   * <li>create a table named 'myTable' with 50 pre-split regions,
277   * assuming the keys are decimal-encoded ASCII:
278   * <ul>
279   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 50
280   * myTable DecimalStringSplit
281   * </ul>
282   * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), # 2
283   * outstanding splits at a time, assuming keys are uniformly distributed
284   * bytes:
285   * <ul>
286   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
287   * UniformSplit
288   * </ul>
289   * </ul>
290   *
291   * There are three SplitAlgorithms built into RegionSplitter, HexStringSplit,
292   * DecimalStringSplit, and UniformSplit. These are different strategies for
293   * choosing region boundaries. See their source code for details.
294   *
295   * @param args
296   *          Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt;
297   *          &lt;-c &lt;# regions&gt; -f &lt;family:family:...&gt; | -r
298   *          [-o &lt;# outstanding splits&gt;]&gt;
299   *          [-D &lt;conf.param=value&gt;]
300   * @throws IOException
301   *           HBase IO problem
302   * @throws InterruptedException
303   *           user requested exit
304   * @throws ParseException
305   *           problem parsing user input
306   */
307  @SuppressWarnings("static-access")
308  public static void main(String[] args) throws IOException,
309      InterruptedException, ParseException {
310    Configuration conf = HBaseConfiguration.create();
311
312    // parse user input
313    Options opt = new Options();
314    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
315        .withDescription("Override HBase Configuration Settings").create("D"));
316    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
317        .withDescription(
318            "Create a new table with a pre-split number of regions")
319        .create("c"));
320    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
321        .withDescription(
322            "Column Families to create with new table.  Required with -c")
323        .create("f"));
324    opt.addOption("h", false, "Print this usage help");
325    opt.addOption("r", false, "Perform a rolling split of an existing region");
326    opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
327        "Max outstanding splits that have unfinished major compactions")
328        .create("o"));
329    opt.addOption(null, "firstrow", true,
330        "First Row in Table for Split Algorithm");
331    opt.addOption(null, "lastrow", true,
332        "Last Row in Table for Split Algorithm");
333    opt.addOption(null, "risky", false,
334        "Skip verification steps to complete quickly. "
335            + "STRONGLY DISCOURAGED for production systems.  ");
336    CommandLine cmd = new GnuParser().parse(opt, args);
337
338    if (cmd.hasOption("D")) {
339      for (String confOpt : cmd.getOptionValues("D")) {
340        String[] kv = confOpt.split("=", 2);
341        if (kv.length == 2) {
342          conf.set(kv[0], kv[1]);
343          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
344        } else {
345          throw new ParseException("-D option format invalid: " + confOpt);
346        }
347      }
348    }
349
350    if (cmd.hasOption("risky")) {
351      conf.setBoolean("split.verify", false);
352    }
353
354    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
355    boolean rollingSplit = cmd.hasOption("r");
356    boolean oneOperOnly = createTable ^ rollingSplit;
357
358    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
359      new HelpFormatter().printHelp("bin/hbase regionsplitter <TABLE> <SPLITALGORITHM>\n"+
360          "SPLITALGORITHM is the java class name of a class implementing " +
361          "SplitAlgorithm, or one of the special strings HexStringSplit or " +
362          "DecimalStringSplit or UniformSplit, which are built-in split algorithms. " +
363          "HexStringSplit treats keys as hexadecimal ASCII, and " +
364          "DecimalStringSplit treats keys as decimal ASCII, and " +
365          "UniformSplit treats keys as arbitrary bytes.", opt);
366      return;
367    }
368    TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
369    String splitClass = cmd.getArgs()[1];
370    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);
371
372    if (cmd.hasOption("firstrow")) {
373      splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
374    }
375    if (cmd.hasOption("lastrow")) {
376      splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
377    }
378
379    if (createTable) {
380      conf.set("split.count", cmd.getOptionValue("c"));
381      createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
382    }
383
384    if (rollingSplit) {
385      if (cmd.hasOption("o")) {
386        conf.set("split.outstanding", cmd.getOptionValue("o"));
387      }
388      rollingSplit(tableName, splitAlgo, conf);
389    }
390  }
391
392  static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
393          String[] columnFamilies, Configuration conf)
394  throws IOException, InterruptedException {
395    final int splitCount = conf.getInt("split.count", 0);
396    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");
397
398    Preconditions.checkArgument(columnFamilies.length > 0,
399        "Must specify at least one column family. ");
400    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
401        + " column families.  Presplitting to " + splitCount + " regions");
402
403    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
404    for (String cf : columnFamilies) {
405      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
406    }
407    try (Connection connection = ConnectionFactory.createConnection(conf)) {
408      Admin admin = connection.getAdmin();
409      try {
410        Preconditions.checkArgument(!admin.tableExists(tableName),
411          "Table already exists: " + tableName);
412        admin.createTable(builder.build(), splitAlgo.split(splitCount));
413      } finally {
414        admin.close();
415      }
416      LOG.debug("Table created!  Waiting for regions to show online in META...");
417      if (!conf.getBoolean("split.verify", true)) {
418        // NOTE: createTable is synchronous on the table, but not on the regions
419        int onlineRegions = 0;
420        while (onlineRegions < splitCount) {
421          onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
422          LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
423          if (onlineRegions < splitCount) {
424            Thread.sleep(10 * 1000); // sleep
425          }
426        }
427      }
428      LOG.debug("Finished creating table with " + splitCount + " regions");
429    }
430  }
431
432  /**
433   * Alternative getCurrentNrHRS which is no longer available.
434   * @param connection
435   * @return Rough count of regionservers out on cluster.
436   * @throws IOException if a remote or network exception occurs
437   */
438  private static int getRegionServerCount(final Connection connection) throws IOException {
439    try (Admin admin = connection.getAdmin()) {
440      ClusterMetrics status = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
441      Collection<ServerName> servers = status.getLiveServerMetrics().keySet();
442      return servers == null || servers.isEmpty()? 0: servers.size();
443    }
444  }
445
446  private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
447    FSDataInputStream tmpIn = fs.open(path);
448    try {
449      byte [] rawData = new byte[tmpIn.available()];
450      tmpIn.readFully(rawData);
451      return rawData;
452    } finally {
453      tmpIn.close();
454    }
455  }
456
457  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
458  throws IOException, InterruptedException {
459    final int minOS = conf.getInt("split.outstanding", 2);
460    try (Connection connection = ConnectionFactory.createConnection(conf)) {
461      // Max outstanding splits. default == 50% of servers
462      final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);
463
464      Path hbDir = FSUtils.getRootDir(conf);
465      Path tableDir = FSUtils.getTableDir(hbDir, tableName);
466      Path splitFile = new Path(tableDir, "_balancedSplit");
467      FileSystem fs = FileSystem.get(conf);
468
469      // Get a list of daughter regions to create
470      LinkedList<Pair<byte[], byte[]>> tmpRegionSet = null;
471      try (Table table = connection.getTable(tableName)) {
472        tmpRegionSet = getSplits(connection, tableName, splitAlgo);
473      }
474      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
475      int splitCount = 0;
476      final int origCount = tmpRegionSet.size();
477
478      // all splits must compact & we have 1 compact thread, so 2 split
479      // requests to the same RS can stall the outstanding split queue.
480      // To fix, group the regions into an RS pool and round-robin through it
481      LOG.debug("Bucketing regions by regionserver...");
482      TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
483          Maps.newTreeMap();
484      // Get a regionLocator.  Need it in below.
485      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
486        for (Pair<byte[], byte[]> dr : tmpRegionSet) {
487          ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
488          if (!daughterRegions.containsKey(rsLocation)) {
489            LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
490            daughterRegions.put(rsLocation, entry);
491          }
492          daughterRegions.get(rsLocation).add(dr);
493        }
494        LOG.debug("Done with bucketing.  Split time!");
495        long startTime = System.currentTimeMillis();
496
497        // Open the split file and modify it as splits finish
498        byte[] rawData = readFile(fs, splitFile);
499
500        FSDataOutputStream splitOut = fs.create(splitFile);
501        try {
502          splitOut.write(rawData);
503
504          try {
505            // *** split code ***
506            while (!daughterRegions.isEmpty()) {
507              LOG.debug(daughterRegions.size() + " RS have regions to splt.");
508
509              // Get ServerName to region count mapping
510              final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
511              List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
512              for (HRegionLocation hrl: hrls) {
513                ServerName sn = hrl.getServerName();
514                if (rsSizes.containsKey(sn)) {
515                  rsSizes.put(sn, rsSizes.get(sn) + 1);
516                } else {
517                  rsSizes.put(sn, 1);
518                }
519              }
520
521              // Round-robin through the ServerName list. Choose the lightest-loaded servers
522              // first to keep the master from load-balancing regions as we split.
523              for (Map.Entry<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegion :
524                      daughterRegions.entrySet()) {
525                Pair<byte[], byte[]> dr = null;
526                ServerName rsLoc = daughterRegion.getKey();
527                LinkedList<Pair<byte[], byte[]>> regionList = daughterRegion.getValue();
528
529                // Find a region in the ServerName list that hasn't been moved
530                LOG.debug("Finding a region on " + rsLoc);
531                while (!regionList.isEmpty()) {
532                  dr = regionList.pop();
533
534                  // get current region info
535                  byte[] split = dr.getSecond();
536                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);
537
538                  // if this region moved locations
539                  ServerName newRs = regionLoc.getServerName();
540                  if (newRs.compareTo(rsLoc) != 0) {
541                    LOG.debug("Region with " + splitAlgo.rowToStr(split)
542                        + " moved to " + newRs + ". Relocating...");
543                    // relocate it, don't use it right now
544                    if (!daughterRegions.containsKey(newRs)) {
545                      LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
546                      daughterRegions.put(newRs, entry);
547                    }
548                    daughterRegions.get(newRs).add(dr);
549                    dr = null;
550                    continue;
551                  }
552
553                  // make sure this region wasn't already split
554                  byte[] sk = regionLoc.getRegionInfo().getStartKey();
555                  if (sk.length != 0) {
556                    if (Bytes.equals(split, sk)) {
557                      LOG.debug("Region already split on "
558                          + splitAlgo.rowToStr(split) + ".  Skipping this region...");
559                      ++splitCount;
560                      dr = null;
561                      continue;
562                    }
563                    byte[] start = dr.getFirst();
564                    Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
565                        .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
566                  }
567
568                  // passed all checks! found a good region
569                  break;
570                }
571                if (regionList.isEmpty()) {
572                  daughterRegions.remove(rsLoc);
573                }
574                if (dr == null)
575                  continue;
576
577                // we have a good region, time to split!
578                byte[] split = dr.getSecond();
579                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
580                try (Admin admin = connection.getAdmin()) {
581                  admin.split(tableName, split);
582                }
583
584                LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
585                LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();
586                if (conf.getBoolean("split.verify", true)) {
587                  // we need to verify and rate-limit our splits
588                  outstanding.addLast(dr);
589                  // with too many outstanding splits, wait for some to finish
590                  while (outstanding.size() >= MAX_OUTSTANDING) {
591                    LOG.debug("Wait for outstanding splits " + outstanding.size());
592                    local_finished = splitScan(outstanding, connection, tableName, splitAlgo);
593                    if (local_finished.isEmpty()) {
594                      Thread.sleep(30 * 1000);
595                    } else {
596                      finished.addAll(local_finished);
597                      outstanding.removeAll(local_finished);
598                      LOG.debug(local_finished.size() + " outstanding splits finished");
599                    }
600                  }
601                } else {
602                  finished.add(dr);
603                }
604
605                // mark each finished region as successfully split.
606                for (Pair<byte[], byte[]> region : finished) {
607                  splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
608                      + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
609                  splitCount++;
610                  if (splitCount % 10 == 0) {
611                    long tDiff = (System.currentTimeMillis() - startTime)
612                        / splitCount;
613                    LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
614                        + ". Avg Time / Split = "
615                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
616                  }
617                }
618              }
619            }
620            if (conf.getBoolean("split.verify", true)) {
621              while (!outstanding.isEmpty()) {
622                LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
623                LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
624                    connection, tableName, splitAlgo);
625                if (finished.isEmpty()) {
626                  Thread.sleep(30 * 1000);
627                } else {
628                  outstanding.removeAll(finished);
629                  for (Pair<byte[], byte[]> region : finished) {
630                    splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
631                        + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
632                    splitCount++;
633                  }
634                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
635                }
636              }
637            }
638            LOG.debug("All regions have been successfully split!");
639          } finally {
640            long tDiff = System.currentTimeMillis() - startTime;
641            LOG.debug("TOTAL TIME = "
642                + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
643            LOG.debug("Splits = " + splitCount);
644            if (0 < splitCount) {
645              LOG.debug("Avg Time / Split = "
646                  + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
647            }
648          }
649        } finally {
650          splitOut.close();
651          fs.delete(splitFile, false);
652        }
653      }
654    }
655  }
656
657  /**
658   * @throws IOException if the specified SplitAlgorithm class couldn't be
659   * instantiated
660   */
661  public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
662          String splitClassName) throws IOException {
663    Class<?> splitClass;
664
665    // For split algorithms builtin to RegionSplitter, the user can specify
666    // their simple class name instead of a fully qualified class name.
667    if(splitClassName.equals(HexStringSplit.class.getSimpleName())) {
668      splitClass = HexStringSplit.class;
669    } else if (splitClassName.equals(DecimalStringSplit.class.getSimpleName())) {
670      splitClass = DecimalStringSplit.class;
671    } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
672      splitClass = UniformSplit.class;
673    } else {
674      try {
675        splitClass = conf.getClassByName(splitClassName);
676      } catch (ClassNotFoundException e) {
677        throw new IOException("Couldn't load split class " + splitClassName, e);
678      }
679      if(splitClass == null) {
680        throw new IOException("Failed loading split class " + splitClassName);
681      }
682      if(!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
683        throw new IOException(
684                "Specified split class doesn't implement SplitAlgorithm");
685      }
686    }
687    try {
688      return splitClass.asSubclass(SplitAlgorithm.class).getDeclaredConstructor().newInstance();
689    } catch (Exception e) {
690      throw new IOException("Problem loading split algorithm: ", e);
691    }
692  }
693
694  static LinkedList<Pair<byte[], byte[]>> splitScan(
695      LinkedList<Pair<byte[], byte[]>> regionList,
696      final Connection connection,
697      final TableName tableName,
698      SplitAlgorithm splitAlgo)
699      throws IOException, InterruptedException {
700    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
701    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
702    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
703
704    // Get table info
705    Pair<Path, Path> tableDirAndSplitFile =
706      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
707    Path tableDir = tableDirAndSplitFile.getFirst();
708    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
709    // Clear the cache to forcibly refresh region information
710    ((ClusterConnection)connection).clearRegionCache();
711    TableDescriptor htd = null;
712    try (Table table = connection.getTable(tableName)) {
713      htd = table.getDescriptor();
714    }
715    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
716
717      // for every region that hasn't been verified as a finished split
718      for (Pair<byte[], byte[]> region : regionList) {
719        byte[] start = region.getFirst();
720        byte[] split = region.getSecond();
721
722        // see if the new split daughter region has come online
723        try {
724          HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
725          if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
726            logicalSplitting.add(region);
727            continue;
728          }
729        } catch (NoServerForRegionException nsfre) {
730          // NSFRE will occur if the old hbase:meta entry has no server assigned
731          LOG.info(nsfre.toString(), nsfre);
732          logicalSplitting.add(region);
733          continue;
734        }
735
736        try {
737          // when a daughter region is opened, a compaction is triggered
738          // wait until compaction completes for both daughter regions
739          LinkedList<HRegionInfo> check = Lists.newLinkedList();
740          check.add(regionLocator.getRegionLocation(start).getRegionInfo());
741          check.add(regionLocator.getRegionLocation(split).getRegionInfo());
742          for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
743            byte[] sk = hri.getStartKey();
744            if (sk.length == 0)
745              sk = splitAlgo.firstRow();
746
747            HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
748                connection.getConfiguration(), fs, tableDir, hri, true);
749
750            // Check every Column Family for that region -- check does not have references.
751            boolean refFound = false;
752            for (ColumnFamilyDescriptor c : htd.getColumnFamilies()) {
753              if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
754                break;
755              }
756            }
757
758            // compaction is completed when all reference files are gone
759            if (!refFound) {
760              check.remove(hri);
761            }
762          }
763          if (check.isEmpty()) {
764            finished.add(region);
765          } else {
766            physicalSplitting.add(region);
767          }
768        } catch (NoServerForRegionException nsfre) {
769          LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
770          physicalSplitting.add(region);
771          ((ClusterConnection)connection).clearRegionCache();
772        }
773      }
774
775      LOG.debug("Split Scan: " + finished.size() + " finished / "
776          + logicalSplitting.size() + " split wait / "
777          + physicalSplitting.size() + " reference wait");
778
779      return finished;
780    }
781  }
782
783  /**
784   * @param conf
785   * @param tableName
786   * @return A Pair where first item is table dir and second is the split file.
787   * @throws IOException if a remote or network exception occurs
788   */
789  private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
790      final TableName tableName)
791  throws IOException {
792    Path hbDir = FSUtils.getRootDir(conf);
793    Path tableDir = FSUtils.getTableDir(hbDir, tableName);
794    Path splitFile = new Path(tableDir, "_balancedSplit");
795    return new Pair<>(tableDir, splitFile);
796  }
797
798  static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
799      TableName tableName, SplitAlgorithm splitAlgo)
800  throws IOException {
801    Pair<Path, Path> tableDirAndSplitFile =
802      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
803    Path tableDir = tableDirAndSplitFile.getFirst();
804    Path splitFile = tableDirAndSplitFile.getSecond();
805
806    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
807
808    // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
809    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();
810
811    // Does a split file exist?
812    if (!fs.exists(splitFile)) {
813      // NO = fresh start. calculate splits to make
814      LOG.debug("No " + splitFile.getName() + " file. Calculating splits ");
815
816      // Query meta for all regions in the table
817      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
818      Pair<byte[][], byte[][]> tmp = null;
819      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
820        tmp = regionLocator.getStartEndKeys();
821      }
822      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
823          "Start and End rows should be equivalent");
824      for (int i = 0; i < tmp.getFirst().length; ++i) {
825        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
826        if (start.length == 0)
827          start = splitAlgo.firstRow();
828        if (end.length == 0)
829          end = splitAlgo.lastRow();
830        rows.add(Pair.newPair(start, end));
831      }
832      LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");
833
834      // prepare the split file
835      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
836      FSDataOutputStream tmpOut = fs.create(tmpFile);
837
838      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
839      for (Pair<byte[], byte[]> r : rows) {
840        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
841        String startStr = splitAlgo.rowToStr(r.getFirst());
842        String splitStr = splitAlgo.rowToStr(splitPoint);
843        daughterRegions.add(Pair.newPair(startStr, splitStr));
844        LOG.debug("Will Split [" + startStr + " , "
845            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
846        tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
847            + "\n");
848      }
849      tmpOut.close();
850      fs.rename(tmpFile, splitFile);
851    } else {
852      LOG.debug("_balancedSplit file found. Replay log to restore state...");
853      FSUtils.getInstance(fs, connection.getConfiguration())
854        .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);
855
856      // parse split file and process remaining splits
857      FSDataInputStream tmpIn = fs.open(splitFile);
858      StringBuilder sb = new StringBuilder(tmpIn.available());
859      while (tmpIn.available() > 0) {
860        sb.append(tmpIn.readChar());
861      }
862      tmpIn.close();
863      for (String line : sb.toString().split("\n")) {
864        String[] cmd = line.split(splitAlgo.separator());
865        Preconditions.checkArgument(3 == cmd.length);
866        byte[] start = splitAlgo.strToRow(cmd[1]);
867        String startStr = splitAlgo.rowToStr(start);
868        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
869        String splitStr = splitAlgo.rowToStr(splitPoint);
870        Pair<String, String> r = Pair.newPair(startStr, splitStr);
871        if (cmd[0].equals("+")) {
872          LOG.debug("Adding: " + r);
873          daughterRegions.add(r);
874        } else {
875          LOG.debug("Removing: " + r);
876          Preconditions.checkArgument(cmd[0].equals("-"),
877              "Unknown option: " + cmd[0]);
878          Preconditions.checkState(daughterRegions.contains(r),
879              "Missing row: " + r);
880          daughterRegions.remove(r);
881        }
882      }
883      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
884    }
885    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
886    for (Pair<String, String> r : daughterRegions) {
887      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
888          .strToRow(r.getSecond())));
889    }
890    return ret;
891  }
892
893  /**
894   * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region
895   * boundaries. The format of a HexStringSplit region boundary is the ASCII
896   * representation of an MD5 checksum, or any other uniformly distributed
897   * hexadecimal value. Row are hex-encoded long values in the range
898   * <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the
899   * same order lexicographically as if they were binary.
900   *
901   * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
902   * write in the shell but takes up more space and may be non-intuitive.
903   */
904  public static class HexStringSplit extends NumberStringSplit {
905    final static String DEFAULT_MIN_HEX = "00000000";
906    final static String DEFAULT_MAX_HEX = "FFFFFFFF";
907    final static int RADIX_HEX = 16;
908
909    public HexStringSplit() {
910      super(DEFAULT_MIN_HEX, DEFAULT_MAX_HEX, RADIX_HEX);
911    }
912
913  }
914
915  /**
916   * The format of a DecimalStringSplit region boundary is the ASCII representation of
917   * reversed sequential number, or any other uniformly distributed decimal value.
918   * Row are decimal-encoded long values in the range
919   * <b>"00000000" =&gt; "99999999"</b> and are left-padded with zeros to keep the
920   * same order lexicographically as if they were binary.
921   */
922  public static class DecimalStringSplit extends NumberStringSplit {
923    final static String DEFAULT_MIN_DEC = "00000000";
924    final static String DEFAULT_MAX_DEC = "99999999";
925    final static int RADIX_DEC = 10;
926
927    public DecimalStringSplit() {
928      super(DEFAULT_MIN_DEC, DEFAULT_MAX_DEC, RADIX_DEC);
929    }
930
931  }
932
933  public abstract static class NumberStringSplit implements SplitAlgorithm {
934
935    String firstRow;
936    BigInteger firstRowInt;
937    String lastRow;
938    BigInteger lastRowInt;
939    int rowComparisonLength;
940    int radix;
941
942    NumberStringSplit(String minRow, String maxRow, int radix) {
943      this.firstRow = minRow;
944      this.lastRow = maxRow;
945      this.radix = radix;
946      this.firstRowInt = BigInteger.ZERO;
947      this.lastRowInt = new BigInteger(lastRow, this.radix);
948      this.rowComparisonLength = lastRow.length();
949    }
950
951    @Override
952    public byte[] split(byte[] start, byte[] end) {
953      BigInteger s = convertToBigInteger(start);
954      BigInteger e = convertToBigInteger(end);
955      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
956      return convertToByte(split2(s, e));
957    }
958
959    @Override
960    public byte[][] split(int n) {
961      Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
962          "last row (%s) is configured less than first row (%s)", lastRow,
963          firstRow);
964      // +1 to range because the last row is inclusive
965      BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
966      Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
967          "split granularity (%s) is greater than the range (%s)", n, range);
968
969      BigInteger[] splits = new BigInteger[n - 1];
970      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
971      for (int i = 1; i < n; i++) {
972        // NOTE: this means the last region gets all the slop.
973        // This is not a big deal if we're assuming n << MAXHEX
974        splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
975            .valueOf(i)));
976      }
977      return convertToBytes(splits);
978    }
979
980    @Override
981    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
982      BigInteger s = convertToBigInteger(start);
983      BigInteger e = convertToBigInteger(end);
984
985      Preconditions.checkArgument(e.compareTo(s) > 0,
986                      "last row (%s) is configured less than first row (%s)", rowToStr(end),
987                      end);
988      // +1 to range because the last row is inclusive
989      BigInteger range = e.subtract(s).add(BigInteger.ONE);
990      Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
991              "split granularity (%s) is greater than the range (%s)", numSplits, range);
992
993      BigInteger[] splits = new BigInteger[numSplits - 1];
994      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
995      for (int i = 1; i < numSplits; i++) {
996        // NOTE: this means the last region gets all the slop.
997        // This is not a big deal if we're assuming n << MAXHEX
998        splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
999                .valueOf(i)));
1000      }
1001
1002      if (inclusive) {
1003        BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
1004        inclusiveSplitPoints[0] = convertToBigInteger(start);
1005        inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
1006        System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
1007        return convertToBytes(inclusiveSplitPoints);
1008      } else {
1009        return convertToBytes(splits);
1010      }
1011    }
1012
1013    @Override
1014    public byte[] firstRow() {
1015      return convertToByte(firstRowInt);
1016    }
1017
1018    @Override
1019    public byte[] lastRow() {
1020      return convertToByte(lastRowInt);
1021    }
1022
1023    @Override
1024    public void setFirstRow(String userInput) {
1025      firstRow = userInput;
1026      firstRowInt = new BigInteger(firstRow, radix);
1027    }
1028
1029    @Override
1030    public void setLastRow(String userInput) {
1031      lastRow = userInput;
1032      lastRowInt = new BigInteger(lastRow, radix);
1033      // Precondition: lastRow > firstRow, so last's length is the greater
1034      rowComparisonLength = lastRow.length();
1035    }
1036
1037    @Override
1038    public byte[] strToRow(String in) {
1039      return convertToByte(new BigInteger(in, radix));
1040    }
1041
1042    @Override
1043    public String rowToStr(byte[] row) {
1044      return Bytes.toStringBinary(row);
1045    }
1046
1047    @Override
1048    public String separator() {
1049      return " ";
1050    }
1051
1052    @Override
1053    public void setFirstRow(byte[] userInput) {
1054      firstRow = Bytes.toString(userInput);
1055    }
1056
1057    @Override
1058    public void setLastRow(byte[] userInput) {
1059      lastRow = Bytes.toString(userInput);
1060    }
1061
1062    /**
1063     * Divide 2 numbers in half (for split algorithm)
1064     *
1065     * @param a number #1
1066     * @param b number #2
1067     * @return the midpoint of the 2 numbers
1068     */
1069    public BigInteger split2(BigInteger a, BigInteger b) {
1070      return a.add(b).divide(BigInteger.valueOf(2)).abs();
1071    }
1072
1073    /**
1074     * Returns an array of bytes corresponding to an array of BigIntegers
1075     *
1076     * @param bigIntegers numbers to convert
1077     * @return bytes corresponding to the bigIntegers
1078     */
1079    public byte[][] convertToBytes(BigInteger[] bigIntegers) {
1080      byte[][] returnBytes = new byte[bigIntegers.length][];
1081      for (int i = 0; i < bigIntegers.length; i++) {
1082        returnBytes[i] = convertToByte(bigIntegers[i]);
1083      }
1084      return returnBytes;
1085    }
1086
1087    /**
1088     * Returns the bytes corresponding to the BigInteger
1089     *
1090     * @param bigInteger number to convert
1091     * @param pad padding length
1092     * @return byte corresponding to input BigInteger
1093     */
1094    public byte[] convertToByte(BigInteger bigInteger, int pad) {
1095      String bigIntegerString = bigInteger.toString(radix);
1096      bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
1097      return Bytes.toBytes(bigIntegerString);
1098    }
1099
1100    /**
1101     * Returns the bytes corresponding to the BigInteger
1102     *
1103     * @param bigInteger number to convert
1104     * @return corresponding bytes
1105     */
1106    public byte[] convertToByte(BigInteger bigInteger) {
1107      return convertToByte(bigInteger, rowComparisonLength);
1108    }
1109
1110    /**
1111     * Returns the BigInteger represented by the byte array
1112     *
1113     * @param row byte array representing row
1114     * @return the corresponding BigInteger
1115     */
1116    public BigInteger convertToBigInteger(byte[] row) {
1117      return (row.length > 0) ? new BigInteger(Bytes.toString(row), radix)
1118          : BigInteger.ZERO;
1119    }
1120
1121    @Override
1122    public String toString() {
1123      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1124          + "," + rowToStr(lastRow()) + "]";
1125    }
1126  }
1127
1128  /**
1129   * A SplitAlgorithm that divides the space of possible keys evenly. Useful
1130   * when the keys are approximately uniform random bytes (e.g. hashes). Rows
1131   * are raw byte values in the range <b>00 =&gt; FF</b> and are right-padded with
1132   * zeros to keep the same memcmp() order. This is the natural algorithm to use
1133   * for a byte[] environment and saves space, but is not necessarily the
1134   * easiest for readability.
1135   */
1136  public static class UniformSplit implements SplitAlgorithm {
1137    static final byte xFF = (byte) 0xFF;
1138    byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
1139    byte[] lastRowBytes =
1140            new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};
1141    @Override
1142    public byte[] split(byte[] start, byte[] end) {
1143      return Bytes.split(start, end, 1)[1];
1144    }
1145
1146    @Override
1147    public byte[][] split(int numRegions) {
1148      Preconditions.checkArgument(
1149          Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
1150          "last row (%s) is configured less than first row (%s)",
1151          Bytes.toStringBinary(lastRowBytes),
1152          Bytes.toStringBinary(firstRowBytes));
1153
1154      byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
1155          numRegions - 1);
1156      Preconditions.checkState(splits != null,
1157          "Could not split region with given user input: " + this);
1158
1159      // remove endpoints, which are included in the splits list
1160
1161      return splits == null? null: Arrays.copyOfRange(splits, 1, splits.length - 1);
1162    }
1163
1164    @Override
1165    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
1166      if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
1167        start = firstRowBytes;
1168      }
1169      if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
1170        end = lastRowBytes;
1171      }
1172      Preconditions.checkArgument(
1173              Bytes.compareTo(end, start) > 0,
1174              "last row (%s) is configured less than first row (%s)",
1175              Bytes.toStringBinary(end),
1176              Bytes.toStringBinary(start));
1177
1178      byte[][] splits = Bytes.split(start, end, true,
1179              numSplits - 1);
1180      Preconditions.checkState(splits != null,
1181              "Could not calculate input splits with given user input: " + this);
1182      if (inclusive) {
1183        return splits;
1184      } else {
1185        // remove endpoints, which are included in the splits list
1186        return Arrays.copyOfRange(splits, 1, splits.length - 1);
1187      }
1188    }
1189
1190    @Override
1191    public byte[] firstRow() {
1192      return firstRowBytes;
1193    }
1194
1195    @Override
1196    public byte[] lastRow() {
1197      return lastRowBytes;
1198    }
1199
1200    @Override
1201    public void setFirstRow(String userInput) {
1202      firstRowBytes = Bytes.toBytesBinary(userInput);
1203    }
1204
1205    @Override
1206    public void setLastRow(String userInput) {
1207      lastRowBytes = Bytes.toBytesBinary(userInput);
1208    }
1209
1210
1211    @Override
1212    public void setFirstRow(byte[] userInput) {
1213      firstRowBytes = userInput;
1214    }
1215
1216    @Override
1217    public void setLastRow(byte[] userInput) {
1218      lastRowBytes = userInput;
1219    }
1220
1221    @Override
1222    public byte[] strToRow(String input) {
1223      return Bytes.toBytesBinary(input);
1224    }
1225
1226    @Override
1227    public String rowToStr(byte[] row) {
1228      return Bytes.toStringBinary(row);
1229    }
1230
1231    @Override
1232    public String separator() {
1233      return ",";
1234    }
1235
1236    @Override
1237    public String toString() {
1238      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1239          + "," + rowToStr(lastRow()) + "]";
1240    }
1241  }
1242}