/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
/**
 * The {@link RegionSplitter} class provides several utilities to help in the
 * administration lifecycle for developers who choose to manually split regions
 * instead of having HBase handle that automatically. The most useful utilities
 * are:
 * <p>
 * <ul>
 * <li>Create a table with a specified number of pre-split regions
 * <li>Execute a rolling split of all regions on an existing table
 * </ul>
 * <p>
 * Both operations can be safely done on a live server.
 * <p>
 * <b>Question:</b> How do I turn off automatic splitting? <br>
 * <b>Answer:</b> Automatic splitting is determined by the configuration value
 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
 * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
 * is 100GB, which would result in &gt; 1hr major compactions if reached.
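 * For example, the corresponding hbase-site.xml entry would look like the
 * following sketch (the value assumes the suggested 100GB cap):
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.hregion.max.filesize&lt;/name&gt;
 *   &lt;value&gt;107374182400&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>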
 * <p>
 * <b>Question:</b> Why did the original authors decide to manually split? <br>
 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
 * to benefit from a manual split system.
 * <p>
 * <ul>
 * <li>Data (~1k) that would grow instead of being replaced
 * <li>Data growth was roughly uniform across all regions
 * <li>OLTP workload. Data loss is a big deal.
 * </ul>
 * <p>
 * <b>Question:</b> Why is manual splitting good for this workload? <br>
 * <b>Answer:</b> Although automated splitting is not a bad option, there are
 * benefits to manual splitting.
 * <p>
 * <ul>
 * <li>With growing amounts of data, splits will continually be needed. Since
 * you always know exactly what regions you have, long-term debugging and
 * profiling is much easier with manual splits. It is hard to trace the logs to
 * understand region level problems if it keeps splitting and getting renamed.
 * <li>Data offlining bugs + unknown number of split regions == oh crap! If a
 * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
 * you notice it a day or so later, you can be assured that the regions
 * specified in these files are the same as the current regions, and you have
 * fewer headaches trying to restore/replay your data.
 * <li>You can finely tune your compaction algorithm. With roughly uniform data
 * growth, it's easy to cause split / compaction storms as the regions all
 * roughly hit the same data size at the same time. With manual splits, you can
 * let staggered, time-based major compactions spread out your network IO load.
 * </ul>
 * <p>
 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
 * <b>Answer:</b> Mileage will vary depending upon your application.
 * <p>
 * The short answer for our application is that we started with 10 pre-split
 * regions / server and watched our data growth over time. It's better to err on
 * the side of too few regions and rolling split later.
 * <p>
 * The more complicated answer is that this depends upon the largest storefile
 * in your region. With a growing data size, this will get larger over time. You
 * want the largest region to be just big enough that the
 * {@link org.apache.hadoop.hbase.regionserver.HStore} compact
 * selection algorithm only compacts it due to a timed major. If you don't, your
 * cluster can be prone to compaction storms as the algorithm decides to run
 * major compactions on a large series of regions all at once. Note that
 * compaction storms are due to the uniform data growth, not the manual split
 * decision.
 * <p>
 * If you pre-split your regions too thin, you can increase the major compaction
 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
 * grows too large, use this script to perform a network IO safe rolling split
 * of all regions.
 */
@InterfaceAudience.Private
public class RegionSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(RegionSplitter.class);

  /**
   * A generic interface for the RegionSplitter code to use for all of its
   * functionality. Note that the original authors of this code use
   * {@link HexStringSplit} to partition their table and set it as the default,
   * but provide this interface for your custom algorithm. To use, create a new
   * derived class from this interface and call {@link RegionSplitter#createPresplitTable} or
   * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration) with the
   * argument splitClassName giving the name of your class.
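   * <p>
   * A minimal, illustrative sketch of a custom implementation (the class name
   * MyCustomSplit is hypothetical; it reuses {@link UniformSplit} for
   * everything except the midpoint selection):
   * <pre>
   * public class MyCustomSplit extends RegionSplitter.UniformSplit {
   *   &#64;Override
   *   public byte[] split(byte[] start, byte[] end) {
   *     // custom midpoint selection would go here; this sketch just
   *     // delegates to the UniformSplit behavior
   *     return super.split(start, end);
   *   }
   * }
   * </pre>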
   */
  public interface SplitAlgorithm {
    /**
     * Split a pre-existing region into 2 regions.
     *
     * @param start
     *          first row (inclusive)
     * @param end
     *          last row (exclusive)
     * @return the split row to use
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Split an entire table.
     *
     * @param numRegions
     *          number of regions to split the table into
     *
     * @throws RuntimeException
     *           user input is validated at this time. May throw a runtime
     *           exception in response to a parse failure
     * @return array of split keys for the initial regions of the table. The
     *         length of the returned array should be numRegions-1.
     */
    byte[][] split(int numRegions);

    /**
     * Some MapReduce jobs may want to run multiple mappers per region;
     * this method is intended for such a use case.
     *
     * @param start first row (inclusive)
     * @param end last row (exclusive)
     * @param numSplits number of splits to generate
     * @param inclusive whether start and end are returned as split points
     * @return array of split keys; if inclusive is true, start and end are
     *         included as the first and last entries
     */
    byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);

    /**
     * In HBase, the first row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed firstRow() instead of empty array.
     *
     * @return your representation of your first row
     */
    byte[] firstRow();

    /**
     * In HBase, the last row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed lastRow() instead of empty array.
     *
     * @return your representation of your last row
     */
    byte[] lastRow();

    /**
     * In HBase, the first row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the first
     * region.
     *
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setFirstRow(String userInput);

    /**
     * In HBase, the last row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the last
     * region. Note that this last row is inclusive for all rows sharing the
     * same prefix.
     *
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setLastRow(String userInput);

    /**
     * @param input
     *          user or file input for row
     * @return byte array representation of this row for HBase
     */
    byte[] strToRow(String input);

    /**
     * @param row
     *          byte array representing a row in HBase
     * @return String to use for debug &amp; file printing
     */
    String rowToStr(byte[] row);

    /**
     * @return the separator character to use when storing / printing the row
     */
    String separator();

    /**
     * Set the first row
     * @param userInput byte array of the row key.
     */
    void setFirstRow(byte[] userInput);

    /**
     * Set the last row
     * @param userInput byte array of the row key.
     */
    void setLastRow(byte[] userInput);
  }

  /**
   * The main function for the RegionSplitter application. Common uses:
   * <p>
   * <ul>
   * <li>create a table named 'myTable' with 60 pre-split regions containing 2
   * column families 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs
   * myTable HexStringSplit
   * </ul>
   * <li>create a table named 'myTable' with 50 pre-split regions,
   * assuming the keys are decimal-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 50
   * myTable DecimalStringSplit
   * </ul>
   * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), with
   * at most 2 outstanding splits at a time, assuming keys are uniformly
   * distributed bytes:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
   * UniformSplit
   * </ul>
   * </ul>
   *
   * There are three SplitAlgorithms built into RegionSplitter:
   * HexStringSplit, DecimalStringSplit, and UniformSplit. These are different
   * strategies for choosing region boundaries. See their source code for
   * details.
   *
   * @param args
   *          Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt;
   *          &lt;-c &lt;# regions&gt; -f &lt;family:family:...&gt; | -r
   *          [-o &lt;# outstanding splits&gt;]&gt;
   *          [-D &lt;conf.param=value&gt;]
   * @throws IOException
   *           HBase IO problem
   * @throws InterruptedException
   *           user requested exit
   * @throws ParseException
   *           problem parsing user input
   */
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws IOException,
      InterruptedException, ParseException {
    Configuration conf = HBaseConfiguration.create();

    // parse user input
    Options opt = new Options();
    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
        .withDescription("Override HBase Configuration Settings").create("D"));
    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
        .withDescription(
            "Create a new table with a pre-split number of regions")
        .create("c"));
    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
        .withDescription(
            "Column Families to create with new table.  Required with -c")
        .create("f"));
    opt.addOption("h", false, "Print this usage help");
    opt.addOption("r", false, "Perform a rolling split of an existing region");
    opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
        "Max outstanding splits that have unfinished major compactions")
        .create("o"));
    opt.addOption(null, "firstrow", true,
        "First Row in Table for Split Algorithm");
    opt.addOption(null, "lastrow", true,
        "Last Row in Table for Split Algorithm");
    opt.addOption(null, "risky", false,
        "Skip verification steps to complete quickly. "
            + "STRONGLY DISCOURAGED for production systems.");
    CommandLine cmd = new GnuParser().parse(opt, args);

    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
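        // split on the first '=' only, so values may themselves contain '='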
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.set(kv[0], kv[1]);
          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }

    if (cmd.hasOption("risky")) {
      conf.setBoolean("split.verify", false);
    }

    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
    boolean rollingSplit = cmd.hasOption("r");
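    // XOR: exactly one of the two operations may be requested per invocation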
    boolean oneOperOnly = createTable ^ rollingSplit;

    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
      new HelpFormatter().printHelp("bin/hbase regionsplitter <TABLE> <SPLITALGORITHM>\n"+
          "SPLITALGORITHM is the java class name of a class implementing " +
          "SplitAlgorithm, or one of the special strings HexStringSplit, " +
          "DecimalStringSplit or UniformSplit, which are built-in split algorithms. " +
          "HexStringSplit treats keys as hexadecimal ASCII, " +
          "DecimalStringSplit treats keys as decimal ASCII, and " +
          "UniformSplit treats keys as arbitrary bytes.", opt);
      return;
    }
    TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
    String splitClass = cmd.getArgs()[1];
    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);

    if (cmd.hasOption("firstrow")) {
      splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
    }
    if (cmd.hasOption("lastrow")) {
      splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
    }

    if (createTable) {
      conf.set("split.count", cmd.getOptionValue("c"));
      createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
    }

    if (rollingSplit) {
      if (cmd.hasOption("o")) {
        conf.set("split.outstanding", cmd.getOptionValue("o"));
      }
      rollingSplit(tableName, splitAlgo, conf);
    }
  }

  static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
          String[] columnFamilies, Configuration conf)
  throws IOException, InterruptedException {
    final int splitCount = conf.getInt("split.count", 0);
    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");

    Preconditions.checkArgument(columnFamilies.length > 0,
        "Must specify at least one column family.");
    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
        + " column families.  Presplitting to " + splitCount + " regions");

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (String cf : columnFamilies) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
    }
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      try (Admin admin = connection.getAdmin()) {
        Preconditions.checkArgument(!admin.tableExists(tableName),
          "Table already exists: " + tableName);
        admin.createTable(builder.build(), splitAlgo.split(splitCount));
      }
      LOG.debug("Table created!  Waiting for regions to show online in META...");
      if (conf.getBoolean("split.verify", true)) {
        // NOTE: createTable is synchronous on the table, but not on the regions
        int onlineRegions = 0;
        while (onlineRegions < splitCount) {
          onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
          LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
          if (onlineRegions < splitCount) {
            Thread.sleep(10 * 1000); // sleep
          }
        }
      }
      LOG.debug("Finished creating table with " + splitCount + " regions");
    }
  }

  /**
   * Alternative to getCurrentNrHRS, which is no longer available.
   * @param connection the connection to query
   * @return Rough count of regionservers out on the cluster.
   * @throws IOException if a remote or network exception occurs
   */
  private static int getRegionServerCount(final Connection connection) throws IOException {
    try (Admin admin = connection.getAdmin()) {
      Collection<ServerName> servers = admin.getRegionServers();
      return servers == null || servers.isEmpty() ? 0 : servers.size();
    }
  }

  private static byte[] readFile(final FileSystem fs, final Path path) throws IOException {
    try (FSDataInputStream tmpIn = fs.open(path)) {
      byte[] rawData = new byte[tmpIn.available()];
      tmpIn.readFully(rawData);
      return rawData;
    }
  }

  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
  throws IOException, InterruptedException {
    final int minOS = conf.getInt("split.outstanding", 2);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Max outstanding splits. default == 50% of servers
      final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);

      Path hbDir = FSUtils.getRootDir(conf);
      Path tableDir = FSUtils.getTableDir(hbDir, tableName);
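      // the split plan below is persisted so an interrupted rolling split
      // can resume where it left off (see getSplits)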
      Path splitFile = new Path(tableDir, "_balancedSplit");
      FileSystem fs = FileSystem.get(conf);

      // Get a list of daughter regions to create
      LinkedList<Pair<byte[], byte[]>> tmpRegionSet = null;
      try (Table table = connection.getTable(tableName)) {
        tmpRegionSet = getSplits(connection, tableName, splitAlgo);
      }
      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
      int splitCount = 0;
      final int origCount = tmpRegionSet.size();

      // all splits must compact & we have 1 compact thread, so 2 split
      // requests to the same RS can stall the outstanding split queue.
      // To fix, group the regions into an RS pool and round-robin through it
      LOG.debug("Bucketing regions by regionserver...");
      TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
          Maps.newTreeMap();
      // Get a regionLocator.  Need it below.
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        for (Pair<byte[], byte[]> dr : tmpRegionSet) {
          ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
          if (!daughterRegions.containsKey(rsLocation)) {
            LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
            daughterRegions.put(rsLocation, entry);
          }
          daughterRegions.get(rsLocation).add(dr);
        }
        LOG.debug("Done with bucketing.  Split time!");
        long startTime = System.currentTimeMillis();

        // Open the split file and modify it as splits finish
        byte[] rawData = readFile(fs, splitFile);

        FSDataOutputStream splitOut = fs.create(splitFile);
        try {
          splitOut.write(rawData);

          try {
            // *** split code ***
            while (!daughterRegions.isEmpty()) {
              LOG.debug(daughterRegions.size() + " RS have regions to split.");

              // Round-robin through the regionserver pool so consecutive
              // split requests don't pile up on a single regionserver
              for (Map.Entry<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegion :
                      daughterRegions.entrySet()) {
                Pair<byte[], byte[]> dr = null;
                ServerName rsLoc = daughterRegion.getKey();
                LinkedList<Pair<byte[], byte[]>> regionList = daughterRegion.getValue();

                // Find a region in the ServerName list that hasn't been moved
                LOG.debug("Finding a region on " + rsLoc);
                while (!regionList.isEmpty()) {
                  dr = regionList.pop();

                  // get current region info
                  byte[] split = dr.getSecond();
                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);

                  // if this region moved locations
                  ServerName newRs = regionLoc.getServerName();
                  if (newRs.compareTo(rsLoc) != 0) {
                    LOG.debug("Region with " + splitAlgo.rowToStr(split)
                        + " moved to " + newRs + ". Relocating...");
                    // relocate it, don't use it right now
                    if (!daughterRegions.containsKey(newRs)) {
                      LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
                      daughterRegions.put(newRs, entry);
                    }
                    daughterRegions.get(newRs).add(dr);
                    dr = null;
                    continue;
                  }

                  // make sure this region wasn't already split
                  byte[] sk = regionLoc.getRegionInfo().getStartKey();
                  if (sk.length != 0) {
                    if (Bytes.equals(split, sk)) {
                      LOG.debug("Region already split on "
                          + splitAlgo.rowToStr(split) + ".  Skipping this region...");
                      ++splitCount;
                      dr = null;
                      continue;
                    }
                    byte[] start = dr.getFirst();
                    Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
                        .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                  }

                  // passed all checks! found a good region
                  break;
                }
                if (regionList.isEmpty()) {
                  daughterRegions.remove(rsLoc);
                }
                if (dr == null) {
                  continue;
                }

                // we have a good region, time to split!
                byte[] split = dr.getSecond();
                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
                try (Admin admin = connection.getAdmin()) {
                  admin.split(tableName, split);
                }

                LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
                LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();
                if (conf.getBoolean("split.verify", true)) {
                  // we need to verify and rate-limit our splits
                  outstanding.addLast(dr);
                  // with too many outstanding splits, wait for some to finish
                  while (outstanding.size() >= MAX_OUTSTANDING) {
                    LOG.debug("Wait for outstanding splits " + outstanding.size());
                    local_finished = splitScan(outstanding, connection, tableName, splitAlgo);
                    if (local_finished.isEmpty()) {
                      Thread.sleep(30 * 1000);
                    } else {
                      finished.addAll(local_finished);
                      outstanding.removeAll(local_finished);
                      LOG.debug(local_finished.size() + " outstanding splits finished");
                    }
                  }
                } else {
                  finished.add(dr);
                }

                // mark each finished region as successfully split.
                for (Pair<byte[], byte[]> region : finished) {
                  splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
                      + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
                  splitCount++;
                  if (splitCount % 10 == 0) {
                    long tDiff = (System.currentTimeMillis() - startTime)
                        / splitCount;
                    LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
                        + ". Avg Time / Split = "
                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                  }
                }
              }
            }
            if (conf.getBoolean("split.verify", true)) {
              while (!outstanding.isEmpty()) {
                LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
                LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
                    connection, tableName, splitAlgo);
                if (finished.isEmpty()) {
                  Thread.sleep(30 * 1000);
                } else {
                  outstanding.removeAll(finished);
                  for (Pair<byte[], byte[]> region : finished) {
                    splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
                        + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
                    splitCount++;
                  }
                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
                }
              }
            }
            LOG.debug("All regions have been successfully split!");
          } finally {
            long tDiff = System.currentTimeMillis() - startTime;
            LOG.debug("TOTAL TIME = "
                + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            LOG.debug("Splits = " + splitCount);
            if (0 < splitCount) {
              LOG.debug("Avg Time / Split = "
                  + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
            }
          }
        } finally {
          splitOut.close();
          fs.delete(splitFile, false);
        }
      }
    }
  }

  /**
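   * Instantiate a {@link SplitAlgorithm} from a simple or fully-qualified
   * class name. A usage sketch (conf is an existing Configuration):
   * <pre>
   * SplitAlgorithm algo = RegionSplitter.newSplitAlgoInstance(conf, "HexStringSplit");
   * </pre>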
   * @throws IOException if the specified SplitAlgorithm class couldn't be
   * instantiated
   */
  public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
          String splitClassName) throws IOException {
    Class<?> splitClass;

    // For split algorithms builtin to RegionSplitter, the user can specify
    // their simple class name instead of a fully qualified class name.
    if (splitClassName.equals(HexStringSplit.class.getSimpleName())) {
      splitClass = HexStringSplit.class;
    } else if (splitClassName.equals(DecimalStringSplit.class.getSimpleName())) {
      splitClass = DecimalStringSplit.class;
    } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
      splitClass = UniformSplit.class;
    } else {
      try {
        splitClass = conf.getClassByName(splitClassName);
      } catch (ClassNotFoundException e) {
        throw new IOException("Couldn't load split class " + splitClassName, e);
      }
      if (splitClass == null) {
        throw new IOException("Failed loading split class " + splitClassName);
      }
      if (!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
        throw new IOException(
                "Specified split class doesn't implement SplitAlgorithm");
      }
    }
    try {
      return splitClass.asSubclass(SplitAlgorithm.class).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
  }

  static LinkedList<Pair<byte[], byte[]>> splitScan(
      LinkedList<Pair<byte[], byte[]>> regionList,
      final Connection connection,
      final TableName tableName,
      SplitAlgorithm splitAlgo)
      throws IOException, InterruptedException {
    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();

    // Get table info
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
    // Clear the cache to forcibly refresh region information
    ((ClusterConnection) connection).clearRegionLocationCache();
    TableDescriptor htd = null;
    try (Table table = connection.getTable(tableName)) {
      htd = table.getDescriptor();
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

      // for every region that hasn't been verified as a finished split
      for (Pair<byte[], byte[]> region : regionList) {
        byte[] start = region.getFirst();
        byte[] split = region.getSecond();

        // see if the new split daughter region has come online
        try {
          HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
          if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
            logicalSplitting.add(region);
            continue;
          }
        } catch (NoServerForRegionException nsfre) {
          // NSFRE will occur if the old hbase:meta entry has no server assigned
          LOG.info(nsfre.toString(), nsfre);
          logicalSplitting.add(region);
          continue;
        }

        try {
          // when a daughter region is opened, a compaction is triggered
          // wait until compaction completes for both daughter regions
          LinkedList<HRegionInfo> check = Lists.newLinkedList();
          check.add(regionLocator.getRegionLocation(start).getRegionInfo());
          check.add(regionLocator.getRegionLocation(split).getRegionInfo());
          for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
            byte[] sk = hri.getStartKey();
            if (sk.length == 0) {
              sk = splitAlgo.firstRow();
            }

            HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
                connection.getConfiguration(), fs, tableDir, hri, true);

            // Check every Column Family for that region -- check does not have references.
            boolean refFound = false;
            for (ColumnFamilyDescriptor c : htd.getColumnFamilies()) {
              if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
                break;
              }
            }

            // compaction is completed when all reference files are gone
            if (!refFound) {
              check.remove(hri);
            }
          }
          if (check.isEmpty()) {
            finished.add(region);
          } else {
            physicalSplitting.add(region);
          }
        } catch (NoServerForRegionException nsfre) {
          LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
          physicalSplitting.add(region);
          ((ClusterConnection) connection).clearRegionLocationCache();
        }
      }

      LOG.debug("Split Scan: " + finished.size() + " finished / "
          + logicalSplitting.size() + " split wait / "
          + physicalSplitting.size() + " reference wait");

      return finished;
    }
  }

  /**
   * @param conf configuration to read the HBase root directory from
   * @param tableName table whose directory and split file to locate
   * @return A Pair where first item is table dir and second is the split file.
   * @throws IOException if a remote or network exception occurs
   */
  private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
      final TableName tableName)
  throws IOException {
    Path hbDir = FSUtils.getRootDir(conf);
    Path tableDir = FSUtils.getTableDir(hbDir, tableName);
    Path splitFile = new Path(tableDir, "_balancedSplit");
    return new Pair<>(tableDir, splitFile);
  }

  static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
      TableName tableName, SplitAlgorithm splitAlgo)
  throws IOException {
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    Path splitFile = tableDirAndSplitFile.getSecond();

    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());

    // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // Does a split file exist?
    if (!fs.exists(splitFile)) {
      // NO = fresh start. calculate splits to make
      LOG.debug("No " + splitFile.getName() + " file. Calculating splits");

      // Query meta for all regions in the table
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = null;
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        tmp = regionLocator.getStartEndKeys();
      }
      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
          "Start and End key arrays must be the same length");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0) {
          start = splitAlgo.firstRow();
        }
        if (end.length == 0) {
          end = splitAlgo.lastRow();
        }
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");

      // prepare the split file
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , "
            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
        tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
            + "\n");
      }
      tmpOut.close();
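      // the rename publishes the split plan only once it has been fully written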
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      FSUtils.getInstance(fs, connection.getConfiguration())
        .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);

      // parse split file and process remaining splits
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(3 == cmd.length);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"),
              "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r),
              "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
          .strToRow(r.getSecond())));
    }
    return ret;
  }

  /**
   * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region
   * boundaries. The format of a HexStringSplit region boundary is the ASCII
   * representation of an MD5 checksum, or any other uniformly distributed
   * hexadecimal value. Rows are hex-encoded long values in the range
   * <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the
   * same order lexicographically as if they were binary.
   *
   * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
   * write in the shell but takes up more space and may be non-intuitive.
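   * <p>
   * For example, {@code new HexStringSplit().split(4)} returns the three
   * boundaries {@code "40000000"}, {@code "80000000"}, and {@code "c0000000"}.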
   */
  public static class HexStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_HEX = "00000000";
    final static String DEFAULT_MAX_HEX = "FFFFFFFF";
    final static int RADIX_HEX = 16;

    public HexStringSplit() {
      super(DEFAULT_MIN_HEX, DEFAULT_MAX_HEX, RADIX_HEX);
    }
  }

  /**
   * The format of a DecimalStringSplit region boundary is the ASCII representation
   * of a reversed sequential number, or any other uniformly distributed decimal
   * value. Rows are decimal-encoded long values in the range
   * <b>"00000000" =&gt; "99999999"</b> and are left-padded with zeros to keep the
   * same order lexicographically as if they were binary.
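   * <p>
   * For example, {@code new DecimalStringSplit().split(4)} returns the three
   * boundaries {@code "25000000"}, {@code "50000000"}, and {@code "75000000"}.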
   */
  public static class DecimalStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_DEC = "00000000";
    final static String DEFAULT_MAX_DEC = "99999999";
    final static int RADIX_DEC = 10;

    public DecimalStringSplit() {
      super(DEFAULT_MIN_DEC, DEFAULT_MAX_DEC, RADIX_DEC);
    }
  }

  public abstract static class NumberStringSplit implements SplitAlgorithm {

    String firstRow;
    BigInteger firstRowInt;
    String lastRow;
    BigInteger lastRowInt;
    int rowComparisonLength;
    int radix;

    NumberStringSplit(String minRow, String maxRow, int radix) {
      this.firstRow = minRow;
      this.lastRow = maxRow;
      this.radix = radix;
      this.firstRowInt = BigInteger.ZERO;
      this.lastRowInt = new BigInteger(lastRow, this.radix);
      this.rowComparisonLength = lastRow.length();
    }

    @Override
    public byte[] split(byte[] start, byte[] end) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);
      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
      return convertToByte(split2(s, e));
    }

    @Override
    public byte[][] split(int n) {
      Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
          "last row (%s) is configured less than first row (%s)", lastRow,
          firstRow);
      // +1 to range because the last row is inclusive
      BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
          "split granularity (%s) is greater than the range (%s)", n, range);

      BigInteger[] splits = new BigInteger[n - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
      for (int i = 1; i < n; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
            .valueOf(i)));
      }
      return convertToBytes(splits);
    }

    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);

      Preconditions.checkArgument(e.compareTo(s) > 0,
                      "last row (%s) is configured less than first row (%s)", rowToStr(end),
                      rowToStr(start));
      // +1 to range because the last row is inclusive
      BigInteger range = e.subtract(s).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
              "split granularity (%s) is greater than the range (%s)", numSplits, range);

      BigInteger[] splits = new BigInteger[numSplits - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
      for (int i = 1; i < numSplits; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
                .valueOf(i)));
      }

      if (inclusive) {
        BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
        inclusiveSplitPoints[0] = convertToBigInteger(start);
        inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
        System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
        return convertToBytes(inclusiveSplitPoints);
      } else {
        return convertToBytes(splits);
      }
    }

    @Override
    public byte[] firstRow() {
      return convertToByte(firstRowInt);
    }

    @Override
    public byte[] lastRow() {
      return convertToByte(lastRowInt);
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRow = userInput;
      firstRowInt = new BigInteger(firstRow, radix);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRow = userInput;
      lastRowInt = new BigInteger(lastRow, radix);
      // Precondition: lastRow > firstRow, so last's length is the greater
      rowComparisonLength = lastRow.length();
    }

    @Override
    public byte[] strToRow(String in) {
      return convertToByte(new BigInteger(in, radix));
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return " ";
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRow = Bytes.toString(userInput);
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRow = Bytes.toString(userInput);
    }

    /**
     * Divide 2 numbers in half (for split algorithm)
     *
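     * For example, {@code split2(BigInteger.valueOf(16), BigInteger.valueOf(48))}
     * returns 32, the midpoint of 16 and 48.
     *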
     * @param a number #1
     * @param b number #2
     * @return the midpoint of the 2 numbers
     */
    public BigInteger split2(BigInteger a, BigInteger b) {
      return a.add(b).divide(BigInteger.valueOf(2)).abs();
    }

    /**
     * Returns an array of bytes corresponding to an array of BigIntegers
     *
     * @param bigIntegers numbers to convert
     * @return bytes corresponding to the bigIntegers
     */
    public byte[][] convertToBytes(BigInteger[] bigIntegers) {
      byte[][] returnBytes = new byte[bigIntegers.length][];
      for (int i = 0; i < bigIntegers.length; i++) {
        returnBytes[i] = convertToByte(bigIntegers[i]);
      }
      return returnBytes;
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     *
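     * For example, for a radix-16 instance,
     * {@code convertToByte(BigInteger.valueOf(255), 8)} yields the bytes of
     * the string {@code "000000ff"}.
     *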
     * @param bigInteger number to convert
     * @param pad padding length
     * @return byte corresponding to input BigInteger
     */
    public byte[] convertToByte(BigInteger bigInteger, int pad) {
      String bigIntegerString = bigInteger.toString(radix);
      bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
      return Bytes.toBytes(bigIntegerString);
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     *
     * @param bigInteger number to convert
     * @return corresponding bytes
     */
    public byte[] convertToByte(BigInteger bigInteger) {
      return convertToByte(bigInteger, rowComparisonLength);
    }

    /**
     * Returns the BigInteger represented by the byte array
     *
     * @param row byte array representing row
     * @return the corresponding BigInteger
     */
    public BigInteger convertToBigInteger(byte[] row) {
      return (row.length > 0) ? new BigInteger(Bytes.toString(row), radix)
          : BigInteger.ZERO;
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }

  /**
   * A SplitAlgorithm that divides the space of possible keys evenly. Useful
   * when the keys are approximately uniform random bytes (e.g. hashes). Rows
   * are raw byte values in the range <b>00 =&gt; FF</b> and are right-padded with
   * zeros to keep the same memcmp() order. This is the natural algorithm to use
   * for a byte[] environment and saves space, but is not necessarily the
   * easiest for readability.
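   * <p>
   * For example, {@code new UniformSplit().split(2)} returns a single boundary
   * near the midpoint of the configured first and last rows (approximately
   * {@code \x7F\xFF\xFF\xFF\xFF\xFF\xFF\xFF} with the defaults).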
   */
  public static class UniformSplit implements SplitAlgorithm {
    static final byte xFF = (byte) 0xFF;
    byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
    byte[] lastRowBytes =
            new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};

    @Override
    public byte[] split(byte[] start, byte[] end) {
      return Bytes.split(start, end, 1)[1];
    }

    @Override
    public byte[][] split(int numRegions) {
      Preconditions.checkArgument(
          Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
          "last row (%s) is configured less than first row (%s)",
          Bytes.toStringBinary(lastRowBytes),
          Bytes.toStringBinary(firstRowBytes));

      byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
          numRegions - 1);
      Preconditions.checkState(splits != null,
          "Could not split region with given user input: " + this);

      // remove endpoints, which are included in the splits list
      return splits == null ? null : Arrays.copyOfRange(splits, 1, splits.length - 1);
    }

    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
        start = firstRowBytes;
      }
      if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
        end = lastRowBytes;
      }
      Preconditions.checkArgument(
              Bytes.compareTo(end, start) > 0,
              "last row (%s) is configured less than first row (%s)",
              Bytes.toStringBinary(end),
              Bytes.toStringBinary(start));

      byte[][] splits = Bytes.split(start, end, true,
              numSplits - 1);
      Preconditions.checkState(splits != null,
              "Could not calculate input splits with given user input: " + this);
      if (inclusive) {
        return splits;
      } else {
        // remove endpoints, which are included in the splits list
        return Arrays.copyOfRange(splits, 1, splits.length - 1);
      }
    }

    @Override
    public byte[] firstRow() {
      return firstRowBytes;
    }

    @Override
    public byte[] lastRow() {
      return lastRowBytes;
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRowBytes = userInput;
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRowBytes = userInput;
    }

    @Override
    public byte[] strToRow(String input) {
      return Bytes.toBytesBinary(input);
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return ",";
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }
}