/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
 * cluster.
 * <p/>
 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
 * setting it to true.
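 * <p/>
 * A minimal usage sketch (the table and family names below are illustrative only):
 *
 * <pre>
 * HBaseTestingUtil util = new HBaseTestingUtil();
 * util.startMiniCluster();
 * try (Table table = util.createTable(TableName.valueOf("test"), "cf")) {
 *   // exercise the mini cluster here
 * } finally {
 *   util.shutdownMiniCluster();
 * }
 * </pre>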
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtil
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with the combination of memstoreTS and tags booleans. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
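
  // A minimal sketch of a JUnit 4 parameterized test consuming the list above
  // (the test class shown is hypothetical):
  //
  //   @RunWith(Parameterized.class)
  //   public class TestWithMemstoreTSAndTags {
  //     @Parameterized.Parameters
  //     public static List<Object[]> params() {
  //       return HBaseTestingUtil.MEMSTORETS_TAGS_PARAMETRIZED;
  //     }
  //     public TestWithMemstoreTSAndTags(boolean useMemstoreTS, boolean useTags) {
  //     }
  //   }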

  /**
   * Checks to see if a specific port is available.
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags for testing.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtil using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtil using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true, thus making
    // random port assignment opt-out for tests
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Start a mini secure cluster with the given kdc and principals.
   * @param kdc              Mini kdc server
   * @param servicePrincipal Service principal without realm.
   * @param spnegoPrincipal  Spnego principal without realm.
   * @return Handle to shut down the cluster
   */
  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
    String spnegoPrincipal) throws Exception {
    Configuration conf = getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
      spnegoPrincipal + '@' + kdc.getRealm());

    startMiniCluster();
    try {
      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      shutdownMiniCluster();
      throw e;
    }

    return this::shutdownMiniCluster;
  }
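
  // A minimal sketch of driving the secure mini cluster from a test, assuming a
  // MiniKdc has been set up elsewhere; TEST_UTIL denotes an instance of this
  // class, and the principal names below are illustrative only:
  //
  //   MiniKdc kdc = ...; // e.g. org.apache.hadoop.minikdc.MiniKdc
  //   try (Closeable cluster =
  //     TEST_UTIL.startSecureMiniCluster(kdc, "hbase/localhost", "HTTP/localhost")) {
  //     // run secured test code here
  //   }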

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than using the returned Configuration directly, it is usually best to make a copy and
   * use that. Do <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but there is nothing we can do about it at the moment; single instance only
   * is how the minidfscluster works. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
   * conf, and as a system property for hadoop.tmp.dir (we do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the test
   *         filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version is > 3.2.3 or 3.3.4. "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below to
   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
   * the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
   * allows specifying this parameter on the command line. If not set, the default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }
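
  // For example, short circuit reads can be forced on for a test run from the
  // command line via the system property read above:
  //
  //   mvn test -Dhbase.tests.use.shortcircuit.reads=true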

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node count, for both HBase region servers and HDFS data nodes.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * value can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }
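
  // A minimal sketch of a customized startup (the counts below are
  // illustrative only):
  //
  //   StartTestingClusterOption option = StartTestingClusterOption.builder()
  //     .numMasters(1).numRegionServers(3).numDataNodes(3).build();
  //   TEST_UTIL.startMiniCluster(option);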

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
      .createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
  }
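
  // A minimal sketch of a stop/start cycle that keeps dfs and zk up (the
  // region server count below is illustrative):
  //
  //   TEST_UTIL.shutdownMiniHBaseCluster();
  //   // ... inspect on-disk state while hbase is down ...
  //   TEST_UTIL.restartHBaseCluster(3);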

  /**
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method. Root hbasedir is created for you as part of mini
   * cluster startup. You'd only use this method if you were doing a manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note: Directory will be made irrespective of whether
   *               path has been fetched or not. If directory already exists, it will be overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
   * flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
   * this method if you were doing a manual operation.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }
  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }
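
  // A minimal usage sketch for the createTable family of helpers (the table
  // and family names below are illustrative):
  //
  //   Table t = TEST_UTIL.createMultiRegionTable(TableName.valueOf("t"),
  //     Bytes.toBytes("cf"), 5); // pre-split into 5 regions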
1250
1251  /**
1252   * Create a table.
1253   * @return A Table instance for the created table.
1254   */
1255  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1256    return createTable(tableName, families, (byte[][]) null);
1257  }
1258
1259  /**
1260   * Create a table with multiple regions.
1261   * @return A Table instance for the created table.
1262   */
1263  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1264    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1265  }
1266
1267  /**
1268   * Create a table with multiple regions.
1269   * @param replicaCount replica count.
1270   * @return A Table instance for the created table.
1271   */
1272  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1273    throws IOException {
1274    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1275  }
1276
1277  /**
1278   * Create a table.
1279   * @return A Table instance for the created table.
1280   */
1281  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1282    throws IOException {
1283    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1284  }
1285
1286  /**
1287   * Create a table.
1288   * @param tableName    the table name
1289   * @param families     the families
1290   * @param splitKeys    the splitkeys
1291   * @param replicaCount the region replica count
1292   * @return A Table instance for the created table.
1293   * @throws IOException throws IOException
1294   */
1295  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1296    int replicaCount) throws IOException {
1297    return createTable(tableName, families, splitKeys, replicaCount,
1298      new Configuration(getConfiguration()));
1299  }
1300
1301  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1302    byte[] endKey, int numRegions) throws IOException {
1303    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1304
1305    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1308    waitUntilAllRegionsAssigned(tableName);
1309    return getConnection().getTable(tableName);
1310  }
1311
1312  /**
1313   * Create a table.
1314   * @param c Configuration to use
1315   * @return A Table instance for the created table.
1316   */
1317  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1318    throws IOException {
1319    return createTable(htd, families, null, c);
1320  }
1321
1322  /**
1323   * Create a table.
1324   * @param htd       table descriptor
1325   * @param families  array of column families
1326   * @param splitKeys array of split keys
1327   * @param c         Configuration to use
1328   * @return A Table instance for the created table.
1329   * @throws IOException if getAdmin or createTable fails
1330   */
1331  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1332    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) because tests have hard-coded counts
    // of what to expect in block cache, etc., and blooms being on interferes with those counts.
1336    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1337  }
1338
1339  /**
1340   * Create a table.
1341   * @param htd       table descriptor
1342   * @param families  array of column families
1343   * @param splitKeys array of split keys
1344   * @param type      Bloom type
1345   * @param blockSize block size
1346   * @param c         Configuration to use
1347   * @return A Table instance for the created table.
1348   * @throws IOException if getAdmin or createTable fails
1349   */
1351  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1352    BloomType type, int blockSize, Configuration c) throws IOException {
1353    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1354    for (byte[] family : families) {
1355      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1356        .setBloomFilterType(type).setBlocksize(blockSize);
1357      if (isNewVersionBehaviorEnabled()) {
1358        cfdb.setNewVersionBehavior(true);
1359      }
1360      builder.setColumnFamily(cfdb.build());
1361    }
1362    TableDescriptor td = builder.build();
1363    if (splitKeys != null) {
1364      getAdmin().createTable(td, splitKeys);
1365    } else {
1366      getAdmin().createTable(td);
1367    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1370    waitUntilAllRegionsAssigned(td.getTableName());
1371    return getConnection().getTable(td.getTableName());
1372  }
1373
1374  /**
1375   * Create a table.
1376   * @param htd       table descriptor
1377   * @param splitRows array of split keys
1378   * @return A Table instance for the created table.
1379   */
1380  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1381    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1382    if (isNewVersionBehaviorEnabled()) {
1383      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1384        builder.setColumnFamily(
1385          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1386      }
1387    }
1388    if (splitRows != null) {
1389      getAdmin().createTable(builder.build(), splitRows);
1390    } else {
1391      getAdmin().createTable(builder.build());
1392    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1395    waitUntilAllRegionsAssigned(htd.getTableName());
1396    return getConnection().getTable(htd.getTableName());
1397  }
1398
1399  /**
1400   * Create a table.
1401   * @param tableName    the table name
1402   * @param families     the families
1403   * @param splitKeys    the split keys
1404   * @param replicaCount the replica count
1405   * @param c            Configuration to use
1406   * @return A Table instance for the created table.
1407   */
1408  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1409    int replicaCount, final Configuration c) throws IOException {
1410    TableDescriptor htd =
1411      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1412    return createTable(htd, families, splitKeys, c);
1413  }
1414
1415  /**
1416   * Create a table.
1417   * @return A Table instance for the created table.
1418   */
1419  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1420    return createTable(tableName, new byte[][] { family }, numVersions);
1421  }
1422
1423  /**
1424   * Create a table.
1425   * @return A Table instance for the created table.
1426   */
1427  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1428    throws IOException {
1429    return createTable(tableName, families, numVersions, (byte[][]) null);
1430  }
1431
1432  /**
1433   * Create a table.
1434   * @return A Table instance for the created table.
1435   */
1436  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1437    byte[][] splitKeys) throws IOException {
1438    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1439    for (byte[] family : families) {
1440      ColumnFamilyDescriptorBuilder cfBuilder =
1441        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1442      if (isNewVersionBehaviorEnabled()) {
1443        cfBuilder.setNewVersionBehavior(true);
1444      }
1445      builder.setColumnFamily(cfBuilder.build());
1446    }
1447    if (splitKeys != null) {
1448      getAdmin().createTable(builder.build(), splitKeys);
1449    } else {
1450      getAdmin().createTable(builder.build());
1451    }
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
1454    waitUntilAllRegionsAssigned(tableName);
1455    return getConnection().getTable(tableName);
1456  }
1457
1458  /**
1459   * Create a table with multiple regions.
1460   * @return A Table instance for the created table.
1461   */
1462  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1463    throws IOException {
1464    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1465  }
1466
1467  /**
1468   * Create a table.
1469   * @return A Table instance for the created table.
1470   */
1471  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1472    throws IOException {
1473    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1474    for (byte[] family : families) {
1475      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1476        .setMaxVersions(numVersions).setBlocksize(blockSize);
1477      if (isNewVersionBehaviorEnabled()) {
1478        cfBuilder.setNewVersionBehavior(true);
1479      }
1480      builder.setColumnFamily(cfBuilder.build());
1481    }
1482    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
1485    waitUntilAllRegionsAssigned(tableName);
1486    return getConnection().getTable(tableName);
1487  }
1488
1489  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1490    String cpName) throws IOException {
1491    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1492    for (byte[] family : families) {
1493      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1494        .setMaxVersions(numVersions).setBlocksize(blockSize);
1495      if (isNewVersionBehaviorEnabled()) {
1496        cfBuilder.setNewVersionBehavior(true);
1497      }
1498      builder.setColumnFamily(cfBuilder.build());
1499    }
1500    if (cpName != null) {
1501      builder.setCoprocessor(cpName);
1502    }
1503    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
1506    waitUntilAllRegionsAssigned(tableName);
1507    return getConnection().getTable(tableName);
1508  }
1509
1510  /**
1511   * Create a table.
1512   * @return A Table instance for the created table.
1513   */
1514  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1515    throws IOException {
1516    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1517    int i = 0;
1518    for (byte[] family : families) {
1519      ColumnFamilyDescriptorBuilder cfBuilder =
1520        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1521      if (isNewVersionBehaviorEnabled()) {
1522        cfBuilder.setNewVersionBehavior(true);
1523      }
1524      builder.setColumnFamily(cfBuilder.build());
1525      i++;
1526    }
1527    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
1530    waitUntilAllRegionsAssigned(tableName);
1531    return getConnection().getTable(tableName);
1532  }
1533
1534  /**
1535   * Create a table.
1536   * @return A Table instance for the created table.
1537   */
1538  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1539    throws IOException {
1540    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1541    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1542    if (isNewVersionBehaviorEnabled()) {
1543      cfBuilder.setNewVersionBehavior(true);
1544    }
1545    builder.setColumnFamily(cfBuilder.build());
1546    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
1549    waitUntilAllRegionsAssigned(tableName);
1550    return getConnection().getTable(tableName);
1551  }
1552
1553  /**
1554   * Create a table with multiple regions.
1555   * @return A Table instance for the created table.
1556   */
1557  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1558    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1559  }
1560
1561  /**
1562   * Set the number of Region replicas.
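   * <p>
   * Usage sketch, bumping an existing table to three replicas (the table name is illustrative):
   * <pre>
   * HBaseTestingUtil.setReplicas(admin, TableName.valueOf("testTable"), 3);
   * </pre>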
1563   */
1564  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1565    throws IOException, InterruptedException {
1566    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1567      .setRegionReplication(replicaCount).build();
1568    admin.modifyTable(desc);
1569  }
1570
1571  /**
1572   * Set the number of Region replicas.
1573   */
1574  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1575    throws ExecutionException, IOException, InterruptedException {
1576    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1577      .setRegionReplication(replicaCount).build();
1578    admin.modifyTable(desc).get();
1579  }
1580
1581  /**
1582   * Drop an existing table
1583   * @param tableName existing table
1584   */
1585  public void deleteTable(TableName tableName) throws IOException {
1586    try {
1587      getAdmin().disableTable(tableName);
1588    } catch (TableNotEnabledException e) {
1589      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1590    }
1591    getAdmin().deleteTable(tableName);
1592  }
1593
1594  /**
1595   * Drop an existing table
1596   * @param tableName existing table
1597   */
1598  public void deleteTableIfAny(TableName tableName) throws IOException {
1599    try {
1600      deleteTable(tableName);
1601    } catch (TableNotFoundException e) {
1602      // ignore
1603    }
1604  }
1605
1606  // ==========================================================================
1607  // Canned table and table descriptor creation
1608
1609  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1610  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1611  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1612  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1613  private static final int MAXVERSIONS = 3;
1614
1615  public static final char FIRST_CHAR = 'a';
1616  public static final char LAST_CHAR = 'z';
1617  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1618  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1619
1620  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1621    return createModifyableTableDescriptor(TableName.valueOf(name),
1622      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1623      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1624  }
1625
1626  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1627    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1628    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1629    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1630      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1631        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1632        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1633      if (isNewVersionBehaviorEnabled()) {
1634        cfBuilder.setNewVersionBehavior(true);
1635      }
1636      builder.setColumnFamily(cfBuilder.build());
1637    }
1638    return builder.build();
1639  }
1640
1641  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1642    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1643    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1644    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1645      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1646        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1647        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1648      if (isNewVersionBehaviorEnabled()) {
1649        cfBuilder.setNewVersionBehavior(true);
1650      }
1651      builder.setColumnFamily(cfBuilder.build());
1652    }
1653    return builder;
1654  }
1655
1656  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
1660   */
1661  public TableDescriptor createTableDescriptor(final TableName name) {
1662    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1663      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1664  }
1665
1666  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1667    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1668  }
1669
1670  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1671    int maxVersions) {
1672    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1673    for (byte[] family : families) {
1674      ColumnFamilyDescriptorBuilder cfBuilder =
1675        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1676      if (isNewVersionBehaviorEnabled()) {
1677        cfBuilder.setNewVersionBehavior(true);
1678      }
1679      builder.setColumnFamily(cfBuilder.build());
1680    }
1681    return builder.build();
1682  }
1683
1684  /**
1685   * Create an HRegion that writes to the local tmp dirs
1686   * @param desc     a table descriptor indicating which table the region belongs to
1687   * @param startKey the start boundary of the region
1688   * @param endKey   the end boundary of the region
1689   * @return a region that writes to local dir for testing
1690   */
1691  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1692    throws IOException {
1693    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1694      .setEndKey(endKey).build();
1695    return createLocalHRegion(hri, desc);
1696  }
1697
1698  /**
1699   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1700   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1701   */
1702  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1703    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1704  }
1705
1706  /**
1707   * Create an HRegion that writes to the local tmp dirs with specified wal
1708   * @param info regioninfo
1709   * @param conf configuration
1710   * @param desc table descriptor
1711   * @param wal  wal for this region.
1712   * @return created hregion
1713   */
1714  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1715    WAL wal) throws IOException {
1716    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1717      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1718    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1719  }
1720
1721  /**
1722   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1723   *         when done.
1724   */
1725  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1726    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1727    throws IOException {
1728    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1729      durability, wal, null, families);
1730  }
1731
1732  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1733    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1734    boolean[] compactedMemStore, byte[]... families) throws IOException {
1735    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1736    builder.setReadOnly(isReadOnly);
1737    int i = 0;
1738    for (byte[] family : families) {
1739      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
      i++;
      // Keep all versions so tests can read back everything they write.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1749      builder.setColumnFamily(cfBuilder.build());
1750    }
1751    builder.setDurability(durability);
1752    RegionInfo info =
1753      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1754    return createLocalHRegion(info, conf, builder.build(), wal);
1755  }
1756
1757  //
1758  // ==========================================================================
1759
1760  /**
   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
   * read.
   * @param tableName existing table
   * @return Table for the passed tableName, now emptied
1765   */
1766  public Table deleteTableData(TableName tableName) throws IOException {
1767    Table table = getConnection().getTable(tableName);
1768    Scan scan = new Scan();
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        Delete del = new Delete(res.getRow());
        table.delete(del);
      }
    }
1776    return table;
1777  }
1778
1779  /**
1780   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1781   * table.
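   * <p>
   * For example, to wipe the data while keeping the existing split points (a sketch; the table is
   * assumed to exist):
   * <pre>
   * Table table = TEST_UTIL.truncateTable(tableName, true);
   * </pre>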
1782   * @param tableName       table which must exist.
1783   * @param preserveRegions keep the existing split points
   * @return Table for the new table
1785   */
1786  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1787    throws IOException {
1788    Admin admin = getAdmin();
1789    if (!admin.isTableDisabled(tableName)) {
1790      admin.disableTable(tableName);
1791    }
1792    admin.truncateTable(tableName, preserveRegions);
1793    return getConnection().getTable(tableName);
1794  }
1795
1796  /**
1797   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1798   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1799   * preserve regions of existing table.
1800   * @param tableName table which must exist.
   * @return Table for the new table
1802   */
1803  public Table truncateTable(final TableName tableName) throws IOException {
1804    return truncateTable(tableName, false);
1805  }
1806
1807  /**
1808   * Load table with rows from 'aaa' to 'zzz'.
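   * <p>
   * A typical load-then-verify pattern, as a sketch; {@code TEST_UTIL} and {@code FAMILY} are
   * conventional test names, not part of this API:
   * <pre>
   * Table table = TEST_UTIL.createTable(tableName, FAMILY);
   * int rows = TEST_UTIL.loadTable(table, FAMILY); // 26^3 = 17576 rows, 'aaa' through 'zzz'
   * assertEquals(rows, HBaseTestingUtil.countRows(table));
   * </pre>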
1809   * @param t Table
1810   * @param f Family
1811   * @return Count of rows loaded.
1812   */
1813  public int loadTable(final Table t, final byte[] f) throws IOException {
1814    return loadTable(t, new byte[][] { f });
1815  }
1816
1817  /**
1818   * Load table with rows from 'aaa' to 'zzz'.
1819   * @param t Table
1820   * @param f Family
1821   * @return Count of rows loaded.
1822   */
1823  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1824    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1825  }
1826
1827  /**
1828   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1829   * @param t Table
1830   * @param f Array of Families to load
1831   * @return Count of rows loaded.
1832   */
1833  public int loadTable(final Table t, final byte[][] f) throws IOException {
1834    return loadTable(t, f, null);
1835  }
1836
1837  /**
1838   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1839   * @param t     Table
1840   * @param f     Array of Families to load
1841   * @param value the values of the cells. If null is passed, the row key is used as value
1842   * @return Count of rows loaded.
1843   */
1844  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1845    return loadTable(t, f, value, true);
1846  }
1847
1848  /**
1849   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1850   * @param t     Table
1851   * @param f     Array of Families to load
1852   * @param value the values of the cells. If null is passed, the row key is used as value
1853   * @return Count of rows loaded.
1854   */
1855  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1856    throws IOException {
1857    List<Put> puts = new ArrayList<>();
1858    for (byte[] row : HBaseTestingUtil.ROWS) {
1859      Put put = new Put(row);
1860      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1861      for (int i = 0; i < f.length; i++) {
1862        byte[] value1 = value != null ? value : row;
1863        put.addColumn(f[i], f[i], value1);
1864      }
1865      puts.add(put);
1866    }
1867    t.put(puts);
1868    return puts.size();
1869  }
1870
1871  /**
1872   * A tracker for tracking and validating table rows generated with
1873   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
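   * <p>
   * Sketch of the intended use: scan the loaded table with explicit bounds, feed every row to the
   * tracker, then validate:
   * <pre>
   * SeenRowTracker tracker = new SeenRowTracker(startRow, stopRow);
   * try (ResultScanner scanner =
   *   table.getScanner(new Scan().withStartRow(startRow).withStopRow(stopRow))) {
   *   for (Result r : scanner) {
   *     tracker.addRow(r.getRow());
   *   }
   * }
   * tracker.validate(); // throws RuntimeException if a row was missed or seen more than once
   * </pre>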
1874   */
1875  public static class SeenRowTracker {
1876    int dim = 'z' - 'a' + 1;
1877    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1878    byte[] startRow;
1879    byte[] stopRow;
1880
1881    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1882      this.startRow = startRow;
1883      this.stopRow = stopRow;
1884    }
1885
1886    void reset() {
1887      for (byte[] row : ROWS) {
1888        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1889      }
1890    }
1891
1892    int i(byte b) {
1893      return b - 'a';
1894    }
1895
1896    public void addRow(byte[] row) {
1897      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1898    }
1899
1900    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
     * rows not at all.
1903     */
1904    public void validate() {
1905      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1906        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1907          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1908            int count = seenRows[i(b1)][i(b2)][i(b3)];
1909            int expectedCount = 0;
1910            if (
1911              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1912                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1913            ) {
1914              expectedCount = 1;
1915            }
1916            if (count != expectedCount) {
1917              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1918              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1919                + "instead of " + expectedCount);
1920            }
1921          }
1922        }
1923      }
1924    }
1925  }
1926
1927  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1928    return loadRegion(r, f, false);
1929  }
1930
1931  public int loadRegion(final Region r, final byte[] f) throws IOException {
1932    return loadRegion((HRegion) r, f);
1933  }
1934
1935  /**
1936   * Load region with rows from 'aaa' to 'zzz'.
1937   * @param r     Region
1938   * @param f     Family
1939   * @param flush flush the cache if true
1940   * @return Count of rows loaded.
1941   */
1942  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1943    byte[] k = new byte[3];
1944    int rowCount = 0;
1945    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1946      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1947        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1948          k[0] = b1;
1949          k[1] = b2;
1950          k[2] = b3;
1951          Put put = new Put(k);
          // Skip the WAL for speed; this is throwaway test data.
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
1957          int preRowCount = rowCount;
1958          int pause = 10;
1959          int maxPause = 1000;
1960          while (rowCount == preRowCount) {
1961            try {
1962              r.put(put);
1963              rowCount++;
1964            } catch (RegionTooBusyException e) {
1965              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1966              Threads.sleep(pause);
1967            }
1968          }
1969        }
1970      }
1971      if (flush) {
1972        r.flush(true);
1973      }
1974    }
1975    return rowCount;
1976  }
1977
1978  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1979    throws IOException {
1980    for (int i = startRow; i < endRow; i++) {
1981      byte[] data = Bytes.toBytes(String.valueOf(i));
1982      Put put = new Put(data);
1983      put.addColumn(f, null, data);
1984      t.put(put);
1985    }
1986  }
1987
1988  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1989    throws IOException {
1990    for (int i = 0; i < totalRows; i++) {
1991      byte[] row = new byte[rowSize];
1992      Bytes.random(row);
1993      Put put = new Put(row);
1994      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1995      t.put(put);
1996    }
1997  }
1998
1999  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2000    int replicaId) throws IOException {
2001    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2003      byte[] data = Bytes.toBytes(String.valueOf(i));
2004      Get get = new Get(data);
2005      get.setReplicaId(replicaId);
2006      get.setConsistency(Consistency.TIMELINE);
2007      Result result = table.get(get);
2008      assertTrue(failMsg, result.containsColumn(f, null));
2009      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2010      Cell cell = result.getColumnLatestCell(f, null);
2011      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2012        cell.getValueOffset(), cell.getValueLength()));
2013    }
2014  }
2015
2016  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2017    throws IOException {
2018    verifyNumericRows((HRegion) region, f, startRow, endRow);
2019  }
2020
2021  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2022    throws IOException {
2023    verifyNumericRows(region, f, startRow, endRow, true);
2024  }
2025
2026  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2027    final boolean present) throws IOException {
2028    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2029  }
2030
2031  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2032    final boolean present) throws IOException {
2033    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2035      byte[] data = Bytes.toBytes(String.valueOf(i));
2036      Result result = region.get(new Get(data));
2037
2038      boolean hasResult = result != null && !result.isEmpty();
2039      assertEquals(failMsg + result, present, hasResult);
2040      if (!present) continue;
2041
2042      assertTrue(failMsg, result.containsColumn(f, null));
2043      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2044      Cell cell = result.getColumnLatestCell(f, null);
2045      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2046        cell.getValueOffset(), cell.getValueLength()));
2047    }
2048  }
2049
2050  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2051    throws IOException {
2052    for (int i = startRow; i < endRow; i++) {
2053      byte[] data = Bytes.toBytes(String.valueOf(i));
2054      Delete delete = new Delete(data);
2055      delete.addFamily(f);
2056      t.delete(delete);
2057    }
2058  }
2059
2060  /**
2061   * Return the number of rows in the given table.
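   * <p>
   * e.g. {@code assertEquals(17576, countRows(table))} after a full
   * {@link #loadTable(Table, byte[])}, which writes 26<sup>3</sup> rows.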
2062   * @param table to count rows
2063   * @return count of rows
2064   */
2065  public static int countRows(final Table table) throws IOException {
2066    return countRows(table, new Scan());
2067  }
2068
2069  public static int countRows(final Table table, final Scan scan) throws IOException {
2070    try (ResultScanner results = table.getScanner(scan)) {
2071      int count = 0;
2072      while (results.next() != null) {
2073        count++;
2074      }
2075      return count;
2076    }
2077  }
2078
2079  public static int countRows(final Table table, final byte[]... families) throws IOException {
2080    Scan scan = new Scan();
2081    for (byte[] family : families) {
2082      scan.addFamily(family);
2083    }
2084    return countRows(table, scan);
2085  }
2086
2087  /**
2088   * Return the number of rows in the given table.
2089   */
2090  public int countRows(final TableName tableName) throws IOException {
2091    try (Table table = getConnection().getTable(tableName)) {
2092      return countRows(table);
2093    }
2094  }
2095
2096  public static int countRows(final Region region) throws IOException {
2097    return countRows(region, new Scan());
2098  }
2099
2100  public static int countRows(final Region region, final Scan scan) throws IOException {
2101    try (InternalScanner scanner = region.getScanner(scan)) {
2102      return countRows(scanner);
2103    }
2104  }
2105
2106  public static int countRows(final InternalScanner scanner) throws IOException {
2107    int scannedCount = 0;
2108    List<Cell> results = new ArrayList<>();
2109    boolean hasMore = true;
2110    while (hasMore) {
2111      hasMore = scanner.next(results);
2112      scannedCount += results.size();
2113      results.clear();
2114    }
2115    return scannedCount;
2116  }
2117
2118  /**
   * Return an MD5 digest computed over all the row keys of a table, rendered as a hex string.
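   * <p>
   * Handy for checking that an operation left the row set intact, as a sketch:
   * <pre>
   * String before = TEST_UTIL.checksumRows(table);
   * // ... restart the cluster, compact, split, etc. ...
   * assertEquals(before, TEST_UTIL.checksumRows(table));
   * </pre>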
2120   */
2121  public String checksumRows(final Table table) throws Exception {
2122    MessageDigest digest = MessageDigest.getInstance("MD5");
2123    try (ResultScanner results = table.getScanner(new Scan())) {
2124      for (Result res : results) {
2125        digest.update(res.getRow());
2126      }
2127    }
    return Bytes.toHex(digest.digest());
2129  }
2130
2131  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2132  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2133  static {
2134    int i = 0;
2135    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2136      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2137        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2138          ROWS[i][0] = b1;
2139          ROWS[i][1] = b2;
2140          ROWS[i][2] = b3;
2141          i++;
2142        }
2143      }
2144    }
2145  }
2146
2147  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2148    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2149    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2150    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2151    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2152    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2153    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2154
2155  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2156    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2157    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2158    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2159    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2160    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2161    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2162
2163  /**
2164   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2165   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
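   * <p>
   * For example, passing {@link #KEYS} (whose first element is the empty byte array) yields a
   * contiguous key space of 25 regions.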
2166   * @return list of region info for regions added to meta
2167   */
2168  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2169    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2170    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2171      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2172      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2173      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2174        TableState.State.ENABLED);
2175      // add custom ones
2176      for (int i = 0; i < startKeys.length; i++) {
2177        int j = (i + 1) % startKeys.length;
2178        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2179          .setEndKey(startKeys[j]).build();
2180        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2181        newRegions.add(hri);
2182      }
2183      return newRegions;
2184    }
2185  }
2186
2187  /**
2188   * Create an unmanaged WAL. Be sure to close it when you're through.
2189   */
2190  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2191    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
    // unless we pass it along via the conf.
2194    Configuration confForWAL = new Configuration(conf);
2195    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2196    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2197  }
2198
2199  /**
   * Create a region with its own WAL. Be sure to call
2201   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
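   * <p>
   * Typical pattern, as a sketch:
   * <pre>
   * HRegion region = createRegionAndWAL(info, rootDir, conf, htd);
   * try {
   *   // exercise the region
   * } finally {
   *   HBaseTestingUtil.closeRegionAndWAL(region);
   * }
   * </pre>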
2202   */
2203  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2204    final Configuration conf, final TableDescriptor htd) throws IOException {
2205    return createRegionAndWAL(info, rootDir, conf, htd, true);
2206  }
2207
2208  /**
   * Create a region with its own WAL. Be sure to call
2210   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2211   */
2212  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2213    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2214    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2215    region.setBlockCache(blockCache);
2216    region.initialize();
2217    return region;
2218  }
2219
2220  /**
   * Create a region with its own WAL. Be sure to call
2222   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2223   */
2224  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2225    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2226    throws IOException {
2227    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2228    region.setMobFileCache(mobFileCache);
2229    region.initialize();
2230    return region;
2231  }
2232
2233  /**
   * Create a region with its own WAL. Be sure to call
2235   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2236   */
2237  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2238    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2239    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2240      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2241    WAL wal = createWal(conf, rootDir, info);
2242    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2243  }
2244
2245  /**
   * Find a region server other than the one passed in.
2247   * @return another region server
2248   */
2249  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2250    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2252        return rst.getRegionServer();
2253      }
2254    }
2255    return null;
2256  }
2257
2258  /**
   * Get a reference to the region server object that holds the first region of the specified user
   * table.
   * @param tableName user table to lookup in hbase:meta
   * @return region server that holds it; null if the table has no regions or the region never
   *         comes online
2263   */
2264  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2265    throws IOException, InterruptedException {
2266    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2267    if (regions == null || regions.isEmpty()) {
2268      return null;
2269    }
2270    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2271
2272    byte[] firstRegionName =
2273      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2274        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2275
2276    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2277    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2278      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2279    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2280      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2282    while (retrier.shouldRetry()) {
2283      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2284      if (index != -1) {
2285        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2286      }
2287      // Came back -1. Region may not be online yet. Sleep a while.
2288      retrier.sleepUntilNextRetry();
2289    }
2290    return null;
2291  }
2292
2293  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2295   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2296   * MiniMRCluster instances with different log dirs.
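   * <p>
   * Usage sketch ({@code TEST_UTIL} is the conventional name for this utility instance):
   * <pre>
   * MiniMRCluster mr = TEST_UTIL.startMiniMapReduceCluster();
   * try {
   *   // submit MapReduce jobs against the mini cluster
   * } finally {
   *   TEST_UTIL.shutdownMiniMapReduceCluster();
   * }
   * </pre>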
2297   * @throws IOException When starting the cluster fails.
2298   */
2299  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
2301    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2302      "99.0");
2303    startMiniMapReduceCluster(2);
2304    return mrCluster;
2305  }
2306
2307  /**
2308   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2309   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2310   * multiple MiniMRCluster instances with different log dirs.
   * @param servers The number of <code>TaskTracker</code>s to start.
2312   * @throws IOException When starting the cluster fails.
2313   */
2314  private void startMiniMapReduceCluster(final int servers) throws IOException {
2315    if (mrCluster != null) {
2316      throw new IllegalStateException("MiniMRCluster is already running");
2317    }
2318    LOG.info("Starting mini mapreduce cluster...");
2319    setupClusterTestDir();
2320    createDirsAndSetProperties();
2321
2322    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We up the usable VM so that processes don't get killed.
2325    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2326
2327    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2328    // this avoids the problem by disabling speculative task execution in tests.
2329    conf.setBoolean("mapreduce.map.speculative", false);
2330    conf.setBoolean("mapreduce.reduce.speculative", false);
2331    ////
2332
    // YARN containers run in an independent JVM. We need to pass the add-opens argument manually
    // here if the JDK version is >= 17; otherwise, the MiniMRCluster will fail.
2335    if (JVM.getJVMSpecVersion() >= 17) {
2336      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2337      conf.set("yarn.app.mapreduce.am.command-opts",
2338        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2339    }
2340
2341    // Allow the user to override FS URI for this map-reduce cluster to use.
2342    mrCluster =
2343      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2344        1, null, null, new JobConf(this.conf));
2345    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2346    if (jobConf == null) {
2347      jobConf = mrCluster.createJobConf();
2348    }
2349
2350    // Hadoop MiniMR overwrites this while it should not
2351    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2352    LOG.info("Mini mapreduce cluster started");
2353
2354    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2355    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2356    // necessary config properties here. YARN-129 required adding a few properties.
2357    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
    // this is for mrv2 support; mr1 ignores it
2359    conf.set("mapreduce.framework.name", "yarn");
2360    conf.setBoolean("yarn.is.minicluster", true);
2361    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2362    if (rmAddress != null) {
2363      conf.set("yarn.resourcemanager.address", rmAddress);
2364    }
2365    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2366    if (historyAddress != null) {
2367      conf.set("mapreduce.jobhistory.address", historyAddress);
2368    }
2369    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2370    if (schedulerAddress != null) {
2371      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2372    }
2373    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2374    if (mrJobHistoryWebappAddress != null) {
2375      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2376    }
2377    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2378    if (yarnRMWebappAddress != null) {
2379      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2380    }
2381  }
2382
2383  /**
2384   * Stops the previously started <code>MiniMRCluster</code>.
2385   */
2386  public void shutdownMiniMapReduceCluster() {
2387    if (mrCluster != null) {
2388      LOG.info("Stopping mini mapreduce cluster...");
2389      mrCluster.shutdown();
2390      mrCluster = null;
2391      LOG.info("Mini mapreduce cluster stopped");
2392    }
2393    // Restore configuration to point to local jobtracker
2394    conf.set("mapreduce.jobtracker.address", "local");
2395  }
2396
2397  /**
2398   * Create a stubbed out RegionServerService, mainly for getting FS.
2399   */
2400  public RegionServerServices createMockRegionServerService() throws IOException {
2401    return createMockRegionServerService((ServerName) null);
2402  }
2403
2404  /**
2405   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2406   * TestTokenAuthentication
2407   */
2408  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2409    throws IOException {
2410    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2411    rss.setFileSystem(getTestFileSystem());
2412    rss.setRpcServer(rpc);
2413    return rss;
2414  }
2415
2416  /**
2417   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2418   * TestOpenRegionHandler
2419   */
2420  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2421    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2422    rss.setFileSystem(getTestFileSystem());
2423    return rss;
2424  }
2425
2426  /**
2427   * Expire the Master's session
2428   */
2429  public void expireMasterSession() throws Exception {
2430    HMaster master = getMiniHBaseCluster().getMaster();
2431    expireSession(master.getZooKeeper(), false);
2432  }
2433
2434  /**
2435   * Expire a region server's session
2436   * @param index which RS
2437   */
2438  public void expireRegionServerSession(int index) throws Exception {
2439    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2440    expireSession(rs.getZooKeeper(), false);
2441    decrementMinRegionServerCount();
2442  }
2443
2444  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for newly spawned master
2446    // this.hbaseCluster shares this configuration too
2447    decrementMinRegionServerCount(getConfiguration());
2448
2449    // each master thread keeps a copy of configuration
2450    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2451      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2452    }
2453  }
2454
2455  private void decrementMinRegionServerCount(Configuration conf) {
2456    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2457    if (currentCount != -1) {
2458      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2459    }
2460  }
2461
2462  public void expireSession(ZKWatcher nodeZK) throws Exception {
2463    expireSession(nodeZK, false);
2464  }
2465
2466  /**
2467   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2468   * http://hbase.apache.org/book.html#trouble.zookeeper
2469   * <p/>
2470   * There are issues when doing this:
2471   * <ol>
2472   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2473   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2474   * </ol>
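   * <p>
   * A typical call, as a sketch (assumes a running mini cluster):
   * {@code TEST_UTIL.expireSession(TEST_UTIL.getZooKeeperWatcher(), false)}.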
2475   * @param nodeZK      - the ZK watcher to expire
2476   * @param checkStatus - true to check if we can create a Table with the current configuration.
2477   */
2478  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2479    Configuration c = new Configuration(this.conf);
2480    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2481    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2482    byte[] password = zk.getSessionPasswd();
2483    long sessionID = zk.getSessionId();
2484
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the event was sent. We expect that if our
    // watcher receives the event, other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before we receive all the
    // events, so we don't have to capture the event; just closing the connection should be
    // enough.
2492    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2493      @Override
2494      public void process(WatchedEvent watchedEvent) {
2495        LOG.info("Monitor ZKW received event=" + watchedEvent);
2496      }
2497    }, sessionID, password);
2498
2499    // Making it expire
2500    ZooKeeper newZK =
2501      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2502
    // Ensure that we have a connection to the server before closing down; otherwise the close
    // session event will be eaten up before we reach the CONNECTING state.
2505    long start = EnvironmentEdgeManager.currentTime();
2506    while (
2507      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2508    ) {
2509      Thread.sleep(1);
2510    }
2511    newZK.close();
2512    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2513
2514    // Now closing & waiting to be sure that the clients get it.
2515    monitor.close();
2516
2517    if (checkStatus) {
2518      getConnection().getTable(TableName.META_TABLE_NAME).close();
2519    }
2520  }
2521
2522  /**
2523   * Get the Mini HBase cluster.
2524   * @return hbase cluster
2525   * @see #getHBaseClusterInterface()
2526   */
2527  public SingleProcessHBaseCluster getHBaseCluster() {
2528    return getMiniHBaseCluster();
2529  }
2530
2531  /**
2532   * Returns the HBaseCluster instance.
2533   * <p>
   * The returned object can be any subclass of HBaseCluster, and tests referring to it should not
   * assume that the cluster is a mini cluster rather than a distributed one. If the test only
   * works on a mini cluster, the specific method {@link #getMiniHBaseCluster()} can be used
   * instead without the need to type-cast.
2538   */
2539  public HBaseClusterInterface getHBaseClusterInterface() {
2540    // implementation note: we should rename this method as #getHBaseCluster(),
2541    // but this would require refactoring 90+ calls.
2542    return hbaseCluster;
2543  }
2544
2545  /**
   * Resets the connections so that the next time getConnection() is called, a new connection is
   * created. This is needed in cases where the entire cluster / all the masters are shut down and
   * the connection is no longer valid.
2549   * <p/>
2550   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2551   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2552   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2553   * the connection state automatically. Cleaning this is a much bigger refactor.
2554   */
2555  public void invalidateConnection() throws IOException {
2556    closeConnection();
2557    // Update the master addresses if they changed.
2558    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2559    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2560    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2561      masterConfigBefore, masterConfAfter);
2562    conf.set(HConstants.MASTER_ADDRS_KEY,
2563      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2564  }
2565
2566  /**
   * Get a shared Connection to the cluster. This method is thread safe.
   * @return A Connection that can be shared. Don't close it; it will be closed on cluster
   *         shutdown.
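   *         For example: {@code Table table = TEST_UTIL.getConnection().getTable(tableName)},
   *         where {@code TEST_UTIL} is the conventional name for this utility instance.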
2569   */
2570  public Connection getConnection() throws IOException {
2571    return getAsyncConnection().toConnection();
2572  }
2573
2574  /**
   * Get a Connection to the cluster for the given user. This method is thread safe.
   * @param user the assigned user
   * @return A Connection with the assigned user.
2578   */
2579  public Connection getConnection(User user) throws IOException {
2580    return getAsyncConnection(user).toConnection();
2581  }
2582
2583  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
   * @return An AsyncClusterConnection that can be shared. Don't close it; it will be closed on
   *         cluster shutdown.
2587   */
2588  public AsyncClusterConnection getAsyncConnection() throws IOException {
2589    try {
2590      return asyncConnection.updateAndGet(connection -> {
2591        if (connection == null) {
2592          try {
2593            User user = UserProvider.instantiate(conf).getCurrent();
2594            connection = getAsyncConnection(user);
2595          } catch (IOException ioe) {
2596            throw new UncheckedIOException("Failed to create connection", ioe);
2597          }
2598        }
2599        return connection;
2600      });
2601    } catch (UncheckedIOException exception) {
2602      throw exception.getCause();
2603    }
2604  }
2605
2606  /**
   * Get an AsyncClusterConnection to the cluster for the given user. This method is thread safe.
   * @param user the assigned user
   * @return An AsyncClusterConnection with the assigned user.
2610   */
2611  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2612    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2613  }
2614
2615  public void closeConnection() throws IOException {
2616    if (hbaseAdmin != null) {
2617      Closeables.close(hbaseAdmin, true);
2618      hbaseAdmin = null;
2619    }
2620    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2621    if (asyncConnection != null) {
2622      Closeables.close(asyncConnection, true);
2623    }
2624  }
2625
2626  /**
   * Returns an Admin instance which is shared between HBaseTestingUtil instance users. Closing it
   * has no effect; it will be closed automatically when the cluster shuts down.
2629   */
2630  public Admin getAdmin() throws IOException {
2631    if (hbaseAdmin == null) {
2632      this.hbaseAdmin = getConnection().getAdmin();
2633    }
2634    return hbaseAdmin;
2635  }
2636
2637  private Admin hbaseAdmin = null;
2638
2639  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
2641   */
2642  public Hbck getHbck() throws IOException {
2643    return getConnection().getHbck();
2644  }
2645
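  /*
   * Illustrative sketch: unlike the shared Admin above, every Hbck instance must be closed by
   * the caller, and try-with-resources keeps that reliable. The assigns() argument is an assumed
   * encoded region name.
   *
   *   try (Hbck hbck = util.getHbck()) {
   *     hbck.assigns(Collections.singletonList("1588230740"));
   *   }
   */
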
2646  /**
2647   * Unassign the named region.
2648   * @param regionName The region to unassign.
2649   */
2650  public void unassignRegion(String regionName) throws IOException {
2651    unassignRegion(Bytes.toBytes(regionName));
2652  }
2653
2654  /**
2655   * Unassign the named region.
2656   * @param regionName The region to unassign.
2657   */
2658  public void unassignRegion(byte[] regionName) throws IOException {
2659    getAdmin().unassign(regionName);
2660  }
2661
2662  /**
   * Unassigns the region containing the given row.
2664   * @param row   The row to find the containing region.
2665   * @param table The table to find the region.
2666   */
2667  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2668    unassignRegionByRow(Bytes.toBytes(row), table);
2669  }
2670
2671  /**
   * Unassigns the region containing the given row.
2673   * @param row   The row to find the containing region.
2674   * @param table The table to find the region.
2675   */
2676  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2677    HRegionLocation hrl = table.getRegionLocation(row);
2678    unassignRegion(hrl.getRegion().getRegionName());
2679  }
2680
2681  /**
   * Retrieves a random splittable region from the given table.
   * @param tableName   name of table
   * @param maxAttempts maximum number of attempts, or -1 for unlimited
   * @return the chosen HRegion, or null if none was found within the limit of maxAttempts
2686   */
2687  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2688    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2689    int regCount = regions.size();
2690    Set<Integer> attempted = new HashSet<>();
2691    int idx;
2692    int attempts = 0;
2693    do {
2694      regions = getHBaseCluster().getRegions(tableName);
2695      if (regCount != regions.size()) {
2696        // if there was region movement, clear attempted Set
2697        attempted.clear();
2698      }
2699      regCount = regions.size();
      // There is a chance that, before we get the region for the table from an RS, the region
      // is going for CLOSE. This may happen because online schema change is enabled.
2702      if (regCount > 0) {
2703        idx = ThreadLocalRandom.current().nextInt(regCount);
        // if we have just tried this region there is no need to try it again, but count the
        // attempt anyway so the loop still terminates once maxAttempts is reached
        if (attempted.contains(idx)) {
          attempts++;
          continue;
        }
2708        HRegion region = regions.get(idx);
2709        if (region.checkSplit().isPresent()) {
2710          return region;
2711        }
2712        attempted.add(idx);
2713      }
2714      attempts++;
2715    } while (maxAttempts == -1 || attempts < maxAttempts);
2716    return null;
2717  }
2718
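  /*
   * Illustrative sketch: split whatever splittable region comes back, if any. splitRegionAsync()
   * is a standard Admin call; the ten-attempt limit and the bare get() are arbitrary choices.
   *
   *   HRegion region = util.getSplittableRegion(tableName, 10);
   *   if (region != null) {
   *     util.getAdmin().splitRegionAsync(region.getRegionInfo().getRegionName(), null).get();
   *   }
   */
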
2719  public MiniDFSCluster getDFSCluster() {
2720    return dfsCluster;
2721  }
2722
2723  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2724    setDFSCluster(cluster, true);
2725  }
2726
2727  /**
2728   * Set the MiniDFSCluster
2729   * @param cluster     cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
   *                    it is set.
2732   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2733   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2734   */
2735  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2736    throws IllegalStateException, IOException {
2737    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2738      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2739    }
2740    this.dfsCluster = cluster;
2741    this.setFs();
2742  }
2743
2744  public FileSystem getTestFileSystem() throws IOException {
2745    return HFileSystem.get(conf);
2746  }
2747
2748  /**
   * Wait until all regions in a table have been assigned. Waits up to the default timeout (30
   * seconds) before giving up.
2751   * @param table Table to wait on.
2752   */
2753  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2754    waitTableAvailable(table.getName(), 30000);
2755  }
2756
2757  public void waitTableAvailable(TableName table, long timeoutMillis)
2758    throws InterruptedException, IOException {
2759    waitFor(timeoutMillis, predicateTableAvailable(table));
2760  }
2761
2762  /**
2763   * Wait until all regions in a table have been assigned
2764   * @param table         Table to wait on.
2765   * @param timeoutMillis Timeout.
2766   */
2767  public void waitTableAvailable(byte[] table, long timeoutMillis)
2768    throws InterruptedException, IOException {
2769    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2770  }
2771
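  /*
   * Illustrative sketch: the usual pattern is create-then-wait so the test does not race the
   * assignment of the new regions. The descriptor and the timeout are assumptions.
   *
   *   util.getAdmin().createTable(tableDescriptor);
   *   util.waitTableAvailable(tableDescriptor.getTableName(), 60000);
   */
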
2772  public String explainTableAvailability(TableName tableName) throws IOException {
2773    StringBuilder msg =
2774      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2775    if (getHBaseCluster().getMaster().isAlive()) {
2776      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2777        .getRegionStates().getRegionAssignments();
2778      final List<Pair<RegionInfo, ServerName>> metaLocations =
2779        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2780      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2781        RegionInfo hri = metaLocation.getFirst();
2782        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM (").append(sn)
            .append(" <> ").append(assignments.get(hri)).append(")");
        }
2793      }
2794    }
2795    return msg.toString();
2796  }
2797
2798  public String explainTableState(final TableName table, TableState.State state)
2799    throws IOException {
2800    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
    if (tableState == null) {
      return "TableState in META: No table state in META for table " + table
        + ", last state in meta (including deleted) is " + findLastTableState(table);
2804    } else if (!tableState.inStates(state)) {
2805      return "TableState in META: Not " + state + " state, but " + tableState;
2806    } else {
2807      return "TableState in META: OK";
2808    }
2809  }
2810
2811  @Nullable
2812  public TableState findLastTableState(final TableName table) throws IOException {
2813    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2814    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2815      @Override
2816      public boolean visit(Result r) throws IOException {
2817        if (!Arrays.equals(r.getRow(), table.getName())) {
2818          return false;
2819        }
2820        TableState state = CatalogFamilyFormat.getTableState(r);
2821        if (state != null) {
2822          lastTableState.set(state);
2823        }
2824        return true;
2825      }
2826    };
2827    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2828      Integer.MAX_VALUE, visitor);
2829    return lastTableState.get();
2830  }
2831
2832  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
   * regions have all been assigned. Will time out after the default period (30 seconds).
   * Tolerates a nonexistent table.
2836   * @param table the table to wait on.
2837   * @throws InterruptedException if interrupted while waiting
2838   * @throws IOException          if an IO problem is encountered
2839   */
2840  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2841    waitTableEnabled(table, 30000);
2842  }
2843
2844  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
   * regions have all been assigned.
2847   * @see #waitTableEnabled(TableName, long)
2848   * @param table         Table to wait on.
2849   * @param timeoutMillis Time to wait on it being marked enabled.
2850   */
2851  public void waitTableEnabled(byte[] table, long timeoutMillis)
2852    throws InterruptedException, IOException {
2853    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2854  }
2855
2856  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2857    waitFor(timeoutMillis, predicateTableEnabled(table));
2858  }
2859
2860  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
   * time out after the default period (30 seconds).
2863   * @param table Table to wait on.
2864   */
2865  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2866    waitTableDisabled(table, 30000);
2867  }
2868
2869  public void waitTableDisabled(TableName table, long millisTimeout)
2870    throws InterruptedException, IOException {
2871    waitFor(millisTimeout, predicateTableDisabled(table));
2872  }
2873
2874  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2876   * @param table         Table to wait on.
2877   * @param timeoutMillis Time to wait on it being marked disabled.
2878   */
2879  public void waitTableDisabled(byte[] table, long timeoutMillis)
2880    throws InterruptedException, IOException {
2881    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2882  }
2883
2884  /**
2885   * Make sure that at least the specified number of region servers are running
2886   * @param num minimum number of region servers that should be running
2887   * @return true if we started some servers
2888   */
2889  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2890    boolean startedServer = false;
2891    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2892    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2893      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2894      startedServer = true;
2895    }
2896
2897    return startedServer;
2898  }
2899
2900  /**
2901   * Make sure that at least the specified number of region servers are running. We don't count the
2902   * ones that are currently stopping or are stopped.
2903   * @param num minimum number of region servers that should be running
2904   * @return true if we started some servers
2905   */
2906  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2907    boolean startedServer = ensureSomeRegionServersAvailable(num);
2908
2909    int nonStoppedServers = 0;
2910    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2911
2912      HRegionServer hrs = rst.getRegionServer();
2913      if (hrs.isStopping() || hrs.isStopped()) {
2914        LOG.info("A region server is stopped or stopping:" + hrs);
2915      } else {
2916        nonStoppedServers++;
2917      }
2918    }
2919    for (int i = nonStoppedServers; i < num; ++i) {
2920      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2921      startedServer = true;
2922    }
2923    return startedServer;
2924  }
2925
2926  /**
   * This method clones the passed <code>c</code> configuration, setting a new user into the
   * clone. Use it when getting new instances of FileSystem. Only works for DistributedFileSystem
   * without Kerberos.
2929   * @param c                     Initial configuration
2930   * @param differentiatingSuffix Suffix to differentiate this user from others.
2931   * @return A new configuration instance with a different user set into it.
2932   */
2933  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2934    throws IOException {
2935    FileSystem currentfs = FileSystem.get(c);
2936    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2937      return User.getCurrent();
2938    }
2939    // Else distributed filesystem. Make a new instance per daemon. Below
2940    // code is taken from the AppendTestUtil over in hdfs.
2941    String username = User.getCurrent().getName() + differentiatingSuffix;
2942    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2943    return user;
2944  }
2945
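  /*
   * Illustrative sketch: hand the differentiated user to runAs() so each simulated daemon gets
   * its own FileSystem instance instead of the process-wide cached one. The suffix is an
   * assumption.
   *
   *   User user = getDifferentUser(conf, ".hrs");
   *   FileSystem fs =
   *     user.runAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
   */
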
2946  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2947    throws IOException {
2948    NavigableSet<String> online = new TreeSet<>();
2949    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2950      try {
2951        for (RegionInfo region : ProtobufUtil
2952          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2953          online.add(region.getRegionNameAsString());
2954        }
2955      } catch (RegionServerStoppedException e) {
2956        // That's fine.
2957      }
2958    }
2959    return online;
2960  }
2961
2962  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append, it's hard-coded to 5, which makes
   * tests linger. Here is the exception you'll see:
2965   *
2966   * <pre>
2967   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2968   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2969   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2970   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2971   * </pre>
2972   *
2973   * @param stream A DFSClient.DFSOutputStream.
2974   */
2975  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2976    try {
2977      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2978      for (Class<?> clazz : clazzes) {
2979        String className = clazz.getSimpleName();
2980        if (className.equals("DFSOutputStream")) {
2981          if (clazz.isInstance(stream)) {
2982            Field maxRecoveryErrorCountField =
2983              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2984            maxRecoveryErrorCountField.setAccessible(true);
2985            maxRecoveryErrorCountField.setInt(stream, max);
2986            break;
2987          }
2988        }
2989      }
2990    } catch (Exception e) {
2991      LOG.info("Could not set max recovery field", e);
2992    }
2993  }
2994
2995  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
2999   */
3000  public boolean assignRegion(final RegionInfo regionInfo)
3001    throws IOException, InterruptedException {
3002    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3003    am.assign(regionInfo);
3004    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3005  }
3006
3007  /**
   * Move the region to the destination server and wait until the region is completely moved and
   * online.
3009   * @param destRegion region to move
3010   * @param destServer destination server of the region
3011   */
3012  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3013    throws InterruptedException, IOException {
3014    HMaster master = getMiniHBaseCluster().getMaster();
3015    // TODO: Here we start the move. The move can take a while.
3016    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3017    while (true) {
3018      ServerName serverName =
3019        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3020      if (serverName != null && serverName.equals(destServer)) {
3021        assertRegionOnServer(destRegion, serverName, 2000);
3022        break;
3023      }
3024      Thread.sleep(10);
3025    }
3026  }
3027
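  /*
   * Illustrative sketch: pick another live region server as the destination. Index 1 assumes the
   * mini cluster was started with at least two region servers.
   *
   *   RegionInfo hri = util.getAdmin().getRegions(tableName).get(0);
   *   ServerName dest = util.getMiniHBaseCluster().getRegionServer(1).getServerName();
   *   util.moveRegionAndWait(hri, dest);
   */
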
3028  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout (default 60 seconds). This means all regions have been deployed, and the
   * master has been informed and has updated hbase:meta with each region's deployed server.
3032   * @param tableName the table name
3033   */
3034  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3035    waitUntilAllRegionsAssigned(tableName,
3036      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3037  }
3038
3039  /**
   * Wait until all system tables' regions are assigned.
3041   */
3042  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3043    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3044  }
3045
3046  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, and the master has been informed and has
   * updated hbase:meta with each region's deployed server.
3050   * @param tableName the table name
3051   * @param timeout   timeout, in milliseconds
3052   */
3053  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3054    throws IOException {
3055    if (!TableName.isMetaTableName(tableName)) {
3056      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3057        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3058          + timeout + "ms");
3059        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3060          @Override
3061          public String explainFailure() throws IOException {
3062            return explainTableAvailability(tableName);
3063          }
3064
3065          @Override
3066          public boolean evaluate() throws IOException {
3067            Scan scan = new Scan();
3068            scan.addFamily(HConstants.CATALOG_FAMILY);
3069            boolean tableFound = false;
3070            try (ResultScanner s = meta.getScanner(scan)) {
3071              for (Result r; (r = s.next()) != null;) {
3072                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3073                RegionInfo info = RegionInfo.parseFromOrNull(b);
3074                if (info != null && info.getTable().equals(tableName)) {
3075                  // Get server hosting this region from catalog family. Return false if no server
3076                  // hosting this region, or if the server hosting this region was recently killed
3077                  // (for fault tolerance testing).
3078                  tableFound = true;
3079                  byte[] server =
3080                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3081                  if (server == null) {
3082                    return false;
3083                  } else {
3084                    byte[] startCode =
3085                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3086                    ServerName serverName =
3087                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3088                        + Bytes.toLong(startCode));
3089                    if (
3090                      !getHBaseClusterInterface().isDistributedCluster()
3091                        && getHBaseCluster().isKilledRS(serverName)
3092                    ) {
3093                      return false;
3094                    }
3095                  }
3096                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3097                    return false;
3098                  }
3099                }
3100              }
3101            }
3102            if (!tableFound) {
3103              LOG.warn(
3104                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3105            }
3106            return tableFound;
3107          }
3108        });
3109      }
3110    }
3111    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3112    // check from the master state if we are using a mini cluster
3113    if (!getHBaseClusterInterface().isDistributedCluster()) {
3114      // So, all regions are in the meta table but make sure master knows of the assignments before
3115      // returning -- sometimes this can lag.
3116      HMaster master = getHBaseCluster().getMaster();
3117      final RegionStates states = master.getAssignmentManager().getRegionStates();
3118      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3119        @Override
3120        public String explainFailure() throws IOException {
3121          return explainTableAvailability(tableName);
3122        }
3123
3124        @Override
3125        public boolean evaluate() throws IOException {
3126          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3127          return hris != null && !hris.isEmpty();
3128        }
3129      });
3130    }
3131    LOG.info("All regions for table " + tableName + " assigned.");
3132  }
3133
3134  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3137   */
3138  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3139    Scan scan = new Scan(get);
3140    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3141      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3142      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3143      // readpoint 0.
3144      0);
3145
3146    List<Cell> result = new ArrayList<>();
3147    scanner.next(result);
3148    if (!result.isEmpty()) {
3149      // verify that we are on the row we want:
3150      Cell kv = result.get(0);
3151      if (!CellUtil.matchingRows(kv, get.getRow())) {
3152        result.clear();
3153      }
3154    }
3155    scanner.close();
3156    return result;
3157  }
3158
3159  /**
   * Create region split keys between startKey and endKey.
   * @param numRegions the number of regions to be created. It has to be greater than 3.
   * @return the resulting split keys: an empty-byte-array start key followed by numRegions - 1
   *         keys running from startKey to endKey inclusive
3163   */
3164  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3165    assertTrue(numRegions > 3);
3166    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3167    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3168    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3169    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3170    return result;
3171  }
3172
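  /*
   * Shape of the result, sketched for numRegions = 5 with keys "aaa" and "zzz" (the values are
   * assumptions): the array holds five start keys, namely the empty byte array, "aaa", two evenly
   * spaced intermediate keys, and "zzz", so a table created with them has exactly five regions.
   *
   *   byte[][] startKeys =
   *     util.getRegionSplitStartKeys(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 5);
   */
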
3173  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods for querying itself, and relies on StoreScanner.
3176   */
3177  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3178    throws IOException {
3179    Get get = new Get(row);
3180    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3181    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3182
3183    return getFromStoreFile(store, get);
3184  }
3185
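  /*
   * Illustrative sketch: build the column set with the byte-array comparator before querying the
   * store. The store, row, and qualifier are assumptions from the calling test.
   *
   *   NavigableSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
   *   columns.add(Bytes.toBytes("q"));
   *   List<Cell> cells = getFromStoreFile(store, Bytes.toBytes("row"), columns);
   */
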
3186  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3187    final List<? extends Cell> actual) {
3188    final int eLen = expected.size();
3189    final int aLen = actual.size();
3190    final int minLen = Math.min(eLen, aLen);
3191
3192    int i = 0;
3193    while (
3194      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3195    ) {
3196      i++;
3197    }
3198
3199    if (additionalMsg == null) {
3200      additionalMsg = "";
3201    }
3202    if (!additionalMsg.isEmpty()) {
3203      additionalMsg = ". " + additionalMsg;
3204    }
3205
3206    if (eLen != aLen || i != minLen) {
3207      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3208        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3209        + " (length " + aLen + ")" + additionalMsg);
3210    }
3211  }
3212
3213  public static <T> String safeGetAsStr(List<T> lst, int i) {
3214    if (0 <= i && i < lst.size()) {
3215      return lst.get(i).toString();
3216    } else {
3217      return "<out_of_range>";
3218    }
3219  }
3220
3221  public String getRpcConnnectionURI() throws UnknownHostException {
3222    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3223  }
3224
3225  public String getZkConnectionURI() {
3226    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3227      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3228      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3229  }
3230
3231  /**
3232   * Get the zk based cluster key for this cluster.
3233   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3234   *             connection info of a cluster. Keep here only for compatibility.
3235   * @see #getRpcConnnectionURI()
3236   * @see #getZkConnectionURI()
3237   */
3238  @Deprecated
3239  public String getClusterKey() {
3240    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3241      + ":"
3242      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3243  }
3244
3245  /**
3246   * Creates a random table with the given parameters
3247   */
3248  public Table createRandomTable(TableName tableName, final Collection<String> families,
3249    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3250    final int numRowsPerFlush) throws IOException, InterruptedException {
3251    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3252      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3253      + maxVersions + "\n");
3254
3255    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3256    final int numCF = families.size();
3257    final byte[][] cfBytes = new byte[numCF][];
3258    {
3259      int cfIndex = 0;
3260      for (String cf : families) {
3261        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3262      }
3263    }
3264
3265    final int actualStartKey = 0;
3266    final int actualEndKey = Integer.MAX_VALUE;
3267    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3268    final int splitStartKey = actualStartKey + keysPerRegion;
3269    final int splitEndKey = actualEndKey - keysPerRegion;
3270    final String keyFormat = "%08x";
3271    final Table table = createTable(tableName, cfBytes, maxVersions,
3272      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3273      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3274
3275    if (hbaseCluster != null) {
3276      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3277    }
3278
3279    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3280
3281    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3282      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3283        final byte[] row = Bytes.toBytes(
3284          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3285
3286        Put put = new Put(row);
3287        Delete del = new Delete(row);
3288        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3289          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3290          final long ts = rand.nextInt();
3291          final byte[] qual = Bytes.toBytes("col" + iCol);
3292          if (rand.nextBoolean()) {
3293            final byte[] value =
3294              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3295                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3296            put.addColumn(cf, qual, ts, value);
3297          } else if (rand.nextDouble() < 0.8) {
3298            del.addColumn(cf, qual, ts);
3299          } else {
3300            del.addColumns(cf, qual, ts);
3301          }
3302        }
3303
3304        if (!put.isEmpty()) {
3305          mutator.mutate(put);
3306        }
3307
3308        if (!del.isEmpty()) {
3309          mutator.mutate(del);
3310        }
3311      }
3312      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3313      mutator.flush();
3314      if (hbaseCluster != null) {
3315        getMiniHBaseCluster().flushcache(table.getName());
3316      }
3317    }
3318    mutator.close();
3319
3320    return table;
3321  }
3322
3323  public static int randomFreePort() {
3324    return HBaseCommonTestingUtil.randomFreePort();
3325  }
3326
3327  public static String randomMultiCastAddress() {
3328    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3329  }
3330
3331  public static void waitForHostPort(String host, int port) throws IOException {
3332    final int maxTimeMs = 10000;
3333    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3334    IOException savedException = null;
3335    LOG.info("Waiting for server at " + host + ":" + port);
3336    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3337      try {
3338        Socket sock = new Socket(InetAddress.getByName(host), port);
3339        sock.close();
3340        savedException = null;
3341        LOG.info("Server at " + host + ":" + port + " is available");
3342        break;
3343      } catch (UnknownHostException e) {
3344        throw new IOException("Failed to look up " + host, e);
3345      } catch (IOException e) {
3346        savedException = e;
3347      }
3348      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3349    }
3350
3351    if (savedException != null) {
3352      throw savedException;
3353    }
3354  }
3355
3356  public static int getMetaRSPort(Connection connection) throws IOException {
3357    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3358      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3359    }
3360  }
3361
3362  /**
   * Due to an async racing issue, a region may not be in the online region list of a region
   * server yet, after the assignment znode is deleted and the new assignment is recorded in
   * master.
3365   */
3366  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3367    final long timeout) throws IOException, InterruptedException {
3368    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3369    while (true) {
3370      List<RegionInfo> regions = getAdmin().getRegions(server);
3371      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3372      long now = EnvironmentEdgeManager.currentTime();
3373      if (now > timeoutTime) break;
3374      Thread.sleep(10);
3375    }
3376    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3377  }
3378
3379  /**
3380   * Check to make sure the region is open on the specified region server, but not on any other one.
3381   */
3382  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3383    final long timeout) throws IOException, InterruptedException {
3384    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3385    while (true) {
3386      List<RegionInfo> regions = getAdmin().getRegions(server);
3387      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3388        List<JVMClusterUtil.RegionServerThread> rsThreads =
3389          getHBaseCluster().getLiveRegionServerThreads();
3390        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3391          HRegionServer rs = rsThread.getRegionServer();
3392          if (server.equals(rs.getServerName())) {
3393            continue;
3394          }
3395          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3396          for (HRegion r : hrs) {
3397            assertTrue("Region should not be double assigned",
3398              r.getRegionInfo().getRegionId() != hri.getRegionId());
3399          }
3400        }
3401        return; // good, we are happy
3402      }
3403      long now = EnvironmentEdgeManager.currentTime();
3404      if (now > timeoutTime) break;
3405      Thread.sleep(10);
3406    }
3407    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3408  }
3409
3410  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3411    TableDescriptor td =
3412      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3413    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3414    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3415  }
3416
3417  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3418    BlockCache blockCache) throws IOException {
3419    TableDescriptor td =
3420      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3421    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3422    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3423  }
3424
3425  public static void setFileSystemURI(String fsURI) {
3426    FS_URI = fsURI;
3427  }
3428
3429  /**
3430   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3431   */
3432  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3433    return new ExplainingPredicate<IOException>() {
3434      @Override
3435      public String explainFailure() throws IOException {
3436        final RegionStates regionStates =
3437          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3438        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3439      }
3440
3441      @Override
3442      public boolean evaluate() throws IOException {
3443        HMaster master = getMiniHBaseCluster().getMaster();
3444        if (master == null) return false;
3445        AssignmentManager am = master.getAssignmentManager();
3446        if (am == null) return false;
3447        return !am.hasRegionsInTransition();
3448      }
3449    };
3450  }
3451
3452  /**
   * Returns a {@link Predicate} for checking that a table is enabled
3454   */
3455  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3456    return new ExplainingPredicate<IOException>() {
3457      @Override
3458      public String explainFailure() throws IOException {
3459        return explainTableState(tableName, TableState.State.ENABLED);
3460      }
3461
3462      @Override
3463      public boolean evaluate() throws IOException {
3464        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3465      }
3466    };
3467  }
3468
3469  /**
   * Returns a {@link Predicate} for checking that a table is disabled
3471   */
3472  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3473    return new ExplainingPredicate<IOException>() {
3474      @Override
3475      public String explainFailure() throws IOException {
3476        return explainTableState(tableName, TableState.State.DISABLED);
3477      }
3478
3479      @Override
3480      public boolean evaluate() throws IOException {
3481        return getAdmin().isTableDisabled(tableName);
3482      }
3483    };
3484  }
3485
3486  /**
   * Returns a {@link Predicate} for checking that a table is available
3488   */
3489  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3490    return new ExplainingPredicate<IOException>() {
3491      @Override
3492      public String explainFailure() throws IOException {
3493        return explainTableAvailability(tableName);
3494      }
3495
3496      @Override
3497      public boolean evaluate() throws IOException {
3498        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3499        if (tableAvailable) {
3500          try (Table table = getConnection().getTable(tableName)) {
3501            TableDescriptor htd = table.getDescriptor();
3502            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3503              .getAllRegionLocations()) {
3504              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3505                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3506                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3507              for (byte[] family : htd.getColumnFamilyNames()) {
3508                scan.addFamily(family);
3509              }
3510              try (ResultScanner scanner = table.getScanner(scan)) {
3511                scanner.next();
3512              }
3513            }
3514          }
3515        }
3516        return tableAvailable;
3517      }
3518    };
3519  }
3520
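  /*
   * Illustrative sketch: these predicates are meant to be handed to waitFor(), which polls until
   * the predicate holds or the timeout elapses. The timeouts below are arbitrary choices.
   *
   *   util.waitFor(60000, util.predicateTableAvailable(tableName));
   *   util.waitFor(60000, util.predicateNoRegionsInTransition());
   */
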
3521  /**
3522   * Wait until no regions in transition.
3523   * @param timeout How long to wait.
3524   */
3525  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3526    waitFor(timeout, predicateNoRegionsInTransition());
3527  }
3528
3529  /**
3530   * Wait until no regions in transition. (time limit 15min)
3531   */
3532  public void waitUntilNoRegionsInTransition() throws IOException {
3533    waitUntilNoRegionsInTransition(15 * 60000);
3534  }
3535
3536  /**
   * Wait until the given labels are ready in VisibilityLabelsCache.
3538   */
3539  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3540    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3541    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3542
3543      @Override
3544      public boolean evaluate() {
3545        for (String label : labels) {
3546          if (labelsCache.getLabelOrdinal(label) == 0) {
3547            return false;
3548          }
3549        }
3550        return true;
3551      }
3552
3553      @Override
3554      public String explainFailure() {
3555        for (String label : labels) {
3556          if (labelsCache.getLabelOrdinal(label) == 0) {
3557            return label + " is not available yet";
3558          }
3559        }
3560        return "";
3561      }
3562    });
3563  }
3564
3565  /**
   * Create a set of column descriptors with every available combination of compression, encoding,
   * and bloom filter types.
3568   * @return the list of column descriptors
3569   */
3570  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3571    return generateColumnDescriptors("");
3572  }
3573
3574  /**
   * Create a set of column descriptors with every available combination of compression, encoding,
   * and bloom filter types.
3577   * @param prefix family names prefix
3578   * @return the list of column descriptors
3579   */
3580  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3581    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3582    long familyId = 0;
3583    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3584      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3585        for (BloomType bloomType : BloomType.values()) {
3586          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3587          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3588            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3589          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3590          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3591          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3592          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3593          familyId++;
3594        }
3595      }
3596    }
3597    return columnFamilyDescriptors;
3598  }
3599
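  /*
   * Illustrative sketch: attach the generated families to a table descriptor to exercise every
   * compression/encoding/bloom combination in a single table. The prefix "f" is an assumption.
   *
   *   TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
   *   generateColumnDescriptors("f").forEach(builder::setColumnFamily);
   *   TableDescriptor td = builder.build();
   */
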
3600  /**
3601   * Get supported compression algorithms.
3602   * @return supported compression algorithms.
3603   */
3604  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3605    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3606    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3607    for (String algoName : allAlgos) {
3608      try {
3609        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3610        algo.getCompressor();
3611        supportedAlgos.add(algo);
3612      } catch (Throwable t) {
3613        // this algo is not available
3614      }
3615    }
    return supportedAlgos.toArray(new Compression.Algorithm[0]);
3617  }
3618
3619  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3620    Scan scan = new Scan().withStartRow(row);
3621    scan.setReadType(ReadType.PREAD);
3622    scan.setCaching(1);
3623    scan.setReversed(true);
3624    scan.addFamily(family);
3625    try (RegionScanner scanner = r.getScanner(scan)) {
3626      List<Cell> cells = new ArrayList<>(1);
3627      scanner.next(cells);
3628      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3629        return null;
3630      }
3631      return Result.create(cells);
3632    }
3633  }
3634
3635  private boolean isTargetTable(final byte[] inRow, Cell c) {
3636    String inputRowString = Bytes.toString(inRow);
3637    int i = inputRowString.indexOf(HConstants.DELIMITER);
3638    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3639    int o = outputRowString.indexOf(HConstants.DELIMITER);
3640    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3641  }
3642
3643  /**
3644   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3645   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3646   * kerby KDC server and utility for using it,
   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It was added in HBASE-5291.
3649   */
3650  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3651    Properties conf = MiniKdc.createConf();
3652    conf.put(MiniKdc.DEBUG, true);
3653    MiniKdc kdc = null;
3654    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It's possible that
    // another service captures the port in between, which will result in a BindException.
3657    boolean bindException;
3658    int numTries = 0;
3659    do {
3660      try {
3661        bindException = false;
3662        dir = new File(getDataTestDir("kdc").toUri().getPath());
3663        kdc = new MiniKdc(conf, dir);
3664        kdc.start();
3665      } catch (BindException e) {
3666        FileUtils.deleteDirectory(dir); // clean directory
3667        numTries++;
3668        if (numTries == 3) {
3669          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3670          throw e;
3671        }
3672        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3673        bindException = true;
3674      }
3675    } while (bindException);
3676    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3677    return kdc;
3678  }
3679
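  /*
   * Illustrative sketch: create the test principals against the returned KDC and stop it during
   * teardown. The principal name is an assumption; the keytab is the file passed in.
   *
   *   MiniKdc kdc = util.setupMiniKdc(keytabFile);
   *   kdc.createPrincipal(keytabFile, "hbase/localhost");
   *   // ... run the secured test ...
   *   kdc.stop();
   */
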
3680  public int getNumHFiles(final TableName tableName, final byte[] family) {
3681    int numHFiles = 0;
3682    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3683      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3684    }
3685    return numHFiles;
3686  }
3687
3688  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3689    final byte[] family) {
3690    int numHFiles = 0;
3691    for (Region region : rs.getRegions(tableName)) {
3692      numHFiles += region.getStore(family).getStorefilesCount();
3693    }
3694    return numHFiles;
3695  }
3696
3697  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3698    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3699    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3700    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3701    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3702    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3703        it2 = rtdFamilies.iterator(); it.hasNext();) {
3704      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3705    }
3706  }
3707
3708  /**
3709   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3710   * invocations.
3711   */
3712  public static void await(final long sleepMillis, final BooleanSupplier condition)
3713    throws InterruptedException {
3714    try {
3715      while (!condition.getAsBoolean()) {
3716        Thread.sleep(sleepMillis);
3717      }
3718    } catch (RuntimeException e) {
3719      if (e.getCause() instanceof AssertionError) {
3720        throw (AssertionError) e.getCause();
3721      }
3722      throw e;
3723    }
3724  }
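
  /*
   * Illustrative sketch for await(): poll a cheap condition with no upper bound; prefer waitFor()
   * when a timeout is needed. The AtomicBoolean flag is an assumption.
   *
   *   AtomicBoolean done = new AtomicBoolean(false);
   *   // ... another thread eventually flips the flag ...
   *   await(100, done::get);
   */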
3725}