/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around for the duration of your tests.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
 * cluster at a time only. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on the classpath and hbase-site.xml for logging and test-run
 * configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  public static final boolean PRESPLIT_TEST_TABLE = true;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtil
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with two booleans (memstoreTS and tags). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
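   * <p>
   * For example, a test can skip itself when a fixed port is already taken (a sketch using
   * JUnit's {@code Assume}; the port value shown is illustrative):
   *
   * <pre>
   * Assume.assumeTrue(HBaseTestingUtil.available(16010));
   * </pre>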
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags for testing.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

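  /**
   * Create all combinations of memstoreTS, tags and offheap memstore (in that order) for
   * parameterized tests.
   */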
  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

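  /** All combinations of Bloom filters and compression algorithms, for parameterized tests. */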
  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtil using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtil using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // an hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true, so tests use random port
    // assignment unless they explicitly opt out
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be valid.
   * Rather than using the returned Configuration directly, it is usually best to make a copy and
   * use that. Do <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but there is nothing we can do about it at the moment; the minidfscluster
   * only supports a single instance. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
   * conf, and as a system property for hadoop.tmp.dir (We do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; returns the working directory for
   *         the test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @param subdirName name of the subdir to create under the base test dir
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
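   * <p>
   * For example, a sketch that brings up three datanodes and tears them down again
   * ({@code TEST_UTIL} is assumed to be an instance of this class):
   *
   * <pre>
   * MiniDFSCluster dfs = TEST_UTIL.startMiniDFSCluster(3);
   * // ... run tests against the dfs cluster ...
   * TEST_UTIL.shutdownMiniDFSCluster();
   * </pre>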
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version is > 3.2.3 or 3.3.4, "
            + "see HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir
   * -- in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying the parameter on the command line. If not set, the default is false.
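   * <p>
   * For example, to force short circuit reads on for a single run (a sketch; any test runner that
   * accepts system properties works the same way):
   *
   * <pre>
   * mvn test -Dhbase.tests.use.shortcircuit.reads=true
   * </pre>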
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS
   * settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)}, or does
   * nothing if no cluster is running.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster was set through the setDFSCluster method, in which
      // case we will not have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves number of slave nodes, used for both HBase region servers and HDFS
   *                  datanodes.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper, all using default options. Default option
   * values can be found in {@link StartTestingClusterOption.Builder}.
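   * <p>
   * Typical usage in a test class (a sketch; {@code TEST_UTIL} is a conventional static field
   * name, and the JUnit annotations are shown for illustration):
   *
   * <pre>
   * private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   *
   * {@literal @}BeforeClass
   * public static void setUpBeforeClass() throws Exception {
   *   TEST_UTIL.startMiniCluster();
   * }
   *
   * {@literal @}AfterClass
   * public static void tearDownAfterClass() throws Exception {
   *   TEST_UTIL.shutdownMiniCluster();
   * }
   * </pre>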
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
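   * <p>
   * A sketch using the option builder (the counts shown are illustrative):
   *
   * <pre>
   * StartTestingClusterOption option =
   *   StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).build();
   * TEST_UTIL.startMiniCluster(option);
   * </pre>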
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
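   * <p>
   * A stepped-startup sketch (assuming dfs and zookeeper have not been started yet):
   *
   * <pre>
   * TEST_UTIL.startMiniDFSCluster(3);
   * TEST_UTIL.startMiniZKCluster();
   * TEST_UTIL.startMiniHBaseCluster(StartTestingClusterOption.builder().build());
   * </pre>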
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers has connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
      .createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
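   * <p>
   * For example (a sketch):
   *
   * <pre>
   * TEST_UTIL.shutdownMiniHBaseCluster();
   * // dfs and zookeeper are still up at this point
   * TEST_UTIL.restartHBaseCluster(3);
   * </pre>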
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
  }

  /**
   * Returns the current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is
   * true, a new root directory path is fetched irrespective of whether it has been fetched before
   * or not. If false, the previous path is used. Note: this does not cause the root dir to be
   * created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that the
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method: the root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operations.
   * @param create This flag decides whether to get a new root or data directory path or not, if
   *               it has been fetched already. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that the
   * <code>create</code> flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method: the root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operations.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
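   * <p>
   * For example (a sketch; the table and family names are hypothetical):
   *
   * <pre>
   * try (Table table = TEST_UTIL.createTable(TableName.valueOf("testtable"), "info")) {
   *   table.put(new Put(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("info"),
   *     Bytes.toBytes("q"), Bytes.toBytes("v")));
   * }
   * </pre>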
1184   * @return A Table instance for the created table.
1185   */
1186  public Table createTable(TableName tableName, String family) throws IOException {
1187    return createTable(tableName, new String[] { family });
1188  }
1189
1190  /**
1191   * Create a table.
1192   * @return A Table instance for the created table.
1193   */
1194  public Table createTable(TableName tableName, String[] families) throws IOException {
1195    List<byte[]> fams = new ArrayList<>(families.length);
1196    for (String family : families) {
1197      fams.add(Bytes.toBytes(family));
1198    }
1199    return createTable(tableName, fams.toArray(new byte[0][]));
1200  }
1201
1202  /**
1203   * Create a table.
1204   * @return A Table instance for the created table.
1205   */
1206  public Table createTable(TableName tableName, byte[] family) throws IOException {
1207    return createTable(tableName, new byte[][] { family });
1208  }
1209
1210  /**
1211   * Create a table with multiple regions.
1212   * @return A Table instance for the created table.
1213   */
1214  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1215    throws IOException {
1216    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1217    byte[] startKey = Bytes.toBytes("aaaaa");
1218    byte[] endKey = Bytes.toBytes("zzzzz");
1219    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1220
1221    return createTable(tableName, new byte[][] { family }, splitKeys);
1222  }
1223
1224  /**
1225   * Create a table.
1226   * @return A Table instance for the created table.
1227   */
1228  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1229    return createTable(tableName, families, (byte[][]) null);
1230  }
1231
1232  /**
1233   * Create a table with multiple regions.
1234   * @return A Table instance for the created table.
1235   */
1236  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1237    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1238  }
1239
1240  /**
1241   * Create a table with multiple regions.
1242   * @param replicaCount replica count.
1243   * @return A Table instance for the created table.
1244   */
1245  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1246    throws IOException {
1247    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1248  }
1249
1250  /**
1251   * Create a table.
1252   * @return A Table instance for the created table.
1253   */
1254  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1255    throws IOException {
1256    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1257  }
1258
1259  /**
1260   * Create a table.
1261   * @param tableName    the table name
1262   * @param families     the families
1263   * @param splitKeys    the splitkeys
1264   * @param replicaCount the region replica count
1265   * @return A Table instance for the created table.
1266   * @throws IOException throws IOException
1267   */
1268  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1269    int replicaCount) throws IOException {
1270    return createTable(tableName, families, splitKeys, replicaCount,
1271      new Configuration(getConfiguration()));
1272  }
1273
1274  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1275    byte[] endKey, int numRegions) throws IOException {
1276    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1277
1278    getAdmin().createTable(desc, startKey, endKey, numRegions);
1279    // HBaseAdmin only waits for regions to appear in hbase:meta we
1280    // should wait until they are assigned
1281    waitUntilAllRegionsAssigned(tableName);
1282    return getConnection().getTable(tableName);
1283  }
1284
1285  /**
1286   * Create a table.
1287   * @param c Configuration to use
1288   * @return A Table instance for the created table.
1289   */
1290  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1291    throws IOException {
1292    return createTable(htd, families, null, c);
1293  }
1294
1295  /**
1296   * Create a table.
1297   * @param htd       table descriptor
1298   * @param families  array of column families
1299   * @param splitKeys array of split keys
1300   * @param c         Configuration to use
1301   * @return A Table instance for the created table.
1302   * @throws IOException if getAdmin or createTable fails
1303   */
1304  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1305    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) because tests have hard-coded counts
    // of what to expect in the block cache, etc., and blooms being on interferes with those
    // counts.
1309    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1310  }
1311
1312  /**
1313   * Create a table.
1314   * @param htd       table descriptor
1315   * @param families  array of column families
1316   * @param splitKeys array of split keys
1317   * @param type      Bloom type
1318   * @param blockSize block size
1319   * @param c         Configuration to use
1320   * @return A Table instance for the created table.
1321   * @throws IOException if getAdmin or createTable fails
1322   */
1324  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1325    BloomType type, int blockSize, Configuration c) throws IOException {
1326    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1327    for (byte[] family : families) {
1328      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1329        .setBloomFilterType(type).setBlocksize(blockSize);
1330      if (isNewVersionBehaviorEnabled()) {
1331        cfdb.setNewVersionBehavior(true);
1332      }
1333      builder.setColumnFamily(cfdb.build());
1334    }
1335    TableDescriptor td = builder.build();
1336    if (splitKeys != null) {
1337      getAdmin().createTable(td, splitKeys);
1338    } else {
1339      getAdmin().createTable(td);
1340    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned.
1343    waitUntilAllRegionsAssigned(td.getTableName());
1344    return getConnection().getTable(td.getTableName());
1345  }
1346
1347  /**
1348   * Create a table.
1349   * @param htd       table descriptor
1350   * @param splitRows array of split keys
1351   * @return A Table instance for the created table.
1352   */
1353  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1354    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1355    if (isNewVersionBehaviorEnabled()) {
1356      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1357        builder.setColumnFamily(
1358          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1359      }
1360    }
1361    if (splitRows != null) {
1362      getAdmin().createTable(builder.build(), splitRows);
1363    } else {
1364      getAdmin().createTable(builder.build());
1365    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned.
1368    waitUntilAllRegionsAssigned(htd.getTableName());
1369    return getConnection().getTable(htd.getTableName());
1370  }
1371
1372  /**
1373   * Create a table.
1374   * @param tableName    the table name
1375   * @param families     the families
1376   * @param splitKeys    the split keys
1377   * @param replicaCount the replica count
1378   * @param c            Configuration to use
1379   * @return A Table instance for the created table.
1380   */
1381  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1382    int replicaCount, final Configuration c) throws IOException {
1383    TableDescriptor htd =
1384      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1385    return createTable(htd, families, splitKeys, c);
1386  }
1387
1388  /**
1389   * Create a table.
1390   * @return A Table instance for the created table.
1391   */
1392  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1393    return createTable(tableName, new byte[][] { family }, numVersions);
1394  }
1395
1396  /**
1397   * Create a table.
1398   * @return A Table instance for the created table.
1399   */
1400  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1401    throws IOException {
1402    return createTable(tableName, families, numVersions, (byte[][]) null);
1403  }
1404
1405  /**
1406   * Create a table.
1407   * @return A Table instance for the created table.
1408   */
1409  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1410    byte[][] splitKeys) throws IOException {
1411    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1412    for (byte[] family : families) {
1413      ColumnFamilyDescriptorBuilder cfBuilder =
1414        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1415      if (isNewVersionBehaviorEnabled()) {
1416        cfBuilder.setNewVersionBehavior(true);
1417      }
1418      builder.setColumnFamily(cfBuilder.build());
1419    }
1420    if (splitKeys != null) {
1421      getAdmin().createTable(builder.build(), splitKeys);
1422    } else {
1423      getAdmin().createTable(builder.build());
1424    }
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1427    waitUntilAllRegionsAssigned(tableName);
1428    return getConnection().getTable(tableName);
1429  }
1430
1431  /**
1432   * Create a table with multiple regions.
1433   * @return A Table instance for the created table.
1434   */
1435  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1436    throws IOException {
1437    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1438  }
1439
1440  /**
1441   * Create a table.
1442   * @return A Table instance for the created table.
1443   */
1444  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1445    throws IOException {
1446    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1447    for (byte[] family : families) {
1448      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1449        .setMaxVersions(numVersions).setBlocksize(blockSize);
1450      if (isNewVersionBehaviorEnabled()) {
1451        cfBuilder.setNewVersionBehavior(true);
1452      }
1453      builder.setColumnFamily(cfBuilder.build());
1454    }
1455    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1458    waitUntilAllRegionsAssigned(tableName);
1459    return getConnection().getTable(tableName);
1460  }
1461
1462  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1463    String cpName) throws IOException {
1464    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1465    for (byte[] family : families) {
1466      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1467        .setMaxVersions(numVersions).setBlocksize(blockSize);
1468      if (isNewVersionBehaviorEnabled()) {
1469        cfBuilder.setNewVersionBehavior(true);
1470      }
1471      builder.setColumnFamily(cfBuilder.build());
1472    }
1473    if (cpName != null) {
1474      builder.setCoprocessor(cpName);
1475    }
1476    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1479    waitUntilAllRegionsAssigned(tableName);
1480    return getConnection().getTable(tableName);
1481  }
1482
1483  /**
1484   * Create a table.
1485   * @return A Table instance for the created table.
1486   */
1487  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1488    throws IOException {
1489    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1490    int i = 0;
1491    for (byte[] family : families) {
1492      ColumnFamilyDescriptorBuilder cfBuilder =
1493        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1494      if (isNewVersionBehaviorEnabled()) {
1495        cfBuilder.setNewVersionBehavior(true);
1496      }
1497      builder.setColumnFamily(cfBuilder.build());
1498      i++;
1499    }
1500    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1503    waitUntilAllRegionsAssigned(tableName);
1504    return getConnection().getTable(tableName);
1505  }
1506
1507  /**
1508   * Create a table.
1509   * @return A Table instance for the created table.
1510   */
1511  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1512    throws IOException {
1513    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1514    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1515    if (isNewVersionBehaviorEnabled()) {
1516      cfBuilder.setNewVersionBehavior(true);
1517    }
1518    builder.setColumnFamily(cfBuilder.build());
1519    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1522    waitUntilAllRegionsAssigned(tableName);
1523    return getConnection().getTable(tableName);
1524  }
1525
1526  /**
1527   * Create a table with multiple regions.
1528   * @return A Table instance for the created table.
1529   */
1530  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1531    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1532  }
1533
1534  /**
1535   * Set the number of Region replicas.
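   * <p>
   * A minimal usage sketch; the table name and replica count are illustrative, and {@code admin}
   * stands for an {@link Admin} connected to the cluster:
   *
   * <pre>
   * HBaseTestingUtil.setReplicas(admin, TableName.valueOf("testtable"), 3);
   * </pre>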
1536   */
1537  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1538    throws IOException, InterruptedException {
1539    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1540      .setRegionReplication(replicaCount).build();
1541    admin.modifyTable(desc);
1542  }
1543
1544  /**
1545   * Set the number of Region replicas.
1546   */
1547  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1548    throws ExecutionException, IOException, InterruptedException {
1549    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1550      .setRegionReplication(replicaCount).build();
1551    admin.modifyTable(desc).get();
1552  }
1553
1554  /**
1555   * Drop an existing table
1556   * @param tableName existing table
1557   */
1558  public void deleteTable(TableName tableName) throws IOException {
1559    try {
1560      getAdmin().disableTable(tableName);
1561    } catch (TableNotEnabledException e) {
1562      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1563    }
1564    getAdmin().deleteTable(tableName);
1565  }
1566
1567  /**
1568   * Drop an existing table
1569   * @param tableName existing table
1570   */
1571  public void deleteTableIfAny(TableName tableName) throws IOException {
1572    try {
1573      deleteTable(tableName);
1574    } catch (TableNotFoundException e) {
1575      // ignore
1576    }
1577  }
1578
1579  // ==========================================================================
1580  // Canned table and table descriptor creation
1581
1582  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1583  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1584  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1585  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1586  private static final int MAXVERSIONS = 3;
1587
1588  public static final char FIRST_CHAR = 'a';
1589  public static final char LAST_CHAR = 'z';
1590  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1591  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1592
1593  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1594    return createModifyableTableDescriptor(TableName.valueOf(name),
1595      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1596      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1597  }
1598
1599  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1600    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1601    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1602    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1603      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1604        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1605        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1606      if (isNewVersionBehaviorEnabled()) {
1607        cfBuilder.setNewVersionBehavior(true);
1608      }
1609      builder.setColumnFamily(cfBuilder.build());
1610    }
1611    return builder.build();
1612  }
1613
1614  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1615    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1616    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1617    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1618      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1619        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1620        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1621      if (isNewVersionBehaviorEnabled()) {
1622        cfBuilder.setNewVersionBehavior(true);
1623      }
1624      builder.setColumnFamily(cfBuilder.build());
1625    }
1626    return builder;
1627  }
1628
1629  /**
   * Create a table descriptor for a table of name <code>name</code>, with the default test column
   * families.
   * @param name Name to give table.
   * @return Table descriptor.
1633   */
1634  public TableDescriptor createTableDescriptor(final TableName name) {
1635    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1636      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1637  }
1638
1639  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1640    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1641  }
1642
1643  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1644    int maxVersions) {
1645    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1646    for (byte[] family : families) {
1647      ColumnFamilyDescriptorBuilder cfBuilder =
1648        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1649      if (isNewVersionBehaviorEnabled()) {
1650        cfBuilder.setNewVersionBehavior(true);
1651      }
1652      builder.setColumnFamily(cfBuilder.build());
1653    }
1654    return builder.build();
1655  }
1656
1657  /**
1658   * Create an HRegion that writes to the local tmp dirs
1659   * @param desc     a table descriptor indicating which table the region belongs to
1660   * @param startKey the start boundary of the region
1661   * @param endKey   the end boundary of the region
1662   * @return a region that writes to local dir for testing
1663   */
1664  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1665    throws IOException {
1666    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1667      .setEndKey(endKey).build();
1668    return createLocalHRegion(hri, desc);
1669  }
1670
1671  /**
1672   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1673   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1674   */
1675  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1676    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1677  }
1678
1679  /**
1680   * Create an HRegion that writes to the local tmp dirs with specified wal
1681   * @param info regioninfo
1682   * @param conf configuration
1683   * @param desc table descriptor
1684   * @param wal  wal for this region.
1685   * @return created hregion
1686   */
1687  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1688    WAL wal) throws IOException {
1689    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1690      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1691    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1692  }
1693
1694  /**
1695   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1696   *         when done.
1697   */
1698  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1699    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1700    throws IOException {
1701    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1702      durability, wal, null, families);
1703  }
1704
1705  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1706    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1707    boolean[] compactedMemStore, byte[]... families) throws IOException {
1708    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1709    builder.setReadOnly(isReadOnly);
1710    int i = 0;
1711    for (byte[] family : families) {
1712      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1713      if (compactedMemStore != null && i < compactedMemStore.length) {
1714        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1715      } else {
1716        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1718      }
1719      i++;
      // Keep all cell versions by default (max versions is effectively unlimited).
1721      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1722      builder.setColumnFamily(cfBuilder.build());
1723    }
1724    builder.setDurability(durability);
1725    RegionInfo info =
1726      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1727    return createLocalHRegion(info, conf, builder.build(), wal);
1728  }
1729
1730  //
1731  // ==========================================================================
1732
1733  /**
1734   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1735   * read.
1736   * @param tableName existing table
   * @return a Table on the now-emptied table
1738   */
1739  public Table deleteTableData(TableName tableName) throws IOException {
    Table table = getConnection().getTable(tableName);
    Scan scan = new Scan();
    // Close the scanner when done so it does not leak.
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        table.delete(new Delete(res.getRow()));
      }
    }
    return table;
1750  }
1751
1752  /**
1753   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1754   * table.
1755   * @param tableName       table which must exist.
1756   * @param preserveRegions keep the existing split points
   * @return a Table for the new table
1758   */
1759  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1760    throws IOException {
1761    Admin admin = getAdmin();
1762    if (!admin.isTableDisabled(tableName)) {
1763      admin.disableTable(tableName);
1764    }
1765    admin.truncateTable(tableName, preserveRegions);
1766    return getConnection().getTable(tableName);
1767  }
1768
1769  /**
1770   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1771   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1772   * preserve regions of existing table.
1773   * @param tableName table which must exist.
   * @return a Table for the new table
1775   */
1776  public Table truncateTable(final TableName tableName) throws IOException {
1777    return truncateTable(tableName, false);
1778  }
1779
1780  /**
1781   * Load table with rows from 'aaa' to 'zzz'.
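   * <p>
   * Writes one cell per row for each of the 17,576 three-letter row keys in
   * {@link HBaseTestingUtil#ROWS}. A minimal usage sketch; the family name is illustrative and
   * {@code util} stands for an instance of this class:
   *
   * <pre>
   * int loaded = util.loadTable(table, Bytes.toBytes("cf"));
   * assertEquals(17576, loaded); // 26 * 26 * 26 rows
   * </pre>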
1782   * @param t Table
1783   * @param f Family
1784   * @return Count of rows loaded.
1785   */
1786  public int loadTable(final Table t, final byte[] f) throws IOException {
1787    return loadTable(t, new byte[][] { f });
1788  }
1789
1790  /**
1791   * Load table with rows from 'aaa' to 'zzz'.
   * @param t          Table
   * @param f          Family
   * @param writeToWAL if false, puts are written with {@link Durability#SKIP_WAL}
   * @return Count of rows loaded.
1795   */
1796  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1797    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1798  }
1799
1800  /**
1801   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1802   * @param t Table
1803   * @param f Array of Families to load
1804   * @return Count of rows loaded.
1805   */
1806  public int loadTable(final Table t, final byte[][] f) throws IOException {
1807    return loadTable(t, f, null);
1808  }
1809
1810  /**
1811   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1812   * @param t     Table
1813   * @param f     Array of Families to load
1814   * @param value the values of the cells. If null is passed, the row key is used as value
1815   * @return Count of rows loaded.
1816   */
1817  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1818    return loadTable(t, f, value, true);
1819  }
1820
1821  /**
1822   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1823   * @param t     Table
1824   * @param f     Array of Families to load
1825   * @param value the values of the cells. If null is passed, the row key is used as value
1826   * @return Count of rows loaded.
1827   */
1828  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1829    throws IOException {
1830    List<Put> puts = new ArrayList<>();
1831    for (byte[] row : HBaseTestingUtil.ROWS) {
1832      Put put = new Put(row);
1833      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1834      for (int i = 0; i < f.length; i++) {
1835        byte[] value1 = value != null ? value : row;
1836        put.addColumn(f[i], f[i], value1);
1837      }
1838      puts.add(put);
1839    }
1840    t.put(puts);
1841    return puts.size();
1842  }
1843
1844  /**
1845   * A tracker for tracking and validating table rows generated with
1846   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
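   * <p>
   * A minimal usage sketch; the start/stop rows are illustrative, and the scanned range must match
   * the tracker's range exactly or {@code validate()} will throw:
   *
   * <pre>
   * SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("aaa"), Bytes.toBytes("bbb"));
   * Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("bbb"));
   * try (ResultScanner scanner = table.getScanner(scan)) {
   *   for (Result r : scanner) {
   *     tracker.addRow(r.getRow());
   *   }
   * }
   * tracker.validate();
   * </pre>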
1847   */
1848  public static class SeenRowTracker {
1849    int dim = 'z' - 'a' + 1;
1850    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1851    byte[] startRow;
1852    byte[] stopRow;
1853
1854    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1855      this.startRow = startRow;
1856      this.stopRow = stopRow;
1857    }
1858
1859    void reset() {
1860      for (byte[] row : ROWS) {
1861        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1862      }
1863    }
1864
1865    int i(byte b) {
1866      return b - 'a';
1867    }
1868
1869    public void addRow(byte[] row) {
1870      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1871    }
1872
1873    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and that no
     * other rows are seen at all.
1876     */
1877    public void validate() {
1878      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1879        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1880          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1881            int count = seenRows[i(b1)][i(b2)][i(b3)];
1882            int expectedCount = 0;
1883            if (
1884              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1885                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1886            ) {
1887              expectedCount = 1;
1888            }
1889            if (count != expectedCount) {
1890              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1891              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1892                + "instead of " + expectedCount);
1893            }
1894          }
1895        }
1896      }
1897    }
1898  }
1899
1900  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1901    return loadRegion(r, f, false);
1902  }
1903
1904  public int loadRegion(final Region r, final byte[] f) throws IOException {
1905    return loadRegion((HRegion) r, f);
1906  }
1907
1908  /**
1909   * Load region with rows from 'aaa' to 'zzz'.
1910   * @param r     Region
1911   * @param f     Family
1912   * @param flush flush the cache if true
1913   * @return Count of rows loaded.
1914   */
1915  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1916    byte[] k = new byte[3];
1917    int rowCount = 0;
1918    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1919      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1920        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1921          k[0] = b1;
1922          k[1] = b2;
1923          k[2] = b3;
1924          Put put = new Put(k);
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
1930          int preRowCount = rowCount;
1931          int pause = 10;
1932          int maxPause = 1000;
1933          while (rowCount == preRowCount) {
1934            try {
1935              r.put(put);
1936              rowCount++;
1937            } catch (RegionTooBusyException e) {
1938              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1939              Threads.sleep(pause);
1940            }
1941          }
1942        }
1943      }
1944      if (flush) {
1945        r.flush(true);
1946      }
1947    }
1948    return rowCount;
1949  }
1950
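  /**
   * Load one cell per row for the integer row keys in [startRow, endRow); the stringified key is
   * used as both the row and the cell value. Pairs with the verifyNumericRows and
   * deleteNumericRows helpers below.
   */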
1951  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1952    throws IOException {
1953    for (int i = startRow; i < endRow; i++) {
1954      byte[] data = Bytes.toBytes(String.valueOf(i));
1955      Put put = new Put(data);
1956      put.addColumn(f, null, data);
1957      t.put(put);
1958    }
1959  }
1960
1961  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1962    throws IOException {
1963    for (int i = 0; i < totalRows; i++) {
1964      byte[] row = new byte[rowSize];
1965      Bytes.random(row);
1966      Put put = new Put(row);
1967      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1968      t.put(put);
1969    }
1970  }
1971
1972  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
1973    int replicaId) throws IOException {
1974    for (int i = startRow; i < endRow; i++) {
1975      String failMsg = "Failed verification of row :" + i;
1976      byte[] data = Bytes.toBytes(String.valueOf(i));
1977      Get get = new Get(data);
1978      get.setReplicaId(replicaId);
1979      get.setConsistency(Consistency.TIMELINE);
1980      Result result = table.get(get);
1981      assertTrue(failMsg, result.containsColumn(f, null));
1982      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1983      Cell cell = result.getColumnLatestCell(f, null);
1984      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1985        cell.getValueOffset(), cell.getValueLength()));
1986    }
1987  }
1988
1989  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
1990    throws IOException {
1991    verifyNumericRows((HRegion) region, f, startRow, endRow);
1992  }
1993
1994  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
1995    throws IOException {
1996    verifyNumericRows(region, f, startRow, endRow, true);
1997  }
1998
1999  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2000    final boolean present) throws IOException {
2001    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2002  }
2003
2004  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2005    final boolean present) throws IOException {
2006    for (int i = startRow; i < endRow; i++) {
2007      String failMsg = "Failed verification of row :" + i;
2008      byte[] data = Bytes.toBytes(String.valueOf(i));
2009      Result result = region.get(new Get(data));
2010
2011      boolean hasResult = result != null && !result.isEmpty();
2012      assertEquals(failMsg + result, present, hasResult);
2013      if (!present) continue;
2014
2015      assertTrue(failMsg, result.containsColumn(f, null));
2016      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2017      Cell cell = result.getColumnLatestCell(f, null);
2018      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2019        cell.getValueOffset(), cell.getValueLength()));
2020    }
2021  }
2022
2023  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2024    throws IOException {
2025    for (int i = startRow; i < endRow; i++) {
2026      byte[] data = Bytes.toBytes(String.valueOf(i));
2027      Delete delete = new Delete(data);
2028      delete.addFamily(f);
2029      t.delete(delete);
2030    }
2031  }
2032
2033  /**
2034   * Return the number of rows in the given table.
2035   * @param table to count rows
2036   * @return count of rows
2037   */
2038  public static int countRows(final Table table) throws IOException {
2039    return countRows(table, new Scan());
2040  }
2041
2042  public static int countRows(final Table table, final Scan scan) throws IOException {
2043    try (ResultScanner results = table.getScanner(scan)) {
2044      int count = 0;
2045      while (results.next() != null) {
2046        count++;
2047      }
2048      return count;
2049    }
2050  }
2051
2052  public static int countRows(final Table table, final byte[]... families) throws IOException {
2053    Scan scan = new Scan();
2054    for (byte[] family : families) {
2055      scan.addFamily(family);
2056    }
2057    return countRows(table, scan);
2058  }
2059
2060  /**
2061   * Return the number of rows in the given table.
2062   */
2063  public int countRows(final TableName tableName) throws IOException {
2064    try (Table table = getConnection().getTable(tableName)) {
2065      return countRows(table);
2066    }
2067  }
2068
2069  public static int countRows(final Region region) throws IOException {
2070    return countRows(region, new Scan());
2071  }
2072
2073  public static int countRows(final Region region, final Scan scan) throws IOException {
2074    try (InternalScanner scanner = region.getScanner(scan)) {
2075      return countRows(scanner);
2076    }
2077  }
2078
2079  public static int countRows(final InternalScanner scanner) throws IOException {
2080    int scannedCount = 0;
2081    List<Cell> results = new ArrayList<>();
2082    boolean hasMore = true;
2083    while (hasMore) {
2084      hasMore = scanner.next(results);
2085      scannedCount += results.size();
2086      results.clear();
2087    }
2088    return scannedCount;
2089  }
2090
2091  /**
   * Return an MD5 digest computed over all row keys of the given table.
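   * <p>
   * A minimal usage sketch comparing the row sets of two tables; the table variables are
   * illustrative and {@code util} stands for an instance of this class:
   *
   * <pre>
   * assertEquals(util.checksumRows(tableA), util.checksumRows(tableB));
   * </pre>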
2093   */
2094  public String checksumRows(final Table table) throws Exception {
2095    MessageDigest digest = MessageDigest.getInstance("MD5");
2096    try (ResultScanner results = table.getScanner(new Scan())) {
2097      for (Result res : results) {
2098        digest.update(res.getRow());
2099      }
2100    }
    // MessageDigest#toString() does not include the digest bytes; return the hex digest instead.
    return Bytes.toHex(digest.digest());
2102  }
2103
2104  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2105  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2106  static {
2107    int i = 0;
2108    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2109      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2110        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2111          ROWS[i][0] = b1;
2112          ROWS[i][1] = b2;
2113          ROWS[i][2] = b3;
2114          i++;
2115        }
2116      }
2117    }
2118  }
2119
2120  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2121    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2122    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2123    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2124    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2125    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2126    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2127
2128  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2129    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2130    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2131    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2132    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2133    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2134    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2135
2136  /**
2137   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2138   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
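   * <p>
   * A minimal usage sketch creating three regions; the start keys are illustrative, {@code htd}
   * stands for an existing table descriptor, and {@code util} for an instance of this class:
   *
   * <pre>
   * byte[][] startKeys =
   *   { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("mmm"), Bytes.toBytes("sss") };
   * List&lt;RegionInfo&gt; regions =
   *   util.createMultiRegionsInMeta(util.getConfiguration(), htd, startKeys);
   * </pre>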
2139   * @return list of region info for regions added to meta
2140   */
2141  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2142    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2143    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2144      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2145      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2146      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2147        TableState.State.ENABLED);
2148      // add custom ones
2149      for (int i = 0; i < startKeys.length; i++) {
2150        int j = (i + 1) % startKeys.length;
2151        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2152          .setEndKey(startKeys[j]).build();
2153        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2154        newRegions.add(hri);
2155      }
2156      return newRegions;
2157    }
2158  }
2159
2160  /**
2161   * Create an unmanaged WAL. Be sure to close it when you're through.
2162   */
2163  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2164    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
    // unless we pass it along via the conf.
2167    Configuration confForWAL = new Configuration(conf);
2168    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2169    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2170  }
2171
2172  /**
   * Create a region with its own WAL. Be sure to call
2174   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
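   * <p>
   * A minimal usage sketch; the descriptor and directory setup are illustrative, and {@code util}
   * stands for an instance of this class:
   *
   * <pre>
   * TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("testtable"))
   *   .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf"))).build();
   * RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
   * HRegion region = createRegionAndWAL(hri, util.getDataTestDir(), util.getConfiguration(), htd);
   * try {
   *   // exercise the region
   * } finally {
   *   closeRegionAndWAL(region);
   * }
   * </pre>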
2175   */
2176  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2177    final Configuration conf, final TableDescriptor htd) throws IOException {
2178    return createRegionAndWAL(info, rootDir, conf, htd, true);
2179  }
2180
2181  /**
   * Create a region with its own WAL. Be sure to call
2183   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2184   */
2185  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2186    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2187    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2188    region.setBlockCache(blockCache);
2189    region.initialize();
2190    return region;
2191  }
2192
2193  /**
   * Create a region with its own WAL. Be sure to call
2195   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2196   */
2197  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2198    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2199    throws IOException {
2200    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2201    region.setMobFileCache(mobFileCache);
2202    region.initialize();
2203    return region;
2204  }
2205
2206  /**
   * Create a region with its own WAL. Be sure to call
2208   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2209   */
2210  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2211    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2212    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2213      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2214    WAL wal = createWal(conf, rootDir, info);
2215    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2216  }
2217
2218  /**
   * Find any region server other than the one passed in.
   * @return another region server, or null if there is none
2221   */
2222  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2223    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2225        return rst.getRegionServer();
2226      }
2227    }
2228    return null;
2229  }
2230
2231  /**
2232   * Tool to get the reference to the region server object that holds the region of the specified
2233   * user table.
2234   * @param tableName user table to lookup in hbase:meta
   * @return region server that holds the table's first region, or null if none is found
2236   */
2237  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2238    throws IOException, InterruptedException {
2239    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2240    if (regions == null || regions.isEmpty()) {
2241      return null;
2242    }
2243    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2244
2245    byte[] firstRegionName =
2246      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2247        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2248
2249    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2250    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2251      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2252    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2253      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // HBASE_CLIENT_PAUSE is specified in milliseconds.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2255    while (retrier.shouldRetry()) {
2256      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2257      if (index != -1) {
2258        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2259      }
2260      // Came back -1. Region may not be online yet. Sleep a while.
2261      retrier.sleepUntilNextRetry();
2262    }
2263    return null;
2264  }
2265
2266  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
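   * <p>
   * A minimal usage sketch; pair every start with a shutdown, with {@code util} standing for an
   * instance of this class:
   *
   * <pre>
   * util.startMiniMapReduceCluster();
   * try {
   *   // run MapReduce-backed test logic here
   * } finally {
   *   util.shutdownMiniMapReduceCluster();
   * }
   * </pre>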
2268   * @throws IOException When starting the cluster fails.
2269   */
2270  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2271    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2272    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2273      "99.0");
2274    startMiniMapReduceCluster(2);
2275    return mrCluster;
2276  }
2277
2278  /**
   * TaskTracker has a bug where changing the hadoop.log.dir system property will not change its
2280   * internal static LOG_DIR variable.
2281   */
2282  private void forceChangeTaskLogDir() {
2283    Field logDirField;
2284    try {
2285      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2286      logDirField.setAccessible(true);
2287
2288      Field modifiersField = ReflectionUtils.getModifiersField();
2289      modifiersField.setAccessible(true);
2290      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2291
2292      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
    } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
      | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
2302  }
2303
2304  /**
2305   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2306   * filesystem.
   * @param servers The number of <code>TaskTracker</code>s to start.
2308   * @throws IOException When starting the cluster fails.
2309   */
2310  private void startMiniMapReduceCluster(final int servers) throws IOException {
2311    if (mrCluster != null) {
2312      throw new IllegalStateException("MiniMRCluster is already running");
2313    }
2314    LOG.info("Starting mini mapreduce cluster...");
2315    setupClusterTestDir();
2316    createDirsAndSetProperties();
2317
2318    forceChangeTaskLogDir();
2319
2320    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We raise the allowed virtual memory ratio so that processes don't get killed.
2323    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2324
2325    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2326    // this avoids the problem by disabling speculative task execution in tests.
2327    conf.setBoolean("mapreduce.map.speculative", false);
2328    conf.setBoolean("mapreduce.reduce.speculative", false);
2329    ////
2330
2331    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2332    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2333    if (JVM.getJVMSpecVersion() >= 17) {
2334      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2335      conf.set("yarn.app.mapreduce.am.command-opts",
2336        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2337    }
2338
2339    // Allow the user to override FS URI for this map-reduce cluster to use.
2340    mrCluster =
2341      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2342        1, null, null, new JobConf(this.conf));
2343    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2344    if (jobConf == null) {
2345      jobConf = mrCluster.createJobConf();
2346    }
2347
2348    // Hadoop MiniMR overwrites this while it should not
2349    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2350    LOG.info("Mini mapreduce cluster started");
2351
2352    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2353    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2354    // necessary config properties here. YARN-129 required adding a few properties.
2355    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
    // this is for MRv2 support; MR1 ignores it
2357    conf.set("mapreduce.framework.name", "yarn");
2358    conf.setBoolean("yarn.is.minicluster", true);
2359    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2360    if (rmAddress != null) {
2361      conf.set("yarn.resourcemanager.address", rmAddress);
2362    }
2363    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2364    if (historyAddress != null) {
2365      conf.set("mapreduce.jobhistory.address", historyAddress);
2366    }
2367    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2368    if (schedulerAddress != null) {
2369      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2370    }
2371    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2372    if (mrJobHistoryWebappAddress != null) {
2373      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2374    }
2375    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2376    if (yarnRMWebappAddress != null) {
2377      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2378    }
2379  }
2380
2381  /**
2382   * Stops the previously started <code>MiniMRCluster</code>.
2383   */
2384  public void shutdownMiniMapReduceCluster() {
2385    if (mrCluster != null) {
2386      LOG.info("Stopping mini mapreduce cluster...");
2387      mrCluster.shutdown();
2388      mrCluster = null;
2389      LOG.info("Mini mapreduce cluster stopped");
2390    }
2391    // Restore configuration to point to local jobtracker
2392    conf.set("mapreduce.jobtracker.address", "local");
2393  }
2394
2395  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS.
2397   */
2398  public RegionServerServices createMockRegionServerService() throws IOException {
2399    return createMockRegionServerService((ServerName) null);
2400  }
2401
2402  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
2404   * TestTokenAuthentication
2405   */
2406  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2407    throws IOException {
2408    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2409    rss.setFileSystem(getTestFileSystem());
2410    rss.setRpcServer(rpc);
2411    return rss;
2412  }
2413
2414  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
2416   * TestOpenRegionHandler
2417   */
2418  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2419    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2420    rss.setFileSystem(getTestFileSystem());
2421    return rss;
2422  }
2423
2424  /**
2425   * Expire the Master's session
2426   */
2427  public void expireMasterSession() throws Exception {
2428    HMaster master = getMiniHBaseCluster().getMaster();
2429    expireSession(master.getZooKeeper(), false);
2430  }
2431
2432  /**
2433   * Expire a region server's session
2434   * @param index which RS
2435   */
2436  public void expireRegionServerSession(int index) throws Exception {
2437    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2438    expireSession(rs.getZooKeeper(), false);
2439    decrementMinRegionServerCount();
2440  }
2441
2442  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for the newly spawned master
2444    // this.hbaseCluster shares this configuration too
2445    decrementMinRegionServerCount(getConfiguration());
2446
2447    // each master thread keeps a copy of configuration
2448    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2449      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2450    }
2451  }
2452
2453  private void decrementMinRegionServerCount(Configuration conf) {
2454    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2455    if (currentCount != -1) {
2456      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2457    }
2458  }
2459
2460  public void expireSession(ZKWatcher nodeZK) throws Exception {
2461    expireSession(nodeZK, false);
2462  }
2463
2464  /**
2465   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2466   * http://hbase.apache.org/book.html#trouble.zookeeper
2467   * <p/>
2468   * There are issues when doing this:
2469   * <ol>
2470   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2471   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2472   * </ol>
2473   * @param nodeZK      - the ZK watcher to expire
2474   * @param checkStatus - true to check if we can create a Table with the current configuration.
2475   */
2476  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2477    Configuration c = new Configuration(this.conf);
2478    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2479    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2480    byte[] password = zk.getSessionPasswd();
2481    long sessionID = zk.getSessionId();
2482
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event,
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event;
    // just closing the connection should be enough.
2490    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2491      @Override
2492      public void process(WatchedEvent watchedEvent) {
2493        LOG.info("Monitor ZKW received event=" + watchedEvent);
2494      }
2495    }, sessionID, password);
2496
2497    // Making it expire
2498    ZooKeeper newZK =
2499      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2500
    // Ensure that we have a connection to the server before closing down; otherwise the close
    // session event will be swallowed before we reach the CONNECTING state.
2503    long start = EnvironmentEdgeManager.currentTime();
2504    while (
2505      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2506    ) {
2507      Thread.sleep(1);
2508    }
2509    newZK.close();
2510    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2511
2512    // Now closing & waiting to be sure that the clients get it.
2513    monitor.close();
2514
2515    if (checkStatus) {
2516      getConnection().getTable(TableName.META_TABLE_NAME).close();
2517    }
2518  }
2519
2520  /**
2521   * Get the Mini HBase cluster.
2522   * @return hbase cluster
2523   * @see #getHBaseClusterInterface()
2524   */
2525  public SingleProcessHBaseCluster getHBaseCluster() {
2526    return getMiniHBaseCluster();
2527  }
2528
2529  /**
2530   * Returns the HBaseCluster instance.
2531   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and tests referring to this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be used
   * instead without the need to type-cast.
2536   */
2537  public HBaseClusterInterface getHBaseClusterInterface() {
2538    // implementation note: we should rename this method as #getHBaseCluster(),
2539    // but this would require refactoring 90+ calls.
2540    return hbaseCluster;
2541  }
2542
2543  /**
2544   * Resets the connections so that the next time getConnection() is called, a new connection is
2545   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2546   * the connection is not valid anymore.
2547   * <p/>
2548   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2549   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2550   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2551   * the connection state automatically. Cleaning this is a much bigger refactor.
2552   */
2553  public void invalidateConnection() throws IOException {
2554    closeConnection();
2555    // Update the master addresses if they changed.
2556    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2557    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2558    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2559      masterConfigBefore, masterConfAfter);
2560    conf.set(HConstants.MASTER_ADDRS_KEY,
2561      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2562  }
2563
2564  /**
   * Get a shared Connection to the cluster. This method is thread safe.
2566   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2567   */
2568  public Connection getConnection() throws IOException {
2569    return getAsyncConnection().toConnection();
2570  }
2571
2572  /**
   * Get a Connection to the cluster bound to the given user. This method is thread safe.
2574   * @param user assigned user
2575   * @return A Connection with assigned user.
2576   */
2577  public Connection getConnection(User user) throws IOException {
2578    return getAsyncConnection(user).toConnection();
2579  }
2580
2581  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
2583   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2584   *         of cluster.
2585   */
2586  public AsyncClusterConnection getAsyncConnection() throws IOException {
2587    try {
2588      return asyncConnection.updateAndGet(connection -> {
2589        if (connection == null) {
2590          try {
2591            User user = UserProvider.instantiate(conf).getCurrent();
2592            connection = getAsyncConnection(user);
2593          } catch (IOException ioe) {
2594            throw new UncheckedIOException("Failed to create connection", ioe);
2595          }
2596        }
2597        return connection;
2598      });
2599    } catch (UncheckedIOException exception) {
2600      throw exception.getCause();
2601    }
2602  }
2603
2604  /**
   * Get an AsyncClusterConnection to the cluster bound to the given user. This method is thread
   * safe.
2606   * @param user assigned user
2607   * @return An AsyncClusterConnection with assigned user.
2608   */
2609  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2610    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2611  }
2612
2613  public void closeConnection() throws IOException {
2614    if (hbaseAdmin != null) {
2615      Closeables.close(hbaseAdmin, true);
2616      hbaseAdmin = null;
2617    }
2618    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2619    if (asyncConnection != null) {
2620      Closeables.close(asyncConnection, true);
2621    }
2622  }
2623
2624  /**
   * Returns an Admin instance which is shared between HBaseTestingUtil instance users. Closing it
   * has no effect; it will be closed automatically when the cluster shuts down.
2627   */
2628  public Admin getAdmin() throws IOException {
2629    if (hbaseAdmin == null) {
2630      this.hbaseAdmin = getConnection().getAdmin();
2631    }
2632    return hbaseAdmin;
2633  }
2634
2635  private Admin hbaseAdmin = null;
2636
2637  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
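   * <p>
   * Unlike {@link #getAdmin()}, the returned instance is not cached, so close it yourself, for
   * example (a sketch; {@code util} assumed):
   * </p>
   * <pre>{@code
   * try (Hbck hbck = util.getHbck()) {
   *   // issue repair operations such as hbck.assigns(...) here
   * }
   * }</pre>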
2639   */
2640  public Hbck getHbck() throws IOException {
2641    return getConnection().getHbck();
2642  }
2643
2644  /**
2645   * Unassign the named region.
2646   * @param regionName The region to unassign.
2647   */
2648  public void unassignRegion(String regionName) throws IOException {
2649    unassignRegion(Bytes.toBytes(regionName));
2650  }
2651
2652  /**
2653   * Unassign the named region.
2654   * @param regionName The region to unassign.
2655   */
2656  public void unassignRegion(byte[] regionName) throws IOException {
2657    getAdmin().unassign(regionName);
2658  }
2659
2660  /**
2661   * Closes the region containing the given row.
2662   * @param row   The row to find the containing region.
2663   * @param table The table to find the region.
2664   */
2665  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2666    unassignRegionByRow(Bytes.toBytes(row), table);
2667  }
2668
2669  /**
2670   * Closes the region containing the given row.
2671   * @param row   The row to find the containing region.
2672   * @param table The table to find the region.
2673   */
2674  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2675    HRegionLocation hrl = table.getRegionLocation(row);
2676    unassignRegion(hrl.getRegion().getRegionName());
2677  }
2678
2679  /**
   * Retrieves a splittable region randomly from the given table.
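   * <p>
   * A hedged sketch of typical use ({@code util} and {@code tableName} are assumed names):
   * </p>
   * <pre>{@code
   * HRegion region = util.getSplittableRegion(tableName, 100);
   * if (region != null) {
   *   util.getAdmin().split(region.getRegionInfo().getTable());
   * }
   * }</pre>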
2681   * @param tableName   name of table
   * @param maxAttempts maximum number of attempts; unlimited if -1
   * @return the HRegion chosen, or null if none was found within the limit of maxAttempts
2684   */
2685  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2686    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2687    int regCount = regions.size();
2688    Set<Integer> attempted = new HashSet<>();
2689    int idx;
2690    int attempts = 0;
2691    do {
2692      regions = getHBaseCluster().getRegions(tableName);
2693      if (regCount != regions.size()) {
2694        // if there was region movement, clear attempted Set
2695        attempted.clear();
2696      }
2697      regCount = regions.size();
      // There is a chance that, before we get the region for the table from an RS, the region is
      // being closed. This may happen because online schema change is enabled.
2700      if (regCount > 0) {
2701        idx = ThreadLocalRandom.current().nextInt(regCount);
2702        // if we have just tried this region, there is no need to try again
2703        if (attempted.contains(idx)) {
2704          continue;
2705        }
2706        HRegion region = regions.get(idx);
2707        if (region.checkSplit().isPresent()) {
2708          return region;
2709        }
2710        attempted.add(idx);
2711      }
2712      attempts++;
2713    } while (maxAttempts == -1 || attempts < maxAttempts);
2714    return null;
2715  }
2716
2717  public MiniDFSCluster getDFSCluster() {
2718    return dfsCluster;
2719  }
2720
2721  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2722    setDFSCluster(cluster, true);
2723  }
2724
2725  /**
2726   * Set the MiniDFSCluster
2727   * @param cluster     cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
   *                    it is set.
2730   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2731   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2732   */
2733  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2734    throws IllegalStateException, IOException {
2735    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2736      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2737    }
2738    this.dfsCluster = cluster;
2739    this.setFs();
2740  }
2741
2742  public FileSystem getTestFileSystem() throws IOException {
2743    return HFileSystem.get(conf);
2744  }
2745
2746  /**
   * Wait until all regions in a table have been assigned. Waits the default timeout (30 seconds)
   * before giving up.
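   * <p>
   * Typically used right after table creation, e.g. (a sketch; {@code util} and
   * {@code tableDescriptor} assumed):
   * </p>
   * <pre>{@code
   * util.getAdmin().createTable(tableDescriptor);
   * util.waitTableAvailable(tableDescriptor.getTableName());
   * }</pre>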
2749   * @param table Table to wait on.
2750   */
2751  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2752    waitTableAvailable(table.getName(), 30000);
2753  }
2754
2755  public void waitTableAvailable(TableName table, long timeoutMillis)
2756    throws InterruptedException, IOException {
2757    waitFor(timeoutMillis, predicateTableAvailable(table));
2758  }
2759
2760  /**
2761   * Wait until all regions in a table have been assigned
2762   * @param table         Table to wait on.
2763   * @param timeoutMillis Timeout.
2764   */
2765  public void waitTableAvailable(byte[] table, long timeoutMillis)
2766    throws InterruptedException, IOException {
2767    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2768  }
2769
2770  public String explainTableAvailability(TableName tableName) throws IOException {
2771    StringBuilder msg =
2772      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2773    if (getHBaseCluster().getMaster().isAlive()) {
2774      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2775        .getRegionStates().getRegionAssignments();
2776      final List<Pair<RegionInfo, ServerName>> metaLocations =
2777        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2778      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2779        RegionInfo hri = metaLocation.getFirst();
2780        ServerName sn = metaLocation.getSecond();
2781        if (!assignments.containsKey(hri)) {
2782          msg.append(", region ").append(hri)
2783            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2784        } else if (sn == null) {
2785          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2786        } else if (!sn.equals(assignments.get(hri))) {
2787          msg.append(",  region ").append(hri)
2788            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2789            .append(" <> ").append(assignments.get(hri));
2790        }
2791      }
2792    }
2793    return msg.toString();
2794  }
2795
2796  public String explainTableState(final TableName table, TableState.State state)
2797    throws IOException {
2798    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2799    if (tableState == null) {
2800      return "TableState in META: No table state in META for table " + table
2801        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2802    } else if (!tableState.inStates(state)) {
2803      return "TableState in META: Not " + state + " state, but " + tableState;
2804    } else {
2805      return "TableState in META: OK";
2806    }
2807  }
2808
2809  @Nullable
2810  public TableState findLastTableState(final TableName table) throws IOException {
2811    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2812    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2813      @Override
2814      public boolean visit(Result r) throws IOException {
2815        if (!Arrays.equals(r.getRow(), table.getName())) {
2816          return false;
2817        }
2818        TableState state = CatalogFamilyFormat.getTableState(r);
2819        if (state != null) {
2820          lastTableState.set(state);
2821        }
2822        return true;
2823      }
2824    };
2825    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2826      Integer.MAX_VALUE, visitor);
2827    return lastTableState.get();
2828  }
2829
2830  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all
   * regions have been assigned. Will time out after the default period (30 seconds). Tolerates a
   * nonexistent table.
2834   * @param table the table to wait on.
2835   * @throws InterruptedException if interrupted while waiting
2836   * @throws IOException          if an IO problem is encountered
2837   */
2838  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2839    waitTableEnabled(table, 30000);
2840  }
2841
2842  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all
   * regions have been assigned.
2845   * @see #waitTableEnabled(TableName, long)
2846   * @param table         Table to wait on.
2847   * @param timeoutMillis Time to wait on it being marked enabled.
2848   */
2849  public void waitTableEnabled(byte[] table, long timeoutMillis)
2850    throws InterruptedException, IOException {
2851    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2852  }
2853
2854  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2855    waitFor(timeoutMillis, predicateTableEnabled(table));
2856  }
2857
2858  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
   * time out after the default period (30 seconds).
2861   * @param table Table to wait on.
2862   */
2863  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2864    waitTableDisabled(table, 30000);
2865  }
2866
2867  public void waitTableDisabled(TableName table, long millisTimeout)
2868    throws InterruptedException, IOException {
2869    waitFor(millisTimeout, predicateTableDisabled(table));
2870  }
2871
2872  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2874   * @param table         Table to wait on.
2875   * @param timeoutMillis Time to wait on it being marked disabled.
2876   */
2877  public void waitTableDisabled(byte[] table, long timeoutMillis)
2878    throws InterruptedException, IOException {
2879    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2880  }
2881
2882  /**
2883   * Make sure that at least the specified number of region servers are running
2884   * @param num minimum number of region servers that should be running
2885   * @return true if we started some servers
2886   */
2887  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2888    boolean startedServer = false;
2889    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2890    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2891      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2892      startedServer = true;
2893    }
2894
2895    return startedServer;
2896  }
2897
2898  /**
2899   * Make sure that at least the specified number of region servers are running. We don't count the
2900   * ones that are currently stopping or are stopped.
2901   * @param num minimum number of region servers that should be running
2902   * @return true if we started some servers
2903   */
2904  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2905    boolean startedServer = ensureSomeRegionServersAvailable(num);
2906
2907    int nonStoppedServers = 0;
2908    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2909
2910      HRegionServer hrs = rst.getRegionServer();
2911      if (hrs.isStopping() || hrs.isStopped()) {
2912        LOG.info("A region server is stopped or stopping:" + hrs);
2913      } else {
2914        nonStoppedServers++;
2915      }
2916    }
2917    for (int i = nonStoppedServers; i < num; ++i) {
2918      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2919      startedServer = true;
2920    }
2921    return startedServer;
2922  }
2923
2924  /**
   * Creates a new user, derived from the current user, for the passed <code>c</code>
   * configuration. Use it when getting new instances of FileSystem. Only works for
   * DistributedFileSystem w/o Kerberos.
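   * <p>
   * A hedged sketch of use (assuming a DistributedFileSystem-backed mini cluster without Kerberos,
   * and java.security.PrivilegedExceptionAction on the classpath):
   * </p>
   * <pre>{@code
   * User user = getDifferentUser(conf, ".1");
   * // Run filesystem work as this user so the daemon gets its own FileSystem instance.
   * FileSystem fs = user.runAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
   * }</pre>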
2927   * @param c                     Initial configuration
2928   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A User (possibly the current user) to use when creating new FileSystem instances.
2930   */
2931  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2932    throws IOException {
2933    FileSystem currentfs = FileSystem.get(c);
2934    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2935      return User.getCurrent();
2936    }
2937    // Else distributed filesystem. Make a new instance per daemon. Below
2938    // code is taken from the AppendTestUtil over in hdfs.
2939    String username = User.getCurrent().getName() + differentiatingSuffix;
2940    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2941    return user;
2942  }
2943
2944  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2945    throws IOException {
2946    NavigableSet<String> online = new TreeSet<>();
2947    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2948      try {
2949        for (RegionInfo region : ProtobufUtil
2950          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2951          online.add(region.getRegionNameAsString());
2952        }
2953      } catch (RegionServerStoppedException e) {
2954        // That's fine.
2955      }
2956    }
2957    return online;
2958  }
2959
2960  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it is hard-coded to 5 and makes
   * tests linger. Here is the exception you'll see:
2963   *
2964   * <pre>
2965   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2966   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2967   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2968   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2969   * </pre>
2970   *
2971   * @param stream A DFSClient.DFSOutputStream.
2972   */
2973  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2974    try {
2975      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2976      for (Class<?> clazz : clazzes) {
2977        String className = clazz.getSimpleName();
2978        if (className.equals("DFSOutputStream")) {
2979          if (clazz.isInstance(stream)) {
2980            Field maxRecoveryErrorCountField =
2981              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2982            maxRecoveryErrorCountField.setAccessible(true);
2983            maxRecoveryErrorCountField.setInt(stream, max);
2984            break;
2985          }
2986        }
2987      }
2988    } catch (Exception e) {
2989      LOG.info("Could not set max recovery field", e);
2990    }
2991  }
2992
2993  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
2997   */
2998  public boolean assignRegion(final RegionInfo regionInfo)
2999    throws IOException, InterruptedException {
3000    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3001    am.assign(regionInfo);
3002    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3003  }
3004
3005  /**
   * Move a region to the destination server and wait until the region is completely moved and
   * online.
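   * <p>
   * A sketch (names assumed; the cluster must have at least two region servers): move the first
   * region of a table to another region server.
   * </p>
   * <pre>{@code
   * RegionInfo hri = util.getAdmin().getRegions(tableName).get(0);
   * ServerName dest = util.getMiniHBaseCluster().getRegionServer(1).getServerName();
   * util.moveRegionAndWait(hri, dest);
   * }</pre>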
3007   * @param destRegion region to move
3008   * @param destServer destination server of the region
3009   */
3010  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3011    throws InterruptedException, IOException {
3012    HMaster master = getMiniHBaseCluster().getMaster();
3013    // TODO: Here we start the move. The move can take a while.
3014    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3015    while (true) {
3016      ServerName serverName =
3017        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3018      if (serverName != null && serverName.equals(destServer)) {
3019        assertRegionOnServer(destRegion, serverName, 2000);
3020        break;
3021      }
3022      Thread.sleep(10);
3023    }
3024  }
3025
3026  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been
   * deployed, and the master has been informed and has updated hbase:meta with each region's
   * deployed server.
3030   * @param tableName the table name
3031   */
3032  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3033    waitUntilAllRegionsAssigned(tableName,
3034      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3035  }
3036
3037  /**
   * Wait until all system tables' regions get assigned.
3039   */
3040  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3041    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3042  }
3043
3044  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, and the master has been informed and has
   * updated hbase:meta with each region's deployed server.
3048   * @param tableName the table name
3049   * @param timeout   timeout, in milliseconds
3050   */
3051  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3052    throws IOException {
3053    if (!TableName.isMetaTableName(tableName)) {
3054      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3055        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3056          + timeout + "ms");
3057        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3058          @Override
3059          public String explainFailure() throws IOException {
3060            return explainTableAvailability(tableName);
3061          }
3062
3063          @Override
3064          public boolean evaluate() throws IOException {
3065            Scan scan = new Scan();
3066            scan.addFamily(HConstants.CATALOG_FAMILY);
3067            boolean tableFound = false;
3068            try (ResultScanner s = meta.getScanner(scan)) {
3069              for (Result r; (r = s.next()) != null;) {
3070                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3071                RegionInfo info = RegionInfo.parseFromOrNull(b);
3072                if (info != null && info.getTable().equals(tableName)) {
3073                  // Get server hosting this region from catalog family. Return false if no server
3074                  // hosting this region, or if the server hosting this region was recently killed
3075                  // (for fault tolerance testing).
3076                  tableFound = true;
3077                  byte[] server =
3078                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3079                  if (server == null) {
3080                    return false;
3081                  } else {
3082                    byte[] startCode =
3083                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3084                    ServerName serverName =
3085                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3086                        + Bytes.toLong(startCode));
3087                    if (
3088                      !getHBaseClusterInterface().isDistributedCluster()
3089                        && getHBaseCluster().isKilledRS(serverName)
3090                    ) {
3091                      return false;
3092                    }
3093                  }
3094                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3095                    return false;
3096                  }
3097                }
3098              }
3099            }
3100            if (!tableFound) {
3101              LOG.warn(
3102                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3103            }
3104            return tableFound;
3105          }
3106        });
3107      }
3108    }
3109    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3110    // check from the master state if we are using a mini cluster
3111    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table, but make sure the master knows of the assignments
      // before returning -- sometimes this can lag.
3114      HMaster master = getHBaseCluster().getMaster();
3115      final RegionStates states = master.getAssignmentManager().getRegionStates();
3116      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3117        @Override
3118        public String explainFailure() throws IOException {
3119          return explainTableAvailability(tableName);
3120        }
3121
3122        @Override
3123        public boolean evaluate() throws IOException {
3124          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3125          return hris != null && !hris.isEmpty();
3126        }
3127      });
3128    }
3129    LOG.info("All regions for table " + tableName + " assigned.");
3130  }
3131
3132  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3135   */
3136  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3137    Scan scan = new Scan(get);
3138    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3139      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3140      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3141      // readpoint 0.
3142      0);
3143
3144    List<Cell> result = new ArrayList<>();
3145    scanner.next(result);
3146    if (!result.isEmpty()) {
3147      // verify that we are on the row we want:
3148      Cell kv = result.get(0);
3149      if (!CellUtil.matchingRows(kv, get.getRow())) {
3150        result.clear();
3151      }
3152    }
3153    scanner.close();
3154    return result;
3155  }
3156
3157  /**
   * Create region split keys between startKey and endKey.
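   * <p>
   * For example (a sketch), {@code getRegionSplitStartKeys(Bytes.toBytes("aaa"),
   * Bytes.toBytes("zzz"), 5)} returns five region start keys: the empty byte array,
   * {@code "aaa"}, two intermediate keys, and {@code "zzz"}.
   * </p>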
   * @param numRegions the number of regions to be created. It has to be greater than 3.
3160   * @return resulting split keys
3161   */
3162  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3163    assertTrue(numRegions > 3);
3164    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3165    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3166    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3167    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3168    return result;
3169  }
3170
3171  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3174   */
3175  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3176    throws IOException {
3177    Get get = new Get(row);
3178    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3179    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3180
3181    return getFromStoreFile(store, get);
3182  }
3183
3184  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3185    final List<? extends Cell> actual) {
3186    final int eLen = expected.size();
3187    final int aLen = actual.size();
3188    final int minLen = Math.min(eLen, aLen);
3189
3190    int i = 0;
3191    while (
3192      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3193    ) {
3194      i++;
3195    }
3196
3197    if (additionalMsg == null) {
3198      additionalMsg = "";
3199    }
3200    if (!additionalMsg.isEmpty()) {
3201      additionalMsg = ". " + additionalMsg;
3202    }
3203
3204    if (eLen != aLen || i != minLen) {
3205      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3206        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3207        + " (length " + aLen + ")" + additionalMsg);
3208    }
3209  }
3210
3211  public static <T> String safeGetAsStr(List<T> lst, int i) {
3212    if (0 <= i && i < lst.size()) {
3213      return lst.get(i).toString();
3214    } else {
3215      return "<out_of_range>";
3216    }
3217  }
3218
3219  public String getClusterKey() {
3220    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3221      + ":"
3222      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3223  }
3224
3225  /**
3226   * Creates a random table with the given parameters
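   * <p>
   * A hedged invocation sketch ({@code util} assumed): two column families, 3 max versions, 10
   * columns per row, 2 flushes of 100 rows each, and 4 regions.
   * </p>
   * <pre>{@code
   * Table t = util.createRandomTable(TableName.valueOf("rand"), Arrays.asList("cf1", "cf2"), 3,
   *   10, 2, 4, 100);
   * }</pre>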
3227   */
3228  public Table createRandomTable(TableName tableName, final Collection<String> families,
3229    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3230    final int numRowsPerFlush) throws IOException, InterruptedException {
3231    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3232      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3233      + maxVersions + "\n");
3234
3235    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3236    final int numCF = families.size();
3237    final byte[][] cfBytes = new byte[numCF][];
3238    {
3239      int cfIndex = 0;
3240      for (String cf : families) {
3241        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3242      }
3243    }
3244
3245    final int actualStartKey = 0;
3246    final int actualEndKey = Integer.MAX_VALUE;
3247    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3248    final int splitStartKey = actualStartKey + keysPerRegion;
3249    final int splitEndKey = actualEndKey - keysPerRegion;
3250    final String keyFormat = "%08x";
3251    final Table table = createTable(tableName, cfBytes, maxVersions,
3252      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3253      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3254
3255    if (hbaseCluster != null) {
3256      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3257    }
3258
3259    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3260
3261    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3262      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3263        final byte[] row = Bytes.toBytes(
3264          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3265
3266        Put put = new Put(row);
3267        Delete del = new Delete(row);
3268        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3269          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3270          final long ts = rand.nextInt();
3271          final byte[] qual = Bytes.toBytes("col" + iCol);
3272          if (rand.nextBoolean()) {
3273            final byte[] value =
3274              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3275                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3276            put.addColumn(cf, qual, ts, value);
3277          } else if (rand.nextDouble() < 0.8) {
3278            del.addColumn(cf, qual, ts);
3279          } else {
3280            del.addColumns(cf, qual, ts);
3281          }
3282        }
3283
3284        if (!put.isEmpty()) {
3285          mutator.mutate(put);
3286        }
3287
3288        if (!del.isEmpty()) {
3289          mutator.mutate(del);
3290        }
3291      }
3292      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3293      mutator.flush();
3294      if (hbaseCluster != null) {
3295        getMiniHBaseCluster().flushcache(table.getName());
3296      }
3297    }
3298    mutator.close();
3299
3300    return table;
3301  }
3302
3303  public static int randomFreePort() {
3304    return HBaseCommonTestingUtil.randomFreePort();
3305  }
3306
3307  public static String randomMultiCastAddress() {
3308    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3309  }
3310
3311  public static void waitForHostPort(String host, int port) throws IOException {
3312    final int maxTimeMs = 10000;
3313    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3314    IOException savedException = null;
3315    LOG.info("Waiting for server at " + host + ":" + port);
3316    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3317      try {
3318        Socket sock = new Socket(InetAddress.getByName(host), port);
3319        sock.close();
3320        savedException = null;
3321        LOG.info("Server at " + host + ":" + port + " is available");
3322        break;
3323      } catch (UnknownHostException e) {
3324        throw new IOException("Failed to look up " + host, e);
3325      } catch (IOException e) {
3326        savedException = e;
3327      }
3328      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3329    }
3330
3331    if (savedException != null) {
3332      throw savedException;
3333    }
3334  }
3335
3336  /**
3337   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3338   * continues.
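   * <p>
   * A sketch of use with the defaults ({@code util} assumed):
   * </p>
   * <pre>{@code
   * int regions = createPreSplitLoadTestTable(util.getConfiguration(),
   *   TableName.valueOf("load_test"), Bytes.toBytes("cf"), Algorithm.NONE,
   *   DataBlockEncoding.NONE);
   * }</pre>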
3339   * @return the number of regions the table was split into
3340   */
3341  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3342    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3343    throws IOException {
3344    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3345      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3346  }
3347
3348  /**
3349   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3350   * continues.
3351   * @return the number of regions the table was split into
3352   */
3353  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3354    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3355    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3356    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3357    builder.setDurability(durability);
3358    builder.setRegionReplication(regionReplication);
3359    ColumnFamilyDescriptorBuilder cfBuilder =
3360      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3361    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3362    cfBuilder.setCompressionType(compression);
3363    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3364      numRegionsPerServer);
3365  }
3366
3367  /**
3368   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3369   * continues.
3370   * @return the number of regions the table was split into
3371   */
3372  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3373    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3374    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3375    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3376    builder.setDurability(durability);
3377    builder.setRegionReplication(regionReplication);
3378    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3379    for (int i = 0; i < columnFamilies.length; i++) {
3380      ColumnFamilyDescriptorBuilder cfBuilder =
3381        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3382      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3383      cfBuilder.setCompressionType(compression);
3384      hcds[i] = cfBuilder.build();
3385    }
3386    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3387  }
3388
3389  /**
3390   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3391   * continues.
3392   * @return the number of regions the table was split into
3393   */
3394  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3395    ColumnFamilyDescriptor hcd) throws IOException {
3396    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3397  }
3398
3399  /**
3400   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3401   * continues.
3402   * @return the number of regions the table was split into
3403   */
3404  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3405    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3406    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3407      numRegionsPerServer);
3408  }
3409
3410  /**
3411   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3412   * continues.
3413   * @return the number of regions the table was split into
3414   */
3415  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3416    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3417    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3418      numRegionsPerServer);
3419  }
3420
3421  /**
3422   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3423   * continues.
3424   * @return the number of regions the table was split into
3425   */
3426  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3427    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3428    throws IOException {
3429    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3430    for (ColumnFamilyDescriptor cd : cds) {
3431      if (!td.hasColumnFamily(cd.getName())) {
3432        builder.setColumnFamily(cd);
3433      }
3434    }
3435    td = builder.build();
3436    int totalNumberOfRegions = 0;
3437    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3438    Admin admin = unmanagedConnection.getAdmin();
3439
3440    try {
      // Create a table with pre-split regions. The number of splits is set as:
      // (region servers * regions per region server).
3444      int numberOfServers = admin.getRegionServers().size();
3445      if (numberOfServers == 0) {
3446        throw new IllegalStateException("No live regionservers");
3447      }
3448
3449      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3450      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3451        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3452        + numRegionsPerServer + ")");
3453
3454      byte[][] splits = splitter.split(totalNumberOfRegions);
3455
3456      admin.createTable(td, splits);
3457    } catch (MasterNotRunningException e) {
3458      LOG.error("Master not running", e);
3459      throw new IOException(e);
3460    } catch (TableExistsException e) {
3461      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3462    } finally {
3463      admin.close();
3464      unmanagedConnection.close();
3465    }
3466    return totalNumberOfRegions;
3467  }
3468
3469  public static int getMetaRSPort(Connection connection) throws IOException {
3470    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3471      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3472    }
3473  }
3474
3475  /**
   * Due to an async race, a region may not yet be in the online region list of a region server
   * after the assignment znode is deleted and the new assignment is recorded in the master.
3478   */
3479  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3480    final long timeout) throws IOException, InterruptedException {
3481    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3482    while (true) {
3483      List<RegionInfo> regions = getAdmin().getRegions(server);
3484      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3485      long now = EnvironmentEdgeManager.currentTime();
3486      if (now > timeoutTime) break;
3487      Thread.sleep(10);
3488    }
3489    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3490  }
3491
3492  /**
3493   * Check to make sure the region is open on the specified region server, but not on any other one.
3494   */
3495  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3496    final long timeout) throws IOException, InterruptedException {
3497    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3498    while (true) {
3499      List<RegionInfo> regions = getAdmin().getRegions(server);
3500      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3501        List<JVMClusterUtil.RegionServerThread> rsThreads =
3502          getHBaseCluster().getLiveRegionServerThreads();
3503        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3504          HRegionServer rs = rsThread.getRegionServer();
3505          if (server.equals(rs.getServerName())) {
3506            continue;
3507          }
3508          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3509          for (HRegion r : hrs) {
3510            assertTrue("Region should not be double assigned",
3511              r.getRegionInfo().getRegionId() != hri.getRegionId());
3512          }
3513        }
3514        return; // good, we are happy
3515      }
3516      long now = EnvironmentEdgeManager.currentTime();
3517      if (now > timeoutTime) break;
3518      Thread.sleep(10);
3519    }
3520    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3521  }
3522
3523  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3524    TableDescriptor td =
3525      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3526    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3527    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3528  }
3529
3530  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3531    BlockCache blockCache) throws IOException {
3532    TableDescriptor td =
3533      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3534    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3535    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3536  }
3537
3538  public static void setFileSystemURI(String fsURI) {
3539    FS_URI = fsURI;
3540  }
3541
3542  /**
3543   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3544   */
3545  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3546    return new ExplainingPredicate<IOException>() {
3547      @Override
3548      public String explainFailure() throws IOException {
3549        final RegionStates regionStates =
3550          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3551        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3552      }
3553
3554      @Override
3555      public boolean evaluate() throws IOException {
3556        HMaster master = getMiniHBaseCluster().getMaster();
3557        if (master == null) return false;
3558        AssignmentManager am = master.getAssignmentManager();
3559        if (am == null) return false;
3560        return !am.hasRegionsInTransition();
3561      }
3562    };
3563  }
3564
3565  /**
3566   * Returns a {@link Predicate} for checking that table is enabled
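   * <p>
   * Intended for use with {@code waitFor}, e.g. (a sketch):
   * {@code util.waitFor(30000, util.predicateTableEnabled(tableName));}
   * </p>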
3567   */
3568  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3569    return new ExplainingPredicate<IOException>() {
3570      @Override
3571      public String explainFailure() throws IOException {
3572        return explainTableState(tableName, TableState.State.ENABLED);
3573      }
3574
3575      @Override
3576      public boolean evaluate() throws IOException {
3577        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3578      }
3579    };
3580  }
3581
3582  /**
   * Returns a {@link Predicate} for checking that table is disabled
3584   */
3585  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3586    return new ExplainingPredicate<IOException>() {
3587      @Override
3588      public String explainFailure() throws IOException {
3589        return explainTableState(tableName, TableState.State.DISABLED);
3590      }
3591
3592      @Override
3593      public boolean evaluate() throws IOException {
3594        return getAdmin().isTableDisabled(tableName);
3595      }
3596    };
3597  }
3598
3599  /**
   * Returns a {@link Predicate} for checking that table is available
3601   */
3602  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3603    return new ExplainingPredicate<IOException>() {
3604      @Override
3605      public String explainFailure() throws IOException {
3606        return explainTableAvailability(tableName);
3607      }
3608
3609      @Override
3610      public boolean evaluate() throws IOException {
3611        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3612        if (tableAvailable) {
3613          try (Table table = getConnection().getTable(tableName)) {
3614            TableDescriptor htd = table.getDescriptor();
3615            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3616              .getAllRegionLocations()) {
3617              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3618                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3619                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3620              for (byte[] family : htd.getColumnFamilyNames()) {
3621                scan.addFamily(family);
3622              }
3623              try (ResultScanner scanner = table.getScanner(scan)) {
3624                scanner.next();
3625              }
3626            }
3627          }
3628        }
3629        return tableAvailable;
3630      }
3631    };
3632  }
3633
3634  /**
3635   * Wait until no regions in transition.
3636   * @param timeout How long to wait.
3637   */
3638  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3639    waitFor(timeout, predicateNoRegionsInTransition());
3640  }
3641
3642  /**
3643   * Wait until no regions in transition. (time limit 15min)
3644   */
3645  public void waitUntilNoRegionsInTransition() throws IOException {
3646    waitUntilNoRegionsInTransition(15 * 60000);
3647  }
3648
3649  /**
   * Wait until the given labels are ready in VisibilityLabelsCache.
3651   */
3652  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3653    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3654    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3655
3656      @Override
3657      public boolean evaluate() {
3658        for (String label : labels) {
3659          if (labelsCache.getLabelOrdinal(label) == 0) {
3660            return false;
3661          }
3662        }
3663        return true;
3664      }
3665
3666      @Override
3667      public String explainFailure() {
3668        for (String label : labels) {
3669          if (labelsCache.getLabelOrdinal(label) == 0) {
3670            return label + " is not available yet";
3671          }
3672        }
3673        return "";
3674      }
3675    });
3676  }
3677
3678  /**
   * Create a set of column descriptors with all combinations of the available compression,
   * encoding, and bloom filter types.
3681   * @return the list of column descriptors
3682   */
3683  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3684    return generateColumnDescriptors("");
3685  }
3686
3687  /**
   * Create a set of column descriptors with all combinations of the available compression,
   * encoding, and bloom filter types.
3690   * @param prefix family names prefix
3691   * @return the list of column descriptors
3692   */
3693  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3694    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3695    long familyId = 0;
3696    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3697      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3698        for (BloomType bloomType : BloomType.values()) {
3699          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3700          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3701            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3702          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3703          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3704          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3705          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3706          familyId++;
3707        }
3708      }
3709    }
3710    return columnFamilyDescriptors;
3711  }
3712
3713  /**
3714   * Get supported compression algorithms.
3715   * @return supported compression algorithms.
3716   */
3717  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3718    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3719    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3720    for (String algoName : allAlgos) {
3721      try {
3722        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3723        algo.getCompressor();
3724        supportedAlgos.add(algo);
3725      } catch (Throwable t) {
3726        // this algo is not available
3727      }
3728    }
3729    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3730  }
3731
3732  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3733    Scan scan = new Scan().withStartRow(row);
3734    scan.setReadType(ReadType.PREAD);
3735    scan.setCaching(1);
3736    scan.setReversed(true);
3737    scan.addFamily(family);
3738    try (RegionScanner scanner = r.getScanner(scan)) {
3739      List<Cell> cells = new ArrayList<>(1);
      scanner.next(cells);
      // Guard against an empty result before inspecting the first cell.
      if (cells.isEmpty()) {
        return null;
      }
      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3742        return null;
3743      }
3744      return Result.create(cells);
3745    }
3746  }
3747
3748  private boolean isTargetTable(final byte[] inRow, Cell c) {
3749    String inputRowString = Bytes.toString(inRow);
3750    int i = inputRowString.indexOf(HConstants.DELIMITER);
3751    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3752    int o = outputRowString.indexOf(HConstants.DELIMITER);
3753    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3754  }
3755
3756  /**
3757   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3758   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3759   * kerby KDC server and utility for using it,
   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It was introduced in HBASE-5291.
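   * <p>
   * A hedged usage sketch ({@code util} and {@code keytabFile} assumed):
   * </p>
   * <pre>{@code
   * MiniKdc kdc = util.setupMiniKdc(keytabFile);
   * kdc.createPrincipal(keytabFile, "hbase/localhost");
   * // ... run the secured test ...
   * kdc.stop();
   * }</pre>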
3762   */
3763  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3764    Properties conf = MiniKdc.createConf();
3765    conf.put(MiniKdc.DEBUG, true);
3766    MiniKdc kdc = null;
3767    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It is possible that
    // another service captures the port in between, which will result in a BindException.
3770    boolean bindException;
3771    int numTries = 0;
3772    do {
3773      try {
3774        bindException = false;
3775        dir = new File(getDataTestDir("kdc").toUri().getPath());
3776        kdc = new MiniKdc(conf, dir);
3777        kdc.start();
3778      } catch (BindException e) {
3779        FileUtils.deleteDirectory(dir); // clean directory
3780        numTries++;
3781        if (numTries == 3) {
3782          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3783          throw e;
3784        }
3785        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3786        bindException = true;
3787      }
3788    } while (bindException);
3789    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3790    return kdc;
3791  }
3792
3793  public int getNumHFiles(final TableName tableName, final byte[] family) {
3794    int numHFiles = 0;
3795    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3796      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3797    }
3798    return numHFiles;
3799  }
3800
3801  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3802    final byte[] family) {
3803    int numHFiles = 0;
3804    for (Region region : rs.getRegions(tableName)) {
3805      numHFiles += region.getStore(family).getStorefilesCount();
3806    }
3807    return numHFiles;
3808  }
3809
3810  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3811    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3812    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3813    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3814    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3815    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3816        it2 = rtdFamilies.iterator(); it.hasNext();) {
3817      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3818    }
3819  }
3820
3821  /**
3822   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3823   * invocations.
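   * <p>
   * For example (a sketch, where {@code counter} is an AtomicInteger updated by another thread):
   * {@code await(100, () -> counter.get() >= 10);}
   * </p>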
3824   */
3825  public static void await(final long sleepMillis, final BooleanSupplier condition)
3826    throws InterruptedException {
3827    try {
3828      while (!condition.getAsBoolean()) {
3829        Thread.sleep(sleepMillis);
3830      }
3831    } catch (RuntimeException e) {
3832      if (e.getCause() instanceof AssertionError) {
3833        throw (AssertionError) e.getCause();
3834      }
3835      throw e;
3836    }
3837  }
3838}