001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.io.UncheckedIOException;
024import java.lang.reflect.Field;
025import java.lang.reflect.Modifier;
026import java.net.BindException;
027import java.net.DatagramSocket;
028import java.net.InetAddress;
029import java.net.ServerSocket;
030import java.net.Socket;
031import java.net.UnknownHostException;
032import java.nio.charset.StandardCharsets;
033import java.security.MessageDigest;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Properties;
044import java.util.Random;
045import java.util.Set;
046import java.util.TreeSet;
047import java.util.concurrent.ThreadLocalRandom;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicReference;
050import java.util.function.BooleanSupplier;
051import org.apache.commons.io.FileUtils;
052import org.apache.commons.lang3.RandomStringUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
057import org.apache.hadoop.hbase.Waiter.Predicate;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.AsyncClusterConnection;
060import org.apache.hadoop.hbase.client.BufferedMutator;
061import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
064import org.apache.hadoop.hbase.client.Connection;
065import org.apache.hadoop.hbase.client.ConnectionFactory;
066import org.apache.hadoop.hbase.client.Consistency;
067import org.apache.hadoop.hbase.client.Delete;
068import org.apache.hadoop.hbase.client.Durability;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Hbck;
071import org.apache.hadoop.hbase.client.MasterRegistry;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.client.RegionLocator;
076import org.apache.hadoop.hbase.client.Result;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.Scan.ReadType;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.client.TableState;
084import org.apache.hadoop.hbase.fs.HFileSystem;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.BlockCache;
089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
090import org.apache.hadoop.hbase.io.hfile.HFile;
091import org.apache.hadoop.hbase.ipc.RpcServerInterface;
092import org.apache.hadoop.hbase.logging.Log4jUtils;
093import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
094import org.apache.hadoop.hbase.master.HMaster;
095import org.apache.hadoop.hbase.master.RegionState;
096import org.apache.hadoop.hbase.master.ServerManager;
097import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
098import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
099import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
100import org.apache.hadoop.hbase.master.assignment.RegionStates;
101import org.apache.hadoop.hbase.mob.MobFileCache;
102import org.apache.hadoop.hbase.regionserver.BloomType;
103import org.apache.hadoop.hbase.regionserver.ChunkCreator;
104import org.apache.hadoop.hbase.regionserver.HRegion;
105import org.apache.hadoop.hbase.regionserver.HRegionServer;
106import org.apache.hadoop.hbase.regionserver.HStore;
107import org.apache.hadoop.hbase.regionserver.InternalScanner;
108import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
109import org.apache.hadoop.hbase.regionserver.Region;
110import org.apache.hadoop.hbase.regionserver.RegionScanner;
111import org.apache.hadoop.hbase.regionserver.RegionServerServices;
112import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
113import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
114import org.apache.hadoop.hbase.security.User;
115import org.apache.hadoop.hbase.security.UserProvider;
116import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
117import org.apache.hadoop.hbase.util.Bytes;
118import org.apache.hadoop.hbase.util.CommonFSUtils;
119import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
120import org.apache.hadoop.hbase.util.FSUtils;
121import org.apache.hadoop.hbase.util.JVMClusterUtil;
122import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
123import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
124import org.apache.hadoop.hbase.util.Pair;
125import org.apache.hadoop.hbase.util.ReflectionUtils;
126import org.apache.hadoop.hbase.util.RegionSplitter;
127import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
128import org.apache.hadoop.hbase.util.RetryCounter;
129import org.apache.hadoop.hbase.util.Threads;
130import org.apache.hadoop.hbase.wal.WAL;
131import org.apache.hadoop.hbase.wal.WALFactory;
132import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
133import org.apache.hadoop.hbase.zookeeper.ZKConfig;
134import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
135import org.apache.hadoop.hdfs.DFSClient;
136import org.apache.hadoop.hdfs.DistributedFileSystem;
137import org.apache.hadoop.hdfs.MiniDFSCluster;
138import org.apache.hadoop.hdfs.server.datanode.DataNode;
139import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
140import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
141import org.apache.hadoop.mapred.JobConf;
142import org.apache.hadoop.mapred.MiniMRCluster;
143import org.apache.hadoop.mapred.TaskLog;
144import org.apache.hadoop.minikdc.MiniKdc;
145import org.apache.yetus.audience.InterfaceAudience;
146import org.apache.zookeeper.WatchedEvent;
147import org.apache.zookeeper.ZooKeeper;
148import org.apache.zookeeper.ZooKeeper.States;
149
150import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
151
152import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
153
154/**
155 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
156 * functionality. Create an instance and keep it around testing HBase. This class is meant to be
157 * your one-stop shop for anything you might need testing. Manages one cluster at a time only.
158 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type
159 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j
160 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set
161 * logging levels. In the configuration properties, default values for master-info-port and
162 * region-server-port are overridden such that a random port will be assigned (thus avoiding port
163 * contention if another local HBase instance is already running).
164 * <p>
165 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
166 * setting it to true.
167 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
168 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
169 */
170@InterfaceAudience.Public
171@Deprecated
172public class HBaseTestingUtility extends HBaseZKTestingUtility {
173
174  /**
175   * System property key to get test directory value. Name is as it is because mini dfs has
176   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
177   * used in mini dfs.
178   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
179   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
180   */
181  @Deprecated
182  private static final String TEST_DIRECTORY_KEY = "test.build.data";
183
184  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
185  /**
186   * The default number of regions per regionserver when creating a pre-split table.
187   */
188  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
189
190  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
191  public static final boolean PRESPLIT_TEST_TABLE = true;
192
193  private MiniDFSCluster dfsCluster = null;
194  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
195
196  private volatile HBaseCluster hbaseCluster = null;
197  private MiniMRCluster mrCluster = null;
198
199  /** If there is a mini cluster running for this testing utility instance. */
200  private volatile boolean miniClusterRunning;
201
202  private String hadoopLogDir;
203
204  /**
205   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
206   */
207  private Path dataTestDirOnTestFS = null;
208
209  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
210
211  /** Filesystem URI used for map-reduce mini-cluster setup */
212  private static String FS_URI;
213
214  /** This is for unit tests parameterized with a single boolean. */
215  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
216
217  /**
218   * Checks to see if a specific port is available.
219   * @param port the port number to check for availability
220   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
221   */
222  public static boolean available(int port) {
223    ServerSocket ss = null;
224    DatagramSocket ds = null;
225    try {
226      ss = new ServerSocket(port);
227      ss.setReuseAddress(true);
228      ds = new DatagramSocket(port);
229      ds.setReuseAddress(true);
230      return true;
231    } catch (IOException e) {
232      // Do nothing
233    } finally {
234      if (ds != null) {
235        ds.close();
236      }
237
238      if (ss != null) {
239        try {
240          ss.close();
241        } catch (IOException e) {
242          /* should not be thrown */
243        }
244      }
245    }
246
247    return false;
248  }
249
250  /**
251   * Create all combinations of Bloom filters and compression algorithms for testing.
252   */
253  private static List<Object[]> bloomAndCompressionCombinations() {
254    List<Object[]> configurations = new ArrayList<>();
255    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
256      for (BloomType bloomType : BloomType.values()) {
257        configurations.add(new Object[] { comprAlgo, bloomType });
258      }
259    }
260    return Collections.unmodifiableList(configurations);
261  }
262
263  /**
264   * Create combination of memstoreTS and tags
265   */
266  private static List<Object[]> memStoreTSAndTagsCombination() {
267    List<Object[]> configurations = new ArrayList<>();
268    configurations.add(new Object[] { false, false });
269    configurations.add(new Object[] { false, true });
270    configurations.add(new Object[] { true, false });
271    configurations.add(new Object[] { true, true });
272    return Collections.unmodifiableList(configurations);
273  }
274
275  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
276    List<Object[]> configurations = new ArrayList<>();
277    configurations.add(new Object[] { false, false, true });
278    configurations.add(new Object[] { false, false, false });
279    configurations.add(new Object[] { false, true, true });
280    configurations.add(new Object[] { false, true, false });
281    configurations.add(new Object[] { true, false, true });
282    configurations.add(new Object[] { true, false, false });
283    configurations.add(new Object[] { true, true, true });
284    configurations.add(new Object[] { true, true, false });
285    return Collections.unmodifiableList(configurations);
286  }
287
288  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
289    bloomAndCompressionCombinations();
290
291  /**
292   * <p>
293   * Create an HBaseTestingUtility using a default configuration.
294   * <p>
295   * Initially, all tmp files are written to a local test data directory. Once
296   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
297   * data will be written to the DFS directory instead.
298   */
299  public HBaseTestingUtility() {
300    this(HBaseConfiguration.create());
301  }
302
303  /**
304   * <p>
305   * Create an HBaseTestingUtility using a given configuration.
306   * <p>
307   * Initially, all tmp files are written to a local test data directory. Once
308   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
309   * data will be written to the DFS directory instead.
310   * @param conf The configuration to use for further operations
311   */
312  public HBaseTestingUtility(Configuration conf) {
313    super(conf);
314
315    // a hbase checksum verification failure will cause unit tests to fail
316    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
317
318    // Save this for when setting default file:// breaks things
319    if (this.conf.get("fs.defaultFS") != null) {
320      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
321    }
322    if (this.conf.get(HConstants.HBASE_DIR) != null) {
323      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
324    }
325    // Every cluster is a local cluster until we start DFS
326    // Note that conf could be null, but this.conf will not be
327    String dataTestDir = getDataTestDir().toString();
328    this.conf.set("fs.defaultFS", "file:///");
329    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
330    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
331    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
332    // If the value for random ports isn't set set it to true, thus making
333    // tests opt-out for random port assignment
334    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
335      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
336  }
337
338  /**
339   * Close both the region {@code r} and it's underlying WAL. For use in tests.
340   */
341  public static void closeRegionAndWAL(final Region r) throws IOException {
342    closeRegionAndWAL((HRegion) r);
343  }
344
345  /**
346   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
347   */
348  public static void closeRegionAndWAL(final HRegion r) throws IOException {
349    if (r == null) return;
350    r.close();
351    if (r.getWAL() == null) return;
352    r.getWAL().close();
353  }
354
355  /**
356   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
357   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
358   * by the Configuration. If say, a Connection was being used against a cluster that had been
359   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
360   * Rather than use the return direct, its usually best to make a copy and use that. Do
361   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
362   * @return Instance of Configuration.
363   */
364  @Override
365  public Configuration getConfiguration() {
366    return super.getConfiguration();
367  }
368
369  public void setHBaseCluster(HBaseCluster hbaseCluster) {
370    this.hbaseCluster = hbaseCluster;
371  }
372
373  /**
374   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
375   * have many concurrent tests running if we need to. It needs to amend the
376   * {@link #TEST_DIRECTORY_KEY} System property, as it's what minidfscluster bases it data dir on.
377   * Moding a System property is not the way to do concurrent instances -- another instance could
378   * grab the temporary value unintentionally -- but not anything can do about it at moment; single
379   * instance only is how the minidfscluster works. We also create the underlying directory names
380   * for hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
381   * conf, and as a system property for hadoop.tmp.dir (We do not create them!).
382   * @return The calculated data test build directory, if newly-created.
383   */
384  @Override
385  protected Path setupDataTestDir() {
386    Path testPath = super.setupDataTestDir();
387    if (null == testPath) {
388      return null;
389    }
390
391    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
392
393    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
394    // we want our own value to ensure uniqueness on the same machine
395    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
396
397    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
398    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
399    return testPath;
400  }
401
402  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
403
404    String sysValue = System.getProperty(propertyName);
405
406    if (sysValue != null) {
407      // There is already a value set. So we do nothing but hope
408      // that there will be no conflicts
409      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
410        + " so I do NOT create it in " + parent);
411      String confValue = conf.get(propertyName);
412      if (confValue != null && !confValue.endsWith(sysValue)) {
413        LOG.warn(propertyName + " property value differs in configuration and system: "
414          + "Configuration=" + confValue + " while System=" + sysValue
415          + " Erasing configuration value by system value.");
416      }
417      conf.set(propertyName, sysValue);
418    } else {
419      // Ok, it's not set, so we create it as a subdirectory
420      createSubDir(propertyName, parent, subDirName);
421      System.setProperty(propertyName, conf.get(propertyName));
422    }
423  }
424
425  /**
426   * @return Where to write test data on the test filesystem; Returns working directory for the test
427   *         filesystem by default
428   * @see #setupDataTestDirOnTestFS()
429   * @see #getTestFileSystem()
430   */
431  private Path getBaseTestDirOnTestFS() throws IOException {
432    FileSystem fs = getTestFileSystem();
433    return new Path(fs.getWorkingDirectory(), "test-data");
434  }
435
436  /**
437   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
438   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
439   * on it.
440   * @return a unique path in the test filesystem
441   */
442  public Path getDataTestDirOnTestFS() throws IOException {
443    if (dataTestDirOnTestFS == null) {
444      setupDataTestDirOnTestFS();
445    }
446
447    return dataTestDirOnTestFS;
448  }
449
450  /**
451   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
452   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
453   * on it.
454   * @return a unique path in the test filesystem
455   * @param subdirName name of the subdir to create under the base test dir
456   */
457  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
458    return new Path(getDataTestDirOnTestFS(), subdirName);
459  }
460
461  /**
462   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
463   * setup.
464   */
465  private void setupDataTestDirOnTestFS() throws IOException {
466    if (dataTestDirOnTestFS != null) {
467      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
468      return;
469    }
470    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
471  }
472
473  /**
474   * Sets up a new path in test filesystem to be used by tests.
475   */
476  private Path getNewDataTestDirOnTestFS() throws IOException {
477    // The file system can be either local, mini dfs, or if the configuration
478    // is supplied externally, it can be an external cluster FS. If it is a local
479    // file system, the tests should use getBaseTestDir, otherwise, we can use
480    // the working directory, and create a unique sub dir there
481    FileSystem fs = getTestFileSystem();
482    Path newDataTestDir;
483    String randomStr = getRandomUUID().toString();
484    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
485      newDataTestDir = new Path(getDataTestDir(), randomStr);
486      File dataTestDir = new File(newDataTestDir.toString());
487      if (deleteOnExit()) dataTestDir.deleteOnExit();
488    } else {
489      Path base = getBaseTestDirOnTestFS();
490      newDataTestDir = new Path(base, randomStr);
491      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
492    }
493    return newDataTestDir;
494  }
495
496  /**
497   * Cleans the test data directory on the test filesystem.
498   * @return True if we removed the test dirs
499   */
500  public boolean cleanupDataTestDirOnTestFS() throws IOException {
501    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
502    if (ret) dataTestDirOnTestFS = null;
503    return ret;
504  }
505
506  /**
507   * Cleans a subdirectory under the test data directory on the test filesystem.
508   * @return True if we removed child
509   */
510  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
511    Path cpath = getDataTestDirOnTestFS(subdirName);
512    return getTestFileSystem().delete(cpath, true);
513  }
514
515  // Workaround to avoid IllegalThreadStateException
516  // See HBASE-27148 for more details
517  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
518
519    private volatile boolean stopped = false;
520
521    private final MiniDFSCluster cluster;
522
523    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
524      super("FsDatasetAsyncDiskServiceFixer");
525      setDaemon(true);
526      this.cluster = cluster;
527    }
528
529    @Override
530    public void run() {
531      while (!stopped) {
532        try {
533          Thread.sleep(30000);
534        } catch (InterruptedException e) {
535          Thread.currentThread().interrupt();
536          continue;
537        }
538        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
539        // timeout of the thread pool executor is 60 seconds by default.
540        try {
541          for (DataNode dn : cluster.getDataNodes()) {
542            FsDatasetSpi<?> dataset = dn.getFSDataset();
543            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
544            service.setAccessible(true);
545            Object asyncDiskService = service.get(dataset);
546            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
547            group.setAccessible(true);
548            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
549            if (threadGroup.isDaemon()) {
550              threadGroup.setDaemon(false);
551            }
552          }
553        } catch (NoSuchFieldException e) {
554          LOG.debug("NoSuchFieldException: " + e.getMessage()
555            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
556            + "See HBASE-27595 for details.");
557        } catch (Exception e) {
558          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
559        }
560      }
561    }
562
563    void shutdown() {
564      stopped = true;
565      interrupt();
566    }
567  }
568
569  /**
570   * Start a minidfscluster.
571   * @param servers How many DNs to start.
572   * @see #shutdownMiniDFSCluster()
573   * @return The mini dfs cluster created.
574   */
575  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
576    return startMiniDFSCluster(servers, null);
577  }
578
579  /**
580   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
581   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
582   * instances of the datanodes will have the same host name.
583   * @param hosts hostnames DNs to run on.
584   * @see #shutdownMiniDFSCluster()
585   * @return The mini dfs cluster created.
586   */
587  public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception {
588    if (hosts != null && hosts.length != 0) {
589      return startMiniDFSCluster(hosts.length, hosts);
590    } else {
591      return startMiniDFSCluster(1, null);
592    }
593  }
594
595  /**
596   * Start a minidfscluster. Can only create one.
597   * @param servers How many DNs to start.
598   * @param hosts   hostnames DNs to run on.
599   * @see #shutdownMiniDFSCluster()
600   * @return The mini dfs cluster created.
601   */
602  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception {
603    return startMiniDFSCluster(servers, null, hosts);
604  }
605
606  private void setFs() throws IOException {
607    if (this.dfsCluster == null) {
608      LOG.info("Skipping setting fs because dfsCluster is null");
609      return;
610    }
611    FileSystem fs = this.dfsCluster.getFileSystem();
612    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
613
614    // re-enable this check with dfs
615    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
616  }
617
618  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
619    throws Exception {
620    createDirsAndSetProperties();
621    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
622
623    this.dfsCluster =
624      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
625    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
626    this.dfsClusterFixer.start();
627    // Set this just-started cluster as our filesystem.
628    setFs();
629
630    // Wait for the cluster to be totally up
631    this.dfsCluster.waitClusterUp();
632
633    // reset the test directory for test file system
634    dataTestDirOnTestFS = null;
635    String dataTestDir = getDataTestDir().toString();
636    conf.set(HConstants.HBASE_DIR, dataTestDir);
637    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
638
639    return this.dfsCluster;
640  }
641
642  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
643    createDirsAndSetProperties();
644    dfsCluster =
645      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
646    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
647    this.dfsClusterFixer.start();
648    return dfsCluster;
649  }
650
651  /**
652   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
653   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
654   * the conf.
655   *
656   * <pre>
657   * Configuration conf = TEST_UTIL.getConfiguration();
658   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
659   *   Map.Entry&lt;String, String&gt; e = i.next();
660   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
661   * }
662   * </pre>
663   */
664  private void createDirsAndSetProperties() throws IOException {
665    setupClusterTestDir();
666    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
667    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
668    createDirAndSetProperty("test.cache.data");
669    createDirAndSetProperty("hadoop.tmp.dir");
670    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
671    createDirAndSetProperty("mapreduce.cluster.local.dir");
672    createDirAndSetProperty("mapreduce.cluster.temp.dir");
673    enableShortCircuit();
674
675    Path root = getDataTestDirOnTestFS("hadoop");
676    conf.set(MapreduceTestingShim.getMROutputDirProp(),
677      new Path(root, "mapred-output-dir").toString());
678    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
679    conf.set("mapreduce.jobtracker.staging.root.dir",
680      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
681    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
682    conf.set("yarn.app.mapreduce.am.staging-dir",
683      new Path(root, "mapreduce-am-staging-root-dir").toString());
684
685    // Frustrate yarn's and hdfs's attempts at writing /tmp.
686    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
687    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
688    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
689    createDirAndSetProperty("yarn.nodemanager.log-dirs");
690    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
691    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
692    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
693    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
694    createDirAndSetProperty("dfs.journalnode.edits.dir");
695    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
696    createDirAndSetProperty("nfs.dump.dir");
697    createDirAndSetProperty("java.io.tmpdir");
698    createDirAndSetProperty("dfs.journalnode.edits.dir");
699    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
700    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
701  }
702
703  /**
704   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
705   * Default to false.
706   */
707  public boolean isNewVersionBehaviorEnabled() {
708    final String propName = "hbase.tests.new.version.behavior";
709    String v = System.getProperty(propName);
710    if (v != null) {
711      return Boolean.parseBoolean(v);
712    }
713    return false;
714  }
715
716  /**
717   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
718   * allows to specify this parameter on the command line. If not set, default is true.
719   */
720  public boolean isReadShortCircuitOn() {
721    final String propName = "hbase.tests.use.shortcircuit.reads";
722    String readOnProp = System.getProperty(propName);
723    if (readOnProp != null) {
724      return Boolean.parseBoolean(readOnProp);
725    } else {
726      return conf.getBoolean(propName, false);
727    }
728  }
729
730  /**
731   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
732   * including skipping the hdfs checksum checks.
733   */
734  private void enableShortCircuit() {
735    if (isReadShortCircuitOn()) {
736      String curUser = System.getProperty("user.name");
737      LOG.info("read short circuit is ON for user " + curUser);
738      // read short circuit, for hdfs
739      conf.set("dfs.block.local-path-access.user", curUser);
740      // read short circuit, for hbase
741      conf.setBoolean("dfs.client.read.shortcircuit", true);
742      // Skip checking checksum, for the hdfs client and the datanode
743      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
744    } else {
745      LOG.info("read short circuit is OFF");
746    }
747  }
748
749  private String createDirAndSetProperty(final String property) {
750    return createDirAndSetProperty(property, property);
751  }
752
753  private String createDirAndSetProperty(final String relPath, String property) {
754    String path = getDataTestDir(relPath).toString();
755    System.setProperty(property, path);
756    conf.set(property, path);
757    new File(path).mkdirs();
758    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
759    return path;
760  }
761
762  /**
763   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
764   */
765  public void shutdownMiniDFSCluster() throws IOException {
766    if (this.dfsCluster != null) {
767      // The below throws an exception per dn, AsynchronousCloseException.
768      this.dfsCluster.shutdown();
769      dfsCluster = null;
770      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
771      // have a fixer
772      if (dfsClusterFixer != null) {
773        this.dfsClusterFixer.shutdown();
774        dfsClusterFixer = null;
775      }
776      dataTestDirOnTestFS = null;
777      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
778    }
779  }
780
781  /**
782   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
783   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
784   * @param createWALDir Whether to create a new WAL directory.
785   * @return The mini HBase cluster created.
786   * @see #shutdownMiniCluster()
787   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
788   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
789   * @see #startMiniCluster(StartMiniClusterOption)
790   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
791   */
792  @Deprecated
793  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
794    StartMiniClusterOption option =
795      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
796    return startMiniCluster(option);
797  }
798
799  /**
800   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
801   * defined in {@link StartMiniClusterOption.Builder}.
802   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
803   * @param createRootDir Whether to create a new root or data directory path.
804   * @return The mini HBase cluster created.
805   * @see #shutdownMiniCluster()
806   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
807   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
808   * @see #startMiniCluster(StartMiniClusterOption)
809   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
810   */
811  @Deprecated
812  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception {
813    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
814      .numDataNodes(numSlaves).createRootDir(createRootDir).build();
815    return startMiniCluster(option);
816  }
817
818  /**
819   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
820   * defined in {@link StartMiniClusterOption.Builder}.
821   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
822   * @param createRootDir Whether to create a new root or data directory path.
823   * @param createWALDir  Whether to create a new WAL directory.
824   * @return The mini HBase cluster created.
825   * @see #shutdownMiniCluster()
826   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
827   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
828   * @see #startMiniCluster(StartMiniClusterOption)
829   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
830   */
831  @Deprecated
832  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
833    boolean createWALDir) throws Exception {
834    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
835      .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build();
836    return startMiniCluster(option);
837  }
838
839  /**
840   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
841   * defined in {@link StartMiniClusterOption.Builder}.
842   * @param numMasters    Master node number.
843   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
844   * @param createRootDir Whether to create a new root or data directory path.
845   * @return The mini HBase cluster created.
846   * @see #shutdownMiniCluster()
847   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
848   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
849   * @see #startMiniCluster(StartMiniClusterOption)
850   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
851   */
852  @Deprecated
853  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
854    throws Exception {
855    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
856      .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build();
857    return startMiniCluster(option);
858  }
859
860  /**
861   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
862   * defined in {@link StartMiniClusterOption.Builder}.
863   * @param numMasters Master node number.
864   * @param numSlaves  Slave node number, for both HBase region server and HDFS data node.
865   * @return The mini HBase cluster created.
866   * @see #shutdownMiniCluster()
867   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
868   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
869   * @see #startMiniCluster(StartMiniClusterOption)
870   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
871   */
872  @Deprecated
873  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
874    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
875      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
876    return startMiniCluster(option);
877  }
878
879  /**
880   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
881   * defined in {@link StartMiniClusterOption.Builder}.
882   * @param numMasters    Master node number.
883   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
884   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
885   *                      HDFS data node number.
886   * @param createRootDir Whether to create a new root or data directory path.
887   * @return The mini HBase cluster created.
888   * @see #shutdownMiniCluster()
889   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
890   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
891   * @see #startMiniCluster(StartMiniClusterOption)
892   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
893   */
894  @Deprecated
895  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
896    boolean createRootDir) throws Exception {
897    StartMiniClusterOption option =
898      StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves)
899        .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
900    return startMiniCluster(option);
901  }
902
903  /**
904   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
905   * defined in {@link StartMiniClusterOption.Builder}.
906   * @param numMasters    Master node number.
907   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
908   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
909   *                      HDFS data node number.
910   * @return The mini HBase cluster created.
911   * @see #shutdownMiniCluster()
912   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
913   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
914   * @see #startMiniCluster(StartMiniClusterOption)
915   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
916   */
917  @Deprecated
918  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
919    throws Exception {
920    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
921      .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
922    return startMiniCluster(option);
923  }
924
925  /**
926   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
927   * defined in {@link StartMiniClusterOption.Builder}.
928   * @param numMasters       Master node number.
929   * @param numRegionServers Number of region servers.
930   * @param numDataNodes     Number of datanodes.
931   * @return The mini HBase cluster created.
932   * @see #shutdownMiniCluster()
933   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
934   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
935   * @see #startMiniCluster(StartMiniClusterOption)
936   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
937   */
938  @Deprecated
939  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
940    throws Exception {
941    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
942      .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build();
943    return startMiniCluster(option);
944  }
945
946  /**
947   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
948   * defined in {@link StartMiniClusterOption.Builder}.
949   * @param numMasters    Master node number.
950   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
951   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
952   *                      HDFS data node number.
953   * @param masterClass   The class to use as HMaster, or null for default.
954   * @param rsClass       The class to use as HRegionServer, or null for default.
955   * @return The mini HBase cluster created.
956   * @see #shutdownMiniCluster()
957   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
958   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
959   * @see #startMiniCluster(StartMiniClusterOption)
960   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
961   */
962  @Deprecated
963  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
964    Class<? extends HMaster> masterClass,
965    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
966    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
967      .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves)
968      .dataNodeHosts(dataNodeHosts).build();
969    return startMiniCluster(option);
970  }
971
972  /**
973   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
974   * defined in {@link StartMiniClusterOption.Builder}.
975   * @param numMasters       Master node number.
976   * @param numRegionServers Number of region servers.
977   * @param numDataNodes     Number of datanodes.
978   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
979   *                         overwrite HDFS data node number.
980   * @param masterClass      The class to use as HMaster, or null for default.
981   * @param rsClass          The class to use as HRegionServer, or null for default.
982   * @return The mini HBase cluster created.
983   * @see #shutdownMiniCluster()
984   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
985   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
986   * @see #startMiniCluster(StartMiniClusterOption)
987   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
988   */
989  @Deprecated
990  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
991    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
992    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
993    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
994      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
995      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build();
996    return startMiniCluster(option);
997  }
998
999  /**
1000   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
1001   * defined in {@link StartMiniClusterOption.Builder}.
1002   * @param numMasters       Master node number.
1003   * @param numRegionServers Number of region servers.
1004   * @param numDataNodes     Number of datanodes.
1005   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
1006   *                         overwrite HDFS data node number.
1007   * @param masterClass      The class to use as HMaster, or null for default.
1008   * @param rsClass          The class to use as HRegionServer, or null for default.
1009   * @param createRootDir    Whether to create a new root or data directory path.
1010   * @param createWALDir     Whether to create a new WAL directory.
1011   * @return The mini HBase cluster created.
1012   * @see #shutdownMiniCluster()
1013   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1014   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
1015   * @see #startMiniCluster(StartMiniClusterOption)
1016   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1017   */
1018  @Deprecated
1019  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1020    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1021    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1022    boolean createWALDir) throws Exception {
1023    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1024      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
1025      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir)
1026      .createWALDir(createWALDir).build();
1027    return startMiniCluster(option);
1028  }
1029
1030  /**
1031   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
1032   * other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1033   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1034   * @see #startMiniCluster(StartMiniClusterOption option)
1035   * @see #shutdownMiniDFSCluster()
1036   */
1037  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1038    StartMiniClusterOption option =
1039      StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1040    return startMiniCluster(option);
1041  }
1042
1043  /**
1044   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
1045   * value can be found in {@link StartMiniClusterOption.Builder}.
1046   * @see #startMiniCluster(StartMiniClusterOption option)
1047   * @see #shutdownMiniDFSCluster()
1048   */
1049  public MiniHBaseCluster startMiniCluster() throws Exception {
1050    return startMiniCluster(StartMiniClusterOption.builder().build());
1051  }
1052
1053  /**
1054   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
1055   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
1056   * under System property test.build.data, to be cleaned up on exit.
1057   * @see #shutdownMiniDFSCluster()
1058   */
1059  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1060    LOG.info("Starting up minicluster with option: {}", option);
1061
1062    // If we already put up a cluster, fail.
1063    if (miniClusterRunning) {
1064      throw new IllegalStateException("A mini-cluster is already running");
1065    }
1066    miniClusterRunning = true;
1067
1068    setupClusterTestDir();
1069    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
1070
1071    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1072    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1073    if (dfsCluster == null) {
1074      LOG.info("STARTING DFS");
1075      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1076    } else {
1077      LOG.info("NOT STARTING DFS");
1078    }
1079
1080    // Start up a zk cluster.
1081    if (getZkCluster() == null) {
1082      startMiniZKCluster(option.getNumZkServers());
1083    }
1084
1085    // Start the MiniHBaseCluster
1086    return startMiniHBaseCluster(option);
1087  }
1088
1089  /**
1090   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1091   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1092   * @return Reference to the hbase mini hbase cluster.
1093   * @see #startMiniCluster(StartMiniClusterOption)
1094   * @see #shutdownMiniHBaseCluster()
1095   */
1096  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1097    throws IOException, InterruptedException {
1098    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1099    createRootDir(option.isCreateRootDir());
1100    if (option.isCreateWALDir()) {
1101      createWALRootDir();
1102    }
1103    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1104    // for tests that do not read hbase-defaults.xml
1105    setHBaseFsTmpDir();
1106
1107    // These settings will make the server waits until this exact number of
1108    // regions servers are connected.
1109    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1110      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1111    }
1112    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1113      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1114    }
1115
1116    Configuration c = new Configuration(this.conf);
1117    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1118      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1119      option.getMasterClass(), option.getRsClass());
1120    // Populate the master address configuration from mini cluster configuration.
1121    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1122    // Don't leave here till we've done a successful scan of the hbase:meta
1123    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1124      ResultScanner s = t.getScanner(new Scan())) {
1125      for (;;) {
1126        if (s.next() == null) {
1127          break;
1128        }
1129      }
1130    }
1131
1132    getAdmin(); // create immediately the hbaseAdmin
1133    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1134
1135    return (MiniHBaseCluster) hbaseCluster;
1136  }
1137
1138  /**
1139   * Starts up mini hbase cluster using default options. Default options can be found in
1140   * {@link StartMiniClusterOption.Builder}.
1141   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1142   * @see #shutdownMiniHBaseCluster()
1143   */
1144  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1145    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1146  }
1147
1148  /**
1149   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1150   * {@link #startMiniCluster()}. All other options will use default values, defined in
1151   * {@link StartMiniClusterOption.Builder}.
1152   * @param numMasters       Master node number.
1153   * @param numRegionServers Number of region servers.
1154   * @return The mini HBase cluster created.
1155   * @see #shutdownMiniHBaseCluster()
1156   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1157   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1158   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1159   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1160   */
1161  @Deprecated
1162  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1163    throws IOException, InterruptedException {
1164    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1165      .numRegionServers(numRegionServers).build();
1166    return startMiniHBaseCluster(option);
1167  }
1168
1169  /**
1170   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1171   * {@link #startMiniCluster()}. All other options will use default values, defined in
1172   * {@link StartMiniClusterOption.Builder}.
1173   * @param numMasters       Master node number.
1174   * @param numRegionServers Number of region servers.
1175   * @param rsPorts          Ports that RegionServer should use.
1176   * @return The mini HBase cluster created.
1177   * @see #shutdownMiniHBaseCluster()
1178   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1179   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1180   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1181   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1182   */
1183  @Deprecated
1184  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1185    List<Integer> rsPorts) throws IOException, InterruptedException {
1186    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1187      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1188    return startMiniHBaseCluster(option);
1189  }
1190
1191  /**
1192   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1193   * {@link #startMiniCluster()}. All other options will use default values, defined in
1194   * {@link StartMiniClusterOption.Builder}.
1195   * @param numMasters       Master node number.
1196   * @param numRegionServers Number of region servers.
1197   * @param rsPorts          Ports that RegionServer should use.
1198   * @param masterClass      The class to use as HMaster, or null for default.
1199   * @param rsClass          The class to use as HRegionServer, or null for default.
1200   * @param createRootDir    Whether to create a new root or data directory path.
1201   * @param createWALDir     Whether to create a new WAL directory.
1202   * @return The mini HBase cluster created.
1203   * @see #shutdownMiniHBaseCluster()
1204   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1205   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1206   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1207   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1208   */
1209  @Deprecated
1210  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1211    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1212    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1213    boolean createWALDir) throws IOException, InterruptedException {
1214    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1215      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1216      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1217    return startMiniHBaseCluster(option);
1218  }
1219
1220  /**
1221   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1222   * want to keep dfs/zk up and just stop/start hbase.
1223   * @param servers number of region servers
1224   */
1225  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1226    this.restartHBaseCluster(servers, null);
1227  }
1228
1229  public void restartHBaseCluster(int servers, List<Integer> ports)
1230    throws IOException, InterruptedException {
1231    StartMiniClusterOption option =
1232      StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1233    restartHBaseCluster(option);
1234    invalidateConnection();
1235  }
1236
1237  public void restartHBaseCluster(StartMiniClusterOption option)
1238    throws IOException, InterruptedException {
1239    closeConnection();
1240    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
1241      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1242      option.getMasterClass(), option.getRsClass());
1243    // Don't leave here till we've done a successful scan of the hbase:meta
1244    Connection conn = ConnectionFactory.createConnection(this.conf);
1245    Table t = conn.getTable(TableName.META_TABLE_NAME);
1246    ResultScanner s = t.getScanner(new Scan());
1247    while (s.next() != null) {
1248      // do nothing
1249    }
1250    LOG.info("HBase has been restarted");
1251    s.close();
1252    t.close();
1253    conn.close();
1254  }
1255
1256  /**
1257   * @return Current mini hbase cluster. Only has something in it after a call to
1258   *         {@link #startMiniCluster()}.
1259   * @see #startMiniCluster()
1260   */
1261  public MiniHBaseCluster getMiniHBaseCluster() {
1262    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1263      return (MiniHBaseCluster) this.hbaseCluster;
1264    }
1265    throw new RuntimeException(
1266      hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName());
1267  }
1268
1269  /**
1270   * Stops mini hbase, zk, and hdfs clusters.
1271   * @see #startMiniCluster(int)
1272   */
1273  public void shutdownMiniCluster() throws IOException {
1274    LOG.info("Shutting down minicluster");
1275    shutdownMiniHBaseCluster();
1276    shutdownMiniDFSCluster();
1277    shutdownMiniZKCluster();
1278
1279    cleanupTestDir();
1280    miniClusterRunning = false;
1281    LOG.info("Minicluster is down");
1282  }
1283
1284  /**
1285   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1286   * @throws java.io.IOException in case command is unsuccessful
1287   */
1288  public void shutdownMiniHBaseCluster() throws IOException {
1289    cleanup();
1290    if (this.hbaseCluster != null) {
1291      this.hbaseCluster.shutdown();
1292      // Wait till hbase is down before going on to shutdown zk.
1293      this.hbaseCluster.waitUntilShutDown();
1294      this.hbaseCluster = null;
1295    }
1296    if (zooKeeperWatcher != null) {
1297      zooKeeperWatcher.close();
1298      zooKeeperWatcher = null;
1299    }
1300  }
1301
1302  /**
1303   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1304   * @throws java.io.IOException throws in case command is unsuccessful
1305   */
1306  public void killMiniHBaseCluster() throws IOException {
1307    cleanup();
1308    if (this.hbaseCluster != null) {
1309      getMiniHBaseCluster().killAll();
1310      this.hbaseCluster = null;
1311    }
1312    if (zooKeeperWatcher != null) {
1313      zooKeeperWatcher.close();
1314      zooKeeperWatcher = null;
1315    }
1316  }
1317
1318  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1319  private void cleanup() throws IOException {
1320    closeConnection();
1321    // unset the configuration for MIN and MAX RS to start
1322    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1323    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1324  }
1325
1326  /**
1327   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1328   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1329   * If false, previous path is used. Note: this does not cause the root dir to be created.
1330   * @return Fully qualified path for the default hbase root dir
1331   */
1332  public Path getDefaultRootDirPath(boolean create) throws IOException {
1333    if (!create) {
1334      return getDataTestDirOnTestFS();
1335    } else {
1336      return getNewDataTestDirOnTestFS();
1337    }
1338  }
1339
1340  /**
1341   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that
1342   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1343   * @return Fully qualified path for the default hbase root dir
1344   */
1345  public Path getDefaultRootDirPath() throws IOException {
1346    return getDefaultRootDirPath(false);
1347  }
1348
1349  /**
1350   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1351   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1352   * startup. You'd only use this method if you were doing manual operation.
1353   * @param create This flag decides whether to get a new root or data directory path or not, if it
1354   *               has been fetched already. Note : Directory will be made irrespective of whether
1355   *               path has been fetched or not. If directory already exists, it will be overwritten
1356   * @return Fully qualified path to hbase root dir
1357   */
1358  public Path createRootDir(boolean create) throws IOException {
1359    FileSystem fs = FileSystem.get(this.conf);
1360    Path hbaseRootdir = getDefaultRootDirPath(create);
1361    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1362    fs.mkdirs(hbaseRootdir);
1363    FSUtils.setVersion(fs, hbaseRootdir);
1364    return hbaseRootdir;
1365  }
1366
1367  /**
1368   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that
1369   * <code>create</code> flag is false.
1370   * @return Fully qualified path to hbase root dir
1371   */
1372  public Path createRootDir() throws IOException {
1373    return createRootDir(false);
1374  }
1375
1376  /**
1377   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1378   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1379   * this method if you were doing manual operation.
1380   * @return Fully qualified path to hbase root dir
1381   */
1382  public Path createWALRootDir() throws IOException {
1383    FileSystem fs = FileSystem.get(this.conf);
1384    Path walDir = getNewDataTestDirOnTestFS();
1385    CommonFSUtils.setWALRootDir(this.conf, walDir);
1386    fs.mkdirs(walDir);
1387    return walDir;
1388  }
1389
1390  private void setHBaseFsTmpDir() throws IOException {
1391    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1392    if (hbaseFsTmpDirInString == null) {
1393      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1394      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1395    } else {
1396      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1397    }
1398  }
1399
1400  /**
1401   * Flushes all caches in the mini hbase cluster
1402   */
1403  public void flush() throws IOException {
1404    getMiniHBaseCluster().flushcache();
1405  }
1406
1407  /**
1408   * Flushes all caches in the mini hbase cluster
1409   */
1410  public void flush(TableName tableName) throws IOException {
1411    getMiniHBaseCluster().flushcache(tableName);
1412  }
1413
1414  /**
1415   * Compact all regions in the mini hbase cluster
1416   */
1417  public void compact(boolean major) throws IOException {
1418    getMiniHBaseCluster().compact(major);
1419  }
1420
1421  /**
1422   * Compact all of a table's reagion in the mini hbase cluster
1423   */
1424  public void compact(TableName tableName, boolean major) throws IOException {
1425    getMiniHBaseCluster().compact(tableName, major);
1426  }
1427
1428  /**
1429   * Create a table.
1430   * @return A Table instance for the created table.
1431   */
1432  public Table createTable(TableName tableName, String family) throws IOException {
1433    return createTable(tableName, new String[] { family });
1434  }
1435
1436  /**
1437   * Create a table.
1438   * @return A Table instance for the created table.
1439   */
1440  public Table createTable(TableName tableName, String[] families) throws IOException {
1441    List<byte[]> fams = new ArrayList<>(families.length);
1442    for (String family : families) {
1443      fams.add(Bytes.toBytes(family));
1444    }
1445    return createTable(tableName, fams.toArray(new byte[0][]));
1446  }
1447
1448  /**
1449   * Create a table.
1450   * @return A Table instance for the created table.
1451   */
1452  public Table createTable(TableName tableName, byte[] family) throws IOException {
1453    return createTable(tableName, new byte[][] { family });
1454  }
1455
1456  /**
1457   * Create a table with multiple regions.
1458   * @return A Table instance for the created table.
1459   */
1460  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1461    throws IOException {
1462    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1463    byte[] startKey = Bytes.toBytes("aaaaa");
1464    byte[] endKey = Bytes.toBytes("zzzzz");
1465    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1466
1467    return createTable(tableName, new byte[][] { family }, splitKeys);
1468  }
1469
1470  /**
1471   * Create a table.
1472   * @return A Table instance for the created table.
1473   */
1474  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1475    return createTable(tableName, families, (byte[][]) null);
1476  }
1477
1478  /**
1479   * Create a table with multiple regions.
1480   * @return A Table instance for the created table.
1481   */
1482  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1483    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1484  }
1485
1486  /**
1487   * Create a table with multiple regions.
1488   * @param replicaCount replica count.
1489   * @return A Table instance for the created table.
1490   */
1491  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1492    throws IOException {
1493    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1494  }
1495
1496  /**
1497   * Create a table.
1498   * @return A Table instance for the created table.
1499   */
1500  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1501    throws IOException {
1502    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1503  }
1504
1505  /**
1506   * Create a table.
1507   * @param tableName    the table name
1508   * @param families     the families
1509   * @param splitKeys    the splitkeys
1510   * @param replicaCount the region replica count
1511   * @return A Table instance for the created table.
1512   * @throws IOException throws IOException
1513   */
1514  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1515    int replicaCount) throws IOException {
1516    return createTable(tableName, families, splitKeys, replicaCount,
1517      new Configuration(getConfiguration()));
1518  }
1519
1520  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1521    byte[] endKey, int numRegions) throws IOException {
1522    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1523
1524    getAdmin().createTable(desc, startKey, endKey, numRegions);
1525    // HBaseAdmin only waits for regions to appear in hbase:meta we
1526    // should wait until they are assigned
1527    waitUntilAllRegionsAssigned(tableName);
1528    return getConnection().getTable(tableName);
1529  }
1530
1531  /**
1532   * Create a table.
1533   * @param c Configuration to use
1534   * @return A Table instance for the created table.
1535   */
1536  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1537    throws IOException {
1538    return createTable(htd, families, null, c);
1539  }
1540
1541  /**
1542   * Create a table.
1543   * @param htd       table descriptor
1544   * @param families  array of column families
1545   * @param splitKeys array of split keys
1546   * @param c         Configuration to use
1547   * @return A Table instance for the created table.
1548   * @throws IOException if getAdmin or createTable fails
1549   */
1550  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1551    Configuration c) throws IOException {
1552    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1553    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1554    // on is interfering.
1555    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1556  }
1557
1558  /**
1559   * Create a table.
1560   * @param htd       table descriptor
1561   * @param families  array of column families
1562   * @param splitKeys array of split keys
1563   * @param type      Bloom type
1564   * @param blockSize block size
1565   * @param c         Configuration to use
1566   * @return A Table instance for the created table.
1567   * @throws IOException if getAdmin or createTable fails
1568   */
1569
1570  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1571    BloomType type, int blockSize, Configuration c) throws IOException {
1572    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1573    for (byte[] family : families) {
1574      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1575        .setBloomFilterType(type).setBlocksize(blockSize);
1576      if (isNewVersionBehaviorEnabled()) {
1577        cfdb.setNewVersionBehavior(true);
1578      }
1579      builder.setColumnFamily(cfdb.build());
1580    }
1581    TableDescriptor td = builder.build();
1582    if (splitKeys != null) {
1583      getAdmin().createTable(td, splitKeys);
1584    } else {
1585      getAdmin().createTable(td);
1586    }
1587    // HBaseAdmin only waits for regions to appear in hbase:meta
1588    // we should wait until they are assigned
1589    waitUntilAllRegionsAssigned(td.getTableName());
1590    return getConnection().getTable(td.getTableName());
1591  }
1592
1593  /**
1594   * Create a table.
1595   * @param htd       table descriptor
1596   * @param splitRows array of split keys
1597   * @return A Table instance for the created table.
1598   */
1599  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1600    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1601    if (isNewVersionBehaviorEnabled()) {
1602      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1603        builder.setColumnFamily(
1604          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1605      }
1606    }
1607    if (splitRows != null) {
1608      getAdmin().createTable(builder.build(), splitRows);
1609    } else {
1610      getAdmin().createTable(builder.build());
1611    }
1612    // HBaseAdmin only waits for regions to appear in hbase:meta
1613    // we should wait until they are assigned
1614    waitUntilAllRegionsAssigned(htd.getTableName());
1615    return getConnection().getTable(htd.getTableName());
1616  }
1617
1618  /**
1619   * Create a table.
1620   * @param tableName    the table name
1621   * @param families     the families
1622   * @param splitKeys    the split keys
1623   * @param replicaCount the replica count
1624   * @param c            Configuration to use
1625   * @return A Table instance for the created table.
1626   */
1627  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1628    int replicaCount, final Configuration c) throws IOException {
1629    TableDescriptor htd =
1630      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1631    return createTable(htd, families, splitKeys, c);
1632  }
1633
1634  /**
1635   * Create a table.
1636   * @return A Table instance for the created table.
1637   */
1638  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1639    return createTable(tableName, new byte[][] { family }, numVersions);
1640  }
1641
1642  /**
1643   * Create a table.
1644   * @return A Table instance for the created table.
1645   */
1646  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1647    throws IOException {
1648    return createTable(tableName, families, numVersions, (byte[][]) null);
1649  }
1650
1651  /**
1652   * Create a table.
1653   * @return A Table instance for the created table.
1654   */
1655  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1656    byte[][] splitKeys) throws IOException {
1657    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1658    for (byte[] family : families) {
1659      ColumnFamilyDescriptorBuilder cfBuilder =
1660        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1661      if (isNewVersionBehaviorEnabled()) {
1662        cfBuilder.setNewVersionBehavior(true);
1663      }
1664      builder.setColumnFamily(cfBuilder.build());
1665    }
1666    if (splitKeys != null) {
1667      getAdmin().createTable(builder.build(), splitKeys);
1668    } else {
1669      getAdmin().createTable(builder.build());
1670    }
1671    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1672    // assigned
1673    waitUntilAllRegionsAssigned(tableName);
1674    return getConnection().getTable(tableName);
1675  }
1676
1677  /**
1678   * Create a table with multiple regions.
1679   * @return A Table instance for the created table.
1680   */
1681  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1682    throws IOException {
1683    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1684  }
1685
1686  /**
1687   * Create a table.
1688   * @return A Table instance for the created table.
1689   */
1690  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1691    throws IOException {
1692    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1693    for (byte[] family : families) {
1694      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1695        .setMaxVersions(numVersions).setBlocksize(blockSize);
1696      if (isNewVersionBehaviorEnabled()) {
1697        cfBuilder.setNewVersionBehavior(true);
1698      }
1699      builder.setColumnFamily(cfBuilder.build());
1700    }
1701    getAdmin().createTable(builder.build());
1702    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1703    // assigned
1704    waitUntilAllRegionsAssigned(tableName);
1705    return getConnection().getTable(tableName);
1706  }
1707
1708  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1709    String cpName) throws IOException {
1710    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1711    for (byte[] family : families) {
1712      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1713        .setMaxVersions(numVersions).setBlocksize(blockSize);
1714      if (isNewVersionBehaviorEnabled()) {
1715        cfBuilder.setNewVersionBehavior(true);
1716      }
1717      builder.setColumnFamily(cfBuilder.build());
1718    }
1719    if (cpName != null) {
1720      builder.setCoprocessor(cpName);
1721    }
1722    getAdmin().createTable(builder.build());
1723    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1724    // assigned
1725    waitUntilAllRegionsAssigned(tableName);
1726    return getConnection().getTable(tableName);
1727  }
1728
1729  /**
1730   * Create a table.
1731   * @return A Table instance for the created table.
1732   */
1733  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1734    throws IOException {
1735    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1736    int i = 0;
1737    for (byte[] family : families) {
1738      ColumnFamilyDescriptorBuilder cfBuilder =
1739        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1740      if (isNewVersionBehaviorEnabled()) {
1741        cfBuilder.setNewVersionBehavior(true);
1742      }
1743      builder.setColumnFamily(cfBuilder.build());
1744      i++;
1745    }
1746    getAdmin().createTable(builder.build());
1747    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1748    // assigned
1749    waitUntilAllRegionsAssigned(tableName);
1750    return getConnection().getTable(tableName);
1751  }
1752
1753  /**
1754   * Create a table.
1755   * @return A Table instance for the created table.
1756   */
1757  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1758    throws IOException {
1759    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1760    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1761    if (isNewVersionBehaviorEnabled()) {
1762      cfBuilder.setNewVersionBehavior(true);
1763    }
1764    builder.setColumnFamily(cfBuilder.build());
1765    getAdmin().createTable(builder.build(), splitRows);
1766    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1767    // assigned
1768    waitUntilAllRegionsAssigned(tableName);
1769    return getConnection().getTable(tableName);
1770  }
1771
1772  /**
1773   * Create a table with multiple regions.
1774   * @return A Table instance for the created table.
1775   */
1776  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1777    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1778  }
1779
1780  /**
1781   * Modify a table, synchronous.
1782   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1783   *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1784   * @see Admin#modifyTable(TableDescriptor)
1785   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1786   */
1787  @Deprecated
1788  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1789    throws IOException, InterruptedException {
1790    admin.modifyTable(desc);
1791  }
1792
1793  /**
1794   * Set the number of Region replicas.
1795   */
1796  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1797    throws IOException, InterruptedException {
1798    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1799      .setRegionReplication(replicaCount).build();
1800    admin.modifyTable(desc);
1801  }
1802
1803  /**
1804   * Drop an existing table
1805   * @param tableName existing table
1806   */
1807  public void deleteTable(TableName tableName) throws IOException {
1808    try {
1809      getAdmin().disableTable(tableName);
1810    } catch (TableNotEnabledException e) {
1811      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1812    }
1813    getAdmin().deleteTable(tableName);
1814  }
1815
1816  /**
1817   * Drop an existing table
1818   * @param tableName existing table
1819   */
1820  public void deleteTableIfAny(TableName tableName) throws IOException {
1821    try {
1822      deleteTable(tableName);
1823    } catch (TableNotFoundException e) {
1824      // ignore
1825    }
1826  }
1827
1828  // ==========================================================================
1829  // Canned table and table descriptor creation
1830
1831  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1832  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1833  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1834  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1835  private static final int MAXVERSIONS = 3;
1836
1837  public static final char FIRST_CHAR = 'a';
1838  public static final char LAST_CHAR = 'z';
1839  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1840  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1841
1842  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1843    return createModifyableTableDescriptor(TableName.valueOf(name),
1844      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1845      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1846  }
1847
1848  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1849    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1850    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1851    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1852      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1853        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1854        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1855      if (isNewVersionBehaviorEnabled()) {
1856        cfBuilder.setNewVersionBehavior(true);
1857      }
1858      builder.setColumnFamily(cfBuilder.build());
1859    }
1860    return builder.build();
1861  }
1862
1863  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1864    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1865    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1866    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1867      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1868        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1869        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1870      if (isNewVersionBehaviorEnabled()) {
1871        cfBuilder.setNewVersionBehavior(true);
1872      }
1873      builder.setColumnFamily(cfBuilder.build());
1874    }
1875    return builder;
1876  }
1877
1878  /**
1879   * Create a table of name <code>name</code>.
1880   * @param name Name to give table.
1881   * @return Column descriptor.
1882   */
1883  public TableDescriptor createTableDescriptor(final TableName name) {
1884    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1885      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1886  }
1887
1888  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1889    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1890  }
1891
1892  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1893    int maxVersions) {
1894    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1895    for (byte[] family : families) {
1896      ColumnFamilyDescriptorBuilder cfBuilder =
1897        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1898      if (isNewVersionBehaviorEnabled()) {
1899        cfBuilder.setNewVersionBehavior(true);
1900      }
1901      builder.setColumnFamily(cfBuilder.build());
1902    }
1903    return builder.build();
1904  }
1905
1906  /**
1907   * Create an HRegion that writes to the local tmp dirs
1908   * @param desc     a table descriptor indicating which table the region belongs to
1909   * @param startKey the start boundary of the region
1910   * @param endKey   the end boundary of the region
1911   * @return a region that writes to local dir for testing
1912   */
1913  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1914    throws IOException {
1915    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1916      .setEndKey(endKey).build();
1917    return createLocalHRegion(hri, desc);
1918  }
1919
1920  /**
1921   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1922   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1923   */
1924  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1925    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1926  }
1927
1928  /**
1929   * Create an HRegion that writes to the local tmp dirs with specified wal
1930   * @param info regioninfo
1931   * @param conf configuration
1932   * @param desc table descriptor
1933   * @param wal  wal for this region.
1934   * @return created hregion
1935   */
1936  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1937    WAL wal) throws IOException {
1938    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1939  }
1940
1941  /**
1942   * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1943   * when done.
1944   */
1945  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1946    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1947    throws IOException {
1948    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1949      durability, wal, null, families);
1950  }
1951
1952  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1953    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1954    boolean[] compactedMemStore, byte[]... families) throws IOException {
1955    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1956    builder.setReadOnly(isReadOnly);
1957    int i = 0;
1958    for (byte[] family : families) {
1959      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1960      if (compactedMemStore != null && i < compactedMemStore.length) {
1961        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1962      } else {
1963        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1964
1965      }
1966      i++;
1967      // Set default to be three versions.
1968      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1969      builder.setColumnFamily(cfBuilder.build());
1970    }
1971    builder.setDurability(durability);
1972    RegionInfo info =
1973      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1974    return createLocalHRegion(info, conf, builder.build(), wal);
1975  }
1976
1977  //
1978  // ==========================================================================
1979
1980  /**
1981   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1982   * read.
1983   * @param tableName existing table
1984   * @return HTable to that new table
1985   */
1986  public Table deleteTableData(TableName tableName) throws IOException {
1987    Table table = getConnection().getTable(tableName);
1988    Scan scan = new Scan();
1989    ResultScanner resScan = table.getScanner(scan);
1990    for (Result res : resScan) {
1991      Delete del = new Delete(res.getRow());
1992      table.delete(del);
1993    }
1994    resScan = table.getScanner(scan);
1995    resScan.close();
1996    return table;
1997  }
1998
1999  /**
2000   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2001   * table.
2002   * @param tableName       table which must exist.
2003   * @param preserveRegions keep the existing split points
2004   * @return HTable for the new table
2005   */
2006  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
2007    throws IOException {
2008    Admin admin = getAdmin();
2009    if (!admin.isTableDisabled(tableName)) {
2010      admin.disableTable(tableName);
2011    }
2012    admin.truncateTable(tableName, preserveRegions);
2013    return getConnection().getTable(tableName);
2014  }
2015
2016  /**
2017   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2018   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
2019   * preserve regions of existing table.
2020   * @param tableName table which must exist.
2021   * @return HTable for the new table
2022   */
2023  public Table truncateTable(final TableName tableName) throws IOException {
2024    return truncateTable(tableName, false);
2025  }
2026
2027  /**
2028   * Load table with rows from 'aaa' to 'zzz'.
2029   * @param t Table
2030   * @param f Family
2031   * @return Count of rows loaded.
2032   */
2033  public int loadTable(final Table t, final byte[] f) throws IOException {
2034    return loadTable(t, new byte[][] { f });
2035  }
2036
2037  /**
2038   * Load table with rows from 'aaa' to 'zzz'.
2039   * @param t Table
2040   * @param f Family
2041   * @return Count of rows loaded.
2042   */
2043  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2044    return loadTable(t, new byte[][] { f }, null, writeToWAL);
2045  }
2046
2047  /**
2048   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2049   * @param t Table
2050   * @param f Array of Families to load
2051   * @return Count of rows loaded.
2052   */
2053  public int loadTable(final Table t, final byte[][] f) throws IOException {
2054    return loadTable(t, f, null);
2055  }
2056
2057  /**
2058   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2059   * @param t     Table
2060   * @param f     Array of Families to load
2061   * @param value the values of the cells. If null is passed, the row key is used as value
2062   * @return Count of rows loaded.
2063   */
2064  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2065    return loadTable(t, f, value, true);
2066  }
2067
2068  /**
2069   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2070   * @param t     Table
2071   * @param f     Array of Families to load
2072   * @param value the values of the cells. If null is passed, the row key is used as value
2073   * @return Count of rows loaded.
2074   */
2075  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
2076    throws IOException {
2077    List<Put> puts = new ArrayList<>();
2078    for (byte[] row : HBaseTestingUtility.ROWS) {
2079      Put put = new Put(row);
2080      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2081      for (int i = 0; i < f.length; i++) {
2082        byte[] value1 = value != null ? value : row;
2083        put.addColumn(f[i], f[i], value1);
2084      }
2085      puts.add(put);
2086    }
2087    t.put(puts);
2088    return puts.size();
2089  }
2090
2091  /**
2092   * A tracker for tracking and validating table rows generated with
2093   * {@link HBaseTestingUtility#loadTable(Table, byte[])}
2094   */
2095  public static class SeenRowTracker {
2096    int dim = 'z' - 'a' + 1;
2097    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
2098    byte[] startRow;
2099    byte[] stopRow;
2100
2101    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2102      this.startRow = startRow;
2103      this.stopRow = stopRow;
2104    }
2105
2106    void reset() {
2107      for (byte[] row : ROWS) {
2108        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2109      }
2110    }
2111
2112    int i(byte b) {
2113      return b - 'a';
2114    }
2115
2116    public void addRow(byte[] row) {
2117      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2118    }
2119
2120    /**
2121     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
2122     * rows none
2123     */
2124    public void validate() {
2125      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2126        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2127          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2128            int count = seenRows[i(b1)][i(b2)][i(b3)];
2129            int expectedCount = 0;
2130            if (
2131              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
2132                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
2133            ) {
2134              expectedCount = 1;
2135            }
2136            if (count != expectedCount) {
2137              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
2138              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
2139                + "instead of " + expectedCount);
2140            }
2141          }
2142        }
2143      }
2144    }
2145  }
2146
2147  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2148    return loadRegion(r, f, false);
2149  }
2150
2151  public int loadRegion(final Region r, final byte[] f) throws IOException {
2152    return loadRegion((HRegion) r, f);
2153  }
2154
2155  /**
2156   * Load region with rows from 'aaa' to 'zzz'.
2157   * @param r     Region
2158   * @param f     Family
2159   * @param flush flush the cache if true
2160   * @return Count of rows loaded.
2161   */
2162  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
2163    byte[] k = new byte[3];
2164    int rowCount = 0;
2165    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2166      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2167        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2168          k[0] = b1;
2169          k[1] = b2;
2170          k[2] = b3;
2171          Put put = new Put(k);
2172          put.setDurability(Durability.SKIP_WAL);
2173          put.addColumn(f, null, k);
2174          if (r.getWAL() == null) {
2175            put.setDurability(Durability.SKIP_WAL);
2176          }
2177          int preRowCount = rowCount;
2178          int pause = 10;
2179          int maxPause = 1000;
2180          while (rowCount == preRowCount) {
2181            try {
2182              r.put(put);
2183              rowCount++;
2184            } catch (RegionTooBusyException e) {
2185              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2186              Threads.sleep(pause);
2187            }
2188          }
2189        }
2190      }
2191      if (flush) {
2192        r.flush(true);
2193      }
2194    }
2195    return rowCount;
2196  }
2197
2198  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2199    throws IOException {
2200    for (int i = startRow; i < endRow; i++) {
2201      byte[] data = Bytes.toBytes(String.valueOf(i));
2202      Put put = new Put(data);
2203      put.addColumn(f, null, data);
2204      t.put(put);
2205    }
2206  }
2207
2208  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2209    throws IOException {
2210    byte[] row = new byte[rowSize];
2211    for (int i = 0; i < totalRows; i++) {
2212      Bytes.random(row);
2213      Put put = new Put(row);
2214      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2215      t.put(put);
2216    }
2217  }
2218
2219  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2220    int replicaId) throws IOException {
2221    for (int i = startRow; i < endRow; i++) {
2222      String failMsg = "Failed verification of row :" + i;
2223      byte[] data = Bytes.toBytes(String.valueOf(i));
2224      Get get = new Get(data);
2225      get.setReplicaId(replicaId);
2226      get.setConsistency(Consistency.TIMELINE);
2227      Result result = table.get(get);
2228      if (!result.containsColumn(f, null)) {
2229        throw new AssertionError(failMsg);
2230      }
2231      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2232      Cell cell = result.getColumnLatestCell(f, null);
2233      if (
2234        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2235          cell.getValueLength())
2236      ) {
2237        throw new AssertionError(failMsg);
2238      }
2239    }
2240  }
2241
2242  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2243    throws IOException {
2244    verifyNumericRows((HRegion) region, f, startRow, endRow);
2245  }
2246
2247  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2248    throws IOException {
2249    verifyNumericRows(region, f, startRow, endRow, true);
2250  }
2251
2252  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2253    final boolean present) throws IOException {
2254    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2255  }
2256
2257  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2258    final boolean present) throws IOException {
2259    for (int i = startRow; i < endRow; i++) {
2260      String failMsg = "Failed verification of row :" + i;
2261      byte[] data = Bytes.toBytes(String.valueOf(i));
2262      Result result = region.get(new Get(data));
2263
2264      boolean hasResult = result != null && !result.isEmpty();
2265      if (present != hasResult) {
2266        throw new AssertionError(
2267          failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2268      }
2269      if (!present) continue;
2270
2271      if (!result.containsColumn(f, null)) {
2272        throw new AssertionError(failMsg);
2273      }
2274      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2275      Cell cell = result.getColumnLatestCell(f, null);
2276      if (
2277        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2278          cell.getValueLength())
2279      ) {
2280        throw new AssertionError(failMsg);
2281      }
2282    }
2283  }
2284
2285  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2286    throws IOException {
2287    for (int i = startRow; i < endRow; i++) {
2288      byte[] data = Bytes.toBytes(String.valueOf(i));
2289      Delete delete = new Delete(data);
2290      delete.addFamily(f);
2291      t.delete(delete);
2292    }
2293  }
2294
2295  /**
2296   * Return the number of rows in the given table.
2297   * @param table to count rows
2298   * @return count of rows
2299   */
2300  public static int countRows(final Table table) throws IOException {
2301    return countRows(table, new Scan());
2302  }
2303
2304  public static int countRows(final Table table, final Scan scan) throws IOException {
2305    try (ResultScanner results = table.getScanner(scan)) {
2306      int count = 0;
2307      while (results.next() != null) {
2308        count++;
2309      }
2310      return count;
2311    }
2312  }
2313
2314  public int countRows(final Table table, final byte[]... families) throws IOException {
2315    Scan scan = new Scan();
2316    for (byte[] family : families) {
2317      scan.addFamily(family);
2318    }
2319    return countRows(table, scan);
2320  }
2321
2322  /**
2323   * Return the number of rows in the given table.
2324   */
2325  public int countRows(final TableName tableName) throws IOException {
2326    Table table = getConnection().getTable(tableName);
2327    try {
2328      return countRows(table);
2329    } finally {
2330      table.close();
2331    }
2332  }
2333
2334  public int countRows(final Region region) throws IOException {
2335    return countRows(region, new Scan());
2336  }
2337
2338  public int countRows(final Region region, final Scan scan) throws IOException {
2339    InternalScanner scanner = region.getScanner(scan);
2340    try {
2341      return countRows(scanner);
2342    } finally {
2343      scanner.close();
2344    }
2345  }
2346
2347  public int countRows(final InternalScanner scanner) throws IOException {
2348    int scannedCount = 0;
2349    List<Cell> results = new ArrayList<>();
2350    boolean hasMore = true;
2351    while (hasMore) {
2352      hasMore = scanner.next(results);
2353      scannedCount += results.size();
2354      results.clear();
2355    }
2356    return scannedCount;
2357  }
2358
2359  /**
2360   * Return an md5 digest of the entire contents of a table.
2361   */
2362  public String checksumRows(final Table table) throws Exception {
2363
2364    Scan scan = new Scan();
2365    ResultScanner results = table.getScanner(scan);
2366    MessageDigest digest = MessageDigest.getInstance("MD5");
2367    for (Result res : results) {
2368      digest.update(res.getRow());
2369    }
2370    results.close();
2371    return digest.toString();
2372  }
2373
2374  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2375  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2376  static {
2377    int i = 0;
2378    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2379      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2380        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2381          ROWS[i][0] = b1;
2382          ROWS[i][1] = b2;
2383          ROWS[i][2] = b3;
2384          i++;
2385        }
2386      }
2387    }
2388  }
2389
2390  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2391    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2392    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2393    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2394    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2395    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2396    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2397
2398  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2399    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2400    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2401    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2402    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2403    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2404    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2405
2406  /**
2407   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2408   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2409   * @return list of region info for regions added to meta
2410   */
2411  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2412    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2413    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2414    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2415    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2416    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2417      TableState.State.ENABLED);
2418    // add custom ones
2419    for (int i = 0; i < startKeys.length; i++) {
2420      int j = (i + 1) % startKeys.length;
2421      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2422        .setEndKey(startKeys[j]).build();
2423      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2424      newRegions.add(hri);
2425    }
2426
2427    meta.close();
2428    return newRegions;
2429  }
2430
2431  /**
2432   * Create an unmanaged WAL. Be sure to close it when you're through.
2433   */
2434  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2435    throws IOException {
2436    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2437    // unless I pass along via the conf.
2438    Configuration confForWAL = new Configuration(conf);
2439    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2440    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2441  }
2442
2443  /**
2444   * Create a region with it's own WAL. Be sure to call
2445   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2446   */
2447  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2448    final Configuration conf, final TableDescriptor htd) throws IOException {
2449    return createRegionAndWAL(info, rootDir, conf, htd, true);
2450  }
2451
2452  /**
2453   * Create a region with it's own WAL. Be sure to call
2454   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2455   */
2456  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2457    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2458    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2459    region.setBlockCache(blockCache);
2460    region.initialize();
2461    return region;
2462  }
2463
2464  /**
2465   * Create a region with it's own WAL. Be sure to call
2466   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2467   */
2468  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2469    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2470    throws IOException {
2471    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2472    region.setMobFileCache(mobFileCache);
2473    region.initialize();
2474    return region;
2475  }
2476
2477  /**
2478   * Create a region with it's own WAL. Be sure to call
2479   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2480   */
2481  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2482    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2483    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2484      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2485    WAL wal = createWal(conf, rootDir, info);
2486    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2487  }
2488
2489  /**
2490   * Returns all rows from the hbase:meta table.
2491   * @throws IOException When reading the rows fails.
2492   */
2493  public List<byte[]> getMetaTableRows() throws IOException {
2494    // TODO: Redo using MetaTableAccessor class
2495    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2496    List<byte[]> rows = new ArrayList<>();
2497    ResultScanner s = t.getScanner(new Scan());
2498    for (Result result : s) {
2499      LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()));
2500      rows.add(result.getRow());
2501    }
2502    s.close();
2503    t.close();
2504    return rows;
2505  }
2506
2507  /**
2508   * Returns all rows from the hbase:meta table for a given user table
2509   * @throws IOException When reading the rows fails.
2510   */
2511  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2512    // TODO: Redo using MetaTableAccessor.
2513    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2514    List<byte[]> rows = new ArrayList<>();
2515    ResultScanner s = t.getScanner(new Scan());
2516    for (Result result : s) {
2517      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2518      if (info == null) {
2519        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2520        // TODO figure out what to do for this new hosed case.
2521        continue;
2522      }
2523
2524      if (info.getTable().equals(tableName)) {
2525        LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info);
2526        rows.add(result.getRow());
2527      }
2528    }
2529    s.close();
2530    t.close();
2531    return rows;
2532  }
2533
2534  /**
2535   * Returns all regions of the specified table
2536   * @param tableName the table name
2537   * @return all regions of the specified table
2538   * @throws IOException when getting the regions fails.
2539   */
2540  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2541    try (Admin admin = getConnection().getAdmin()) {
2542      return admin.getRegions(tableName);
2543    }
2544  }
2545
2546  /**
2547   * Find any other region server which is different from the one identified by parameter
2548   * @return another region server
2549   */
2550  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2551    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2552      if (!(rst.getRegionServer() == rs)) {
2553        return rst.getRegionServer();
2554      }
2555    }
2556    return null;
2557  }
2558
2559  /**
2560   * Tool to get the reference to the region server object that holds the region of the specified
2561   * user table.
2562   * @param tableName user table to lookup in hbase:meta
2563   * @return region server that holds it, null if the row doesn't exist
2564   */
2565  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2566    throws IOException, InterruptedException {
2567    List<RegionInfo> regions = getRegions(tableName);
2568    if (regions == null || regions.isEmpty()) {
2569      return null;
2570    }
2571    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2572
2573    byte[] firstRegionName =
2574      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2575        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2576
2577    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2578    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2579      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2580    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2581      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2582    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2583    while (retrier.shouldRetry()) {
2584      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2585      if (index != -1) {
2586        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2587      }
2588      // Came back -1. Region may not be online yet. Sleep a while.
2589      retrier.sleepUntilNextRetry();
2590    }
2591    return null;
2592  }
2593
2594  /**
2595   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2596   * @throws IOException When starting the cluster fails.
2597   */
2598  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2599    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2600    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2601      "99.0");
2602    startMiniMapReduceCluster(2);
2603    return mrCluster;
2604  }
2605
2606  /**
2607   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2608   * internal static LOG_DIR variable.
2609   */
2610  private void forceChangeTaskLogDir() {
2611    Field logDirField;
2612    try {
2613      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2614      logDirField.setAccessible(true);
2615
2616      Field modifiersField = ReflectionUtils.getModifiersField();
2617      modifiersField.setAccessible(true);
2618      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2619
2620      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2621    } catch (SecurityException e) {
2622      throw new RuntimeException(e);
2623    } catch (NoSuchFieldException e) {
2624      throw new RuntimeException(e);
2625    } catch (IllegalArgumentException e) {
2626      throw new RuntimeException(e);
2627    } catch (IllegalAccessException e) {
2628      throw new RuntimeException(e);
2629    }
2630  }
2631
2632  /**
2633   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2634   * filesystem.
2635   * @param servers The number of <code>TaskTracker</code>'s to start.
2636   * @throws IOException When starting the cluster fails.
2637   */
2638  private void startMiniMapReduceCluster(final int servers) throws IOException {
2639    if (mrCluster != null) {
2640      throw new IllegalStateException("MiniMRCluster is already running");
2641    }
2642    LOG.info("Starting mini mapreduce cluster...");
2643    setupClusterTestDir();
2644    createDirsAndSetProperties();
2645
2646    forceChangeTaskLogDir();
2647
2648    //// hadoop2 specific settings
2649    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2650    // we up the VM usable so that processes don't get killed.
2651    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2652
2653    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2654    // this avoids the problem by disabling speculative task execution in tests.
2655    conf.setBoolean("mapreduce.map.speculative", false);
2656    conf.setBoolean("mapreduce.reduce.speculative", false);
2657    ////
2658
2659    // Allow the user to override FS URI for this map-reduce cluster to use.
2660    mrCluster =
2661      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2662        1, null, null, new JobConf(this.conf));
2663    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2664    if (jobConf == null) {
2665      jobConf = mrCluster.createJobConf();
2666    }
2667    // Hadoop MiniMR overwrites this while it should not
2668    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2669    LOG.info("Mini mapreduce cluster started");
2670
2671    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2672    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2673    // necessary config properties here. YARN-129 required adding a few properties.
2674    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2675    // this for mrv2 support; mr1 ignores this
2676    conf.set("mapreduce.framework.name", "yarn");
2677    conf.setBoolean("yarn.is.minicluster", true);
2678    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2679    if (rmAddress != null) {
2680      conf.set("yarn.resourcemanager.address", rmAddress);
2681    }
2682    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2683    if (historyAddress != null) {
2684      conf.set("mapreduce.jobhistory.address", historyAddress);
2685    }
2686    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2687    if (schedulerAddress != null) {
2688      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2689    }
2690    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2691    if (mrJobHistoryWebappAddress != null) {
2692      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2693    }
2694    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2695    if (yarnRMWebappAddress != null) {
2696      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2697    }
2698  }
2699
2700  /**
2701   * Stops the previously started <code>MiniMRCluster</code>.
2702   */
2703  public void shutdownMiniMapReduceCluster() {
2704    if (mrCluster != null) {
2705      LOG.info("Stopping mini mapreduce cluster...");
2706      mrCluster.shutdown();
2707      mrCluster = null;
2708      LOG.info("Mini mapreduce cluster stopped");
2709    }
2710    // Restore configuration to point to local jobtracker
2711    conf.set("mapreduce.jobtracker.address", "local");
2712  }
2713
2714  /**
2715   * Create a stubbed out RegionServerService, mainly for getting FS.
2716   */
2717  public RegionServerServices createMockRegionServerService() throws IOException {
2718    return createMockRegionServerService((ServerName) null);
2719  }
2720
2721  /**
2722   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2723   * TestTokenAuthentication
2724   */
2725  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2726    throws IOException {
2727    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2728    rss.setFileSystem(getTestFileSystem());
2729    rss.setRpcServer(rpc);
2730    return rss;
2731  }
2732
2733  /**
2734   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2735   * TestOpenRegionHandler
2736   */
2737  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2738    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2739    rss.setFileSystem(getTestFileSystem());
2740    return rss;
2741  }
2742
2743  /**
2744   * Switches the logger for the given class to DEBUG level.
2745   * @param clazz The class for which to switch to debug logging.
2746   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2747   *             HBase only uses log4j. You should do this by your own as it you know which log
2748   *             framework you are using then set the log level to debug is very easy.
2749   */
2750  @Deprecated
2751  public void enableDebug(Class<?> clazz) {
2752    Log4jUtils.enableDebug(clazz);
2753  }
2754
2755  /**
2756   * Expire the Master's session
2757   */
2758  public void expireMasterSession() throws Exception {
2759    HMaster master = getMiniHBaseCluster().getMaster();
2760    expireSession(master.getZooKeeper(), false);
2761  }
2762
2763  /**
2764   * Expire a region server's session
2765   * @param index which RS
2766   */
2767  public void expireRegionServerSession(int index) throws Exception {
2768    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2769    expireSession(rs.getZooKeeper(), false);
2770    decrementMinRegionServerCount();
2771  }
2772
2773  private void decrementMinRegionServerCount() {
2774    // decrement the count for this.conf, for newly spwaned master
2775    // this.hbaseCluster shares this configuration too
2776    decrementMinRegionServerCount(getConfiguration());
2777
2778    // each master thread keeps a copy of configuration
2779    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2780      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2781    }
2782  }
2783
2784  private void decrementMinRegionServerCount(Configuration conf) {
2785    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2786    if (currentCount != -1) {
2787      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2788    }
2789  }
2790
2791  public void expireSession(ZKWatcher nodeZK) throws Exception {
2792    expireSession(nodeZK, false);
2793  }
2794
2795  /**
2796   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2797   * http://hbase.apache.org/book.html#trouble.zookeeper There are issues when doing this: [1]
2798   * http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html [2]
2799   * https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2800   * @param nodeZK      - the ZK watcher to expire
2801   * @param checkStatus - true to check if we can create a Table with the current configuration.
2802   */
2803  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2804    Configuration c = new Configuration(this.conf);
2805    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2806    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2807    byte[] password = zk.getSessionPasswd();
2808    long sessionID = zk.getSessionId();
2809
2810    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2811    // so we create a first watcher to be sure that the
2812    // event was sent. We expect that if our watcher receives the event
2813    // other watchers on the same machine will get is as well.
2814    // When we ask to close the connection, ZK does not close it before
2815    // we receive all the events, so don't have to capture the event, just
2816    // closing the connection should be enough.
2817    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2818      @Override
2819      public void process(WatchedEvent watchedEvent) {
2820        LOG.info("Monitor ZKW received event=" + watchedEvent);
2821      }
2822    }, sessionID, password);
2823
2824    // Making it expire
2825    ZooKeeper newZK =
2826      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2827
2828    // ensure that we have connection to the server before closing down, otherwise
2829    // the close session event will be eaten out before we start CONNECTING state
2830    long start = EnvironmentEdgeManager.currentTime();
2831    while (
2832      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2833    ) {
2834      Thread.sleep(1);
2835    }
2836    newZK.close();
2837    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2838
2839    // Now closing & waiting to be sure that the clients get it.
2840    monitor.close();
2841
2842    if (checkStatus) {
2843      getConnection().getTable(TableName.META_TABLE_NAME).close();
2844    }
2845  }
2846
2847  /**
2848   * Get the Mini HBase cluster.
2849   * @return hbase cluster
2850   * @see #getHBaseClusterInterface()
2851   */
2852  public MiniHBaseCluster getHBaseCluster() {
2853    return getMiniHBaseCluster();
2854  }
2855
2856  /**
2857   * Returns the HBaseCluster instance.
2858   * <p>
2859   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2860   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2861   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2862   * instead w/o the need to type-cast.
2863   */
2864  public HBaseCluster getHBaseClusterInterface() {
2865    // implementation note: we should rename this method as #getHBaseCluster(),
2866    // but this would require refactoring 90+ calls.
2867    return hbaseCluster;
2868  }
2869
2870  /**
2871   * Resets the connections so that the next time getConnection() is called, a new connection is
2872   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2873   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
2874   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
2875   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
2876   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
2877   * bigger refactor.
2878   */
2879  public void invalidateConnection() throws IOException {
2880    closeConnection();
2881    // Update the master addresses if they changed.
2882    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2883    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2884    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2885      masterConfigBefore, masterConfAfter);
2886    conf.set(HConstants.MASTER_ADDRS_KEY,
2887      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2888  }
2889
2890  /**
2891   * Get a shared Connection to the cluster. this method is thread safe.
2892   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2893   */
2894  public Connection getConnection() throws IOException {
2895    return getAsyncConnection().toConnection();
2896  }
2897
2898  /**
2899   * Get a assigned Connection to the cluster. this method is thread safe.
2900   * @param user assigned user
2901   * @return A Connection with assigned user.
2902   */
2903  public Connection getConnection(User user) throws IOException {
2904    return getAsyncConnection(user).toConnection();
2905  }
2906
2907  /**
2908   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2909   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2910   *         of cluster.
2911   */
2912  public AsyncClusterConnection getAsyncConnection() throws IOException {
2913    try {
2914      return asyncConnection.updateAndGet(connection -> {
2915        if (connection == null) {
2916          try {
2917            User user = UserProvider.instantiate(conf).getCurrent();
2918            connection = getAsyncConnection(user);
2919          } catch (IOException ioe) {
2920            throw new UncheckedIOException("Failed to create connection", ioe);
2921          }
2922        }
2923        return connection;
2924      });
2925    } catch (UncheckedIOException exception) {
2926      throw exception.getCause();
2927    }
2928  }
2929
2930  /**
2931   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2932   * @param user assigned user
2933   * @return An AsyncClusterConnection with assigned user.
2934   */
2935  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2936    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2937  }
2938
2939  public void closeConnection() throws IOException {
2940    if (hbaseAdmin != null) {
2941      Closeables.close(hbaseAdmin, true);
2942      hbaseAdmin = null;
2943    }
2944    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2945    if (asyncConnection != null) {
2946      Closeables.close(asyncConnection, true);
2947    }
2948  }
2949
2950  /**
2951   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2952   * it has no effect, it will be closed automatically when the cluster shutdowns
2953   */
2954  public Admin getAdmin() throws IOException {
2955    if (hbaseAdmin == null) {
2956      this.hbaseAdmin = getConnection().getAdmin();
2957    }
2958    return hbaseAdmin;
2959  }
2960
2961  private Admin hbaseAdmin = null;
2962
2963  /**
2964   * Returns an {@link Hbck} instance. Needs be closed when done.
2965   */
2966  public Hbck getHbck() throws IOException {
2967    return getConnection().getHbck();
2968  }
2969
2970  /**
2971   * Unassign the named region.
2972   * @param regionName The region to unassign.
2973   */
2974  public void unassignRegion(String regionName) throws IOException {
2975    unassignRegion(Bytes.toBytes(regionName));
2976  }
2977
2978  /**
2979   * Unassign the named region.
2980   * @param regionName The region to unassign.
2981   */
2982  public void unassignRegion(byte[] regionName) throws IOException {
2983    getAdmin().unassign(regionName, true);
2984  }
2985
2986  /**
2987   * Closes the region containing the given row.
2988   * @param row   The row to find the containing region.
2989   * @param table The table to find the region.
2990   */
2991  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2992    unassignRegionByRow(Bytes.toBytes(row), table);
2993  }
2994
2995  /**
2996   * Closes the region containing the given row.
2997   * @param row   The row to find the containing region.
2998   * @param table The table to find the region.
2999   */
3000  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3001    HRegionLocation hrl = table.getRegionLocation(row);
3002    unassignRegion(hrl.getRegion().getRegionName());
3003  }
3004
3005  /**
3006   * Retrieves a splittable region randomly from tableName
3007   * @param tableName   name of table
3008   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3009   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3010   */
3011  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3012    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3013    int regCount = regions.size();
3014    Set<Integer> attempted = new HashSet<>();
3015    int idx;
3016    int attempts = 0;
3017    do {
3018      regions = getHBaseCluster().getRegions(tableName);
3019      if (regCount != regions.size()) {
3020        // if there was region movement, clear attempted Set
3021        attempted.clear();
3022      }
3023      regCount = regions.size();
3024      // There are chances that before we get the region for the table from an RS the region may
3025      // be going for CLOSE. This may be because online schema change is enabled
3026      if (regCount > 0) {
3027        idx = ThreadLocalRandom.current().nextInt(regCount);
3028        // if we have just tried this region, there is no need to try again
3029        if (attempted.contains(idx)) {
3030          continue;
3031        }
3032        HRegion region = regions.get(idx);
3033        if (region.checkSplit().isPresent()) {
3034          return region;
3035        }
3036        attempted.add(idx);
3037      }
3038      attempts++;
3039    } while (maxAttempts == -1 || attempts < maxAttempts);
3040    return null;
3041  }
3042
3043  public MiniDFSCluster getDFSCluster() {
3044    return dfsCluster;
3045  }
3046
3047  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3048    setDFSCluster(cluster, true);
3049  }
3050
3051  /**
3052   * Set the MiniDFSCluster
3053   * @param cluster     cluster to use
3054   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
3055   *                    is set.
3056   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3057   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
3058   */
3059  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3060    throws IllegalStateException, IOException {
3061    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3062      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3063    }
3064    this.dfsCluster = cluster;
3065    this.setFs();
3066  }
3067
3068  public FileSystem getTestFileSystem() throws IOException {
3069    return HFileSystem.get(conf);
3070  }
3071
3072  /**
3073   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
3074   * (30 seconds).
3075   * @param table Table to wait on.
3076   */
3077  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
3078    waitTableAvailable(table.getName(), 30000);
3079  }
3080
3081  public void waitTableAvailable(TableName table, long timeoutMillis)
3082    throws InterruptedException, IOException {
3083    waitFor(timeoutMillis, predicateTableAvailable(table));
3084  }
3085
3086  /**
3087   * Wait until all regions in a table have been assigned
3088   * @param table         Table to wait on.
3089   * @param timeoutMillis Timeout.
3090   */
3091  public void waitTableAvailable(byte[] table, long timeoutMillis)
3092    throws InterruptedException, IOException {
3093    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3094  }
3095
3096  public String explainTableAvailability(TableName tableName) throws IOException {
3097    StringBuilder msg =
3098      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3099    if (getHBaseCluster().getMaster().isAlive()) {
3100      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3101        .getRegionStates().getRegionAssignments();
3102      final List<Pair<RegionInfo, ServerName>> metaLocations =
3103        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3104      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3105        RegionInfo hri = metaLocation.getFirst();
3106        ServerName sn = metaLocation.getSecond();
3107        if (!assignments.containsKey(hri)) {
3108          msg.append(", region ").append(hri)
3109            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3110        } else if (sn == null) {
3111          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3112        } else if (!sn.equals(assignments.get(hri))) {
3113          msg.append(",  region ").append(hri)
3114            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3115            .append(" <> ").append(assignments.get(hri));
3116        }
3117      }
3118    }
3119    return msg.toString();
3120  }
3121
3122  public String explainTableState(final TableName table, TableState.State state)
3123    throws IOException {
3124    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3125    if (tableState == null) {
3126      return "TableState in META: No table state in META for table " + table
3127        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3128    } else if (!tableState.inStates(state)) {
3129      return "TableState in META: Not " + state + " state, but " + tableState;
3130    } else {
3131      return "TableState in META: OK";
3132    }
3133  }
3134
3135  public TableState findLastTableState(final TableName table) throws IOException {
3136    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3137    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3138      @Override
3139      public boolean visit(Result r) throws IOException {
3140        if (!Arrays.equals(r.getRow(), table.getName())) {
3141          return false;
3142        }
3143        TableState state = CatalogFamilyFormat.getTableState(r);
3144        if (state != null) {
3145          lastTableState.set(state);
3146        }
3147        return true;
3148      }
3149    };
3150    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
3151      Integer.MAX_VALUE, visitor);
3152    return lastTableState.get();
3153  }
3154
3155  /**
3156   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3157   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
3158   * table.
3159   * @param table the table to wait on.
3160   * @throws InterruptedException if interrupted while waiting
3161   * @throws IOException          if an IO problem is encountered
3162   */
3163  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
3164    waitTableEnabled(table, 30000);
3165  }
3166
3167  /**
3168   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3169   * have been all assigned.
3170   * @see #waitTableEnabled(TableName, long)
3171   * @param table         Table to wait on.
3172   * @param timeoutMillis Time to wait on it being marked enabled.
3173   */
3174  public void waitTableEnabled(byte[] table, long timeoutMillis)
3175    throws InterruptedException, IOException {
3176    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3177  }
3178
3179  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
3180    waitFor(timeoutMillis, predicateTableEnabled(table));
3181  }
3182
3183  /**
3184   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
3185   * after default period (30 seconds)
3186   * @param table Table to wait on.
3187   */
3188  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
3189    waitTableDisabled(table, 30000);
3190  }
3191
3192  public void waitTableDisabled(TableName table, long millisTimeout)
3193    throws InterruptedException, IOException {
3194    waitFor(millisTimeout, predicateTableDisabled(table));
3195  }
3196
3197  /**
3198   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
3199   * @param table         Table to wait on.
3200   * @param timeoutMillis Time to wait on it being marked disabled.
3201   */
3202  public void waitTableDisabled(byte[] table, long timeoutMillis)
3203    throws InterruptedException, IOException {
3204    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3205  }
3206
3207  /**
3208   * Make sure that at least the specified number of region servers are running
3209   * @param num minimum number of region servers that should be running
3210   * @return true if we started some servers
3211   */
3212  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
3213    boolean startedServer = false;
3214    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3215    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
3216      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3217      startedServer = true;
3218    }
3219
3220    return startedServer;
3221  }
3222
3223  /**
3224   * Make sure that at least the specified number of region servers are running. We don't count the
3225   * ones that are currently stopping or are stopped.
3226   * @param num minimum number of region servers that should be running
3227   * @return true if we started some servers
3228   */
3229  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
3230    boolean startedServer = ensureSomeRegionServersAvailable(num);
3231
3232    int nonStoppedServers = 0;
3233    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
3234
3235      HRegionServer hrs = rst.getRegionServer();
3236      if (hrs.isStopping() || hrs.isStopped()) {
3237        LOG.info("A region server is stopped or stopping:" + hrs);
3238      } else {
3239        nonStoppedServers++;
3240      }
3241    }
3242    for (int i = nonStoppedServers; i < num; ++i) {
3243      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3244      startedServer = true;
3245    }
3246    return startedServer;
3247  }
3248
3249  /**
3250   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
3251   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
3252   * @param c                     Initial configuration
3253   * @param differentiatingSuffix Suffix to differentiate this user from others.
3254   * @return A new configuration instance with a different user set into it.
3255   */
3256  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
3257    throws IOException {
3258    FileSystem currentfs = FileSystem.get(c);
3259    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3260      return User.getCurrent();
3261    }
3262    // Else distributed filesystem. Make a new instance per daemon. Below
3263    // code is taken from the AppendTestUtil over in hdfs.
3264    String username = User.getCurrent().getName() + differentiatingSuffix;
3265    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
3266    return user;
3267  }
3268
3269  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3270    throws IOException {
3271    NavigableSet<String> online = new TreeSet<>();
3272    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3273      try {
3274        for (RegionInfo region : ProtobufUtil
3275          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3276          online.add(region.getRegionNameAsString());
3277        }
3278      } catch (RegionServerStoppedException e) {
3279        // That's fine.
3280      }
3281    }
3282    return online;
3283  }
3284
3285  /**
3286   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
3287   * linger. Here is the exception you'll see:
3288   *
3289   * <pre>
3290   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3291   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3292   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3293   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3294   * </pre>
3295   *
3296   * @param stream A DFSClient.DFSOutputStream.
3297   */
3298  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3299    try {
3300      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3301      for (Class<?> clazz : clazzes) {
3302        String className = clazz.getSimpleName();
3303        if (className.equals("DFSOutputStream")) {
3304          if (clazz.isInstance(stream)) {
3305            Field maxRecoveryErrorCountField =
3306              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3307            maxRecoveryErrorCountField.setAccessible(true);
3308            maxRecoveryErrorCountField.setInt(stream, max);
3309            break;
3310          }
3311        }
3312      }
3313    } catch (Exception e) {
3314      LOG.info("Could not set max recovery field", e);
3315    }
3316  }
3317
3318  /**
3319   * Uses directly the assignment manager to assign the region. and waits until the specified region
3320   * has completed assignment.
3321   * @return true if the region is assigned false otherwise.
3322   */
3323  public boolean assignRegion(final RegionInfo regionInfo)
3324    throws IOException, InterruptedException {
3325    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3326    am.assign(regionInfo);
3327    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3328  }
3329
3330  /**
3331   * Move region to destination server and wait till region is completely moved and online
3332   * @param destRegion region to move
3333   * @param destServer destination server of the region
3334   */
3335  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3336    throws InterruptedException, IOException {
3337    HMaster master = getMiniHBaseCluster().getMaster();
3338    // TODO: Here we start the move. The move can take a while.
3339    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3340    while (true) {
3341      ServerName serverName =
3342        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3343      if (serverName != null && serverName.equals(destServer)) {
3344        assertRegionOnServer(destRegion, serverName, 2000);
3345        break;
3346      }
3347      Thread.sleep(10);
3348    }
3349  }
3350
3351  /**
3352   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3353   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3354   * master has been informed and updated hbase:meta with the regions deployed server.
3355   * @param tableName the table name
3356   */
3357  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3358    waitUntilAllRegionsAssigned(tableName,
3359      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3360  }
3361
3362  /**
3363   * Waith until all system table's regions get assigned
3364   */
3365  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3366    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3367  }
3368
3369  /**
3370   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3371   * timeout. This means all regions have been deployed, master has been informed and updated
3372   * hbase:meta with the regions deployed server.
3373   * @param tableName the table name
3374   * @param timeout   timeout, in milliseconds
3375   */
3376  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3377    throws IOException {
3378    if (!TableName.isMetaTableName(tableName)) {
3379      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3380        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3381          + timeout + "ms");
3382        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3383          @Override
3384          public String explainFailure() throws IOException {
3385            return explainTableAvailability(tableName);
3386          }
3387
3388          @Override
3389          public boolean evaluate() throws IOException {
3390            Scan scan = new Scan();
3391            scan.addFamily(HConstants.CATALOG_FAMILY);
3392            boolean tableFound = false;
3393            try (ResultScanner s = meta.getScanner(scan)) {
3394              for (Result r; (r = s.next()) != null;) {
3395                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3396                RegionInfo info = RegionInfo.parseFromOrNull(b);
3397                if (info != null && info.getTable().equals(tableName)) {
3398                  // Get server hosting this region from catalog family. Return false if no server
3399                  // hosting this region, or if the server hosting this region was recently killed
3400                  // (for fault tolerance testing).
3401                  tableFound = true;
3402                  byte[] server =
3403                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3404                  if (server == null) {
3405                    return false;
3406                  } else {
3407                    byte[] startCode =
3408                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3409                    ServerName serverName =
3410                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3411                        + Bytes.toLong(startCode));
3412                    if (
3413                      !getHBaseClusterInterface().isDistributedCluster()
3414                        && getHBaseCluster().isKilledRS(serverName)
3415                    ) {
3416                      return false;
3417                    }
3418                  }
3419                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3420                    return false;
3421                  }
3422                }
3423              }
3424            }
3425            if (!tableFound) {
3426              LOG.warn(
3427                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3428            }
3429            return tableFound;
3430          }
3431        });
3432      }
3433    }
3434    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3435    // check from the master state if we are using a mini cluster
3436    if (!getHBaseClusterInterface().isDistributedCluster()) {
3437      // So, all regions are in the meta table but make sure master knows of the assignments before
3438      // returning -- sometimes this can lag.
3439      HMaster master = getHBaseCluster().getMaster();
3440      final RegionStates states = master.getAssignmentManager().getRegionStates();
3441      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3442        @Override
3443        public String explainFailure() throws IOException {
3444          return explainTableAvailability(tableName);
3445        }
3446
3447        @Override
3448        public boolean evaluate() throws IOException {
3449          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3450          return hris != null && !hris.isEmpty();
3451        }
3452      });
3453    }
3454    LOG.info("All regions for table " + tableName + " assigned.");
3455  }
3456
3457  /**
3458   * Do a small get/scan against one store. This is required because store has no actual methods of
3459   * querying itself, and relies on StoreScanner.
3460   */
3461  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3462    Scan scan = new Scan(get);
3463    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3464      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3465      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3466      // readpoint 0.
3467      0);
3468
3469    List<Cell> result = new ArrayList<>();
3470    scanner.next(result);
3471    if (!result.isEmpty()) {
3472      // verify that we are on the row we want:
3473      Cell kv = result.get(0);
3474      if (!CellUtil.matchingRows(kv, get.getRow())) {
3475        result.clear();
3476      }
3477    }
3478    scanner.close();
3479    return result;
3480  }
3481
3482  /**
3483   * Create region split keys between startkey and endKey
3484   * @param numRegions the number of regions to be created. it has to be greater than 3.
3485   * @return resulting split keys
3486   */
3487  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3488    if (numRegions <= 3) {
3489      throw new AssertionError();
3490    }
3491    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3492    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3493    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3494    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3495    return result;
3496  }
3497
3498  /**
3499   * Do a small get/scan against one store. This is required because store has no actual methods of
3500   * querying itself, and relies on StoreScanner.
3501   */
3502  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3503    throws IOException {
3504    Get get = new Get(row);
3505    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3506    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3507
3508    return getFromStoreFile(store, get);
3509  }
3510
3511  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3512    final List<? extends Cell> actual) {
3513    final int eLen = expected.size();
3514    final int aLen = actual.size();
3515    final int minLen = Math.min(eLen, aLen);
3516
3517    int i;
3518    for (i = 0; i < minLen
3519      && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) {
3520    }
3521
3522    if (additionalMsg == null) {
3523      additionalMsg = "";
3524    }
3525    if (!additionalMsg.isEmpty()) {
3526      additionalMsg = ". " + additionalMsg;
3527    }
3528
3529    if (eLen != aLen || i != minLen) {
3530      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3531        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3532        + " (length " + aLen + ")" + additionalMsg);
3533    }
3534  }
3535
3536  public static <T> String safeGetAsStr(List<T> lst, int i) {
3537    if (0 <= i && i < lst.size()) {
3538      return lst.get(i).toString();
3539    } else {
3540      return "<out_of_range>";
3541    }
3542  }
3543
3544  public String getClusterKey() {
3545    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3546      + ":"
3547      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3548  }
3549
3550  /** Creates a random table with the given parameters */
3551  public Table createRandomTable(TableName tableName, final Collection<String> families,
3552    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3553    final int numRowsPerFlush) throws IOException, InterruptedException {
3554
3555    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3556      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3557      + maxVersions + "\n");
3558
3559    final int numCF = families.size();
3560    final byte[][] cfBytes = new byte[numCF][];
3561    {
3562      int cfIndex = 0;
3563      for (String cf : families) {
3564        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3565      }
3566    }
3567
3568    final int actualStartKey = 0;
3569    final int actualEndKey = Integer.MAX_VALUE;
3570    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3571    final int splitStartKey = actualStartKey + keysPerRegion;
3572    final int splitEndKey = actualEndKey - keysPerRegion;
3573    final String keyFormat = "%08x";
3574    final Table table = createTable(tableName, cfBytes, maxVersions,
3575      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3576      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3577
3578    if (hbaseCluster != null) {
3579      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3580    }
3581
3582    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3583
3584    final Random rand = ThreadLocalRandom.current();
3585    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3586      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3587        final byte[] row = Bytes.toBytes(
3588          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3589
3590        Put put = new Put(row);
3591        Delete del = new Delete(row);
3592        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3593          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3594          final long ts = rand.nextInt();
3595          final byte[] qual = Bytes.toBytes("col" + iCol);
3596          if (rand.nextBoolean()) {
3597            final byte[] value =
3598              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3599                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3600            put.addColumn(cf, qual, ts, value);
3601          } else if (rand.nextDouble() < 0.8) {
3602            del.addColumn(cf, qual, ts);
3603          } else {
3604            del.addColumns(cf, qual, ts);
3605          }
3606        }
3607
3608        if (!put.isEmpty()) {
3609          mutator.mutate(put);
3610        }
3611
3612        if (!del.isEmpty()) {
3613          mutator.mutate(del);
3614        }
3615      }
3616      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3617      mutator.flush();
3618      if (hbaseCluster != null) {
3619        getMiniHBaseCluster().flushcache(table.getName());
3620      }
3621    }
3622    mutator.close();
3623
3624    return table;
3625  }
3626
3627  public static int randomFreePort() {
3628    return HBaseCommonTestingUtility.randomFreePort();
3629  }
3630
3631  public static String randomMultiCastAddress() {
3632    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3633  }
3634
3635  public static void waitForHostPort(String host, int port) throws IOException {
3636    final int maxTimeMs = 10000;
3637    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3638    IOException savedException = null;
3639    LOG.info("Waiting for server at " + host + ":" + port);
3640    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3641      try {
3642        Socket sock = new Socket(InetAddress.getByName(host), port);
3643        sock.close();
3644        savedException = null;
3645        LOG.info("Server at " + host + ":" + port + " is available");
3646        break;
3647      } catch (UnknownHostException e) {
3648        throw new IOException("Failed to look up " + host, e);
3649      } catch (IOException e) {
3650        savedException = e;
3651      }
3652      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3653    }
3654
3655    if (savedException != null) {
3656      throw savedException;
3657    }
3658  }
3659
3660  /**
3661   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3662   * continues.
3663   * @return the number of regions the table was split into
3664   */
3665  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3666    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3667    throws IOException {
3668    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3669      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3670  }
3671
3672  /**
3673   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3674   * continues.
3675   * @return the number of regions the table was split into
3676   */
3677  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3678    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3679    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3680    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3681    builder.setDurability(durability);
3682    builder.setRegionReplication(regionReplication);
3683    ColumnFamilyDescriptorBuilder cfBuilder =
3684      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3685    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3686    cfBuilder.setCompressionType(compression);
3687    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3688      numRegionsPerServer);
3689  }
3690
3691  /**
3692   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3693   * continues.
3694   * @return the number of regions the table was split into
3695   */
3696  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3697    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3698    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3699    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3700    builder.setDurability(durability);
3701    builder.setRegionReplication(regionReplication);
3702    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3703    for (int i = 0; i < columnFamilies.length; i++) {
3704      ColumnFamilyDescriptorBuilder cfBuilder =
3705        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3706      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3707      cfBuilder.setCompressionType(compression);
3708      hcds[i] = cfBuilder.build();
3709    }
3710    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3711  }
3712
3713  /**
3714   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3715   * continues.
3716   * @return the number of regions the table was split into
3717   */
3718  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3719    ColumnFamilyDescriptor hcd) throws IOException {
3720    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3721  }
3722
3723  /**
3724   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3725   * continues.
3726   * @return the number of regions the table was split into
3727   */
3728  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3729    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3730    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3731      numRegionsPerServer);
3732  }
3733
3734  /**
3735   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3736   * continues.
3737   * @return the number of regions the table was split into
3738   */
3739  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3740    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3741    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3742      numRegionsPerServer);
3743  }
3744
3745  /**
3746   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3747   * continues.
3748   * @return the number of regions the table was split into
3749   */
3750  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3751    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3752    throws IOException {
3753    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3754    for (ColumnFamilyDescriptor cd : cds) {
3755      if (!td.hasColumnFamily(cd.getName())) {
3756        builder.setColumnFamily(cd);
3757      }
3758    }
3759    td = builder.build();
3760    int totalNumberOfRegions = 0;
3761    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3762    Admin admin = unmanagedConnection.getAdmin();
3763
3764    try {
3765      // create a table a pre-splits regions.
3766      // The number of splits is set as:
3767      // region servers * regions per region server).
3768      int numberOfServers = admin.getRegionServers().size();
3769      if (numberOfServers == 0) {
3770        throw new IllegalStateException("No live regionservers");
3771      }
3772
3773      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3774      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3775        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3776        + numRegionsPerServer + ")");
3777
3778      byte[][] splits = splitter.split(totalNumberOfRegions);
3779
3780      admin.createTable(td, splits);
3781    } catch (MasterNotRunningException e) {
3782      LOG.error("Master not running", e);
3783      throw new IOException(e);
3784    } catch (TableExistsException e) {
3785      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3786    } finally {
3787      admin.close();
3788      unmanagedConnection.close();
3789    }
3790    return totalNumberOfRegions;
3791  }
3792
3793  public static int getMetaRSPort(Connection connection) throws IOException {
3794    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3795      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3796    }
3797  }
3798
3799  /**
3800   * Due to async racing issue, a region may not be in the online region list of a region server
3801   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3802   */
3803  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3804    final long timeout) throws IOException, InterruptedException {
3805    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3806    while (true) {
3807      List<RegionInfo> regions = getAdmin().getRegions(server);
3808      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3809      long now = EnvironmentEdgeManager.currentTime();
3810      if (now > timeoutTime) break;
3811      Thread.sleep(10);
3812    }
3813    throw new AssertionError(
3814      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3815  }
3816
3817  /**
3818   * Check to make sure the region is open on the specified region server, but not on any other one.
3819   */
3820  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3821    final long timeout) throws IOException, InterruptedException {
3822    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3823    while (true) {
3824      List<RegionInfo> regions = getAdmin().getRegions(server);
3825      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3826        List<JVMClusterUtil.RegionServerThread> rsThreads =
3827          getHBaseCluster().getLiveRegionServerThreads();
3828        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3829          HRegionServer rs = rsThread.getRegionServer();
3830          if (server.equals(rs.getServerName())) {
3831            continue;
3832          }
3833          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3834          for (HRegion r : hrs) {
3835            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3836              throw new AssertionError("Region should not be double assigned");
3837            }
3838          }
3839        }
3840        return; // good, we are happy
3841      }
3842      long now = EnvironmentEdgeManager.currentTime();
3843      if (now > timeoutTime) break;
3844      Thread.sleep(10);
3845    }
3846    throw new AssertionError(
3847      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3848  }
3849
3850  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3851    TableDescriptor td =
3852      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3853    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3854    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3855  }
3856
3857  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3858    BlockCache blockCache) throws IOException {
3859    TableDescriptor td =
3860      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3861    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3862    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3863  }
3864
3865  public static void setFileSystemURI(String fsURI) {
3866    FS_URI = fsURI;
3867  }
3868
3869  /**
3870   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3871   */
3872  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3873    return new ExplainingPredicate<IOException>() {
3874      @Override
3875      public String explainFailure() throws IOException {
3876        final RegionStates regionStates =
3877          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3878        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3879      }
3880
3881      @Override
3882      public boolean evaluate() throws IOException {
3883        HMaster master = getMiniHBaseCluster().getMaster();
3884        if (master == null) return false;
3885        AssignmentManager am = master.getAssignmentManager();
3886        if (am == null) return false;
3887        return !am.hasRegionsInTransition();
3888      }
3889    };
3890  }
3891
3892  /**
3893   * Returns a {@link Predicate} for checking that table is enabled
3894   */
3895  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3896    return new ExplainingPredicate<IOException>() {
3897      @Override
3898      public String explainFailure() throws IOException {
3899        return explainTableState(tableName, TableState.State.ENABLED);
3900      }
3901
3902      @Override
3903      public boolean evaluate() throws IOException {
3904        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3905      }
3906    };
3907  }
3908
3909  /**
3910   * Returns a {@link Predicate} for checking that table is enabled
3911   */
3912  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3913    return new ExplainingPredicate<IOException>() {
3914      @Override
3915      public String explainFailure() throws IOException {
3916        return explainTableState(tableName, TableState.State.DISABLED);
3917      }
3918
3919      @Override
3920      public boolean evaluate() throws IOException {
3921        return getAdmin().isTableDisabled(tableName);
3922      }
3923    };
3924  }
3925
3926  /**
3927   * Returns a {@link Predicate} for checking that table is enabled
3928   */
3929  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3930    return new ExplainingPredicate<IOException>() {
3931      @Override
3932      public String explainFailure() throws IOException {
3933        return explainTableAvailability(tableName);
3934      }
3935
3936      @Override
3937      public boolean evaluate() throws IOException {
3938        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3939        if (tableAvailable) {
3940          try (Table table = getConnection().getTable(tableName)) {
3941            TableDescriptor htd = table.getDescriptor();
3942            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3943              .getAllRegionLocations()) {
3944              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3945                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3946                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3947              for (byte[] family : htd.getColumnFamilyNames()) {
3948                scan.addFamily(family);
3949              }
3950              try (ResultScanner scanner = table.getScanner(scan)) {
3951                scanner.next();
3952              }
3953            }
3954          }
3955        }
3956        return tableAvailable;
3957      }
3958    };
3959  }
3960
3961  /**
3962   * Wait until no regions in transition.
3963   * @param timeout How long to wait.
3964   */
3965  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3966    waitFor(timeout, predicateNoRegionsInTransition());
3967  }
3968
3969  /**
3970   * Wait until no regions in transition. (time limit 15min)
3971   */
3972  public void waitUntilNoRegionsInTransition() throws IOException {
3973    waitUntilNoRegionsInTransition(15 * 60000);
3974  }
3975
3976  /**
3977   * Wait until labels is ready in VisibilityLabelsCache.
3978   */
3979  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3980    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3981    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3982
3983      @Override
3984      public boolean evaluate() {
3985        for (String label : labels) {
3986          if (labelsCache.getLabelOrdinal(label) == 0) {
3987            return false;
3988          }
3989        }
3990        return true;
3991      }
3992
3993      @Override
3994      public String explainFailure() {
3995        for (String label : labels) {
3996          if (labelsCache.getLabelOrdinal(label) == 0) {
3997            return label + " is not available yet";
3998          }
3999        }
4000        return "";
4001      }
4002    });
4003  }
4004
4005  /**
4006   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4007   * available.
4008   * @return the list of column descriptors
4009   */
4010  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4011    return generateColumnDescriptors("");
4012  }
4013
4014  /**
4015   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4016   * available.
4017   * @param prefix family names prefix
4018   * @return the list of column descriptors
4019   */
4020  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4021    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4022    long familyId = 0;
4023    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
4024      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
4025        for (BloomType bloomType : BloomType.values()) {
4026          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4027          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4028            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4029          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4030          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4031          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4032          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4033          familyId++;
4034        }
4035      }
4036    }
4037    return columnFamilyDescriptors;
4038  }
4039
4040  /**
4041   * Get supported compression algorithms.
4042   * @return supported compression algorithms.
4043   */
4044  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4045    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4046    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4047    for (String algoName : allAlgos) {
4048      try {
4049        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4050        algo.getCompressor();
4051        supportedAlgos.add(algo);
4052      } catch (Throwable t) {
4053        // this algo is not available
4054      }
4055    }
4056    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4057  }
4058
4059  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4060    Scan scan = new Scan().withStartRow(row);
4061    scan.setReadType(ReadType.PREAD);
4062    scan.setCaching(1);
4063    scan.setReversed(true);
4064    scan.addFamily(family);
4065    try (RegionScanner scanner = r.getScanner(scan)) {
4066      List<Cell> cells = new ArrayList<>(1);
4067      scanner.next(cells);
4068      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4069        return null;
4070      }
4071      return Result.create(cells);
4072    }
4073  }
4074
4075  private boolean isTargetTable(final byte[] inRow, Cell c) {
4076    String inputRowString = Bytes.toString(inRow);
4077    int i = inputRowString.indexOf(HConstants.DELIMITER);
4078    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4079    int o = outputRowString.indexOf(HConstants.DELIMITER);
4080    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4081  }
4082
4083  /**
4084   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
4085   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
4086   * kerby KDC server and utility for using it,
4087   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4088   * less baggage. It came in in HBASE-5291.
4089   */
4090  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4091    Properties conf = MiniKdc.createConf();
4092    conf.put(MiniKdc.DEBUG, true);
4093    MiniKdc kdc = null;
4094    File dir = null;
4095    // There is time lag between selecting a port and trying to bind with it. It's possible that
4096    // another service captures the port in between which'll result in BindException.
4097    boolean bindException;
4098    int numTries = 0;
4099    do {
4100      try {
4101        bindException = false;
4102        dir = new File(getDataTestDir("kdc").toUri().getPath());
4103        kdc = new MiniKdc(conf, dir);
4104        kdc.start();
4105      } catch (BindException e) {
4106        FileUtils.deleteDirectory(dir); // clean directory
4107        numTries++;
4108        if (numTries == 3) {
4109          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4110          throw e;
4111        }
4112        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4113        bindException = true;
4114      }
4115    } while (bindException);
4116    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4117    return kdc;
4118  }
4119
4120  public int getNumHFiles(final TableName tableName, final byte[] family) {
4121    int numHFiles = 0;
4122    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4123      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
4124    }
4125    return numHFiles;
4126  }
4127
4128  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4129    final byte[] family) {
4130    int numHFiles = 0;
4131    for (Region region : rs.getRegions(tableName)) {
4132      numHFiles += region.getStore(family).getStorefilesCount();
4133    }
4134    return numHFiles;
4135  }
4136
4137  private void assertEquals(String message, int expected, int actual) {
4138    if (expected == actual) {
4139      return;
4140    }
4141    String formatted = "";
4142    if (message != null && !"".equals(message)) {
4143      formatted = message + " ";
4144    }
4145    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4146  }
4147
4148  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4149    if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) {
4150      throw new AssertionError();
4151    }
4152    assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode());
4153    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4154    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4155    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4156    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4157        it2 = rtdFamilies.iterator(); it.hasNext();) {
4158      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4159    }
4160  }
4161
4162  /**
4163   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4164   * invocations.
4165   */
4166  public static void await(final long sleepMillis, final BooleanSupplier condition)
4167    throws InterruptedException {
4168    try {
4169      while (!condition.getAsBoolean()) {
4170        Thread.sleep(sleepMillis);
4171      }
4172    } catch (RuntimeException e) {
4173      if (e.getCause() instanceof AssertionError) {
4174        throw (AssertionError) e.getCause();
4175      }
4176      throw e;
4177    }
4178  }
4179}