/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around for the duration of your HBase testing.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need while testing. It
 * manages one cluster at a time only. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir"
 * to true.
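 * <p/>
 * A minimal lifecycle sketch (illustrative only; the table and family names are placeholders):
 *
 * <pre>
 * HBaseTestingUtil util = new HBaseTestingUtil();
 * util.startMiniCluster();
 * try (Table table = util.createTable(TableName.valueOf("test"), "cf")) {
 *   // exercise the mini cluster here
 * } finally {
 *   util.shutdownMiniCluster();
 * }
 * </pre>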
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  /**
   * System property key to get test directory value. The name is as it is because mini dfs
   * hard-codes where it puts test data. It should NOT be used directly in HBase, as it is a
   * property used by mini dfs.
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  public static final boolean PRESPLIT_TEST_TABLE = true;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /**
   * This is for unit tests parameterized with combinations of memstore timestamps and tags.
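   * <p/>
   * A sketch of how a JUnit parameterized test might consume it (names are illustrative):
   *
   * <pre>
   * &#64;Parameterized.Parameters
   * public static Collection&lt;Object[]&gt; parameters() {
   *   return HBaseTestingUtil.MEMSTORETS_TAGS_PARAMETRIZED;
   * }
   * </pre>
   */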
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
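   * <p/>
   * For example, a test could probe for a free port like this (illustrative sketch; the check is
   * racy by nature, so it is only suitable for tests):
   *
   * <pre>
   * int port = 50000;
   * while (!HBaseTestingUtil.available(port)) {
   *   port++;
   * }
   * </pre>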
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create combinations of memstoreTS and tags for testing.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtility using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtility using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, default it to true, so tests
    // use random port assignment unless they explicitly opt out
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than using the returned Configuration directly, it is usually best to make a copy and
   * use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. It needs to amend the
   * {@link #TEST_DIRECTORY_KEY} System property, as it is what minidfscluster bases its data dir
   * on. Modifying a System property is not the way to do concurrent instances -- another instance
   * could grab the temporary value unintentionally -- but there is nothing we can do about it at
   * the moment; single instance only is how the minidfscluster works. We also create the
   * underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir,
   * and set the values in the conf, and as a system property for hadoop.tmp.dir (we do not create
   * the directories themselves!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the test
   *         filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
   * setup.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
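   * <p/>
   * For example (illustrative host names; TEST_UTIL is an assumed instance of this class):
   *
   * <pre>
   * TEST_UTIL.startMiniDFSCluster(new String[] { "host1.example.org", "host2.example.org" });
   * </pre>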
   * @param hosts hostnames DNs to run on.
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames DNs to run on.
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
      "ERROR");
    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
      "ERROR");
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
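   * <p/>
   * For example, a test run could opt in before creating tables (illustrative sketch):
   *
   * <pre>
   * System.setProperty("hbase.tests.new.version.behavior", "true");
   * </pre>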
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying the parameter on the command line. If not set, the default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * value can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
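   * <p/>
   * For example (illustrative sketch; TEST_UTIL is an assumed instance of this class):
   *
   * <pre>
   * StartTestingClusterOption option =
   *   StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).build();
   * TEST_UTIL.startMiniCluster(option);
   * </pre>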
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    // Avoid the log being flooded with chore execution time, see HBASE-24646 for more details.
    Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
      .createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    Connection conn = ConnectionFactory.createConnection(this.conf);
    Table t = conn.getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
    conn.close();
  }

  /**
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method. The root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
   * flag is false.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method. The root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operation.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster.
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster.
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster.
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster.
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param family    the family
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param families  the families
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param family    the family
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @param tableName  the table name
   * @param family     the family
   * @param numRegions the number of regions
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param families  the families
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }

  /**
   * Create a table with multiple regions.
   * @param tableName the table name
   * @param families  the families
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param tableName    the table name
   * @param replicaCount the replica count
   * @param families     the families
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table with the given split keys.
   * @param tableName the table name
   * @param families  the families
   * @param splitKeys the split keys
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
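   * <p/>
   * For example (illustrative sketch; the names are placeholders and TEST_UTIL is an assumed
   * instance of this class):
   *
   * <pre>
   * byte[][] families = { Bytes.toBytes("cf") };
   * byte[][] splitKeys = { Bytes.toBytes("b"), Bytes.toBytes("d") };
   * Table table = TEST_UTIL.createTable(TableName.valueOf("test"), families, splitKeys, 2);
   * </pre>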
1272   * @param tableName    the table name
1273   * @param families     the families
1274   * @param splitKeys    the splitkeys
1275   * @param replicaCount the region replica count
1276   * @return A Table instance for the created table.
1277   * @throws IOException throws IOException
1278   */
1279  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1280    int replicaCount) throws IOException {
1281    return createTable(tableName, families, splitKeys, replicaCount,
1282      new Configuration(getConfiguration()));
1283  }
1284
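  /**
   * Create a pre-split table.
   * @param tableName   the table name
   * @param families    the families
   * @param numVersions the number of versions to keep in each column family
   * @param startKey    the first region boundary used to derive the split points
   * @param endKey      the last region boundary used to derive the split points
   * @param numRegions  the total number of regions to create
   * @return A Table instance for the created table.
   */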
1285  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1286    byte[] endKey, int numRegions) throws IOException {
1287    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1288
1289    getAdmin().createTable(desc, startKey, endKey, numRegions);
1290    // HBaseAdmin only waits for regions to appear in hbase:meta we
1291    // should wait until they are assigned
1292    waitUntilAllRegionsAssigned(tableName);
1293    return getConnection().getTable(tableName);
1294  }
1295
  /**
   * Create a table.
   * @param htd      table descriptor
   * @param families array of column families
   * @param c        Configuration to use
   * @return A Table instance for the created table.
   */
1301  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1302    throws IOException {
1303    return createTable(htd, families, null, c);
1304  }
1305
1306  /**
1307   * Create a table.
1308   * @param htd       table descriptor
1309   * @param families  array of column families
1310   * @param splitKeys array of split keys
1311   * @param c         Configuration to use
1312   * @return A Table instance for the created table.
1313   * @throws IOException if getAdmin or createTable fails
1314   */
1315  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1316    Configuration c) throws IOException {
1317    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1318    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1319    // on is interfering.
1320    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1321  }
1322
1323  /**
1324   * Create a table.
1325   * @param htd       table descriptor
1326   * @param families  array of column families
1327   * @param splitKeys array of split keys
1328   * @param type      Bloom type
1329   * @param blockSize block size
1330   * @param c         Configuration to use
1331   * @return A Table instance for the created table.
1332   * @throws IOException if getAdmin or createTable fails
1333   */
1335  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1336    BloomType type, int blockSize, Configuration c) throws IOException {
1337    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1338    for (byte[] family : families) {
1339      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1340        .setBloomFilterType(type).setBlocksize(blockSize);
1341      if (isNewVersionBehaviorEnabled()) {
1342        cfdb.setNewVersionBehavior(true);
1343      }
1344      builder.setColumnFamily(cfdb.build());
1345    }
1346    TableDescriptor td = builder.build();
1347    if (splitKeys != null) {
1348      getAdmin().createTable(td, splitKeys);
1349    } else {
1350      getAdmin().createTable(td);
1351    }
1352    // HBaseAdmin only waits for regions to appear in hbase:meta
1353    // we should wait until they are assigned
1354    waitUntilAllRegionsAssigned(td.getTableName());
1355    return getConnection().getTable(td.getTableName());
1356  }
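
  // Usage sketch (descriptor, family, and block size below are illustrative): tests that assert
  // on exact block cache contents typically come through this overload so that bloom filters
  // stay off and the block size is pinned.
  //
  //   TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("demo")).build();
  //   Table t = util.createTable(htd, new byte[][] { Bytes.toBytes("cf") }, null, BloomType.NONE,
  //     64 * 1024, util.getConfiguration());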
1357
1358  /**
1359   * Create a table.
1360   * @param htd       table descriptor
1361   * @param splitRows array of split keys
   * @return A Table instance for the created table.
1363   */
1364  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1365    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1366    if (isNewVersionBehaviorEnabled()) {
1367      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1368        builder.setColumnFamily(
1369          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1370      }
1371    }
1372    if (splitRows != null) {
1373      getAdmin().createTable(builder.build(), splitRows);
1374    } else {
1375      getAdmin().createTable(builder.build());
1376    }
1377    // HBaseAdmin only waits for regions to appear in hbase:meta
1378    // we should wait until they are assigned
1379    waitUntilAllRegionsAssigned(htd.getTableName());
1380    return getConnection().getTable(htd.getTableName());
1381  }
1382
1383  /**
1384   * Create a table.
1385   * @param tableName    the table name
1386   * @param families     the families
1387   * @param splitKeys    the split keys
1388   * @param replicaCount the replica count
1389   * @param c            Configuration to use
1390   * @return A Table instance for the created table.
1391   */
1392  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1393    int replicaCount, final Configuration c) throws IOException {
1394    TableDescriptor htd =
1395      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1396    return createTable(htd, families, splitKeys, c);
1397  }
1398
1399  /**
1400   * Create a table.
1401   * @return A Table instance for the created table.
1402   */
1403  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1404    return createTable(tableName, new byte[][] { family }, numVersions);
1405  }
1406
1407  /**
1408   * Create a table.
1409   * @return A Table instance for the created table.
1410   */
1411  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1412    throws IOException {
1413    return createTable(tableName, families, numVersions, (byte[][]) null);
1414  }
1415
1416  /**
1417   * Create a table.
1418   * @return A Table instance for the created table.
1419   */
1420  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1421    byte[][] splitKeys) throws IOException {
1422    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1423    for (byte[] family : families) {
1424      ColumnFamilyDescriptorBuilder cfBuilder =
1425        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1426      if (isNewVersionBehaviorEnabled()) {
1427        cfBuilder.setNewVersionBehavior(true);
1428      }
1429      builder.setColumnFamily(cfBuilder.build());
1430    }
1431    if (splitKeys != null) {
1432      getAdmin().createTable(builder.build(), splitKeys);
1433    } else {
1434      getAdmin().createTable(builder.build());
1435    }
1436    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1437    // assigned
1438    waitUntilAllRegionsAssigned(tableName);
1439    return getConnection().getTable(tableName);
1440  }
1441
1442  /**
1443   * Create a table with multiple regions.
1444   * @return A Table instance for the created table.
1445   */
1446  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1447    throws IOException {
1448    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1449  }
1450
1451  /**
1452   * Create a table.
1453   * @return A Table instance for the created table.
1454   */
1455  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1456    throws IOException {
1457    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1458    for (byte[] family : families) {
1459      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1460        .setMaxVersions(numVersions).setBlocksize(blockSize);
1461      if (isNewVersionBehaviorEnabled()) {
1462        cfBuilder.setNewVersionBehavior(true);
1463      }
1464      builder.setColumnFamily(cfBuilder.build());
1465    }
1466    getAdmin().createTable(builder.build());
1467    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1468    // assigned
1469    waitUntilAllRegionsAssigned(tableName);
1470    return getConnection().getTable(tableName);
1471  }
1472
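  /**
   * Create a table, optionally registering a coprocessor on it.
   * @param cpName fully qualified class name of the coprocessor to register, or null for none
   * @return A Table instance for the created table.
   */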
1473  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1474    String cpName) throws IOException {
1475    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1476    for (byte[] family : families) {
1477      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1478        .setMaxVersions(numVersions).setBlocksize(blockSize);
1479      if (isNewVersionBehaviorEnabled()) {
1480        cfBuilder.setNewVersionBehavior(true);
1481      }
1482      builder.setColumnFamily(cfBuilder.build());
1483    }
1484    if (cpName != null) {
1485      builder.setCoprocessor(cpName);
1486    }
1487    getAdmin().createTable(builder.build());
1488    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1489    // assigned
1490    waitUntilAllRegionsAssigned(tableName);
1491    return getConnection().getTable(tableName);
1492  }
1493
1494  /**
1495   * Create a table.
1496   * @return A Table instance for the created table.
1497   */
1498  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1499    throws IOException {
1500    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1501    int i = 0;
1502    for (byte[] family : families) {
1503      ColumnFamilyDescriptorBuilder cfBuilder =
1504        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1505      if (isNewVersionBehaviorEnabled()) {
1506        cfBuilder.setNewVersionBehavior(true);
1507      }
1508      builder.setColumnFamily(cfBuilder.build());
1509      i++;
1510    }
1511    getAdmin().createTable(builder.build());
1512    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1513    // assigned
1514    waitUntilAllRegionsAssigned(tableName);
1515    return getConnection().getTable(tableName);
1516  }
1517
1518  /**
1519   * Create a table.
1520   * @return A Table instance for the created table.
1521   */
1522  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1523    throws IOException {
1524    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1525    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1526    if (isNewVersionBehaviorEnabled()) {
1527      cfBuilder.setNewVersionBehavior(true);
1528    }
1529    builder.setColumnFamily(cfBuilder.build());
1530    getAdmin().createTable(builder.build(), splitRows);
1531    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1532    // assigned
1533    waitUntilAllRegionsAssigned(tableName);
1534    return getConnection().getTable(tableName);
1535  }
1536
1537  /**
1538   * Create a table with multiple regions.
1539   * @return A Table instance for the created table.
1540   */
1541  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1542    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1543  }
1544
1545  /**
1546   * Set the number of Region replicas.
1547   */
1548  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1549    throws IOException, InterruptedException {
1550    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1551      .setRegionReplication(replicaCount).build();
1552    admin.modifyTable(desc);
1553  }
1554
1555  /**
1556   * Set the number of Region replicas.
1557   */
1558  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1559    throws ExecutionException, IOException, InterruptedException {
1560    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1561      .setRegionReplication(replicaCount).build();
1562    admin.modifyTable(desc).get();
1563  }
1564
1565  /**
1566   * Drop an existing table
1567   * @param tableName existing table
1568   */
1569  public void deleteTable(TableName tableName) throws IOException {
1570    try {
1571      getAdmin().disableTable(tableName);
1572    } catch (TableNotEnabledException e) {
1573      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1574    }
1575    getAdmin().deleteTable(tableName);
1576  }
1577
1578  /**
   * Drop a table if it exists; a missing table is silently ignored.
   * @param tableName table to drop
1581   */
1582  public void deleteTableIfAny(TableName tableName) throws IOException {
1583    try {
1584      deleteTable(tableName);
1585    } catch (TableNotFoundException e) {
1586      // ignore
1587    }
1588  }
1589
1590  // ==========================================================================
1591  // Canned table and table descriptor creation
1592
1593  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1594  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1595  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1596  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1597  private static final int MAXVERSIONS = 3;
1598
1599  public static final char FIRST_CHAR = 'a';
1600  public static final char LAST_CHAR = 'z';
1601  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1602  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1603
1604  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1605    return createModifyableTableDescriptor(TableName.valueOf(name),
1606      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1607      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1608  }
1609
1610  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1611    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1612    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1613    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1614      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1615        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1616        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1617      if (isNewVersionBehaviorEnabled()) {
1618        cfBuilder.setNewVersionBehavior(true);
1619      }
1620      builder.setColumnFamily(cfBuilder.build());
1621    }
1622    return builder.build();
1623  }
1624
1625  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1626    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1627    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1628    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1629      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1630        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1631        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1632      if (isNewVersionBehaviorEnabled()) {
1633        cfBuilder.setNewVersionBehavior(true);
1634      }
1635      builder.setColumnFamily(cfBuilder.build());
1636    }
1637    return builder;
1638  }
1639
  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
   */
1645  public TableDescriptor createTableDescriptor(final TableName name) {
1646    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1647      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1648  }
1649
1650  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1651    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1652  }
1653
1654  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1655    int maxVersions) {
1656    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1657    for (byte[] family : families) {
1658      ColumnFamilyDescriptorBuilder cfBuilder =
1659        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1660      if (isNewVersionBehaviorEnabled()) {
1661        cfBuilder.setNewVersionBehavior(true);
1662      }
1663      builder.setColumnFamily(cfBuilder.build());
1664    }
1665    return builder.build();
1666  }
1667
1668  /**
1669   * Create an HRegion that writes to the local tmp dirs
1670   * @param desc     a table descriptor indicating which table the region belongs to
1671   * @param startKey the start boundary of the region
1672   * @param endKey   the end boundary of the region
1673   * @return a region that writes to local dir for testing
1674   */
1675  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1676    throws IOException {
1677    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1678      .setEndKey(endKey).build();
1679    return createLocalHRegion(hri, desc);
1680  }
1681
1682  /**
1683   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1684   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1685   */
1686  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1687    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1688  }
1689
1690  /**
1691   * Create an HRegion that writes to the local tmp dirs with specified wal
1692   * @param info regioninfo
1693   * @param conf configuration
1694   * @param desc table descriptor
1695   * @param wal  wal for this region.
   * @return created hregion
1697   */
1698  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1699    WAL wal) throws IOException {
1700    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1701  }
1702
  /**
   * Create an HRegion that writes to the local tmp dirs.
   * @return A region on which you must call
   *         {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when done.
   */
1707  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1708    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1709    throws IOException {
1710    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1711      durability, wal, null, families);
1712  }
1713
1714  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1715    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1716    boolean[] compactedMemStore, byte[]... families) throws IOException {
1717    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1718    builder.setReadOnly(isReadOnly);
1719    int i = 0;
1720    for (byte[] family : families) {
1721      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1722      if (compactedMemStore != null && i < compactedMemStore.length) {
1723        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
      i++;
      // Keep all versions so tests can read back everything they wrote.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1731      builder.setColumnFamily(cfBuilder.build());
1732    }
1733    builder.setDurability(durability);
1734    RegionInfo info =
1735      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1736    return createLocalHRegion(info, conf, builder.build(), wal);
1737  }
1738
  // ==========================================================================
1741
1742  /**
1743   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1744   * read.
1745   * @param tableName existing table
   * @return a Table instance for the given table
1747   */
  public Table deleteTableData(TableName tableName) throws IOException {
    Table table = getConnection().getTable(tableName);
    Scan scan = new Scan();
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        Delete del = new Delete(res.getRow());
        table.delete(del);
      }
    }
    return table;
  }
1760
1761  /**
1762   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1763   * table.
1764   * @param tableName       table which must exist.
1765   * @param preserveRegions keep the existing split points
1766   * @return HTable for the new table
1767   */
1768  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1769    throws IOException {
1770    Admin admin = getAdmin();
1771    if (!admin.isTableDisabled(tableName)) {
1772      admin.disableTable(tableName);
1773    }
1774    admin.truncateTable(tableName, preserveRegions);
1775    return getConnection().getTable(tableName);
1776  }
1777
1778  /**
1779   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1780   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1781   * preserve regions of existing table.
1782   * @param tableName table which must exist.
1783   * @return HTable for the new table
1784   */
1785  public Table truncateTable(final TableName tableName) throws IOException {
1786    return truncateTable(tableName, false);
1787  }
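
  // Sketch of the cleanup options ("demo" is an illustrative table name): truncateTable goes
  // through the admin API (disable + truncate), while deleteTableData keeps the table and issues
  // row-by-row deletes.
  //
  //   util.truncateTable(TableName.valueOf("demo"));       // fast, drops existing split points
  //   util.truncateTable(TableName.valueOf("demo"), true); // fast, keeps existing split points
  //   util.deleteTableData(TableName.valueOf("demo"));     // slow, scans and deletes each row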
1788
1789  /**
1790   * Load table with rows from 'aaa' to 'zzz'.
1791   * @param t Table
1792   * @param f Family
   * @return Count of rows loaded.
1794   */
1795  public int loadTable(final Table t, final byte[] f) throws IOException {
1796    return loadTable(t, new byte[][] { f });
1797  }
1798
1799  /**
1800   * Load table with rows from 'aaa' to 'zzz'.
   * @param t          Table
   * @param f          Family
   * @param writeToWAL whether edits should be written to the WAL
   * @return Count of rows loaded.
1804   */
1805  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1806    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1807  }
1808
1809  /**
1810   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1811   * @param t Table
1812   * @param f Array of Families to load
   * @return Count of rows loaded.
1814   */
1815  public int loadTable(final Table t, final byte[][] f) throws IOException {
1816    return loadTable(t, f, null);
1817  }
1818
1819  /**
1820   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1821   * @param t     Table
1822   * @param f     Array of Families to load
1823   * @param value the values of the cells. If null is passed, the row key is used as value
   * @return Count of rows loaded.
1825   */
1826  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1827    return loadTable(t, f, value, true);
1828  }
1829
1830  /**
1831   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1832   * @param t     Table
1833   * @param f     Array of Families to load
   * @param value      the values of the cells. If null is passed, the row key is used as value
   * @param writeToWAL whether edits should be written to the WAL
   * @return Count of rows loaded.
1836   */
1837  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1838    throws IOException {
1839    List<Put> puts = new ArrayList<>();
1840    for (byte[] row : HBaseTestingUtil.ROWS) {
1841      Put put = new Put(row);
1842      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1843      for (int i = 0; i < f.length; i++) {
1844        byte[] value1 = value != null ? value : row;
1845        put.addColumn(f[i], f[i], value1);
1846      }
1847      puts.add(put);
1848    }
1849    t.put(puts);
1850    return puts.size();
1851  }
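
  // Minimal load-and-verify sketch (assumes an open Table "t" and the family name "cf"):
  // loadTable writes one row per entry in ROWS, so the returned count should equal a
  // full-table row count.
  //
  //   int loaded = util.loadTable(t, Bytes.toBytes("cf"));
  //   assertEquals(loaded, countRows(t));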
1852
1853  /**
1854   * A tracker for tracking and validating table rows generated with
1855   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1856   */
1857  public static class SeenRowTracker {
1858    int dim = 'z' - 'a' + 1;
1859    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1860    byte[] startRow;
1861    byte[] stopRow;
1862
1863    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1864      this.startRow = startRow;
1865      this.stopRow = stopRow;
1866    }
1867
1868    void reset() {
1869      for (byte[] row : ROWS) {
1870        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1871      }
1872    }
1873
1874    int i(byte b) {
1875      return b - 'a';
1876    }
1877
1878    public void addRow(byte[] row) {
1879      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1880    }
1881
1882    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and that
     * all other rows are not seen at all
1885     */
1886    public void validate() {
1887      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1888        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1889          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1890            int count = seenRows[i(b1)][i(b2)][i(b3)];
1891            int expectedCount = 0;
1892            if (
1893              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1894                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1895            ) {
1896              expectedCount = 1;
1897            }
1898            if (count != expectedCount) {
1899              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1900              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1901                + "instead of " + expectedCount);
1902            }
1903          }
1904        }
1905      }
1906    }
1907  }
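
  // Usage sketch for SeenRowTracker (assumes an open Table "t" loaded via loadTable): scan the
  // range you care about, feed every returned row key to the tracker, then validate that exactly
  // the keys in [startRow, stopRow) were seen once.
  //
  //   SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("aaa"), Bytes.toBytes("ccc"));
  //   Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("ccc"));
  //   try (ResultScanner scanner = t.getScanner(scan)) {
  //     for (Result r : scanner) {
  //       tracker.addRow(r.getRow());
  //     }
  //   }
  //   tracker.validate(); // throws RuntimeException on any mismatch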
1908
1909  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1910    return loadRegion(r, f, false);
1911  }
1912
1913  public int loadRegion(final Region r, final byte[] f) throws IOException {
1914    return loadRegion((HRegion) r, f);
1915  }
1916
1917  /**
1918   * Load region with rows from 'aaa' to 'zzz'.
1919   * @param r     Region
1920   * @param f     Family
1921   * @param flush flush the cache if true
   * @return Count of rows loaded.
1923   */
1924  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1925    byte[] k = new byte[3];
1926    int rowCount = 0;
1927    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1928      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1929        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1930          k[0] = b1;
1931          k[1] = b2;
1932          k[2] = b3;
          Put put = new Put(k);
          // Skip the WAL unconditionally; this is bulk test data.
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
1939          int preRowCount = rowCount;
1940          int pause = 10;
1941          int maxPause = 1000;
1942          while (rowCount == preRowCount) {
1943            try {
1944              r.put(put);
1945              rowCount++;
1946            } catch (RegionTooBusyException e) {
1947              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1948              Threads.sleep(pause);
1949            }
1950          }
1951        }
1952      }
1953      if (flush) {
1954        r.flush(true);
1955      }
1956    }
1957    return rowCount;
1958  }
1959
1960  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1961    throws IOException {
1962    for (int i = startRow; i < endRow; i++) {
1963      byte[] data = Bytes.toBytes(String.valueOf(i));
1964      Put put = new Put(data);
1965      put.addColumn(f, null, data);
1966      t.put(put);
1967    }
1968  }
1969
1970  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1971    throws IOException {
1972    for (int i = 0; i < totalRows; i++) {
1973      byte[] row = new byte[rowSize];
1974      Bytes.random(row);
1975      Put put = new Put(row);
1976      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1977      t.put(put);
1978    }
1979  }
1980
1981  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
1982    int replicaId) throws IOException {
1983    for (int i = startRow; i < endRow; i++) {
1984      String failMsg = "Failed verification of row :" + i;
1985      byte[] data = Bytes.toBytes(String.valueOf(i));
1986      Get get = new Get(data);
1987      get.setReplicaId(replicaId);
1988      get.setConsistency(Consistency.TIMELINE);
1989      Result result = table.get(get);
1990      assertTrue(failMsg, result.containsColumn(f, null));
1991      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1992      Cell cell = result.getColumnLatestCell(f, null);
1993      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1994        cell.getValueOffset(), cell.getValueLength()));
1995    }
1996  }
1997
1998  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
1999    throws IOException {
2000    verifyNumericRows((HRegion) region, f, startRow, endRow);
2001  }
2002
2003  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2004    throws IOException {
2005    verifyNumericRows(region, f, startRow, endRow, true);
2006  }
2007
2008  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2009    final boolean present) throws IOException {
2010    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2011  }
2012
2013  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2014    final boolean present) throws IOException {
2015    for (int i = startRow; i < endRow; i++) {
2016      String failMsg = "Failed verification of row :" + i;
2017      byte[] data = Bytes.toBytes(String.valueOf(i));
2018      Result result = region.get(new Get(data));
2019
2020      boolean hasResult = result != null && !result.isEmpty();
2021      assertEquals(failMsg + result, present, hasResult);
2022      if (!present) continue;
2023
2024      assertTrue(failMsg, result.containsColumn(f, null));
2025      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2026      Cell cell = result.getColumnLatestCell(f, null);
2027      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2028        cell.getValueOffset(), cell.getValueLength()));
2029    }
2030  }
2031
2032  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2033    throws IOException {
2034    for (int i = startRow; i < endRow; i++) {
2035      byte[] data = Bytes.toBytes(String.valueOf(i));
2036      Delete delete = new Delete(data);
2037      delete.addFamily(f);
2038      t.delete(delete);
2039    }
2040  }
2041
2042  /**
2043   * Return the number of rows in the given table.
2044   * @param table to count rows
2045   * @return count of rows
2046   */
2047  public static int countRows(final Table table) throws IOException {
2048    return countRows(table, new Scan());
2049  }
2050
2051  public static int countRows(final Table table, final Scan scan) throws IOException {
2052    try (ResultScanner results = table.getScanner(scan)) {
2053      int count = 0;
2054      while (results.next() != null) {
2055        count++;
2056      }
2057      return count;
2058    }
2059  }
2060
2061  public static int countRows(final Table table, final byte[]... families) throws IOException {
2062    Scan scan = new Scan();
2063    for (byte[] family : families) {
2064      scan.addFamily(family);
2065    }
2066    return countRows(table, scan);
2067  }
2068
2069  /**
2070   * Return the number of rows in the given table.
2071   */
2072  public int countRows(final TableName tableName) throws IOException {
2073    try (Table table = getConnection().getTable(tableName)) {
2074      return countRows(table);
2075    }
2076  }
2077
2078  public static int countRows(final Region region) throws IOException {
2079    return countRows(region, new Scan());
2080  }
2081
2082  public static int countRows(final Region region, final Scan scan) throws IOException {
2083    try (InternalScanner scanner = region.getScanner(scan)) {
2084      return countRows(scanner);
2085    }
2086  }
2087
2088  public static int countRows(final InternalScanner scanner) throws IOException {
2089    int scannedCount = 0;
2090    List<Cell> results = new ArrayList<>();
2091    boolean hasMore = true;
2092    while (hasMore) {
2093      hasMore = scanner.next(results);
2094      scannedCount += results.size();
2095      results.clear();
2096    }
2097    return scannedCount;
2098  }
2099
  /**
   * Return an MD5 digest computed over all the row keys of a table.
   */
2103  public String checksumRows(final Table table) throws Exception {
2104    MessageDigest digest = MessageDigest.getInstance("MD5");
2105    try (ResultScanner results = table.getScanner(new Scan())) {
2106      for (Result res : results) {
2107        digest.update(res.getRow());
2108      }
2109    }
    // MessageDigest#toString does not expose the digest; return the hex form of the bytes.
    return Bytes.toHex(digest.digest());
2111  }
2112
2113  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2114  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2115  static {
2116    int i = 0;
2117    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2118      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2119        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2120          ROWS[i][0] = b1;
2121          ROWS[i][1] = b2;
2122          ROWS[i][2] = b3;
2123          i++;
2124        }
2125      }
2126    }
2127  }
2128
2129  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2130    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2131    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2132    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2133    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2134    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2135    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2136
2137  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2138    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2139    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2140    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2141    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2142    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2143    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2144
2145  /**
2146   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2147   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2148   * @return list of region info for regions added to meta
2149   */
2150  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2151    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2152    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2153      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2154      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2155      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2156        TableState.State.ENABLED);
2157      // add custom ones
2158      for (int i = 0; i < startKeys.length; i++) {
2159        int j = (i + 1) % startKeys.length;
2160        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2161          .setEndKey(startKeys[j]).build();
2162        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2163        newRegions.add(hri);
2164      }
2165      return newRegions;
2166    }
2167  }
2168
2169  /**
2170   * Create an unmanaged WAL. Be sure to close it when you're through.
2171   */
2172  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2173    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless we pass it along via the conf.
2176    Configuration confForWAL = new Configuration(conf);
2177    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2178    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2179  }
2180
2181  /**
   * Create a region with its own WAL. Be sure to call
2183   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2184   */
2185  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2186    final Configuration conf, final TableDescriptor htd) throws IOException {
2187    return createRegionAndWAL(info, rootDir, conf, htd, true);
2188  }
2189
2190  /**
   * Create a region with its own WAL. Be sure to call
2192   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2193   */
2194  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2195    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2196    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2197    region.setBlockCache(blockCache);
2198    region.initialize();
2199    return region;
2200  }
2201
2202  /**
   * Create a region with its own WAL. Be sure to call
2204   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2205   */
2206  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2207    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2208    throws IOException {
2209    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2210    region.setMobFileCache(mobFileCache);
2211    region.initialize();
2212    return region;
2213  }
2214
2215  /**
   * Create a region with its own WAL. Be sure to call
2217   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2218   */
2219  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2220    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2221    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2222      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2223    WAL wal = createWal(conf, rootDir, info);
2224    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2225  }
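
  // Usage sketch (the descriptor "htd" and the data test dir are illustrative): stand up a local
  // region with its own WAL, use it, and release both via closeRegionAndWAL.
  //
  //   HRegion region = createRegionAndWAL(
  //     RegionInfoBuilder.newBuilder(htd.getTableName()).build(), util.getDataTestDir(),
  //     util.getConfiguration(), htd);
  //   try {
  //     // read from / write to the region here
  //   } finally {
  //     closeRegionAndWAL(region);
  //   }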
2226
2227  /**
2228   * Find any other region server which is different from the one identified by parameter
2229   * @return another region server
2230   */
2231  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2232    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2234        return rst.getRegionServer();
2235      }
2236    }
2237    return null;
2238  }
2239
2240  /**
2241   * Tool to get the reference to the region server object that holds the region of the specified
2242   * user table.
2243   * @param tableName user table to lookup in hbase:meta
2244   * @return region server that holds it, null if the row doesn't exist
2245   */
2246  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2247    throws IOException, InterruptedException {
2248    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2249    if (regions == null || regions.isEmpty()) {
2250      return null;
2251    }
2252    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2253
2254    byte[] firstRegionName =
2255      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2256        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2257
2258    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2259    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2260      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2261    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2262      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // HBASE_CLIENT_PAUSE is a millisecond value.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2264    while (retrier.shouldRetry()) {
2265      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2266      if (index != -1) {
2267        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2268      }
2269      // Came back -1. Region may not be online yet. Sleep a while.
2270      retrier.sleepUntilNextRetry();
2271    }
2272    return null;
2273  }
2274
2275  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2277   * @throws IOException When starting the cluster fails.
2278   */
2279  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2280    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2281    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2282      "99.0");
2283    startMiniMapReduceCluster(2);
2284    return mrCluster;
2285  }
2286
2287  /**
2288   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2289   * internal static LOG_DIR variable.
2290   */
2291  private void forceChangeTaskLogDir() {
2292    Field logDirField;
2293    try {
2294      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2295      logDirField.setAccessible(true);
2296
2297      Field modifiersField = ReflectionUtils.getModifiersField();
2298      modifiersField.setAccessible(true);
2299      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2300
2301      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
    } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
      | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
2311  }
2312
2313  /**
2314   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2315   * filesystem.
   * @param servers The number of <code>TaskTracker</code>s to start.
2317   * @throws IOException When starting the cluster fails.
2318   */
2319  private void startMiniMapReduceCluster(final int servers) throws IOException {
2320    if (mrCluster != null) {
2321      throw new IllegalStateException("MiniMRCluster is already running");
2322    }
2323    LOG.info("Starting mini mapreduce cluster...");
2324    setupClusterTestDir();
2325    createDirsAndSetProperties();
2326
2327    forceChangeTaskLogDir();
2328
2329    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We raise the allowed virtual memory ratio so that processes don't get killed.
2332    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2333
2334    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2335    // this avoids the problem by disabling speculative task execution in tests.
2336    conf.setBoolean("mapreduce.map.speculative", false);
2337    conf.setBoolean("mapreduce.reduce.speculative", false);
2338    ////
2339
2340    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2341    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2342    if (JVM.getJVMSpecVersion() >= 17) {
2343      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2344      conf.set("yarn.app.mapreduce.am.command-opts",
2345        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2346    }
2347
2348    // Allow the user to override FS URI for this map-reduce cluster to use.
2349    mrCluster =
2350      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2351        1, null, null, new JobConf(this.conf));
2352    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2353    if (jobConf == null) {
2354      jobConf = mrCluster.createJobConf();
2355    }
2356
2357    // Hadoop MiniMR overwrites this while it should not
2358    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2359    LOG.info("Mini mapreduce cluster started");
2360
2361    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2362    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2363    // necessary config properties here. YARN-129 required adding a few properties.
2364    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2365    // this for mrv2 support; mr1 ignores this
2366    conf.set("mapreduce.framework.name", "yarn");
2367    conf.setBoolean("yarn.is.minicluster", true);
2368    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2369    if (rmAddress != null) {
2370      conf.set("yarn.resourcemanager.address", rmAddress);
2371    }
2372    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2373    if (historyAddress != null) {
2374      conf.set("mapreduce.jobhistory.address", historyAddress);
2375    }
2376    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2377    if (schedulerAddress != null) {
2378      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2379    }
2380    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2381    if (mrJobHistoryWebappAddress != null) {
2382      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2383    }
2384    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2385    if (yarnRMWebappAddress != null) {
2386      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2387    }
2388  }
2389
2390  /**
2391   * Stops the previously started <code>MiniMRCluster</code>.
2392   */
2393  public void shutdownMiniMapReduceCluster() {
2394    if (mrCluster != null) {
2395      LOG.info("Stopping mini mapreduce cluster...");
2396      mrCluster.shutdown();
2397      mrCluster = null;
2398      LOG.info("Mini mapreduce cluster stopped");
2399    }
2400    // Restore configuration to point to local jobtracker
2401    conf.set("mapreduce.jobtracker.address", "local");
2402  }
2403
2404  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS.
2406   */
2407  public RegionServerServices createMockRegionServerService() throws IOException {
2408    return createMockRegionServerService((ServerName) null);
2409  }
2410
2411  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
2413   * TestTokenAuthentication
2414   */
2415  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2416    throws IOException {
2417    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2418    rss.setFileSystem(getTestFileSystem());
2419    rss.setRpcServer(rpc);
2420    return rss;
2421  }
2422
2423  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
2425   * TestOpenRegionHandler
2426   */
2427  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2428    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2429    rss.setFileSystem(getTestFileSystem());
2430    return rss;
2431  }
2432
2433  /**
2434   * Expire the Master's session
2435   */
2436  public void expireMasterSession() throws Exception {
2437    HMaster master = getMiniHBaseCluster().getMaster();
2438    expireSession(master.getZooKeeper(), false);
2439  }
2440
2441  /**
2442   * Expire a region server's session
2443   * @param index which RS
2444   */
2445  public void expireRegionServerSession(int index) throws Exception {
2446    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2447    expireSession(rs.getZooKeeper(), false);
2448    decrementMinRegionServerCount();
2449  }
2450
2451  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for newly spawned master
2453    // this.hbaseCluster shares this configuration too
2454    decrementMinRegionServerCount(getConfiguration());
2455
2456    // each master thread keeps a copy of configuration
2457    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2458      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2459    }
2460  }
2461
2462  private void decrementMinRegionServerCount(Configuration conf) {
2463    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2464    if (currentCount != -1) {
2465      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2466    }
2467  }
2468
2469  public void expireSession(ZKWatcher nodeZK) throws Exception {
2470    expireSession(nodeZK, false);
2471  }
2472
2473  /**
2474   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2475   * http://hbase.apache.org/book.html#trouble.zookeeper
2476   * <p/>
2477   * There are issues when doing this:
2478   * <ol>
2479   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2480   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2481   * </ol>
2482   * @param nodeZK      - the ZK watcher to expire
2483   * @param checkStatus - true to check if we can create a Table with the current configuration.
2484   */
2485  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2486    Configuration c = new Configuration(this.conf);
2487    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2488    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2489    byte[] password = zk.getSessionPasswd();
2490    long sessionID = zk.getSessionId();
2491
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event,
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event, just
    // closing the connection should be enough.
2499    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2500      @Override
2501      public void process(WatchedEvent watchedEvent) {
2502        LOG.info("Monitor ZKW received event=" + watchedEvent);
2503      }
2504    }, sessionID, password);
2505
2506    // Making it expire
2507    ZooKeeper newZK =
2508      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2509
    // ensure that we have connection to the server before closing down, otherwise
    // the close session event will be consumed before we even reach the CONNECTING state
2512    long start = EnvironmentEdgeManager.currentTime();
2513    while (
2514      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2515    ) {
2516      Thread.sleep(1);
2517    }
2518    newZK.close();
2519    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2520
2521    // Now closing & waiting to be sure that the clients get it.
2522    monitor.close();
2523
2524    if (checkStatus) {
2525      getConnection().getTable(TableName.META_TABLE_NAME).close();
2526    }
2527  }
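
  // Typical failover-test use (a sketch; the timeout is an arbitrary example value and waitFor
  // is the Waiter-based helper this utility inherits): expire the active master's session, then
  // wait for a master to come back.
  //
  //   util.expireMasterSession();
  //   util.waitFor(60000, () -> util.getMiniHBaseCluster().getMaster() != null);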
2528
2529  /**
2530   * Get the Mini HBase cluster.
2531   * @return hbase cluster
2532   * @see #getHBaseClusterInterface()
2533   */
2534  public SingleProcessHBaseCluster getHBaseCluster() {
2535    return getMiniHBaseCluster();
2536  }
2537
2538  /**
2539   * Returns the HBaseCluster instance.
2540   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring to this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be used
   * instead w/o the need to type-cast.
2545   */
2546  public HBaseClusterInterface getHBaseClusterInterface() {
2547    // implementation note: we should rename this method as #getHBaseCluster(),
2548    // but this would require refactoring 90+ calls.
2549    return hbaseCluster;
2550  }
2551
2552  /**
2553   * Resets the connections so that the next time getConnection() is called, a new connection is
2554   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2555   * the connection is not valid anymore.
2556   * <p/>
2557   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2558   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2559   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2560   * the connection state automatically. Cleaning this is a much bigger refactor.
2561   */
2562  public void invalidateConnection() throws IOException {
2563    closeConnection();
2564    // Update the master addresses if they changed.
2565    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2566    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2567    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2568      masterConfigBefore, masterConfAfter);
2569    conf.set(HConstants.MASTER_ADDRS_KEY,
2570      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2571  }
2572
2573  /**
   * Get a shared Connection to the cluster. This method is thread safe.
2575   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2576   */
2577  public Connection getConnection() throws IOException {
2578    return getAsyncConnection().toConnection();
2579  }
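
  // Usage note (sketch; "demo" is an illustrative table name): the shared connection is owned by
  // this utility, so close tables you open from it, but never the connection itself.
  //
  //   try (Table t = util.getConnection().getTable(TableName.valueOf("demo"))) {
  //     // use the table
  //   }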
2580
2581  /**
   * Get an assigned Connection to the cluster. This method is thread safe.
2583   * @param user assigned user
2584   * @return A Connection with assigned user.
2585   */
2586  public Connection getConnection(User user) throws IOException {
2587    return getAsyncConnection(user).toConnection();
2588  }
2589
2590  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
2592   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2593   *         of cluster.
2594   */
2595  public AsyncClusterConnection getAsyncConnection() throws IOException {
2596    try {
2597      return asyncConnection.updateAndGet(connection -> {
2598        if (connection == null) {
2599          try {
2600            User user = UserProvider.instantiate(conf).getCurrent();
2601            connection = getAsyncConnection(user);
2602          } catch (IOException ioe) {
2603            throw new UncheckedIOException("Failed to create connection", ioe);
2604          }
2605        }
2606        return connection;
2607      });
2608    } catch (UncheckedIOException exception) {
2609      throw exception.getCause();
2610    }
2611  }
2612
2613  /**
   * Get an AsyncClusterConnection to the cluster for the given user. This method is thread-safe.
   * @param user the user to create the connection for
   * @return An AsyncClusterConnection with the given user.
2617   */
2618  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2619    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2620  }
2621
2622  public void closeConnection() throws IOException {
2623    if (hbaseAdmin != null) {
2624      Closeables.close(hbaseAdmin, true);
2625      hbaseAdmin = null;
2626    }
2627    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2628    if (asyncConnection != null) {
2629      Closeables.close(asyncConnection, true);
2630    }
2631  }
2632
2633  /**
   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
   * it has no effect; it will be closed automatically when the cluster shuts down.
2636   */
2637  public Admin getAdmin() throws IOException {
2638    if (hbaseAdmin == null) {
2639      this.hbaseAdmin = getConnection().getAdmin();
2640    }
2641    return hbaseAdmin;
2642  }
2643
2644  private Admin hbaseAdmin = null;
2645
2646  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
2648   */
2649  public Hbck getHbck() throws IOException {
2650    return getConnection().getHbck();
2651  }
2652
2653  /**
2654   * Unassign the named region.
2655   * @param regionName The region to unassign.
2656   */
2657  public void unassignRegion(String regionName) throws IOException {
2658    unassignRegion(Bytes.toBytes(regionName));
2659  }
2660
2661  /**
2662   * Unassign the named region.
2663   * @param regionName The region to unassign.
2664   */
2665  public void unassignRegion(byte[] regionName) throws IOException {
2666    getAdmin().unassign(regionName);
2667  }
2668
2669  /**
2670   * Closes the region containing the given row.
2671   * @param row   The row to find the containing region.
2672   * @param table The table to find the region.
2673   */
2674  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2675    unassignRegionByRow(Bytes.toBytes(row), table);
2676  }
2677
2678  /**
2679   * Closes the region containing the given row.
2680   * @param row   The row to find the containing region.
   * @param table The table to find the region.
2682   */
2683  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2684    HRegionLocation hrl = table.getRegionLocation(row);
2685    unassignRegion(hrl.getRegion().getRegionName());
2686  }
2687
2688  /**
   * Retrieves a splittable region randomly from the given table.
   * @param tableName   name of table
   * @param maxAttempts maximum number of attempts, unlimited if the value is -1
   * @return the HRegion chosen, or null if none was found within the limit of maxAttempts
2693   */
2694  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2695    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2696    int regCount = regions.size();
2697    Set<Integer> attempted = new HashSet<>();
2698    int idx;
2699    int attempts = 0;
2700    do {
2701      regions = getHBaseCluster().getRegions(tableName);
2702      if (regCount != regions.size()) {
2703        // if there was region movement, clear attempted Set
2704        attempted.clear();
2705      }
2706      regCount = regions.size();
      // There is a chance that, before we fetch the regions for the table from a region server,
      // a region is already going for CLOSE, e.g. because an online schema change is in progress.
2709      if (regCount > 0) {
2710        idx = ThreadLocalRandom.current().nextInt(regCount);
        // if we have already tried this region, pick again, but still count the attempt so a
        // bounded maxAttempts cannot loop forever once every region has been tried
        if (attempted.contains(idx)) {
          attempts++;
          continue;
        }
2715        HRegion region = regions.get(idx);
2716        if (region.checkSplit().isPresent()) {
2717          return region;
2718        }
2719        attempted.add(idx);
2720      }
2721      attempts++;
2722    } while (maxAttempts == -1 || attempts < maxAttempts);
2723    return null;
2724  }
2725
2726  public MiniDFSCluster getDFSCluster() {
2727    return dfsCluster;
2728  }
2729
2730  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2731    setDFSCluster(cluster, true);
2732  }
2733
2734  /**
2735   * Set the MiniDFSCluster
2736   * @param cluster     cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2738   *                    is set.
2739   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2740   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2741   */
2742  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2743    throws IllegalStateException, IOException {
2744    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2745      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2746    }
2747    this.dfsCluster = cluster;
2748    this.setFs();
2749  }
2750
2751  public FileSystem getTestFileSystem() throws IOException {
2752    return HFileSystem.get(conf);
2753  }
2754
2755  /**
   * Wait until all regions in a table have been assigned. Waits up to the default timeout (30
   * seconds) before giving up.
2758   * @param table Table to wait on.
2759   */
2760  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2761    waitTableAvailable(table.getName(), 30000);
2762  }
2763
2764  public void waitTableAvailable(TableName table, long timeoutMillis)
2765    throws InterruptedException, IOException {
2766    waitFor(timeoutMillis, predicateTableAvailable(table));
2767  }
2768
2769  /**
2770   * Wait until all regions in a table have been assigned
2771   * @param table         Table to wait on.
2772   * @param timeoutMillis Timeout.
2773   */
2774  public void waitTableAvailable(byte[] table, long timeoutMillis)
2775    throws InterruptedException, IOException {
2776    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2777  }
2778
2779  public String explainTableAvailability(TableName tableName) throws IOException {
2780    StringBuilder msg =
2781      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2782    if (getHBaseCluster().getMaster().isAlive()) {
2783      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2784        .getRegionStates().getRegionAssignments();
2785      final List<Pair<RegionInfo, ServerName>> metaLocations =
2786        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2787      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2788        RegionInfo hri = metaLocation.getFirst();
2789        ServerName sn = metaLocation.getSecond();
2790        if (!assignments.containsKey(hri)) {
2791          msg.append(", region ").append(hri)
2792            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2793        } else if (sn == null) {
2794          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2795        } else if (!sn.equals(assignments.get(hri))) {
2796          msg.append(",  region ").append(hri)
2797            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2798            .append(" <> ").append(assignments.get(hri));
2799        }
2800      }
2801    }
2802    return msg.toString();
2803  }
2804
2805  public String explainTableState(final TableName table, TableState.State state)
2806    throws IOException {
2807    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2808    if (tableState == null) {
2809      return "TableState in META: No table state in META for table " + table
2810        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2811    } else if (!tableState.inStates(state)) {
2812      return "TableState in META: Not " + state + " state, but " + tableState;
2813    } else {
2814      return "TableState in META: OK";
2815    }
2816  }
2817
2818  @Nullable
2819  public TableState findLastTableState(final TableName table) throws IOException {
2820    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2821    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2822      @Override
2823      public boolean visit(Result r) throws IOException {
2824        if (!Arrays.equals(r.getRow(), table.getName())) {
2825          return false;
2826        }
2827        TableState state = CatalogFamilyFormat.getTableState(r);
2828        if (state != null) {
2829          lastTableState.set(state);
2830        }
2831        return true;
2832      }
2833    };
2834    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2835      Integer.MAX_VALUE, visitor);
2836    return lastTableState.get();
2837  }
2838
2839  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all of
   * its regions have been assigned. Will time out after the default period (30 seconds). Tolerates
   * a nonexistent table.
2843   * @param table the table to wait on.
2844   * @throws InterruptedException if interrupted while waiting
2845   * @throws IOException          if an IO problem is encountered
2846   */
2847  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2848    waitTableEnabled(table, 30000);
2849  }
2850
2851  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all of
   * its regions have been assigned.
2854   * @see #waitTableEnabled(TableName, long)
2855   * @param table         Table to wait on.
2856   * @param timeoutMillis Time to wait on it being marked enabled.
2857   */
2858  public void waitTableEnabled(byte[] table, long timeoutMillis)
2859    throws InterruptedException, IOException {
2860    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2861  }
2862
2863  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2864    waitFor(timeoutMillis, predicateTableEnabled(table));
2865  }
2866
2867  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
   * time out after the default period (30 seconds).
2870   * @param table Table to wait on.
2871   */
2872  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2873    waitTableDisabled(table, 30000);
2874  }
2875
2876  public void waitTableDisabled(TableName table, long millisTimeout)
2877    throws InterruptedException, IOException {
2878    waitFor(millisTimeout, predicateTableDisabled(table));
2879  }
2880
2881  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2883   * @param table         Table to wait on.
2884   * @param timeoutMillis Time to wait on it being marked disabled.
2885   */
2886  public void waitTableDisabled(byte[] table, long timeoutMillis)
2887    throws InterruptedException, IOException {
2888    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2889  }
2890
2891  /**
2892   * Make sure that at least the specified number of region servers are running
2893   * @param num minimum number of region servers that should be running
2894   * @return true if we started some servers
2895   */
2896  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2897    boolean startedServer = false;
2898    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2899    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2900      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2901      startedServer = true;
2902    }
2903
2904    return startedServer;
2905  }
2906
2907  /**
2908   * Make sure that at least the specified number of region servers are running. We don't count the
2909   * ones that are currently stopping or are stopped.
2910   * @param num minimum number of region servers that should be running
2911   * @return true if we started some servers
2912   */
2913  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2914    boolean startedServer = ensureSomeRegionServersAvailable(num);
2915
2916    int nonStoppedServers = 0;
2917    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2918
2919      HRegionServer hrs = rst.getRegionServer();
2920      if (hrs.isStopping() || hrs.isStopped()) {
2921        LOG.info("A region server is stopped or stopping:" + hrs);
2922      } else {
2923        nonStoppedServers++;
2924      }
2925    }
2926    for (int i = nonStoppedServers; i < num; ++i) {
2927      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2928      startedServer = true;
2929    }
2930    return startedServer;
2931  }
2932
2933  /**
   * Creates a test user whose name is the current user's name with the given suffix appended. Use
   * it to get new instances of FileSystem. Only works for DistributedFileSystem without Kerberos;
   * otherwise the current user is returned unchanged.
   * @param c                     Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new user with a differentiated name, or the current user.
2939   */
2940  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2941    throws IOException {
2942    FileSystem currentfs = FileSystem.get(c);
2943    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2944      return User.getCurrent();
2945    }
2946    // Else distributed filesystem. Make a new instance per daemon. Below
2947    // code is taken from the AppendTestUtil over in hdfs.
2948    String username = User.getCurrent().getName() + differentiatingSuffix;
2949    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2950    return user;
2951  }
2952
2953  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2954    throws IOException {
2955    NavigableSet<String> online = new TreeSet<>();
2956    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2957      try {
2958        for (RegionInfo region : ProtobufUtil
2959          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2960          online.add(region.getRegionNameAsString());
2961        }
2962      } catch (RegionServerStoppedException e) {
2963        // That's fine.
2964      }
2965    }
2966    return online;
2967  }
2968
2969  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it's hard-coded to 5 and makes
   * tests linger. Here is the exception you'll see:
2972   *
2973   * <pre>
2974   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2975   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2976   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2977   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2978   * </pre>
2979   *
2980   * @param stream A DFSClient.DFSOutputStream.
2981   */
2982  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2983    try {
2984      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2985      for (Class<?> clazz : clazzes) {
2986        String className = clazz.getSimpleName();
2987        if (className.equals("DFSOutputStream")) {
2988          if (clazz.isInstance(stream)) {
2989            Field maxRecoveryErrorCountField =
2990              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2991            maxRecoveryErrorCountField.setAccessible(true);
2992            maxRecoveryErrorCountField.setInt(stream, max);
2993            break;
2994          }
2995        }
2996      }
2997    } catch (Exception e) {
2998      LOG.info("Could not set max recovery field", e);
2999    }
3000  }
3001
3002  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
3006   */
3007  public boolean assignRegion(final RegionInfo regionInfo)
3008    throws IOException, InterruptedException {
3009    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3010    am.assign(regionInfo);
3011    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3012  }
3013
3014  /**
3015   * Move region to destination server and wait till region is completely moved and online
3016   * @param destRegion region to move
3017   * @param destServer destination server of the region
3018   */
3019  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3020    throws InterruptedException, IOException {
3021    HMaster master = getMiniHBaseCluster().getMaster();
3022    // TODO: Here we start the move. The move can take a while.
3023    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3024    while (true) {
3025      ServerName serverName =
3026        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3027      if (serverName != null && serverName.equals(destServer)) {
3028        assertRegionOnServer(destRegion, serverName, 2000);
3029        break;
3030      }
3031      Thread.sleep(10);
3032    }
3033  }
3034
3035  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout (default is 60 seconds). This means all regions have been deployed, the
   * master has been informed, and hbase:meta has been updated with each region's deployed server.
3039   * @param tableName the table name
3040   */
3041  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3042    waitUntilAllRegionsAssigned(tableName,
3043      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3044  }
3045
3046  /**
   * Wait until all system table regions get assigned
3048   */
3049  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3050    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3051  }
3052
3053  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, the master has been informed, and
   * hbase:meta has been updated with each region's deployed server.
3057   * @param tableName the table name
3058   * @param timeout   timeout, in milliseconds
3059   */
3060  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3061    throws IOException {
3062    if (!TableName.isMetaTableName(tableName)) {
3063      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3064        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3065          + timeout + "ms");
3066        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3067          @Override
3068          public String explainFailure() throws IOException {
3069            return explainTableAvailability(tableName);
3070          }
3071
3072          @Override
3073          public boolean evaluate() throws IOException {
3074            Scan scan = new Scan();
3075            scan.addFamily(HConstants.CATALOG_FAMILY);
3076            boolean tableFound = false;
3077            try (ResultScanner s = meta.getScanner(scan)) {
3078              for (Result r; (r = s.next()) != null;) {
3079                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3080                RegionInfo info = RegionInfo.parseFromOrNull(b);
3081                if (info != null && info.getTable().equals(tableName)) {
3082                  // Get server hosting this region from catalog family. Return false if no server
3083                  // hosting this region, or if the server hosting this region was recently killed
3084                  // (for fault tolerance testing).
3085                  tableFound = true;
3086                  byte[] server =
3087                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3088                  if (server == null) {
3089                    return false;
3090                  } else {
3091                    byte[] startCode =
3092                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3093                    ServerName serverName =
3094                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3095                        + Bytes.toLong(startCode));
3096                    if (
3097                      !getHBaseClusterInterface().isDistributedCluster()
3098                        && getHBaseCluster().isKilledRS(serverName)
3099                    ) {
3100                      return false;
3101                    }
3102                  }
3103                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3104                    return false;
3105                  }
3106                }
3107              }
3108            }
3109            if (!tableFound) {
3110              LOG.warn(
3111                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3112            }
3113            return tableFound;
3114          }
3115        });
3116      }
3117    }
3118    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3119    // check from the master state if we are using a mini cluster
3120    if (!getHBaseClusterInterface().isDistributedCluster()) {
3121      // So, all regions are in the meta table but make sure master knows of the assignments before
3122      // returning -- sometimes this can lag.
3123      HMaster master = getHBaseCluster().getMaster();
3124      final RegionStates states = master.getAssignmentManager().getRegionStates();
3125      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3126        @Override
3127        public String explainFailure() throws IOException {
3128          return explainTableAvailability(tableName);
3129        }
3130
3131        @Override
3132        public boolean evaluate() throws IOException {
3133          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3134          return hris != null && !hris.isEmpty();
3135        }
3136      });
3137    }
3138    LOG.info("All regions for table " + tableName + " assigned.");
3139  }
3140
3141  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3144   */
3145  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3146    Scan scan = new Scan(get);
3147    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3148      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3149      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3150      // readpoint 0.
3151      0);
3152
3153    List<Cell> result = new ArrayList<>();
3154    scanner.next(result);
3155    if (!result.isEmpty()) {
3156      // verify that we are on the row we want:
3157      Cell kv = result.get(0);
3158      if (!CellUtil.matchingRows(kv, get.getRow())) {
3159        result.clear();
3160      }
3161    }
3162    scanner.close();
3163    return result;
3164  }
3165
3166  /**
   * Create region split keys between startKey and endKey
   * @param numRegions the number of regions to be created. It has to be greater than 3.
3169   * @return resulting split keys
3170   */
3171  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3172    assertTrue(numRegions > 3);
3173    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3174    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3175    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3176    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3177    return result;
3178  }
3179
3180  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3183   */
3184  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3185    throws IOException {
3186    Get get = new Get(row);
3187    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3188    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3189
3190    return getFromStoreFile(store, get);
3191  }
3192
3193  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3194    final List<? extends Cell> actual) {
3195    final int eLen = expected.size();
3196    final int aLen = actual.size();
3197    final int minLen = Math.min(eLen, aLen);
3198
3199    int i = 0;
3200    while (
3201      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3202    ) {
3203      i++;
3204    }
3205
3206    if (additionalMsg == null) {
3207      additionalMsg = "";
3208    }
3209    if (!additionalMsg.isEmpty()) {
3210      additionalMsg = ". " + additionalMsg;
3211    }
3212
3213    if (eLen != aLen || i != minLen) {
3214      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3215        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3216        + " (length " + aLen + ")" + additionalMsg);
3217    }
3218  }
3219
3220  public static <T> String safeGetAsStr(List<T> lst, int i) {
3221    if (0 <= i && i < lst.size()) {
3222      return lst.get(i).toString();
3223    } else {
3224      return "<out_of_range>";
3225    }
3226  }
3227
3228  public String getClusterKey() {
3229    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3230      + ":"
3231      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3232  }
3233
3234  /**
3235   * Creates a random table with the given parameters
3236   */
3237  public Table createRandomTable(TableName tableName, final Collection<String> families,
3238    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3239    final int numRowsPerFlush) throws IOException, InterruptedException {
3240    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3241      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3242      + maxVersions + "\n");
3243
3244    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3245    final int numCF = families.size();
3246    final byte[][] cfBytes = new byte[numCF][];
3247    {
3248      int cfIndex = 0;
3249      for (String cf : families) {
3250        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3251      }
3252    }
3253
3254    final int actualStartKey = 0;
3255    final int actualEndKey = Integer.MAX_VALUE;
3256    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3257    final int splitStartKey = actualStartKey + keysPerRegion;
3258    final int splitEndKey = actualEndKey - keysPerRegion;
3259    final String keyFormat = "%08x";
3260    final Table table = createTable(tableName, cfBytes, maxVersions,
3261      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3262      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3263
3264    if (hbaseCluster != null) {
3265      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3266    }
3267
3268    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3269
3270    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3271      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3272        final byte[] row = Bytes.toBytes(
3273          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3274
3275        Put put = new Put(row);
3276        Delete del = new Delete(row);
3277        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3278          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3279          final long ts = rand.nextInt();
3280          final byte[] qual = Bytes.toBytes("col" + iCol);
3281          if (rand.nextBoolean()) {
3282            final byte[] value =
3283              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3284                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3285            put.addColumn(cf, qual, ts, value);
3286          } else if (rand.nextDouble() < 0.8) {
3287            del.addColumn(cf, qual, ts);
3288          } else {
3289            del.addColumns(cf, qual, ts);
3290          }
3291        }
3292
3293        if (!put.isEmpty()) {
3294          mutator.mutate(put);
3295        }
3296
3297        if (!del.isEmpty()) {
3298          mutator.mutate(del);
3299        }
3300      }
3301      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3302      mutator.flush();
3303      if (hbaseCluster != null) {
3304        getMiniHBaseCluster().flushcache(table.getName());
3305      }
3306    }
3307    mutator.close();
3308
3309    return table;
3310  }
3311
3312  public static int randomFreePort() {
3313    return HBaseCommonTestingUtil.randomFreePort();
3314  }
3315
3316  public static String randomMultiCastAddress() {
3317    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3318  }
3319
3320  public static void waitForHostPort(String host, int port) throws IOException {
3321    final int maxTimeMs = 10000;
3322    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3323    IOException savedException = null;
3324    LOG.info("Waiting for server at " + host + ":" + port);
3325    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3326      try {
3327        Socket sock = new Socket(InetAddress.getByName(host), port);
3328        sock.close();
3329        savedException = null;
3330        LOG.info("Server at " + host + ":" + port + " is available");
3331        break;
3332      } catch (UnknownHostException e) {
3333        throw new IOException("Failed to look up " + host, e);
3334      } catch (IOException e) {
3335        savedException = e;
3336      }
3337      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3338    }
3339
3340    if (savedException != null) {
3341      throw savedException;
3342    }
3343  }
3344
3345  /**
3346   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3347   * continues.
3348   * @return the number of regions the table was split into
3349   */
3350  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3351    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3352    throws IOException {
3353    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3354      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3355  }
3356
3357  /**
3358   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3359   * continues.
3360   * @return the number of regions the table was split into
3361   */
3362  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3363    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3364    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3365    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3366    builder.setDurability(durability);
3367    builder.setRegionReplication(regionReplication);
3368    ColumnFamilyDescriptorBuilder cfBuilder =
3369      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3370    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3371    cfBuilder.setCompressionType(compression);
3372    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3373      numRegionsPerServer);
3374  }
3375
3376  /**
3377   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3378   * continues.
3379   * @return the number of regions the table was split into
3380   */
3381  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3382    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3383    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3384    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3385    builder.setDurability(durability);
3386    builder.setRegionReplication(regionReplication);
3387    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3388    for (int i = 0; i < columnFamilies.length; i++) {
3389      ColumnFamilyDescriptorBuilder cfBuilder =
3390        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3391      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3392      cfBuilder.setCompressionType(compression);
3393      hcds[i] = cfBuilder.build();
3394    }
3395    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3396  }
3397
3398  /**
3399   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3400   * continues.
3401   * @return the number of regions the table was split into
3402   */
3403  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3404    ColumnFamilyDescriptor hcd) throws IOException {
3405    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3406  }
3407
3408  /**
3409   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3410   * continues.
3411   * @return the number of regions the table was split into
3412   */
3413  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3414    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3415    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3416      numRegionsPerServer);
3417  }
3418
3419  /**
3420   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3421   * continues.
3422   * @return the number of regions the table was split into
3423   */
3424  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3425    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3426    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3427      numRegionsPerServer);
3428  }
3429
3430  /**
3431   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3432   * continues.
3433   * @return the number of regions the table was split into
3434   */
3435  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3436    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3437    throws IOException {
3438    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3439    for (ColumnFamilyDescriptor cd : cds) {
3440      if (!td.hasColumnFamily(cd.getName())) {
3441        builder.setColumnFamily(cd);
3442      }
3443    }
3444    td = builder.build();
3445    int totalNumberOfRegions = 0;
3446    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3447    Admin admin = unmanagedConnection.getAdmin();
3448
3449    try {
      // Create a table with pre-split regions. The number of splits is:
      // (region servers * regions per region server).
3453      int numberOfServers = admin.getRegionServers().size();
3454      if (numberOfServers == 0) {
3455        throw new IllegalStateException("No live regionservers");
3456      }
3457
3458      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3459      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3460        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3461        + numRegionsPerServer + ")");
3462
3463      byte[][] splits = splitter.split(totalNumberOfRegions);
3464
3465      admin.createTable(td, splits);
3466    } catch (MasterNotRunningException e) {
3467      LOG.error("Master not running", e);
3468      throw new IOException(e);
3469    } catch (TableExistsException e) {
3470      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3471    } finally {
3472      admin.close();
3473      unmanagedConnection.close();
3474    }
3475    return totalNumberOfRegions;
3476  }
3477
3478  public static int getMetaRSPort(Connection connection) throws IOException {
3479    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3480      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3481    }
3482  }
3483
3484  /**
   * Due to an async racing issue, a region may not be in the online region list of a region server
   * yet, after the assignment znode is deleted and the new assignment is recorded in the master.
3487   */
3488  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3489    final long timeout) throws IOException, InterruptedException {
3490    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3491    while (true) {
3492      List<RegionInfo> regions = getAdmin().getRegions(server);
3493      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3494      long now = EnvironmentEdgeManager.currentTime();
3495      if (now > timeoutTime) break;
3496      Thread.sleep(10);
3497    }
3498    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3499  }
3500
3501  /**
3502   * Check to make sure the region is open on the specified region server, but not on any other one.
3503   */
3504  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3505    final long timeout) throws IOException, InterruptedException {
3506    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3507    while (true) {
3508      List<RegionInfo> regions = getAdmin().getRegions(server);
3509      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3510        List<JVMClusterUtil.RegionServerThread> rsThreads =
3511          getHBaseCluster().getLiveRegionServerThreads();
3512        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3513          HRegionServer rs = rsThread.getRegionServer();
3514          if (server.equals(rs.getServerName())) {
3515            continue;
3516          }
3517          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3518          for (HRegion r : hrs) {
3519            assertTrue("Region should not be double assigned",
3520              r.getRegionInfo().getRegionId() != hri.getRegionId());
3521          }
3522        }
3523        return; // good, we are happy
3524      }
3525      long now = EnvironmentEdgeManager.currentTime();
3526      if (now > timeoutTime) break;
3527      Thread.sleep(10);
3528    }
3529    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3530  }
3531
3532  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3533    TableDescriptor td =
3534      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3535    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3536    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3537  }
3538
3539  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3540    BlockCache blockCache) throws IOException {
3541    TableDescriptor td =
3542      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3543    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3544    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3545  }
3546
3547  public static void setFileSystemURI(String fsURI) {
3548    FS_URI = fsURI;
3549  }
3550
3551  /**
3552   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3553   */
3554  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3555    return new ExplainingPredicate<IOException>() {
3556      @Override
3557      public String explainFailure() throws IOException {
3558        final RegionStates regionStates =
3559          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3560        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3561      }
3562
3563      @Override
3564      public boolean evaluate() throws IOException {
3565        HMaster master = getMiniHBaseCluster().getMaster();
3566        if (master == null) return false;
3567        AssignmentManager am = master.getAssignmentManager();
3568        if (am == null) return false;
3569        return !am.hasRegionsInTransition();
3570      }
3571    };
3572  }
3573
3574  /**
3575   * Returns a {@link Predicate} for checking that table is enabled
3576   */
3577  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3578    return new ExplainingPredicate<IOException>() {
3579      @Override
3580      public String explainFailure() throws IOException {
3581        return explainTableState(tableName, TableState.State.ENABLED);
3582      }
3583
3584      @Override
3585      public boolean evaluate() throws IOException {
3586        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3587      }
3588    };
3589  }
3590
3591  /**
   * Returns a {@link Predicate} for checking that table is disabled
3593   */
3594  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3595    return new ExplainingPredicate<IOException>() {
3596      @Override
3597      public String explainFailure() throws IOException {
3598        return explainTableState(tableName, TableState.State.DISABLED);
3599      }
3600
3601      @Override
3602      public boolean evaluate() throws IOException {
3603        return getAdmin().isTableDisabled(tableName);
3604      }
3605    };
3606  }
3607
3608  /**
   * Returns a {@link Predicate} for checking that table is available
3610   */
3611  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3612    return new ExplainingPredicate<IOException>() {
3613      @Override
3614      public String explainFailure() throws IOException {
3615        return explainTableAvailability(tableName);
3616      }
3617
3618      @Override
3619      public boolean evaluate() throws IOException {
3620        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3621        if (tableAvailable) {
3622          try (Table table = getConnection().getTable(tableName)) {
3623            TableDescriptor htd = table.getDescriptor();
3624            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3625              .getAllRegionLocations()) {
3626              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3627                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3628                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3629              for (byte[] family : htd.getColumnFamilyNames()) {
3630                scan.addFamily(family);
3631              }
3632              try (ResultScanner scanner = table.getScanner(scan)) {
3633                scanner.next();
3634              }
3635            }
3636          }
3637        }
3638        return tableAvailable;
3639      }
3640    };
3641  }
3642
3643  /**
3644   * Wait until no regions in transition.
3645   * @param timeout How long to wait.
3646   */
3647  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3648    waitFor(timeout, predicateNoRegionsInTransition());
3649  }
3650
3651  /**
   * Wait until no regions in transition (time limit: 15 minutes).
3653   */
3654  public void waitUntilNoRegionsInTransition() throws IOException {
3655    waitUntilNoRegionsInTransition(15 * 60000);
3656  }
3657
3658  /**
   * Wait until the given labels are ready in VisibilityLabelsCache.
3660   */
3661  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3662    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3663    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3664
3665      @Override
3666      public boolean evaluate() {
3667        for (String label : labels) {
3668          if (labelsCache.getLabelOrdinal(label) == 0) {
3669            return false;
3670          }
3671        }
3672        return true;
3673      }
3674
3675      @Override
3676      public String explainFailure() {
3677        for (String label : labels) {
3678          if (labelsCache.getLabelOrdinal(label) == 0) {
3679            return label + " is not available yet";
3680          }
3681        }
3682        return "";
3683      }
3684    });
3685  }
3686
3687  /**
   * Create a set of column descriptors with every combination of the available compression,
   * encoding, and bloom codecs.
3690   * @return the list of column descriptors
3691   */
3692  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3693    return generateColumnDescriptors("");
3694  }
3695
3696  /**
   * Create a set of column descriptors with every combination of the available compression,
   * encoding, and bloom codecs.
3699   * @param prefix family names prefix
3700   * @return the list of column descriptors
3701   */
3702  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3703    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3704    long familyId = 0;
3705    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3706      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3707        for (BloomType bloomType : BloomType.values()) {
3708          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3709          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3710            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3711          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3712          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3713          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3714          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3715          familyId++;
3716        }
3717      }
3718    }
3719    return columnFamilyDescriptors;
3720  }
3721
3722  /**
3723   * Get supported compression algorithms.
3724   * @return supported compression algorithms.
3725   */
3726  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3727    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3728    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3729    for (String algoName : allAlgos) {
3730      try {
3731        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3732        algo.getCompressor();
3733        supportedAlgos.add(algo);
3734      } catch (Throwable t) {
3735        // this algo is not available
3736      }
3737    }
3738    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3739  }
3740
3741  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3742    Scan scan = new Scan().withStartRow(row);
3743    scan.setReadType(ReadType.PREAD);
3744    scan.setCaching(1);
3745    scan.setReversed(true);
3746    scan.addFamily(family);
3747    try (RegionScanner scanner = r.getScanner(scan)) {
3748      List<Cell> cells = new ArrayList<>(1);
3749      scanner.next(cells);
      if (
        r.getRegionInfo().isMetaRegion() && !cells.isEmpty() && !isTargetTable(row, cells.get(0))
      ) {
3751        return null;
3752      }
3753      return Result.create(cells);
3754    }
3755  }
3756
3757  private boolean isTargetTable(final byte[] inRow, Cell c) {
3758    String inputRowString = Bytes.toString(inRow);
3759    int i = inputRowString.indexOf(HConstants.DELIMITER);
3760    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3761    int o = outputRowString.indexOf(HConstants.DELIMITER);
3762    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3763  }
3764
3765  /**
3766   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3767   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3768   * kerby KDC server and utility for using it,
3769   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It came in with HBASE-5291.
3771   */
3772  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3773    Properties conf = MiniKdc.createConf();
3774    conf.put(MiniKdc.DEBUG, true);
3775    MiniKdc kdc = null;
3776    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It's possible that
    // another service captures the port in between, which will result in a BindException.
3779    boolean bindException;
3780    int numTries = 0;
3781    do {
3782      try {
3783        bindException = false;
3784        dir = new File(getDataTestDir("kdc").toUri().getPath());
3785        kdc = new MiniKdc(conf, dir);
3786        kdc.start();
3787      } catch (BindException e) {
3788        FileUtils.deleteDirectory(dir); // clean directory
3789        numTries++;
3790        if (numTries == 3) {
3791          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3792          throw e;
3793        }
3794        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3795        bindException = true;
3796      }
3797    } while (bindException);
3798    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3799    return kdc;
3800  }
3801
3802  public int getNumHFiles(final TableName tableName, final byte[] family) {
3803    int numHFiles = 0;
3804    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3805      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3806    }
3807    return numHFiles;
3808  }
3809
3810  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3811    final byte[] family) {
3812    int numHFiles = 0;
3813    for (Region region : rs.getRegions(tableName)) {
3814      numHFiles += region.getStore(family).getStorefilesCount();
3815    }
3816    return numHFiles;
3817  }
3818
3819  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3820    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3821    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3822    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3823    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3824    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3825        it2 = rtdFamilies.iterator(); it.hasNext();) {
3826      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3827    }
3828  }
3829
3830  /**
3831   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3832   * invocations.
3833   */
3834  public static void await(final long sleepMillis, final BooleanSupplier condition)
3835    throws InterruptedException {
3836    try {
3837      while (!condition.getAsBoolean()) {
3838        Thread.sleep(sleepMillis);
3839      }
3840    } catch (RuntimeException e) {
3841      if (e.getCause() instanceof AssertionError) {
3842        throw (AssertionError) e.getCause();
3843      }
3844      throw e;
3845    }
3846  }
3847}