001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.UncheckedIOException;
029import java.lang.reflect.Field;
030import java.lang.reflect.Modifier;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.ExecutionException;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.BooleanSupplier;
057import org.apache.commons.io.FileUtils;
058import org.apache.commons.lang3.RandomStringUtils;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
063import org.apache.hadoop.hbase.Waiter.Predicate;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncAdmin;
066import org.apache.hadoop.hbase.client.AsyncClusterConnection;
067import org.apache.hadoop.hbase.client.BufferedMutator;
068import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Consistency;
074import org.apache.hadoop.hbase.client.Delete;
075import org.apache.hadoop.hbase.client.Durability;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.MasterRegistry;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionInfoBuilder;
082import org.apache.hadoop.hbase.client.RegionLocator;
083import org.apache.hadoop.hbase.client.Result;
084import org.apache.hadoop.hbase.client.ResultScanner;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.fs.HFileSystem;
092import org.apache.hadoop.hbase.io.compress.Compression;
093import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
094import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
095import org.apache.hadoop.hbase.io.hfile.BlockCache;
096import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
097import org.apache.hadoop.hbase.io.hfile.HFile;
098import org.apache.hadoop.hbase.ipc.RpcServerInterface;
099import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
100import org.apache.hadoop.hbase.master.HMaster;
101import org.apache.hadoop.hbase.master.RegionState;
102import org.apache.hadoop.hbase.master.ServerManager;
103import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
104import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
105import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
106import org.apache.hadoop.hbase.master.assignment.RegionStates;
107import org.apache.hadoop.hbase.mob.MobFileCache;
108import org.apache.hadoop.hbase.regionserver.BloomType;
109import org.apache.hadoop.hbase.regionserver.ChunkCreator;
110import org.apache.hadoop.hbase.regionserver.HRegion;
111import org.apache.hadoop.hbase.regionserver.HRegionServer;
112import org.apache.hadoop.hbase.regionserver.HStore;
113import org.apache.hadoop.hbase.regionserver.InternalScanner;
114import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
115import org.apache.hadoop.hbase.regionserver.Region;
116import org.apache.hadoop.hbase.regionserver.RegionScanner;
117import org.apache.hadoop.hbase.regionserver.RegionServerServices;
118import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
119import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
120import org.apache.hadoop.hbase.security.User;
121import org.apache.hadoop.hbase.security.UserProvider;
122import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
123import org.apache.hadoop.hbase.util.Bytes;
124import org.apache.hadoop.hbase.util.CommonFSUtils;
125import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
126import org.apache.hadoop.hbase.util.FSUtils;
127import org.apache.hadoop.hbase.util.JVM;
128import org.apache.hadoop.hbase.util.JVMClusterUtil;
129import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
130import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
131import org.apache.hadoop.hbase.util.Pair;
132import org.apache.hadoop.hbase.util.ReflectionUtils;
133import org.apache.hadoop.hbase.util.RegionSplitter;
134import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
135import org.apache.hadoop.hbase.util.RetryCounter;
136import org.apache.hadoop.hbase.util.Threads;
137import org.apache.hadoop.hbase.wal.WAL;
138import org.apache.hadoop.hbase.wal.WALFactory;
139import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
140import org.apache.hadoop.hbase.zookeeper.ZKConfig;
141import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
142import org.apache.hadoop.hdfs.DFSClient;
143import org.apache.hadoop.hdfs.DistributedFileSystem;
144import org.apache.hadoop.hdfs.MiniDFSCluster;
145import org.apache.hadoop.hdfs.server.datanode.DataNode;
146import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
147import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
148import org.apache.hadoop.mapred.JobConf;
149import org.apache.hadoop.mapred.MiniMRCluster;
150import org.apache.hadoop.mapred.TaskLog;
151import org.apache.hadoop.minikdc.MiniKdc;
152import org.apache.yetus.audience.InterfaceAudience;
153import org.apache.yetus.audience.InterfaceStability;
154import org.apache.zookeeper.WatchedEvent;
155import org.apache.zookeeper.ZooKeeper;
156import org.apache.zookeeper.ZooKeeper.States;
157
158import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
159
160import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
161
162/**
163 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
164 * functionality. Create an instance and keep it around testing HBase.
165 * <p/>
166 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
167 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
168 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
169 * cluster.
170 * <p/>
171 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
172 * <p/>
173 * It does not set logging levels.
174 * <p/>
175 * In the configuration properties, default values for master-info-port and region-server-port are
176 * overridden such that a random port will be assigned (thus avoiding port contention if another
177 * local HBase instance is already running).
178 * <p/>
179 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
180 * setting it to true.
181 */
182@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
183@InterfaceStability.Evolving
184public class HBaseTestingUtil extends HBaseZKTestingUtil {
185
186  /**
187   * System property key to get test directory value. Name is as it is because mini dfs has
188   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
189   * used in mini dfs.
190   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
191   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
192   */
193  @Deprecated
194  private static final String TEST_DIRECTORY_KEY = "test.build.data";
195
196  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
197  /**
198   * The default number of regions per regionserver when creating a pre-split table.
199   */
200  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
201
202  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
203  public static final boolean PRESPLIT_TEST_TABLE = true;
204
205  private MiniDFSCluster dfsCluster = null;
206  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
207
208  private volatile HBaseClusterInterface hbaseCluster = null;
209  private MiniMRCluster mrCluster = null;
210
211  /** If there is a mini cluster running for this testing utility instance. */
212  private volatile boolean miniClusterRunning;
213
214  private String hadoopLogDir;
215
216  /**
217   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
218   */
219  private Path dataTestDirOnTestFS = null;
220
221  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
222
223  /** Filesystem URI used for map-reduce mini-cluster setup */
224  private static String FS_URI;
225
226  /** This is for unit tests parameterized with a single boolean. */
227  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
228
229  /**
230   * Checks to see if a specific port is available.
231   * @param port the port number to check for availability
232   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
233   */
234  public static boolean available(int port) {
235    ServerSocket ss = null;
236    DatagramSocket ds = null;
237    try {
238      ss = new ServerSocket(port);
239      ss.setReuseAddress(true);
240      ds = new DatagramSocket(port);
241      ds.setReuseAddress(true);
242      return true;
243    } catch (IOException e) {
244      // Do nothing
245    } finally {
246      if (ds != null) {
247        ds.close();
248      }
249
250      if (ss != null) {
251        try {
252          ss.close();
253        } catch (IOException e) {
254          /* should not be thrown */
255        }
256      }
257    }
258
259    return false;
260  }
261
262  /**
263   * Create all combinations of Bloom filters and compression algorithms for testing.
264   */
265  private static List<Object[]> bloomAndCompressionCombinations() {
266    List<Object[]> configurations = new ArrayList<>();
267    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
268      for (BloomType bloomType : BloomType.values()) {
269        configurations.add(new Object[] { comprAlgo, bloomType });
270      }
271    }
272    return Collections.unmodifiableList(configurations);
273  }
274
275  /**
276   * Create combination of memstoreTS and tags
277   */
278  private static List<Object[]> memStoreTSAndTagsCombination() {
279    List<Object[]> configurations = new ArrayList<>();
280    configurations.add(new Object[] { false, false });
281    configurations.add(new Object[] { false, true });
282    configurations.add(new Object[] { true, false });
283    configurations.add(new Object[] { true, true });
284    return Collections.unmodifiableList(configurations);
285  }
286
287  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
288    List<Object[]> configurations = new ArrayList<>();
289    configurations.add(new Object[] { false, false, true });
290    configurations.add(new Object[] { false, false, false });
291    configurations.add(new Object[] { false, true, true });
292    configurations.add(new Object[] { false, true, false });
293    configurations.add(new Object[] { true, false, true });
294    configurations.add(new Object[] { true, false, false });
295    configurations.add(new Object[] { true, true, true });
296    configurations.add(new Object[] { true, true, false });
297    return Collections.unmodifiableList(configurations);
298  }
299
300  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
301    bloomAndCompressionCombinations();
302
303  /**
304   * <p>
305   * Create an HBaseTestingUtility using a default configuration.
306   * <p>
307   * Initially, all tmp files are written to a local test data directory. Once
308   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
309   * data will be written to the DFS directory instead.
310   */
311  public HBaseTestingUtil() {
312    this(HBaseConfiguration.create());
313  }
314
315  /**
316   * <p>
317   * Create an HBaseTestingUtility using a given configuration.
318   * <p>
319   * Initially, all tmp files are written to a local test data directory. Once
320   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
321   * data will be written to the DFS directory instead.
322   * @param conf The configuration to use for further operations
323   */
324  public HBaseTestingUtil(@Nullable Configuration conf) {
325    super(conf);
326
327    // a hbase checksum verification failure will cause unit tests to fail
328    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
329
330    // Save this for when setting default file:// breaks things
331    if (this.conf.get("fs.defaultFS") != null) {
332      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
333    }
334    if (this.conf.get(HConstants.HBASE_DIR) != null) {
335      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
336    }
337    // Every cluster is a local cluster until we start DFS
338    // Note that conf could be null, but this.conf will not be
339    String dataTestDir = getDataTestDir().toString();
340    this.conf.set("fs.defaultFS", "file:///");
341    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
342    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
343    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
344    // If the value for random ports isn't set set it to true, thus making
345    // tests opt-out for random port assignment
346    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
347      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
348  }
349
350  /**
351   * Close both the region {@code r} and it's underlying WAL. For use in tests.
352   */
353  public static void closeRegionAndWAL(final Region r) throws IOException {
354    closeRegionAndWAL((HRegion) r);
355  }
356
357  /**
358   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
359   */
360  public static void closeRegionAndWAL(final HRegion r) throws IOException {
361    if (r == null) return;
362    r.close();
363    if (r.getWAL() == null) return;
364    r.getWAL().close();
365  }
366
367  /**
368   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
369   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
370   * by the Configuration. If say, a Connection was being used against a cluster that had been
371   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
372   * Rather than use the return direct, its usually best to make a copy and use that. Do
373   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
374   * @return Instance of Configuration.
375   */
376  @Override
377  public Configuration getConfiguration() {
378    return super.getConfiguration();
379  }
380
381  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
382    this.hbaseCluster = hbaseCluster;
383  }
384
385  /**
386   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
387   * have many concurrent tests running if we need to. It needs to amend the
388   * {@link #TEST_DIRECTORY_KEY} System property, as it's what minidfscluster bases it data dir on.
389   * Moding a System property is not the way to do concurrent instances -- another instance could
390   * grab the temporary value unintentionally -- but not anything can do about it at moment; single
391   * instance only is how the minidfscluster works. We also create the underlying directory names
392   * for hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
393   * conf, and as a system property for hadoop.tmp.dir (We do not create them!).
394   * @return The calculated data test build directory, if newly-created.
395   */
396  @Override
397  protected Path setupDataTestDir() {
398    Path testPath = super.setupDataTestDir();
399    if (null == testPath) {
400      return null;
401    }
402
403    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
404
405    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
406    // we want our own value to ensure uniqueness on the same machine
407    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
408
409    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
410    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
411    return testPath;
412  }
413
414  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
415
416    String sysValue = System.getProperty(propertyName);
417
418    if (sysValue != null) {
419      // There is already a value set. So we do nothing but hope
420      // that there will be no conflicts
421      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
422        + " so I do NOT create it in " + parent);
423      String confValue = conf.get(propertyName);
424      if (confValue != null && !confValue.endsWith(sysValue)) {
425        LOG.warn(propertyName + " property value differs in configuration and system: "
426          + "Configuration=" + confValue + " while System=" + sysValue
427          + " Erasing configuration value by system value.");
428      }
429      conf.set(propertyName, sysValue);
430    } else {
431      // Ok, it's not set, so we create it as a subdirectory
432      createSubDir(propertyName, parent, subDirName);
433      System.setProperty(propertyName, conf.get(propertyName));
434    }
435  }
436
437  /**
438   * @return Where to write test data on the test filesystem; Returns working directory for the test
439   *         filesystem by default
440   * @see #setupDataTestDirOnTestFS()
441   * @see #getTestFileSystem()
442   */
443  private Path getBaseTestDirOnTestFS() throws IOException {
444    FileSystem fs = getTestFileSystem();
445    return new Path(fs.getWorkingDirectory(), "test-data");
446  }
447
448  /**
449   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
450   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
451   * on it.
452   * @return a unique path in the test filesystem
453   */
454  public Path getDataTestDirOnTestFS() throws IOException {
455    if (dataTestDirOnTestFS == null) {
456      setupDataTestDirOnTestFS();
457    }
458
459    return dataTestDirOnTestFS;
460  }
461
462  /**
463   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
464   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
465   * on it.
466   * @return a unique path in the test filesystem
467   * @param subdirName name of the subdir to create under the base test dir
468   */
469  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
470    return new Path(getDataTestDirOnTestFS(), subdirName);
471  }
472
473  /**
474   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
475   * setup.
476   */
477  private void setupDataTestDirOnTestFS() throws IOException {
478    if (dataTestDirOnTestFS != null) {
479      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
480      return;
481    }
482    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
483  }
484
485  /**
486   * Sets up a new path in test filesystem to be used by tests.
487   */
488  private Path getNewDataTestDirOnTestFS() throws IOException {
489    // The file system can be either local, mini dfs, or if the configuration
490    // is supplied externally, it can be an external cluster FS. If it is a local
491    // file system, the tests should use getBaseTestDir, otherwise, we can use
492    // the working directory, and create a unique sub dir there
493    FileSystem fs = getTestFileSystem();
494    Path newDataTestDir;
495    String randomStr = getRandomUUID().toString();
496    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
497      newDataTestDir = new Path(getDataTestDir(), randomStr);
498      File dataTestDir = new File(newDataTestDir.toString());
499      if (deleteOnExit()) dataTestDir.deleteOnExit();
500    } else {
501      Path base = getBaseTestDirOnTestFS();
502      newDataTestDir = new Path(base, randomStr);
503      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
504    }
505    return newDataTestDir;
506  }
507
508  /**
509   * Cleans the test data directory on the test filesystem.
510   * @return True if we removed the test dirs
511   */
512  public boolean cleanupDataTestDirOnTestFS() throws IOException {
513    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
514    if (ret) {
515      dataTestDirOnTestFS = null;
516    }
517    return ret;
518  }
519
520  /**
521   * Cleans a subdirectory under the test data directory on the test filesystem.
522   * @return True if we removed child
523   */
524  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
525    Path cpath = getDataTestDirOnTestFS(subdirName);
526    return getTestFileSystem().delete(cpath, true);
527  }
528
529  /**
530   * Start a minidfscluster.
531   * @param servers How many DNs to start.
532   * @see #shutdownMiniDFSCluster()
533   * @return The mini dfs cluster created.
534   */
535  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
536    return startMiniDFSCluster(servers, null);
537  }
538
539  /**
540   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
541   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
542   * instances of the datanodes will have the same host name.
543   * @param hosts hostnames DNs to run on.
544   * @see #shutdownMiniDFSCluster()
545   * @return The mini dfs cluster created.
546   */
547  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
548    if (hosts != null && hosts.length != 0) {
549      return startMiniDFSCluster(hosts.length, hosts);
550    } else {
551      return startMiniDFSCluster(1, null);
552    }
553  }
554
555  /**
556   * Start a minidfscluster. Can only create one.
557   * @param servers How many DNs to start.
558   * @param hosts   hostnames DNs to run on.
559   * @see #shutdownMiniDFSCluster()
560   * @return The mini dfs cluster created.
561   */
562  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
563    return startMiniDFSCluster(servers, null, hosts);
564  }
565
566  private void setFs() throws IOException {
567    if (this.dfsCluster == null) {
568      LOG.info("Skipping setting fs because dfsCluster is null");
569      return;
570    }
571    FileSystem fs = this.dfsCluster.getFileSystem();
572    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
573
574    // re-enable this check with dfs
575    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
576  }
577
578  // Workaround to avoid IllegalThreadStateException
579  // See HBASE-27148 for more details
580  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
581
582    private volatile boolean stopped = false;
583
584    private final MiniDFSCluster cluster;
585
586    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
587      super("FsDatasetAsyncDiskServiceFixer");
588      setDaemon(true);
589      this.cluster = cluster;
590    }
591
592    @Override
593    public void run() {
594      while (!stopped) {
595        try {
596          Thread.sleep(30000);
597        } catch (InterruptedException e) {
598          Thread.currentThread().interrupt();
599          continue;
600        }
601        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
602        // timeout of the thread pool executor is 60 seconds by default.
603        try {
604          for (DataNode dn : cluster.getDataNodes()) {
605            FsDatasetSpi<?> dataset = dn.getFSDataset();
606            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
607            service.setAccessible(true);
608            Object asyncDiskService = service.get(dataset);
609            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
610            group.setAccessible(true);
611            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
612            if (threadGroup.isDaemon()) {
613              threadGroup.setDaemon(false);
614            }
615          }
616        } catch (NoSuchFieldException e) {
617          LOG.debug("NoSuchFieldException: " + e.getMessage()
618            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
619            + "See HBASE-27595 for details.");
620        } catch (Exception e) {
621          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
622        }
623      }
624    }
625
626    void shutdown() {
627      stopped = true;
628      interrupt();
629    }
630  }
631
632  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
633    throws Exception {
634    createDirsAndSetProperties();
635    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
636
637    this.dfsCluster =
638      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
639    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
640    this.dfsClusterFixer.start();
641    // Set this just-started cluster as our filesystem.
642    setFs();
643
644    // Wait for the cluster to be totally up
645    this.dfsCluster.waitClusterUp();
646
647    // reset the test directory for test file system
648    dataTestDirOnTestFS = null;
649    String dataTestDir = getDataTestDir().toString();
650    conf.set(HConstants.HBASE_DIR, dataTestDir);
651    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
652
653    return this.dfsCluster;
654  }
655
656  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
657    createDirsAndSetProperties();
658    dfsCluster =
659      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
660    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
661    this.dfsClusterFixer.start();
662    return dfsCluster;
663  }
664
665  /**
666   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
667   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
668   * the conf.
669   *
670   * <pre>
671   * Configuration conf = TEST_UTIL.getConfiguration();
672   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
673   *   Map.Entry&lt;String, String&gt; e = i.next();
674   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
675   * }
676   * </pre>
677   */
678  private void createDirsAndSetProperties() throws IOException {
679    setupClusterTestDir();
680    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
681    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
682    createDirAndSetProperty("test.cache.data");
683    createDirAndSetProperty("hadoop.tmp.dir");
684    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
685    createDirAndSetProperty("mapreduce.cluster.local.dir");
686    createDirAndSetProperty("mapreduce.cluster.temp.dir");
687    enableShortCircuit();
688
689    Path root = getDataTestDirOnTestFS("hadoop");
690    conf.set(MapreduceTestingShim.getMROutputDirProp(),
691      new Path(root, "mapred-output-dir").toString());
692    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
693    conf.set("mapreduce.jobtracker.staging.root.dir",
694      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
695    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
696    conf.set("yarn.app.mapreduce.am.staging-dir",
697      new Path(root, "mapreduce-am-staging-root-dir").toString());
698
699    // Frustrate yarn's and hdfs's attempts at writing /tmp.
700    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
701    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
702    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
703    createDirAndSetProperty("yarn.nodemanager.log-dirs");
704    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
705    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
706    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
707    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
708    createDirAndSetProperty("dfs.journalnode.edits.dir");
709    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
710    createDirAndSetProperty("nfs.dump.dir");
711    createDirAndSetProperty("java.io.tmpdir");
712    createDirAndSetProperty("dfs.journalnode.edits.dir");
713    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
714    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
715  }
716
717  /**
718   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
719   * Default to false.
720   */
721  public boolean isNewVersionBehaviorEnabled() {
722    final String propName = "hbase.tests.new.version.behavior";
723    String v = System.getProperty(propName);
724    if (v != null) {
725      return Boolean.parseBoolean(v);
726    }
727    return false;
728  }
729
730  /**
731   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
732   * allows to specify this parameter on the command line. If not set, default is true.
733   */
734  public boolean isReadShortCircuitOn() {
735    final String propName = "hbase.tests.use.shortcircuit.reads";
736    String readOnProp = System.getProperty(propName);
737    if (readOnProp != null) {
738      return Boolean.parseBoolean(readOnProp);
739    } else {
740      return conf.getBoolean(propName, false);
741    }
742  }
743
744  /**
745   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
746   * including skipping the hdfs checksum checks.
747   */
748  private void enableShortCircuit() {
749    if (isReadShortCircuitOn()) {
750      String curUser = System.getProperty("user.name");
751      LOG.info("read short circuit is ON for user " + curUser);
752      // read short circuit, for hdfs
753      conf.set("dfs.block.local-path-access.user", curUser);
754      // read short circuit, for hbase
755      conf.setBoolean("dfs.client.read.shortcircuit", true);
756      // Skip checking checksum, for the hdfs client and the datanode
757      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
758    } else {
759      LOG.info("read short circuit is OFF");
760    }
761  }
762
763  private String createDirAndSetProperty(final String property) {
764    return createDirAndSetProperty(property, property);
765  }
766
767  private String createDirAndSetProperty(final String relPath, String property) {
768    String path = getDataTestDir(relPath).toString();
769    System.setProperty(property, path);
770    conf.set(property, path);
771    new File(path).mkdirs();
772    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
773    return path;
774  }
775
776  /**
777   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
778   */
779  public void shutdownMiniDFSCluster() throws IOException {
780    if (this.dfsCluster != null) {
781      // The below throws an exception per dn, AsynchronousCloseException.
782      this.dfsCluster.shutdown();
783      dfsCluster = null;
784      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
785      // have a fixer
786      if (dfsClusterFixer != null) {
787        this.dfsClusterFixer.shutdown();
788        dfsClusterFixer = null;
789      }
790      dataTestDirOnTestFS = null;
791      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
792    }
793  }
794
795  /**
796   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
797   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
798   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
799   * @see #startMiniCluster(StartTestingClusterOption option)
800   * @see #shutdownMiniDFSCluster()
801   */
802  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
803    StartTestingClusterOption option = StartTestingClusterOption.builder()
804      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
805    return startMiniCluster(option);
806  }
807
808  /**
809   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
810   * value can be found in {@link StartTestingClusterOption.Builder}.
811   * @see #startMiniCluster(StartTestingClusterOption option)
812   * @see #shutdownMiniDFSCluster()
813   */
814  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
815    return startMiniCluster(StartTestingClusterOption.builder().build());
816  }
817
818  /**
819   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
820   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
821   * under System property test.build.data, to be cleaned up on exit.
822   * @see #shutdownMiniDFSCluster()
823   */
824  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
825    throws Exception {
826    LOG.info("Starting up minicluster with option: {}", option);
827
828    // If we already put up a cluster, fail.
829    if (miniClusterRunning) {
830      throw new IllegalStateException("A mini-cluster is already running");
831    }
832    miniClusterRunning = true;
833
834    setupClusterTestDir();
835    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
836
837    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
838    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
839    if (dfsCluster == null) {
840      LOG.info("STARTING DFS");
841      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
842    } else {
843      LOG.info("NOT STARTING DFS");
844    }
845
846    // Start up a zk cluster.
847    if (getZkCluster() == null) {
848      startMiniZKCluster(option.getNumZkServers());
849    }
850
851    // Start the MiniHBaseCluster
852    return startMiniHBaseCluster(option);
853  }
854
855  /**
856   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
857   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
858   * @return Reference to the hbase mini hbase cluster.
859   * @see #startMiniCluster(StartTestingClusterOption)
860   * @see #shutdownMiniHBaseCluster()
861   */
862  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
863    throws IOException, InterruptedException {
864    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
865    createRootDir(option.isCreateRootDir());
866    if (option.isCreateWALDir()) {
867      createWALRootDir();
868    }
869    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
870    // for tests that do not read hbase-defaults.xml
871    setHBaseFsTmpDir();
872
873    // These settings will make the server waits until this exact number of
874    // regions servers are connected.
875    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
876      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
877    }
878    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
879      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
880    }
881
882    Configuration c = new Configuration(this.conf);
883    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
884      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
885      option.getMasterClass(), option.getRsClass());
886    // Populate the master address configuration from mini cluster configuration.
887    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
888    // Don't leave here till we've done a successful scan of the hbase:meta
889    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
890      ResultScanner s = t.getScanner(new Scan())) {
891      for (;;) {
892        if (s.next() == null) {
893          break;
894        }
895      }
896    }
897
898    getAdmin(); // create immediately the hbaseAdmin
899    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
900
901    return (SingleProcessHBaseCluster) hbaseCluster;
902  }
903
904  /**
905   * Starts up mini hbase cluster using default options. Default options can be found in
906   * {@link StartTestingClusterOption.Builder}.
907   * @see #startMiniHBaseCluster(StartTestingClusterOption)
908   * @see #shutdownMiniHBaseCluster()
909   */
910  public SingleProcessHBaseCluster startMiniHBaseCluster()
911    throws IOException, InterruptedException {
912    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
913  }
914
915  /**
916   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
917   * {@link #startMiniCluster()}. All other options will use default values, defined in
918   * {@link StartTestingClusterOption.Builder}.
919   * @param numMasters       Master node number.
920   * @param numRegionServers Number of region servers.
921   * @return The mini HBase cluster created.
922   * @see #shutdownMiniHBaseCluster()
923   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
924   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
925   * @see #startMiniHBaseCluster(StartTestingClusterOption)
926   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
927   */
928  @Deprecated
929  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
930    throws IOException, InterruptedException {
931    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
932      .numRegionServers(numRegionServers).build();
933    return startMiniHBaseCluster(option);
934  }
935
936  /**
937   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
938   * {@link #startMiniCluster()}. All other options will use default values, defined in
939   * {@link StartTestingClusterOption.Builder}.
940   * @param numMasters       Master node number.
941   * @param numRegionServers Number of region servers.
942   * @param rsPorts          Ports that RegionServer should use.
943   * @return The mini HBase cluster created.
944   * @see #shutdownMiniHBaseCluster()
945   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
946   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
947   * @see #startMiniHBaseCluster(StartTestingClusterOption)
948   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
949   */
950  @Deprecated
951  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
952    List<Integer> rsPorts) throws IOException, InterruptedException {
953    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
954      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
955    return startMiniHBaseCluster(option);
956  }
957
958  /**
959   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
960   * {@link #startMiniCluster()}. All other options will use default values, defined in
961   * {@link StartTestingClusterOption.Builder}.
962   * @param numMasters       Master node number.
963   * @param numRegionServers Number of region servers.
964   * @param rsPorts          Ports that RegionServer should use.
965   * @param masterClass      The class to use as HMaster, or null for default.
966   * @param rsClass          The class to use as HRegionServer, or null for default.
967   * @param createRootDir    Whether to create a new root or data directory path.
968   * @param createWALDir     Whether to create a new WAL directory.
969   * @return The mini HBase cluster created.
970   * @see #shutdownMiniHBaseCluster()
971   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
972   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
973   * @see #startMiniHBaseCluster(StartTestingClusterOption)
974   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
975   */
976  @Deprecated
977  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
978    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
979    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
980    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
981    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
982      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
983      .createRootDir(createRootDir).createWALDir(createWALDir).build();
984    return startMiniHBaseCluster(option);
985  }
986
987  /**
988   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
989   * want to keep dfs/zk up and just stop/start hbase.
990   * @param servers number of region servers
991   */
992  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
993    this.restartHBaseCluster(servers, null);
994  }
995
996  public void restartHBaseCluster(int servers, List<Integer> ports)
997    throws IOException, InterruptedException {
998    StartTestingClusterOption option =
999      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1000    restartHBaseCluster(option);
1001    invalidateConnection();
1002  }
1003
1004  public void restartHBaseCluster(StartTestingClusterOption option)
1005    throws IOException, InterruptedException {
1006    closeConnection();
1007    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
1008      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1009      option.getMasterClass(), option.getRsClass());
1010    // Don't leave here till we've done a successful scan of the hbase:meta
1011    Connection conn = ConnectionFactory.createConnection(this.conf);
1012    Table t = conn.getTable(TableName.META_TABLE_NAME);
1013    ResultScanner s = t.getScanner(new Scan());
1014    while (s.next() != null) {
1015      // do nothing
1016    }
1017    LOG.info("HBase has been restarted");
1018    s.close();
1019    t.close();
1020    conn.close();
1021  }
1022
1023  /**
1024   * Returns current mini hbase cluster. Only has something in it after a call to
1025   * {@link #startMiniCluster()}.
1026   * @see #startMiniCluster()
1027   */
1028  public SingleProcessHBaseCluster getMiniHBaseCluster() {
1029    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
1030      return (SingleProcessHBaseCluster) this.hbaseCluster;
1031    }
1032    throw new RuntimeException(
1033      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
1034  }
1035
1036  /**
1037   * Stops mini hbase, zk, and hdfs clusters.
1038   * @see #startMiniCluster(int)
1039   */
1040  public void shutdownMiniCluster() throws IOException {
1041    LOG.info("Shutting down minicluster");
1042    shutdownMiniHBaseCluster();
1043    shutdownMiniDFSCluster();
1044    shutdownMiniZKCluster();
1045
1046    cleanupTestDir();
1047    miniClusterRunning = false;
1048    LOG.info("Minicluster is down");
1049  }
1050
1051  /**
1052   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1053   * @throws java.io.IOException in case command is unsuccessful
1054   */
1055  public void shutdownMiniHBaseCluster() throws IOException {
1056    cleanup();
1057    if (this.hbaseCluster != null) {
1058      this.hbaseCluster.shutdown();
1059      // Wait till hbase is down before going on to shutdown zk.
1060      this.hbaseCluster.waitUntilShutDown();
1061      this.hbaseCluster = null;
1062    }
1063    if (zooKeeperWatcher != null) {
1064      zooKeeperWatcher.close();
1065      zooKeeperWatcher = null;
1066    }
1067  }
1068
1069  /**
1070   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1071   * @throws java.io.IOException throws in case command is unsuccessful
1072   */
1073  public void killMiniHBaseCluster() throws IOException {
1074    cleanup();
1075    if (this.hbaseCluster != null) {
1076      getMiniHBaseCluster().killAll();
1077      this.hbaseCluster = null;
1078    }
1079    if (zooKeeperWatcher != null) {
1080      zooKeeperWatcher.close();
1081      zooKeeperWatcher = null;
1082    }
1083  }
1084
1085  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1086  private void cleanup() throws IOException {
1087    closeConnection();
1088    // unset the configuration for MIN and MAX RS to start
1089    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1090    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1091  }
1092
1093  /**
1094   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1095   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1096   * If false, previous path is used. Note: this does not cause the root dir to be created.
1097   * @return Fully qualified path for the default hbase root dir
1098   */
1099  public Path getDefaultRootDirPath(boolean create) throws IOException {
1100    if (!create) {
1101      return getDataTestDirOnTestFS();
1102    } else {
1103      return getNewDataTestDirOnTestFS();
1104    }
1105  }
1106
1107  /**
1108   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1109   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1110   * @return Fully qualified path for the default hbase root dir
1111   */
1112  public Path getDefaultRootDirPath() throws IOException {
1113    return getDefaultRootDirPath(false);
1114  }
1115
1116  /**
1117   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1118   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1119   * startup. You'd only use this method if you were doing manual operation.
1120   * @param create This flag decides whether to get a new root or data directory path or not, if it
1121   *               has been fetched already. Note : Directory will be made irrespective of whether
1122   *               path has been fetched or not. If directory already exists, it will be overwritten
1123   * @return Fully qualified path to hbase root dir
1124   */
1125  public Path createRootDir(boolean create) throws IOException {
1126    FileSystem fs = FileSystem.get(this.conf);
1127    Path hbaseRootdir = getDefaultRootDirPath(create);
1128    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1129    fs.mkdirs(hbaseRootdir);
1130    FSUtils.setVersion(fs, hbaseRootdir);
1131    return hbaseRootdir;
1132  }
1133
1134  /**
1135   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1136   * flag is false.
1137   * @return Fully qualified path to hbase root dir
1138   */
1139  public Path createRootDir() throws IOException {
1140    return createRootDir(false);
1141  }
1142
1143  /**
1144   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1145   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1146   * this method if you were doing manual operation.
1147   * @return Fully qualified path to hbase root dir
1148   */
1149  public Path createWALRootDir() throws IOException {
1150    FileSystem fs = FileSystem.get(this.conf);
1151    Path walDir = getNewDataTestDirOnTestFS();
1152    CommonFSUtils.setWALRootDir(this.conf, walDir);
1153    fs.mkdirs(walDir);
1154    return walDir;
1155  }
1156
1157  private void setHBaseFsTmpDir() throws IOException {
1158    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1159    if (hbaseFsTmpDirInString == null) {
1160      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1161      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1162    } else {
1163      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1164    }
1165  }
1166
1167  /**
1168   * Flushes all caches in the mini hbase cluster
1169   */
1170  public void flush() throws IOException {
1171    getMiniHBaseCluster().flushcache();
1172  }
1173
1174  /**
1175   * Flushes all caches in the mini hbase cluster
1176   */
1177  public void flush(TableName tableName) throws IOException {
1178    getMiniHBaseCluster().flushcache(tableName);
1179  }
1180
1181  /**
1182   * Compact all regions in the mini hbase cluster
1183   */
1184  public void compact(boolean major) throws IOException {
1185    getMiniHBaseCluster().compact(major);
1186  }
1187
1188  /**
1189   * Compact all of a table's reagion in the mini hbase cluster
1190   */
1191  public void compact(TableName tableName, boolean major) throws IOException {
1192    getMiniHBaseCluster().compact(tableName, major);
1193  }
1194
1195  /**
1196   * Create a table.
1197   * @return A Table instance for the created table.
1198   */
1199  public Table createTable(TableName tableName, String family) throws IOException {
1200    return createTable(tableName, new String[] { family });
1201  }
1202
1203  /**
1204   * Create a table.
1205   * @return A Table instance for the created table.
1206   */
1207  public Table createTable(TableName tableName, String[] families) throws IOException {
1208    List<byte[]> fams = new ArrayList<>(families.length);
1209    for (String family : families) {
1210      fams.add(Bytes.toBytes(family));
1211    }
1212    return createTable(tableName, fams.toArray(new byte[0][]));
1213  }
1214
1215  /**
1216   * Create a table.
1217   * @return A Table instance for the created table.
1218   */
1219  public Table createTable(TableName tableName, byte[] family) throws IOException {
1220    return createTable(tableName, new byte[][] { family });
1221  }
1222
1223  /**
1224   * Create a table with multiple regions.
1225   * @return A Table instance for the created table.
1226   */
1227  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1228    throws IOException {
1229    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1230    byte[] startKey = Bytes.toBytes("aaaaa");
1231    byte[] endKey = Bytes.toBytes("zzzzz");
1232    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1233
1234    return createTable(tableName, new byte[][] { family }, splitKeys);
1235  }
1236
1237  /**
1238   * Create a table.
1239   * @return A Table instance for the created table.
1240   */
1241  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1242    return createTable(tableName, families, (byte[][]) null);
1243  }
1244
1245  /**
1246   * Create a table with multiple regions.
1247   * @return A Table instance for the created table.
1248   */
1249  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1250    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1251  }
1252
1253  /**
1254   * Create a table with multiple regions.
1255   * @param replicaCount replica count.
1256   * @return A Table instance for the created table.
1257   */
1258  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1259    throws IOException {
1260    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1261  }
1262
1263  /**
1264   * Create a table.
1265   * @return A Table instance for the created table.
1266   */
1267  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1268    throws IOException {
1269    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1270  }
1271
1272  /**
1273   * Create a table.
1274   * @param tableName    the table name
1275   * @param families     the families
1276   * @param splitKeys    the splitkeys
1277   * @param replicaCount the region replica count
1278   * @return A Table instance for the created table.
1279   * @throws IOException throws IOException
1280   */
1281  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1282    int replicaCount) throws IOException {
1283    return createTable(tableName, families, splitKeys, replicaCount,
1284      new Configuration(getConfiguration()));
1285  }
1286
1287  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1288    byte[] endKey, int numRegions) throws IOException {
1289    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1290
1291    getAdmin().createTable(desc, startKey, endKey, numRegions);
1292    // HBaseAdmin only waits for regions to appear in hbase:meta we
1293    // should wait until they are assigned
1294    waitUntilAllRegionsAssigned(tableName);
1295    return getConnection().getTable(tableName);
1296  }
1297
1298  /**
1299   * Create a table.
1300   * @param c Configuration to use
1301   * @return A Table instance for the created table.
1302   */
1303  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1304    throws IOException {
1305    return createTable(htd, families, null, c);
1306  }
1307
1308  /**
1309   * Create a table.
1310   * @param htd       table descriptor
1311   * @param families  array of column families
1312   * @param splitKeys array of split keys
1313   * @param c         Configuration to use
1314   * @return A Table instance for the created table.
1315   * @throws IOException if getAdmin or createTable fails
1316   */
1317  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1318    Configuration c) throws IOException {
1319    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1320    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1321    // on is interfering.
1322    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1323  }
1324
1325  /**
1326   * Create a table.
1327   * @param htd       table descriptor
1328   * @param families  array of column families
1329   * @param splitKeys array of split keys
1330   * @param type      Bloom type
1331   * @param blockSize block size
1332   * @param c         Configuration to use
1333   * @return A Table instance for the created table.
1334   * @throws IOException if getAdmin or createTable fails
1335   */
1336
1337  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1338    BloomType type, int blockSize, Configuration c) throws IOException {
1339    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1340    for (byte[] family : families) {
1341      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1342        .setBloomFilterType(type).setBlocksize(blockSize);
1343      if (isNewVersionBehaviorEnabled()) {
1344        cfdb.setNewVersionBehavior(true);
1345      }
1346      builder.setColumnFamily(cfdb.build());
1347    }
1348    TableDescriptor td = builder.build();
1349    if (splitKeys != null) {
1350      getAdmin().createTable(td, splitKeys);
1351    } else {
1352      getAdmin().createTable(td);
1353    }
1354    // HBaseAdmin only waits for regions to appear in hbase:meta
1355    // we should wait until they are assigned
1356    waitUntilAllRegionsAssigned(td.getTableName());
1357    return getConnection().getTable(td.getTableName());
1358  }
1359
1360  /**
1361   * Create a table.
1362   * @param htd       table descriptor
1363   * @param splitRows array of split keys
1364   * @return A Table instance for the created table.
1365   */
1366  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1367    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1368    if (isNewVersionBehaviorEnabled()) {
1369      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1370        builder.setColumnFamily(
1371          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1372      }
1373    }
1374    if (splitRows != null) {
1375      getAdmin().createTable(builder.build(), splitRows);
1376    } else {
1377      getAdmin().createTable(builder.build());
1378    }
1379    // HBaseAdmin only waits for regions to appear in hbase:meta
1380    // we should wait until they are assigned
1381    waitUntilAllRegionsAssigned(htd.getTableName());
1382    return getConnection().getTable(htd.getTableName());
1383  }
1384
1385  /**
1386   * Create a table.
1387   * @param tableName    the table name
1388   * @param families     the families
1389   * @param splitKeys    the split keys
1390   * @param replicaCount the replica count
1391   * @param c            Configuration to use
1392   * @return A Table instance for the created table.
1393   */
1394  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1395    int replicaCount, final Configuration c) throws IOException {
1396    TableDescriptor htd =
1397      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1398    return createTable(htd, families, splitKeys, c);
1399  }
1400
1401  /**
1402   * Create a table.
1403   * @return A Table instance for the created table.
1404   */
1405  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1406    return createTable(tableName, new byte[][] { family }, numVersions);
1407  }
1408
1409  /**
1410   * Create a table.
1411   * @return A Table instance for the created table.
1412   */
1413  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1414    throws IOException {
1415    return createTable(tableName, families, numVersions, (byte[][]) null);
1416  }
1417
1418  /**
1419   * Create a table.
1420   * @return A Table instance for the created table.
1421   */
1422  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1423    byte[][] splitKeys) throws IOException {
1424    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1425    for (byte[] family : families) {
1426      ColumnFamilyDescriptorBuilder cfBuilder =
1427        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1428      if (isNewVersionBehaviorEnabled()) {
1429        cfBuilder.setNewVersionBehavior(true);
1430      }
1431      builder.setColumnFamily(cfBuilder.build());
1432    }
1433    if (splitKeys != null) {
1434      getAdmin().createTable(builder.build(), splitKeys);
1435    } else {
1436      getAdmin().createTable(builder.build());
1437    }
1438    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1439    // assigned
1440    waitUntilAllRegionsAssigned(tableName);
1441    return getConnection().getTable(tableName);
1442  }
1443
1444  /**
1445   * Create a table with multiple regions.
1446   * @return A Table instance for the created table.
1447   */
1448  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1449    throws IOException {
1450    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1451  }
1452
1453  /**
1454   * Create a table.
1455   * @return A Table instance for the created table.
1456   */
1457  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1458    throws IOException {
1459    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1460    for (byte[] family : families) {
1461      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1462        .setMaxVersions(numVersions).setBlocksize(blockSize);
1463      if (isNewVersionBehaviorEnabled()) {
1464        cfBuilder.setNewVersionBehavior(true);
1465      }
1466      builder.setColumnFamily(cfBuilder.build());
1467    }
1468    getAdmin().createTable(builder.build());
1469    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1470    // assigned
1471    waitUntilAllRegionsAssigned(tableName);
1472    return getConnection().getTable(tableName);
1473  }
1474
1475  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1476    String cpName) throws IOException {
1477    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1478    for (byte[] family : families) {
1479      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1480        .setMaxVersions(numVersions).setBlocksize(blockSize);
1481      if (isNewVersionBehaviorEnabled()) {
1482        cfBuilder.setNewVersionBehavior(true);
1483      }
1484      builder.setColumnFamily(cfBuilder.build());
1485    }
1486    if (cpName != null) {
1487      builder.setCoprocessor(cpName);
1488    }
1489    getAdmin().createTable(builder.build());
1490    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1491    // assigned
1492    waitUntilAllRegionsAssigned(tableName);
1493    return getConnection().getTable(tableName);
1494  }
1495
1496  /**
1497   * Create a table.
1498   * @return A Table instance for the created table.
1499   */
1500  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1501    throws IOException {
1502    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1503    int i = 0;
1504    for (byte[] family : families) {
1505      ColumnFamilyDescriptorBuilder cfBuilder =
1506        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1507      if (isNewVersionBehaviorEnabled()) {
1508        cfBuilder.setNewVersionBehavior(true);
1509      }
1510      builder.setColumnFamily(cfBuilder.build());
1511      i++;
1512    }
1513    getAdmin().createTable(builder.build());
1514    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1515    // assigned
1516    waitUntilAllRegionsAssigned(tableName);
1517    return getConnection().getTable(tableName);
1518  }
1519
1520  /**
1521   * Create a table.
1522   * @return A Table instance for the created table.
1523   */
1524  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1525    throws IOException {
1526    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1527    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1528    if (isNewVersionBehaviorEnabled()) {
1529      cfBuilder.setNewVersionBehavior(true);
1530    }
1531    builder.setColumnFamily(cfBuilder.build());
1532    getAdmin().createTable(builder.build(), splitRows);
1533    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1534    // assigned
1535    waitUntilAllRegionsAssigned(tableName);
1536    return getConnection().getTable(tableName);
1537  }
1538
1539  /**
1540   * Create a table with multiple regions.
1541   * @return A Table instance for the created table.
1542   */
1543  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1544    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1545  }
1546
1547  /**
1548   * Set the number of Region replicas.
1549   */
1550  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1551    throws IOException, InterruptedException {
1552    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1553      .setRegionReplication(replicaCount).build();
1554    admin.modifyTable(desc);
1555  }
1556
1557  /**
1558   * Set the number of Region replicas.
1559   */
1560  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1561    throws ExecutionException, IOException, InterruptedException {
1562    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1563      .setRegionReplication(replicaCount).build();
1564    admin.modifyTable(desc).get();
1565  }
1566
1567  /**
1568   * Drop an existing table
1569   * @param tableName existing table
1570   */
1571  public void deleteTable(TableName tableName) throws IOException {
1572    try {
1573      getAdmin().disableTable(tableName);
1574    } catch (TableNotEnabledException e) {
1575      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1576    }
1577    getAdmin().deleteTable(tableName);
1578  }
1579
1580  /**
1581   * Drop an existing table
1582   * @param tableName existing table
1583   */
1584  public void deleteTableIfAny(TableName tableName) throws IOException {
1585    try {
1586      deleteTable(tableName);
1587    } catch (TableNotFoundException e) {
1588      // ignore
1589    }
1590  }
1591
1592  // ==========================================================================
1593  // Canned table and table descriptor creation
1594
1595  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1596  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1597  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1598  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1599  private static final int MAXVERSIONS = 3;
1600
1601  public static final char FIRST_CHAR = 'a';
1602  public static final char LAST_CHAR = 'z';
1603  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1604  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1605
1606  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1607    return createModifyableTableDescriptor(TableName.valueOf(name),
1608      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1609      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1610  }
1611
1612  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1613    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1614    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1615    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1616      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1617        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1618        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1619      if (isNewVersionBehaviorEnabled()) {
1620        cfBuilder.setNewVersionBehavior(true);
1621      }
1622      builder.setColumnFamily(cfBuilder.build());
1623    }
1624    return builder.build();
1625  }
1626
1627  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1628    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1629    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1630    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1631      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1632        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1633        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1634      if (isNewVersionBehaviorEnabled()) {
1635        cfBuilder.setNewVersionBehavior(true);
1636      }
1637      builder.setColumnFamily(cfBuilder.build());
1638    }
1639    return builder;
1640  }
1641
1642  /**
1643   * Create a table of name <code>name</code>.
1644   * @param name Name to give table.
1645   * @return Column descriptor.
1646   */
1647  public TableDescriptor createTableDescriptor(final TableName name) {
1648    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1649      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1650  }
1651
1652  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1653    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1654  }
1655
1656  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1657    int maxVersions) {
1658    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1659    for (byte[] family : families) {
1660      ColumnFamilyDescriptorBuilder cfBuilder =
1661        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1662      if (isNewVersionBehaviorEnabled()) {
1663        cfBuilder.setNewVersionBehavior(true);
1664      }
1665      builder.setColumnFamily(cfBuilder.build());
1666    }
1667    return builder.build();
1668  }
1669
1670  /**
1671   * Create an HRegion that writes to the local tmp dirs
1672   * @param desc     a table descriptor indicating which table the region belongs to
1673   * @param startKey the start boundary of the region
1674   * @param endKey   the end boundary of the region
1675   * @return a region that writes to local dir for testing
1676   */
1677  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1678    throws IOException {
1679    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1680      .setEndKey(endKey).build();
1681    return createLocalHRegion(hri, desc);
1682  }
1683
1684  /**
1685   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1686   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1687   */
1688  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1689    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1690  }
1691
1692  /**
1693   * Create an HRegion that writes to the local tmp dirs with specified wal
1694   * @param info regioninfo
1695   * @param conf configuration
1696   * @param desc table descriptor
1697   * @param wal  wal for this region.
1698   * @return created hregion
1699   */
1700  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1701    WAL wal) throws IOException {
1702    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1703      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1704    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1705  }
1706
1707  /**
1708   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1709   *         when done.
1710   */
1711  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1712    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1713    throws IOException {
1714    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1715      durability, wal, null, families);
1716  }
1717
1718  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1719    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1720    boolean[] compactedMemStore, byte[]... families) throws IOException {
1721    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1722    builder.setReadOnly(isReadOnly);
1723    int i = 0;
1724    for (byte[] family : families) {
1725      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1726      if (compactedMemStore != null && i < compactedMemStore.length) {
1727        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1728      } else {
1729        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1730
1731      }
1732      i++;
1733      // Set default to be three versions.
1734      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1735      builder.setColumnFamily(cfBuilder.build());
1736    }
1737    builder.setDurability(durability);
1738    RegionInfo info =
1739      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1740    return createLocalHRegion(info, conf, builder.build(), wal);
1741  }
1742
1743  //
1744  // ==========================================================================
1745
1746  /**
1747   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1748   * read.
1749   * @param tableName existing table
1750   * @return HTable to that new table
1751   */
1752  public Table deleteTableData(TableName tableName) throws IOException {
1753    Table table = getConnection().getTable(tableName);
1754    Scan scan = new Scan();
1755    ResultScanner resScan = table.getScanner(scan);
1756    for (Result res : resScan) {
1757      Delete del = new Delete(res.getRow());
1758      table.delete(del);
1759    }
1760    resScan = table.getScanner(scan);
1761    resScan.close();
1762    return table;
1763  }
1764
1765  /**
1766   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1767   * table.
1768   * @param tableName       table which must exist.
1769   * @param preserveRegions keep the existing split points
1770   * @return HTable for the new table
1771   */
1772  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1773    throws IOException {
1774    Admin admin = getAdmin();
1775    if (!admin.isTableDisabled(tableName)) {
1776      admin.disableTable(tableName);
1777    }
1778    admin.truncateTable(tableName, preserveRegions);
1779    return getConnection().getTable(tableName);
1780  }
1781
1782  /**
1783   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1784   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1785   * preserve regions of existing table.
1786   * @param tableName table which must exist.
1787   * @return HTable for the new table
1788   */
1789  public Table truncateTable(final TableName tableName) throws IOException {
1790    return truncateTable(tableName, false);
1791  }
1792
1793  /**
1794   * Load table with rows from 'aaa' to 'zzz'.
1795   * @param t Table
1796   * @param f Family
1797   * @return Count of rows loaded.
1798   */
1799  public int loadTable(final Table t, final byte[] f) throws IOException {
1800    return loadTable(t, new byte[][] { f });
1801  }
1802
1803  /**
1804   * Load table with rows from 'aaa' to 'zzz'.
1805   * @param t Table
1806   * @param f Family
1807   * @return Count of rows loaded.
1808   */
1809  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1810    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1811  }
1812
1813  /**
1814   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1815   * @param t Table
1816   * @param f Array of Families to load
1817   * @return Count of rows loaded.
1818   */
1819  public int loadTable(final Table t, final byte[][] f) throws IOException {
1820    return loadTable(t, f, null);
1821  }
1822
1823  /**
1824   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1825   * @param t     Table
1826   * @param f     Array of Families to load
1827   * @param value the values of the cells. If null is passed, the row key is used as value
1828   * @return Count of rows loaded.
1829   */
1830  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1831    return loadTable(t, f, value, true);
1832  }
1833
1834  /**
1835   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1836   * @param t     Table
1837   * @param f     Array of Families to load
1838   * @param value the values of the cells. If null is passed, the row key is used as value
1839   * @return Count of rows loaded.
1840   */
1841  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1842    throws IOException {
1843    List<Put> puts = new ArrayList<>();
1844    for (byte[] row : HBaseTestingUtil.ROWS) {
1845      Put put = new Put(row);
1846      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1847      for (int i = 0; i < f.length; i++) {
1848        byte[] value1 = value != null ? value : row;
1849        put.addColumn(f[i], f[i], value1);
1850      }
1851      puts.add(put);
1852    }
1853    t.put(puts);
1854    return puts.size();
1855  }
1856
1857  /**
1858   * A tracker for tracking and validating table rows generated with
1859   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1860   */
1861  public static class SeenRowTracker {
1862    int dim = 'z' - 'a' + 1;
1863    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1864    byte[] startRow;
1865    byte[] stopRow;
1866
1867    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1868      this.startRow = startRow;
1869      this.stopRow = stopRow;
1870    }
1871
1872    void reset() {
1873      for (byte[] row : ROWS) {
1874        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1875      }
1876    }
1877
1878    int i(byte b) {
1879      return b - 'a';
1880    }
1881
1882    public void addRow(byte[] row) {
1883      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1884    }
1885
1886    /**
1887     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
1888     * rows none
1889     */
1890    public void validate() {
1891      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1892        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1893          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1894            int count = seenRows[i(b1)][i(b2)][i(b3)];
1895            int expectedCount = 0;
1896            if (
1897              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1898                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1899            ) {
1900              expectedCount = 1;
1901            }
1902            if (count != expectedCount) {
1903              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1904              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1905                + "instead of " + expectedCount);
1906            }
1907          }
1908        }
1909      }
1910    }
1911  }
1912
1913  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1914    return loadRegion(r, f, false);
1915  }
1916
1917  public int loadRegion(final Region r, final byte[] f) throws IOException {
1918    return loadRegion((HRegion) r, f);
1919  }
1920
1921  /**
1922   * Load region with rows from 'aaa' to 'zzz'.
1923   * @param r     Region
1924   * @param f     Family
1925   * @param flush flush the cache if true
1926   * @return Count of rows loaded.
1927   */
1928  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1929    byte[] k = new byte[3];
1930    int rowCount = 0;
1931    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1932      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1933        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1934          k[0] = b1;
1935          k[1] = b2;
1936          k[2] = b3;
1937          Put put = new Put(k);
1938          put.setDurability(Durability.SKIP_WAL);
1939          put.addColumn(f, null, k);
1940          if (r.getWAL() == null) {
1941            put.setDurability(Durability.SKIP_WAL);
1942          }
1943          int preRowCount = rowCount;
1944          int pause = 10;
1945          int maxPause = 1000;
1946          while (rowCount == preRowCount) {
1947            try {
1948              r.put(put);
1949              rowCount++;
1950            } catch (RegionTooBusyException e) {
1951              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1952              Threads.sleep(pause);
1953            }
1954          }
1955        }
1956      }
1957      if (flush) {
1958        r.flush(true);
1959      }
1960    }
1961    return rowCount;
1962  }
1963
1964  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1965    throws IOException {
1966    for (int i = startRow; i < endRow; i++) {
1967      byte[] data = Bytes.toBytes(String.valueOf(i));
1968      Put put = new Put(data);
1969      put.addColumn(f, null, data);
1970      t.put(put);
1971    }
1972  }
1973
1974  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1975    throws IOException {
1976    for (int i = 0; i < totalRows; i++) {
1977      byte[] row = new byte[rowSize];
1978      Bytes.random(row);
1979      Put put = new Put(row);
1980      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1981      t.put(put);
1982    }
1983  }
1984
1985  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
1986    int replicaId) throws IOException {
1987    for (int i = startRow; i < endRow; i++) {
1988      String failMsg = "Failed verification of row :" + i;
1989      byte[] data = Bytes.toBytes(String.valueOf(i));
1990      Get get = new Get(data);
1991      get.setReplicaId(replicaId);
1992      get.setConsistency(Consistency.TIMELINE);
1993      Result result = table.get(get);
1994      assertTrue(failMsg, result.containsColumn(f, null));
1995      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1996      Cell cell = result.getColumnLatestCell(f, null);
1997      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1998        cell.getValueOffset(), cell.getValueLength()));
1999    }
2000  }
2001
2002  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2003    throws IOException {
2004    verifyNumericRows((HRegion) region, f, startRow, endRow);
2005  }
2006
2007  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2008    throws IOException {
2009    verifyNumericRows(region, f, startRow, endRow, true);
2010  }
2011
2012  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2013    final boolean present) throws IOException {
2014    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2015  }
2016
2017  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2018    final boolean present) throws IOException {
2019    for (int i = startRow; i < endRow; i++) {
2020      String failMsg = "Failed verification of row :" + i;
2021      byte[] data = Bytes.toBytes(String.valueOf(i));
2022      Result result = region.get(new Get(data));
2023
2024      boolean hasResult = result != null && !result.isEmpty();
2025      assertEquals(failMsg + result, present, hasResult);
2026      if (!present) continue;
2027
2028      assertTrue(failMsg, result.containsColumn(f, null));
2029      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2030      Cell cell = result.getColumnLatestCell(f, null);
2031      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2032        cell.getValueOffset(), cell.getValueLength()));
2033    }
2034  }
2035
2036  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2037    throws IOException {
2038    for (int i = startRow; i < endRow; i++) {
2039      byte[] data = Bytes.toBytes(String.valueOf(i));
2040      Delete delete = new Delete(data);
2041      delete.addFamily(f);
2042      t.delete(delete);
2043    }
2044  }
2045
2046  /**
2047   * Return the number of rows in the given table.
2048   * @param table to count rows
2049   * @return count of rows
2050   */
2051  public static int countRows(final Table table) throws IOException {
2052    return countRows(table, new Scan());
2053  }
2054
2055  public static int countRows(final Table table, final Scan scan) throws IOException {
2056    try (ResultScanner results = table.getScanner(scan)) {
2057      int count = 0;
2058      while (results.next() != null) {
2059        count++;
2060      }
2061      return count;
2062    }
2063  }
2064
2065  public static int countRows(final Table table, final byte[]... families) throws IOException {
2066    Scan scan = new Scan();
2067    for (byte[] family : families) {
2068      scan.addFamily(family);
2069    }
2070    return countRows(table, scan);
2071  }
2072
2073  /**
2074   * Return the number of rows in the given table.
2075   */
2076  public int countRows(final TableName tableName) throws IOException {
2077    try (Table table = getConnection().getTable(tableName)) {
2078      return countRows(table);
2079    }
2080  }
2081
2082  public static int countRows(final Region region) throws IOException {
2083    return countRows(region, new Scan());
2084  }
2085
2086  public static int countRows(final Region region, final Scan scan) throws IOException {
2087    try (InternalScanner scanner = region.getScanner(scan)) {
2088      return countRows(scanner);
2089    }
2090  }
2091
2092  public static int countRows(final InternalScanner scanner) throws IOException {
2093    int scannedCount = 0;
2094    List<Cell> results = new ArrayList<>();
2095    boolean hasMore = true;
2096    while (hasMore) {
2097      hasMore = scanner.next(results);
2098      scannedCount += results.size();
2099      results.clear();
2100    }
2101    return scannedCount;
2102  }
2103
2104  /**
2105   * Return an md5 digest of the entire contents of a table.
2106   */
2107  public String checksumRows(final Table table) throws Exception {
2108    MessageDigest digest = MessageDigest.getInstance("MD5");
2109    try (ResultScanner results = table.getScanner(new Scan())) {
2110      for (Result res : results) {
2111        digest.update(res.getRow());
2112      }
2113    }
2114    return digest.toString();
2115  }
2116
2117  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2118  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2119  static {
2120    int i = 0;
2121    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2122      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2123        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2124          ROWS[i][0] = b1;
2125          ROWS[i][1] = b2;
2126          ROWS[i][2] = b3;
2127          i++;
2128        }
2129      }
2130    }
2131  }
2132
2133  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2134    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2135    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2136    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2137    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2138    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2139    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2140
2141  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2142    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2143    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2144    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2145    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2146    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2147    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2148
2149  /**
2150   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2151   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2152   * @return list of region info for regions added to meta
2153   */
2154  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2155    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2156    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2157      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2158      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2159      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2160        TableState.State.ENABLED);
2161      // add custom ones
2162      for (int i = 0; i < startKeys.length; i++) {
2163        int j = (i + 1) % startKeys.length;
2164        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2165          .setEndKey(startKeys[j]).build();
2166        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2167        newRegions.add(hri);
2168      }
2169      return newRegions;
2170    }
2171  }
2172
2173  /**
2174   * Create an unmanaged WAL. Be sure to close it when you're through.
2175   */
2176  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2177    throws IOException {
2178    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2179    // unless I pass along via the conf.
2180    Configuration confForWAL = new Configuration(conf);
2181    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2182    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2183  }
2184
2185  /**
2186   * Create a region with it's own WAL. Be sure to call
2187   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2188   */
2189  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2190    final Configuration conf, final TableDescriptor htd) throws IOException {
2191    return createRegionAndWAL(info, rootDir, conf, htd, true);
2192  }
2193
2194  /**
2195   * Create a region with it's own WAL. Be sure to call
2196   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2197   */
2198  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2199    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2200    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2201    region.setBlockCache(blockCache);
2202    region.initialize();
2203    return region;
2204  }
2205
2206  /**
2207   * Create a region with it's own WAL. Be sure to call
2208   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2209   */
2210  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2211    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2212    throws IOException {
2213    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2214    region.setMobFileCache(mobFileCache);
2215    region.initialize();
2216    return region;
2217  }
2218
2219  /**
2220   * Create a region with it's own WAL. Be sure to call
2221   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2222   */
2223  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2224    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2225    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2226      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2227    WAL wal = createWal(conf, rootDir, info);
2228    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2229  }
2230
2231  /**
2232   * Find any other region server which is different from the one identified by parameter
2233   * @return another region server
2234   */
2235  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2236    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2237      if (!(rst.getRegionServer() == rs)) {
2238        return rst.getRegionServer();
2239      }
2240    }
2241    return null;
2242  }
2243
2244  /**
2245   * Tool to get the reference to the region server object that holds the region of the specified
2246   * user table.
2247   * @param tableName user table to lookup in hbase:meta
2248   * @return region server that holds it, null if the row doesn't exist
2249   */
2250  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2251    throws IOException, InterruptedException {
2252    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2253    if (regions == null || regions.isEmpty()) {
2254      return null;
2255    }
2256    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2257
2258    byte[] firstRegionName =
2259      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2260        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2261
2262    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2263    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2264      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2265    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2266      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2267    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2268    while (retrier.shouldRetry()) {
2269      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2270      if (index != -1) {
2271        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2272      }
2273      // Came back -1. Region may not be online yet. Sleep a while.
2274      retrier.sleepUntilNextRetry();
2275    }
2276    return null;
2277  }
2278
2279  /**
2280   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2281   * @throws IOException When starting the cluster fails.
2282   */
2283  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2284    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2285    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2286      "99.0");
2287    startMiniMapReduceCluster(2);
2288    return mrCluster;
2289  }
2290
2291  /**
2292   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2293   * internal static LOG_DIR variable.
2294   */
2295  private void forceChangeTaskLogDir() {
2296    Field logDirField;
2297    try {
2298      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2299      logDirField.setAccessible(true);
2300
2301      Field modifiersField = ReflectionUtils.getModifiersField();
2302      modifiersField.setAccessible(true);
2303      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2304
2305      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2306    } catch (SecurityException e) {
2307      throw new RuntimeException(e);
2308    } catch (NoSuchFieldException e) {
2309      throw new RuntimeException(e);
2310    } catch (IllegalArgumentException e) {
2311      throw new RuntimeException(e);
2312    } catch (IllegalAccessException e) {
2313      throw new RuntimeException(e);
2314    }
2315  }
2316
2317  /**
2318   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2319   * filesystem.
2320   * @param servers The number of <code>TaskTracker</code>'s to start.
2321   * @throws IOException When starting the cluster fails.
2322   */
2323  private void startMiniMapReduceCluster(final int servers) throws IOException {
2324    if (mrCluster != null) {
2325      throw new IllegalStateException("MiniMRCluster is already running");
2326    }
2327    LOG.info("Starting mini mapreduce cluster...");
2328    setupClusterTestDir();
2329    createDirsAndSetProperties();
2330
2331    forceChangeTaskLogDir();
2332
2333    //// hadoop2 specific settings
2334    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2335    // we up the VM usable so that processes don't get killed.
2336    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2337
2338    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2339    // this avoids the problem by disabling speculative task execution in tests.
2340    conf.setBoolean("mapreduce.map.speculative", false);
2341    conf.setBoolean("mapreduce.reduce.speculative", false);
2342    ////
2343
2344    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2345    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2346    if (JVM.getJVMSpecVersion() >= 17) {
2347      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2348      conf.set("yarn.app.mapreduce.am.command-opts",
2349        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2350    }
2351
2352    // Allow the user to override FS URI for this map-reduce cluster to use.
2353    mrCluster =
2354      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2355        1, null, null, new JobConf(this.conf));
2356    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2357    if (jobConf == null) {
2358      jobConf = mrCluster.createJobConf();
2359    }
2360
2361    // Hadoop MiniMR overwrites this while it should not
2362    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2363    LOG.info("Mini mapreduce cluster started");
2364
2365    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2366    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2367    // necessary config properties here. YARN-129 required adding a few properties.
2368    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2369    // this for mrv2 support; mr1 ignores this
2370    conf.set("mapreduce.framework.name", "yarn");
2371    conf.setBoolean("yarn.is.minicluster", true);
2372    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2373    if (rmAddress != null) {
2374      conf.set("yarn.resourcemanager.address", rmAddress);
2375    }
2376    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2377    if (historyAddress != null) {
2378      conf.set("mapreduce.jobhistory.address", historyAddress);
2379    }
2380    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2381    if (schedulerAddress != null) {
2382      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2383    }
2384    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2385    if (mrJobHistoryWebappAddress != null) {
2386      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2387    }
2388    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2389    if (yarnRMWebappAddress != null) {
2390      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2391    }
2392  }
2393
2394  /**
2395   * Stops the previously started <code>MiniMRCluster</code>.
2396   */
2397  public void shutdownMiniMapReduceCluster() {
2398    if (mrCluster != null) {
2399      LOG.info("Stopping mini mapreduce cluster...");
2400      mrCluster.shutdown();
2401      mrCluster = null;
2402      LOG.info("Mini mapreduce cluster stopped");
2403    }
2404    // Restore configuration to point to local jobtracker
2405    conf.set("mapreduce.jobtracker.address", "local");
2406  }
2407
2408  /**
2409   * Create a stubbed out RegionServerService, mainly for getting FS.
2410   */
2411  public RegionServerServices createMockRegionServerService() throws IOException {
2412    return createMockRegionServerService((ServerName) null);
2413  }
2414
2415  /**
2416   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2417   * TestTokenAuthentication
2418   */
2419  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2420    throws IOException {
2421    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2422    rss.setFileSystem(getTestFileSystem());
2423    rss.setRpcServer(rpc);
2424    return rss;
2425  }
2426
2427  /**
2428   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2429   * TestOpenRegionHandler
2430   */
2431  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2432    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2433    rss.setFileSystem(getTestFileSystem());
2434    return rss;
2435  }
2436
2437  /**
2438   * Expire the Master's session
2439   */
2440  public void expireMasterSession() throws Exception {
2441    HMaster master = getMiniHBaseCluster().getMaster();
2442    expireSession(master.getZooKeeper(), false);
2443  }
2444
2445  /**
2446   * Expire a region server's session
2447   * @param index which RS
2448   */
2449  public void expireRegionServerSession(int index) throws Exception {
2450    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2451    expireSession(rs.getZooKeeper(), false);
2452    decrementMinRegionServerCount();
2453  }
2454
2455  private void decrementMinRegionServerCount() {
2456    // decrement the count for this.conf, for newly spwaned master
2457    // this.hbaseCluster shares this configuration too
2458    decrementMinRegionServerCount(getConfiguration());
2459
2460    // each master thread keeps a copy of configuration
2461    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2462      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2463    }
2464  }
2465
2466  private void decrementMinRegionServerCount(Configuration conf) {
2467    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2468    if (currentCount != -1) {
2469      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2470    }
2471  }
2472
2473  public void expireSession(ZKWatcher nodeZK) throws Exception {
2474    expireSession(nodeZK, false);
2475  }
2476
2477  /**
2478   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2479   * http://hbase.apache.org/book.html#trouble.zookeeper
2480   * <p/>
2481   * There are issues when doing this:
2482   * <ol>
2483   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2484   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2485   * </ol>
2486   * @param nodeZK      - the ZK watcher to expire
2487   * @param checkStatus - true to check if we can create a Table with the current configuration.
2488   */
2489  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2490    Configuration c = new Configuration(this.conf);
2491    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2492    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2493    byte[] password = zk.getSessionPasswd();
2494    long sessionID = zk.getSessionId();
2495
2496    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2497    // so we create a first watcher to be sure that the
2498    // event was sent. We expect that if our watcher receives the event
2499    // other watchers on the same machine will get is as well.
2500    // When we ask to close the connection, ZK does not close it before
2501    // we receive all the events, so don't have to capture the event, just
2502    // closing the connection should be enough.
2503    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2504      @Override
2505      public void process(WatchedEvent watchedEvent) {
2506        LOG.info("Monitor ZKW received event=" + watchedEvent);
2507      }
2508    }, sessionID, password);
2509
2510    // Making it expire
2511    ZooKeeper newZK =
2512      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2513
2514    // ensure that we have connection to the server before closing down, otherwise
2515    // the close session event will be eaten out before we start CONNECTING state
2516    long start = EnvironmentEdgeManager.currentTime();
2517    while (
2518      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2519    ) {
2520      Thread.sleep(1);
2521    }
2522    newZK.close();
2523    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2524
2525    // Now closing & waiting to be sure that the clients get it.
2526    monitor.close();
2527
2528    if (checkStatus) {
2529      getConnection().getTable(TableName.META_TABLE_NAME).close();
2530    }
2531  }
2532
2533  /**
2534   * Get the Mini HBase cluster.
2535   * @return hbase cluster
2536   * @see #getHBaseClusterInterface()
2537   */
2538  public SingleProcessHBaseCluster getHBaseCluster() {
2539    return getMiniHBaseCluster();
2540  }
2541
2542  /**
2543   * Returns the HBaseCluster instance.
2544   * <p>
2545   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2546   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2547   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2548   * instead w/o the need to type-cast.
2549   */
2550  public HBaseClusterInterface getHBaseClusterInterface() {
2551    // implementation note: we should rename this method as #getHBaseCluster(),
2552    // but this would require refactoring 90+ calls.
2553    return hbaseCluster;
2554  }
2555
2556  /**
2557   * Resets the connections so that the next time getConnection() is called, a new connection is
2558   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2559   * the connection is not valid anymore.
2560   * <p/>
2561   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2562   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2563   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2564   * the connection state automatically. Cleaning this is a much bigger refactor.
2565   */
2566  public void invalidateConnection() throws IOException {
2567    closeConnection();
2568    // Update the master addresses if they changed.
2569    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2570    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2571    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2572      masterConfigBefore, masterConfAfter);
2573    conf.set(HConstants.MASTER_ADDRS_KEY,
2574      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2575  }
2576
2577  /**
2578   * Get a shared Connection to the cluster. this method is thread safe.
2579   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2580   */
2581  public Connection getConnection() throws IOException {
2582    return getAsyncConnection().toConnection();
2583  }
2584
2585  /**
2586   * Get a assigned Connection to the cluster. this method is thread safe.
2587   * @param user assigned user
2588   * @return A Connection with assigned user.
2589   */
2590  public Connection getConnection(User user) throws IOException {
2591    return getAsyncConnection(user).toConnection();
2592  }
2593
2594  /**
2595   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2596   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2597   *         of cluster.
2598   */
2599  public AsyncClusterConnection getAsyncConnection() throws IOException {
2600    try {
2601      return asyncConnection.updateAndGet(connection -> {
2602        if (connection == null) {
2603          try {
2604            User user = UserProvider.instantiate(conf).getCurrent();
2605            connection = getAsyncConnection(user);
2606          } catch (IOException ioe) {
2607            throw new UncheckedIOException("Failed to create connection", ioe);
2608          }
2609        }
2610        return connection;
2611      });
2612    } catch (UncheckedIOException exception) {
2613      throw exception.getCause();
2614    }
2615  }
2616
2617  /**
2618   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2619   * @param user assigned user
2620   * @return An AsyncClusterConnection with assigned user.
2621   */
2622  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2623    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2624  }
2625
2626  public void closeConnection() throws IOException {
2627    if (hbaseAdmin != null) {
2628      Closeables.close(hbaseAdmin, true);
2629      hbaseAdmin = null;
2630    }
2631    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2632    if (asyncConnection != null) {
2633      Closeables.close(asyncConnection, true);
2634    }
2635  }
2636
2637  /**
2638   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2639   * it has no effect, it will be closed automatically when the cluster shutdowns
2640   */
2641  public Admin getAdmin() throws IOException {
2642    if (hbaseAdmin == null) {
2643      this.hbaseAdmin = getConnection().getAdmin();
2644    }
2645    return hbaseAdmin;
2646  }
2647
2648  private Admin hbaseAdmin = null;
2649
2650  /**
2651   * Returns an {@link Hbck} instance. Needs be closed when done.
2652   */
2653  public Hbck getHbck() throws IOException {
2654    return getConnection().getHbck();
2655  }
2656
2657  /**
2658   * Unassign the named region.
2659   * @param regionName The region to unassign.
2660   */
2661  public void unassignRegion(String regionName) throws IOException {
2662    unassignRegion(Bytes.toBytes(regionName));
2663  }
2664
2665  /**
2666   * Unassign the named region.
2667   * @param regionName The region to unassign.
2668   */
2669  public void unassignRegion(byte[] regionName) throws IOException {
2670    getAdmin().unassign(regionName);
2671  }
2672
2673  /**
2674   * Closes the region containing the given row.
2675   * @param row   The row to find the containing region.
2676   * @param table The table to find the region.
2677   */
2678  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2679    unassignRegionByRow(Bytes.toBytes(row), table);
2680  }
2681
2682  /**
2683   * Closes the region containing the given row.
2684   * @param row   The row to find the containing region.
2685   * @param table The table to find the region.
2686   */
2687  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2688    HRegionLocation hrl = table.getRegionLocation(row);
2689    unassignRegion(hrl.getRegion().getRegionName());
2690  }
2691
2692  /**
2693   * Retrieves a splittable region randomly from tableName
2694   * @param tableName   name of table
2695   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2696   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2697   */
2698  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2699    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2700    int regCount = regions.size();
2701    Set<Integer> attempted = new HashSet<>();
2702    int idx;
2703    int attempts = 0;
2704    do {
2705      regions = getHBaseCluster().getRegions(tableName);
2706      if (regCount != regions.size()) {
2707        // if there was region movement, clear attempted Set
2708        attempted.clear();
2709      }
2710      regCount = regions.size();
2711      // There are chances that before we get the region for the table from an RS the region may
2712      // be going for CLOSE. This may be because online schema change is enabled
2713      if (regCount > 0) {
2714        idx = ThreadLocalRandom.current().nextInt(regCount);
2715        // if we have just tried this region, there is no need to try again
2716        if (attempted.contains(idx)) {
2717          continue;
2718        }
2719        HRegion region = regions.get(idx);
2720        if (region.checkSplit().isPresent()) {
2721          return region;
2722        }
2723        attempted.add(idx);
2724      }
2725      attempts++;
2726    } while (maxAttempts == -1 || attempts < maxAttempts);
2727    return null;
2728  }
2729
2730  public MiniDFSCluster getDFSCluster() {
2731    return dfsCluster;
2732  }
2733
2734  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2735    setDFSCluster(cluster, true);
2736  }
2737
2738  /**
2739   * Set the MiniDFSCluster
2740   * @param cluster     cluster to use
2741   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2742   *                    is set.
2743   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2744   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2745   */
2746  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2747    throws IllegalStateException, IOException {
2748    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2749      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2750    }
2751    this.dfsCluster = cluster;
2752    this.setFs();
2753  }
2754
2755  public FileSystem getTestFileSystem() throws IOException {
2756    return HFileSystem.get(conf);
2757  }
2758
2759  /**
2760   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
2761   * (30 seconds).
2762   * @param table Table to wait on.
2763   */
2764  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2765    waitTableAvailable(table.getName(), 30000);
2766  }
2767
2768  public void waitTableAvailable(TableName table, long timeoutMillis)
2769    throws InterruptedException, IOException {
2770    waitFor(timeoutMillis, predicateTableAvailable(table));
2771  }
2772
2773  /**
2774   * Wait until all regions in a table have been assigned
2775   * @param table         Table to wait on.
2776   * @param timeoutMillis Timeout.
2777   */
2778  public void waitTableAvailable(byte[] table, long timeoutMillis)
2779    throws InterruptedException, IOException {
2780    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2781  }
2782
2783  public String explainTableAvailability(TableName tableName) throws IOException {
2784    StringBuilder msg =
2785      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2786    if (getHBaseCluster().getMaster().isAlive()) {
2787      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2788        .getRegionStates().getRegionAssignments();
2789      final List<Pair<RegionInfo, ServerName>> metaLocations =
2790        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2791      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2792        RegionInfo hri = metaLocation.getFirst();
2793        ServerName sn = metaLocation.getSecond();
2794        if (!assignments.containsKey(hri)) {
2795          msg.append(", region ").append(hri)
2796            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2797        } else if (sn == null) {
2798          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2799        } else if (!sn.equals(assignments.get(hri))) {
2800          msg.append(",  region ").append(hri)
2801            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2802            .append(" <> ").append(assignments.get(hri));
2803        }
2804      }
2805    }
2806    return msg.toString();
2807  }
2808
2809  public String explainTableState(final TableName table, TableState.State state)
2810    throws IOException {
2811    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2812    if (tableState == null) {
2813      return "TableState in META: No table state in META for table " + table
2814        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2815    } else if (!tableState.inStates(state)) {
2816      return "TableState in META: Not " + state + " state, but " + tableState;
2817    } else {
2818      return "TableState in META: OK";
2819    }
2820  }
2821
2822  @Nullable
2823  public TableState findLastTableState(final TableName table) throws IOException {
2824    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2825    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2826      @Override
2827      public boolean visit(Result r) throws IOException {
2828        if (!Arrays.equals(r.getRow(), table.getName())) {
2829          return false;
2830        }
2831        TableState state = CatalogFamilyFormat.getTableState(r);
2832        if (state != null) {
2833          lastTableState.set(state);
2834        }
2835        return true;
2836      }
2837    };
2838    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2839      Integer.MAX_VALUE, visitor);
2840    return lastTableState.get();
2841  }
2842
2843  /**
2844   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2845   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
2846   * table.
2847   * @param table the table to wait on.
2848   * @throws InterruptedException if interrupted while waiting
2849   * @throws IOException          if an IO problem is encountered
2850   */
2851  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2852    waitTableEnabled(table, 30000);
2853  }
2854
2855  /**
2856   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2857   * have been all assigned.
2858   * @see #waitTableEnabled(TableName, long)
2859   * @param table         Table to wait on.
2860   * @param timeoutMillis Time to wait on it being marked enabled.
2861   */
2862  public void waitTableEnabled(byte[] table, long timeoutMillis)
2863    throws InterruptedException, IOException {
2864    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2865  }
2866
2867  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2868    waitFor(timeoutMillis, predicateTableEnabled(table));
2869  }
2870
2871  /**
2872   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
2873   * after default period (30 seconds)
2874   * @param table Table to wait on.
2875   */
2876  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2877    waitTableDisabled(table, 30000);
2878  }
2879
2880  public void waitTableDisabled(TableName table, long millisTimeout)
2881    throws InterruptedException, IOException {
2882    waitFor(millisTimeout, predicateTableDisabled(table));
2883  }
2884
2885  /**
2886   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
2887   * @param table         Table to wait on.
2888   * @param timeoutMillis Time to wait on it being marked disabled.
2889   */
2890  public void waitTableDisabled(byte[] table, long timeoutMillis)
2891    throws InterruptedException, IOException {
2892    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2893  }
2894
2895  /**
2896   * Make sure that at least the specified number of region servers are running
2897   * @param num minimum number of region servers that should be running
2898   * @return true if we started some servers
2899   */
2900  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2901    boolean startedServer = false;
2902    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2903    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2904      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2905      startedServer = true;
2906    }
2907
2908    return startedServer;
2909  }
2910
2911  /**
2912   * Make sure that at least the specified number of region servers are running. We don't count the
2913   * ones that are currently stopping or are stopped.
2914   * @param num minimum number of region servers that should be running
2915   * @return true if we started some servers
2916   */
2917  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2918    boolean startedServer = ensureSomeRegionServersAvailable(num);
2919
2920    int nonStoppedServers = 0;
2921    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2922
2923      HRegionServer hrs = rst.getRegionServer();
2924      if (hrs.isStopping() || hrs.isStopped()) {
2925        LOG.info("A region server is stopped or stopping:" + hrs);
2926      } else {
2927        nonStoppedServers++;
2928      }
2929    }
2930    for (int i = nonStoppedServers; i < num; ++i) {
2931      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2932      startedServer = true;
2933    }
2934    return startedServer;
2935  }
2936
2937  /**
2938   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
2939   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2940   * @param c                     Initial configuration
2941   * @param differentiatingSuffix Suffix to differentiate this user from others.
2942   * @return A new configuration instance with a different user set into it.
2943   */
2944  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2945    throws IOException {
2946    FileSystem currentfs = FileSystem.get(c);
2947    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2948      return User.getCurrent();
2949    }
2950    // Else distributed filesystem. Make a new instance per daemon. Below
2951    // code is taken from the AppendTestUtil over in hdfs.
2952    String username = User.getCurrent().getName() + differentiatingSuffix;
2953    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2954    return user;
2955  }
2956
2957  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2958    throws IOException {
2959    NavigableSet<String> online = new TreeSet<>();
2960    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2961      try {
2962        for (RegionInfo region : ProtobufUtil
2963          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2964          online.add(region.getRegionNameAsString());
2965        }
2966      } catch (RegionServerStoppedException e) {
2967        // That's fine.
2968      }
2969    }
2970    return online;
2971  }
2972
2973  /**
2974   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
2975   * linger. Here is the exception you'll see:
2976   *
2977   * <pre>
2978   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2979   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2980   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2981   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2982   * </pre>
2983   *
2984   * @param stream A DFSClient.DFSOutputStream.
2985   */
2986  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2987    try {
2988      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2989      for (Class<?> clazz : clazzes) {
2990        String className = clazz.getSimpleName();
2991        if (className.equals("DFSOutputStream")) {
2992          if (clazz.isInstance(stream)) {
2993            Field maxRecoveryErrorCountField =
2994              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2995            maxRecoveryErrorCountField.setAccessible(true);
2996            maxRecoveryErrorCountField.setInt(stream, max);
2997            break;
2998          }
2999        }
3000      }
3001    } catch (Exception e) {
3002      LOG.info("Could not set max recovery field", e);
3003    }
3004  }
3005
3006  /**
3007   * Uses directly the assignment manager to assign the region. and waits until the specified region
3008   * has completed assignment.
3009   * @return true if the region is assigned false otherwise.
3010   */
3011  public boolean assignRegion(final RegionInfo regionInfo)
3012    throws IOException, InterruptedException {
3013    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3014    am.assign(regionInfo);
3015    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3016  }
3017
3018  /**
3019   * Move region to destination server and wait till region is completely moved and online
3020   * @param destRegion region to move
3021   * @param destServer destination server of the region
3022   */
3023  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3024    throws InterruptedException, IOException {
3025    HMaster master = getMiniHBaseCluster().getMaster();
3026    // TODO: Here we start the move. The move can take a while.
3027    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3028    while (true) {
3029      ServerName serverName =
3030        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3031      if (serverName != null && serverName.equals(destServer)) {
3032        assertRegionOnServer(destRegion, serverName, 2000);
3033        break;
3034      }
3035      Thread.sleep(10);
3036    }
3037  }
3038
3039  /**
3040   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3041   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3042   * master has been informed and updated hbase:meta with the regions deployed server.
3043   * @param tableName the table name
3044   */
3045  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3046    waitUntilAllRegionsAssigned(tableName,
3047      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3048  }
3049
3050  /**
3051   * Waith until all system table's regions get assigned
3052   */
3053  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3054    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3055  }
3056
3057  /**
3058   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3059   * timeout. This means all regions have been deployed, master has been informed and updated
3060   * hbase:meta with the regions deployed server.
3061   * @param tableName the table name
3062   * @param timeout   timeout, in milliseconds
3063   */
3064  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3065    throws IOException {
3066    if (!TableName.isMetaTableName(tableName)) {
3067      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3068        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3069          + timeout + "ms");
3070        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3071          @Override
3072          public String explainFailure() throws IOException {
3073            return explainTableAvailability(tableName);
3074          }
3075
3076          @Override
3077          public boolean evaluate() throws IOException {
3078            Scan scan = new Scan();
3079            scan.addFamily(HConstants.CATALOG_FAMILY);
3080            boolean tableFound = false;
3081            try (ResultScanner s = meta.getScanner(scan)) {
3082              for (Result r; (r = s.next()) != null;) {
3083                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3084                RegionInfo info = RegionInfo.parseFromOrNull(b);
3085                if (info != null && info.getTable().equals(tableName)) {
3086                  // Get server hosting this region from catalog family. Return false if no server
3087                  // hosting this region, or if the server hosting this region was recently killed
3088                  // (for fault tolerance testing).
3089                  tableFound = true;
3090                  byte[] server =
3091                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3092                  if (server == null) {
3093                    return false;
3094                  } else {
3095                    byte[] startCode =
3096                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3097                    ServerName serverName =
3098                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3099                        + Bytes.toLong(startCode));
3100                    if (
3101                      !getHBaseClusterInterface().isDistributedCluster()
3102                        && getHBaseCluster().isKilledRS(serverName)
3103                    ) {
3104                      return false;
3105                    }
3106                  }
3107                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3108                    return false;
3109                  }
3110                }
3111              }
3112            }
3113            if (!tableFound) {
3114              LOG.warn(
3115                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3116            }
3117            return tableFound;
3118          }
3119        });
3120      }
3121    }
3122    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3123    // check from the master state if we are using a mini cluster
3124    if (!getHBaseClusterInterface().isDistributedCluster()) {
3125      // So, all regions are in the meta table but make sure master knows of the assignments before
3126      // returning -- sometimes this can lag.
3127      HMaster master = getHBaseCluster().getMaster();
3128      final RegionStates states = master.getAssignmentManager().getRegionStates();
3129      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3130        @Override
3131        public String explainFailure() throws IOException {
3132          return explainTableAvailability(tableName);
3133        }
3134
3135        @Override
3136        public boolean evaluate() throws IOException {
3137          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3138          return hris != null && !hris.isEmpty();
3139        }
3140      });
3141    }
3142    LOG.info("All regions for table " + tableName + " assigned.");
3143  }
3144
3145  /**
3146   * Do a small get/scan against one store. This is required because store has no actual methods of
3147   * querying itself, and relies on StoreScanner.
3148   */
3149  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3150    Scan scan = new Scan(get);
3151    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3152      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3153      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3154      // readpoint 0.
3155      0);
3156
3157    List<Cell> result = new ArrayList<>();
3158    scanner.next(result);
3159    if (!result.isEmpty()) {
3160      // verify that we are on the row we want:
3161      Cell kv = result.get(0);
3162      if (!CellUtil.matchingRows(kv, get.getRow())) {
3163        result.clear();
3164      }
3165    }
3166    scanner.close();
3167    return result;
3168  }
3169
3170  /**
3171   * Create region split keys between startkey and endKey
3172   * @param numRegions the number of regions to be created. it has to be greater than 3.
3173   * @return resulting split keys
3174   */
3175  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3176    assertTrue(numRegions > 3);
3177    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3178    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3179    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3180    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3181    return result;
3182  }
3183
3184  /**
3185   * Do a small get/scan against one store. This is required because store has no actual methods of
3186   * querying itself, and relies on StoreScanner.
3187   */
3188  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3189    throws IOException {
3190    Get get = new Get(row);
3191    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3192    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3193
3194    return getFromStoreFile(store, get);
3195  }
3196
3197  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3198    final List<? extends Cell> actual) {
3199    final int eLen = expected.size();
3200    final int aLen = actual.size();
3201    final int minLen = Math.min(eLen, aLen);
3202
3203    int i = 0;
3204    while (
3205      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3206    ) {
3207      i++;
3208    }
3209
3210    if (additionalMsg == null) {
3211      additionalMsg = "";
3212    }
3213    if (!additionalMsg.isEmpty()) {
3214      additionalMsg = ". " + additionalMsg;
3215    }
3216
3217    if (eLen != aLen || i != minLen) {
3218      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3219        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3220        + " (length " + aLen + ")" + additionalMsg);
3221    }
3222  }
3223
3224  public static <T> String safeGetAsStr(List<T> lst, int i) {
3225    if (0 <= i && i < lst.size()) {
3226      return lst.get(i).toString();
3227    } else {
3228      return "<out_of_range>";
3229    }
3230  }
3231
3232  public String getClusterKey() {
3233    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3234      + ":"
3235      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3236  }
3237
3238  /**
3239   * Creates a random table with the given parameters
3240   */
3241  public Table createRandomTable(TableName tableName, final Collection<String> families,
3242    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3243    final int numRowsPerFlush) throws IOException, InterruptedException {
3244    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3245      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3246      + maxVersions + "\n");
3247
3248    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3249    final int numCF = families.size();
3250    final byte[][] cfBytes = new byte[numCF][];
3251    {
3252      int cfIndex = 0;
3253      for (String cf : families) {
3254        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3255      }
3256    }
3257
3258    final int actualStartKey = 0;
3259    final int actualEndKey = Integer.MAX_VALUE;
3260    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3261    final int splitStartKey = actualStartKey + keysPerRegion;
3262    final int splitEndKey = actualEndKey - keysPerRegion;
3263    final String keyFormat = "%08x";
3264    final Table table = createTable(tableName, cfBytes, maxVersions,
3265      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3266      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3267
3268    if (hbaseCluster != null) {
3269      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3270    }
3271
3272    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3273
3274    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3275      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3276        final byte[] row = Bytes.toBytes(
3277          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3278
3279        Put put = new Put(row);
3280        Delete del = new Delete(row);
3281        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3282          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3283          final long ts = rand.nextInt();
3284          final byte[] qual = Bytes.toBytes("col" + iCol);
3285          if (rand.nextBoolean()) {
3286            final byte[] value =
3287              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3288                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3289            put.addColumn(cf, qual, ts, value);
3290          } else if (rand.nextDouble() < 0.8) {
3291            del.addColumn(cf, qual, ts);
3292          } else {
3293            del.addColumns(cf, qual, ts);
3294          }
3295        }
3296
3297        if (!put.isEmpty()) {
3298          mutator.mutate(put);
3299        }
3300
3301        if (!del.isEmpty()) {
3302          mutator.mutate(del);
3303        }
3304      }
3305      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3306      mutator.flush();
3307      if (hbaseCluster != null) {
3308        getMiniHBaseCluster().flushcache(table.getName());
3309      }
3310    }
3311    mutator.close();
3312
3313    return table;
3314  }
3315
3316  public static int randomFreePort() {
3317    return HBaseCommonTestingUtil.randomFreePort();
3318  }
3319
3320  public static String randomMultiCastAddress() {
3321    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3322  }
3323
3324  public static void waitForHostPort(String host, int port) throws IOException {
3325    final int maxTimeMs = 10000;
3326    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3327    IOException savedException = null;
3328    LOG.info("Waiting for server at " + host + ":" + port);
3329    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3330      try {
3331        Socket sock = new Socket(InetAddress.getByName(host), port);
3332        sock.close();
3333        savedException = null;
3334        LOG.info("Server at " + host + ":" + port + " is available");
3335        break;
3336      } catch (UnknownHostException e) {
3337        throw new IOException("Failed to look up " + host, e);
3338      } catch (IOException e) {
3339        savedException = e;
3340      }
3341      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3342    }
3343
3344    if (savedException != null) {
3345      throw savedException;
3346    }
3347  }
3348
3349  /**
3350   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3351   * continues.
3352   * @return the number of regions the table was split into
3353   */
3354  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3355    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3356    throws IOException {
3357    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3358      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3359  }
3360
3361  /**
3362   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3363   * continues.
3364   * @return the number of regions the table was split into
3365   */
3366  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3367    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3368    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3369    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3370    builder.setDurability(durability);
3371    builder.setRegionReplication(regionReplication);
3372    ColumnFamilyDescriptorBuilder cfBuilder =
3373      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3374    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3375    cfBuilder.setCompressionType(compression);
3376    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3377      numRegionsPerServer);
3378  }
3379
3380  /**
3381   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3382   * continues.
3383   * @return the number of regions the table was split into
3384   */
3385  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3386    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3387    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3388    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3389    builder.setDurability(durability);
3390    builder.setRegionReplication(regionReplication);
3391    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3392    for (int i = 0; i < columnFamilies.length; i++) {
3393      ColumnFamilyDescriptorBuilder cfBuilder =
3394        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3395      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3396      cfBuilder.setCompressionType(compression);
3397      hcds[i] = cfBuilder.build();
3398    }
3399    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3400  }
3401
3402  /**
3403   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3404   * continues.
3405   * @return the number of regions the table was split into
3406   */
3407  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3408    ColumnFamilyDescriptor hcd) throws IOException {
3409    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3410  }
3411
3412  /**
3413   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3414   * continues.
3415   * @return the number of regions the table was split into
3416   */
3417  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3418    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3419    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3420      numRegionsPerServer);
3421  }
3422
3423  /**
3424   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3425   * continues.
3426   * @return the number of regions the table was split into
3427   */
3428  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3429    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3430    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3431      numRegionsPerServer);
3432  }
3433
3434  /**
3435   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3436   * continues.
3437   * @return the number of regions the table was split into
3438   */
3439  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3440    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3441    throws IOException {
3442    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3443    for (ColumnFamilyDescriptor cd : cds) {
3444      if (!td.hasColumnFamily(cd.getName())) {
3445        builder.setColumnFamily(cd);
3446      }
3447    }
3448    td = builder.build();
3449    int totalNumberOfRegions = 0;
3450    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3451    Admin admin = unmanagedConnection.getAdmin();
3452
3453    try {
3454      // create a table a pre-splits regions.
3455      // The number of splits is set as:
3456      // region servers * regions per region server).
3457      int numberOfServers = admin.getRegionServers().size();
3458      if (numberOfServers == 0) {
3459        throw new IllegalStateException("No live regionservers");
3460      }
3461
3462      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3463      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3464        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3465        + numRegionsPerServer + ")");
3466
3467      byte[][] splits = splitter.split(totalNumberOfRegions);
3468
3469      admin.createTable(td, splits);
3470    } catch (MasterNotRunningException e) {
3471      LOG.error("Master not running", e);
3472      throw new IOException(e);
3473    } catch (TableExistsException e) {
3474      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3475    } finally {
3476      admin.close();
3477      unmanagedConnection.close();
3478    }
3479    return totalNumberOfRegions;
3480  }
3481
3482  public static int getMetaRSPort(Connection connection) throws IOException {
3483    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3484      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3485    }
3486  }
3487
3488  /**
3489   * Due to async racing issue, a region may not be in the online region list of a region server
3490   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3491   */
3492  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3493    final long timeout) throws IOException, InterruptedException {
3494    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3495    while (true) {
3496      List<RegionInfo> regions = getAdmin().getRegions(server);
3497      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3498      long now = EnvironmentEdgeManager.currentTime();
3499      if (now > timeoutTime) break;
3500      Thread.sleep(10);
3501    }
3502    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3503  }
3504
3505  /**
3506   * Check to make sure the region is open on the specified region server, but not on any other one.
3507   */
3508  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3509    final long timeout) throws IOException, InterruptedException {
3510    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3511    while (true) {
3512      List<RegionInfo> regions = getAdmin().getRegions(server);
3513      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3514        List<JVMClusterUtil.RegionServerThread> rsThreads =
3515          getHBaseCluster().getLiveRegionServerThreads();
3516        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3517          HRegionServer rs = rsThread.getRegionServer();
3518          if (server.equals(rs.getServerName())) {
3519            continue;
3520          }
3521          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3522          for (HRegion r : hrs) {
3523            assertTrue("Region should not be double assigned",
3524              r.getRegionInfo().getRegionId() != hri.getRegionId());
3525          }
3526        }
3527        return; // good, we are happy
3528      }
3529      long now = EnvironmentEdgeManager.currentTime();
3530      if (now > timeoutTime) break;
3531      Thread.sleep(10);
3532    }
3533    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3534  }
3535
3536  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3537    TableDescriptor td =
3538      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3539    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3540    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3541  }
3542
3543  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3544    BlockCache blockCache) throws IOException {
3545    TableDescriptor td =
3546      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3547    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3548    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3549  }
3550
3551  public static void setFileSystemURI(String fsURI) {
3552    FS_URI = fsURI;
3553  }
3554
3555  /**
3556   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3557   */
3558  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3559    return new ExplainingPredicate<IOException>() {
3560      @Override
3561      public String explainFailure() throws IOException {
3562        final RegionStates regionStates =
3563          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3564        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3565      }
3566
3567      @Override
3568      public boolean evaluate() throws IOException {
3569        HMaster master = getMiniHBaseCluster().getMaster();
3570        if (master == null) return false;
3571        AssignmentManager am = master.getAssignmentManager();
3572        if (am == null) return false;
3573        return !am.hasRegionsInTransition();
3574      }
3575    };
3576  }
3577
3578  /**
3579   * Returns a {@link Predicate} for checking that table is enabled
3580   */
3581  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3582    return new ExplainingPredicate<IOException>() {
3583      @Override
3584      public String explainFailure() throws IOException {
3585        return explainTableState(tableName, TableState.State.ENABLED);
3586      }
3587
3588      @Override
3589      public boolean evaluate() throws IOException {
3590        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3591      }
3592    };
3593  }
3594
3595  /**
3596   * Returns a {@link Predicate} for checking that table is enabled
3597   */
3598  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3599    return new ExplainingPredicate<IOException>() {
3600      @Override
3601      public String explainFailure() throws IOException {
3602        return explainTableState(tableName, TableState.State.DISABLED);
3603      }
3604
3605      @Override
3606      public boolean evaluate() throws IOException {
3607        return getAdmin().isTableDisabled(tableName);
3608      }
3609    };
3610  }
3611
3612  /**
3613   * Returns a {@link Predicate} for checking that table is enabled
3614   */
3615  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3616    return new ExplainingPredicate<IOException>() {
3617      @Override
3618      public String explainFailure() throws IOException {
3619        return explainTableAvailability(tableName);
3620      }
3621
3622      @Override
3623      public boolean evaluate() throws IOException {
3624        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3625        if (tableAvailable) {
3626          try (Table table = getConnection().getTable(tableName)) {
3627            TableDescriptor htd = table.getDescriptor();
3628            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3629              .getAllRegionLocations()) {
3630              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3631                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3632                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3633              for (byte[] family : htd.getColumnFamilyNames()) {
3634                scan.addFamily(family);
3635              }
3636              try (ResultScanner scanner = table.getScanner(scan)) {
3637                scanner.next();
3638              }
3639            }
3640          }
3641        }
3642        return tableAvailable;
3643      }
3644    };
3645  }
3646
3647  /**
3648   * Wait until no regions in transition.
3649   * @param timeout How long to wait.
3650   */
3651  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3652    waitFor(timeout, predicateNoRegionsInTransition());
3653  }
3654
3655  /**
3656   * Wait until no regions in transition. (time limit 15min)
3657   */
3658  public void waitUntilNoRegionsInTransition() throws IOException {
3659    waitUntilNoRegionsInTransition(15 * 60000);
3660  }
3661
3662  /**
3663   * Wait until labels is ready in VisibilityLabelsCache.
3664   */
3665  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3666    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3667    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3668
3669      @Override
3670      public boolean evaluate() {
3671        for (String label : labels) {
3672          if (labelsCache.getLabelOrdinal(label) == 0) {
3673            return false;
3674          }
3675        }
3676        return true;
3677      }
3678
3679      @Override
3680      public String explainFailure() {
3681        for (String label : labels) {
3682          if (labelsCache.getLabelOrdinal(label) == 0) {
3683            return label + " is not available yet";
3684          }
3685        }
3686        return "";
3687      }
3688    });
3689  }
3690
3691  /**
3692   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3693   * available.
3694   * @return the list of column descriptors
3695   */
3696  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3697    return generateColumnDescriptors("");
3698  }
3699
3700  /**
3701   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3702   * available.
3703   * @param prefix family names prefix
3704   * @return the list of column descriptors
3705   */
3706  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3707    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3708    long familyId = 0;
3709    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3710      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3711        for (BloomType bloomType : BloomType.values()) {
3712          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3713          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3714            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3715          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3716          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3717          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3718          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3719          familyId++;
3720        }
3721      }
3722    }
3723    return columnFamilyDescriptors;
3724  }
3725
3726  /**
3727   * Get supported compression algorithms.
3728   * @return supported compression algorithms.
3729   */
3730  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3731    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3732    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3733    for (String algoName : allAlgos) {
3734      try {
3735        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3736        algo.getCompressor();
3737        supportedAlgos.add(algo);
3738      } catch (Throwable t) {
3739        // this algo is not available
3740      }
3741    }
3742    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3743  }
3744
3745  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3746    Scan scan = new Scan().withStartRow(row);
3747    scan.setReadType(ReadType.PREAD);
3748    scan.setCaching(1);
3749    scan.setReversed(true);
3750    scan.addFamily(family);
3751    try (RegionScanner scanner = r.getScanner(scan)) {
3752      List<Cell> cells = new ArrayList<>(1);
3753      scanner.next(cells);
3754      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3755        return null;
3756      }
3757      return Result.create(cells);
3758    }
3759  }
3760
3761  private boolean isTargetTable(final byte[] inRow, Cell c) {
3762    String inputRowString = Bytes.toString(inRow);
3763    int i = inputRowString.indexOf(HConstants.DELIMITER);
3764    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3765    int o = outputRowString.indexOf(HConstants.DELIMITER);
3766    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3767  }
3768
3769  /**
3770   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3771   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3772   * kerby KDC server and utility for using it,
3773   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3774   * less baggage. It came in in HBASE-5291.
3775   */
3776  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3777    Properties conf = MiniKdc.createConf();
3778    conf.put(MiniKdc.DEBUG, true);
3779    MiniKdc kdc = null;
3780    File dir = null;
3781    // There is time lag between selecting a port and trying to bind with it. It's possible that
3782    // another service captures the port in between which'll result in BindException.
3783    boolean bindException;
3784    int numTries = 0;
3785    do {
3786      try {
3787        bindException = false;
3788        dir = new File(getDataTestDir("kdc").toUri().getPath());
3789        kdc = new MiniKdc(conf, dir);
3790        kdc.start();
3791      } catch (BindException e) {
3792        FileUtils.deleteDirectory(dir); // clean directory
3793        numTries++;
3794        if (numTries == 3) {
3795          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3796          throw e;
3797        }
3798        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3799        bindException = true;
3800      }
3801    } while (bindException);
3802    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3803    return kdc;
3804  }
3805
3806  public int getNumHFiles(final TableName tableName, final byte[] family) {
3807    int numHFiles = 0;
3808    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3809      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3810    }
3811    return numHFiles;
3812  }
3813
3814  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3815    final byte[] family) {
3816    int numHFiles = 0;
3817    for (Region region : rs.getRegions(tableName)) {
3818      numHFiles += region.getStore(family).getStorefilesCount();
3819    }
3820    return numHFiles;
3821  }
3822
3823  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3824    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3825    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3826    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3827    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3828    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3829        it2 = rtdFamilies.iterator(); it.hasNext();) {
3830      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3831    }
3832  }
3833
3834  /**
3835   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3836   * invocations.
3837   */
3838  public static void await(final long sleepMillis, final BooleanSupplier condition)
3839    throws InterruptedException {
3840    try {
3841      while (!condition.getAsBoolean()) {
3842        Thread.sleep(sleepMillis);
3843      }
3844    } catch (RuntimeException e) {
3845      if (e.getCause() instanceof AssertionError) {
3846        throw (AssertionError) e.getCause();
3847      }
3848      throw e;
3849    }
3850  }
3851}