001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.UncheckedIOException;
029import java.lang.reflect.Field;
030import java.lang.reflect.Modifier;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.ExecutionException;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.BooleanSupplier;
057import org.apache.commons.io.FileUtils;
058import org.apache.commons.lang3.RandomStringUtils;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
063import org.apache.hadoop.hbase.Waiter.Predicate;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncAdmin;
066import org.apache.hadoop.hbase.client.AsyncClusterConnection;
067import org.apache.hadoop.hbase.client.BufferedMutator;
068import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Consistency;
074import org.apache.hadoop.hbase.client.Delete;
075import org.apache.hadoop.hbase.client.Durability;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.MasterRegistry;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionInfoBuilder;
082import org.apache.hadoop.hbase.client.RegionLocator;
083import org.apache.hadoop.hbase.client.Result;
084import org.apache.hadoop.hbase.client.ResultScanner;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.fs.HFileSystem;
092import org.apache.hadoop.hbase.io.compress.Compression;
093import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
094import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
095import org.apache.hadoop.hbase.io.hfile.BlockCache;
096import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
097import org.apache.hadoop.hbase.io.hfile.HFile;
098import org.apache.hadoop.hbase.ipc.RpcServerInterface;
099import org.apache.hadoop.hbase.logging.Log4jUtils;
100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
101import org.apache.hadoop.hbase.master.HMaster;
102import org.apache.hadoop.hbase.master.RegionState;
103import org.apache.hadoop.hbase.master.ServerManager;
104import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
105import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
106import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
107import org.apache.hadoop.hbase.master.assignment.RegionStates;
108import org.apache.hadoop.hbase.mob.MobFileCache;
109import org.apache.hadoop.hbase.regionserver.BloomType;
110import org.apache.hadoop.hbase.regionserver.ChunkCreator;
111import org.apache.hadoop.hbase.regionserver.HRegion;
112import org.apache.hadoop.hbase.regionserver.HRegionServer;
113import org.apache.hadoop.hbase.regionserver.HStore;
114import org.apache.hadoop.hbase.regionserver.InternalScanner;
115import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
116import org.apache.hadoop.hbase.regionserver.Region;
117import org.apache.hadoop.hbase.regionserver.RegionScanner;
118import org.apache.hadoop.hbase.regionserver.RegionServerServices;
119import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
120import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
121import org.apache.hadoop.hbase.security.User;
122import org.apache.hadoop.hbase.security.UserProvider;
123import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
124import org.apache.hadoop.hbase.util.Bytes;
125import org.apache.hadoop.hbase.util.CommonFSUtils;
126import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
127import org.apache.hadoop.hbase.util.FSUtils;
128import org.apache.hadoop.hbase.util.JVM;
129import org.apache.hadoop.hbase.util.JVMClusterUtil;
130import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
131import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
132import org.apache.hadoop.hbase.util.Pair;
133import org.apache.hadoop.hbase.util.ReflectionUtils;
134import org.apache.hadoop.hbase.util.RegionSplitter;
135import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
136import org.apache.hadoop.hbase.util.RetryCounter;
137import org.apache.hadoop.hbase.util.Threads;
138import org.apache.hadoop.hbase.wal.WAL;
139import org.apache.hadoop.hbase.wal.WALFactory;
140import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
141import org.apache.hadoop.hbase.zookeeper.ZKConfig;
142import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
143import org.apache.hadoop.hdfs.DFSClient;
144import org.apache.hadoop.hdfs.DistributedFileSystem;
145import org.apache.hadoop.hdfs.MiniDFSCluster;
146import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
147import org.apache.hadoop.mapred.JobConf;
148import org.apache.hadoop.mapred.MiniMRCluster;
149import org.apache.hadoop.mapred.TaskLog;
150import org.apache.hadoop.minikdc.MiniKdc;
151import org.apache.yetus.audience.InterfaceAudience;
152import org.apache.yetus.audience.InterfaceStability;
153import org.apache.zookeeper.WatchedEvent;
154import org.apache.zookeeper.ZooKeeper;
155import org.apache.zookeeper.ZooKeeper.States;
156
157import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
158
159import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
160
161/**
162 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
163 * functionality. Create an instance and keep it around testing HBase.
164 * <p/>
165 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
166 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
167 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
168 * cluster.
169 * <p/>
170 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
171 * <p/>
172 * It does not set logging levels.
173 * <p/>
174 * In the configuration properties, default values for master-info-port and region-server-port are
175 * overridden such that a random port will be assigned (thus avoiding port contention if another
176 * local HBase instance is already running).
177 * <p/>
178 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
179 * setting it to true.
180 */
181@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
182@InterfaceStability.Evolving
183public class HBaseTestingUtil extends HBaseZKTestingUtil {
184
185  /**
186   * System property key to get test directory value. Name is as it is because mini dfs has
187   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
188   * used in mini dfs.
189   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
190   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
191   */
192  @Deprecated
193  private static final String TEST_DIRECTORY_KEY = "test.build.data";
194
195  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
196  /**
197   * The default number of regions per regionserver when creating a pre-split table.
198   */
199  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
200
201  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
202  public static final boolean PRESPLIT_TEST_TABLE = true;
203
204  private MiniDFSCluster dfsCluster = null;
205
206  private volatile HBaseClusterInterface hbaseCluster = null;
207  private MiniMRCluster mrCluster = null;
208
209  /** If there is a mini cluster running for this testing utility instance. */
210  private volatile boolean miniClusterRunning;
211
212  private String hadoopLogDir;
213
214  /**
215   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
216   */
217  private Path dataTestDirOnTestFS = null;
218
219  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
220
221  /** Filesystem URI used for map-reduce mini-cluster setup */
222  private static String FS_URI;
223
224  /** This is for unit tests parameterized with a single boolean. */
225  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
226
227  /**
228   * Checks to see if a specific port is available.
229   * @param port the port number to check for availability
230   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
231   */
232  public static boolean available(int port) {
233    ServerSocket ss = null;
234    DatagramSocket ds = null;
235    try {
236      ss = new ServerSocket(port);
237      ss.setReuseAddress(true);
238      ds = new DatagramSocket(port);
239      ds.setReuseAddress(true);
240      return true;
241    } catch (IOException e) {
242      // Do nothing
243    } finally {
244      if (ds != null) {
245        ds.close();
246      }
247
248      if (ss != null) {
249        try {
250          ss.close();
251        } catch (IOException e) {
252          /* should not be thrown */
253        }
254      }
255    }
256
257    return false;
258  }
259
260  /**
261   * Create all combinations of Bloom filters and compression algorithms for testing.
262   */
263  private static List<Object[]> bloomAndCompressionCombinations() {
264    List<Object[]> configurations = new ArrayList<>();
265    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
266      for (BloomType bloomType : BloomType.values()) {
267        configurations.add(new Object[] { comprAlgo, bloomType });
268      }
269    }
270    return Collections.unmodifiableList(configurations);
271  }
272
273  /**
274   * Create combination of memstoreTS and tags
275   */
276  private static List<Object[]> memStoreTSAndTagsCombination() {
277    List<Object[]> configurations = new ArrayList<>();
278    configurations.add(new Object[] { false, false });
279    configurations.add(new Object[] { false, true });
280    configurations.add(new Object[] { true, false });
281    configurations.add(new Object[] { true, true });
282    return Collections.unmodifiableList(configurations);
283  }
284
285  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
286    List<Object[]> configurations = new ArrayList<>();
287    configurations.add(new Object[] { false, false, true });
288    configurations.add(new Object[] { false, false, false });
289    configurations.add(new Object[] { false, true, true });
290    configurations.add(new Object[] { false, true, false });
291    configurations.add(new Object[] { true, false, true });
292    configurations.add(new Object[] { true, false, false });
293    configurations.add(new Object[] { true, true, true });
294    configurations.add(new Object[] { true, true, false });
295    return Collections.unmodifiableList(configurations);
296  }
297
298  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
299    bloomAndCompressionCombinations();
300
301  /**
302   * <p>
303   * Create an HBaseTestingUtility using a default configuration.
304   * <p>
305   * Initially, all tmp files are written to a local test data directory. Once
306   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
307   * data will be written to the DFS directory instead.
308   */
309  public HBaseTestingUtil() {
310    this(HBaseConfiguration.create());
311  }
312
313  /**
314   * <p>
315   * Create an HBaseTestingUtility using a given configuration.
316   * <p>
317   * Initially, all tmp files are written to a local test data directory. Once
318   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
319   * data will be written to the DFS directory instead.
320   * @param conf The configuration to use for further operations
321   */
322  public HBaseTestingUtil(@Nullable Configuration conf) {
323    super(conf);
324
325    // a hbase checksum verification failure will cause unit tests to fail
326    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
327
328    // Save this for when setting default file:// breaks things
329    if (this.conf.get("fs.defaultFS") != null) {
330      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
331    }
332    if (this.conf.get(HConstants.HBASE_DIR) != null) {
333      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
334    }
335    // Every cluster is a local cluster until we start DFS
336    // Note that conf could be null, but this.conf will not be
337    String dataTestDir = getDataTestDir().toString();
338    this.conf.set("fs.defaultFS", "file:///");
339    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
340    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
341    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
342    // If the value for random ports isn't set set it to true, thus making
343    // tests opt-out for random port assignment
344    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
345      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
346  }
347
348  /**
349   * Close both the region {@code r} and it's underlying WAL. For use in tests.
350   */
351  public static void closeRegionAndWAL(final Region r) throws IOException {
352    closeRegionAndWAL((HRegion) r);
353  }
354
355  /**
356   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
357   */
358  public static void closeRegionAndWAL(final HRegion r) throws IOException {
359    if (r == null) return;
360    r.close();
361    if (r.getWAL() == null) return;
362    r.getWAL().close();
363  }
364
365  /**
366   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
367   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
368   * by the Configuration. If say, a Connection was being used against a cluster that had been
369   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
370   * Rather than use the return direct, its usually best to make a copy and use that. Do
371   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
372   * @return Instance of Configuration.
373   */
374  @Override
375  public Configuration getConfiguration() {
376    return super.getConfiguration();
377  }
378
379  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
380    this.hbaseCluster = hbaseCluster;
381  }
382
383  /**
384   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
385   * have many concurrent tests running if we need to. It needs to amend the
386   * {@link #TEST_DIRECTORY_KEY} System property, as it's what minidfscluster bases it data dir on.
387   * Moding a System property is not the way to do concurrent instances -- another instance could
388   * grab the temporary value unintentionally -- but not anything can do about it at moment; single
389   * instance only is how the minidfscluster works. We also create the underlying directory names
390   * for hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
391   * conf, and as a system property for hadoop.tmp.dir (We do not create them!).
392   * @return The calculated data test build directory, if newly-created.
393   */
394  @Override
395  protected Path setupDataTestDir() {
396    Path testPath = super.setupDataTestDir();
397    if (null == testPath) {
398      return null;
399    }
400
401    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
402
403    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
404    // we want our own value to ensure uniqueness on the same machine
405    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
406
407    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
408    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
409    return testPath;
410  }
411
412  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
413
414    String sysValue = System.getProperty(propertyName);
415
416    if (sysValue != null) {
417      // There is already a value set. So we do nothing but hope
418      // that there will be no conflicts
419      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
420        + " so I do NOT create it in " + parent);
421      String confValue = conf.get(propertyName);
422      if (confValue != null && !confValue.endsWith(sysValue)) {
423        LOG.warn(propertyName + " property value differs in configuration and system: "
424          + "Configuration=" + confValue + " while System=" + sysValue
425          + " Erasing configuration value by system value.");
426      }
427      conf.set(propertyName, sysValue);
428    } else {
429      // Ok, it's not set, so we create it as a subdirectory
430      createSubDir(propertyName, parent, subDirName);
431      System.setProperty(propertyName, conf.get(propertyName));
432    }
433  }
434
435  /**
436   * @return Where to write test data on the test filesystem; Returns working directory for the test
437   *         filesystem by default
438   * @see #setupDataTestDirOnTestFS()
439   * @see #getTestFileSystem()
440   */
441  private Path getBaseTestDirOnTestFS() throws IOException {
442    FileSystem fs = getTestFileSystem();
443    return new Path(fs.getWorkingDirectory(), "test-data");
444  }
445
446  /**
447   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
448   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
449   * on it.
450   * @return a unique path in the test filesystem
451   */
452  public Path getDataTestDirOnTestFS() throws IOException {
453    if (dataTestDirOnTestFS == null) {
454      setupDataTestDirOnTestFS();
455    }
456
457    return dataTestDirOnTestFS;
458  }
459
460  /**
461   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
462   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
463   * on it.
464   * @return a unique path in the test filesystem
465   * @param subdirName name of the subdir to create under the base test dir
466   */
467  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
468    return new Path(getDataTestDirOnTestFS(), subdirName);
469  }
470
471  /**
472   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
473   * setup.
474   */
475  private void setupDataTestDirOnTestFS() throws IOException {
476    if (dataTestDirOnTestFS != null) {
477      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
478      return;
479    }
480    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
481  }
482
483  /**
484   * Sets up a new path in test filesystem to be used by tests.
485   */
486  private Path getNewDataTestDirOnTestFS() throws IOException {
487    // The file system can be either local, mini dfs, or if the configuration
488    // is supplied externally, it can be an external cluster FS. If it is a local
489    // file system, the tests should use getBaseTestDir, otherwise, we can use
490    // the working directory, and create a unique sub dir there
491    FileSystem fs = getTestFileSystem();
492    Path newDataTestDir;
493    String randomStr = getRandomUUID().toString();
494    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
495      newDataTestDir = new Path(getDataTestDir(), randomStr);
496      File dataTestDir = new File(newDataTestDir.toString());
497      if (deleteOnExit()) dataTestDir.deleteOnExit();
498    } else {
499      Path base = getBaseTestDirOnTestFS();
500      newDataTestDir = new Path(base, randomStr);
501      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
502    }
503    return newDataTestDir;
504  }
505
506  /**
507   * Cleans the test data directory on the test filesystem.
508   * @return True if we removed the test dirs
509   */
510  public boolean cleanupDataTestDirOnTestFS() throws IOException {
511    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
512    if (ret) {
513      dataTestDirOnTestFS = null;
514    }
515    return ret;
516  }
517
518  /**
519   * Cleans a subdirectory under the test data directory on the test filesystem.
520   * @return True if we removed child
521   */
522  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
523    Path cpath = getDataTestDirOnTestFS(subdirName);
524    return getTestFileSystem().delete(cpath, true);
525  }
526
527  /**
528   * Start a minidfscluster.
529   * @param servers How many DNs to start.
530   * @see #shutdownMiniDFSCluster()
531   * @return The mini dfs cluster created.
532   */
533  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
534    return startMiniDFSCluster(servers, null);
535  }
536
537  /**
538   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
539   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
540   * instances of the datanodes will have the same host name.
541   * @param hosts hostnames DNs to run on. n * @see #shutdownMiniDFSCluster()
542   * @return The mini dfs cluster created.
543   */
544  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
545    if (hosts != null && hosts.length != 0) {
546      return startMiniDFSCluster(hosts.length, hosts);
547    } else {
548      return startMiniDFSCluster(1, null);
549    }
550  }
551
552  /**
553   * Start a minidfscluster. Can only create one.
554   * @param servers How many DNs to start.
555   * @param hosts   hostnames DNs to run on. n * @see #shutdownMiniDFSCluster()
556   * @return The mini dfs cluster created.
557   */
558  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
559    return startMiniDFSCluster(servers, null, hosts);
560  }
561
562  private void setFs() throws IOException {
563    if (this.dfsCluster == null) {
564      LOG.info("Skipping setting fs because dfsCluster is null");
565      return;
566    }
567    FileSystem fs = this.dfsCluster.getFileSystem();
568    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
569
570    // re-enable this check with dfs
571    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
572  }
573
574  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
575    throws Exception {
576    createDirsAndSetProperties();
577    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
578
579    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
580    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
581    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
582      "ERROR");
583    this.dfsCluster =
584      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
585
586    // Set this just-started cluster as our filesystem.
587    setFs();
588
589    // Wait for the cluster to be totally up
590    this.dfsCluster.waitClusterUp();
591
592    // reset the test directory for test file system
593    dataTestDirOnTestFS = null;
594    String dataTestDir = getDataTestDir().toString();
595    conf.set(HConstants.HBASE_DIR, dataTestDir);
596    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
597
598    return this.dfsCluster;
599  }
600
601  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
602    createDirsAndSetProperties();
603    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
604    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
605    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
606      "ERROR");
607    dfsCluster =
608      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
609    return dfsCluster;
610  }
611
612  /**
613   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
614   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
615   * the conf.
616   *
617   * <pre>
618   * Configuration conf = TEST_UTIL.getConfiguration();
619   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
620   *   Map.Entry&lt;String, String&gt; e = i.next();
621   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
622   * }
623   * </pre>
624   */
625  private void createDirsAndSetProperties() throws IOException {
626    setupClusterTestDir();
627    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
628    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
629    createDirAndSetProperty("test.cache.data");
630    createDirAndSetProperty("hadoop.tmp.dir");
631    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
632    createDirAndSetProperty("mapreduce.cluster.local.dir");
633    createDirAndSetProperty("mapreduce.cluster.temp.dir");
634    enableShortCircuit();
635
636    Path root = getDataTestDirOnTestFS("hadoop");
637    conf.set(MapreduceTestingShim.getMROutputDirProp(),
638      new Path(root, "mapred-output-dir").toString());
639    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
640    conf.set("mapreduce.jobtracker.staging.root.dir",
641      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
642    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
643    conf.set("yarn.app.mapreduce.am.staging-dir",
644      new Path(root, "mapreduce-am-staging-root-dir").toString());
645
646    // Frustrate yarn's and hdfs's attempts at writing /tmp.
647    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
648    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
649    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
650    createDirAndSetProperty("yarn.nodemanager.log-dirs");
651    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
652    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
653    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
654    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
655    createDirAndSetProperty("dfs.journalnode.edits.dir");
656    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
657    createDirAndSetProperty("nfs.dump.dir");
658    createDirAndSetProperty("java.io.tmpdir");
659    createDirAndSetProperty("dfs.journalnode.edits.dir");
660    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
661    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
662  }
663
664  /**
665   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
666   * Default to false.
667   */
668  public boolean isNewVersionBehaviorEnabled() {
669    final String propName = "hbase.tests.new.version.behavior";
670    String v = System.getProperty(propName);
671    if (v != null) {
672      return Boolean.parseBoolean(v);
673    }
674    return false;
675  }
676
677  /**
678   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
679   * allows to specify this parameter on the command line. If not set, default is true.
680   */
681  public boolean isReadShortCircuitOn() {
682    final String propName = "hbase.tests.use.shortcircuit.reads";
683    String readOnProp = System.getProperty(propName);
684    if (readOnProp != null) {
685      return Boolean.parseBoolean(readOnProp);
686    } else {
687      return conf.getBoolean(propName, false);
688    }
689  }
690
691  /**
692   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
693   * including skipping the hdfs checksum checks.
694   */
695  private void enableShortCircuit() {
696    if (isReadShortCircuitOn()) {
697      String curUser = System.getProperty("user.name");
698      LOG.info("read short circuit is ON for user " + curUser);
699      // read short circuit, for hdfs
700      conf.set("dfs.block.local-path-access.user", curUser);
701      // read short circuit, for hbase
702      conf.setBoolean("dfs.client.read.shortcircuit", true);
703      // Skip checking checksum, for the hdfs client and the datanode
704      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
705    } else {
706      LOG.info("read short circuit is OFF");
707    }
708  }
709
710  private String createDirAndSetProperty(final String property) {
711    return createDirAndSetProperty(property, property);
712  }
713
714  private String createDirAndSetProperty(final String relPath, String property) {
715    String path = getDataTestDir(relPath).toString();
716    System.setProperty(property, path);
717    conf.set(property, path);
718    new File(path).mkdirs();
719    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
720    return path;
721  }
722
723  /**
724   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
725   */
726  public void shutdownMiniDFSCluster() throws IOException {
727    if (this.dfsCluster != null) {
728      // The below throws an exception per dn, AsynchronousCloseException.
729      this.dfsCluster.shutdown();
730      dfsCluster = null;
731      dataTestDirOnTestFS = null;
732      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
733    }
734  }
735
736  /**
737   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
738   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
739   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
740   * @see #startMiniCluster(StartTestingClusterOption option)
741   * @see #shutdownMiniDFSCluster()
742   */
743  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
744    StartTestingClusterOption option = StartTestingClusterOption.builder()
745      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
746    return startMiniCluster(option);
747  }
748
749  /**
750   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
751   * value can be found in {@link StartTestingClusterOption.Builder}.
752   * @see #startMiniCluster(StartTestingClusterOption option)
753   * @see #shutdownMiniDFSCluster()
754   */
755  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
756    return startMiniCluster(StartTestingClusterOption.builder().build());
757  }
758
759  /**
760   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
761   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
762   * under System property test.build.data, to be cleaned up on exit.
763   * @see #shutdownMiniDFSCluster()
764   */
765  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
766    throws Exception {
767    LOG.info("Starting up minicluster with option: {}", option);
768
769    // If we already put up a cluster, fail.
770    if (miniClusterRunning) {
771      throw new IllegalStateException("A mini-cluster is already running");
772    }
773    miniClusterRunning = true;
774
775    setupClusterTestDir();
776    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
777
778    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
779    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
780    if (dfsCluster == null) {
781      LOG.info("STARTING DFS");
782      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
783    } else {
784      LOG.info("NOT STARTING DFS");
785    }
786
787    // Start up a zk cluster.
788    if (getZkCluster() == null) {
789      startMiniZKCluster(option.getNumZkServers());
790    }
791
792    // Start the MiniHBaseCluster
793    return startMiniHBaseCluster(option);
794  }
795
796  /**
797   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
798   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
799   * @return Reference to the hbase mini hbase cluster.
800   * @see #startMiniCluster(StartTestingClusterOption)
801   * @see #shutdownMiniHBaseCluster()
802   */
803  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
804    throws IOException, InterruptedException {
805    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
806    createRootDir(option.isCreateRootDir());
807    if (option.isCreateWALDir()) {
808      createWALRootDir();
809    }
810    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
811    // for tests that do not read hbase-defaults.xml
812    setHBaseFsTmpDir();
813
814    // These settings will make the server waits until this exact number of
815    // regions servers are connected.
816    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
817      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
818    }
819    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
820      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
821    }
822
823    // Avoid log flooded with chore execution time, see HBASE-24646 for more details.
824    Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
825
826    Configuration c = new Configuration(this.conf);
827    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
828      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
829      option.getMasterClass(), option.getRsClass());
830    // Populate the master address configuration from mini cluster configuration.
831    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
832    // Don't leave here till we've done a successful scan of the hbase:meta
833    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
834      ResultScanner s = t.getScanner(new Scan())) {
835      for (;;) {
836        if (s.next() == null) {
837          break;
838        }
839      }
840    }
841
842    getAdmin(); // create immediately the hbaseAdmin
843    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
844
845    return (SingleProcessHBaseCluster) hbaseCluster;
846  }
847
848  /**
849   * Starts up mini hbase cluster using default options. Default options can be found in
850   * {@link StartTestingClusterOption.Builder}.
851   * @see #startMiniHBaseCluster(StartTestingClusterOption)
852   * @see #shutdownMiniHBaseCluster()
853   */
854  public SingleProcessHBaseCluster startMiniHBaseCluster()
855    throws IOException, InterruptedException {
856    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
857  }
858
859  /**
860   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
861   * {@link #startMiniCluster()}. All other options will use default values, defined in
862   * {@link StartTestingClusterOption.Builder}.
863   * @param numMasters       Master node number.
864   * @param numRegionServers Number of region servers.
865   * @return The mini HBase cluster created.
866   * @see #shutdownMiniHBaseCluster()
867   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
868   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
869   * @see #startMiniHBaseCluster(StartTestingClusterOption)
870   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
871   */
872  @Deprecated
873  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
874    throws IOException, InterruptedException {
875    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
876      .numRegionServers(numRegionServers).build();
877    return startMiniHBaseCluster(option);
878  }
879
880  /**
881   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
882   * {@link #startMiniCluster()}. All other options will use default values, defined in
883   * {@link StartTestingClusterOption.Builder}.
884   * @param numMasters       Master node number.
885   * @param numRegionServers Number of region servers.
886   * @param rsPorts          Ports that RegionServer should use.
887   * @return The mini HBase cluster created.
888   * @see #shutdownMiniHBaseCluster()
889   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
890   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
891   * @see #startMiniHBaseCluster(StartTestingClusterOption)
892   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
893   */
894  @Deprecated
895  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
896    List<Integer> rsPorts) throws IOException, InterruptedException {
897    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
898      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
899    return startMiniHBaseCluster(option);
900  }
901
902  /**
903   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
904   * {@link #startMiniCluster()}. All other options will use default values, defined in
905   * {@link StartTestingClusterOption.Builder}.
906   * @param numMasters       Master node number.
907   * @param numRegionServers Number of region servers.
908   * @param rsPorts          Ports that RegionServer should use.
909   * @param masterClass      The class to use as HMaster, or null for default.
910   * @param rsClass          The class to use as HRegionServer, or null for default.
911   * @param createRootDir    Whether to create a new root or data directory path.
912   * @param createWALDir     Whether to create a new WAL directory.
913   * @return The mini HBase cluster created.
914   * @see #shutdownMiniHBaseCluster()
915   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
916   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
917   * @see #startMiniHBaseCluster(StartTestingClusterOption)
918   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
919   */
920  @Deprecated
921  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
922    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
923    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
924    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
925    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
926      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
927      .createRootDir(createRootDir).createWALDir(createWALDir).build();
928    return startMiniHBaseCluster(option);
929  }
930
931  /**
932   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
933   * want to keep dfs/zk up and just stop/start hbase.
934   * @param servers number of region servers
935   */
936  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
937    this.restartHBaseCluster(servers, null);
938  }
939
940  public void restartHBaseCluster(int servers, List<Integer> ports)
941    throws IOException, InterruptedException {
942    StartTestingClusterOption option =
943      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
944    restartHBaseCluster(option);
945    invalidateConnection();
946  }
947
948  public void restartHBaseCluster(StartTestingClusterOption option)
949    throws IOException, InterruptedException {
950    closeConnection();
951    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
952      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
953      option.getMasterClass(), option.getRsClass());
954    // Don't leave here till we've done a successful scan of the hbase:meta
955    Connection conn = ConnectionFactory.createConnection(this.conf);
956    Table t = conn.getTable(TableName.META_TABLE_NAME);
957    ResultScanner s = t.getScanner(new Scan());
958    while (s.next() != null) {
959      // do nothing
960    }
961    LOG.info("HBase has been restarted");
962    s.close();
963    t.close();
964    conn.close();
965  }
966
967  /**
968   * Returns current mini hbase cluster. Only has something in it after a call to
969   * {@link #startMiniCluster()}.
970   * @see #startMiniCluster()
971   */
972  public SingleProcessHBaseCluster getMiniHBaseCluster() {
973    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
974      return (SingleProcessHBaseCluster) this.hbaseCluster;
975    }
976    throw new RuntimeException(
977      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
978  }
979
980  /**
981   * Stops mini hbase, zk, and hdfs clusters.
982   * @see #startMiniCluster(int)
983   */
984  public void shutdownMiniCluster() throws IOException {
985    LOG.info("Shutting down minicluster");
986    shutdownMiniHBaseCluster();
987    shutdownMiniDFSCluster();
988    shutdownMiniZKCluster();
989
990    cleanupTestDir();
991    miniClusterRunning = false;
992    LOG.info("Minicluster is down");
993  }
994
995  /**
996   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
997   * @throws java.io.IOException in case command is unsuccessful
998   */
999  public void shutdownMiniHBaseCluster() throws IOException {
1000    cleanup();
1001    if (this.hbaseCluster != null) {
1002      this.hbaseCluster.shutdown();
1003      // Wait till hbase is down before going on to shutdown zk.
1004      this.hbaseCluster.waitUntilShutDown();
1005      this.hbaseCluster = null;
1006    }
1007    if (zooKeeperWatcher != null) {
1008      zooKeeperWatcher.close();
1009      zooKeeperWatcher = null;
1010    }
1011  }
1012
1013  /**
1014   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1015   * @throws java.io.IOException throws in case command is unsuccessful
1016   */
1017  public void killMiniHBaseCluster() throws IOException {
1018    cleanup();
1019    if (this.hbaseCluster != null) {
1020      getMiniHBaseCluster().killAll();
1021      this.hbaseCluster = null;
1022    }
1023    if (zooKeeperWatcher != null) {
1024      zooKeeperWatcher.close();
1025      zooKeeperWatcher = null;
1026    }
1027  }
1028
1029  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1030  private void cleanup() throws IOException {
1031    closeConnection();
1032    // unset the configuration for MIN and MAX RS to start
1033    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1034    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1035  }
1036
1037  /**
1038   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1039   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1040   * If false, previous path is used. Note: this does not cause the root dir to be created.
1041   * @return Fully qualified path for the default hbase root dir n
1042   */
1043  public Path getDefaultRootDirPath(boolean create) throws IOException {
1044    if (!create) {
1045      return getDataTestDirOnTestFS();
1046    } else {
1047      return getNewDataTestDirOnTestFS();
1048    }
1049  }
1050
1051  /**
1052   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1053   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1054   * @return Fully qualified path for the default hbase root dir n
1055   */
1056  public Path getDefaultRootDirPath() throws IOException {
1057    return getDefaultRootDirPath(false);
1058  }
1059
1060  /**
1061   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1062   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1063   * startup. You'd only use this method if you were doing manual operation.
1064   * @param create This flag decides whether to get a new root or data directory path or not, if it
1065   *               has been fetched already. Note : Directory will be made irrespective of whether
1066   *               path has been fetched or not. If directory already exists, it will be overwritten
1067   * @return Fully qualified path to hbase root dir n
1068   */
1069  public Path createRootDir(boolean create) throws IOException {
1070    FileSystem fs = FileSystem.get(this.conf);
1071    Path hbaseRootdir = getDefaultRootDirPath(create);
1072    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1073    fs.mkdirs(hbaseRootdir);
1074    FSUtils.setVersion(fs, hbaseRootdir);
1075    return hbaseRootdir;
1076  }
1077
1078  /**
1079   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1080   * flag is false.
1081   * @return Fully qualified path to hbase root dir n
1082   */
1083  public Path createRootDir() throws IOException {
1084    return createRootDir(false);
1085  }
1086
1087  /**
1088   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1089   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1090   * this method if you were doing manual operation.
1091   * @return Fully qualified path to hbase root dir n
1092   */
1093  public Path createWALRootDir() throws IOException {
1094    FileSystem fs = FileSystem.get(this.conf);
1095    Path walDir = getNewDataTestDirOnTestFS();
1096    CommonFSUtils.setWALRootDir(this.conf, walDir);
1097    fs.mkdirs(walDir);
1098    return walDir;
1099  }
1100
1101  private void setHBaseFsTmpDir() throws IOException {
1102    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1103    if (hbaseFsTmpDirInString == null) {
1104      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1105      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1106    } else {
1107      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1108    }
1109  }
1110
1111  /**
1112   * Flushes all caches in the mini hbase cluster n
1113   */
1114  public void flush() throws IOException {
1115    getMiniHBaseCluster().flushcache();
1116  }
1117
1118  /**
1119   * Flushes all caches in the mini hbase cluster n
1120   */
1121  public void flush(TableName tableName) throws IOException {
1122    getMiniHBaseCluster().flushcache(tableName);
1123  }
1124
1125  /**
1126   * Compact all regions in the mini hbase cluster n
1127   */
1128  public void compact(boolean major) throws IOException {
1129    getMiniHBaseCluster().compact(major);
1130  }
1131
1132  /**
1133   * Compact all of a table's reagion in the mini hbase cluster n
1134   */
1135  public void compact(TableName tableName, boolean major) throws IOException {
1136    getMiniHBaseCluster().compact(tableName, major);
1137  }
1138
1139  /**
1140   * Create a table. nn * @return A Table instance for the created table. n
1141   */
1142  public Table createTable(TableName tableName, String family) throws IOException {
1143    return createTable(tableName, new String[] { family });
1144  }
1145
1146  /**
1147   * Create a table. nn * @return A Table instance for the created table. n
1148   */
1149  public Table createTable(TableName tableName, String[] families) throws IOException {
1150    List<byte[]> fams = new ArrayList<>(families.length);
1151    for (String family : families) {
1152      fams.add(Bytes.toBytes(family));
1153    }
1154    return createTable(tableName, fams.toArray(new byte[0][]));
1155  }
1156
1157  /**
1158   * Create a table. nn * @return A Table instance for the created table. n
1159   */
1160  public Table createTable(TableName tableName, byte[] family) throws IOException {
1161    return createTable(tableName, new byte[][] { family });
1162  }
1163
1164  /**
1165   * Create a table with multiple regions. nnn * @return A Table instance for the created table. n
1166   */
1167  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1168    throws IOException {
1169    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1170    byte[] startKey = Bytes.toBytes("aaaaa");
1171    byte[] endKey = Bytes.toBytes("zzzzz");
1172    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1173
1174    return createTable(tableName, new byte[][] { family }, splitKeys);
1175  }
1176
1177  /**
1178   * Create a table. nn * @return A Table instance for the created table. n
1179   */
1180  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1181    return createTable(tableName, families, (byte[][]) null);
1182  }
1183
1184  /**
1185   * Create a table with multiple regions. nn * @return A Table instance for the created table. n
1186   */
1187  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1188    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1189  }
1190
1191  /**
1192   * Create a table with multiple regions. n * @param replicaCount replica count. n * @return A
1193   * Table instance for the created table. n
1194   */
1195  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1196    throws IOException {
1197    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1198  }
1199
1200  /**
1201   * Create a table. nnn * @return A Table instance for the created table. n
1202   */
1203  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1204    throws IOException {
1205    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1206  }
1207
1208  /**
1209   * Create a table.
1210   * @param tableName    the table name
1211   * @param families     the families
1212   * @param splitKeys    the splitkeys
1213   * @param replicaCount the region replica count
1214   * @return A Table instance for the created table.
1215   * @throws IOException throws IOException
1216   */
1217  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1218    int replicaCount) throws IOException {
1219    return createTable(tableName, families, splitKeys, replicaCount,
1220      new Configuration(getConfiguration()));
1221  }
1222
1223  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1224    byte[] endKey, int numRegions) throws IOException {
1225    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1226
1227    getAdmin().createTable(desc, startKey, endKey, numRegions);
1228    // HBaseAdmin only waits for regions to appear in hbase:meta we
1229    // should wait until they are assigned
1230    waitUntilAllRegionsAssigned(tableName);
1231    return getConnection().getTable(tableName);
1232  }
1233
1234  /**
1235   * Create a table.
1236   * @param c Configuration to use
1237   * @return A Table instance for the created table.
1238   */
1239  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1240    throws IOException {
1241    return createTable(htd, families, null, c);
1242  }
1243
1244  /**
1245   * Create a table.
1246   * @param htd       table descriptor
1247   * @param families  array of column families
1248   * @param splitKeys array of split keys
1249   * @param c         Configuration to use
1250   * @return A Table instance for the created table.
1251   * @throws IOException if getAdmin or createTable fails
1252   */
1253  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1254    Configuration c) throws IOException {
1255    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1256    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1257    // on is interfering.
1258    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1259  }
1260
1261  /**
1262   * Create a table.
1263   * @param htd       table descriptor
1264   * @param families  array of column families
1265   * @param splitKeys array of split keys
1266   * @param type      Bloom type
1267   * @param blockSize block size
1268   * @param c         Configuration to use
1269   * @return A Table instance for the created table.
1270   * @throws IOException if getAdmin or createTable fails
1271   */
1272
1273  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1274    BloomType type, int blockSize, Configuration c) throws IOException {
1275    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1276    for (byte[] family : families) {
1277      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1278        .setBloomFilterType(type).setBlocksize(blockSize);
1279      if (isNewVersionBehaviorEnabled()) {
1280        cfdb.setNewVersionBehavior(true);
1281      }
1282      builder.setColumnFamily(cfdb.build());
1283    }
1284    TableDescriptor td = builder.build();
1285    if (splitKeys != null) {
1286      getAdmin().createTable(td, splitKeys);
1287    } else {
1288      getAdmin().createTable(td);
1289    }
1290    // HBaseAdmin only waits for regions to appear in hbase:meta
1291    // we should wait until they are assigned
1292    waitUntilAllRegionsAssigned(td.getTableName());
1293    return getConnection().getTable(td.getTableName());
1294  }
1295
1296  /**
1297   * Create a table.
1298   * @param htd       table descriptor
1299   * @param splitRows array of split keys
1300   * @return A Table instance for the created table. n
1301   */
1302  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1303    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1304    if (isNewVersionBehaviorEnabled()) {
1305      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1306        builder.setColumnFamily(
1307          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1308      }
1309    }
1310    if (splitRows != null) {
1311      getAdmin().createTable(builder.build(), splitRows);
1312    } else {
1313      getAdmin().createTable(builder.build());
1314    }
1315    // HBaseAdmin only waits for regions to appear in hbase:meta
1316    // we should wait until they are assigned
1317    waitUntilAllRegionsAssigned(htd.getTableName());
1318    return getConnection().getTable(htd.getTableName());
1319  }
1320
1321  /**
1322   * Create a table.
1323   * @param tableName    the table name
1324   * @param families     the families
1325   * @param splitKeys    the split keys
1326   * @param replicaCount the replica count
1327   * @param c            Configuration to use
1328   * @return A Table instance for the created table.
1329   */
1330  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1331    int replicaCount, final Configuration c) throws IOException {
1332    TableDescriptor htd =
1333      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1334    return createTable(htd, families, splitKeys, c);
1335  }
1336
1337  /**
1338   * Create a table.
1339   * @return A Table instance for the created table.
1340   */
1341  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1342    return createTable(tableName, new byte[][] { family }, numVersions);
1343  }
1344
1345  /**
1346   * Create a table.
1347   * @return A Table instance for the created table.
1348   */
1349  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1350    throws IOException {
1351    return createTable(tableName, families, numVersions, (byte[][]) null);
1352  }
1353
1354  /**
1355   * Create a table.
1356   * @return A Table instance for the created table.
1357   */
1358  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1359    byte[][] splitKeys) throws IOException {
1360    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1361    for (byte[] family : families) {
1362      ColumnFamilyDescriptorBuilder cfBuilder =
1363        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1364      if (isNewVersionBehaviorEnabled()) {
1365        cfBuilder.setNewVersionBehavior(true);
1366      }
1367      builder.setColumnFamily(cfBuilder.build());
1368    }
1369    if (splitKeys != null) {
1370      getAdmin().createTable(builder.build(), splitKeys);
1371    } else {
1372      getAdmin().createTable(builder.build());
1373    }
1374    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1375    // assigned
1376    waitUntilAllRegionsAssigned(tableName);
1377    return getConnection().getTable(tableName);
1378  }
1379
1380  /**
1381   * Create a table with multiple regions.
1382   * @return A Table instance for the created table.
1383   */
1384  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1385    throws IOException {
1386    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1387  }
1388
1389  /**
1390   * Create a table.
1391   * @return A Table instance for the created table.
1392   */
1393  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1394    throws IOException {
1395    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1396    for (byte[] family : families) {
1397      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1398        .setMaxVersions(numVersions).setBlocksize(blockSize);
1399      if (isNewVersionBehaviorEnabled()) {
1400        cfBuilder.setNewVersionBehavior(true);
1401      }
1402      builder.setColumnFamily(cfBuilder.build());
1403    }
1404    getAdmin().createTable(builder.build());
1405    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1406    // assigned
1407    waitUntilAllRegionsAssigned(tableName);
1408    return getConnection().getTable(tableName);
1409  }
1410
1411  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1412    String cpName) throws IOException {
1413    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1414    for (byte[] family : families) {
1415      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1416        .setMaxVersions(numVersions).setBlocksize(blockSize);
1417      if (isNewVersionBehaviorEnabled()) {
1418        cfBuilder.setNewVersionBehavior(true);
1419      }
1420      builder.setColumnFamily(cfBuilder.build());
1421    }
1422    if (cpName != null) {
1423      builder.setCoprocessor(cpName);
1424    }
1425    getAdmin().createTable(builder.build());
1426    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1427    // assigned
1428    waitUntilAllRegionsAssigned(tableName);
1429    return getConnection().getTable(tableName);
1430  }
1431
1432  /**
1433   * Create a table.
1434   * @return A Table instance for the created table.
1435   */
1436  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1437    throws IOException {
1438    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1439    int i = 0;
1440    for (byte[] family : families) {
1441      ColumnFamilyDescriptorBuilder cfBuilder =
1442        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1443      if (isNewVersionBehaviorEnabled()) {
1444        cfBuilder.setNewVersionBehavior(true);
1445      }
1446      builder.setColumnFamily(cfBuilder.build());
1447      i++;
1448    }
1449    getAdmin().createTable(builder.build());
1450    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1451    // assigned
1452    waitUntilAllRegionsAssigned(tableName);
1453    return getConnection().getTable(tableName);
1454  }
1455
1456  /**
1457   * Create a table.
1458   * @return A Table instance for the created table.
1459   */
1460  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1461    throws IOException {
1462    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1463    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1464    if (isNewVersionBehaviorEnabled()) {
1465      cfBuilder.setNewVersionBehavior(true);
1466    }
1467    builder.setColumnFamily(cfBuilder.build());
1468    getAdmin().createTable(builder.build(), splitRows);
1469    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1470    // assigned
1471    waitUntilAllRegionsAssigned(tableName);
1472    return getConnection().getTable(tableName);
1473  }
1474
1475  /**
1476   * Create a table with multiple regions.
1477   * @return A Table instance for the created table.
1478   */
1479  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1480    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1481  }
1482
1483  /**
1484   * Set the number of Region replicas.
1485   */
1486  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1487    throws IOException, InterruptedException {
1488    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1489      .setRegionReplication(replicaCount).build();
1490    admin.modifyTable(desc);
1491  }
1492
1493  /**
1494   * Set the number of Region replicas.
1495   */
1496  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1497    throws ExecutionException, IOException, InterruptedException {
1498    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1499      .setRegionReplication(replicaCount).build();
1500    admin.modifyTable(desc).get();
1501  }
1502
1503  /**
1504   * Drop an existing table
1505   * @param tableName existing table
1506   */
1507  public void deleteTable(TableName tableName) throws IOException {
1508    try {
1509      getAdmin().disableTable(tableName);
1510    } catch (TableNotEnabledException e) {
1511      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1512    }
1513    getAdmin().deleteTable(tableName);
1514  }
1515
1516  /**
1517   * Drop an existing table
1518   * @param tableName existing table
1519   */
1520  public void deleteTableIfAny(TableName tableName) throws IOException {
1521    try {
1522      deleteTable(tableName);
1523    } catch (TableNotFoundException e) {
1524      // ignore
1525    }
1526  }
1527
1528  // ==========================================================================
1529  // Canned table and table descriptor creation
1530
1531  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1532  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1533  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1534  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1535  private static final int MAXVERSIONS = 3;
1536
1537  public static final char FIRST_CHAR = 'a';
1538  public static final char LAST_CHAR = 'z';
1539  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1540  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1541
1542  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1543    return createModifyableTableDescriptor(TableName.valueOf(name),
1544      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1545      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1546  }
1547
1548  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1549    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1550    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1551    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1552      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1553        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1554        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1555      if (isNewVersionBehaviorEnabled()) {
1556        cfBuilder.setNewVersionBehavior(true);
1557      }
1558      builder.setColumnFamily(cfBuilder.build());
1559    }
1560    return builder.build();
1561  }
1562
1563  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1564    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1565    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1566    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1567      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1568        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1569        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1570      if (isNewVersionBehaviorEnabled()) {
1571        cfBuilder.setNewVersionBehavior(true);
1572      }
1573      builder.setColumnFamily(cfBuilder.build());
1574    }
1575    return builder;
1576  }
1577
1578  /**
1579   * Create a table of name <code>name</code>.
1580   * @param name Name to give table.
1581   * @return Column descriptor.
1582   */
1583  public TableDescriptor createTableDescriptor(final TableName name) {
1584    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1585      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1586  }
1587
1588  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1589    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1590  }
1591
1592  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1593    int maxVersions) {
1594    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1595    for (byte[] family : families) {
1596      ColumnFamilyDescriptorBuilder cfBuilder =
1597        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1598      if (isNewVersionBehaviorEnabled()) {
1599        cfBuilder.setNewVersionBehavior(true);
1600      }
1601      builder.setColumnFamily(cfBuilder.build());
1602    }
1603    return builder.build();
1604  }
1605
1606  /**
1607   * Create an HRegion that writes to the local tmp dirs
1608   * @param desc     a table descriptor indicating which table the region belongs to
1609   * @param startKey the start boundary of the region
1610   * @param endKey   the end boundary of the region
1611   * @return a region that writes to local dir for testing
1612   */
1613  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1614    throws IOException {
1615    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1616      .setEndKey(endKey).build();
1617    return createLocalHRegion(hri, desc);
1618  }
1619
1620  /**
1621   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1622   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1623   */
1624  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1625    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1626  }
1627
1628  /**
1629   * Create an HRegion that writes to the local tmp dirs with specified wal
1630   * @param info regioninfo
1631   * @param conf configuration
1632   * @param desc table descriptor
1633   * @param wal  wal for this region.
1634   * @return created hregion n
1635   */
1636  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1637    WAL wal) throws IOException {
1638    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1639  }
1640
1641  /**
1642   * nnnnn * @return A region on which you must call
1643   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when done. n
1644   */
1645  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1646    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1647    throws IOException {
1648    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1649      durability, wal, null, families);
1650  }
1651
1652  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1653    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1654    boolean[] compactedMemStore, byte[]... families) throws IOException {
1655    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1656    builder.setReadOnly(isReadOnly);
1657    int i = 0;
1658    for (byte[] family : families) {
1659      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1660      if (compactedMemStore != null && i < compactedMemStore.length) {
1661        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1662      } else {
1663        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1664
1665      }
1666      i++;
1667      // Set default to be three versions.
1668      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1669      builder.setColumnFamily(cfBuilder.build());
1670    }
1671    builder.setDurability(durability);
1672    RegionInfo info =
1673      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1674    return createLocalHRegion(info, conf, builder.build(), wal);
1675  }
1676
1677  //
1678  // ==========================================================================
1679
1680  /**
1681   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1682   * read.
1683   * @param tableName existing table
1684   * @return HTable to that new table n
1685   */
1686  public Table deleteTableData(TableName tableName) throws IOException {
1687    Table table = getConnection().getTable(tableName);
1688    Scan scan = new Scan();
1689    ResultScanner resScan = table.getScanner(scan);
1690    for (Result res : resScan) {
1691      Delete del = new Delete(res.getRow());
1692      table.delete(del);
1693    }
1694    resScan = table.getScanner(scan);
1695    resScan.close();
1696    return table;
1697  }
1698
1699  /**
1700   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1701   * table.
1702   * @param tableName       table which must exist.
1703   * @param preserveRegions keep the existing split points
1704   * @return HTable for the new table
1705   */
1706  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1707    throws IOException {
1708    Admin admin = getAdmin();
1709    if (!admin.isTableDisabled(tableName)) {
1710      admin.disableTable(tableName);
1711    }
1712    admin.truncateTable(tableName, preserveRegions);
1713    return getConnection().getTable(tableName);
1714  }
1715
1716  /**
1717   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1718   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1719   * preserve regions of existing table.
1720   * @param tableName table which must exist.
1721   * @return HTable for the new table
1722   */
1723  public Table truncateTable(final TableName tableName) throws IOException {
1724    return truncateTable(tableName, false);
1725  }
1726
1727  /**
1728   * Load table with rows from 'aaa' to 'zzz'.
1729   * @param t Table
1730   * @param f Family
1731   * @return Count of rows loaded. n
1732   */
1733  public int loadTable(final Table t, final byte[] f) throws IOException {
1734    return loadTable(t, new byte[][] { f });
1735  }
1736
1737  /**
1738   * Load table with rows from 'aaa' to 'zzz'.
1739   * @param t Table
1740   * @param f Family
1741   * @return Count of rows loaded. n
1742   */
1743  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1744    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1745  }
1746
1747  /**
1748   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1749   * @param t Table
1750   * @param f Array of Families to load
1751   * @return Count of rows loaded. n
1752   */
1753  public int loadTable(final Table t, final byte[][] f) throws IOException {
1754    return loadTable(t, f, null);
1755  }
1756
1757  /**
1758   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1759   * @param t     Table
1760   * @param f     Array of Families to load
1761   * @param value the values of the cells. If null is passed, the row key is used as value
1762   * @return Count of rows loaded. n
1763   */
1764  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1765    return loadTable(t, f, value, true);
1766  }
1767
1768  /**
1769   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1770   * @param t     Table
1771   * @param f     Array of Families to load
1772   * @param value the values of the cells. If null is passed, the row key is used as value
1773   * @return Count of rows loaded. n
1774   */
1775  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1776    throws IOException {
1777    List<Put> puts = new ArrayList<>();
1778    for (byte[] row : HBaseTestingUtil.ROWS) {
1779      Put put = new Put(row);
1780      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1781      for (int i = 0; i < f.length; i++) {
1782        byte[] value1 = value != null ? value : row;
1783        put.addColumn(f[i], f[i], value1);
1784      }
1785      puts.add(put);
1786    }
1787    t.put(puts);
1788    return puts.size();
1789  }
1790
1791  /**
1792   * A tracker for tracking and validating table rows generated with
1793   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1794   */
1795  public static class SeenRowTracker {
1796    int dim = 'z' - 'a' + 1;
1797    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1798    byte[] startRow;
1799    byte[] stopRow;
1800
1801    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1802      this.startRow = startRow;
1803      this.stopRow = stopRow;
1804    }
1805
1806    void reset() {
1807      for (byte[] row : ROWS) {
1808        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1809      }
1810    }
1811
1812    int i(byte b) {
1813      return b - 'a';
1814    }
1815
1816    public void addRow(byte[] row) {
1817      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1818    }
1819
1820    /**
1821     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
1822     * rows none
1823     */
1824    public void validate() {
1825      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1826        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1827          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1828            int count = seenRows[i(b1)][i(b2)][i(b3)];
1829            int expectedCount = 0;
1830            if (
1831              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1832                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1833            ) {
1834              expectedCount = 1;
1835            }
1836            if (count != expectedCount) {
1837              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1838              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1839                + "instead of " + expectedCount);
1840            }
1841          }
1842        }
1843      }
1844    }
1845  }
1846
1847  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1848    return loadRegion(r, f, false);
1849  }
1850
1851  public int loadRegion(final Region r, final byte[] f) throws IOException {
1852    return loadRegion((HRegion) r, f);
1853  }
1854
1855  /**
1856   * Load region with rows from 'aaa' to 'zzz'.
1857   * @param r     Region
1858   * @param f     Family
1859   * @param flush flush the cache if true
1860   * @return Count of rows loaded. n
1861   */
1862  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1863    byte[] k = new byte[3];
1864    int rowCount = 0;
1865    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1866      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1867        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1868          k[0] = b1;
1869          k[1] = b2;
1870          k[2] = b3;
1871          Put put = new Put(k);
1872          put.setDurability(Durability.SKIP_WAL);
1873          put.addColumn(f, null, k);
1874          if (r.getWAL() == null) {
1875            put.setDurability(Durability.SKIP_WAL);
1876          }
1877          int preRowCount = rowCount;
1878          int pause = 10;
1879          int maxPause = 1000;
1880          while (rowCount == preRowCount) {
1881            try {
1882              r.put(put);
1883              rowCount++;
1884            } catch (RegionTooBusyException e) {
1885              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1886              Threads.sleep(pause);
1887            }
1888          }
1889        }
1890      }
1891      if (flush) {
1892        r.flush(true);
1893      }
1894    }
1895    return rowCount;
1896  }
1897
1898  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1899    throws IOException {
1900    for (int i = startRow; i < endRow; i++) {
1901      byte[] data = Bytes.toBytes(String.valueOf(i));
1902      Put put = new Put(data);
1903      put.addColumn(f, null, data);
1904      t.put(put);
1905    }
1906  }
1907
1908  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1909    throws IOException {
1910    for (int i = 0; i < totalRows; i++) {
1911      byte[] row = new byte[rowSize];
1912      Bytes.random(row);
1913      Put put = new Put(row);
1914      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1915      t.put(put);
1916    }
1917  }
1918
1919  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
1920    int replicaId) throws IOException {
1921    for (int i = startRow; i < endRow; i++) {
1922      String failMsg = "Failed verification of row :" + i;
1923      byte[] data = Bytes.toBytes(String.valueOf(i));
1924      Get get = new Get(data);
1925      get.setReplicaId(replicaId);
1926      get.setConsistency(Consistency.TIMELINE);
1927      Result result = table.get(get);
1928      assertTrue(failMsg, result.containsColumn(f, null));
1929      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1930      Cell cell = result.getColumnLatestCell(f, null);
1931      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1932        cell.getValueOffset(), cell.getValueLength()));
1933    }
1934  }
1935
1936  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
1937    throws IOException {
1938    verifyNumericRows((HRegion) region, f, startRow, endRow);
1939  }
1940
1941  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
1942    throws IOException {
1943    verifyNumericRows(region, f, startRow, endRow, true);
1944  }
1945
1946  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
1947    final boolean present) throws IOException {
1948    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
1949  }
1950
1951  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
1952    final boolean present) throws IOException {
1953    for (int i = startRow; i < endRow; i++) {
1954      String failMsg = "Failed verification of row :" + i;
1955      byte[] data = Bytes.toBytes(String.valueOf(i));
1956      Result result = region.get(new Get(data));
1957
1958      boolean hasResult = result != null && !result.isEmpty();
1959      assertEquals(failMsg + result, present, hasResult);
1960      if (!present) continue;
1961
1962      assertTrue(failMsg, result.containsColumn(f, null));
1963      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1964      Cell cell = result.getColumnLatestCell(f, null);
1965      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1966        cell.getValueOffset(), cell.getValueLength()));
1967    }
1968  }
1969
1970  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1971    throws IOException {
1972    for (int i = startRow; i < endRow; i++) {
1973      byte[] data = Bytes.toBytes(String.valueOf(i));
1974      Delete delete = new Delete(data);
1975      delete.addFamily(f);
1976      t.delete(delete);
1977    }
1978  }
1979
1980  /**
1981   * Return the number of rows in the given table.
1982   * @param table to count rows
1983   * @return count of rows
1984   */
1985  public static int countRows(final Table table) throws IOException {
1986    return countRows(table, new Scan());
1987  }
1988
1989  public static int countRows(final Table table, final Scan scan) throws IOException {
1990    try (ResultScanner results = table.getScanner(scan)) {
1991      int count = 0;
1992      while (results.next() != null) {
1993        count++;
1994      }
1995      return count;
1996    }
1997  }
1998
1999  public static int countRows(final Table table, final byte[]... families) throws IOException {
2000    Scan scan = new Scan();
2001    for (byte[] family : families) {
2002      scan.addFamily(family);
2003    }
2004    return countRows(table, scan);
2005  }
2006
2007  /**
2008   * Return the number of rows in the given table.
2009   */
2010  public int countRows(final TableName tableName) throws IOException {
2011    try (Table table = getConnection().getTable(tableName)) {
2012      return countRows(table);
2013    }
2014  }
2015
2016  public static int countRows(final Region region) throws IOException {
2017    return countRows(region, new Scan());
2018  }
2019
2020  public static int countRows(final Region region, final Scan scan) throws IOException {
2021    try (InternalScanner scanner = region.getScanner(scan)) {
2022      return countRows(scanner);
2023    }
2024  }
2025
2026  public static int countRows(final InternalScanner scanner) throws IOException {
2027    int scannedCount = 0;
2028    List<Cell> results = new ArrayList<>();
2029    boolean hasMore = true;
2030    while (hasMore) {
2031      hasMore = scanner.next(results);
2032      scannedCount += results.size();
2033      results.clear();
2034    }
2035    return scannedCount;
2036  }
2037
2038  /**
2039   * Return an md5 digest of the entire contents of a table.
2040   */
2041  public String checksumRows(final Table table) throws Exception {
2042    MessageDigest digest = MessageDigest.getInstance("MD5");
2043    try (ResultScanner results = table.getScanner(new Scan())) {
2044      for (Result res : results) {
2045        digest.update(res.getRow());
2046      }
2047    }
2048    return digest.toString();
2049  }
2050
2051  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2052  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2053  static {
2054    int i = 0;
2055    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2056      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2057        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2058          ROWS[i][0] = b1;
2059          ROWS[i][1] = b2;
2060          ROWS[i][2] = b3;
2061          i++;
2062        }
2063      }
2064    }
2065  }
2066
2067  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2068    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2069    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2070    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2071    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2072    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2073    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2074
2075  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2076    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2077    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2078    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2079    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2080    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2081    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2082
2083  /**
2084   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2085   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2086   * @return list of region info for regions added to meta
2087   */
2088  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2089    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2090    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2091      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2092      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2093      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2094        TableState.State.ENABLED);
2095      // add custom ones
2096      for (int i = 0; i < startKeys.length; i++) {
2097        int j = (i + 1) % startKeys.length;
2098        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2099          .setEndKey(startKeys[j]).build();
2100        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2101        newRegions.add(hri);
2102      }
2103      return newRegions;
2104    }
2105  }
2106
2107  /**
2108   * Create an unmanaged WAL. Be sure to close it when you're through.
2109   */
2110  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2111    throws IOException {
2112    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2113    // unless I pass along via the conf.
2114    Configuration confForWAL = new Configuration(conf);
2115    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2116    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2117  }
2118
2119  /**
2120   * Create a region with it's own WAL. Be sure to call
2121   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2122   */
2123  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2124    final Configuration conf, final TableDescriptor htd) throws IOException {
2125    return createRegionAndWAL(info, rootDir, conf, htd, true);
2126  }
2127
2128  /**
2129   * Create a region with it's own WAL. Be sure to call
2130   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2131   */
2132  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2133    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2134    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2135    region.setBlockCache(blockCache);
2136    region.initialize();
2137    return region;
2138  }
2139
2140  /**
2141   * Create a region with it's own WAL. Be sure to call
2142   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2143   */
2144  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2145    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2146    throws IOException {
2147    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2148    region.setMobFileCache(mobFileCache);
2149    region.initialize();
2150    return region;
2151  }
2152
2153  /**
2154   * Create a region with it's own WAL. Be sure to call
2155   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2156   */
2157  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2158    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2159    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2160      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2161    WAL wal = createWal(conf, rootDir, info);
2162    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2163  }
2164
2165  /**
2166   * Find any other region server which is different from the one identified by parameter
2167   * @return another region server
2168   */
2169  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2170    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2171      if (!(rst.getRegionServer() == rs)) {
2172        return rst.getRegionServer();
2173      }
2174    }
2175    return null;
2176  }
2177
2178  /**
2179   * Tool to get the reference to the region server object that holds the region of the specified
2180   * user table.
2181   * @param tableName user table to lookup in hbase:meta
2182   * @return region server that holds it, null if the row doesn't exist
2183   */
2184  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2185    throws IOException, InterruptedException {
2186    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2187    if (regions == null || regions.isEmpty()) {
2188      return null;
2189    }
2190    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2191
2192    byte[] firstRegionName =
2193      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2194        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2195
2196    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2197    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2198      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2199    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2200      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2201    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2202    while (retrier.shouldRetry()) {
2203      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2204      if (index != -1) {
2205        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2206      }
2207      // Came back -1. Region may not be online yet. Sleep a while.
2208      retrier.sleepUntilNextRetry();
2209    }
2210    return null;
2211  }
2212
2213  /**
2214   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2215   * @throws IOException When starting the cluster fails.
2216   */
2217  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2218    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2219    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2220      "99.0");
2221    startMiniMapReduceCluster(2);
2222    return mrCluster;
2223  }
2224
2225  /**
2226   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2227   * internal static LOG_DIR variable.
2228   */
2229  private void forceChangeTaskLogDir() {
2230    Field logDirField;
2231    try {
2232      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2233      logDirField.setAccessible(true);
2234
2235      Field modifiersField = ReflectionUtils.getModifiersField();
2236      modifiersField.setAccessible(true);
2237      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2238
2239      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2240    } catch (SecurityException e) {
2241      throw new RuntimeException(e);
2242    } catch (NoSuchFieldException e) {
2243      throw new RuntimeException(e);
2244    } catch (IllegalArgumentException e) {
2245      throw new RuntimeException(e);
2246    } catch (IllegalAccessException e) {
2247      throw new RuntimeException(e);
2248    }
2249  }
2250
2251  /**
2252   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2253   * filesystem.
2254   * @param servers The number of <code>TaskTracker</code>'s to start.
2255   * @throws IOException When starting the cluster fails.
2256   */
2257  private void startMiniMapReduceCluster(final int servers) throws IOException {
2258    if (mrCluster != null) {
2259      throw new IllegalStateException("MiniMRCluster is already running");
2260    }
2261    LOG.info("Starting mini mapreduce cluster...");
2262    setupClusterTestDir();
2263    createDirsAndSetProperties();
2264
2265    forceChangeTaskLogDir();
2266
2267    //// hadoop2 specific settings
2268    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2269    // we up the VM usable so that processes don't get killed.
2270    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2271
2272    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2273    // this avoids the problem by disabling speculative task execution in tests.
2274    conf.setBoolean("mapreduce.map.speculative", false);
2275    conf.setBoolean("mapreduce.reduce.speculative", false);
2276    ////
2277
2278    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2279    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2280    if (JVM.getJVMSpecVersion() >= 17) {
2281      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2282      conf.set("yarn.app.mapreduce.am.command-opts",
2283        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2284    }
2285
2286    // Allow the user to override FS URI for this map-reduce cluster to use.
2287    mrCluster =
2288      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2289        1, null, null, new JobConf(this.conf));
2290    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2291    if (jobConf == null) {
2292      jobConf = mrCluster.createJobConf();
2293    }
2294
2295    // Hadoop MiniMR overwrites this while it should not
2296    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2297    LOG.info("Mini mapreduce cluster started");
2298
2299    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2300    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2301    // necessary config properties here. YARN-129 required adding a few properties.
2302    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2303    // this for mrv2 support; mr1 ignores this
2304    conf.set("mapreduce.framework.name", "yarn");
2305    conf.setBoolean("yarn.is.minicluster", true);
2306    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2307    if (rmAddress != null) {
2308      conf.set("yarn.resourcemanager.address", rmAddress);
2309    }
2310    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2311    if (historyAddress != null) {
2312      conf.set("mapreduce.jobhistory.address", historyAddress);
2313    }
2314    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2315    if (schedulerAddress != null) {
2316      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2317    }
2318    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2319    if (mrJobHistoryWebappAddress != null) {
2320      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2321    }
2322    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2323    if (yarnRMWebappAddress != null) {
2324      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2325    }
2326  }
2327
2328  /**
2329   * Stops the previously started <code>MiniMRCluster</code>.
2330   */
2331  public void shutdownMiniMapReduceCluster() {
2332    if (mrCluster != null) {
2333      LOG.info("Stopping mini mapreduce cluster...");
2334      mrCluster.shutdown();
2335      mrCluster = null;
2336      LOG.info("Mini mapreduce cluster stopped");
2337    }
2338    // Restore configuration to point to local jobtracker
2339    conf.set("mapreduce.jobtracker.address", "local");
2340  }
2341
2342  /**
2343   * Create a stubbed out RegionServerService, mainly for getting FS.
2344   */
2345  public RegionServerServices createMockRegionServerService() throws IOException {
2346    return createMockRegionServerService((ServerName) null);
2347  }
2348
2349  /**
2350   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2351   * TestTokenAuthentication
2352   */
2353  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2354    throws IOException {
2355    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2356    rss.setFileSystem(getTestFileSystem());
2357    rss.setRpcServer(rpc);
2358    return rss;
2359  }
2360
2361  /**
2362   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2363   * TestOpenRegionHandler
2364   */
2365  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2366    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2367    rss.setFileSystem(getTestFileSystem());
2368    return rss;
2369  }
2370
2371  /**
2372   * Expire the Master's session
2373   */
2374  public void expireMasterSession() throws Exception {
2375    HMaster master = getMiniHBaseCluster().getMaster();
2376    expireSession(master.getZooKeeper(), false);
2377  }
2378
2379  /**
2380   * Expire a region server's session
2381   * @param index which RS
2382   */
2383  public void expireRegionServerSession(int index) throws Exception {
2384    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2385    expireSession(rs.getZooKeeper(), false);
2386    decrementMinRegionServerCount();
2387  }
2388
2389  private void decrementMinRegionServerCount() {
2390    // decrement the count for this.conf, for newly spwaned master
2391    // this.hbaseCluster shares this configuration too
2392    decrementMinRegionServerCount(getConfiguration());
2393
2394    // each master thread keeps a copy of configuration
2395    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2396      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2397    }
2398  }
2399
2400  private void decrementMinRegionServerCount(Configuration conf) {
2401    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2402    if (currentCount != -1) {
2403      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2404    }
2405  }
2406
2407  public void expireSession(ZKWatcher nodeZK) throws Exception {
2408    expireSession(nodeZK, false);
2409  }
2410
2411  /**
2412   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2413   * http://hbase.apache.org/book.html#trouble.zookeeper
2414   * <p/>
2415   * There are issues when doing this:
2416   * <ol>
2417   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2418   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2419   * </ol>
2420   * @param nodeZK      - the ZK watcher to expire
2421   * @param checkStatus - true to check if we can create a Table with the current configuration.
2422   */
2423  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2424    Configuration c = new Configuration(this.conf);
2425    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2426    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2427    byte[] password = zk.getSessionPasswd();
2428    long sessionID = zk.getSessionId();
2429
2430    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2431    // so we create a first watcher to be sure that the
2432    // event was sent. We expect that if our watcher receives the event
2433    // other watchers on the same machine will get is as well.
2434    // When we ask to close the connection, ZK does not close it before
2435    // we receive all the events, so don't have to capture the event, just
2436    // closing the connection should be enough.
2437    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2438      @Override
2439      public void process(WatchedEvent watchedEvent) {
2440        LOG.info("Monitor ZKW received event=" + watchedEvent);
2441      }
2442    }, sessionID, password);
2443
2444    // Making it expire
2445    ZooKeeper newZK =
2446      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2447
2448    // ensure that we have connection to the server before closing down, otherwise
2449    // the close session event will be eaten out before we start CONNECTING state
2450    long start = EnvironmentEdgeManager.currentTime();
2451    while (
2452      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2453    ) {
2454      Thread.sleep(1);
2455    }
2456    newZK.close();
2457    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2458
2459    // Now closing & waiting to be sure that the clients get it.
2460    monitor.close();
2461
2462    if (checkStatus) {
2463      getConnection().getTable(TableName.META_TABLE_NAME).close();
2464    }
2465  }
2466
2467  /**
2468   * Get the Mini HBase cluster.
2469   * @return hbase cluster
2470   * @see #getHBaseClusterInterface()
2471   */
2472  public SingleProcessHBaseCluster getHBaseCluster() {
2473    return getMiniHBaseCluster();
2474  }
2475
2476  /**
2477   * Returns the HBaseCluster instance.
2478   * <p>
2479   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2480   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2481   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2482   * instead w/o the need to type-cast.
2483   */
2484  public HBaseClusterInterface getHBaseClusterInterface() {
2485    // implementation note: we should rename this method as #getHBaseCluster(),
2486    // but this would require refactoring 90+ calls.
2487    return hbaseCluster;
2488  }
2489
2490  /**
2491   * Resets the connections so that the next time getConnection() is called, a new connection is
2492   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2493   * the connection is not valid anymore.
2494   * <p/>
2495   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2496   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2497   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2498   * the connection state automatically. Cleaning this is a much bigger refactor.
2499   */
2500  public void invalidateConnection() throws IOException {
2501    closeConnection();
2502    // Update the master addresses if they changed.
2503    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2504    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2505    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2506      masterConfigBefore, masterConfAfter);
2507    conf.set(HConstants.MASTER_ADDRS_KEY,
2508      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2509  }
2510
2511  /**
2512   * Get a shared Connection to the cluster. this method is thread safe.
2513   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2514   */
2515  public Connection getConnection() throws IOException {
2516    return getAsyncConnection().toConnection();
2517  }
2518
2519  /**
2520   * Get a assigned Connection to the cluster. this method is thread safe.
2521   * @param user assigned user
2522   * @return A Connection with assigned user.
2523   */
2524  public Connection getConnection(User user) throws IOException {
2525    return getAsyncConnection(user).toConnection();
2526  }
2527
2528  /**
2529   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2530   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2531   *         of cluster.
2532   */
2533  public AsyncClusterConnection getAsyncConnection() throws IOException {
2534    try {
2535      return asyncConnection.updateAndGet(connection -> {
2536        if (connection == null) {
2537          try {
2538            User user = UserProvider.instantiate(conf).getCurrent();
2539            connection = getAsyncConnection(user);
2540          } catch (IOException ioe) {
2541            throw new UncheckedIOException("Failed to create connection", ioe);
2542          }
2543        }
2544        return connection;
2545      });
2546    } catch (UncheckedIOException exception) {
2547      throw exception.getCause();
2548    }
2549  }
2550
2551  /**
2552   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2553   * @param user assigned user
2554   * @return An AsyncClusterConnection with assigned user.
2555   */
2556  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2557    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2558  }
2559
2560  public void closeConnection() throws IOException {
2561    if (hbaseAdmin != null) {
2562      Closeables.close(hbaseAdmin, true);
2563      hbaseAdmin = null;
2564    }
2565    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2566    if (asyncConnection != null) {
2567      Closeables.close(asyncConnection, true);
2568    }
2569  }
2570
2571  /**
2572   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2573   * it has no effect, it will be closed automatically when the cluster shutdowns
2574   */
2575  public Admin getAdmin() throws IOException {
2576    if (hbaseAdmin == null) {
2577      this.hbaseAdmin = getConnection().getAdmin();
2578    }
2579    return hbaseAdmin;
2580  }
2581
2582  private Admin hbaseAdmin = null;
2583
2584  /**
2585   * Returns an {@link Hbck} instance. Needs be closed when done.
2586   */
2587  public Hbck getHbck() throws IOException {
2588    return getConnection().getHbck();
2589  }
2590
2591  /**
2592   * Unassign the named region.
2593   * @param regionName The region to unassign.
2594   */
2595  public void unassignRegion(String regionName) throws IOException {
2596    unassignRegion(Bytes.toBytes(regionName));
2597  }
2598
2599  /**
2600   * Unassign the named region.
2601   * @param regionName The region to unassign.
2602   */
2603  public void unassignRegion(byte[] regionName) throws IOException {
2604    getAdmin().unassign(regionName);
2605  }
2606
2607  /**
2608   * Closes the region containing the given row.
2609   * @param row   The row to find the containing region.
2610   * @param table The table to find the region.
2611   */
2612  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2613    unassignRegionByRow(Bytes.toBytes(row), table);
2614  }
2615
2616  /**
2617   * Closes the region containing the given row.
2618   * @param row   The row to find the containing region.
2619   * @param table The table to find the region. n
2620   */
2621  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2622    HRegionLocation hrl = table.getRegionLocation(row);
2623    unassignRegion(hrl.getRegion().getRegionName());
2624  }
2625
2626  /**
2627   * Retrieves a splittable region randomly from tableName
2628   * @param tableName   name of table
2629   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2630   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2631   */
2632  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2633    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2634    int regCount = regions.size();
2635    Set<Integer> attempted = new HashSet<>();
2636    int idx;
2637    int attempts = 0;
2638    do {
2639      regions = getHBaseCluster().getRegions(tableName);
2640      if (regCount != regions.size()) {
2641        // if there was region movement, clear attempted Set
2642        attempted.clear();
2643      }
2644      regCount = regions.size();
2645      // There are chances that before we get the region for the table from an RS the region may
2646      // be going for CLOSE. This may be because online schema change is enabled
2647      if (regCount > 0) {
2648        idx = ThreadLocalRandom.current().nextInt(regCount);
2649        // if we have just tried this region, there is no need to try again
2650        if (attempted.contains(idx)) {
2651          continue;
2652        }
2653        HRegion region = regions.get(idx);
2654        if (region.checkSplit().isPresent()) {
2655          return region;
2656        }
2657        attempted.add(idx);
2658      }
2659      attempts++;
2660    } while (maxAttempts == -1 || attempts < maxAttempts);
2661    return null;
2662  }
2663
2664  public MiniDFSCluster getDFSCluster() {
2665    return dfsCluster;
2666  }
2667
2668  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2669    setDFSCluster(cluster, true);
2670  }
2671
2672  /**
2673   * Set the MiniDFSCluster
2674   * @param cluster     cluster to use
2675   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2676   *                    is set.
2677   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2678   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2679   */
2680  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2681    throws IllegalStateException, IOException {
2682    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2683      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2684    }
2685    this.dfsCluster = cluster;
2686    this.setFs();
2687  }
2688
2689  public FileSystem getTestFileSystem() throws IOException {
2690    return HFileSystem.get(conf);
2691  }
2692
2693  /**
2694   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
2695   * (30 seconds).
2696   * @param table Table to wait on.
2697   */
2698  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2699    waitTableAvailable(table.getName(), 30000);
2700  }
2701
2702  public void waitTableAvailable(TableName table, long timeoutMillis)
2703    throws InterruptedException, IOException {
2704    waitFor(timeoutMillis, predicateTableAvailable(table));
2705  }
2706
2707  /**
2708   * Wait until all regions in a table have been assigned
2709   * @param table         Table to wait on.
2710   * @param timeoutMillis Timeout.
2711   */
2712  public void waitTableAvailable(byte[] table, long timeoutMillis)
2713    throws InterruptedException, IOException {
2714    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2715  }
2716
2717  public String explainTableAvailability(TableName tableName) throws IOException {
2718    StringBuilder msg =
2719      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2720    if (getHBaseCluster().getMaster().isAlive()) {
2721      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2722        .getRegionStates().getRegionAssignments();
2723      final List<Pair<RegionInfo, ServerName>> metaLocations =
2724        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2725      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2726        RegionInfo hri = metaLocation.getFirst();
2727        ServerName sn = metaLocation.getSecond();
2728        if (!assignments.containsKey(hri)) {
2729          msg.append(", region ").append(hri)
2730            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2731        } else if (sn == null) {
2732          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2733        } else if (!sn.equals(assignments.get(hri))) {
2734          msg.append(",  region ").append(hri)
2735            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2736            .append(" <> ").append(assignments.get(hri));
2737        }
2738      }
2739    }
2740    return msg.toString();
2741  }
2742
2743  public String explainTableState(final TableName table, TableState.State state)
2744    throws IOException {
2745    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2746    if (tableState == null) {
2747      return "TableState in META: No table state in META for table " + table
2748        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2749    } else if (!tableState.inStates(state)) {
2750      return "TableState in META: Not " + state + " state, but " + tableState;
2751    } else {
2752      return "TableState in META: OK";
2753    }
2754  }
2755
2756  @Nullable
2757  public TableState findLastTableState(final TableName table) throws IOException {
2758    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2759    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2760      @Override
2761      public boolean visit(Result r) throws IOException {
2762        if (!Arrays.equals(r.getRow(), table.getName())) {
2763          return false;
2764        }
2765        TableState state = CatalogFamilyFormat.getTableState(r);
2766        if (state != null) {
2767          lastTableState.set(state);
2768        }
2769        return true;
2770      }
2771    };
2772    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2773      Integer.MAX_VALUE, visitor);
2774    return lastTableState.get();
2775  }
2776
2777  /**
2778   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2779   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
2780   * table.
2781   * @param table the table to wait on.
2782   * @throws InterruptedException if interrupted while waiting
2783   * @throws IOException          if an IO problem is encountered
2784   */
2785  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2786    waitTableEnabled(table, 30000);
2787  }
2788
2789  /**
2790   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2791   * have been all assigned.
2792   * @see #waitTableEnabled(TableName, long)
2793   * @param table         Table to wait on.
2794   * @param timeoutMillis Time to wait on it being marked enabled.
2795   */
2796  public void waitTableEnabled(byte[] table, long timeoutMillis)
2797    throws InterruptedException, IOException {
2798    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2799  }
2800
2801  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2802    waitFor(timeoutMillis, predicateTableEnabled(table));
2803  }
2804
2805  /**
2806   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
2807   * after default period (30 seconds)
2808   * @param table Table to wait on.
2809   */
2810  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2811    waitTableDisabled(table, 30000);
2812  }
2813
2814  public void waitTableDisabled(TableName table, long millisTimeout)
2815    throws InterruptedException, IOException {
2816    waitFor(millisTimeout, predicateTableDisabled(table));
2817  }
2818
2819  /**
2820   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
2821   * @param table         Table to wait on.
2822   * @param timeoutMillis Time to wait on it being marked disabled.
2823   */
2824  public void waitTableDisabled(byte[] table, long timeoutMillis)
2825    throws InterruptedException, IOException {
2826    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2827  }
2828
2829  /**
2830   * Make sure that at least the specified number of region servers are running
2831   * @param num minimum number of region servers that should be running
2832   * @return true if we started some servers
2833   */
2834  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2835    boolean startedServer = false;
2836    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2837    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2838      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2839      startedServer = true;
2840    }
2841
2842    return startedServer;
2843  }
2844
2845  /**
2846   * Make sure that at least the specified number of region servers are running. We don't count the
2847   * ones that are currently stopping or are stopped.
2848   * @param num minimum number of region servers that should be running
2849   * @return true if we started some servers
2850   */
2851  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2852    boolean startedServer = ensureSomeRegionServersAvailable(num);
2853
2854    int nonStoppedServers = 0;
2855    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2856
2857      HRegionServer hrs = rst.getRegionServer();
2858      if (hrs.isStopping() || hrs.isStopped()) {
2859        LOG.info("A region server is stopped or stopping:" + hrs);
2860      } else {
2861        nonStoppedServers++;
2862      }
2863    }
2864    for (int i = nonStoppedServers; i < num; ++i) {
2865      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2866      startedServer = true;
2867    }
2868    return startedServer;
2869  }
2870
2871  /**
2872   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
2873   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2874   * @param c                     Initial configuration
2875   * @param differentiatingSuffix Suffix to differentiate this user from others.
2876   * @return A new configuration instance with a different user set into it.
2877   */
2878  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2879    throws IOException {
2880    FileSystem currentfs = FileSystem.get(c);
2881    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2882      return User.getCurrent();
2883    }
2884    // Else distributed filesystem. Make a new instance per daemon. Below
2885    // code is taken from the AppendTestUtil over in hdfs.
2886    String username = User.getCurrent().getName() + differentiatingSuffix;
2887    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2888    return user;
2889  }
2890
2891  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2892    throws IOException {
2893    NavigableSet<String> online = new TreeSet<>();
2894    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2895      try {
2896        for (RegionInfo region : ProtobufUtil
2897          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2898          online.add(region.getRegionNameAsString());
2899        }
2900      } catch (RegionServerStoppedException e) {
2901        // That's fine.
2902      }
2903    }
2904    return online;
2905  }
2906
2907  /**
2908   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
2909   * linger. Here is the exception you'll see:
2910   *
2911   * <pre>
2912   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2913   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2914   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2915   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2916   * </pre>
2917   *
2918   * @param stream A DFSClient.DFSOutputStream.
2919   */
2920  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2921    try {
2922      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2923      for (Class<?> clazz : clazzes) {
2924        String className = clazz.getSimpleName();
2925        if (className.equals("DFSOutputStream")) {
2926          if (clazz.isInstance(stream)) {
2927            Field maxRecoveryErrorCountField =
2928              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2929            maxRecoveryErrorCountField.setAccessible(true);
2930            maxRecoveryErrorCountField.setInt(stream, max);
2931            break;
2932          }
2933        }
2934      }
2935    } catch (Exception e) {
2936      LOG.info("Could not set max recovery field", e);
2937    }
2938  }
2939
2940  /**
2941   * Uses directly the assignment manager to assign the region. and waits until the specified region
2942   * has completed assignment.
2943   * @return true if the region is assigned false otherwise.
2944   */
2945  public boolean assignRegion(final RegionInfo regionInfo)
2946    throws IOException, InterruptedException {
2947    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
2948    am.assign(regionInfo);
2949    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
2950  }
2951
2952  /**
2953   * Move region to destination server and wait till region is completely moved and online
2954   * @param destRegion region to move
2955   * @param destServer destination server of the region
2956   */
2957  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
2958    throws InterruptedException, IOException {
2959    HMaster master = getMiniHBaseCluster().getMaster();
2960    // TODO: Here we start the move. The move can take a while.
2961    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
2962    while (true) {
2963      ServerName serverName =
2964        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
2965      if (serverName != null && serverName.equals(destServer)) {
2966        assertRegionOnServer(destRegion, serverName, 2000);
2967        break;
2968      }
2969      Thread.sleep(10);
2970    }
2971  }
2972
2973  /**
2974   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
2975   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
2976   * master has been informed and updated hbase:meta with the regions deployed server.
2977   * @param tableName the table name
2978   */
2979  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
2980    waitUntilAllRegionsAssigned(tableName,
2981      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
2982  }
2983
2984  /**
2985   * Waith until all system table's regions get assigned
2986   */
2987  public void waitUntilAllSystemRegionsAssigned() throws IOException {
2988    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
2989  }
2990
2991  /**
2992   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
2993   * timeout. This means all regions have been deployed, master has been informed and updated
2994   * hbase:meta with the regions deployed server.
2995   * @param tableName the table name
2996   * @param timeout   timeout, in milliseconds
2997   */
2998  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
2999    throws IOException {
3000    if (!TableName.isMetaTableName(tableName)) {
3001      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3002        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3003          + timeout + "ms");
3004        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3005          @Override
3006          public String explainFailure() throws IOException {
3007            return explainTableAvailability(tableName);
3008          }
3009
3010          @Override
3011          public boolean evaluate() throws IOException {
3012            Scan scan = new Scan();
3013            scan.addFamily(HConstants.CATALOG_FAMILY);
3014            boolean tableFound = false;
3015            try (ResultScanner s = meta.getScanner(scan)) {
3016              for (Result r; (r = s.next()) != null;) {
3017                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3018                RegionInfo info = RegionInfo.parseFromOrNull(b);
3019                if (info != null && info.getTable().equals(tableName)) {
3020                  // Get server hosting this region from catalog family. Return false if no server
3021                  // hosting this region, or if the server hosting this region was recently killed
3022                  // (for fault tolerance testing).
3023                  tableFound = true;
3024                  byte[] server =
3025                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3026                  if (server == null) {
3027                    return false;
3028                  } else {
3029                    byte[] startCode =
3030                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3031                    ServerName serverName =
3032                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3033                        + Bytes.toLong(startCode));
3034                    if (
3035                      !getHBaseClusterInterface().isDistributedCluster()
3036                        && getHBaseCluster().isKilledRS(serverName)
3037                    ) {
3038                      return false;
3039                    }
3040                  }
3041                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3042                    return false;
3043                  }
3044                }
3045              }
3046            }
3047            if (!tableFound) {
3048              LOG.warn(
3049                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3050            }
3051            return tableFound;
3052          }
3053        });
3054      }
3055    }
3056    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3057    // check from the master state if we are using a mini cluster
3058    if (!getHBaseClusterInterface().isDistributedCluster()) {
3059      // So, all regions are in the meta table but make sure master knows of the assignments before
3060      // returning -- sometimes this can lag.
3061      HMaster master = getHBaseCluster().getMaster();
3062      final RegionStates states = master.getAssignmentManager().getRegionStates();
3063      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3064        @Override
3065        public String explainFailure() throws IOException {
3066          return explainTableAvailability(tableName);
3067        }
3068
3069        @Override
3070        public boolean evaluate() throws IOException {
3071          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3072          return hris != null && !hris.isEmpty();
3073        }
3074      });
3075    }
3076    LOG.info("All regions for table " + tableName + " assigned.");
3077  }
3078
3079  /**
3080   * Do a small get/scan against one store. This is required because store has no actual methods of
3081   * querying itself, and relies on StoreScanner.
3082   */
3083  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3084    Scan scan = new Scan(get);
3085    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3086      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3087      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3088      // readpoint 0.
3089      0);
3090
3091    List<Cell> result = new ArrayList<>();
3092    scanner.next(result);
3093    if (!result.isEmpty()) {
3094      // verify that we are on the row we want:
3095      Cell kv = result.get(0);
3096      if (!CellUtil.matchingRows(kv, get.getRow())) {
3097        result.clear();
3098      }
3099    }
3100    scanner.close();
3101    return result;
3102  }
3103
3104  /**
3105   * Create region split keys between startkey and endKey
3106   * @param numRegions the number of regions to be created. it has to be greater than 3.
3107   * @return resulting split keys
3108   */
3109  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3110    assertTrue(numRegions > 3);
3111    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3112    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3113    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3114    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3115    return result;
3116  }
3117
3118  /**
3119   * Do a small get/scan against one store. This is required because store has no actual methods of
3120   * querying itself, and relies on StoreScanner.
3121   */
3122  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3123    throws IOException {
3124    Get get = new Get(row);
3125    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3126    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3127
3128    return getFromStoreFile(store, get);
3129  }
3130
3131  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3132    final List<? extends Cell> actual) {
3133    final int eLen = expected.size();
3134    final int aLen = actual.size();
3135    final int minLen = Math.min(eLen, aLen);
3136
3137    int i = 0;
3138    while (
3139      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3140    ) {
3141      i++;
3142    }
3143
3144    if (additionalMsg == null) {
3145      additionalMsg = "";
3146    }
3147    if (!additionalMsg.isEmpty()) {
3148      additionalMsg = ". " + additionalMsg;
3149    }
3150
3151    if (eLen != aLen || i != minLen) {
3152      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3153        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3154        + " (length " + aLen + ")" + additionalMsg);
3155    }
3156  }
3157
3158  public static <T> String safeGetAsStr(List<T> lst, int i) {
3159    if (0 <= i && i < lst.size()) {
3160      return lst.get(i).toString();
3161    } else {
3162      return "<out_of_range>";
3163    }
3164  }
3165
3166  public String getClusterKey() {
3167    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3168      + ":"
3169      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3170  }
3171
3172  /**
3173   * Creates a random table with the given parameters
3174   */
3175  public Table createRandomTable(TableName tableName, final Collection<String> families,
3176    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3177    final int numRowsPerFlush) throws IOException, InterruptedException {
3178    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3179      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3180      + maxVersions + "\n");
3181
3182    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3183    final int numCF = families.size();
3184    final byte[][] cfBytes = new byte[numCF][];
3185    {
3186      int cfIndex = 0;
3187      for (String cf : families) {
3188        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3189      }
3190    }
3191
3192    final int actualStartKey = 0;
3193    final int actualEndKey = Integer.MAX_VALUE;
3194    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3195    final int splitStartKey = actualStartKey + keysPerRegion;
3196    final int splitEndKey = actualEndKey - keysPerRegion;
3197    final String keyFormat = "%08x";
3198    final Table table = createTable(tableName, cfBytes, maxVersions,
3199      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3200      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3201
3202    if (hbaseCluster != null) {
3203      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3204    }
3205
3206    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3207
3208    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3209      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3210        final byte[] row = Bytes.toBytes(
3211          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3212
3213        Put put = new Put(row);
3214        Delete del = new Delete(row);
3215        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3216          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3217          final long ts = rand.nextInt();
3218          final byte[] qual = Bytes.toBytes("col" + iCol);
3219          if (rand.nextBoolean()) {
3220            final byte[] value =
3221              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3222                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3223            put.addColumn(cf, qual, ts, value);
3224          } else if (rand.nextDouble() < 0.8) {
3225            del.addColumn(cf, qual, ts);
3226          } else {
3227            del.addColumns(cf, qual, ts);
3228          }
3229        }
3230
3231        if (!put.isEmpty()) {
3232          mutator.mutate(put);
3233        }
3234
3235        if (!del.isEmpty()) {
3236          mutator.mutate(del);
3237        }
3238      }
3239      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3240      mutator.flush();
3241      if (hbaseCluster != null) {
3242        getMiniHBaseCluster().flushcache(table.getName());
3243      }
3244    }
3245    mutator.close();
3246
3247    return table;
3248  }
3249
3250  public static int randomFreePort() {
3251    return HBaseCommonTestingUtil.randomFreePort();
3252  }
3253
3254  public static String randomMultiCastAddress() {
3255    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3256  }
3257
3258  public static void waitForHostPort(String host, int port) throws IOException {
3259    final int maxTimeMs = 10000;
3260    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3261    IOException savedException = null;
3262    LOG.info("Waiting for server at " + host + ":" + port);
3263    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3264      try {
3265        Socket sock = new Socket(InetAddress.getByName(host), port);
3266        sock.close();
3267        savedException = null;
3268        LOG.info("Server at " + host + ":" + port + " is available");
3269        break;
3270      } catch (UnknownHostException e) {
3271        throw new IOException("Failed to look up " + host, e);
3272      } catch (IOException e) {
3273        savedException = e;
3274      }
3275      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3276    }
3277
3278    if (savedException != null) {
3279      throw savedException;
3280    }
3281  }
3282
3283  /**
3284   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3285   * continues.
3286   * @return the number of regions the table was split into
3287   */
3288  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3289    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3290    throws IOException {
3291    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3292      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3293  }
3294
3295  /**
3296   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3297   * continues.
3298   * @return the number of regions the table was split into
3299   */
3300  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3301    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3302    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3303    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3304    builder.setDurability(durability);
3305    builder.setRegionReplication(regionReplication);
3306    ColumnFamilyDescriptorBuilder cfBuilder =
3307      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3308    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3309    cfBuilder.setCompressionType(compression);
3310    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3311      numRegionsPerServer);
3312  }
3313
3314  /**
3315   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3316   * continues.
3317   * @return the number of regions the table was split into
3318   */
3319  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3320    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3321    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3322    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3323    builder.setDurability(durability);
3324    builder.setRegionReplication(regionReplication);
3325    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3326    for (int i = 0; i < columnFamilies.length; i++) {
3327      ColumnFamilyDescriptorBuilder cfBuilder =
3328        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3329      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3330      cfBuilder.setCompressionType(compression);
3331      hcds[i] = cfBuilder.build();
3332    }
3333    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3334  }
3335
3336  /**
3337   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3338   * continues.
3339   * @return the number of regions the table was split into
3340   */
3341  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3342    ColumnFamilyDescriptor hcd) throws IOException {
3343    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3344  }
3345
3346  /**
3347   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3348   * continues.
3349   * @return the number of regions the table was split into
3350   */
3351  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3352    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3353    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3354      numRegionsPerServer);
3355  }
3356
3357  /**
3358   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3359   * continues.
3360   * @return the number of regions the table was split into
3361   */
3362  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3363    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3364    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3365      numRegionsPerServer);
3366  }
3367
3368  /**
3369   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3370   * continues.
3371   * @return the number of regions the table was split into
3372   */
3373  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3374    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3375    throws IOException {
3376    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3377    for (ColumnFamilyDescriptor cd : cds) {
3378      if (!td.hasColumnFamily(cd.getName())) {
3379        builder.setColumnFamily(cd);
3380      }
3381    }
3382    td = builder.build();
3383    int totalNumberOfRegions = 0;
3384    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3385    Admin admin = unmanagedConnection.getAdmin();
3386
3387    try {
3388      // create a table a pre-splits regions.
3389      // The number of splits is set as:
3390      // region servers * regions per region server).
3391      int numberOfServers = admin.getRegionServers().size();
3392      if (numberOfServers == 0) {
3393        throw new IllegalStateException("No live regionservers");
3394      }
3395
3396      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3397      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3398        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3399        + numRegionsPerServer + ")");
3400
3401      byte[][] splits = splitter.split(totalNumberOfRegions);
3402
3403      admin.createTable(td, splits);
3404    } catch (MasterNotRunningException e) {
3405      LOG.error("Master not running", e);
3406      throw new IOException(e);
3407    } catch (TableExistsException e) {
3408      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3409    } finally {
3410      admin.close();
3411      unmanagedConnection.close();
3412    }
3413    return totalNumberOfRegions;
3414  }
3415
3416  public static int getMetaRSPort(Connection connection) throws IOException {
3417    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3418      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3419    }
3420  }
3421
3422  /**
3423   * Due to async racing issue, a region may not be in the online region list of a region server
3424   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3425   */
3426  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3427    final long timeout) throws IOException, InterruptedException {
3428    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3429    while (true) {
3430      List<RegionInfo> regions = getAdmin().getRegions(server);
3431      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3432      long now = EnvironmentEdgeManager.currentTime();
3433      if (now > timeoutTime) break;
3434      Thread.sleep(10);
3435    }
3436    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3437  }
3438
3439  /**
3440   * Check to make sure the region is open on the specified region server, but not on any other one.
3441   */
3442  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3443    final long timeout) throws IOException, InterruptedException {
3444    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3445    while (true) {
3446      List<RegionInfo> regions = getAdmin().getRegions(server);
3447      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3448        List<JVMClusterUtil.RegionServerThread> rsThreads =
3449          getHBaseCluster().getLiveRegionServerThreads();
3450        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3451          HRegionServer rs = rsThread.getRegionServer();
3452          if (server.equals(rs.getServerName())) {
3453            continue;
3454          }
3455          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3456          for (HRegion r : hrs) {
3457            assertTrue("Region should not be double assigned",
3458              r.getRegionInfo().getRegionId() != hri.getRegionId());
3459          }
3460        }
3461        return; // good, we are happy
3462      }
3463      long now = EnvironmentEdgeManager.currentTime();
3464      if (now > timeoutTime) break;
3465      Thread.sleep(10);
3466    }
3467    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3468  }
3469
3470  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3471    TableDescriptor td =
3472      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3473    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3474    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3475  }
3476
3477  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3478    BlockCache blockCache) throws IOException {
3479    TableDescriptor td =
3480      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3481    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3482    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3483  }
3484
3485  public static void setFileSystemURI(String fsURI) {
3486    FS_URI = fsURI;
3487  }
3488
3489  /**
3490   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3491   */
3492  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3493    return new ExplainingPredicate<IOException>() {
3494      @Override
3495      public String explainFailure() throws IOException {
3496        final RegionStates regionStates =
3497          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3498        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3499      }
3500
3501      @Override
3502      public boolean evaluate() throws IOException {
3503        HMaster master = getMiniHBaseCluster().getMaster();
3504        if (master == null) return false;
3505        AssignmentManager am = master.getAssignmentManager();
3506        if (am == null) return false;
3507        return !am.hasRegionsInTransition();
3508      }
3509    };
3510  }
3511
3512  /**
3513   * Returns a {@link Predicate} for checking that table is enabled
3514   */
3515  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3516    return new ExplainingPredicate<IOException>() {
3517      @Override
3518      public String explainFailure() throws IOException {
3519        return explainTableState(tableName, TableState.State.ENABLED);
3520      }
3521
3522      @Override
3523      public boolean evaluate() throws IOException {
3524        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3525      }
3526    };
3527  }
3528
3529  /**
3530   * Returns a {@link Predicate} for checking that table is enabled
3531   */
3532  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3533    return new ExplainingPredicate<IOException>() {
3534      @Override
3535      public String explainFailure() throws IOException {
3536        return explainTableState(tableName, TableState.State.DISABLED);
3537      }
3538
3539      @Override
3540      public boolean evaluate() throws IOException {
3541        return getAdmin().isTableDisabled(tableName);
3542      }
3543    };
3544  }
3545
3546  /**
3547   * Returns a {@link Predicate} for checking that table is enabled
3548   */
3549  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3550    return new ExplainingPredicate<IOException>() {
3551      @Override
3552      public String explainFailure() throws IOException {
3553        return explainTableAvailability(tableName);
3554      }
3555
3556      @Override
3557      public boolean evaluate() throws IOException {
3558        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3559        if (tableAvailable) {
3560          try (Table table = getConnection().getTable(tableName)) {
3561            TableDescriptor htd = table.getDescriptor();
3562            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3563              .getAllRegionLocations()) {
3564              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3565                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3566                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3567              for (byte[] family : htd.getColumnFamilyNames()) {
3568                scan.addFamily(family);
3569              }
3570              try (ResultScanner scanner = table.getScanner(scan)) {
3571                scanner.next();
3572              }
3573            }
3574          }
3575        }
3576        return tableAvailable;
3577      }
3578    };
3579  }
3580
3581  /**
3582   * Wait until no regions in transition.
3583   * @param timeout How long to wait.
3584   */
3585  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3586    waitFor(timeout, predicateNoRegionsInTransition());
3587  }
3588
3589  /**
3590   * Wait until no regions in transition. (time limit 15min) n
3591   */
3592  public void waitUntilNoRegionsInTransition() throws IOException {
3593    waitUntilNoRegionsInTransition(15 * 60000);
3594  }
3595
3596  /**
3597   * Wait until labels is ready in VisibilityLabelsCache.
3598   */
3599  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3600    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3601    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3602
3603      @Override
3604      public boolean evaluate() {
3605        for (String label : labels) {
3606          if (labelsCache.getLabelOrdinal(label) == 0) {
3607            return false;
3608          }
3609        }
3610        return true;
3611      }
3612
3613      @Override
3614      public String explainFailure() {
3615        for (String label : labels) {
3616          if (labelsCache.getLabelOrdinal(label) == 0) {
3617            return label + " is not available yet";
3618          }
3619        }
3620        return "";
3621      }
3622    });
3623  }
3624
3625  /**
3626   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3627   * available.
3628   * @return the list of column descriptors
3629   */
3630  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3631    return generateColumnDescriptors("");
3632  }
3633
3634  /**
3635   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3636   * available.
3637   * @param prefix family names prefix
3638   * @return the list of column descriptors
3639   */
3640  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3641    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3642    long familyId = 0;
3643    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3644      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3645        for (BloomType bloomType : BloomType.values()) {
3646          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3647          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3648            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3649          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3650          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3651          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3652          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3653          familyId++;
3654        }
3655      }
3656    }
3657    return columnFamilyDescriptors;
3658  }
3659
3660  /**
3661   * Get supported compression algorithms.
3662   * @return supported compression algorithms.
3663   */
3664  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3665    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3666    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3667    for (String algoName : allAlgos) {
3668      try {
3669        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3670        algo.getCompressor();
3671        supportedAlgos.add(algo);
3672      } catch (Throwable t) {
3673        // this algo is not available
3674      }
3675    }
3676    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3677  }
3678
3679  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3680    Scan scan = new Scan().withStartRow(row);
3681    scan.setReadType(ReadType.PREAD);
3682    scan.setCaching(1);
3683    scan.setReversed(true);
3684    scan.addFamily(family);
3685    try (RegionScanner scanner = r.getScanner(scan)) {
3686      List<Cell> cells = new ArrayList<>(1);
3687      scanner.next(cells);
3688      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3689        return null;
3690      }
3691      return Result.create(cells);
3692    }
3693  }
3694
3695  private boolean isTargetTable(final byte[] inRow, Cell c) {
3696    String inputRowString = Bytes.toString(inRow);
3697    int i = inputRowString.indexOf(HConstants.DELIMITER);
3698    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3699    int o = outputRowString.indexOf(HConstants.DELIMITER);
3700    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3701  }
3702
3703  /**
3704   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3705   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3706   * kerby KDC server and utility for using it,
3707   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3708   * less baggage. It came in in HBASE-5291.
3709   */
3710  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3711    Properties conf = MiniKdc.createConf();
3712    conf.put(MiniKdc.DEBUG, true);
3713    MiniKdc kdc = null;
3714    File dir = null;
3715    // There is time lag between selecting a port and trying to bind with it. It's possible that
3716    // another service captures the port in between which'll result in BindException.
3717    boolean bindException;
3718    int numTries = 0;
3719    do {
3720      try {
3721        bindException = false;
3722        dir = new File(getDataTestDir("kdc").toUri().getPath());
3723        kdc = new MiniKdc(conf, dir);
3724        kdc.start();
3725      } catch (BindException e) {
3726        FileUtils.deleteDirectory(dir); // clean directory
3727        numTries++;
3728        if (numTries == 3) {
3729          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3730          throw e;
3731        }
3732        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3733        bindException = true;
3734      }
3735    } while (bindException);
3736    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3737    return kdc;
3738  }
3739
3740  public int getNumHFiles(final TableName tableName, final byte[] family) {
3741    int numHFiles = 0;
3742    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3743      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3744    }
3745    return numHFiles;
3746  }
3747
3748  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3749    final byte[] family) {
3750    int numHFiles = 0;
3751    for (Region region : rs.getRegions(tableName)) {
3752      numHFiles += region.getStore(family).getStorefilesCount();
3753    }
3754    return numHFiles;
3755  }
3756
3757  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3758    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3759    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3760    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3761    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3762    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3763        it2 = rtdFamilies.iterator(); it.hasNext();) {
3764      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3765    }
3766  }
3767
3768  /**
3769   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3770   * invocations.
3771   */
3772  public static void await(final long sleepMillis, final BooleanSupplier condition)
3773    throws InterruptedException {
3774    try {
3775      while (!condition.getAsBoolean()) {
3776        Thread.sleep(sleepMillis);
3777      }
3778    } catch (RuntimeException e) {
3779      if (e.getCause() instanceof AssertionError) {
3780        throw (AssertionError) e.getCause();
3781      }
3782      throw e;
3783    }
3784  }
3785}