/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.impl.Jdk14Logger;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.ImmutableHRegionInfo;
import org.apache.hadoop.hbase.client.ImmutableHTableDescriptor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.log4j.LogManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.impl.Log4jLoggerAdapter;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for
 * old HBaseTestCase and HBaseClusterTestCase functionality.
 * Create an instance and keep it around while testing HBase.  This class is
 * meant to be your one-stop shop for anything you might need testing.  Manages
 * one cluster at a time only. Managed cluster can be an in-process
 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
 * Not all methods work with the real cluster.
 * Depends on log4j being on the classpath and
 * hbase-site.xml for logging and test-run configuration.  It does not set
 * logging levels.
 * In the configuration properties, default values for master-info-port and
 * region-server-port are overridden such that a random port will be assigned (thus
 * avoiding port contention if another local HBase instance is already running).
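 * <p>As a minimal sketch of typical usage (assuming a JUnit test; the cluster is
 * expensive to start, so it is usually shared across a test class):
 * <pre>{@code
 * private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 *
 * // In a @BeforeClass method:
 * TEST_UTIL.startMiniCluster();
 * // In tests:
 * Table table = TEST_UTIL.createTable(TableName.valueOf("test"), Bytes.toBytes("cf"));
 * // In an @AfterClass method:
 * TEST_UTIL.shutdownMiniCluster();
 * }</pre>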
 * <p>To preserve test data directories, set the system property
 * "hbase.testing.preserve.testdir" to true.
 */
@InterfaceAudience.Public
@SuppressWarnings("deprecation")
public class HBaseTestingUtility extends HBaseZKTestingUtility {

  /**
   * System property key to get test directory value. The name is as it is because mini dfs has
   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a
   * property used in mini dfs.
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split
   * table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  public static final boolean PRESPLIT_TEST_TABLE = true;

  private MiniDFSCluster dfsCluster = null;

  private volatile HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /** Directory on test filesystem where we put the data for this instance of
   * HBaseTestingUtility */
  private Path dataTestDirOnTestFS = null;

  /**
   * Shared cluster connection.
   */
  private volatile Connection connection;

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with combinations of two booleans
   * (memstore timestamps and tags). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
   *
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
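      // Bind both a TCP socket and a UDP socket on the port; the port only
      // counts as available if both binds succeed.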
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for
   * testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo :
         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags for testing.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

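  /**
   * Create all combinations of memstoreTS, tags and offheap memstore for testing.
   */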
  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
      bloomAndCompressionCombinations();
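  // As a sketch of intended use, a JUnit Parameterized test can consume these
  // combinations directly:
  //
  //   @Parameterized.Parameters
  //   public static Collection<Object[]> data() {
  //     return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
  //   }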

  /**
   * <p>Create an HBaseTestingUtility using a default configuration.
   *
   * <p>Initially, all tmp files are written to a local test data directory.
   * Once {@link #startMiniDFSCluster} is called, either directly or via
   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
   *
   * <p>Previously, there was a distinction between the type of utility returned by
   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
   * at which point they will switch to using mini DFS for storage.
   */
  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>Create an HBaseTestingUtility using a given configuration.
   *
   * <p>Initially, all tmp files are written to a local test data directory.
   * Once {@link #startMiniDFSCluster} is called, either directly or via
   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
   *
   * <p>Previously, there was a distinction between the type of utility returned by
   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
   * at which point they will switch to using mini DFS for storage.
   *
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtility(@Nullable Configuration conf) {
    super(conf);

    // An hbase checksum verification failure will cause unit tests to fail.
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS.
    // Note that conf could be null, but this.conf will not be.
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If random port assignment isn't explicitly configured, default it to true
    // so that tests must opt out of random port assignment.
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #HBaseTestingUtility()}
   *   instead.
   * @return a normal HBaseTestingUtility
   * @see #HBaseTestingUtility()
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19841">HBASE-19841</a>
   */
  @Deprecated
  public static HBaseTestingUtility createLocalHTU() {
    return new HBaseTestingUtility();
  }

  /**
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
   *   {@link #HBaseTestingUtility(Configuration)} instead.
   * @return a normal HBaseTestingUtility
   * @see #HBaseTestingUtility(Configuration)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19841">HBASE-19841</a>
   */
  @Deprecated
  public static HBaseTestingUtility createLocalHTU(Configuration c) {
    return new HBaseTestingUtility(c);
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion)r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}.  Be careful how
   * you use the returned Configuration, since {@link Connection} instances
   * can be shared.  The Map of Connections is keyed by the Configuration.  If
   * say, a Connection was being used against a cluster that had been shutdown,
   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
   * be wholesome.  Rather than using the returned Configuration directly, it's
   * usually best to make a copy and use that.  Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
   * Give it a random name so we can have many concurrent tests running if
   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
   * System property, as that is what minidfscluster bases
   * its data dir on.  Modifying a System property is not the way to do concurrent
   * instances -- another instance could grab the temporary
   * value unintentionally -- but nothing can be done about it at the moment;
   * the minidfscluster works as a single instance only.
   *
   * We also create the underlying directories for
   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, set the values
   *  in the conf, and set hadoop.tmp.dir as a system property as well.
   *
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty(
      "hadoop.log.dir",
      testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    //  we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty(
      "hadoop.tmp.dir",
      testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir(
      "mapreduce.cluster.local.dir",
      testPath, "mapred-local-dir");

    return testPath;
  }

  private void createSubDirAndSystemProperty(
    String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      //  that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " +
        sysValue + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(
          propertyName + " property value differs in configuration and system: " +
          "Configuration=" + confValue + " while System=" + sysValue +
          " Overriding the configuration value with the system value."
        );
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory
   * for the test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * @return META table descriptor
   * @deprecated since 2.0 version and will be removed in 3.0 version.
   *             use {@link #getMetaTableDescriptorBuilder()}
   */
  @Deprecated
  public HTableDescriptor getMetaTableDescriptor() {
    return new ImmutableHTableDescriptor(getMetaTableDescriptorBuilder().build());
  }

  /**
   * @return META table descriptor
   */
  public TableDescriptorBuilder getMetaTableDescriptorBuilder() {
    try {
      return FSTableDescriptors.createMetaTableDescriptorBuilder(conf);
    } catch (IOException e) {
      throw new RuntimeException("Unable to create META table descriptor", e);
    }
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests.
   * Creates a new directory if not already setup.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up at "
          + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = UUID.randomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) {
        dataTestDir.deleteOnExit();
      }
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) {
        fs.deleteOnExit(newDataTestDir);
      }
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names, all instances of the
   * datanodes will have the same host name.
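   * <p>For example, as a minimal sketch:
   * <pre>{@code
   * util.startMiniDFSCluster(new String[] { "host1", "host2", "host3" });
   * }</pre>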
   * @param hosts hostnames DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts)
  throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param hosts hostnames DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts)
  throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
      throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
        setLevel(org.apache.log4j.Level.ERROR);
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
        setLevel(org.apache.log4j.Level.ERROR);

    TraceUtil.initTracer(conf);

    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
        true, null, racks, hosts, null);

    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // Reset the test directory for the test file system.
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
        null, null, null);
    return dfsCluster;
  }

  /** This is used before starting HDFS and map-reduce mini-clusters */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    createDirAndSetProperty("cache_data", "test.cache.data");
    createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
    createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());
  }

  /**
   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
   *  new column families. Defaults to false.
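   *  <p>For example, pass {@code -Dhbase.tests.new.version.behavior=true} on the
   *  command line to enable it.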
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   *  This allows specifying the parameter on the command line.
   *  If not set, the default is false.
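   *  <p>For example, pass {@code -Dhbase.tests.use.shortcircuit.reads=true} on the
   *  command line to enable short-circuit reads.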
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /** Enable the short circuit read, unless configured differently.
   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
   * or does nothing.
   * @throws IOException
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      dataTestDirOnTestFS = null;
      FSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1, 1);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper where the WAL's walDir is created
   * separately.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster(boolean withWALDir) throws Exception {
    return startMiniCluster(1, 1, 1, null, null, null, false, withWALDir);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * Set the <code>create</code> flag to create the root or data directory path or not
   * (will overwrite if the dir already exists).
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
  throws Exception {
    return startMiniCluster(1, numSlaves, create);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    return startMiniCluster(1, numSlaves, false);
  }

  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create, boolean withWALDir)
          throws Exception {
    return startMiniCluster(1, numSlaves, numSlaves, null, null, null, create, withWALDir);
  }

  /**
   * Start the minicluster. Whether to create a new root or data dir path even if such a path
   * has been created earlier is decided based on the flag <code>create</code>.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, boolean create)
    throws Exception {
      return startMiniCluster(numMasters, numSlaves, null, create);
  }

  /**
   * Start the minicluster.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
  throws Exception {
    return startMiniCluster(numMasters, numSlaves, null, false);
  }

  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts, boolean create)
      throws Exception {
    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
        null, null, create, false);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
        null, null);
  }

  /**
   * Same as {@link #startMiniCluster(int, int)}, but with a custom number of datanodes.
   * @param numDataNodes Number of data nodes.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final int numDataNodes) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param masterClass The class to use as HMaster, or null for default
   * @param regionserverClass The class to use as HRegionServer, or null for
   * default
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
          throws Exception {
    return startMiniCluster(
        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
  }

  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
      Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws Exception {
    return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
        masterClass, regionserverClass, false, false);
  }

  /**
   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with a custom
   * number of datanodes.
   * @param numDataNodes Number of data nodes.
   * @param create Set this flag to create a new
   * root or data directory path or not (will overwrite if it exists already).
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
    boolean create, boolean withWALDir)
  throws Exception {
    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
      numDataNodes = dataNodeHosts.length;
    }

    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
        numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (this.dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster();
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(numMasters, numSlaves, null, masterClass,
      regionserverClass, create, withWALDir);
  }

  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
      throws IOException, InterruptedException {
    return startMiniHBaseCluster(numMasters, numSlaves, null);
  }

  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves,
      List<Integer> rsPorts) throws IOException, InterruptedException {
    return startMiniHBaseCluster(numMasters, numSlaves, rsPorts, null, null, false, false);
  }

  /**
   * Starts up the mini hbase cluster.  Usually used after a call to
   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
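   * <p>A stepped startup, as a minimal sketch, might look like:
   * <pre>{@code
   * HBaseTestingUtility util = new HBaseTestingUtility();
   * util.startMiniDFSCluster(3);
   * util.startMiniZKCluster();
   * util.startMiniHBaseCluster(1, 1);
   * }</pre>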
   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
   *                restart where for sure the regionservers come up on same address+port (but
   *                just with different startcode); by default mini hbase clusters choose new
   *                arbitrary ports on each cluster start.
   * @param create Whether to create a
   * root or data directory path or not; will overwrite if exists already.
   * @return Reference to the hbase mini hbase cluster.
   * @throws IOException
   * @throws InterruptedException
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
        final int numSlaves, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
        Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
        boolean create, boolean withWALDir)
  throws IOException, InterruptedException {
    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
    createRootDir(create);
    if (withWALDir) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
    }

    Configuration c = new Configuration(this.conf);
    TraceUtil.initTracer(c);
    this.hbaseCluster =
        new MiniHBaseCluster(c, numMasters, numSlaves, rsPorts, masterClass, regionserverClass);
    // Don't leave here till we've done a successful scan of the hbase:meta
    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      continue;
    }
    s.close();
    t.close();

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster=" + this.getHBaseCluster().getMaster());

    return (MiniHBaseCluster) this.hbaseCluster;
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a
   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
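   * <p>For example, as a minimal sketch:
   * <pre>{@code
   * util.shutdownMiniHBaseCluster();
   * util.restartHBaseCluster(2);
   * }</pre>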
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
      hbaseAdmin = null;
    }
    if (this.connection != null) {
      this.connection.close();
      this.connection = null;
    }
    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
    // Don't leave here till we've done a successful scan of the hbase:meta
    Connection conn = ConnectionFactory.createConnection(this.conf);
    Table t = conn.getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
    conn.close();
  }

  /**
   * @return Current mini hbase cluster. Only has something in it after a call
   * to {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
      return (MiniHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(hbaseCluster + " not an instance of " +
                               MiniHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @throws IOException
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws Exception {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shutdown the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // Close hbase admin, close the current connection and reset the MIN/MAX RS-count configs.
  private void cleanup() throws IOException {
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
      hbaseAdmin = null;
    }
    if (this.connection != null) {
      this.connection.close();
      this.connection = null;
    }
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
   * is true, a new root directory path is fetched irrespective of whether it has been fetched
   * before or not. If false, the previous path is used.
   * Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   * @throws IOException
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtility#getDefaultRootDirPath(boolean)}
   * except that the <code>create</code> flag is false.
   * Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   * @throws IOException
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory.  Also creates the hbase
   * version file.  Normally you won't make use of this method.  The root hbasedir
   * is created for you as part of mini cluster startup.  You'd only use this
   * method if you were doing manual operation.
   * @param create This flag decides whether to get a new
   * root or data directory path or not, if it has been fetched already.
   * Note: the directory will be made irrespective of whether the path has been fetched or not.
   * If the directory already exists, it will be overwritten.
   * @return Fully qualified path to the hbase root dir
   * @throws IOException
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    FSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtility#createRootDir(boolean)}
   * except that the <code>create</code> flag is false.
   * @return Fully qualified path to the hbase root dir
   * @throws IOException
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory.
   * Normally you won't make use of this method. The root hbaseWALDir
   * is created for you as part of mini cluster startup. You'd only use this
   * method if you were doing manual operation.
   *
   * @return Fully qualified path to the hbase WAL root dir
   * @throws IOException
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    FSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   * @throws IOException
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the given table in the mini hbase cluster
   * @throws IOException
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @param tableName
   * @param family
   * @return A Table instance for the created table.
   * @throws IOException
   */
  public Table createTable(TableName tableName, String family)
  throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @return A Table instance for the created table.
   * @throws IOException
   */
  public Table createTable(TableName tableName, String[] families)
  throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @param tableName
   * @param family
   * @return A Table instance for the created table.
   * @throws IOException
   */
  public Table createTable(TableName tableName, byte[] family)
  throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
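   * <p>For example, as a minimal sketch:
   * <pre>{@code
   * Table t = util.createMultiRegionTable(TableName.valueOf("test"), Bytes.toBytes("cf"), 5);
   * }</pre>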
   * @param tableName
   * @param family
   * @param numRegions
   * @return A Table instance for the created table.
   * @throws IOException
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
      throws IOException {
    if (numRegions < 3) {
      throw new IOException("Must create at least 3 regions");
    }
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }
1356
1357  /**
1358   * Create a table.
1359   * @param tableName
1360   * @param families
1361   * @return A Table instance for the created table.
1362   * @throws IOException
1363   */
1364  public Table createTable(TableName tableName, byte[][] families)
1365  throws IOException {
1366    return createTable(tableName, families, (byte[][]) null);
1367  }
1368
1369  /**
1370   * Create a table with multiple regions.
1371   * @param tableName
1372   * @param families
1373   * @return A Table instance for the created table.
1374   * @throws IOException
1375   */
1376  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1377    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1378  }
1379
1380  /**
1381   * Create a table.
1382   * @param tableName
1383   * @param families
1384   * @param splitKeys
1385   * @return A Table instance for the created table.
1386   * @throws IOException
1387   */
1388  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1389      throws IOException {
1390    return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
1391  }
1392
1393  public Table createTable(TableName tableName, byte[][] families,
1394      int numVersions, byte[] startKey, byte[] endKey, int numRegions)
  throws IOException {
1396    HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1397
1398    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1401    waitUntilAllRegionsAssigned(tableName);
1402    return getConnection().getTable(tableName);
1403  }
1404
1405  /**
1406   * Create a table.
1407   * @param htd
1408   * @param families
1409   * @param c Configuration to use
1410   * @return A Table instance for the created table.
1411   * @throws IOException
1412   */
1413  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1414  throws IOException {
1415    return createTable(htd, families, null, c);
1416  }
1417
1418  /**
1419   * Create a table.
1420   * @param htd table descriptor
1421   * @param families array of column families
1422   * @param splitKeys array of split keys
1423   * @param c Configuration to use
1424   * @return A Table instance for the created table.
1425   * @throws IOException if getAdmin or createTable fails
1426   */
1427  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1428      Configuration c) throws IOException {
1429    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1430    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1431    // on is interfering.
1432    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1433  }
1434
1435  /**
1436   * Create a table.
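   * <p>
   * Usage sketch (illustrative):
   * <pre>{@code
   * TableDescriptor htd =
   *     TableDescriptorBuilder.newBuilder(TableName.valueOf("demo")).build();
   * Table t = TEST_UTIL.createTable(htd, new byte[][] { Bytes.toBytes("f") },
   *     null, BloomType.ROW, 64 * 1024, TEST_UTIL.getConfiguration());
   * }</pre>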
1437   * @param htd table descriptor
1438   * @param families array of column families
1439   * @param splitKeys array of split keys
1440   * @param type Bloom type
1441   * @param blockSize block size
1442   * @param c Configuration to use
1443   * @return A Table instance for the created table.
1444   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1448      BloomType type, int blockSize, Configuration c) throws IOException {
1449    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1450    for (byte[] family : families) {
1451      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1452        .setBloomFilterType(type)
1453        .setBlocksize(blockSize);
1454      if (isNewVersionBehaviorEnabled()) {
        cfdb.setNewVersionBehavior(true);
1456      }
1457      builder.setColumnFamily(cfdb.build());
1458    }
1459    TableDescriptor td = builder.build();
1460    getAdmin().createTable(td, splitKeys);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1463    waitUntilAllRegionsAssigned(td.getTableName());
1464    return getConnection().getTable(td.getTableName());
1465  }
1466
1467  /**
1468   * Create a table.
1469   * @param htd table descriptor
1470   * @param splitRows array of split keys
1471   * @return A Table instance for the created table.
1472   * @throws IOException
1473   */
1474  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1475      throws IOException {
1476    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1477    if (isNewVersionBehaviorEnabled()) {
1478      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
            .setNewVersionBehavior(true).build());
1481      }
1482    }
1483    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1486    waitUntilAllRegionsAssigned(htd.getTableName());
1487    return getConnection().getTable(htd.getTableName());
1488  }
1489
1490  /**
1491   * Create a table.
1492   * @param tableName
1493   * @param families
1494   * @param splitKeys
1495   * @param c Configuration to use
1496   * @return A Table instance for the created table.
1497   * @throws IOException
1498   */
1499  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1500      final Configuration c) throws IOException {
1501    return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
1502  }
1503
1504  /**
1505   * Create a table.
1506   * @param tableName
1507   * @param family
1508   * @param numVersions
1509   * @return A Table instance for the created table.
1510   * @throws IOException
1511   */
1512  public Table createTable(TableName tableName, byte[] family, int numVersions)
1513  throws IOException {
1514    return createTable(tableName, new byte[][]{family}, numVersions);
1515  }
1516
1517  /**
1518   * Create a table.
1519   * @param tableName
1520   * @param families
1521   * @param numVersions
1522   * @return A Table instance for the created table.
1523   * @throws IOException
1524   */
1525  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1526      throws IOException {
1527    return createTable(tableName, families, numVersions, (byte[][]) null);
1528  }
1529
1530  /**
1531   * Create a table.
1532   * @param tableName
1533   * @param families
1534   * @param numVersions
1535   * @param splitKeys
1536   * @return A Table instance for the created table.
1537   * @throws IOException
1538   */
1539  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1540      byte[][] splitKeys) throws IOException {
1541    HTableDescriptor desc = new HTableDescriptor(tableName);
1542    for (byte[] family : families) {
1543      HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1544      if (isNewVersionBehaviorEnabled()) {
1545        hcd.setNewVersionBehavior(true);
1546      }
1547      desc.addFamily(hcd);
1548    }
1549    getAdmin().createTable(desc, splitKeys);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should
    // wait until they are assigned
1552    waitUntilAllRegionsAssigned(tableName);
1553    return getConnection().getTable(tableName);
1554  }
1555
1556  /**
1557   * Create a table with multiple regions.
1558   * @param tableName
1559   * @param families
1560   * @param numVersions
1561   * @return A Table instance for the created table.
1562   * @throws IOException
1563   */
1564  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1565      throws IOException {
1566    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1567  }
1568
1569  /**
1570   * Create a table.
1571   * @param tableName
1572   * @param families
1573   * @param numVersions
1574   * @param blockSize
1575   * @return A Table instance for the created table.
1576   * @throws IOException
1577   */
1578  public Table createTable(TableName tableName, byte[][] families,
1579    int numVersions, int blockSize) throws IOException {
1580    HTableDescriptor desc = new HTableDescriptor(tableName);
1581    for (byte[] family : families) {
1582      HColumnDescriptor hcd = new HColumnDescriptor(family)
1583          .setMaxVersions(numVersions)
1584          .setBlocksize(blockSize);
1585      if (isNewVersionBehaviorEnabled()) {
1586        hcd.setNewVersionBehavior(true);
1587      }
1588      desc.addFamily(hcd);
1589    }
1590    getAdmin().createTable(desc);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should
    // wait until they are assigned
1593    waitUntilAllRegionsAssigned(tableName);
1594    return getConnection().getTable(tableName);
1595  }
1596
  public Table createTable(TableName tableName, byte[][] families,
      int numVersions, int blockSize, String cpName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family)
          .setMaxVersions(numVersions)
          .setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        hcd.setNewVersionBehavior(true);
      }
      desc.addFamily(hcd);
    }
    if (cpName != null) {
      desc.addCoprocessor(cpName);
    }
    getAdmin().createTable(desc);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should
    // wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }
1618
1619  /**
1620   * Create a table.
1621   * @param tableName
1622   * @param families
1623   * @param numVersions
1624   * @return A Table instance for the created table.
1625   * @throws IOException
1626   */
1627  public Table createTable(TableName tableName, byte[][] families,
1628      int[] numVersions)
1629  throws IOException {
1630    HTableDescriptor desc = new HTableDescriptor(tableName);
1631    int i = 0;
1632    for (byte[] family : families) {
1633      HColumnDescriptor hcd = new HColumnDescriptor(family)
1634          .setMaxVersions(numVersions[i]);
1635      if (isNewVersionBehaviorEnabled()) {
1636        hcd.setNewVersionBehavior(true);
1637      }
1638      desc.addFamily(hcd);
1639      i++;
1640    }
1641    getAdmin().createTable(desc);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should
    // wait until they are assigned
1644    waitUntilAllRegionsAssigned(tableName);
1645    return getConnection().getTable(tableName);
1646  }
1647
1648  /**
1649   * Create a table.
1650   * @param tableName
1651   * @param family
1652   * @param splitRows
1653   * @return A Table instance for the created table.
1654   * @throws IOException
1655   */
1656  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1657      throws IOException {
1658    HTableDescriptor desc = new HTableDescriptor(tableName);
1659    HColumnDescriptor hcd = new HColumnDescriptor(family);
1660    if (isNewVersionBehaviorEnabled()) {
1661      hcd.setNewVersionBehavior(true);
1662    }
1663    desc.addFamily(hcd);
1664    getAdmin().createTable(desc, splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should
    // wait until they are assigned
1667    waitUntilAllRegionsAssigned(tableName);
1668    return getConnection().getTable(tableName);
1669  }
1670
1671  /**
1672   * Create a table with multiple regions.
1673   * @param tableName
1674   * @param family
1675   * @return A Table instance for the created table.
1676   * @throws IOException
1677   */
1678  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1679    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1680  }
1681
1682  /**
1683   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
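   * <p>
   * Usage sketch (illustrative; {@code admin} and {@code tableName} come from the test):
   * <pre>{@code
   * TableDescriptor htd = TableDescriptorBuilder
   *     .newBuilder(admin.getTableDescriptor(tableName))
   *     .setMaxFileSize(256 * 1024 * 1024L)
   *     .build();
   * HBaseTestingUtility.modifyTableSync(admin, htd);
   * }</pre>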
1684   */
1685  @SuppressWarnings("serial")
1686  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1687      throws IOException, InterruptedException {
1688    admin.modifyTable(desc);
1689    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1690      setFirst(0);
1691      setSecond(0);
1692    }};
1693    int i = 0;
1694    do {
1695      status = admin.getAlterStatus(desc.getTableName());
1696      if (status.getSecond() != 0) {
1697        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1698          + " regions updated.");
        Thread.sleep(1000L);
1700      } else {
1701        LOG.debug("All regions updated.");
1702        break;
1703      }
1704    } while (status.getFirst() != 0 && i++ < 500);
1705    if (status.getFirst() != 0) {
1706      throw new IOException("Failed to update all regions even after 500 seconds.");
1707    }
1708  }
1709
1710  /**
1711   * Set the number of Region replicas.
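   * <p>
   * For example, {@code setReplicas(admin, tableName, 3)} disables the table, sets its
   * region replication to 3, and re-enables it.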
1712   */
1713  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1714      throws IOException, InterruptedException {
1715    admin.disableTable(table);
1716    HTableDescriptor desc = new HTableDescriptor(admin.getTableDescriptor(table));
1717    desc.setRegionReplication(replicaCount);
1718    admin.modifyTable(desc.getTableName(), desc);
1719    admin.enableTable(table);
1720  }
1721
1722  /**
1723   * Drop an existing table
1724   * @param tableName existing table
1725   */
1726  public void deleteTable(TableName tableName) throws IOException {
1727    try {
1728      getAdmin().disableTable(tableName);
1729    } catch (TableNotEnabledException e) {
1730      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1731    }
1732    getAdmin().deleteTable(tableName);
1733  }
1734
1735  /**
1736   * Drop an existing table
1737   * @param tableName existing table
1738   */
1739  public void deleteTableIfAny(TableName tableName) throws IOException {
1740    try {
1741      deleteTable(tableName);
1742    } catch (TableNotFoundException e) {
1743      // ignore
1744    }
1745  }
1746
1747  // ==========================================================================
1748  // Canned table and table descriptor creation
1749  // TODO replace HBaseTestCase
1750
1751  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1752  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1753  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1754  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1755  private static final int MAXVERSIONS = 3;
1756
1757  public static final char FIRST_CHAR = 'a';
1758  public static final char LAST_CHAR = 'z';
1759  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1760  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1761
1762  /**
1763   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
1764   *   {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} instead.
1765   * @see #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)
1766   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
1767   */
1768  @Deprecated
1769  public HTableDescriptor createTableDescriptor(final String name,
1770      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1771    return this.createTableDescriptor(TableName.valueOf(name), minVersions, versions, ttl,
1772        keepDeleted);
1773  }
1774
1775  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
1779   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
1780   *   {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} instead.
1781   * @see #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)
1782   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
1783   */
1784  @Deprecated
1785  public HTableDescriptor createTableDescriptor(final String name) {
1786    return createTableDescriptor(TableName.valueOf(name),  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1787        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1788  }
1789
1790  public HTableDescriptor createTableDescriptor(final TableName name,
1791      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1792    HTableDescriptor htd = new HTableDescriptor(name);
1793    for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1794      HColumnDescriptor hcd = new HColumnDescriptor(cfName)
1795          .setMinVersions(minVersions)
1796          .setMaxVersions(versions)
1797          .setKeepDeletedCells(keepDeleted)
1798          .setBlockCacheEnabled(false)
1799          .setTimeToLive(ttl);
1800      if (isNewVersionBehaviorEnabled()) {
        hcd.setNewVersionBehavior(true);
1802      }
1803      htd.addFamily(hcd);
1804    }
1805    return htd;
1806  }
1807
1808  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
1812   */
1813  public HTableDescriptor createTableDescriptor(final TableName name) {
1814    return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1815        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1816  }
1817
1818  public HTableDescriptor createTableDescriptor(final TableName tableName,
1819      byte[] family) {
1820    return createTableDescriptor(tableName, new byte[][] {family}, 1);
1821  }
1822
1823  public HTableDescriptor createTableDescriptor(final TableName tableName,
1824      byte[][] families, int maxVersions) {
1825    HTableDescriptor desc = new HTableDescriptor(tableName);
1826    for (byte[] family : families) {
1827      HColumnDescriptor hcd = new HColumnDescriptor(family)
1828          .setMaxVersions(maxVersions);
1829      if (isNewVersionBehaviorEnabled()) {
        hcd.setNewVersionBehavior(true);
1831      }
1832      desc.addFamily(hcd);
1833    }
1834    return desc;
1835  }
1836
1837  /**
1838   * Create an HRegion that writes to the local tmp dirs
1839   * @param desc
1840   * @param startKey
1841   * @param endKey
   * @return created HRegion
1843   * @throws IOException
1844   */
1845  public HRegion createLocalHRegion(TableDescriptor desc, byte [] startKey,
1846      byte [] endKey)
1847  throws IOException {
1848    HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1849    return createLocalHRegion(hri, desc);
1850  }
1851
1852  /**
1853   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1854   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1855   */
1856  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1857    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1858  }
1859
1860  /**
1861   * Create an HRegion that writes to the local tmp dirs with specified wal
1862   * @param info regioninfo
1863   * @param desc table descriptor
1864   * @param wal wal for this region.
1865   * @return created hregion
1866   * @throws IOException
1867   */
1868  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
1869      throws IOException {
1870    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
1871  }
1872
1873  /**
1874   * @param tableName the name of the table
1875   * @param startKey the start key of the region
1876   * @param stopKey the stop key of the region
1877   * @param callingMethod the name of the calling method probably a test method
1878   * @param conf the configuration to use
1879   * @param isReadOnly {@code true} if the table is read only, {@code false} otherwise
1880   * @param families the column families to use
1881   * @throws IOException if an IO problem is encountered
1882   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1883   *         when done.
1884   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
1885   *   {@link #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)}
1886   *   instead.
1887   * @see #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)
1888   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
1889   */
1890  @Deprecated
1891  public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1892      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1893      WAL wal, byte[]... families) throws IOException {
    return this.createLocalHRegion(TableName.valueOf(tableName), startKey, stopKey, isReadOnly,
        durability, wal, families);
1897  }
1898
1899  /**
1900   * @param tableName
1901   * @param startKey
1902   * @param stopKey
1903   * @param isReadOnly
1904   * @param families
1905   * @return A region on which you must call
1906   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
1907   * @throws IOException
1908   */
1909  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1910      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
1911    return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
1912        durability, wal, null, families);
1913  }
1914
1915  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1916      byte[] stopKey,
1917      boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
1918      byte[]... families)
1919      throws IOException {
1920    HTableDescriptor htd = new HTableDescriptor(tableName);
1921    htd.setReadOnly(isReadOnly);
    int i = 0;
1923    for (byte[] family : families) {
1924      HColumnDescriptor hcd = new HColumnDescriptor(family);
      if (compactedMemStore != null && i < compactedMemStore.length) {
        hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
1931      i++;
1932      // Set default to be three versions.
1933      hcd.setMaxVersions(Integer.MAX_VALUE);
1934      htd.addFamily(hcd);
1935    }
1936    htd.setDurability(durability);
1937    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
1938    return createLocalHRegion(info, htd, wal);
1939  }
1940
1941  //
1942  // ==========================================================================
1943
1944  /**
1945   * Provide an existing table name to truncate.
1946   * Scans the table and issues a delete for each row read.
1947   * @param tableName existing table
   * @return Table for the passed table, now emptied of data
1949   * @throws IOException
1950   */
1951  public Table deleteTableData(TableName tableName) throws IOException {
1952    Table table = getConnection().getTable(tableName);
1953    Scan scan = new Scan();
    ResultScanner resScan = table.getScanner(scan);
    for (Result res : resScan) {
      Delete del = new Delete(res.getRow());
      table.delete(del);
    }
    resScan.close();
1961    return table;
1962  }
1963
1964  /**
1965   * Truncate a table using the admin command.
1966   * Effectively disables, deletes, and recreates the table.
1967   * @param tableName table which must exist.
1968   * @param preserveRegions keep the existing split points
1969   * @return HTable for the new table
1970   */
1971  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
1972      IOException {
1973    Admin admin = getAdmin();
1974    if (!admin.isTableDisabled(tableName)) {
1975      admin.disableTable(tableName);
1976    }
1977    admin.truncateTable(tableName, preserveRegions);
1978    return getConnection().getTable(tableName);
1979  }
1980
1981  /**
1982   * Truncate a table using the admin command.
1983   * Effectively disables, deletes, and recreates the table.
1984   * For previous behavior of issuing row deletes, see
1985   * deleteTableData.
1986   * Expressly does not preserve regions of existing table.
1987   * @param tableName table which must exist.
1988   * @return HTable for the new table
1989   */
1990  public Table truncateTable(final TableName tableName) throws IOException {
1991    return truncateTable(tableName, false);
1992  }
1993
1994  /**
1995   * Load table with rows from 'aaa' to 'zzz'.
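   * <p>
   * Usage sketch (illustrative):
   * <pre>{@code
   * Table t = TEST_UTIL.createTable(TableName.valueOf("demo"), "f");
   * int rows = TEST_UTIL.loadTable(t, Bytes.toBytes("f")); // 17576 rows, 'aaa'..'zzz'
   * }</pre>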
1996   * @param t Table
1997   * @param f Family
1998   * @return Count of rows loaded.
1999   * @throws IOException
2000   */
2001  public int loadTable(final Table t, final byte[] f) throws IOException {
2002    return loadTable(t, new byte[][] {f});
2003  }
2004
2005  /**
2006   * Load table with rows from 'aaa' to 'zzz'.
2007   * @param t Table
2008   * @param f Family
2009   * @return Count of rows loaded.
2010   * @throws IOException
2011   */
2012  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2013    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2014  }
2015
2016  /**
2017   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2018   * @param t Table
2019   * @param f Array of Families to load
2020   * @return Count of rows loaded.
2021   * @throws IOException
2022   */
2023  public int loadTable(final Table t, final byte[][] f) throws IOException {
2024    return loadTable(t, f, null);
2025  }
2026
2027  /**
2028   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2029   * @param t Table
2030   * @param f Array of Families to load
2031   * @param value the values of the cells. If null is passed, the row key is used as value
2032   * @return Count of rows loaded.
2033   * @throws IOException
2034   */
2035  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2036    return loadTable(t, f, value, true);
2037  }
2038
2039  /**
2040   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2041   * @param t Table
2042   * @param f Array of Families to load
2043   * @param value the values of the cells. If null is passed, the row key is used as value
2044   * @return Count of rows loaded.
2045   * @throws IOException
2046   */
2047  public int loadTable(final Table t, final byte[][] f, byte[] value,
2048      boolean writeToWAL) throws IOException {
2049    List<Put> puts = new ArrayList<>();
2050    for (byte[] row : HBaseTestingUtility.ROWS) {
2051      Put put = new Put(row);
2052      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2053      for (int i = 0; i < f.length; i++) {
2054        byte[] value1 = value != null ? value : row;
2055        put.addColumn(f[i], f[i], value1);
2056      }
2057      puts.add(put);
2058    }
2059    t.put(puts);
2060    return puts.size();
2061  }
2062
2063  /** A tracker for tracking and validating table rows
2064   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
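   * <p>
   * Usage sketch (illustrative; {@code table} was populated via {@code loadTable}):
   * <pre>{@code
   * SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("aaa"), Bytes.toBytes("ccc"));
   * try (ResultScanner scanner = table.getScanner(new Scan())) {
   *   for (Result r : scanner) {
   *     tracker.addRow(r.getRow());
   *   }
   * }
   * tracker.validate(); // throws if any row in [aaa, ccc) was missed or seen twice
   * }</pre>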
2065   */
2066  public static class SeenRowTracker {
2067    int dim = 'z' - 'a' + 1;
2068    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2069    byte[] startRow;
2070    byte[] stopRow;
2071
2072    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2073      this.startRow = startRow;
2074      this.stopRow = stopRow;
2075    }
2076
2077    void reset() {
2078      for (byte[] row : ROWS) {
2079        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2080      }
2081    }
2082
2083    int i(byte b) {
2084      return b - 'a';
2085    }
2086
2087    public void addRow(byte[] row) {
2088      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2089    }
2090
    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
     * that all other rows are not seen at all
2093     */
2094    public void validate() {
2095      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2096        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2097          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2098            int count = seenRows[i(b1)][i(b2)][i(b3)];
2099            int expectedCount = 0;
2100            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2101                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2102              expectedCount = 1;
2103            }
2104            if (count != expectedCount) {
2105              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2106              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2107                  "instead of " + expectedCount);
2108            }
2109          }
2110        }
2111      }
2112    }
2113  }
2114
2115  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2116    return loadRegion(r, f, false);
2117  }
2118
2119  public int loadRegion(final Region r, final byte[] f) throws IOException {
2120    return loadRegion((HRegion)r, f);
2121  }
2122
2123  /**
2124   * Load region with rows from 'aaa' to 'zzz'.
2125   * @param r Region
2126   * @param f Family
2127   * @param flush flush the cache if true
2128   * @return Count of rows loaded.
2129   * @throws IOException
2130   */
2131  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2132  throws IOException {
2133    byte[] k = new byte[3];
2134    int rowCount = 0;
2135    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2136      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2137        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2138          k[0] = b1;
2139          k[1] = b2;
2140          k[2] = b3;
          Put put = new Put(k);
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
2147          int preRowCount = rowCount;
2148          int pause = 10;
2149          int maxPause = 1000;
2150          while (rowCount == preRowCount) {
2151            try {
2152              r.put(put);
2153              rowCount++;
2154            } catch (RegionTooBusyException e) {
2155              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2156              Threads.sleep(pause);
2157            }
2158          }
2159        }
2160      }
2161      if (flush) {
2162        r.flush(true);
2163      }
2164    }
2165    return rowCount;
2166  }
2167
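  /**
   * Write one row per integer in {@code [startRow, endRow)}, using the stringified integer
   * as both the row key and the cell value.
   */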
2168  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2169      throws IOException {
2170    for (int i = startRow; i < endRow; i++) {
2171      byte[] data = Bytes.toBytes(String.valueOf(i));
2172      Put put = new Put(data);
2173      put.addColumn(f, null, data);
2174      t.put(put);
2175    }
2176  }
2177
2178  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2179      throws IOException {
2180    Random r = new Random();
2181    byte[] row = new byte[rowSize];
2182    for (int i = 0; i < totalRows; i++) {
2183      r.nextBytes(row);
2184      Put put = new Put(row);
2185      put.addColumn(f, new byte[]{0}, new byte[]{0});
2186      t.put(put);
2187    }
2188  }
2189
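  /**
   * Verify the rows written by {@link #loadNumericRows(Table, byte[], int, int)} by reading
   * them back from the given replica with TIMELINE consistency.
   */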
2190  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2191      int replicaId)
2192      throws IOException {
2193    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2195      byte[] data = Bytes.toBytes(String.valueOf(i));
2196      Get get = new Get(data);
2197      get.setReplicaId(replicaId);
2198      get.setConsistency(Consistency.TIMELINE);
2199      Result result = table.get(get);
2200      assertTrue(failMsg, result.containsColumn(f, null));
2201      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2202      Cell cell = result.getColumnLatestCell(f, null);
2203      assertTrue(failMsg,
2204        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2205          cell.getValueLength()));
2206    }
2207  }
2208
2209  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2210      throws IOException {
2211    verifyNumericRows((HRegion)region, f, startRow, endRow);
2212  }
2213
2214  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2215      throws IOException {
2216    verifyNumericRows(region, f, startRow, endRow, true);
2217  }
2218
2219  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2220      final boolean present) throws IOException {
2221    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2222  }
2223
2224  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2225      final boolean present) throws IOException {
2226    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2228      byte[] data = Bytes.toBytes(String.valueOf(i));
2229      Result result = region.get(new Get(data));
2230
2231      boolean hasResult = result != null && !result.isEmpty();
2232      assertEquals(failMsg + result, present, hasResult);
2233      if (!present) continue;
2234
2235      assertTrue(failMsg, result.containsColumn(f, null));
2236      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2237      Cell cell = result.getColumnLatestCell(f, null);
2238      assertTrue(failMsg,
2239        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2240          cell.getValueLength()));
2241    }
2242  }
2243
2244  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2245      throws IOException {
2246    for (int i = startRow; i < endRow; i++) {
2247      byte[] data = Bytes.toBytes(String.valueOf(i));
2248      Delete delete = new Delete(data);
2249      delete.addFamily(f);
2250      t.delete(delete);
2251    }
2252  }
2253
2254  /**
2255   * Return the number of rows in the given table.
2256   */
2257  public int countRows(final Table table) throws IOException {
2258    return countRows(table, new Scan());
2259  }
2260
2261  public int countRows(final Table table, final Scan scan) throws IOException {
2262    try (ResultScanner results = table.getScanner(scan)) {
2263      int count = 0;
2264      while (results.next() != null) {
2265        count++;
2266      }
2267      return count;
2268    }
2269  }
2270
2271  public int countRows(final Table table, final byte[]... families) throws IOException {
2272    Scan scan = new Scan();
2273    for (byte[] family: families) {
2274      scan.addFamily(family);
2275    }
2276    return countRows(table, scan);
2277  }
2278
2279  /**
2280   * Return the number of rows in the given table.
2281   */
  public int countRows(final TableName tableName) throws IOException {
    try (Table table = getConnection().getTable(tableName)) {
      return countRows(table);
    }
  }
2290
2291  public int countRows(final Region region) throws IOException {
2292    return countRows(region, new Scan());
2293  }
2294
2295  public int countRows(final Region region, final Scan scan) throws IOException {
2296    InternalScanner scanner = region.getScanner(scan);
2297    try {
2298      return countRows(scanner);
2299    } finally {
2300      scanner.close();
2301    }
2302  }
2303
2304  public int countRows(final InternalScanner scanner) throws IOException {
2305    int scannedCount = 0;
2306    List<Cell> results = new ArrayList<>();
2307    boolean hasMore = true;
2308    while (hasMore) {
2309      hasMore = scanner.next(results);
2310      scannedCount += results.size();
2311      results.clear();
2312    }
2313    return scannedCount;
2314  }
2315
2316  /**
   * Return an MD5 digest computed over the row keys of the given table.
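   * <p>
   * Usage sketch (illustrative): compare two tables by their row keys:
   * <pre>{@code
   * assertEquals(TEST_UTIL.checksumRows(tableA), TEST_UTIL.checksumRows(tableB));
   * }</pre>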
2318   */
  public String checksumRows(final Table table) throws Exception {
    Scan scan = new Scan();
2322    ResultScanner results = table.getScanner(scan);
2323    MessageDigest digest = MessageDigest.getInstance("MD5");
2324    for (Result res : results) {
2325      digest.update(res.getRow());
2326    }
2327    results.close();
    // MessageDigest#toString() does not render the digest bytes; return the hex digest.
    return Bytes.toHex(digest.digest());
2329  }
2330
2331  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2332  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2333  static {
2334    int i = 0;
2335    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2336      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2337        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2338          ROWS[i][0] = b1;
2339          ROWS[i][1] = b2;
2340          ROWS[i][2] = b3;
2341          i++;
2342        }
2343      }
2344    }
2345  }
2346
2347  public static final byte[][] KEYS = {
2348    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2349    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2350    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2351    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2352    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2353    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2354    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2355    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2356    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2357  };
2358
2359  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2360      Bytes.toBytes("bbb"),
2361      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2362      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2363      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2364      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2365      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2366      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2367      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2368      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2369  };
2370
2371  /**
2372   * Create rows in hbase:meta for regions of the specified table with the specified
2373   * start keys.  The first startKey should be a 0 length byte array if you
2374   * want to form a proper range of regions.
2375   * @param conf
2376   * @param htd
2377   * @param startKeys
2378   * @return list of region info for regions added to meta
2379   * @throws IOException
2380   * @deprecated since 2.0 version and will be removed in 3.0 version.
2381   *             use {@link #createMultiRegionsInMeta(Configuration, TableDescriptor, byte[][])}
2382   */
2383  @Deprecated
2384  public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2385      final HTableDescriptor htd, byte [][] startKeys) throws IOException {
2386    return createMultiRegionsInMeta(conf, (TableDescriptor) htd, startKeys)
2387        .stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList());
2388  }

  /**
2390   * Create rows in hbase:meta for regions of the specified table with the specified
2391   * start keys.  The first startKey should be a 0 length byte array if you
2392   * want to form a proper range of regions.
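   * <p>
   * Usage sketch (illustrative; {@code htd} is the descriptor of the test table and
   * {@link #KEYS} starts with the empty byte array):
   * <pre>{@code
   * List<RegionInfo> regions = TEST_UTIL.createMultiRegionsInMeta(
   *     TEST_UTIL.getConfiguration(), htd, HBaseTestingUtility.KEYS);
   * }</pre>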
2393   * @param conf
2394   * @param htd
2395   * @param startKeys
2396   * @return list of region info for regions added to meta
2397   * @throws IOException
2398   */
2399  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2400      final TableDescriptor htd, byte [][] startKeys)
2401  throws IOException {
2402    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2403    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2404    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2405    MetaTableAccessor
2406        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
    // add the requested regions; each region's end key is the next start key,
    // wrapping around so the last region closes the key space
2408    for (int i = 0; i < startKeys.length; i++) {
2409      int j = (i + 1) % startKeys.length;
2410      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2411          .setStartKey(startKeys[i])
2412          .setEndKey(startKeys[j])
2413          .build();
2414      MetaTableAccessor.addRegionToMeta(getConnection(), hri);
2415      newRegions.add(hri);
2416    }
2417
2418    meta.close();
2419    return newRegions;
2420  }
2421
2422  /**
2423   * Create an unmanaged WAL. Be sure to close it when you're through.
2424   */
2425  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2426      throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless we pass it along via the conf.
2429    Configuration confForWAL = new Configuration(conf);
2430    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2431    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2432  }
2433
2434  /**
   * Create a region with its own WAL. Be sure to call
2436   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
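   * <p>
   * Usage sketch (illustrative; {@code htd} is a pre-built table descriptor and
   * {@code TEST_UTIL} is this utility):
   * <pre>{@code
   * RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
   * HRegion region = HBaseTestingUtility.createRegionAndWAL(info,
   *     TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
   * try {
   *   // exercise the region without a running cluster
   * } finally {
   *   HBaseTestingUtility.closeRegionAndWAL(region);
   * }
   * }</pre>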
2437   */
2438  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2439      final Configuration conf, final TableDescriptor htd) throws IOException {
2440    return createRegionAndWAL(info, rootDir, conf, htd, true);
2441  }
2442
2443  /**
   * Create a region with its own WAL. Be sure to call
2445   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2446   */
2447  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2448      final Configuration conf, final TableDescriptor htd, boolean initialize)
2449      throws IOException {
2450    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2451    WAL wal = createWal(conf, rootDir, info);
2452    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2453  }
2454
2455  /**
2456   * Returns all rows from the hbase:meta table.
2457   *
2458   * @throws IOException When reading the rows fails.
2459   */
2460  public List<byte[]> getMetaTableRows() throws IOException {
2461    // TODO: Redo using MetaTableAccessor class
2462    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2463    List<byte[]> rows = new ArrayList<>();
2464    ResultScanner s = t.getScanner(new Scan());
2465    for (Result result : s) {
2466      LOG.info("getMetaTableRows: row -> " +
2467        Bytes.toStringBinary(result.getRow()));
2468      rows.add(result.getRow());
2469    }
2470    s.close();
2471    t.close();
2472    return rows;
2473  }
2474
2475  /**
2476   * Returns all rows from the hbase:meta table for a given user table
2477   *
2478   * @throws IOException When reading the rows fails.
2479   */
2480  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2481    // TODO: Redo using MetaTableAccessor.
2482    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2483    List<byte[]> rows = new ArrayList<>();
2484    ResultScanner s = t.getScanner(new Scan());
2485    for (Result result : s) {
2486      RegionInfo info = MetaTableAccessor.getRegionInfo(result);
2487      if (info == null) {
2488        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2489        // TODO figure out what to do for this new hosed case.
2490        continue;
2491      }
2492
2493      if (info.getTable().equals(tableName)) {
        LOG.info("getMetaTableRows: row -> " +
            Bytes.toStringBinary(result.getRow()) + " " + info);
2496        rows.add(result.getRow());
2497      }
2498    }
2499    s.close();
2500    t.close();
2501    return rows;
2502  }
2503
2504  /**
2505   * Returns all regions of the specified table
2506   *
2507   * @param tableName the table name
2508   * @return all regions of the specified table
2509   * @throws IOException when getting the regions fails.
2510   */
2511  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2512    try (Admin admin = getConnection().getAdmin()) {
2513      return admin.getRegions(tableName);
2514    }
2515  }
2516
  /**
   * Find any other region server which is different from the one identified by parameter
   * @param rs region server to exclude
   * @return another region server, or null if there is none
   */
2522  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2523    for (JVMClusterUtil.RegionServerThread rst :
2524      getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2526        return rst.getRegionServer();
2527      }
2528    }
2529    return null;
2530  }
2531
2532  /**
2533   * Tool to get the reference to the region server object that holds the
2534   * region of the specified user table.
2535   * @param tableName user table to lookup in hbase:meta
2536   * @return region server that holds it, null if the row doesn't exist
2537   * @throws IOException
2538   * @throws InterruptedException
2539   */
2540  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2541      throws IOException, InterruptedException {
2542    List<RegionInfo> regions = getRegions(tableName);
2543    if (regions == null || regions.isEmpty()) {
2544      return null;
2545    }
2546    LOG.debug("Found " + regions.size() + " regions for table " +
2547        tableName);
2548
2549    byte[] firstRegionName = regions.stream()
2550        .filter(r -> !r.isOffline())
2551        .map(RegionInfo::getRegionName)
2552        .findFirst()
2553        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2554
2555    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2556    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2557      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2558    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2559      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // HBASE_CLIENT_PAUSE is in milliseconds.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
    while (retrier.shouldRetry()) {
2562      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2563      if (index != -1) {
2564        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2565      }
2566      // Came back -1.  Region may not be online yet.  Sleep a while.
2567      retrier.sleepUntilNextRetry();
2568    }
2569    return null;
2570  }
2571
2572  /**
2573   * Starts a <code>MiniMRCluster</code> with a default number of
   * <code>TaskTracker</code>s.
2575   *
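   * <p>
   * Usage sketch (illustrative):
   * <pre>{@code
   * MiniMRCluster mr = TEST_UTIL.startMiniMapReduceCluster();
   * try {
   *   // run MapReduce jobs against the mini cluster
   * } finally {
   *   TEST_UTIL.shutdownMiniMapReduceCluster();
   * }
   * }</pre>
   *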
2576   * @throws IOException When starting the cluster fails.
2577   */
2578  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
2580    conf.setIfUnset(
2581        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2582        "99.0");
2583    startMiniMapReduceCluster(2);
2584    return mrCluster;
2585  }
2586
2587  /**
2588   * Tasktracker has a bug where changing the hadoop.log.dir system property
2589   * will not change its internal static LOG_DIR variable.
2590   */
2591  private void forceChangeTaskLogDir() {
2592    Field logDirField;
2593    try {
2594      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2595      logDirField.setAccessible(true);
2596
2597      Field modifiersField = Field.class.getDeclaredField("modifiers");
2598      modifiersField.setAccessible(true);
2599      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2600
2601      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
    } catch (SecurityException | NoSuchFieldException
        | IllegalArgumentException | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
2612  }
2613
2614  /**
2615   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2616   * filesystem.
   * @param servers  The number of <code>TaskTracker</code>s to start.
2618   * @throws IOException When starting the cluster fails.
2619   */
2620  private void startMiniMapReduceCluster(final int servers) throws IOException {
2621    if (mrCluster != null) {
2622      throw new IllegalStateException("MiniMRCluster is already running");
2623    }
2624    LOG.info("Starting mini mapreduce cluster...");
2625    setupClusterTestDir();
2626    createDirsAndSetProperties();
2627
2628    forceChangeTaskLogDir();
2629
2630    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We up the usable VM memory so that processes don't get killed.
2633    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2634
2635    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2636    // this avoids the problem by disabling speculative task execution in tests.
2637    conf.setBoolean("mapreduce.map.speculative", false);
2638    conf.setBoolean("mapreduce.reduce.speculative", false);
2639    ////
2640
2641    // Allow the user to override FS URI for this map-reduce cluster to use.
2642    mrCluster = new MiniMRCluster(servers,
2643      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2644      null, null, new JobConf(this.conf));
2645    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2646    if (jobConf == null) {
2647      jobConf = mrCluster.createJobConf();
2648    }
2649
2650    jobConf.set("mapreduce.cluster.local.dir",
      conf.get("mapreduce.cluster.local.dir")); // Hadoop MiniMR overwrites this; it should not
2652    LOG.info("Mini mapreduce cluster started");
2653
2654    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2655    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2656    // necessary config properties here.  YARN-129 required adding a few properties.
2657    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2658    // this for mrv2 support; mr1 ignores this
2659    conf.set("mapreduce.framework.name", "yarn");
2660    conf.setBoolean("yarn.is.minicluster", true);
2661    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2662    if (rmAddress != null) {
2663      conf.set("yarn.resourcemanager.address", rmAddress);
2664    }
2665    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2666    if (historyAddress != null) {
2667      conf.set("mapreduce.jobhistory.address", historyAddress);
2668    }
2669    String schedulerAddress =
2670      jobConf.get("yarn.resourcemanager.scheduler.address");
2671    if (schedulerAddress != null) {
2672      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2673    }
2674    String mrJobHistoryWebappAddress =
2675      jobConf.get("mapreduce.jobhistory.webapp.address");
2676    if (mrJobHistoryWebappAddress != null) {
2677      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2678    }
2679    String yarnRMWebappAddress =
2680      jobConf.get("yarn.resourcemanager.webapp.address");
2681    if (yarnRMWebappAddress != null) {
2682      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2683    }
2684  }
2685
2686  /**
2687   * Stops the previously started <code>MiniMRCluster</code>.
2688   */
2689  public void shutdownMiniMapReduceCluster() {
2690    if (mrCluster != null) {
2691      LOG.info("Stopping mini mapreduce cluster...");
2692      mrCluster.shutdown();
2693      mrCluster = null;
2694      LOG.info("Mini mapreduce cluster stopped");
2695    }
2696    // Restore configuration to point to local jobtracker
2697    conf.set("mapreduce.jobtracker.address", "local");
2698  }
2699
2700  /**
2701   * Create a stubbed out RegionServerService, mainly for getting FS.
2702   */
2703  public RegionServerServices createMockRegionServerService() throws IOException {
2704    return createMockRegionServerService((ServerName)null);
2705  }
2706
2707  /**
2708   * Create a stubbed out RegionServerService, mainly for getting FS.
2709   * This version is used by TestTokenAuthentication
2710   */
2711  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2712      IOException {
2713    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2714    rss.setFileSystem(getTestFileSystem());
2715    rss.setRpcServer(rpc);
2716    return rss;
2717  }
2718
2719  /**
2720   * Create a stubbed out RegionServerService, mainly for getting FS.
2721   * This version is used by TestOpenRegionHandler
2722   */
2723  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2724    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2725    rss.setFileSystem(getTestFileSystem());
2726    return rss;
2727  }
2728
2729  /**
2730   * Switches the logger for the given class to DEBUG level.
2731   *
2732   * @param clazz  The class for which to switch to debug logging.
2733   */
2734  public void enableDebug(Class<?> clazz) {
2735    Logger l = LoggerFactory.getLogger(clazz);
2736    if (l instanceof Log4JLogger) {
2737      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2738    } else if (l instanceof Log4jLoggerAdapter) {
2739      LogManager.getLogger(clazz).setLevel(org.apache.log4j.Level.DEBUG);
2740    } else if (l instanceof Jdk14Logger) {
2741      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2742    }
2743  }
2744
2745  /**
2746   * Expire the Master's session
2747   * @throws Exception
2748   */
2749  public void expireMasterSession() throws Exception {
2750    HMaster master = getMiniHBaseCluster().getMaster();
2751    expireSession(master.getZooKeeper(), false);
2752  }
2753
2754  /**
2755   * Expire a region server's session
2756   * @param index which RS
2757   */
2758  public void expireRegionServerSession(int index) throws Exception {
2759    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2760    expireSession(rs.getZooKeeper(), false);
2761    decrementMinRegionServerCount();
2762  }
2763
  /**
   * Doesn't need to be called when using expireRegionServerSession as it will automatically
   * decrement the min count
   */
2768  public void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for a newly spawned master
2770    // this.hbaseCluster shares this configuration too
2771    decrementMinRegionServerCount(getConfiguration());
2772
2773    // each master thread keeps a copy of configuration
2774    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2775      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2776    }
2777  }
2778
  /**
   * Doesn't need to be called when using expireRegionServerSession as it will automatically
   * decrement the min count
   */
2783  public void decrementMinRegionServerCount(Configuration conf) {
2784    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2785    if (currentCount != -1) {
2786      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2787    }
2788  }
2789
2790  public void expireSession(ZKWatcher nodeZK) throws Exception {
2791    expireSession(nodeZK, false);
2792  }
2793
2794  /**
2795   * Expire a ZooKeeper session as recommended in the ZooKeeper documentation:
2796   * http://hbase.apache.org/book.html#trouble.zookeeper
2797   * There are issues when doing this:
2798   * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2799   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2800   *
2801   * @param nodeZK the ZK watcher to expire
2802   * @param checkStatus true to check if we can create a Table with the
2803   *                    current configuration.
2804   */
2805  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
2806    throws Exception {
2807    Configuration c = new Configuration(this.conf);
2808    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2809    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2810    byte[] password = zk.getSessionPasswd();
2811    long sessionID = zk.getSessionId();
2812
2813    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2814    //  so we create a first watcher to be sure that the
2815    //  event was sent. We expect that if our watcher receives the event,
2816    //  other watchers on the same machine will get it as well.
2817    // When we ask to close the connection, ZK does not close it before
2818    //  we receive all the events, so we don't have to capture the event;
2819    //  just closing the connection should be enough.
2820    ZooKeeper monitor = new ZooKeeper(quorumServers,
2821      1000, new org.apache.zookeeper.Watcher(){
2822      @Override
2823      public void process(WatchedEvent watchedEvent) {
2824        LOG.info("Monitor ZKW received event=" + watchedEvent);
2825      }
2826    }, sessionID, password);
2827
2828    // Making it expire
2829    ZooKeeper newZK = new ZooKeeper(quorumServers,
2830        1000, EmptyWatcher.instance, sessionID, password);
2831
2832    // ensure that we have a connection to the server before closing down; otherwise
2833    // the session close event may be consumed before we ever reach the CONNECTED state
2834    long start = System.currentTimeMillis();
2835    while (newZK.getState() != States.CONNECTED
2836         && System.currentTimeMillis() - start < 1000) {
2837       Thread.sleep(1);
2838    }
2839    newZK.close();
2840    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2841
2842    // Now closing & waiting to be sure that the clients get it.
2843    monitor.close();
2844
2845    if (checkStatus) {
2846      getConnection().getTable(TableName.META_TABLE_NAME).close();
2847    }
2848  }
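
  /*
   * Editor's usage sketch (illustrative only): a fault-tolerance test can force a
   * session expiry and then verify the cluster recovers. TEST_UTIL is a
   * hypothetical HBaseTestingUtility field of the test:
   *
   *   TEST_UTIL.expireRegionServerSession(0);   // expire region server 0's session
   *   TEST_UTIL.expireMasterSession();          // or expire the active master's
   */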
2849
2850  /**
2851   * Get the Mini HBase cluster.
2852   *
2853   * @return hbase cluster
2854   * @see #getHBaseClusterInterface()
2855   */
2856  public MiniHBaseCluster getHBaseCluster() {
2857    return getMiniHBaseCluster();
2858  }
2859
2860  /**
2861   * Returns the HBaseCluster instance.
2862   * <p>The returned object can be any of the subclasses of HBaseCluster, and the
2863   * tests referring to it should not assume that the cluster is a mini cluster or a
2864   * distributed one. If the test only works on a mini cluster, then the specific
2865   * method {@link #getMiniHBaseCluster()} can be used instead without the
2866   * need to type-cast.
2867   */
2868  public HBaseCluster getHBaseClusterInterface() {
2869    //implementation note: we should rename this method as #getHBaseCluster(),
2870    //but this would require refactoring 90+ calls.
2871    return hbaseCluster;
2872  }
2873
2874  /**
2875   * Get a Connection to the cluster.
2876   * Not thread-safe (This class needs a lot of work to make it thread-safe).
2877   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2878   * @throws IOException
2879   */
2880  public Connection getConnection() throws IOException {
2881    if (this.connection == null) {
2882      this.connection = ConnectionFactory.createConnection(this.conf);
2883    }
2884    return this.connection;
2885  }
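
  /*
   * Editor's usage sketch (illustrative only): callers should close Tables obtained
   * from the shared Connection, but never the Connection itself. CF, QUAL and VALUE
   * are hypothetical byte[] constants of the test:
   *
   *   try (Table t = TEST_UTIL.getConnection().getTable(TableName.valueOf("t1"))) {
   *     t.put(new Put(Bytes.toBytes("row1")).addColumn(CF, QUAL, VALUE));
   *   }
   */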
2886
2887  /**
2888   * Returns an Admin instance.
2889   * This instance is shared between HBaseTestingUtility instance users. Closing it has no effect;
2890   * it will be closed automatically when the cluster shuts down.
2891   *
2892   * @return An HBaseAdmin instance which is guaranteed to support only the {@link Admin} interface.
2893   *   Functions in HBaseAdmin not provided by the {@link Admin} interface can be changed/deleted
2894   *   anytime.
2895   * @deprecated Since 2.0. Will be removed in 3.0. Use {@link #getAdmin()} instead.
2896   */
2897  @Deprecated
2898  public synchronized HBaseAdmin getHBaseAdmin()
2899  throws IOException {
2900    if (hbaseAdmin == null){
2901      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
2902    }
2903    return hbaseAdmin;
2904  }
2905
2906  /**
2907   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
2908   * Closing it has no effect; it will be closed automatically when the cluster shuts down.
2909   */
2910  public synchronized Admin getAdmin() throws IOException {
2911    if (hbaseAdmin == null){
2912      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
2913    }
2914    return hbaseAdmin;
2915  }
2916
2917  private HBaseAdmin hbaseAdmin = null;
2918
2919  /**
2920   * Returns an {@link Hbck} instance. Needs to be closed when done.
2921   */
2922  public Hbck getHbck() throws IOException {
2923    return ((ClusterConnection) getConnection()).getHbck();
2924  }
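
  /*
   * Editor's usage sketch (illustrative only): unlike the shared Admin above, the
   * Hbck handle is owned by the caller, so try-with-resources fits naturally:
   *
   *   try (Hbck hbck = TEST_UTIL.getHbck()) {
   *     hbck.setTableStateInMeta(new TableState(tableName, TableState.State.ENABLED));
   *   }
   */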
2925
2926  /**
2927   * Unassign the named region.
2928   *
2929   * @param regionName  The region to unassign.
2930   */
2931  public void unassignRegion(String regionName) throws IOException {
2932    unassignRegion(Bytes.toBytes(regionName));
2933  }
2934
2935  /**
2936   * Unassign the named region.
2937   *
2938   * @param regionName  The region to unassign.
2939   */
2940  public void unassignRegion(byte[] regionName) throws IOException {
2941    getAdmin().unassign(regionName, true);
2942  }
2943
2944  /**
2945   * Closes the region containing the given row.
2946   *
2947   * @param row  The row to find the containing region.
2948   * @param table  The table to find the region.
2949   */
2950  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2951    unassignRegionByRow(Bytes.toBytes(row), table);
2952  }
2953
2954  /**
2955   * Closes the region containing the given row.
2956   *
2957   * @param row  The row to find the containing region.
2958   * @param table  The table to find the region.
2959   * @throws IOException
2960   */
2961  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2962    HRegionLocation hrl = table.getRegionLocation(row);
2963    unassignRegion(hrl.getRegionInfo().getRegionName());
2964  }
2965
2966  /**
2967   * Retrieves a splittable region randomly from tableName.
2968   *
2969   * @param tableName name of table
2970   * @param maxAttempts maximum number of attempts, unlimited for a value of -1
2971   * @return the HRegion chosen, or null if none was found within the limit of maxAttempts
2972   */
2973  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2974    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2975    int regCount = regions.size();
2976    Set<Integer> attempted = new HashSet<>();
2977    int idx;
2978    int attempts = 0;
2979    do {
2980      regions = getHBaseCluster().getRegions(tableName);
2981      if (regCount != regions.size()) {
2982        // if there was region movement, clear attempted Set
2983        attempted.clear();
2984      }
2985      regCount = regions.size();
2986      // There is a chance that, before we get the table's regions from an RS, a region may
2987      // be going through CLOSE. This may be because online schema change is enabled.
2988      if (regCount > 0) {
2989        idx = random.nextInt(regCount);
2990        // if we have just tried this region, count the attempt (so maxAttempts still
2991        // bounds the loop) and pick another one
2992        if (attempted.contains(idx)) { attempts++; continue; }
2993        try {
2994          regions.get(idx).checkSplit();
2995          return regions.get(idx);
2996        } catch (Exception ex) {
2997          LOG.warn("Caught exception", ex);
2998          attempted.add(idx);
2999        }
3000      }
3001      attempts++;
3002    } while (maxAttempts == -1 || attempts < maxAttempts);
3003    return null;
3004  }
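
  /*
   * Editor's usage sketch (illustrative only): pick a splittable region with a
   * bounded number of attempts, then split it through the shared Admin:
   *
   *   HRegion region = TEST_UTIL.getSplittableRegion(tableName, 10);
   *   if (region != null) {
   *     TEST_UTIL.getAdmin().splitRegion(region.getRegionInfo().getRegionName());
   *   }
   */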
3005
3006  public MiniDFSCluster getDFSCluster() {
3007    return dfsCluster;
3008  }
3009
3010  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3011    setDFSCluster(cluster, true);
3012  }
3013
3014  /**
3015   * Set the MiniDFSCluster
3016   * @param cluster cluster to use
3017   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
3018   * it is set.
3019   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3020   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3021   */
3022  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3023      throws IllegalStateException, IOException {
3024    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3025      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3026    }
3027    this.dfsCluster = cluster;
3028    this.setFs();
3029  }
3030
3031  public FileSystem getTestFileSystem() throws IOException {
3032    return HFileSystem.get(conf);
3033  }
3034
3035  /**
3036   * Wait until all regions in a table have been assigned.  Waits up to the default
3037   * timeout (30 seconds) before giving up.
3038   * @param table Table to wait on.
3039   * @throws InterruptedException
3040   * @throws IOException
3041   */
3042  public void waitTableAvailable(TableName table)
3043      throws InterruptedException, IOException {
3044    waitTableAvailable(table.getName(), 30000);
3045  }
3046
3047  public void waitTableAvailable(TableName table, long timeoutMillis)
3048      throws InterruptedException, IOException {
3049    waitFor(timeoutMillis, predicateTableAvailable(table));
3050  }
3051
3052  /**
3053   * Wait until all regions in a table have been assigned
3054   * @param table Table to wait on.
3055   * @param timeoutMillis Timeout.
3056   * @throws InterruptedException
3057   * @throws IOException
3058   */
3059  public void waitTableAvailable(byte[] table, long timeoutMillis)
3060  throws InterruptedException, IOException {
3061    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3062  }
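
  /*
   * Editor's usage sketch (illustrative only): a typical test setup creates a table
   * and blocks until every region is assigned before loading data:
   *
   *   TableName tn = TableName.valueOf("testWaitTable");
   *   TEST_UTIL.createTable(tn, Bytes.toBytes("cf"));
   *   TEST_UTIL.waitTableAvailable(tn, 60000);   // give up (and fail) after 60s
   */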
3063
3064  public String explainTableAvailability(TableName tableName) throws IOException {
3065    String msg = explainTableState(tableName, TableState.State.ENABLED);
3066    if (getHBaseCluster().getMaster().isAlive()) {
3067      Map<RegionInfo, ServerName> assignments =
3068          getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3069              .getRegionAssignments();
3070      final List<Pair<RegionInfo, ServerName>> metaLocations =
3071          MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
3072      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3073        RegionInfo hri = metaLocation.getFirst();
3074        ServerName sn = metaLocation.getSecond();
3075        if (!assignments.containsKey(hri)) {
3076          msg += ", region " + hri
3077              + " not assigned, but found in meta; it is expected to be on " + sn;
3078
3079        } else if (sn == null) {
3080          msg += ", region " + hri
3081              + " assigned, but has no server in meta";
3082        } else if (!sn.equals(assignments.get(hri))) {
3083          msg += ", region " + hri
3084              + " assigned, but has different servers in meta and AM (" +
3085              sn + " <> " + assignments.get(hri) + ")";
3086        }
3087      }
3088    }
3089    return msg;
3090  }
3091
3092  public String explainTableState(final TableName table, TableState.State state)
3093      throws IOException {
3094    TableState tableState = MetaTableAccessor.getTableState(connection, table);
3095    if (tableState == null) {
3096      return "TableState in META: No table state in META for table " + table
3097          + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3098    } else if (!tableState.inStates(state)) {
3099      return "TableState in META: Not " + state + " state, but " + tableState;
3100    } else {
3101      return "TableState in META: OK";
3102    }
3103  }
3104
3105  @Nullable
3106  public TableState findLastTableState(final TableName table) throws IOException {
3107    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3108    MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3109      @Override
3110      public boolean visit(Result r) throws IOException {
3111        if (!Arrays.equals(r.getRow(), table.getName()))
3112          return false;
3113        TableState state = MetaTableAccessor.getTableState(r);
3114        if (state != null)
3115          lastTableState.set(state);
3116        return true;
3117      }
3118    };
3119    MetaTableAccessor
3120        .scanMeta(connection, null, null,
3121            MetaTableAccessor.QueryType.TABLE,
3122            Integer.MAX_VALUE, visitor);
3123    return lastTableState.get();
3124  }
3125
3126  /**
3127   * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
3128   * all of its regions have been assigned.  Will time out after the default period (30 seconds).
3129   * Tolerates a nonexistent table.
3130   * @param table the table to wait on.
3131   * @throws InterruptedException if interrupted while waiting
3132   * @throws IOException if an IO problem is encountered
3133   */
3134  public void waitTableEnabled(TableName table)
3135      throws InterruptedException, IOException {
3136    waitTableEnabled(table, 30000);
3137  }
3138
3139  /**
3140   * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and
3141   * all of its regions have been assigned.
3142   * @see #waitTableEnabled(TableName, long)
3143   * @param table Table to wait on.
3144   * @param timeoutMillis Time to wait on it being marked enabled.
3145   * @throws InterruptedException
3146   * @throws IOException
3147   */
3148  public void waitTableEnabled(byte[] table, long timeoutMillis)
3149  throws InterruptedException, IOException {
3150    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3151  }
3152
3153  public void waitTableEnabled(TableName table, long timeoutMillis)
3154  throws IOException {
3155    waitFor(timeoutMillis, predicateTableEnabled(table));
3156  }
3157
3158  /**
3159   * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
3160   * Will time out after the default period (30 seconds).
3161   * @param table Table to wait on.
3162   * @throws InterruptedException
3163   * @throws IOException
3164   */
3165  public void waitTableDisabled(byte[] table)
3166          throws InterruptedException, IOException {
3167    waitTableDisabled(table, 30000);
3168  }
3169
3170  public void waitTableDisabled(TableName table, long millisTimeout)
3171          throws InterruptedException, IOException {
3172    waitFor(millisTimeout, predicateTableDisabled(table));
3173  }
3174
3175  /**
3176   * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
3177   * @param table Table to wait on.
3178   * @param timeoutMillis Time to wait on it being marked disabled.
3179   * @throws InterruptedException
3180   * @throws IOException
3181   */
3182  public void waitTableDisabled(byte[] table, long timeoutMillis)
3183          throws InterruptedException, IOException {
3184    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3185  }
3186
3187  /**
3188   * Make sure that at least the specified number of region servers
3189   * are running
3190   * @param num minimum number of region servers that should be running
3191   * @return true if we started some servers
3192   * @throws IOException
3193   */
3194  public boolean ensureSomeRegionServersAvailable(final int num)
3195      throws IOException {
3196    boolean startedServer = false;
3197    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3198    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3199      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3200      startedServer = true;
3201    }
3202
3203    return startedServer;
3204  }
3205
3206
3207  /**
3208   * Make sure that at least the specified number of region servers
3209   * are running. We don't count the ones that are currently stopping or are
3210   * stopped.
3211   * @param num minimum number of region servers that should be running
3212   * @return true if we started some servers
3213   * @throws IOException
3214   */
3215  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3216    throws IOException {
3217    boolean startedServer = ensureSomeRegionServersAvailable(num);
3218
3219    int nonStoppedServers = 0;
3220    for (JVMClusterUtil.RegionServerThread rst :
3221      getMiniHBaseCluster().getRegionServerThreads()) {
3222
3223      HRegionServer hrs = rst.getRegionServer();
3224      if (hrs.isStopping() || hrs.isStopped()) {
3225        LOG.info("A region server is stopped or stopping: " + hrs);
3226      } else {
3227        nonStoppedServers++;
3228      }
3229    }
3230    for (int i=nonStoppedServers; i<num; ++i) {
3231      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3232      startedServer = true;
3233    }
3234    return startedServer;
3235  }
3236
3237
3238  /**
3239   * This method clones the passed <code>c</code> configuration, setting a new
3240   * user into the clone.  Use it when getting new instances of FileSystem.  Only
3241   * works for DistributedFileSystem w/o Kerberos.
3242   * @param c Initial configuration
3243   * @param differentiatingSuffix Suffix to differentiate this user from others.
3244   * @return A new configuration instance with a different user set into it.
3245   * @throws IOException
3246   */
3247  public static User getDifferentUser(final Configuration c,
3248    final String differentiatingSuffix)
3249  throws IOException {
3250    FileSystem currentfs = FileSystem.get(c);
3251    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3252      return User.getCurrent();
3253    }
3254    // Else distributed filesystem.  Make a new instance per daemon.  Below
3255    // code is taken from the AppendTestUtil over in hdfs.
3256    String username = User.getCurrent().getName() +
3257      differentiatingSuffix;
3258    User user = User.createUserForTesting(c, username,
3259        new String[]{"supergroup"});
3260    return user;
3261  }
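
  /*
   * Editor's usage sketch (illustrative only): each daemon gets its own User, and
   * filesystem calls are made under it so each one gets a distinct DFSClient:
   *
   *   User user = HBaseTestingUtility.getDifferentUser(conf, ".wal");
   *   user.runAs(new PrivilegedExceptionAction<Void>() {
   *     @Override
   *     public Void run() throws Exception {
   *       FileSystem fs = FileSystem.get(conf);   // new instance for this user
   *       return null;
   *     }
   *   });
   */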
3262
3263  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3264      throws IOException {
3265    NavigableSet<String> online = new TreeSet<>();
3266    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3267      try {
3268        for (RegionInfo region :
3269            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3270          online.add(region.getRegionNameAsString());
3271        }
3272      } catch (RegionServerStoppedException e) {
3273        // That's fine.
3274      }
3275    }
3276    for (MasterThread mt : cluster.getLiveMasterThreads()) {
3277      try {
3278        for (RegionInfo region :
3279            ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3280          online.add(region.getRegionNameAsString());
3281        }
3282      } catch (RegionServerStoppedException e) {
3283        // That's fine.
3284      } catch (ServerNotRunningYetException e) {
3285        // That's fine.
3286      }
3287    }
3288    return online;
3289  }
3290
3291  /**
3292   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it's hard-coded to 5 and
3293   * makes tests linger.  Here is the exception you'll see:
3294   * <pre>
3295   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3296   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3297   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3298   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3299   * </pre>
3300   * @param stream A DFSClient.DFSOutputStream.
3301   * @param max the new maximum recovery error count, set via reflection
3302   */
3307  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3308      final int max) {
3309    try {
3310      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3311      for (Class<?> clazz: clazzes) {
3312        String className = clazz.getSimpleName();
3313        if (className.equals("DFSOutputStream")) {
3314          if (clazz.isInstance(stream)) {
3315            Field maxRecoveryErrorCountField =
3316              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3317            maxRecoveryErrorCountField.setAccessible(true);
3318            maxRecoveryErrorCountField.setInt(stream, max);
3319            break;
3320          }
3321        }
3322      }
3323    } catch (Exception e) {
3324      LOG.info("Could not set max recovery field", e);
3325    }
3326  }
3327
3328  /**
3329   * Uses the assignment manager directly to assign the region, and waits until the
3330   * specified region has completed assignment.
3331   * @return true if the region is assigned, false otherwise.
3332   */
3333  public boolean assignRegion(final RegionInfo regionInfo)
3334      throws IOException, InterruptedException {
3335    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3336    am.assign(regionInfo);
3337    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3338  }
3339
3340  /**
3341   * Move region to destination server and wait till region is completely moved and online
3342   *
3343   * @param destRegion region to move
3344   * @param destServer destination server of the region
3345   * @throws InterruptedException
3346   * @throws IOException
3347   */
3348  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3349      throws InterruptedException, IOException {
3350    HMaster master = getMiniHBaseCluster().getMaster();
3351    // TODO: Here we start the move. The move can take a while.
3352    getAdmin().move(destRegion.getEncodedNameAsBytes(),
3353        Bytes.toBytes(destServer.getServerName()));
3354    while (true) {
3355      ServerName serverName = master.getAssignmentManager().getRegionStates()
3356          .getRegionServerOfRegion(destRegion);
3357      if (serverName != null && serverName.equals(destServer)) {
3358        assertRegionOnServer(destRegion, serverName, 2000);
3359        break;
3360      }
3361      Thread.sleep(10);
3362    }
3363  }
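
  /*
   * Editor's usage sketch (illustrative only): pin a table's first region onto a
   * specific region server before exercising server-local behavior:
   *
   *   RegionInfo ri = TEST_UTIL.getAdmin().getRegions(tableName).get(0);
   *   ServerName dest = TEST_UTIL.getMiniHBaseCluster().getRegionServer(1).getServerName();
   *   TEST_UTIL.moveRegionAndWait(ri, dest);
   */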
3364
3365  /**
3366   * Wait until all regions for a table in hbase:meta have a non-empty
3367   * info:server, up to a configurable timeout value (default is 60 seconds).
3368   * This means all regions have been deployed, the
3369   * master has been informed, and hbase:meta has been updated with each region's
3370   * deployed server.
3371   * @param tableName the table name
3372   * @throws IOException
3373   */
3374  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3375    waitUntilAllRegionsAssigned(tableName,
3376      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3377  }
3378
3379  /**
3380   * Wait until all system tables' regions get assigned.
3381   * @throws IOException
3382   */
3383  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3384    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3385    waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
3386  }
3387
3388  /**
3389   * Wait until all regions for a table in hbase:meta have a non-empty
3390   * info:server, or until timeout.  This means all regions have been deployed, the
3391   * master has been informed, and hbase:meta has been updated with each region's
3392   * deployed server.
3393   * @param tableName the table name
3394   * @param timeout timeout, in milliseconds
3395   * @throws IOException
3396   */
3397  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3398      throws IOException {
3399    if (!TableName.isMetaTableName(tableName)) {
3400      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3401        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3402            timeout + "ms");
3403        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3404          @Override
3405          public String explainFailure() throws IOException {
3406            return explainTableAvailability(tableName);
3407          }
3408
3409          @Override
3410          public boolean evaluate() throws IOException {
3411            Scan scan = new Scan();
3412            scan.addFamily(HConstants.CATALOG_FAMILY);
3413            boolean tableFound = false;
3414            try (ResultScanner s = meta.getScanner(scan)) {
3415              for (Result r; (r = s.next()) != null;) {
3416                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3417                HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3418                if (info != null && info.getTable().equals(tableName)) {
3419                  // Get server hosting this region from catalog family. Return false if no server
3420                  // hosting this region, or if the server hosting this region was recently killed
3421                  // (for fault tolerance testing).
3422                  tableFound = true;
3423                  byte[] server =
3424                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3425                  if (server == null) {
3426                    return false;
3427                  } else {
3428                    byte[] startCode =
3429                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3430                    ServerName serverName =
3431                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3432                            Bytes.toLong(startCode));
3433                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3434                        getHBaseCluster().isKilledRS(serverName)) {
3435                      return false;
3436                    }
3437                  }
3438                  if (RegionStateStore.getRegionState(r,
3439                    info.getReplicaId()) != RegionState.State.OPEN) {
3440                    return false;
3441                  }
3442                }
3443              }
3444            }
3445            if (!tableFound) {
3446              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3447            }
3448            return tableFound;
3449          }
3450        });
3451      }
3452    }
3453    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3454    // check from the master state if we are using a mini cluster
3455    if (!getHBaseClusterInterface().isDistributedCluster()) {
3456      // So, all regions are in the meta table but make sure master knows of the assignments before
3457      // returning -- sometimes this can lag.
3458      HMaster master = getHBaseCluster().getMaster();
3459      final RegionStates states = master.getAssignmentManager().getRegionStates();
3460      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3461        @Override
3462        public String explainFailure() throws IOException {
3463          return explainTableAvailability(tableName);
3464        }
3465
3466        @Override
3467        public boolean evaluate() throws IOException {
3468          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3469          return hris != null && !hris.isEmpty();
3470        }
3471      });
3472    }
3473    LOG.info("All regions for table " + tableName + " assigned.");
3474  }
3475
3476  /**
3477   * Do a small get/scan against one store. This is required because store
3478   * has no actual methods of querying itself, and relies on StoreScanner.
3479   */
3480  public static List<Cell> getFromStoreFile(HStore store,
3481                                                Get get) throws IOException {
3482    Scan scan = new Scan(get);
3483    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3484        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3485        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3486        // readpoint 0.
3487        0);
3488
3489    List<Cell> result = new ArrayList<>();
3490    scanner.next(result);
3491    if (!result.isEmpty()) {
3492      // verify that we are on the row we want:
3493      Cell kv = result.get(0);
3494      if (!CellUtil.matchingRows(kv, get.getRow())) {
3495        result.clear();
3496      }
3497    }
3498    scanner.close();
3499    return result;
3500  }
3501
3502  /**
3503   * Create region split keys between startKey and endKey.
3504   *
3505   * @param startKey the start of the key range
3506   * @param endKey the end of the key range
3507   * @param numRegions the number of regions to be created. It has to be greater than 3.
3508   * @return resulting split keys
3509   */
3510  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3511    assertTrue("numRegions must be greater than 3", numRegions > 3);
3512    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3513    byte [][] result = new byte[tmpSplitKeys.length+1][];
3514    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3515    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3516    return result;
3517  }
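
  /*
   * Editor's note (worked example, assuming Bytes.split returns both endpoints plus
   * the requested number of intermediate keys): for numRegions = 5,
   * Bytes.split(startKey, endKey, 2) yields {startKey, s1, s2, endKey}; prepending
   * HConstants.EMPTY_BYTE_ARRAY gives the 5 region start keys
   * {EMPTY, startKey, s1, s2, endKey}.
   */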
3518
3519  /**
3520   * Do a small get/scan against one store. This is required because store
3521   * has no actual methods of querying itself, and relies on StoreScanner.
3522   */
3523  public static List<Cell> getFromStoreFile(HStore store,
3524                                                byte [] row,
3525                                                NavigableSet<byte[]> columns
3526                                                ) throws IOException {
3527    Get get = new Get(row);
3528    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3529    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3530
3531    return getFromStoreFile(store,get);
3532  }
3533
3534  public static void assertKVListsEqual(String additionalMsg,
3535      final List<? extends Cell> expected,
3536      final List<? extends Cell> actual) {
3537    final int eLen = expected.size();
3538    final int aLen = actual.size();
3539    final int minLen = Math.min(eLen, aLen);
3540
3541    int i;
3542    for (i = 0; i < minLen
3543        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3544        ++i) {}
3545
3546    if (additionalMsg == null) {
3547      additionalMsg = "";
3548    }
3549    if (!additionalMsg.isEmpty()) {
3550      additionalMsg = ". " + additionalMsg;
3551    }
3552
3553    if (eLen != aLen || i != minLen) {
3554      throw new AssertionError(
3555          "Expected and actual KV arrays differ at position " + i + ": " +
3556          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3557          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3558    }
3559  }
3560
3561  public static <T> String safeGetAsStr(List<T> lst, int i) {
3562    if (0 <= i && i < lst.size()) {
3563      return lst.get(i).toString();
3564    } else {
3565      return "<out_of_range>";
3566    }
3567  }
3568
3569  public String getClusterKey() {
3570    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3571        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3572        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3573            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3574  }
3575
3576  /** Creates a random table with the given parameters */
3577  public Table createRandomTable(TableName tableName,
3578      final Collection<String> families,
3579      final int maxVersions,
3580      final int numColsPerRow,
3581      final int numFlushes,
3582      final int numRegions,
3583      final int numRowsPerFlush)
3584      throws IOException, InterruptedException {
3585
3586    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3587        " regions, " + numFlushes + " storefiles per region, " +
3588        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3589        "\n");
3590
3591    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3592    final int numCF = families.size();
3593    final byte[][] cfBytes = new byte[numCF][];
3594    {
3595      int cfIndex = 0;
3596      for (String cf : families) {
3597        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3598      }
3599    }
3600
3601    final int actualStartKey = 0;
3602    final int actualEndKey = Integer.MAX_VALUE;
3603    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3604    final int splitStartKey = actualStartKey + keysPerRegion;
3605    final int splitEndKey = actualEndKey - keysPerRegion;
3606    final String keyFormat = "%08x";
3607    final Table table = createTable(tableName, cfBytes,
3608        maxVersions,
3609        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3610        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3611        numRegions);
3612
3613    if (hbaseCluster != null) {
3614      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3615    }
3616
3617    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3618
3619    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3620      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3621        final byte[] row = Bytes.toBytes(String.format(keyFormat,
3622            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3623
3624        Put put = new Put(row);
3625        Delete del = new Delete(row);
3626        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3627          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3628          final long ts = rand.nextInt();
3629          final byte[] qual = Bytes.toBytes("col" + iCol);
3630          if (rand.nextBoolean()) {
3631            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3632                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3633                ts + "_random_" + rand.nextLong());
3634            put.addColumn(cf, qual, ts, value);
3635          } else if (rand.nextDouble() < 0.8) {
3636            del.addColumn(cf, qual, ts);
3637          } else {
3638            del.addColumns(cf, qual, ts);
3639          }
3640        }
3641
3642        if (!put.isEmpty()) {
3643          mutator.mutate(put);
3644        }
3645
3646        if (!del.isEmpty()) {
3647          mutator.mutate(del);
3648        }
3649      }
3650      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3651      mutator.flush();
3652      if (hbaseCluster != null) {
3653        getMiniHBaseCluster().flushcache(table.getName());
3654      }
3655    }
3656    mutator.close();
3657
3658    return table;
3659  }
3660
3661  private static final Random random = new Random();
3662
3663  private static final PortAllocator portAllocator = new PortAllocator(random);
3664
3665  public static int randomFreePort() {
3666    return portAllocator.randomFreePort();
3667  }
3668
3669  static class PortAllocator {
3670    private static final int MIN_RANDOM_PORT = 0xc000;
3671    private static final int MAX_RANDOM_PORT = 0xfffe;
3672
3673    /** A set of ports that have been claimed using {@link #randomFreePort()}. */
3674    private final Set<Integer> takenRandomPorts = new HashSet<>();
3675
3676    private final Random random;
3677    private final AvailablePortChecker portChecker;
3678
3679    public PortAllocator(Random random) {
3680      this.random = random;
3681      this.portChecker = new AvailablePortChecker() {
3682        @Override
3683        public boolean available(int port) {
3684          try {
3685            ServerSocket sock = new ServerSocket(port);
3686            sock.close();
3687            return true;
3688          } catch (IOException ex) {
3689            return false;
3690          }
3691        }
3692      };
3693    }
3694
3695    public PortAllocator(Random random, AvailablePortChecker portChecker) {
3696      this.random = random;
3697      this.portChecker = portChecker;
3698    }
3699
3700    /**
3701     * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3702     * called from single-threaded test setup code.
3703     */
3704    public int randomFreePort() {
3705      int port = 0;
3706      do {
3707        port = randomPort();
3708        if (takenRandomPorts.contains(port)) {
3709          port = 0;
3710          continue;
3711        }
3712        takenRandomPorts.add(port);
3713
3714        if (!portChecker.available(port)) {
3715          port = 0;
3716        }
3717      } while (port == 0);
3718      return port;
3719    }
3720
3721    /**
3722     * Returns a random port. These ports cannot be registered with IANA and are
3723     * intended for dynamic allocation (see http://bit.ly/dynports).
3724     */
3725    private int randomPort() {
3726      return MIN_RANDOM_PORT
3727          + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3728    }
3729
3730    interface AvailablePortChecker {
3731      boolean available(int port);
3732    }
3733  }
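
  /*
   * Editor's usage sketch (illustrative only): reserving a port for a test daemon
   * through the shared allocator:
   *
   *   int infoPort = HBaseTestingUtility.randomFreePort();
   *   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, infoPort);
   *
   * Note the check-then-bind pattern only narrows, not eliminates, the race with
   * other processes grabbing the same port.
   */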
3734
3735  public static String randomMultiCastAddress() {
3736    return "226.1.1." + random.nextInt(254);
3737  }
3738
3739  public static void waitForHostPort(String host, int port)
3740      throws IOException {
3741    final int maxTimeMs = 10000;
3742    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3743    IOException savedException = null;
3744    LOG.info("Waiting for server at " + host + ":" + port);
3745    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3746      try {
3747        Socket sock = new Socket(InetAddress.getByName(host), port);
3748        sock.close();
3749        savedException = null;
3750        LOG.info("Server at " + host + ":" + port + " is available");
3751        break;
3752      } catch (UnknownHostException e) {
3753        throw new IOException("Failed to look up " + host, e);
3754      } catch (IOException e) {
3755        savedException = e;
3756      }
3757      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3758    }
3759
3760    if (savedException != null) {
3761      throw savedException;
3762    }
3763  }
3764
3765  /**
3766   * Creates a pre-split table for load testing. If the table already exists,
3767   * logs a warning and continues.
3768   * @return the number of regions the table was split into
3769   */
3770  public static int createPreSplitLoadTestTable(Configuration conf,
3771      TableName tableName, byte[] columnFamily, Algorithm compression,
3772      DataBlockEncoding dataBlockEncoding) throws IOException {
3773    return createPreSplitLoadTestTable(conf, tableName,
3774      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3775      Durability.USE_DEFAULT);
3776  }
3777  /**
3778   * Creates a pre-split table for load testing. If the table already exists,
3779   * logs a warning and continues.
3780   * @return the number of regions the table was split into
3781   */
3782  public static int createPreSplitLoadTestTable(Configuration conf,
3783      TableName tableName, byte[] columnFamily, Algorithm compression,
3784      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3785      Durability durability)
3786          throws IOException {
3787    HTableDescriptor desc = new HTableDescriptor(tableName);
3788    desc.setDurability(durability);
3789    desc.setRegionReplication(regionReplication);
3790    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3791    hcd.setDataBlockEncoding(dataBlockEncoding);
3792    hcd.setCompressionType(compression);
3793    return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3794  }
3795
3796  /**
3797   * Creates a pre-split table for load testing. If the table already exists,
3798   * logs a warning and continues.
3799   * @return the number of regions the table was split into
3800   */
3801  public static int createPreSplitLoadTestTable(Configuration conf,
3802      TableName tableName, byte[][] columnFamilies, Algorithm compression,
3803      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3804      Durability durability)
3805          throws IOException {
3806    HTableDescriptor desc = new HTableDescriptor(tableName);
3807    desc.setDurability(durability);
3808    desc.setRegionReplication(regionReplication);
3809    HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
3810    for (int i = 0; i < columnFamilies.length; i++) {
3811      HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
3812      hcd.setDataBlockEncoding(dataBlockEncoding);
3813      hcd.setCompressionType(compression);
3814      hcds[i] = hcd;
3815    }
3816    return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
3817  }
3818
3819  /**
3820   * Creates a pre-split table for load testing. If the table already exists,
3821   * logs a warning and continues.
3822   * @return the number of regions the table was split into
3823   */
3824  public static int createPreSplitLoadTestTable(Configuration conf,
3825      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
3826    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3827  }
3828
3829  /**
3830   * Creates a pre-split table for load testing. If the table already exists,
3831   * logs a warning and continues.
3832   * @return the number of regions the table was split into
3833   */
3834  public static int createPreSplitLoadTestTable(Configuration conf,
3835      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3836    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
3837        numRegionsPerServer);
3838  }
3839
3840  /**
3841   * Creates a pre-split table for load testing. If the table already exists,
3842   * logs a warning and continues.
3843   * @return the number of regions the table was split into
3844   */
3845  public static int createPreSplitLoadTestTable(Configuration conf,
3846      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
3847      int numRegionsPerServer) throws IOException {
3848    return createPreSplitLoadTestTable(conf, desc, hcds,
3849      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
3850  }
3851
3852  /**
3853   * Creates a pre-split table for load testing. If the table already exists,
3854   * logs a warning and continues.
3855   * @return the number of regions the table was split into
3856   */
3857  public static int createPreSplitLoadTestTable(Configuration conf,
3858      TableDescriptor td, ColumnFamilyDescriptor[] cds,
3859      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
3860    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3861    for (ColumnFamilyDescriptor cd : cds) {
3862      if (!td.hasColumnFamily(cd.getName())) {
3863        builder.setColumnFamily(cd);
3864      }
3865    }
3866    td = builder.build();
3867    int totalNumberOfRegions = 0;
3868    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3869    Admin admin = unmanagedConnection.getAdmin();
3870
3871    try {
3872      // create a table with pre-split regions.
3873      // The number of splits is set as:
3874      //    (region servers * regions per region server).
3875      int numberOfServers =
3876          admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
3877              .size();
3878      if (numberOfServers == 0) {
3879        throw new IllegalStateException("No live regionservers");
3880      }
3881
3882      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3883      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3884          "pre-splitting table into " + totalNumberOfRegions + " regions " +
3885          "(regions per server: " + numRegionsPerServer + ")");
3886
3887      byte[][] splits = splitter.split(
3888          totalNumberOfRegions);
3889
3890      admin.createTable(td, splits);
3891    } catch (MasterNotRunningException e) {
3892      LOG.error("Master not running", e);
3893      throw new IOException(e);
3894    } catch (TableExistsException e) {
3895      LOG.warn("Table " + td.getTableName() +
3896          " already exists, continuing");
3897    } finally {
3898      admin.close();
3899      unmanagedConnection.close();
3900    }
3901    return totalNumberOfRegions;
3902  }
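
  /*
   * Editor's usage sketch (illustrative only): pre-split a load test table so every
   * live region server starts with roughly five regions:
   *
   *   int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
   *       TableName.valueOf("loadtest"), Bytes.toBytes("cf"),
   *       Compression.Algorithm.NONE, DataBlockEncoding.NONE, 5, 1,
   *       Durability.USE_DEFAULT);
   */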
3903
3904  public static int getMetaRSPort(Connection connection) throws IOException {
3905    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3906      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3907    }
3908  }
3909
3910  /**
3911   *  Due to an async racing issue, a region may not be in
3912   *  the online region list of a region server yet, even after
3913   *  the assignment znode is deleted and the new assignment
3914   *  is recorded in the master.
3915   */
3916  public void assertRegionOnServer(
3917      final RegionInfo hri, final ServerName server,
3918      final long timeout) throws IOException, InterruptedException {
3919    long timeoutTime = System.currentTimeMillis() + timeout;
3920    while (true) {
3921      List<RegionInfo> regions = getAdmin().getRegions(server);
3922      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3923      long now = System.currentTimeMillis();
3924      if (now > timeoutTime) break;
3925      Thread.sleep(10);
3926    }
3927    fail("Could not find region " + hri.getRegionNameAsString()
3928      + " on server " + server);
3929  }
3930
3931  /**
3932   * Check to make sure the region is open on the specified
3933   * region server, but not on any other one.
3934   */
3935  public void assertRegionOnlyOnServer(
3936      final RegionInfo hri, final ServerName server,
3937      final long timeout) throws IOException, InterruptedException {
3938    long timeoutTime = System.currentTimeMillis() + timeout;
3939    while (true) {
3940      List<RegionInfo> regions = getAdmin().getRegions(server);
3941      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3942        List<JVMClusterUtil.RegionServerThread> rsThreads =
3943          getHBaseCluster().getLiveRegionServerThreads();
3944        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3945          HRegionServer rs = rsThread.getRegionServer();
3946          if (server.equals(rs.getServerName())) {
3947            continue;
3948          }
3949          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3950          for (HRegion r: hrs) {
3951            assertTrue("Region should not be double assigned",
3952              r.getRegionInfo().getRegionId() != hri.getRegionId());
3953          }
3954        }
3955        return; // good, we are happy
3956      }
3957      long now = System.currentTimeMillis();
3958      if (now > timeoutTime) break;
3959      Thread.sleep(10);
3960    }
3961    fail("Could not find region " + hri.getRegionNameAsString()
3962      + " on server " + server);
3963  }
3964
3965  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd)
3966      throws IOException {
3967    TableDescriptor td
3968        = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
3969            .setColumnFamily(cd)
3970            .build();
3971    HRegionInfo info =
3972        new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3973    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3974  }
3975
3976  public void setFileSystemURI(String fsURI) {
3977    FS_URI = fsURI;
3978  }
3979
3980  /**
3981   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3982   */
3983  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3984    return new ExplainingPredicate<IOException>() {
3985      @Override
3986      public String explainFailure() throws IOException {
3987        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3988            .getAssignmentManager().getRegionStates();
3989        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3990      }
3991
3992      @Override
3993      public boolean evaluate() throws IOException {
3994        HMaster master = getMiniHBaseCluster().getMaster();
3995        if (master == null) return false;
3996        AssignmentManager am = master.getAssignmentManager();
3997        if (am == null) return false;
3998        return !am.hasRegionsInTransition();
3999      }
4000    };
4001  }
4002
4003  /**
4004   * Returns a {@link Predicate} for checking that table is enabled
4005   */
4006  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4007    return new ExplainingPredicate<IOException>() {
4008      @Override
4009      public String explainFailure() throws IOException {
4010        return explainTableState(tableName, TableState.State.ENABLED);
4011      }
4012
4013      @Override
4014      public boolean evaluate() throws IOException {
4015        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4016      }
4017    };
4018  }
4019
4020  /**
4021   * Returns a {@link Predicate} for checking that table is disabled
4022   */
4023  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4024    return new ExplainingPredicate<IOException>() {
4025      @Override
4026      public String explainFailure() throws IOException {
4027        return explainTableState(tableName, TableState.State.DISABLED);
4028      }
4029
4030      @Override
4031      public boolean evaluate() throws IOException {
4032        return getAdmin().isTableDisabled(tableName);
4033      }
4034    };
4035  }
4036
4037  /**
4038   * Returns a {@link Predicate} for checking that table is available
4039   */
4040  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4041    return new ExplainingPredicate<IOException>() {
4042      @Override
4043      public String explainFailure() throws IOException {
4044        return explainTableAvailability(tableName);
4045      }
4046
4047      @Override
4048      public boolean evaluate() throws IOException {
4049        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4050        if (tableAvailable) {
4051          try (Table table = getConnection().getTable(tableName)) {
4052            TableDescriptor htd = table.getDescriptor();
4053            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4054                .getAllRegionLocations()) {
4055              Scan scan = new Scan().withStartRow(loc.getRegionInfo().getStartKey())
4056                  .withStopRow(loc.getRegionInfo().getEndKey()).setOneRowLimit()
4057                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4058              for (byte[] family : htd.getColumnFamilyNames()) {
4059                scan.addFamily(family);
4060              }
4061              try (ResultScanner scanner = table.getScanner(scan)) {
4062                scanner.next();
4063              }
4064            }
4065          }
4066        }
4067        return tableAvailable;
4068      }
4069    };
4070  }
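
  /*
   * Editor's usage sketch (illustrative only): the predicates compose with waitFor,
   * which is how the waitTable* helpers above are implemented:
   *
   *   TEST_UTIL.waitFor(60000, TEST_UTIL.predicateTableEnabled(tableName));
   *   TEST_UTIL.waitFor(60000, TEST_UTIL.predicateNoRegionsInTransition());
   */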
4071
4072  /**
4073   * Wait until no regions in transition.
4074   * @param timeout How long to wait.
4075   * @throws IOException
4076   */
4077  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4078    waitFor(timeout, predicateNoRegionsInTransition());
4079  }
4080
4081  /**
4082   * Wait until no regions in transition. (time limit 15min)
4083   * @throws IOException
4084   */
4085  public void waitUntilNoRegionsInTransition() throws IOException {
4086    waitUntilNoRegionsInTransition(15 * 60000);
4087  }
4088
4089  /**
4090   * Wait until the given labels are ready in VisibilityLabelsCache.
4091   * @param timeoutMillis timeout, in milliseconds
4092   * @param labels the labels to wait for
4093   */
4094  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4095    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4096    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4097
4098      @Override
4099      public boolean evaluate() {
4100        for (String label : labels) {
4101          if (labelsCache.getLabelOrdinal(label) == 0) {
4102            return false;
4103          }
4104        }
4105        return true;
4106      }
4107
4108      @Override
4109      public String explainFailure() {
4110        for (String label : labels) {
4111          if (labelsCache.getLabelOrdinal(label) == 0) {
4112            return label + " is not available yet";
4113          }
4114        }
4115        return "";
4116      }
4117    });
4118  }
4119
4120  /**
4121   * Create a set of column descriptors with all combinations of the available
4122   * compression, data block encoding and bloom type settings.
4123   * @return the list of column descriptors
4124   */
4125  public static List<HColumnDescriptor> generateColumnDescriptors() {
4126    return generateColumnDescriptors("");
4127  }
4128
4129  /**
4130   * Create a set of column descriptors with all combinations of the available
4131   * compression, data block encoding and bloom type settings.
4132   * @param prefix family names prefix
4133   * @return the list of column descriptors
4134   */
4135  public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4136    List<HColumnDescriptor> htds = new ArrayList<>();
4137    long familyId = 0;
4138    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4139      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4140        for (BloomType bloomType: BloomType.values()) {
4141          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4142          HColumnDescriptor htd = new HColumnDescriptor(name);
4143          htd.setCompressionType(compressionType);
4144          htd.setDataBlockEncoding(encodingType);
4145          htd.setBloomFilterType(bloomType);
4146          htds.add(htd);
4147          familyId++;
4148        }
4149      }
4150    }
4151    return htds;
4152  }
4153
4154  /**
4155   * Get supported compression algorithms.
4156   * @return supported compression algorithms.
4157   */
4158  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4159    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4160    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4161    for (String algoName : allAlgos) {
4162      try {
4163        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4164        algo.getCompressor();
4165        supportedAlgos.add(algo);
4166      } catch (Throwable t) {
4167        // this algo is not available
4168      }
4169    }
4170    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4171  }
4172
4173  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4174    Scan scan = new Scan(row);
4175    scan.setSmall(true);
4176    scan.setCaching(1);
4177    scan.setReversed(true);
4178    scan.addFamily(family);
4179    try (RegionScanner scanner = r.getScanner(scan)) {
4180      List<Cell> cells = new ArrayList<>(1);
4181      scanner.next(cells);
4182      if (r.getRegionInfo().isMetaRegion() && (cells.isEmpty() || !isTargetTable(row, cells.get(0)))) {
4183        return null;
4184      }
4185      return Result.create(cells);
4186    }
4187  }
4188
4189  private boolean isTargetTable(final byte[] inRow, Cell c) {
4190    String inputRowString = Bytes.toString(inRow);
4191    int i = inputRowString.indexOf(HConstants.DELIMITER);
4192    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4193    int o = outputRowString.indexOf(HConstants.DELIMITER);
4194    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4195  }
4196
4197  /**
4198   * Sets up {@link MiniKdc} for testing security.
4199   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4200   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4201   */
4202  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4203    Properties conf = MiniKdc.createConf();
4204    conf.put(MiniKdc.DEBUG, true);
4205    MiniKdc kdc = null;
4206    File dir = null;
4207    // There is a time lag between selecting a port and trying to bind to it. It's possible that
4208    // another service captures the port in between, which will result in a BindException.
4209    boolean bindException;
4210    int numTries = 0;
4211    do {
4212      try {
4213        bindException = false;
4214        dir = new File(getDataTestDir("kdc").toUri().getPath());
4215        kdc = new MiniKdc(conf, dir);
4216        kdc.start();
4217      } catch (BindException e) {
4218        FileUtils.deleteDirectory(dir);  // clean directory
4219        numTries++;
4220        if (numTries == 3) {
4221          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4222          throw e;
4223        }
4224        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4225        bindException = true;
4226      }
4227    } while (bindException);
4228    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4229    return kdc;
4230  }
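
  /*
   * Editor's usage sketch (illustrative only): stand up the KDC, create a test
   * principal in the keytab, run the secured test, then stop the KDC:
   *
   *   File keytab = new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
   *   MiniKdc kdc = TEST_UTIL.setupMiniKdc(keytab);
   *   kdc.createPrincipal(keytab, "hbase/localhost");
   *   // ... run the secured test ...
   *   kdc.stop();
   */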
4231
4232  public int getNumHFiles(final TableName tableName, final byte[] family) {
4233    int numHFiles = 0;
4234    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4235      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4236                                     family);
4237    }
4238    return numHFiles;
4239  }
4240
4241  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4242                               final byte[] family) {
4243    int numHFiles = 0;
4244    for (Region region : rs.getRegions(tableName)) {
4245      numHFiles += region.getStore(family).getStorefilesCount();
4246    }
4247    return numHFiles;
4248  }
4249}