/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * The {@link RegionSplitter} class provides several utilities to help in the administration
 * lifecycle for developers who choose to manually split regions instead of having HBase handle that
 * automatically. The most useful utilities are:
 * <p>
 * <ul>
 * <li>Create a table with a specified number of pre-split regions
 * <li>Execute a rolling split of all regions on an existing table
 * </ul>
 * <p>
 * Both operations can be safely done on a live server.
 * <p>
 * <b>Question:</b> How do I turn off automatic splitting? <br>
 * <b>Answer:</b> Automatic splitting is determined by the configuration value
 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended to set this to Long.MAX_VALUE, in
 * case you forget about manual splits. A suggested setting is 100GB, which would result in &gt;
 * 1hr major compactions if reached.
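 * <p>
 * For example, a sketch of the corresponding hbase-site.xml entry (100GB expressed in bytes):
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.hregion.max.filesize&lt;/name&gt;
 *   &lt;value&gt;107374182400&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>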
 * <p>
 * <b>Question:</b> Why did the original authors decide to manually split? <br>
 * <b>Answer:</b> Specific workload characteristics of our use case allowed us to benefit from a
 * manual split system.
 * <p>
 * <ul>
 * <li>Data (~1k) that would grow instead of being replaced
 * <li>Data growth was roughly uniform across all regions
 * <li>OLTP workload. Data loss is a big deal.
 * </ul>
 * <p>
 * <b>Question:</b> Why is manual splitting good for this workload? <br>
 * <b>Answer:</b> Although automated splitting is not a bad option, there are benefits to manual
 * splitting.
 * <p>
 * <ul>
 * <li>With growing amounts of data, splits will continually be needed. Since you always know
 * exactly what regions you have, long-term debugging and profiling is much easier with manual
 * splits. It is hard to trace the logs to understand region level problems if it keeps splitting
 * and getting renamed.
 * <li>Data offlining bugs + unknown number of split regions == oh crap! If a WAL or StoreFile was
 * mistakenly unprocessed by HBase due to a weird bug and you notice it a day or so later, you can
 * be assured that the regions specified in these files are the same as the current regions and you
 * have fewer headaches trying to restore/replay your data.
 * <li>You can finely tune your compaction algorithm. With roughly uniform data growth, it's easy to
 * cause split / compaction storms as the regions all roughly hit the same data size at the same
 * time. With manual splits, you can let staggered, time-based major compactions spread out your
 * network IO load.
 * </ul>
 * <p>
 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
 * <b>Answer:</b> Mileage will vary depending upon your application.
 * <p>
 * The short answer for our application is that we started with 10 pre-split regions / server and
 * watched our data growth over time. It's better to err on the side of too few regions and
 * rolling split later.
 * <p>
 * The more complicated answer is that this depends upon the largest storefile in your region. With
 * a growing data size, this will get larger over time. You want the largest region to be just big
 * enough that the {@link org.apache.hadoop.hbase.regionserver.HStore} compact selection algorithm
 * only compacts it due to a timed major. If you don't, your cluster can be prone to compaction
 * storms as the algorithm decides to run major compactions on a large series of regions all at
 * once. Note that compaction storms are due to the uniform data growth, not the manual split
 * decision.
 * <p>
 * If you pre-split your regions too thin, you can increase the major compaction interval by
 * configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size grows too large, use this
 * script to perform a network IO safe rolling split of all regions.
 */
@InterfaceAudience.Private
public class RegionSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(RegionSplitter.class);

  /**
   * A generic interface for the RegionSplitter code to use for all of its functionality. Note that
   * the original authors of this code use {@link HexStringSplit} to partition their table and set
   * it as the default, but provided this interface for your custom algorithm. To use, create a new
   * derived class from this interface and call {@link RegionSplitter#createPresplitTable} or
   * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration) with the argument
   * splitClassName giving the name of your class, as sketched below.
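   * <p>
   * For example, from the command line (the class name here is hypothetical; your implementation
   * needs a public no-argument constructor, since it is instantiated via reflection):
   *
   * <pre>
   * bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f cf myTable \
   *     com.example.MySplitAlgorithm
   * </pre>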
   */
  public interface SplitAlgorithm {
    /**
     * Split a pre-existing region into 2 regions.
     * @param start first row (inclusive)
     * @param end   last row (exclusive)
     * @return the split row to use
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Split an entire table.
     * @param numRegions number of regions to split the table into; user input is validated at this
     *                   time and may throw a runtime exception in response to a parse failure
     * @return array of split keys for the initial regions of the table. The length of the returned
     *         array should be numRegions-1.
     */
    byte[][] split(int numRegions);

    /**
     * Some MapReduce jobs may want to run multiple mappers per region; this is intended for such a
     * use case.
     * @param start     first row (inclusive)
     * @param end       last row (exclusive)
     * @param numSplits number of splits to generate
     * @param inclusive whether start and end are returned as split points
     * @return array of split keys, including start and end when inclusive is true
     */
    byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);

    /**
     * In HBase, the first row is represented by an empty byte array. This might cause problems with
     * your split algorithm or row printing. All your APIs will be passed firstRow() instead of
     * empty array.
     * @return your representation of your first row
     */
    byte[] firstRow();

    /**
     * In HBase, the last row is represented by an empty byte array. This might cause problems with
     * your split algorithm or row printing. All your APIs will be passed lastRow() instead of
     * empty array.
     * @return your representation of your last row
     */
    byte[] lastRow();

    /**
     * In HBase, the first row is represented by an empty byte array. Set this value to help the
     * split code understand how to evenly divide the first region.
     * @param userInput raw user input (may throw RuntimeException on parse failure)
     */
    void setFirstRow(String userInput);

    /**
     * In HBase, the last row is represented by an empty byte array. Set this value to help the
     * split code understand how to evenly divide the last region. Note that this last row is
     * inclusive for all rows sharing the same prefix.
     * @param userInput raw user input (may throw RuntimeException on parse failure)
     */
    void setLastRow(String userInput);

    /**
     * @param input user or file input for a row
     * @return byte array representation of this row for HBase
     */
    byte[] strToRow(String input);

    /**
     * @param row byte array representing a row in HBase
     * @return String to use for debug &amp; file printing
     */
    String rowToStr(byte[] row);

    /** Returns the separator character to use when storing / printing the row */
    String separator();

    /**
     * Set the first row
     * @param userInput byte array of the row key.
     */
    void setFirstRow(byte[] userInput);

    /**
     * Set the last row
     * @param userInput byte array of the row key.
     */
    void setLastRow(byte[] userInput);
  }

  /**
   * The main function for the RegionSplitter application. Common uses:
   * <p>
   * <ul>
   * <li>create a table named 'myTable' with 60 pre-split regions containing 2 column families
   * 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs myTable
   * HexStringSplit
   * </ul>
   * <li>create a table named 'myTable' with 50 pre-split regions, assuming the keys are
   * decimal-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 50 myTable DecimalStringSplit
   * </ul>
   * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), # 2 outstanding splits at
   * a time, assuming keys are uniformly distributed bytes:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable UniformSplit
   * </ul>
   * </ul>
   * There are three SplitAlgorithms built into RegionSplitter: HexStringSplit, DecimalStringSplit,
   * and UniformSplit. These are different strategies for choosing region boundaries. See their
   * source code for details. Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt; &lt;-c
   * &lt;# regions&gt; -f &lt;family:family:...&gt; | -r [-o &lt;# outstanding splits&gt;]&gt; [-D
   * &lt;conf.param=value&gt;]
   * @throws IOException          HBase IO problem
   * @throws InterruptedException user requested exit
   * @throws ParseException       problem parsing user input
   */
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws IOException, InterruptedException, ParseException {
    Configuration conf = HBaseConfiguration.create();

    // parse user input
    Options opt = new Options();
    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
      .withDescription("Override HBase Configuration Settings").create("D"));
    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
      .withDescription("Create a new table with a pre-split number of regions").create("c"));
    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
      .withDescription("Column Families to create with new table.  Required with -c").create("f"));
    opt.addOption("h", false, "Print this usage help");
    opt.addOption("r", false, "Perform a rolling split of an existing region");
    opt.addOption(OptionBuilder.withArgName("count").hasArg()
      .withDescription("Max outstanding splits that have unfinished major compactions")
      .create("o"));
    opt.addOption(null, "firstrow", true, "First Row in Table for Split Algorithm");
    opt.addOption(null, "lastrow", true, "Last Row in Table for Split Algorithm");
    opt.addOption(null, "risky", false, "Skip verification steps to complete quickly. "
      + "STRONGLY DISCOURAGED for production systems.  ");
    CommandLine cmd = new GnuParser().parse(opt, args);

    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.set(kv[0], kv[1]);
          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }

    if (cmd.hasOption("risky")) {
      conf.setBoolean("split.verify", false);
    }

    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
    boolean rollingSplit = cmd.hasOption("r");
    boolean oneOperOnly = createTable ^ rollingSplit;

    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
      new HelpFormatter().printHelp("bin/hbase regionsplitter <TABLE> <SPLITALGORITHM>\n"
        + "SPLITALGORITHM is the java class name of a class implementing "
        + "SplitAlgorithm, or one of the special strings HexStringSplit or "
        + "DecimalStringSplit or UniformSplit, which are built-in split algorithms. "
        + "HexStringSplit treats keys as hexadecimal ASCII, and "
        + "DecimalStringSplit treats keys as decimal ASCII, and "
        + "UniformSplit treats keys as arbitrary bytes.", opt);
      return;
    }
    TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
    String splitClass = cmd.getArgs()[1];
    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);

    if (cmd.hasOption("firstrow")) {
      splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
    }
    if (cmd.hasOption("lastrow")) {
      splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
    }

    if (createTable) {
      conf.set("split.count", cmd.getOptionValue("c"));
      createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
    }

    if (rollingSplit) {
      if (cmd.hasOption("o")) {
        conf.set("split.outstanding", cmd.getOptionValue("o"));
      }
      rollingSplit(tableName, splitAlgo, conf);
    }
  }

  static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
    String[] columnFamilies, Configuration conf) throws IOException, InterruptedException {
    final int splitCount = conf.getInt("split.count", 0);
    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");

    Preconditions.checkArgument(columnFamilies.length > 0,
      "Must specify at least one column family. ");
    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
      + " column families.  Presplitting to " + splitCount + " regions");

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (String cf : columnFamilies) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
    }
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      Admin admin = connection.getAdmin();
      try {
        Preconditions.checkArgument(!admin.tableExists(tableName),
          "Table already exists: " + tableName);
        admin.createTable(builder.build(), splitAlgo.split(splitCount));
      } finally {
        admin.close();
      }
      LOG.debug("Table created!  Waiting for regions to show online in META...");
      if (conf.getBoolean("split.verify", true)) {
        // NOTE: createTable is synchronous on the table, but not on the regions
        int onlineRegions = 0;
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          while (onlineRegions < splitCount) {
            onlineRegions = locator.getAllRegionLocations().size();
            LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
            if (onlineRegions < splitCount) {
              Thread.sleep(10 * 1000); // sleep
            }
          }
        }
      }
      LOG.debug("Finished creating table with " + splitCount + " regions");
    }
  }

  /**
   * Alternative to getCurrentNrHRS, which is no longer available.
   * @return Rough count of regionservers out on cluster.
   * @throws IOException if a remote or network exception occurs
   */
  private static int getRegionServerCount(final Connection connection) throws IOException {
    try (Admin admin = connection.getAdmin()) {
      Collection<ServerName> servers = admin.getRegionServers();
      return servers == null || servers.isEmpty() ? 0 : servers.size();
    }
  }

  private static byte[] readFile(final FileSystem fs, final Path path) throws IOException {
    try (FSDataInputStream tmpIn = fs.open(path)) {
      byte[] rawData = new byte[tmpIn.available()];
      tmpIn.readFully(rawData);
      return rawData;
    }
  }

  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
    throws IOException, InterruptedException {
    final int minOS = conf.getInt("split.outstanding", 2);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Max outstanding splits. default == 50% of servers
      final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);

      Path hbDir = CommonFSUtils.getRootDir(conf);
      Path tableDir = CommonFSUtils.getTableDir(hbDir, tableName);
      Path splitFile = new Path(tableDir, "_balancedSplit");
      FileSystem fs = FileSystem.get(conf);

      // Get a list of daughter regions to create
      LinkedList<Pair<byte[], byte[]>> tmpRegionSet =
        getSplits(connection, tableName, splitAlgo);
      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
      int splitCount = 0;
      final int origCount = tmpRegionSet.size();

      // all splits must compact & we have 1 compact thread, so 2 split
      // requests to the same RS can stall the outstanding split queue.
      // To fix, group the regions into an RS pool and round-robin through it
      LOG.debug("Bucketing regions by regionserver...");
      TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions = Maps.newTreeMap();
      // Get a regionLocator. Need it below.
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        for (Pair<byte[], byte[]> dr : tmpRegionSet) {
          ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
          if (!daughterRegions.containsKey(rsLocation)) {
            LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
            daughterRegions.put(rsLocation, entry);
          }
          daughterRegions.get(rsLocation).add(dr);
        }
        LOG.debug("Done with bucketing.  Split time!");
        long startTime = EnvironmentEdgeManager.currentTime();

        // Open the split file and modify it as splits finish
        byte[] rawData = readFile(fs, splitFile);

        FSDataOutputStream splitOut = fs.create(splitFile);
        try {
          splitOut.write(rawData);

          try {
            // *** split code ***
            while (!daughterRegions.isEmpty()) {
              LOG.debug(daughterRegions.size() + " RS have regions to split.");

              // Get ServerName to region count mapping
              final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
              List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
              for (HRegionLocation hrl : hrls) {
                ServerName sn = hrl.getServerName();
                if (rsSizes.containsKey(sn)) {
                  rsSizes.put(sn, rsSizes.get(sn) + 1);
                } else {
                  rsSizes.put(sn, 1);
                }
              }

              // Sort the ServerNames by the number of regions they have
              final List<ServerName> serversLeft = Lists.newArrayList(daughterRegions.keySet());
              serversLeft.sort(Comparator.comparing(rsSizes::get));

              // Round-robin through the ServerName list. Choose the lightest-loaded servers
              // first to keep the master from load-balancing regions as we split.
              for (final ServerName rsLoc : serversLeft) {
                Pair<byte[], byte[]> dr = null;
                final LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions.get(rsLoc);

                // Find a region in the ServerName list that hasn't been moved
                LOG.debug("Finding a region on " + rsLoc);
                while (!regionList.isEmpty()) {
                  dr = regionList.pop();

                  // get current region info
                  byte[] split = dr.getSecond();
                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);

                  // if this region moved locations
                  ServerName newRs = regionLoc.getServerName();
                  if (newRs.compareTo(rsLoc) != 0) {
                    LOG.debug("Region with " + splitAlgo.rowToStr(split) + " moved to " + newRs
                      + ". Relocating...");
                    // relocate it, don't use it right now
                    if (!daughterRegions.containsKey(newRs)) {
                      LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
                      daughterRegions.put(newRs, entry);
                    }
                    daughterRegions.get(newRs).add(dr);
                    dr = null;
                    continue;
                  }

                  // make sure this region wasn't already split
                  byte[] sk = regionLoc.getRegion().getStartKey();
                  if (sk.length != 0) {
                    if (Bytes.equals(split, sk)) {
                      LOG.debug("Region already split on " + splitAlgo.rowToStr(split)
                        + ".  Skipping this region...");
                      ++splitCount;
                      dr = null;
                      continue;
                    }
                    byte[] start = dr.getFirst();
                    Preconditions.checkArgument(Bytes.equals(start, sk),
                      splitAlgo.rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                  }

                  // passed all checks! found a good region
                  break;
                }
                if (regionList.isEmpty()) {
                  daughterRegions.remove(rsLoc);
                }
                if (dr == null) continue;

                // we have a good region, time to split!
                byte[] split = dr.getSecond();
                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
                try (Admin admin = connection.getAdmin()) {
                  admin.split(tableName, split);
                }

                LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
                LinkedList<Pair<byte[], byte[]>> localFinished = Lists.newLinkedList();
                if (conf.getBoolean("split.verify", true)) {
                  // we need to verify and rate-limit our splits
                  outstanding.addLast(dr);
                  // with too many outstanding splits, wait for some to finish
                  while (outstanding.size() >= MAX_OUTSTANDING) {
                    LOG.debug("Wait for outstanding splits " + outstanding.size());
                    localFinished = splitScan(outstanding, connection, tableName, splitAlgo);
                    if (localFinished.isEmpty()) {
                      Thread.sleep(30 * 1000);
                    } else {
                      finished.addAll(localFinished);
                      outstanding.removeAll(localFinished);
                      LOG.debug(localFinished.size() + " outstanding splits finished");
                    }
                  }
                } else {
                  finished.add(dr);
                }

                // mark each finished region as successfully split.
                for (Pair<byte[], byte[]> region : finished) {
                  splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst()) + " "
                    + splitAlgo.rowToStr(region.getSecond()) + "\n");
                  splitCount++;
                  if (splitCount % 10 == 0) {
                    long tDiff = (EnvironmentEdgeManager.currentTime() - startTime) / splitCount;
                    LOG.debug(
                      "STATUS UPDATE: " + splitCount + " / " + origCount + ". Avg Time / Split = "
                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                  }
                }
              }
            }
            if (conf.getBoolean("split.verify", true)) {
              while (!outstanding.isEmpty()) {
                LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
                LinkedList<Pair<byte[], byte[]>> finished =
                  splitScan(outstanding, connection, tableName, splitAlgo);
                if (finished.isEmpty()) {
                  Thread.sleep(30 * 1000);
                } else {
                  outstanding.removeAll(finished);
                  for (Pair<byte[], byte[]> region : finished) {
                    splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst()) + " "
                      + splitAlgo.rowToStr(region.getSecond()) + "\n");
                    splitCount++;
                  }
                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
                }
              }
            }
            LOG.debug("All regions have been successfully split!");
          } finally {
            long tDiff = EnvironmentEdgeManager.currentTime() - startTime;
            LOG.debug("TOTAL TIME = " + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            LOG.debug("Splits = " + splitCount);
            if (0 < splitCount) {
              LOG.debug("Avg Time / Split = "
                + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
            }
          }
        } finally {
          splitOut.close();
          fs.delete(splitFile, false);
        }
      }
    }
  }

  /**
   * Instantiate the {@link SplitAlgorithm} named by splitClassName, which may be the simple name
   * of a built-in algorithm (HexStringSplit, DecimalStringSplit, UniformSplit) or a fully
   * qualified class name.
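   * <p>
   * A minimal usage sketch (assuming a Configuration {@code conf} is in scope):
   *
   * <pre>
   * SplitAlgorithm algo = RegionSplitter.newSplitAlgoInstance(conf, "HexStringSplit");
   * byte[][] splits = algo.split(10); // nine split keys =&gt; ten regions
   * </pre>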
   * @throws IOException if the specified SplitAlgorithm class couldn't be instantiated
   */
  public static SplitAlgorithm newSplitAlgoInstance(Configuration conf, String splitClassName)
    throws IOException {
    Class<?> splitClass;

    // For split algorithms builtin to RegionSplitter, the user can specify
    // their simple class name instead of a fully qualified class name.
    if (splitClassName.equals(HexStringSplit.class.getSimpleName())) {
      splitClass = HexStringSplit.class;
    } else if (splitClassName.equals(DecimalStringSplit.class.getSimpleName())) {
      splitClass = DecimalStringSplit.class;
    } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
      splitClass = UniformSplit.class;
    } else {
      try {
        splitClass = conf.getClassByName(splitClassName);
      } catch (ClassNotFoundException e) {
        throw new IOException("Couldn't load split class " + splitClassName, e);
      }
      if (splitClass == null) {
        throw new IOException("Failed loading split class " + splitClassName);
      }
      if (!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
        throw new IOException("Specified split class doesn't implement SplitAlgorithm");
      }
    }
    try {
      return splitClass.asSubclass(SplitAlgorithm.class).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
  }

  static LinkedList<Pair<byte[], byte[]>> splitScan(LinkedList<Pair<byte[], byte[]>> regionList,
    final Connection connection, final TableName tableName, SplitAlgorithm splitAlgo)
    throws IOException, InterruptedException {
    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();

    // Get table info
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
    // Clear the cache to forcibly refresh region information
    connection.clearRegionLocationCache();
    TableDescriptor htd = null;
    try (Table table = connection.getTable(tableName)) {
      htd = table.getDescriptor();
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      // for every region that hasn't been verified as a finished split
      for (Pair<byte[], byte[]> region : regionList) {
        byte[] start = region.getFirst();
        byte[] split = region.getSecond();

        // see if the new split daughter region has come online
        try {
          RegionInfo dri = regionLocator.getRegionLocation(split, true).getRegion();
          if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
            logicalSplitting.add(region);
            continue;
          }
        } catch (NoServerForRegionException nsfre) {
          // NSFRE will occur if the old hbase:meta entry has no server assigned
          LOG.info(nsfre.toString(), nsfre);
          logicalSplitting.add(region);
          continue;
        }

        try {
          // when a daughter region is opened, a compaction is triggered
          // wait until compaction completes for both daughter regions
          LinkedList<RegionInfo> check = Lists.newLinkedList();
          check.add(regionLocator.getRegionLocation(start).getRegion());
          check.add(regionLocator.getRegionLocation(split).getRegion());
          for (RegionInfo hri : check.toArray(new RegionInfo[check.size()])) {
            HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(connection.getConfiguration(), fs, tableDir, hri, true);

            // Check every column family of the region for leftover reference files.
            boolean refFound = false;
            for (ColumnFamilyDescriptor c : htd.getColumnFamilies()) {
              StoreFileTracker sft = StoreFileTrackerFactory
                .create(regionFs.getFileSystem().getConf(), htd, c, regionFs);
              if ((refFound = sft.hasReferences())) {
                break;
              }
            }

            // compaction is completed when all reference files are gone
            if (!refFound) {
              check.remove(hri);
            }
          }
          if (check.isEmpty()) {
            finished.add(region);
          } else {
            physicalSplitting.add(region);
          }
        } catch (NoServerForRegionException nsfre) {
          LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
          physicalSplitting.add(region);
          connection.clearRegionLocationCache();
        }
      }

      LOG.debug("Split Scan: " + finished.size() + " finished / " + logicalSplitting.size()
        + " split wait / " + physicalSplitting.size() + " reference wait");

      return finished;
    }
  }

  /**
   * @return A Pair where first item is table dir and second is the split file.
   * @throws IOException if a remote or network exception occurs
   */
  private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
    final TableName tableName) throws IOException {
    Path hbDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(hbDir, tableName);
    Path splitFile = new Path(tableDir, "_balancedSplit");
    return new Pair<>(tableDir, splitFile);
  }

  static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
    TableName tableName, SplitAlgorithm splitAlgo) throws IOException {
    Pair<Path, Path> tableDirAndSplitFile =
      getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    Path splitFile = tableDirAndSplitFile.getSecond();

    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());

    // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // Does a split file exist?
    if (!fs.exists(splitFile)) {
      // NO = fresh start. calculate splits to make
      LOG.debug("No " + splitFile.getName() + " file. Calculating splits ");

      // Query meta for all regions in the table
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = null;
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        tmp = regionLocator.getStartEndKeys();
      }
      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
        "Start and end key arrays must be the same length");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0) start = splitAlgo.firstRow();
        if (end.length == 0) end = splitAlgo.lastRow();
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");

      // prepare the split file
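      // The split file is a write-ahead log for this tool: "+ <start><sep><split>" lines record
      // planned splits, and rollingSplit later appends "- <start> <split>" lines as each split
      // completes, so an interrupted run can be resumed by replaying the file (else branch below).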
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , " + splitAlgo.rowToStr(r.getSecond()) + ") at "
          + splitStr);
        tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr + "\n");
      }
      tmpOut.close();
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      RecoverLeaseFSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null);

      // parse split file and process remaining splits
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(3 == cmd.length);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"), "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r), "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo.strToRow(r.getSecond())));
    }
    return ret;
  }

  /**
   * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region boundaries. The
   * format of a HexStringSplit region boundary is the ASCII representation of an MD5 checksum, or
   * any other uniformly distributed hexadecimal value. Rows are hex-encoded long values in the
   * range <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the same order
   * lexicographically as if they were binary. Since this split algorithm uses hex strings as keys,
   * it is easy to read &amp; write in the shell but takes up more space and may be non-intuitive.
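   * For example, pre-splitting into two regions should yield the single split key
   * {@code 80000000}.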
   */
  public static class HexStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_HEX = "00000000";
    final static String DEFAULT_MAX_HEX = "FFFFFFFF";
    final static int RADIX_HEX = 16;

    public HexStringSplit() {
      super(DEFAULT_MIN_HEX, DEFAULT_MAX_HEX, RADIX_HEX);
    }

  }

  /**
   * The format of a DecimalStringSplit region boundary is the ASCII representation of a reversed
   * sequential number, or any other uniformly distributed decimal value. Rows are decimal-encoded
   * long values in the range <b>"00000000" =&gt; "99999999"</b> and are left-padded with zeros to
   * keep the same order lexicographically as if they were binary.
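   * For example, pre-splitting into two regions should yield the single split key
   * {@code 50000000}.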
   */
  public static class DecimalStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_DEC = "00000000";
    final static String DEFAULT_MAX_DEC = "99999999";
    final static int RADIX_DEC = 10;

    public DecimalStringSplit() {
      super(DEFAULT_MIN_DEC, DEFAULT_MAX_DEC, RADIX_DEC);
    }

  }

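  /**
   * Base class for HexStringSplit and DecimalStringSplit: split points are computed with
   * BigInteger arithmetic in the configured radix and rendered as fixed-width, zero-padded
   * strings of rowComparisonLength characters.
   */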
  public abstract static class NumberStringSplit implements SplitAlgorithm {

    String firstRow;
    BigInteger firstRowInt;
    String lastRow;
    BigInteger lastRowInt;
    int rowComparisonLength;
    int radix;

    NumberStringSplit(String minRow, String maxRow, int radix) {
      this.firstRow = minRow;
      this.lastRow = maxRow;
      this.radix = radix;
      this.firstRowInt = BigInteger.ZERO;
      this.lastRowInt = new BigInteger(lastRow, this.radix);
      this.rowComparisonLength = lastRow.length();
    }

    @Override
    public byte[] split(byte[] start, byte[] end) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);
      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
      return convertToByte(split2(s, e));
    }

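    /**
     * Worked example (a sketch, using the HexStringSplit defaults and n = 4): the inclusive range
     * is 0x100000000 wide, each slice is 0x40000000, so the returned keys are 40000000, 80000000,
     * and c0000000.
     */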
    @Override
    public byte[][] split(int n) {
      Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
        "last row (%s) is configured less than first row (%s)", lastRow, firstRow);
      // +1 to range because the last row is inclusive
      BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
        "split granularity (%s) is greater than the range (%s)", n, range);

      BigInteger[] splits = new BigInteger[n - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
      for (int i = 1; i < n; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger.valueOf(i)));
      }
      return convertToBytes(splits);
    }

    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);

      Preconditions.checkArgument(e.compareTo(s) > 0,
        "last row (%s) is configured less than first row (%s)", rowToStr(end), rowToStr(start));
      // +1 to range because the last row is inclusive
      BigInteger range = e.subtract(s).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
        "split granularity (%s) is greater than the range (%s)", numSplits, range);

      BigInteger[] splits = new BigInteger[numSplits - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
      for (int i = 1; i < numSplits; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger.valueOf(i)));
      }

      if (inclusive) {
        BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
        inclusiveSplitPoints[0] = convertToBigInteger(start);
        inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
        System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
        return convertToBytes(inclusiveSplitPoints);
      } else {
        return convertToBytes(splits);
      }
    }

    @Override
    public byte[] firstRow() {
      return convertToByte(firstRowInt);
    }

    @Override
    public byte[] lastRow() {
      return convertToByte(lastRowInt);
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRow = userInput;
      firstRowInt = new BigInteger(firstRow, radix);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRow = userInput;
      lastRowInt = new BigInteger(lastRow, radix);
      // Precondition: lastRow > firstRow, so last's length is the greater
      rowComparisonLength = lastRow.length();
    }

    @Override
    public byte[] strToRow(String in) {
      return convertToByte(new BigInteger(in, radix));
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return " ";
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRow = Bytes.toString(userInput);
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRow = Bytes.toString(userInput);
    }

    /**
     * Compute the midpoint of two numbers (for the split algorithm).
     * @param a number #1
     * @param b number #2
     * @return the midpoint of the 2 numbers
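     *         (e.g., for HexStringSplit, split2(00000000, FFFFFFFF) = 80000000, since the last
     *         row is treated as inclusive and bumped by one before halving)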
     */
    public BigInteger split2(BigInteger a, BigInteger b) {
      if (b.equals(lastRowInt)) {
        b = b.add(BigInteger.ONE);
      }
      return a.add(b).divide(BigInteger.valueOf(2)).abs();
    }

    /**
     * Returns an array of bytes corresponding to an array of BigIntegers
     * @param bigIntegers numbers to convert
     * @return bytes corresponding to the bigIntegers
     */
    public byte[][] convertToBytes(BigInteger[] bigIntegers) {
      byte[][] returnBytes = new byte[bigIntegers.length][];
      for (int i = 0; i < bigIntegers.length; i++) {
        returnBytes[i] = convertToByte(bigIntegers[i]);
      }
      return returnBytes;
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     * @param bigInteger number to convert
     * @param pad        padding length
     * @return bytes corresponding to the input BigInteger
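     *         (e.g., with radix 16, value 255 and pad 8 yield the bytes of {@code "000000ff"})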
     */
    public byte[] convertToByte(BigInteger bigInteger, int pad) {
      String bigIntegerString = bigInteger.toString(radix);
      bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
      return Bytes.toBytes(bigIntegerString);
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     * @param bigInteger number to convert
     * @return corresponding bytes
     */
    public byte[] convertToByte(BigInteger bigInteger) {
      return convertToByte(bigInteger, rowComparisonLength);
    }

    /**
     * Returns the BigInteger represented by the byte array
     * @param row byte array representing row
     * @return the corresponding BigInteger
     */
    public BigInteger convertToBigInteger(byte[] row) {
      return (row.length > 0) ? new BigInteger(Bytes.toString(row), radix) : BigInteger.ZERO;
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow()) + ","
        + rowToStr(lastRow()) + "]";
    }
  }

  /**
   * A SplitAlgorithm that divides the space of possible keys evenly. Useful when the keys are
   * approximately uniform random bytes (e.g. hashes). Rows are raw byte values in the range <b>00
   * =&gt; FF</b> and are right-padded with zeros to keep the same memcmp() order. This is the
   * natural algorithm to use for a byte[] environment and saves space, but is not necessarily the
   * easiest for readability.
   */
  public static class UniformSplit implements SplitAlgorithm {
    static final byte xFF = (byte) 0xFF;
    byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
    byte[] lastRowBytes = new byte[] { xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF };

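    /**
     * Returns the (approximate) byte-wise midpoint of start and end via
     * {@link Bytes#split(byte[], byte[], int)}; e.g., {@code \x00} and {@code \x10} should yield
     * {@code \x08}.
     */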
    @Override
    public byte[] split(byte[] start, byte[] end) {
      return Bytes.split(start, end, 1)[1];
    }

    @Override
    public byte[][] split(int numRegions) {
      Preconditions.checkArgument(Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
        "last row (%s) is configured less than first row (%s)", Bytes.toStringBinary(lastRowBytes),
        Bytes.toStringBinary(firstRowBytes));

      byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true, numRegions - 1);
      Preconditions.checkState(splits != null,
        "Could not split region with given user input: " + this);

      // remove endpoints, which are included in the splits list
      return Arrays.copyOfRange(splits, 1, splits.length - 1);
    }

    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
        start = firstRowBytes;
      }
      if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
        end = lastRowBytes;
      }
      Preconditions.checkArgument(Bytes.compareTo(end, start) > 0,
        "last row (%s) is configured less than first row (%s)", Bytes.toStringBinary(end),
        Bytes.toStringBinary(start));

      byte[][] splits = Bytes.split(start, end, true, numSplits - 1);
      Preconditions.checkState(splits != null,
        "Could not calculate input splits with given user input: " + this);
      if (inclusive) {
        return splits;
      } else {
        // remove endpoints, which are included in the splits list
        return Arrays.copyOfRange(splits, 1, splits.length - 1);
      }
    }

    @Override
    public byte[] firstRow() {
      return firstRowBytes;
    }

    @Override
    public byte[] lastRow() {
      return lastRowBytes;
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRowBytes = userInput;
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRowBytes = userInput;
    }

    @Override
    public byte[] strToRow(String input) {
      return Bytes.toBytesBinary(input);
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return ",";
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow()) + ","
        + rowToStr(lastRow()) + "]";
    }
  }
}