001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.File;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.io.UncheckedIOException;
030import java.lang.reflect.Field;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.ExecutionException;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.BooleanSupplier;
057import org.apache.commons.io.FileUtils;
058import org.apache.commons.lang3.RandomStringUtils;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
063import org.apache.hadoop.hbase.Waiter.Predicate;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncAdmin;
066import org.apache.hadoop.hbase.client.AsyncClusterConnection;
067import org.apache.hadoop.hbase.client.BufferedMutator;
068import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Consistency;
074import org.apache.hadoop.hbase.client.Delete;
075import org.apache.hadoop.hbase.client.Durability;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.MasterRegistry;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionInfoBuilder;
082import org.apache.hadoop.hbase.client.RegionLocator;
083import org.apache.hadoop.hbase.client.Result;
084import org.apache.hadoop.hbase.client.ResultScanner;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
092import org.apache.hadoop.hbase.fs.HFileSystem;
093import org.apache.hadoop.hbase.io.compress.Compression;
094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
096import org.apache.hadoop.hbase.io.hfile.BlockCache;
097import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
098import org.apache.hadoop.hbase.io.hfile.HFile;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
101import org.apache.hadoop.hbase.master.HMaster;
102import org.apache.hadoop.hbase.master.MasterFileSystem;
103import org.apache.hadoop.hbase.master.RegionState;
104import org.apache.hadoop.hbase.master.ServerManager;
105import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
107import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
108import org.apache.hadoop.hbase.master.assignment.RegionStates;
109import org.apache.hadoop.hbase.mob.MobFileCache;
110import org.apache.hadoop.hbase.regionserver.BloomType;
111import org.apache.hadoop.hbase.regionserver.ChunkCreator;
112import org.apache.hadoop.hbase.regionserver.HRegion;
113import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
114import org.apache.hadoop.hbase.regionserver.HRegionServer;
115import org.apache.hadoop.hbase.regionserver.HStore;
116import org.apache.hadoop.hbase.regionserver.InternalScanner;
117import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
118import org.apache.hadoop.hbase.regionserver.Region;
119import org.apache.hadoop.hbase.regionserver.RegionScanner;
120import org.apache.hadoop.hbase.regionserver.RegionServerServices;
121import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
122import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
123import org.apache.hadoop.hbase.security.User;
124import org.apache.hadoop.hbase.security.UserProvider;
125import org.apache.hadoop.hbase.security.access.AccessController;
126import org.apache.hadoop.hbase.security.access.PermissionStorage;
127import org.apache.hadoop.hbase.security.access.SecureTestUtil;
128import org.apache.hadoop.hbase.security.token.TokenProvider;
129import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
130import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
131import org.apache.hadoop.hbase.util.Bytes;
132import org.apache.hadoop.hbase.util.CommonFSUtils;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
134import org.apache.hadoop.hbase.util.FSUtils;
135import org.apache.hadoop.hbase.util.JVM;
136import org.apache.hadoop.hbase.util.JVMClusterUtil;
137import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
138import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
139import org.apache.hadoop.hbase.util.Pair;
140import org.apache.hadoop.hbase.util.RetryCounter;
141import org.apache.hadoop.hbase.util.Threads;
142import org.apache.hadoop.hbase.wal.WAL;
143import org.apache.hadoop.hbase.wal.WALFactory;
144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
145import org.apache.hadoop.hbase.zookeeper.ZKConfig;
146import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
147import org.apache.hadoop.hdfs.DFSClient;
148import org.apache.hadoop.hdfs.DistributedFileSystem;
149import org.apache.hadoop.hdfs.MiniDFSCluster;
150import org.apache.hadoop.hdfs.server.datanode.DataNode;
151import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
152import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
153import org.apache.hadoop.mapred.JobConf;
154import org.apache.hadoop.mapred.MiniMRCluster;
155import org.apache.hadoop.minikdc.MiniKdc;
156import org.apache.yetus.audience.InterfaceAudience;
157import org.apache.yetus.audience.InterfaceStability;
158import org.apache.zookeeper.WatchedEvent;
159import org.apache.zookeeper.ZooKeeper;
160import org.apache.zookeeper.ZooKeeper.States;
161
162import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
163
164import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
165
166/**
167 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
168 * functionality. Create an instance and keep it around testing HBase.
169 * <p/>
170 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
171 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
172 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
173 * cluster.
174 * <p/>
175 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
176 * <p/>
177 * It does not set logging levels.
178 * <p/>
179 * In the configuration properties, default values for master-info-port and region-server-port are
180 * overridden such that a random port will be assigned (thus avoiding port contention if another
181 * local HBase instance is already running).
182 * <p/>
183 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
184 * setting it to true.
185 */
186@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
187@InterfaceStability.Evolving
188public class HBaseTestingUtil extends HBaseZKTestingUtil {
189
190  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
191
192  private MiniDFSCluster dfsCluster = null;
193  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
194
195  private volatile HBaseClusterInterface hbaseCluster = null;
196  private MiniMRCluster mrCluster = null;
197
198  /** If there is a mini cluster running for this testing utility instance. */
199  private volatile boolean miniClusterRunning;
200
201  private String hadoopLogDir;
202
203  /**
204   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
205   */
206  private Path dataTestDirOnTestFS = null;
207
208  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
209
210  /** Filesystem URI used for map-reduce mini-cluster setup */
211  private static String FS_URI;
212
213  /** This is for unit tests parameterized with a single boolean. */
214  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
215
216  /**
217   * Checks to see if a specific port is available.
218   * @param port the port number to check for availability
219   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
220   */
221  public static boolean available(int port) {
222    ServerSocket ss = null;
223    DatagramSocket ds = null;
224    try {
225      ss = new ServerSocket(port);
226      ss.setReuseAddress(true);
227      ds = new DatagramSocket(port);
228      ds.setReuseAddress(true);
229      return true;
230    } catch (IOException e) {
231      // Do nothing
232    } finally {
233      if (ds != null) {
234        ds.close();
235      }
236
237      if (ss != null) {
238        try {
239          ss.close();
240        } catch (IOException e) {
241          /* should not be thrown */
242        }
243      }
244    }
245
246    return false;
247  }
248
249  /**
250   * Create all combinations of Bloom filters and compression algorithms for testing.
251   */
252  private static List<Object[]> bloomAndCompressionCombinations() {
253    List<Object[]> configurations = new ArrayList<>();
254    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
255      for (BloomType bloomType : BloomType.values()) {
256        configurations.add(new Object[] { comprAlgo, bloomType });
257      }
258    }
259    return Collections.unmodifiableList(configurations);
260  }
261
262  /**
263   * Create combination of memstoreTS and tags
264   */
265  private static List<Object[]> memStoreTSAndTagsCombination() {
266    List<Object[]> configurations = new ArrayList<>();
267    configurations.add(new Object[] { false, false });
268    configurations.add(new Object[] { false, true });
269    configurations.add(new Object[] { true, false });
270    configurations.add(new Object[] { true, true });
271    return Collections.unmodifiableList(configurations);
272  }
273
274  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
275    List<Object[]> configurations = new ArrayList<>();
276    configurations.add(new Object[] { false, false, true });
277    configurations.add(new Object[] { false, false, false });
278    configurations.add(new Object[] { false, true, true });
279    configurations.add(new Object[] { false, true, false });
280    configurations.add(new Object[] { true, false, true });
281    configurations.add(new Object[] { true, false, false });
282    configurations.add(new Object[] { true, true, true });
283    configurations.add(new Object[] { true, true, false });
284    return Collections.unmodifiableList(configurations);
285  }
286
287  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
288    bloomAndCompressionCombinations();
289
290  /**
291   * <p>
292   * Create an HBaseTestingUtility using a default configuration.
293   * <p>
294   * Initially, all tmp files are written to a local test data directory. Once
295   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
296   * data will be written to the DFS directory instead.
297   */
298  public HBaseTestingUtil() {
299    this(HBaseConfiguration.create());
300  }
301
302  /**
303   * <p>
304   * Create an HBaseTestingUtility using a given configuration.
305   * <p>
306   * Initially, all tmp files are written to a local test data directory. Once
307   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
308   * data will be written to the DFS directory instead.
309   * @param conf The configuration to use for further operations
310   */
311  public HBaseTestingUtil(@Nullable Configuration conf) {
312    super(conf);
313
314    // a hbase checksum verification failure will cause unit tests to fail
315    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
316
317    // Save this for when setting default file:// breaks things
318    if (this.conf.get("fs.defaultFS") != null) {
319      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
320    }
321    if (this.conf.get(HConstants.HBASE_DIR) != null) {
322      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
323    }
324    // Every cluster is a local cluster until we start DFS
325    // Note that conf could be null, but this.conf will not be
326    String dataTestDir = getDataTestDir().toString();
327    this.conf.set("fs.defaultFS", "file:///");
328    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
329    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
330    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
331    // If the value for random ports isn't set set it to true, thus making
332    // tests opt-out for random port assignment
333    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
334      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
335  }
336
337  /**
338   * Close both the region {@code r} and it's underlying WAL. For use in tests.
339   */
340  public static void closeRegionAndWAL(final Region r) throws IOException {
341    closeRegionAndWAL((HRegion) r);
342  }
343
344  /**
345   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
346   */
347  public static void closeRegionAndWAL(final HRegion r) throws IOException {
348    if (r == null) return;
349    r.close();
350    if (r.getWAL() == null) return;
351    r.getWAL().close();
352  }
353
354  /**
355   * Start mini secure cluster with given kdc and principals.
356   * @param kdc              Mini kdc server
357   * @param servicePrincipal Service principal without realm.
358   * @param spnegoPrincipal  Spnego principal without realm.
359   * @return Handler to shutdown the cluster
360   */
361  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
362    String spnegoPrincipal) throws Exception {
363    Configuration conf = getConfiguration();
364
365    SecureTestUtil.enableSecurity(conf);
366    VisibilityTestUtil.enableVisiblityLabels(conf);
367    SecureTestUtil.verifyConfiguration(conf);
368
369    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
370      AccessController.class.getName() + ',' + TokenProvider.class.getName());
371
372    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
373      spnegoPrincipal + '@' + kdc.getRealm());
374
375    startMiniCluster();
376    try {
377      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
378    } catch (Exception e) {
379      shutdownMiniCluster();
380      throw e;
381    }
382
383    return this::shutdownMiniCluster;
384  }
385
386  /**
387   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
388   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
389   * by the Configuration. If say, a Connection was being used against a cluster that had been
390   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
391   * Rather than use the return direct, its usually best to make a copy and use that. Do
392   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
393   * @return Instance of Configuration.
394   */
395  @Override
396  public Configuration getConfiguration() {
397    return super.getConfiguration();
398  }
399
400  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
401    this.hbaseCluster = hbaseCluster;
402  }
403
404  /**
405   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
406   * have many concurrent tests running if we need to. Moding a System property is not the way to do
407   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
408   * not anything can do about it at moment; single instance only is how the minidfscluster works.
409   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
410   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
411   * (We do not create them!).
412   * @return The calculated data test build directory, if newly-created.
413   */
414  @Override
415  protected Path setupDataTestDir() {
416    Path testPath = super.setupDataTestDir();
417    if (null == testPath) {
418      return null;
419    }
420
421    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
422
423    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
424    // we want our own value to ensure uniqueness on the same machine
425    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
426
427    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
428    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
429    return testPath;
430  }
431
432  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
433
434    String sysValue = System.getProperty(propertyName);
435
436    if (sysValue != null) {
437      // There is already a value set. So we do nothing but hope
438      // that there will be no conflicts
439      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
440        + " so I do NOT create it in " + parent);
441      String confValue = conf.get(propertyName);
442      if (confValue != null && !confValue.endsWith(sysValue)) {
443        LOG.warn(propertyName + " property value differs in configuration and system: "
444          + "Configuration=" + confValue + " while System=" + sysValue
445          + " Erasing configuration value by system value.");
446      }
447      conf.set(propertyName, sysValue);
448    } else {
449      // Ok, it's not set, so we create it as a subdirectory
450      createSubDir(propertyName, parent, subDirName);
451      System.setProperty(propertyName, conf.get(propertyName));
452    }
453  }
454
455  /**
456   * @return Where to write test data on the test filesystem; Returns working directory for the test
457   *         filesystem by default
458   * @see #setupDataTestDirOnTestFS()
459   * @see #getTestFileSystem()
460   */
461  private Path getBaseTestDirOnTestFS() throws IOException {
462    FileSystem fs = getTestFileSystem();
463    return new Path(fs.getWorkingDirectory(), "test-data");
464  }
465
466  /**
467   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
468   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
469   * on it.
470   * @return a unique path in the test filesystem
471   */
472  public Path getDataTestDirOnTestFS() throws IOException {
473    if (dataTestDirOnTestFS == null) {
474      setupDataTestDirOnTestFS();
475    }
476
477    return dataTestDirOnTestFS;
478  }
479
480  /**
481   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
482   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
483   * on it.
484   * @return a unique path in the test filesystem
485   * @param subdirName name of the subdir to create under the base test dir
486   */
487  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
488    return new Path(getDataTestDirOnTestFS(), subdirName);
489  }
490
491  /**
492   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
493   * setup.
494   */
495  private void setupDataTestDirOnTestFS() throws IOException {
496    if (dataTestDirOnTestFS != null) {
497      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
498      return;
499    }
500    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
501  }
502
503  /**
504   * Sets up a new path in test filesystem to be used by tests.
505   */
506  private Path getNewDataTestDirOnTestFS() throws IOException {
507    // The file system can be either local, mini dfs, or if the configuration
508    // is supplied externally, it can be an external cluster FS. If it is a local
509    // file system, the tests should use getBaseTestDir, otherwise, we can use
510    // the working directory, and create a unique sub dir there
511    FileSystem fs = getTestFileSystem();
512    Path newDataTestDir;
513    String randomStr = getRandomUUID().toString();
514    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
515      newDataTestDir = new Path(getDataTestDir(), randomStr);
516      File dataTestDir = new File(newDataTestDir.toString());
517      if (deleteOnExit()) dataTestDir.deleteOnExit();
518    } else {
519      Path base = getBaseTestDirOnTestFS();
520      newDataTestDir = new Path(base, randomStr);
521      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
522    }
523    return newDataTestDir;
524  }
525
526  /**
527   * Cleans the test data directory on the test filesystem.
528   * @return True if we removed the test dirs
529   */
530  public boolean cleanupDataTestDirOnTestFS() throws IOException {
531    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
532    if (ret) {
533      dataTestDirOnTestFS = null;
534    }
535    return ret;
536  }
537
538  /**
539   * Cleans a subdirectory under the test data directory on the test filesystem.
540   * @return True if we removed child
541   */
542  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
543    Path cpath = getDataTestDirOnTestFS(subdirName);
544    return getTestFileSystem().delete(cpath, true);
545  }
546
547  /**
548   * Start a minidfscluster.
549   * @param servers How many DNs to start.
550   * @see #shutdownMiniDFSCluster()
551   * @return The mini dfs cluster created.
552   */
553  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
554    return startMiniDFSCluster(servers, null);
555  }
556
557  /**
558   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
559   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
560   * instances of the datanodes will have the same host name.
561   * @param hosts hostnames DNs to run on.
562   * @see #shutdownMiniDFSCluster()
563   * @return The mini dfs cluster created.
564   */
565  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
566    if (hosts != null && hosts.length != 0) {
567      return startMiniDFSCluster(hosts.length, hosts);
568    } else {
569      return startMiniDFSCluster(1, null);
570    }
571  }
572
573  /**
574   * Start a minidfscluster. Can only create one.
575   * @param servers How many DNs to start.
576   * @param hosts   hostnames DNs to run on.
577   * @see #shutdownMiniDFSCluster()
578   * @return The mini dfs cluster created.
579   */
580  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
581    return startMiniDFSCluster(servers, null, hosts);
582  }
583
584  private void setFs() throws IOException {
585    if (this.dfsCluster == null) {
586      LOG.info("Skipping setting fs because dfsCluster is null");
587      return;
588    }
589    FileSystem fs = this.dfsCluster.getFileSystem();
590    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
591
592    // re-enable this check with dfs
593    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
594  }
595
596  // Workaround to avoid IllegalThreadStateException
597  // See HBASE-27148 for more details
598  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
599
600    private volatile boolean stopped = false;
601
602    private final MiniDFSCluster cluster;
603
604    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
605      super("FsDatasetAsyncDiskServiceFixer");
606      setDaemon(true);
607      this.cluster = cluster;
608    }
609
610    @Override
611    public void run() {
612      while (!stopped) {
613        try {
614          Thread.sleep(30000);
615        } catch (InterruptedException e) {
616          Thread.currentThread().interrupt();
617          continue;
618        }
619        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
620        // timeout of the thread pool executor is 60 seconds by default.
621        try {
622          for (DataNode dn : cluster.getDataNodes()) {
623            FsDatasetSpi<?> dataset = dn.getFSDataset();
624            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
625            service.setAccessible(true);
626            Object asyncDiskService = service.get(dataset);
627            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
628            group.setAccessible(true);
629            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
630            if (threadGroup.isDaemon()) {
631              threadGroup.setDaemon(false);
632            }
633          }
634        } catch (NoSuchFieldException e) {
635          LOG.debug("NoSuchFieldException: " + e.getMessage()
636            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
637            + "See HBASE-27595 for details.");
638        } catch (Exception e) {
639          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
640        }
641      }
642    }
643
644    void shutdown() {
645      stopped = true;
646      interrupt();
647    }
648  }
649
650  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
651    throws Exception {
652    createDirsAndSetProperties();
653    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
654
655    this.dfsCluster =
656      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
657    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
658    this.dfsClusterFixer.start();
659    // Set this just-started cluster as our filesystem.
660    setFs();
661
662    // Wait for the cluster to be totally up
663    this.dfsCluster.waitClusterUp();
664
665    // reset the test directory for test file system
666    dataTestDirOnTestFS = null;
667    String dataTestDir = getDataTestDir().toString();
668    conf.set(HConstants.HBASE_DIR, dataTestDir);
669    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
670
671    return this.dfsCluster;
672  }
673
674  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
675    createDirsAndSetProperties();
676    dfsCluster =
677      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
678    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
679    this.dfsClusterFixer.start();
680    return dfsCluster;
681  }
682
683  /**
684   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
685   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
686   * the conf.
687   *
688   * <pre>
689   * Configuration conf = TEST_UTIL.getConfiguration();
690   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
691   *   Map.Entry&lt;String, String&gt; e = i.next();
692   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
693   * }
694   * </pre>
695   */
696  private void createDirsAndSetProperties() throws IOException {
697    setupClusterTestDir();
698    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
699    createDirAndSetProperty("test.cache.data");
700    createDirAndSetProperty("hadoop.tmp.dir");
701    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
702    createDirAndSetProperty("mapreduce.cluster.local.dir");
703    createDirAndSetProperty("mapreduce.cluster.temp.dir");
704    enableShortCircuit();
705
706    Path root = getDataTestDirOnTestFS("hadoop");
707    conf.set(MapreduceTestingShim.getMROutputDirProp(),
708      new Path(root, "mapred-output-dir").toString());
709    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
710    conf.set("mapreduce.jobtracker.staging.root.dir",
711      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
712    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
713    conf.set("yarn.app.mapreduce.am.staging-dir",
714      new Path(root, "mapreduce-am-staging-root-dir").toString());
715
716    // Frustrate yarn's and hdfs's attempts at writing /tmp.
717    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
718    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
719    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
720    createDirAndSetProperty("yarn.nodemanager.log-dirs");
721    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
722    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
723    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
724    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
725    createDirAndSetProperty("dfs.journalnode.edits.dir");
726    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
727    createDirAndSetProperty("nfs.dump.dir");
728    createDirAndSetProperty("java.io.tmpdir");
729    createDirAndSetProperty("dfs.journalnode.edits.dir");
730    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
731    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
732  }
733
734  /**
735   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
736   * Default to false.
737   */
738  public boolean isNewVersionBehaviorEnabled() {
739    final String propName = "hbase.tests.new.version.behavior";
740    String v = System.getProperty(propName);
741    if (v != null) {
742      return Boolean.parseBoolean(v);
743    }
744    return false;
745  }
746
747  /**
748   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
749   * allows to specify this parameter on the command line. If not set, default is true.
750   */
751  public boolean isReadShortCircuitOn() {
752    final String propName = "hbase.tests.use.shortcircuit.reads";
753    String readOnProp = System.getProperty(propName);
754    if (readOnProp != null) {
755      return Boolean.parseBoolean(readOnProp);
756    } else {
757      return conf.getBoolean(propName, false);
758    }
759  }
760
761  /**
762   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
763   * including skipping the hdfs checksum checks.
764   */
765  private void enableShortCircuit() {
766    if (isReadShortCircuitOn()) {
767      String curUser = System.getProperty("user.name");
768      LOG.info("read short circuit is ON for user " + curUser);
769      // read short circuit, for hdfs
770      conf.set("dfs.block.local-path-access.user", curUser);
771      // read short circuit, for hbase
772      conf.setBoolean("dfs.client.read.shortcircuit", true);
773      // Skip checking checksum, for the hdfs client and the datanode
774      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
775    } else {
776      LOG.info("read short circuit is OFF");
777    }
778  }
779
780  private String createDirAndSetProperty(final String property) {
781    return createDirAndSetProperty(property, property);
782  }
783
784  private String createDirAndSetProperty(final String relPath, String property) {
785    String path = getDataTestDir(relPath).toString();
786    System.setProperty(property, path);
787    conf.set(property, path);
788    new File(path).mkdirs();
789    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
790    return path;
791  }
792
793  /**
794   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
795   */
796  public void shutdownMiniDFSCluster() throws IOException {
797    if (this.dfsCluster != null) {
798      // The below throws an exception per dn, AsynchronousCloseException.
799      this.dfsCluster.shutdown();
800      dfsCluster = null;
801      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
802      // have a fixer
803      if (dfsClusterFixer != null) {
804        this.dfsClusterFixer.shutdown();
805        dfsClusterFixer = null;
806      }
807      dataTestDirOnTestFS = null;
808      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
809    }
810  }
811
812  /**
813   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
814   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
815   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
816   * @see #startMiniCluster(StartTestingClusterOption option)
817   * @see #shutdownMiniDFSCluster()
818   */
819  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
820    StartTestingClusterOption option = StartTestingClusterOption.builder()
821      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
822    return startMiniCluster(option);
823  }
824
825  /**
826   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
827   * value can be found in {@link StartTestingClusterOption.Builder}.
828   * @see #startMiniCluster(StartTestingClusterOption option)
829   * @see #shutdownMiniDFSCluster()
830   */
831  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
832    return startMiniCluster(StartTestingClusterOption.builder().build());
833  }
834
835  /**
836   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
837   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
838   * under System property test.build.data, to be cleaned up on exit.
839   * @see #shutdownMiniDFSCluster()
840   */
841  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
842    throws Exception {
843    LOG.info("Starting up minicluster with option: {}", option);
844
845    // If we already put up a cluster, fail.
846    if (miniClusterRunning) {
847      throw new IllegalStateException("A mini-cluster is already running");
848    }
849    miniClusterRunning = true;
850
851    setupClusterTestDir();
852
853    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
854    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
855    if (dfsCluster == null) {
856      LOG.info("STARTING DFS");
857      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
858    } else {
859      LOG.info("NOT STARTING DFS");
860    }
861
862    // Start up a zk cluster.
863    if (getZkCluster() == null) {
864      startMiniZKCluster(option.getNumZkServers());
865    }
866
867    // Start the MiniHBaseCluster
868    return startMiniHBaseCluster(option);
869  }
870
871  /**
872   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
873   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
874   * @return Reference to the hbase mini hbase cluster.
875   * @see #startMiniCluster(StartTestingClusterOption)
876   * @see #shutdownMiniHBaseCluster()
877   */
878  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
879    throws IOException, InterruptedException {
880    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
881    createRootDir(option.isCreateRootDir());
882    if (option.isCreateWALDir()) {
883      createWALRootDir();
884    }
885    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
886    // for tests that do not read hbase-defaults.xml
887    setHBaseFsTmpDir();
888
889    // These settings will make the server waits until this exact number of
890    // regions servers are connected.
891    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
892      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
893    }
894    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
895      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
896    }
897
898    Configuration c = new Configuration(this.conf);
899    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
900      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
901      option.getMasterClass(), option.getRsClass());
902    // Populate the master address configuration from mini cluster configuration.
903    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
904    // Don't leave here till we've done a successful scan of the hbase:meta
905    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
906      ResultScanner s = t.getScanner(new Scan())) {
907      for (;;) {
908        if (s.next() == null) {
909          break;
910        }
911      }
912    }
913
914    getAdmin(); // create immediately the hbaseAdmin
915    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
916
917    return (SingleProcessHBaseCluster) hbaseCluster;
918  }
919
920  /**
921   * Starts up mini hbase cluster using default options. Default options can be found in
922   * {@link StartTestingClusterOption.Builder}.
923   * @see #startMiniHBaseCluster(StartTestingClusterOption)
924   * @see #shutdownMiniHBaseCluster()
925   */
926  public SingleProcessHBaseCluster startMiniHBaseCluster()
927    throws IOException, InterruptedException {
928    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
929  }
930
931  /**
932   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
933   * {@link #startMiniCluster()}. All other options will use default values, defined in
934   * {@link StartTestingClusterOption.Builder}.
935   * @param numMasters       Master node number.
936   * @param numRegionServers Number of region servers.
937   * @return The mini HBase cluster created.
938   * @see #shutdownMiniHBaseCluster()
939   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
940   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
941   * @see #startMiniHBaseCluster(StartTestingClusterOption)
942   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
943   */
944  @Deprecated
945  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
946    throws IOException, InterruptedException {
947    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
948      .numRegionServers(numRegionServers).build();
949    return startMiniHBaseCluster(option);
950  }
951
952  /**
953   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
954   * {@link #startMiniCluster()}. All other options will use default values, defined in
955   * {@link StartTestingClusterOption.Builder}.
956   * @param numMasters       Master node number.
957   * @param numRegionServers Number of region servers.
958   * @param rsPorts          Ports that RegionServer should use.
959   * @return The mini HBase cluster created.
960   * @see #shutdownMiniHBaseCluster()
961   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
962   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
963   * @see #startMiniHBaseCluster(StartTestingClusterOption)
964   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
965   */
966  @Deprecated
967  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
968    List<Integer> rsPorts) throws IOException, InterruptedException {
969    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
970      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
971    return startMiniHBaseCluster(option);
972  }
973
974  /**
975   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
976   * {@link #startMiniCluster()}. All other options will use default values, defined in
977   * {@link StartTestingClusterOption.Builder}.
978   * @param numMasters       Master node number.
979   * @param numRegionServers Number of region servers.
980   * @param rsPorts          Ports that RegionServer should use.
981   * @param masterClass      The class to use as HMaster, or null for default.
982   * @param rsClass          The class to use as HRegionServer, or null for default.
983   * @param createRootDir    Whether to create a new root or data directory path.
984   * @param createWALDir     Whether to create a new WAL directory.
985   * @return The mini HBase cluster created.
986   * @see #shutdownMiniHBaseCluster()
987   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
988   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
989   * @see #startMiniHBaseCluster(StartTestingClusterOption)
990   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
991   */
992  @Deprecated
993  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
994    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
995    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
996    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
997    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
998      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
999      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1000    return startMiniHBaseCluster(option);
1001  }
1002
1003  /**
1004   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1005   * want to keep dfs/zk up and just stop/start hbase.
1006   * @param servers number of region servers
1007   */
1008  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1009    this.restartHBaseCluster(servers, null);
1010  }
1011
1012  public void restartHBaseCluster(int servers, List<Integer> ports)
1013    throws IOException, InterruptedException {
1014    StartTestingClusterOption option =
1015      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1016    restartHBaseCluster(option);
1017    invalidateConnection();
1018  }
1019
1020  public void restartHBaseCluster(StartTestingClusterOption option)
1021    throws IOException, InterruptedException {
1022    closeConnection();
1023    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
1024      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1025      option.getMasterClass(), option.getRsClass());
1026    // Don't leave here till we've done a successful scan of the hbase:meta
1027    Connection conn = ConnectionFactory.createConnection(this.conf);
1028    Table t = conn.getTable(TableName.META_TABLE_NAME);
1029    ResultScanner s = t.getScanner(new Scan());
1030    while (s.next() != null) {
1031      // do nothing
1032    }
1033    LOG.info("HBase has been restarted");
1034    s.close();
1035    t.close();
1036    conn.close();
1037  }
1038
1039  /**
1040   * Returns current mini hbase cluster. Only has something in it after a call to
1041   * {@link #startMiniCluster()}.
1042   * @see #startMiniCluster()
1043   */
1044  public SingleProcessHBaseCluster getMiniHBaseCluster() {
1045    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
1046      return (SingleProcessHBaseCluster) this.hbaseCluster;
1047    }
1048    throw new RuntimeException(
1049      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
1050  }
1051
1052  /**
1053   * Stops mini hbase, zk, and hdfs clusters.
1054   * @see #startMiniCluster(int)
1055   */
1056  public void shutdownMiniCluster() throws IOException {
1057    LOG.info("Shutting down minicluster");
1058    shutdownMiniHBaseCluster();
1059    shutdownMiniDFSCluster();
1060    shutdownMiniZKCluster();
1061
1062    cleanupTestDir();
1063    miniClusterRunning = false;
1064    LOG.info("Minicluster is down");
1065  }
1066
1067  /**
1068   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1069   * @throws java.io.IOException in case command is unsuccessful
1070   */
1071  public void shutdownMiniHBaseCluster() throws IOException {
1072    cleanup();
1073    if (this.hbaseCluster != null) {
1074      this.hbaseCluster.shutdown();
1075      // Wait till hbase is down before going on to shutdown zk.
1076      this.hbaseCluster.waitUntilShutDown();
1077      this.hbaseCluster = null;
1078    }
1079    if (zooKeeperWatcher != null) {
1080      zooKeeperWatcher.close();
1081      zooKeeperWatcher = null;
1082    }
1083  }
1084
1085  /**
1086   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1087   * @throws java.io.IOException throws in case command is unsuccessful
1088   */
1089  public void killMiniHBaseCluster() throws IOException {
1090    cleanup();
1091    if (this.hbaseCluster != null) {
1092      getMiniHBaseCluster().killAll();
1093      this.hbaseCluster = null;
1094    }
1095    if (zooKeeperWatcher != null) {
1096      zooKeeperWatcher.close();
1097      zooKeeperWatcher = null;
1098    }
1099  }
1100
1101  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1102  private void cleanup() throws IOException {
1103    closeConnection();
1104    // unset the configuration for MIN and MAX RS to start
1105    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1106    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1107  }
1108
1109  /**
1110   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1111   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1112   * If false, previous path is used. Note: this does not cause the root dir to be created.
1113   * @return Fully qualified path for the default hbase root dir
1114   */
1115  public Path getDefaultRootDirPath(boolean create) throws IOException {
1116    if (!create) {
1117      return getDataTestDirOnTestFS();
1118    } else {
1119      return getNewDataTestDirOnTestFS();
1120    }
1121  }
1122
1123  /**
1124   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1125   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1126   * @return Fully qualified path for the default hbase root dir
1127   */
1128  public Path getDefaultRootDirPath() throws IOException {
1129    return getDefaultRootDirPath(false);
1130  }
1131
1132  /**
1133   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1134   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1135   * startup. You'd only use this method if you were doing manual operation.
1136   * @param create This flag decides whether to get a new root or data directory path or not, if it
1137   *               has been fetched already. Note : Directory will be made irrespective of whether
1138   *               path has been fetched or not. If directory already exists, it will be overwritten
1139   * @return Fully qualified path to hbase root dir
1140   */
1141  public Path createRootDir(boolean create) throws IOException {
1142    FileSystem fs = FileSystem.get(this.conf);
1143    Path hbaseRootdir = getDefaultRootDirPath(create);
1144    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1145    fs.mkdirs(hbaseRootdir);
1146    FSUtils.setVersion(fs, hbaseRootdir);
1147    return hbaseRootdir;
1148  }
1149
1150  /**
1151   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1152   * flag is false.
1153   * @return Fully qualified path to hbase root dir
1154   */
1155  public Path createRootDir() throws IOException {
1156    return createRootDir(false);
1157  }
1158
1159  /**
1160   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1161   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1162   * this method if you were doing manual operation.
1163   * @return Fully qualified path to hbase root dir
1164   */
1165  public Path createWALRootDir() throws IOException {
1166    FileSystem fs = FileSystem.get(this.conf);
1167    Path walDir = getNewDataTestDirOnTestFS();
1168    CommonFSUtils.setWALRootDir(this.conf, walDir);
1169    fs.mkdirs(walDir);
1170    return walDir;
1171  }
1172
1173  private void setHBaseFsTmpDir() throws IOException {
1174    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1175    if (hbaseFsTmpDirInString == null) {
1176      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1177      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1178    } else {
1179      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1180    }
1181  }
1182
1183  /**
1184   * Flushes all caches in the mini hbase cluster
1185   */
1186  public void flush() throws IOException {
1187    getMiniHBaseCluster().flushcache();
1188  }
1189
1190  /**
1191   * Flushes all caches in the mini hbase cluster
1192   */
1193  public void flush(TableName tableName) throws IOException {
1194    getMiniHBaseCluster().flushcache(tableName);
1195  }
1196
1197  /**
1198   * Compact all regions in the mini hbase cluster
1199   */
1200  public void compact(boolean major) throws IOException {
1201    getMiniHBaseCluster().compact(major);
1202  }
1203
1204  /**
1205   * Compact all of a table's reagion in the mini hbase cluster
1206   */
1207  public void compact(TableName tableName, boolean major) throws IOException {
1208    getMiniHBaseCluster().compact(tableName, major);
1209  }
1210
1211  /**
1212   * Create a table.
1213   * @return A Table instance for the created table.
1214   */
1215  public Table createTable(TableName tableName, String family) throws IOException {
1216    return createTable(tableName, new String[] { family });
1217  }
1218
1219  /**
1220   * Create a table.
1221   * @return A Table instance for the created table.
1222   */
1223  public Table createTable(TableName tableName, String[] families) throws IOException {
1224    List<byte[]> fams = new ArrayList<>(families.length);
1225    for (String family : families) {
1226      fams.add(Bytes.toBytes(family));
1227    }
1228    return createTable(tableName, fams.toArray(new byte[0][]));
1229  }
1230
1231  /**
1232   * Create a table.
1233   * @return A Table instance for the created table.
1234   */
1235  public Table createTable(TableName tableName, byte[] family) throws IOException {
1236    return createTable(tableName, new byte[][] { family });
1237  }
1238
1239  /**
1240   * Create a table with multiple regions.
1241   * @return A Table instance for the created table.
1242   */
1243  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1244    throws IOException {
1245    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1246    byte[] startKey = Bytes.toBytes("aaaaa");
1247    byte[] endKey = Bytes.toBytes("zzzzz");
1248    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1249
1250    return createTable(tableName, new byte[][] { family }, splitKeys);
1251  }
1252
1253  /**
1254   * Create a table.
1255   * @return A Table instance for the created table.
1256   */
1257  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1258    return createTable(tableName, families, (byte[][]) null);
1259  }
1260
1261  /**
1262   * Create a table with multiple regions.
1263   * @return A Table instance for the created table.
1264   */
1265  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1266    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1267  }
1268
1269  /**
1270   * Create a table with multiple regions.
1271   * @param replicaCount replica count.
1272   * @return A Table instance for the created table.
1273   */
1274  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1275    throws IOException {
1276    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1277  }
1278
1279  /**
1280   * Create a table.
1281   * @return A Table instance for the created table.
1282   */
1283  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1284    throws IOException {
1285    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1286  }
1287
1288  /**
1289   * Create a table.
1290   * @param tableName    the table name
1291   * @param families     the families
1292   * @param splitKeys    the splitkeys
1293   * @param replicaCount the region replica count
1294   * @return A Table instance for the created table.
1295   * @throws IOException throws IOException
1296   */
1297  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1298    int replicaCount) throws IOException {
1299    return createTable(tableName, families, splitKeys, replicaCount,
1300      new Configuration(getConfiguration()));
1301  }
1302
1303  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1304    byte[] endKey, int numRegions) throws IOException {
1305    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1306
1307    getAdmin().createTable(desc, startKey, endKey, numRegions);
1308    // HBaseAdmin only waits for regions to appear in hbase:meta we
1309    // should wait until they are assigned
1310    waitUntilAllRegionsAssigned(tableName);
1311    return getConnection().getTable(tableName);
1312  }
1313
1314  /**
1315   * Create a table.
1316   * @param c Configuration to use
1317   * @return A Table instance for the created table.
1318   */
1319  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1320    throws IOException {
1321    return createTable(htd, families, null, c);
1322  }
1323
1324  /**
1325   * Create a table.
1326   * @param htd       table descriptor
1327   * @param families  array of column families
1328   * @param splitKeys array of split keys
1329   * @param c         Configuration to use
1330   * @return A Table instance for the created table.
1331   * @throws IOException if getAdmin or createTable fails
1332   */
1333  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1334    Configuration c) throws IOException {
1335    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1336    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1337    // on is interfering.
1338    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1339  }
1340
1341  /**
1342   * Create a table.
1343   * @param htd       table descriptor
1344   * @param families  array of column families
1345   * @param splitKeys array of split keys
1346   * @param type      Bloom type
1347   * @param blockSize block size
1348   * @param c         Configuration to use
1349   * @return A Table instance for the created table.
1350   * @throws IOException if getAdmin or createTable fails
1351   */
1352
1353  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1354    BloomType type, int blockSize, Configuration c) throws IOException {
1355    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1356    for (byte[] family : families) {
1357      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1358        .setBloomFilterType(type).setBlocksize(blockSize);
1359      if (isNewVersionBehaviorEnabled()) {
1360        cfdb.setNewVersionBehavior(true);
1361      }
1362      builder.setColumnFamily(cfdb.build());
1363    }
1364    TableDescriptor td = builder.build();
1365    if (splitKeys != null) {
1366      getAdmin().createTable(td, splitKeys);
1367    } else {
1368      getAdmin().createTable(td);
1369    }
1370    // HBaseAdmin only waits for regions to appear in hbase:meta
1371    // we should wait until they are assigned
1372    waitUntilAllRegionsAssigned(td.getTableName());
1373    return getConnection().getTable(td.getTableName());
1374  }
1375
1376  /**
1377   * Create a table.
1378   * @param htd       table descriptor
1379   * @param splitRows array of split keys
1380   * @return A Table instance for the created table.
1381   */
1382  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1383    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1384    if (isNewVersionBehaviorEnabled()) {
1385      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1386        builder.setColumnFamily(
1387          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1388      }
1389    }
1390    if (splitRows != null) {
1391      getAdmin().createTable(builder.build(), splitRows);
1392    } else {
1393      getAdmin().createTable(builder.build());
1394    }
1395    // HBaseAdmin only waits for regions to appear in hbase:meta
1396    // we should wait until they are assigned
1397    waitUntilAllRegionsAssigned(htd.getTableName());
1398    return getConnection().getTable(htd.getTableName());
1399  }
1400
1401  /**
1402   * Create a table.
1403   * @param tableName    the table name
1404   * @param families     the families
1405   * @param splitKeys    the split keys
1406   * @param replicaCount the replica count
1407   * @param c            Configuration to use
1408   * @return A Table instance for the created table.
1409   */
1410  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1411    int replicaCount, final Configuration c) throws IOException {
1412    TableDescriptor htd =
1413      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1414    return createTable(htd, families, splitKeys, c);
1415  }
1416
1417  /**
1418   * Create a table.
1419   * @return A Table instance for the created table.
1420   */
1421  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1422    return createTable(tableName, new byte[][] { family }, numVersions);
1423  }
1424
1425  /**
1426   * Create a table.
1427   * @return A Table instance for the created table.
1428   */
1429  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1430    throws IOException {
1431    return createTable(tableName, families, numVersions, (byte[][]) null);
1432  }
1433
1434  /**
1435   * Create a table.
1436   * @return A Table instance for the created table.
1437   */
1438  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1439    byte[][] splitKeys) throws IOException {
1440    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1441    for (byte[] family : families) {
1442      ColumnFamilyDescriptorBuilder cfBuilder =
1443        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1444      if (isNewVersionBehaviorEnabled()) {
1445        cfBuilder.setNewVersionBehavior(true);
1446      }
1447      builder.setColumnFamily(cfBuilder.build());
1448    }
1449    if (splitKeys != null) {
1450      getAdmin().createTable(builder.build(), splitKeys);
1451    } else {
1452      getAdmin().createTable(builder.build());
1453    }
1454    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1455    // assigned
1456    waitUntilAllRegionsAssigned(tableName);
1457    return getConnection().getTable(tableName);
1458  }
1459
1460  /**
1461   * Create a table with multiple regions.
1462   * @return A Table instance for the created table.
1463   */
1464  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1465    throws IOException {
1466    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1467  }
1468
1469  /**
1470   * Create a table.
1471   * @return A Table instance for the created table.
1472   */
1473  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1474    throws IOException {
1475    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1476    for (byte[] family : families) {
1477      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1478        .setMaxVersions(numVersions).setBlocksize(blockSize);
1479      if (isNewVersionBehaviorEnabled()) {
1480        cfBuilder.setNewVersionBehavior(true);
1481      }
1482      builder.setColumnFamily(cfBuilder.build());
1483    }
1484    getAdmin().createTable(builder.build());
1485    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1486    // assigned
1487    waitUntilAllRegionsAssigned(tableName);
1488    return getConnection().getTable(tableName);
1489  }
1490
1491  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1492    String cpName) throws IOException {
1493    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1494    for (byte[] family : families) {
1495      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1496        .setMaxVersions(numVersions).setBlocksize(blockSize);
1497      if (isNewVersionBehaviorEnabled()) {
1498        cfBuilder.setNewVersionBehavior(true);
1499      }
1500      builder.setColumnFamily(cfBuilder.build());
1501    }
1502    if (cpName != null) {
1503      builder.setCoprocessor(cpName);
1504    }
1505    getAdmin().createTable(builder.build());
1506    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1507    // assigned
1508    waitUntilAllRegionsAssigned(tableName);
1509    return getConnection().getTable(tableName);
1510  }
1511
1512  /**
1513   * Create a table.
1514   * @return A Table instance for the created table.
1515   */
1516  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1517    throws IOException {
1518    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1519    int i = 0;
1520    for (byte[] family : families) {
1521      ColumnFamilyDescriptorBuilder cfBuilder =
1522        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1523      if (isNewVersionBehaviorEnabled()) {
1524        cfBuilder.setNewVersionBehavior(true);
1525      }
1526      builder.setColumnFamily(cfBuilder.build());
1527      i++;
1528    }
1529    getAdmin().createTable(builder.build());
1530    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1531    // assigned
1532    waitUntilAllRegionsAssigned(tableName);
1533    return getConnection().getTable(tableName);
1534  }
1535
1536  /**
1537   * Create a table.
1538   * @return A Table instance for the created table.
1539   */
1540  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1541    throws IOException {
1542    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1543    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1544    if (isNewVersionBehaviorEnabled()) {
1545      cfBuilder.setNewVersionBehavior(true);
1546    }
1547    builder.setColumnFamily(cfBuilder.build());
1548    getAdmin().createTable(builder.build(), splitRows);
1549    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1550    // assigned
1551    waitUntilAllRegionsAssigned(tableName);
1552    return getConnection().getTable(tableName);
1553  }
1554
1555  /**
1556   * Create a table with multiple regions.
1557   * @return A Table instance for the created table.
1558   */
1559  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1560    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1561  }
1562
1563  /**
1564   * Set the number of Region replicas.
1565   */
1566  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1567    throws IOException, InterruptedException {
1568    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1569      .setRegionReplication(replicaCount).build();
1570    admin.modifyTable(desc);
1571  }
1572
1573  /**
1574   * Set the number of Region replicas.
1575   */
1576  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1577    throws ExecutionException, IOException, InterruptedException {
1578    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1579      .setRegionReplication(replicaCount).build();
1580    admin.modifyTable(desc).get();
1581  }
1582
1583  /**
1584   * Drop an existing table
1585   * @param tableName existing table
1586   */
1587  public void deleteTable(TableName tableName) throws IOException {
1588    try {
1589      getAdmin().disableTable(tableName);
1590    } catch (TableNotEnabledException e) {
1591      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1592    }
1593    getAdmin().deleteTable(tableName);
1594  }
1595
1596  /**
1597   * Drop an existing table
1598   * @param tableName existing table
1599   */
1600  public void deleteTableIfAny(TableName tableName) throws IOException {
1601    try {
1602      deleteTable(tableName);
1603    } catch (TableNotFoundException e) {
1604      // ignore
1605    }
1606  }
1607
1608  // ==========================================================================
1609  // Canned table and table descriptor creation
1610
1611  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1612  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1613  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1614  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1615  private static final int MAXVERSIONS = 3;
1616
1617  public static final char FIRST_CHAR = 'a';
1618  public static final char LAST_CHAR = 'z';
1619  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1620  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1621
1622  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1623    return createModifyableTableDescriptor(TableName.valueOf(name),
1624      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1625      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1626  }
1627
1628  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1629    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1630    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1631    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1632      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1633        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1634        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1635      if (isNewVersionBehaviorEnabled()) {
1636        cfBuilder.setNewVersionBehavior(true);
1637      }
1638      builder.setColumnFamily(cfBuilder.build());
1639    }
1640    return builder.build();
1641  }
1642
1643  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1644    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1645    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1646    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1647      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1648        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1649        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1650      if (isNewVersionBehaviorEnabled()) {
1651        cfBuilder.setNewVersionBehavior(true);
1652      }
1653      builder.setColumnFamily(cfBuilder.build());
1654    }
1655    return builder;
1656  }
1657
1658  /**
1659   * Create a table of name <code>name</code>.
1660   * @param name Name to give table.
1661   * @return Column descriptor.
1662   */
1663  public TableDescriptor createTableDescriptor(final TableName name) {
1664    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1665      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1666  }
1667
1668  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1669    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1670  }
1671
1672  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1673    int maxVersions) {
1674    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1675    for (byte[] family : families) {
1676      ColumnFamilyDescriptorBuilder cfBuilder =
1677        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1678      if (isNewVersionBehaviorEnabled()) {
1679        cfBuilder.setNewVersionBehavior(true);
1680      }
1681      builder.setColumnFamily(cfBuilder.build());
1682    }
1683    return builder.build();
1684  }
1685
1686  /**
1687   * Create an HRegion that writes to the local tmp dirs
1688   * @param desc     a table descriptor indicating which table the region belongs to
1689   * @param startKey the start boundary of the region
1690   * @param endKey   the end boundary of the region
1691   * @return a region that writes to local dir for testing
1692   */
1693  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1694    throws IOException {
1695    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1696      .setEndKey(endKey).build();
1697    return createLocalHRegion(hri, desc);
1698  }
1699
1700  /**
1701   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1702   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1703   */
1704  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1705    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1706  }
1707
1708  /**
1709   * Create an HRegion that writes to the local tmp dirs with specified wal
1710   * @param info regioninfo
1711   * @param conf configuration
1712   * @param desc table descriptor
1713   * @param wal  wal for this region.
1714   * @return created hregion
1715   */
1716  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1717    WAL wal) throws IOException {
1718    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1719      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1720    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1721  }
1722
1723  /**
1724   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1725   *         when done.
1726   */
1727  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1728    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1729    throws IOException {
1730    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1731      durability, wal, null, families);
1732  }
1733
1734  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1735    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1736    boolean[] compactedMemStore, byte[]... families) throws IOException {
1737    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1738    builder.setReadOnly(isReadOnly);
1739    int i = 0;
1740    for (byte[] family : families) {
1741      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1742      if (compactedMemStore != null && i < compactedMemStore.length) {
1743        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1744      } else {
1745        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1746
1747      }
1748      i++;
1749      // Set default to be three versions.
1750      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1751      builder.setColumnFamily(cfBuilder.build());
1752    }
1753    builder.setDurability(durability);
1754    RegionInfo info =
1755      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1756    return createLocalHRegion(info, conf, builder.build(), wal);
1757  }
1758
1759  //
1760  // ==========================================================================
1761
1762  /**
1763   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1764   * read.
1765   * @param tableName existing table
1766   * @return HTable to that new table
1767   */
1768  public Table deleteTableData(TableName tableName) throws IOException {
1769    Table table = getConnection().getTable(tableName);
1770    Scan scan = new Scan();
1771    ResultScanner resScan = table.getScanner(scan);
1772    for (Result res : resScan) {
1773      Delete del = new Delete(res.getRow());
1774      table.delete(del);
1775    }
1776    resScan = table.getScanner(scan);
1777    resScan.close();
1778    return table;
1779  }
1780
1781  /**
1782   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1783   * table.
1784   * @param tableName       table which must exist.
1785   * @param preserveRegions keep the existing split points
1786   * @return HTable for the new table
1787   */
1788  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1789    throws IOException {
1790    Admin admin = getAdmin();
1791    if (!admin.isTableDisabled(tableName)) {
1792      admin.disableTable(tableName);
1793    }
1794    admin.truncateTable(tableName, preserveRegions);
1795    return getConnection().getTable(tableName);
1796  }
1797
1798  /**
1799   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1800   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1801   * preserve regions of existing table.
1802   * @param tableName table which must exist.
1803   * @return HTable for the new table
1804   */
1805  public Table truncateTable(final TableName tableName) throws IOException {
1806    return truncateTable(tableName, false);
1807  }
1808
1809  /**
1810   * Load table with rows from 'aaa' to 'zzz'.
1811   * @param t Table
1812   * @param f Family
1813   * @return Count of rows loaded.
1814   */
1815  public int loadTable(final Table t, final byte[] f) throws IOException {
1816    return loadTable(t, new byte[][] { f });
1817  }
1818
1819  /**
1820   * Load table with rows from 'aaa' to 'zzz'.
1821   * @param t Table
1822   * @param f Family
1823   * @return Count of rows loaded.
1824   */
1825  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1826    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1827  }
1828
1829  /**
1830   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1831   * @param t Table
1832   * @param f Array of Families to load
1833   * @return Count of rows loaded.
1834   */
1835  public int loadTable(final Table t, final byte[][] f) throws IOException {
1836    return loadTable(t, f, null);
1837  }
1838
1839  /**
1840   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1841   * @param t     Table
1842   * @param f     Array of Families to load
1843   * @param value the values of the cells. If null is passed, the row key is used as value
1844   * @return Count of rows loaded.
1845   */
1846  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1847    return loadTable(t, f, value, true);
1848  }
1849
1850  /**
1851   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1852   * @param t     Table
1853   * @param f     Array of Families to load
1854   * @param value the values of the cells. If null is passed, the row key is used as value
1855   * @return Count of rows loaded.
1856   */
1857  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1858    throws IOException {
1859    List<Put> puts = new ArrayList<>();
1860    for (byte[] row : HBaseTestingUtil.ROWS) {
1861      Put put = new Put(row);
1862      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1863      for (int i = 0; i < f.length; i++) {
1864        byte[] value1 = value != null ? value : row;
1865        put.addColumn(f[i], f[i], value1);
1866      }
1867      puts.add(put);
1868    }
1869    t.put(puts);
1870    return puts.size();
1871  }
1872
1873  /**
1874   * A tracker for tracking and validating table rows generated with
1875   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1876   */
1877  public static class SeenRowTracker {
1878    int dim = 'z' - 'a' + 1;
1879    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1880    byte[] startRow;
1881    byte[] stopRow;
1882
1883    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1884      this.startRow = startRow;
1885      this.stopRow = stopRow;
1886    }
1887
1888    void reset() {
1889      for (byte[] row : ROWS) {
1890        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1891      }
1892    }
1893
1894    int i(byte b) {
1895      return b - 'a';
1896    }
1897
1898    public void addRow(byte[] row) {
1899      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1900    }
1901
1902    /**
1903     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
1904     * rows none
1905     */
1906    public void validate() {
1907      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1908        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1909          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1910            int count = seenRows[i(b1)][i(b2)][i(b3)];
1911            int expectedCount = 0;
1912            if (
1913              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1914                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1915            ) {
1916              expectedCount = 1;
1917            }
1918            if (count != expectedCount) {
1919              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1920              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1921                + "instead of " + expectedCount);
1922            }
1923          }
1924        }
1925      }
1926    }
1927  }
1928
1929  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1930    return loadRegion(r, f, false);
1931  }
1932
1933  public int loadRegion(final Region r, final byte[] f) throws IOException {
1934    return loadRegion((HRegion) r, f);
1935  }
1936
1937  /**
1938   * Load region with rows from 'aaa' to 'zzz'.
1939   * @param r     Region
1940   * @param f     Family
1941   * @param flush flush the cache if true
1942   * @return Count of rows loaded.
1943   */
1944  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1945    byte[] k = new byte[3];
1946    int rowCount = 0;
1947    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1948      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1949        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1950          k[0] = b1;
1951          k[1] = b2;
1952          k[2] = b3;
1953          Put put = new Put(k);
1954          put.setDurability(Durability.SKIP_WAL);
1955          put.addColumn(f, null, k);
1956          if (r.getWAL() == null) {
1957            put.setDurability(Durability.SKIP_WAL);
1958          }
1959          int preRowCount = rowCount;
1960          int pause = 10;
1961          int maxPause = 1000;
1962          while (rowCount == preRowCount) {
1963            try {
1964              r.put(put);
1965              rowCount++;
1966            } catch (RegionTooBusyException e) {
1967              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1968              Threads.sleep(pause);
1969            }
1970          }
1971        }
1972      }
1973      if (flush) {
1974        r.flush(true);
1975      }
1976    }
1977    return rowCount;
1978  }
1979
1980  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1981    throws IOException {
1982    for (int i = startRow; i < endRow; i++) {
1983      byte[] data = Bytes.toBytes(String.valueOf(i));
1984      Put put = new Put(data);
1985      put.addColumn(f, null, data);
1986      t.put(put);
1987    }
1988  }
1989
1990  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1991    throws IOException {
1992    for (int i = 0; i < totalRows; i++) {
1993      byte[] row = new byte[rowSize];
1994      Bytes.random(row);
1995      Put put = new Put(row);
1996      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1997      t.put(put);
1998    }
1999  }
2000
2001  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2002    int replicaId) throws IOException {
2003    for (int i = startRow; i < endRow; i++) {
2004      String failMsg = "Failed verification of row :" + i;
2005      byte[] data = Bytes.toBytes(String.valueOf(i));
2006      Get get = new Get(data);
2007      get.setReplicaId(replicaId);
2008      get.setConsistency(Consistency.TIMELINE);
2009      Result result = table.get(get);
2010      assertTrue(failMsg, result.containsColumn(f, null));
2011      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2012      Cell cell = result.getColumnLatestCell(f, null);
2013      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2014        cell.getValueOffset(), cell.getValueLength()));
2015    }
2016  }
2017
2018  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2019    throws IOException {
2020    verifyNumericRows((HRegion) region, f, startRow, endRow);
2021  }
2022
2023  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2024    throws IOException {
2025    verifyNumericRows(region, f, startRow, endRow, true);
2026  }
2027
2028  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2029    final boolean present) throws IOException {
2030    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2031  }
2032
2033  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2034    final boolean present) throws IOException {
2035    for (int i = startRow; i < endRow; i++) {
2036      String failMsg = "Failed verification of row :" + i;
2037      byte[] data = Bytes.toBytes(String.valueOf(i));
2038      Result result = region.get(new Get(data));
2039
2040      boolean hasResult = result != null && !result.isEmpty();
2041      assertEquals(failMsg + result, present, hasResult);
2042      if (!present) continue;
2043
2044      assertTrue(failMsg, result.containsColumn(f, null));
2045      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2046      Cell cell = result.getColumnLatestCell(f, null);
2047      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2048        cell.getValueOffset(), cell.getValueLength()));
2049    }
2050  }
2051
2052  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2053    throws IOException {
2054    for (int i = startRow; i < endRow; i++) {
2055      byte[] data = Bytes.toBytes(String.valueOf(i));
2056      Delete delete = new Delete(data);
2057      delete.addFamily(f);
2058      t.delete(delete);
2059    }
2060  }
2061
2062  /**
2063   * Return the number of rows in the given table.
2064   * @param table to count rows
2065   * @return count of rows
2066   */
2067  public static int countRows(final Table table) throws IOException {
2068    return countRows(table, new Scan());
2069  }
2070
2071  public static int countRows(final Table table, final Scan scan) throws IOException {
2072    try (ResultScanner results = table.getScanner(scan)) {
2073      int count = 0;
2074      while (results.next() != null) {
2075        count++;
2076      }
2077      return count;
2078    }
2079  }
2080
2081  public static int countRows(final Table table, final byte[]... families) throws IOException {
2082    Scan scan = new Scan();
2083    for (byte[] family : families) {
2084      scan.addFamily(family);
2085    }
2086    return countRows(table, scan);
2087  }
2088
2089  /**
2090   * Return the number of rows in the given table.
2091   */
2092  public int countRows(final TableName tableName) throws IOException {
2093    try (Table table = getConnection().getTable(tableName)) {
2094      return countRows(table);
2095    }
2096  }
2097
2098  public static int countRows(final Region region) throws IOException {
2099    return countRows(region, new Scan());
2100  }
2101
2102  public static int countRows(final Region region, final Scan scan) throws IOException {
2103    try (InternalScanner scanner = region.getScanner(scan)) {
2104      return countRows(scanner);
2105    }
2106  }
2107
2108  public static int countRows(final InternalScanner scanner) throws IOException {
2109    int scannedCount = 0;
2110    List<Cell> results = new ArrayList<>();
2111    boolean hasMore = true;
2112    while (hasMore) {
2113      hasMore = scanner.next(results);
2114      scannedCount += results.size();
2115      results.clear();
2116    }
2117    return scannedCount;
2118  }
2119
2120  /**
2121   * Return an md5 digest of the entire contents of a table.
2122   */
2123  public String checksumRows(final Table table) throws Exception {
2124    MessageDigest digest = MessageDigest.getInstance("MD5");
2125    try (ResultScanner results = table.getScanner(new Scan())) {
2126      for (Result res : results) {
2127        digest.update(res.getRow());
2128      }
2129    }
2130    return digest.toString();
2131  }
2132
2133  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2134  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2135  static {
2136    int i = 0;
2137    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2138      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2139        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2140          ROWS[i][0] = b1;
2141          ROWS[i][1] = b2;
2142          ROWS[i][2] = b3;
2143          i++;
2144        }
2145      }
2146    }
2147  }
2148
2149  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2150    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2151    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2152    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2153    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2154    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2155    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2156
2157  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2158    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2159    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2160    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2161    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2162    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2163    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2164
2165  /**
2166   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2167   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2168   * @return list of region info for regions added to meta
2169   */
2170  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2171    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2172    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2173      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2174      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2175      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2176        TableState.State.ENABLED);
2177      // add custom ones
2178      for (int i = 0; i < startKeys.length; i++) {
2179        int j = (i + 1) % startKeys.length;
2180        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2181          .setEndKey(startKeys[j]).build();
2182        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2183        newRegions.add(hri);
2184      }
2185      return newRegions;
2186    }
2187  }
2188
2189  /**
2190   * Create an unmanaged WAL. Be sure to close it when you're through.
2191   */
2192  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2193    throws IOException {
2194    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2195    // unless I pass along via the conf.
2196    Configuration confForWAL = new Configuration(conf);
2197    CommonFSUtils.setRootDir(confForWAL, rootDir);
2198    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
2199      .getWAL(hri);
2200  }
2201
2202  /**
2203   * Create a region with it's own WAL. Be sure to call
2204   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2205   */
2206  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2207    final Configuration conf, final TableDescriptor htd) throws IOException {
2208    return createRegionAndWAL(info, rootDir, conf, htd, true);
2209  }
2210
2211  /**
2212   * Create a region with it's own WAL. Be sure to call
2213   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2214   */
2215  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2216    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2217    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2218    region.setBlockCache(blockCache);
2219    region.initialize();
2220    return region;
2221  }
2222
2223  /**
2224   * Create a region with it's own WAL. Be sure to call
2225   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2226   */
2227  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2228    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2229    throws IOException {
2230    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2231    region.setMobFileCache(mobFileCache);
2232    region.initialize();
2233    return region;
2234  }
2235
2236  /**
2237   * Create a region with it's own WAL. Be sure to call
2238   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2239   */
2240  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2241    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2242    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2243      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2244    WAL wal = createWal(conf, rootDir, info);
2245    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2246  }
2247
2248  /**
2249   * Find any other region server which is different from the one identified by parameter
2250   * @return another region server
2251   */
2252  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2253    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2254      if (!(rst.getRegionServer() == rs)) {
2255        return rst.getRegionServer();
2256      }
2257    }
2258    return null;
2259  }
2260
2261  /**
2262   * Tool to get the reference to the region server object that holds the region of the specified
2263   * user table.
2264   * @param tableName user table to lookup in hbase:meta
2265   * @return region server that holds it, null if the row doesn't exist
2266   */
2267  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2268    throws IOException, InterruptedException {
2269    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2270    if (regions == null || regions.isEmpty()) {
2271      return null;
2272    }
2273    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2274
2275    byte[] firstRegionName =
2276      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2277        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2278
2279    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2280    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2281      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2282    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2283      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2284    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2285    while (retrier.shouldRetry()) {
2286      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2287      if (index != -1) {
2288        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2289      }
2290      // Came back -1. Region may not be online yet. Sleep a while.
2291      retrier.sleepUntilNextRetry();
2292    }
2293    return null;
2294  }
2295
2296  /**
2297   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2298   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2299   * MiniMRCluster instances with different log dirs.
2300   * @throws IOException When starting the cluster fails.
2301   */
2302  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2303    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2304    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2305      "99.0");
2306    startMiniMapReduceCluster(2);
2307    return mrCluster;
2308  }
2309
2310  /**
2311   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2312   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2313   * multiple MiniMRCluster instances with different log dirs.
2314   * @param servers The number of <code>TaskTracker</code>'s to start.
2315   * @throws IOException When starting the cluster fails.
2316   */
2317  private void startMiniMapReduceCluster(final int servers) throws IOException {
2318    if (mrCluster != null) {
2319      throw new IllegalStateException("MiniMRCluster is already running");
2320    }
2321    LOG.info("Starting mini mapreduce cluster...");
2322    setupClusterTestDir();
2323    createDirsAndSetProperties();
2324
2325    //// hadoop2 specific settings
2326    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2327    // we up the VM usable so that processes don't get killed.
2328    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2329
2330    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2331    // this avoids the problem by disabling speculative task execution in tests.
2332    conf.setBoolean("mapreduce.map.speculative", false);
2333    conf.setBoolean("mapreduce.reduce.speculative", false);
2334    ////
2335
2336    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2337    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2338    if (JVM.getJVMSpecVersion() >= 17) {
2339      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2340      conf.set("yarn.app.mapreduce.am.command-opts",
2341        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2342    }
2343
2344    // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem
2345    // instance and cause trouble in HBase. See HBASE-29802 for more details.
2346    JobConf mrClusterConf = new JobConf(conf);
2347    mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true);
2348    // Allow the user to override FS URI for this map-reduce cluster to use.
2349    mrCluster =
2350      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2351        1, null, null, mrClusterConf);
2352    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2353    if (jobConf == null) {
2354      jobConf = mrCluster.createJobConf();
2355    }
2356
2357    // Hadoop MiniMR overwrites this while it should not
2358    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2359    LOG.info("Mini mapreduce cluster started");
2360
2361    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2362    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2363    // necessary config properties here. YARN-129 required adding a few properties.
2364    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2365    // this for mrv2 support; mr1 ignores this
2366    conf.set("mapreduce.framework.name", "yarn");
2367    conf.setBoolean("yarn.is.minicluster", true);
2368    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2369    if (rmAddress != null) {
2370      conf.set("yarn.resourcemanager.address", rmAddress);
2371    }
2372    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2373    if (historyAddress != null) {
2374      conf.set("mapreduce.jobhistory.address", historyAddress);
2375    }
2376    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2377    if (schedulerAddress != null) {
2378      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2379    }
2380    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2381    if (mrJobHistoryWebappAddress != null) {
2382      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2383    }
2384    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2385    if (yarnRMWebappAddress != null) {
2386      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2387    }
2388  }
2389
2390  /**
2391   * Stops the previously started <code>MiniMRCluster</code>.
2392   */
2393  public void shutdownMiniMapReduceCluster() {
2394    if (mrCluster != null) {
2395      LOG.info("Stopping mini mapreduce cluster...");
2396      mrCluster.shutdown();
2397      mrCluster = null;
2398      LOG.info("Mini mapreduce cluster stopped");
2399    }
2400    // Restore configuration to point to local jobtracker
2401    conf.set("mapreduce.jobtracker.address", "local");
2402  }
2403
2404  /**
2405   * Create a stubbed out RegionServerService, mainly for getting FS.
2406   */
2407  public RegionServerServices createMockRegionServerService() throws IOException {
2408    return createMockRegionServerService((ServerName) null);
2409  }
2410
2411  /**
2412   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2413   * TestTokenAuthentication
2414   */
2415  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2416    throws IOException {
2417    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2418    rss.setFileSystem(getTestFileSystem());
2419    rss.setRpcServer(rpc);
2420    return rss;
2421  }
2422
2423  /**
2424   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2425   * TestOpenRegionHandler
2426   */
2427  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2428    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2429    rss.setFileSystem(getTestFileSystem());
2430    return rss;
2431  }
2432
2433  /**
2434   * Expire the Master's session
2435   */
2436  public void expireMasterSession() throws Exception {
2437    HMaster master = getMiniHBaseCluster().getMaster();
2438    expireSession(master.getZooKeeper(), false);
2439  }
2440
2441  /**
2442   * Expire a region server's session
2443   * @param index which RS
2444   */
2445  public void expireRegionServerSession(int index) throws Exception {
2446    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2447    expireSession(rs.getZooKeeper(), false);
2448    decrementMinRegionServerCount();
2449  }
2450
2451  private void decrementMinRegionServerCount() {
2452    // decrement the count for this.conf, for newly spwaned master
2453    // this.hbaseCluster shares this configuration too
2454    decrementMinRegionServerCount(getConfiguration());
2455
2456    // each master thread keeps a copy of configuration
2457    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2458      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2459    }
2460  }
2461
2462  private void decrementMinRegionServerCount(Configuration conf) {
2463    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2464    if (currentCount != -1) {
2465      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2466    }
2467  }
2468
2469  public void expireSession(ZKWatcher nodeZK) throws Exception {
2470    expireSession(nodeZK, false);
2471  }
2472
2473  /**
2474   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2475   * http://hbase.apache.org/book.html#trouble.zookeeper
2476   * <p/>
2477   * There are issues when doing this:
2478   * <ol>
2479   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2480   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2481   * </ol>
2482   * @param nodeZK      - the ZK watcher to expire
2483   * @param checkStatus - true to check if we can create a Table with the current configuration.
2484   */
2485  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2486    Configuration c = new Configuration(this.conf);
2487    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2488    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2489    byte[] password = zk.getSessionPasswd();
2490    long sessionID = zk.getSessionId();
2491
2492    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2493    // so we create a first watcher to be sure that the
2494    // event was sent. We expect that if our watcher receives the event
2495    // other watchers on the same machine will get is as well.
2496    // When we ask to close the connection, ZK does not close it before
2497    // we receive all the events, so don't have to capture the event, just
2498    // closing the connection should be enough.
2499    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2500      @Override
2501      public void process(WatchedEvent watchedEvent) {
2502        LOG.info("Monitor ZKW received event=" + watchedEvent);
2503      }
2504    }, sessionID, password);
2505
2506    // Making it expire
2507    ZooKeeper newZK =
2508      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2509
2510    // ensure that we have connection to the server before closing down, otherwise
2511    // the close session event will be eaten out before we start CONNECTING state
2512    long start = EnvironmentEdgeManager.currentTime();
2513    while (
2514      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2515    ) {
2516      Thread.sleep(1);
2517    }
2518    newZK.close();
2519    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2520
2521    // Now closing & waiting to be sure that the clients get it.
2522    monitor.close();
2523
2524    if (checkStatus) {
2525      getConnection().getTable(TableName.META_TABLE_NAME).close();
2526    }
2527  }
2528
2529  /**
2530   * Get the Mini HBase cluster.
2531   * @return hbase cluster
2532   * @see #getHBaseClusterInterface()
2533   */
2534  public SingleProcessHBaseCluster getHBaseCluster() {
2535    return getMiniHBaseCluster();
2536  }
2537
2538  /**
2539   * Returns the HBaseCluster instance.
2540   * <p>
2541   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2542   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2543   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2544   * instead w/o the need to type-cast.
2545   */
2546  public HBaseClusterInterface getHBaseClusterInterface() {
2547    // implementation note: we should rename this method as #getHBaseCluster(),
2548    // but this would require refactoring 90+ calls.
2549    return hbaseCluster;
2550  }
2551
2552  /**
2553   * Resets the connections so that the next time getConnection() is called, a new connection is
2554   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2555   * the connection is not valid anymore.
2556   * <p/>
2557   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2558   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2559   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2560   * the connection state automatically. Cleaning this is a much bigger refactor.
2561   */
2562  public void invalidateConnection() throws IOException {
2563    closeConnection();
2564    // Update the master addresses if they changed.
2565    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2566    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2567    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2568      masterConfigBefore, masterConfAfter);
2569    conf.set(HConstants.MASTER_ADDRS_KEY,
2570      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2571  }
2572
2573  /**
2574   * Get a shared Connection to the cluster. this method is thread safe.
2575   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2576   */
2577  public Connection getConnection() throws IOException {
2578    return getAsyncConnection().toConnection();
2579  }
2580
2581  /**
2582   * Get a assigned Connection to the cluster. this method is thread safe.
2583   * @param user assigned user
2584   * @return A Connection with assigned user.
2585   */
2586  public Connection getConnection(User user) throws IOException {
2587    return getAsyncConnection(user).toConnection();
2588  }
2589
2590  /**
2591   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2592   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2593   *         of cluster.
2594   */
2595  public AsyncClusterConnection getAsyncConnection() throws IOException {
2596    try {
2597      return asyncConnection.updateAndGet(connection -> {
2598        if (connection == null) {
2599          try {
2600            User user = UserProvider.instantiate(conf).getCurrent();
2601            connection = getAsyncConnection(user);
2602          } catch (IOException ioe) {
2603            throw new UncheckedIOException("Failed to create connection", ioe);
2604          }
2605        }
2606        return connection;
2607      });
2608    } catch (UncheckedIOException exception) {
2609      throw exception.getCause();
2610    }
2611  }
2612
2613  /**
2614   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2615   * @param user assigned user
2616   * @return An AsyncClusterConnection with assigned user.
2617   */
2618  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2619    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2620  }
2621
2622  public void closeConnection() throws IOException {
2623    if (hbaseAdmin != null) {
2624      Closeables.close(hbaseAdmin, true);
2625      hbaseAdmin = null;
2626    }
2627    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2628    if (asyncConnection != null) {
2629      Closeables.close(asyncConnection, true);
2630    }
2631  }
2632
2633  /**
2634   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2635   * it has no effect, it will be closed automatically when the cluster shutdowns
2636   */
2637  public Admin getAdmin() throws IOException {
2638    if (hbaseAdmin == null) {
2639      this.hbaseAdmin = getConnection().getAdmin();
2640    }
2641    return hbaseAdmin;
2642  }
2643
2644  private Admin hbaseAdmin = null;
2645
2646  /**
2647   * Returns an {@link Hbck} instance. Needs be closed when done.
2648   */
2649  public Hbck getHbck() throws IOException {
2650    return getConnection().getHbck();
2651  }
2652
2653  /**
2654   * Unassign the named region.
2655   * @param regionName The region to unassign.
2656   */
2657  public void unassignRegion(String regionName) throws IOException {
2658    unassignRegion(Bytes.toBytes(regionName));
2659  }
2660
2661  /**
2662   * Unassign the named region.
2663   * @param regionName The region to unassign.
2664   */
2665  public void unassignRegion(byte[] regionName) throws IOException {
2666    getAdmin().unassign(regionName);
2667  }
2668
2669  /**
2670   * Closes the region containing the given row.
2671   * @param row   The row to find the containing region.
2672   * @param table The table to find the region.
2673   */
2674  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2675    unassignRegionByRow(Bytes.toBytes(row), table);
2676  }
2677
2678  /**
2679   * Closes the region containing the given row.
2680   * @param row   The row to find the containing region.
2681   * @param table The table to find the region.
2682   */
2683  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2684    HRegionLocation hrl = table.getRegionLocation(row);
2685    unassignRegion(hrl.getRegion().getRegionName());
2686  }
2687
2688  /**
2689   * Retrieves a splittable region randomly from tableName
2690   * @param tableName   name of table
2691   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2692   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2693   */
2694  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2695    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2696    int regCount = regions.size();
2697    Set<Integer> attempted = new HashSet<>();
2698    int idx;
2699    int attempts = 0;
2700    do {
2701      regions = getHBaseCluster().getRegions(tableName);
2702      if (regCount != regions.size()) {
2703        // if there was region movement, clear attempted Set
2704        attempted.clear();
2705      }
2706      regCount = regions.size();
2707      // There are chances that before we get the region for the table from an RS the region may
2708      // be going for CLOSE. This may be because online schema change is enabled
2709      if (regCount > 0) {
2710        idx = ThreadLocalRandom.current().nextInt(regCount);
2711        // if we have just tried this region, there is no need to try again
2712        if (attempted.contains(idx)) {
2713          continue;
2714        }
2715        HRegion region = regions.get(idx);
2716        if (region.checkSplit().isPresent()) {
2717          return region;
2718        }
2719        attempted.add(idx);
2720      }
2721      attempts++;
2722    } while (maxAttempts == -1 || attempts < maxAttempts);
2723    return null;
2724  }
2725
2726  public MiniDFSCluster getDFSCluster() {
2727    return dfsCluster;
2728  }
2729
2730  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2731    setDFSCluster(cluster, true);
2732  }
2733
2734  /**
2735   * Set the MiniDFSCluster
2736   * @param cluster     cluster to use
2737   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2738   *                    is set.
2739   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2740   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2741   */
2742  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2743    throws IllegalStateException, IOException {
2744    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2745      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2746    }
2747    this.dfsCluster = cluster;
2748    this.setFs();
2749  }
2750
2751  public FileSystem getTestFileSystem() throws IOException {
2752    return HFileSystem.get(conf);
2753  }
2754
2755  /**
2756   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
2757   * (30 seconds).
2758   * @param table Table to wait on.
2759   */
2760  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2761    waitTableAvailable(table.getName(), 30000);
2762  }
2763
2764  public void waitTableAvailable(TableName table, long timeoutMillis)
2765    throws InterruptedException, IOException {
2766    waitFor(timeoutMillis, predicateTableAvailable(table));
2767  }
2768
2769  /**
2770   * Wait until all regions in a table have been assigned
2771   * @param table         Table to wait on.
2772   * @param timeoutMillis Timeout.
2773   */
2774  public void waitTableAvailable(byte[] table, long timeoutMillis)
2775    throws InterruptedException, IOException {
2776    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2777  }
2778
2779  public String explainTableAvailability(TableName tableName) throws IOException {
2780    StringBuilder msg =
2781      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2782    if (getHBaseCluster().getMaster().isAlive()) {
2783      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2784        .getRegionStates().getRegionAssignments();
2785      final List<Pair<RegionInfo, ServerName>> metaLocations =
2786        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2787      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2788        RegionInfo hri = metaLocation.getFirst();
2789        ServerName sn = metaLocation.getSecond();
2790        if (!assignments.containsKey(hri)) {
2791          msg.append(", region ").append(hri)
2792            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2793        } else if (sn == null) {
2794          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2795        } else if (!sn.equals(assignments.get(hri))) {
2796          msg.append(",  region ").append(hri)
2797            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2798            .append(" <> ").append(assignments.get(hri));
2799        }
2800      }
2801    }
2802    return msg.toString();
2803  }
2804
2805  public String explainTableState(final TableName table, TableState.State state)
2806    throws IOException {
2807    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2808    if (tableState == null) {
2809      return "TableState in META: No table state in META for table " + table
2810        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2811    } else if (!tableState.inStates(state)) {
2812      return "TableState in META: Not " + state + " state, but " + tableState;
2813    } else {
2814      return "TableState in META: OK";
2815    }
2816  }
2817
2818  @Nullable
2819  public TableState findLastTableState(final TableName table) throws IOException {
2820    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2821    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2822      @Override
2823      public boolean visit(Result r) throws IOException {
2824        if (!Arrays.equals(r.getRow(), table.getName())) {
2825          return false;
2826        }
2827        TableState state = CatalogFamilyFormat.getTableState(r);
2828        if (state != null) {
2829          lastTableState.set(state);
2830        }
2831        return true;
2832      }
2833    };
2834    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2835      Integer.MAX_VALUE, visitor);
2836    return lastTableState.get();
2837  }
2838
2839  /**
2840   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2841   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
2842   * table.
2843   * @param table the table to wait on.
2844   * @throws InterruptedException if interrupted while waiting
2845   * @throws IOException          if an IO problem is encountered
2846   */
2847  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2848    waitTableEnabled(table, 30000);
2849  }
2850
2851  /**
2852   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2853   * have been all assigned.
2854   * @see #waitTableEnabled(TableName, long)
2855   * @param table         Table to wait on.
2856   * @param timeoutMillis Time to wait on it being marked enabled.
2857   */
2858  public void waitTableEnabled(byte[] table, long timeoutMillis)
2859    throws InterruptedException, IOException {
2860    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2861  }
2862
2863  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2864    waitFor(timeoutMillis, predicateTableEnabled(table));
2865  }
2866
2867  /**
2868   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
2869   * after default period (30 seconds)
2870   * @param table Table to wait on.
2871   */
2872  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2873    waitTableDisabled(table, 30000);
2874  }
2875
2876  public void waitTableDisabled(TableName table, long millisTimeout)
2877    throws InterruptedException, IOException {
2878    waitFor(millisTimeout, predicateTableDisabled(table));
2879  }
2880
2881  /**
2882   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
2883   * @param table         Table to wait on.
2884   * @param timeoutMillis Time to wait on it being marked disabled.
2885   */
2886  public void waitTableDisabled(byte[] table, long timeoutMillis)
2887    throws InterruptedException, IOException {
2888    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2889  }
2890
2891  /**
2892   * Make sure that at least the specified number of region servers are running
2893   * @param num minimum number of region servers that should be running
2894   * @return true if we started some servers
2895   */
2896  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2897    boolean startedServer = false;
2898    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2899    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2900      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2901      startedServer = true;
2902    }
2903
2904    return startedServer;
2905  }
2906
2907  /**
2908   * Make sure that at least the specified number of region servers are running. We don't count the
2909   * ones that are currently stopping or are stopped.
2910   * @param num minimum number of region servers that should be running
2911   * @return true if we started some servers
2912   */
2913  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2914    boolean startedServer = ensureSomeRegionServersAvailable(num);
2915
2916    int nonStoppedServers = 0;
2917    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2918
2919      HRegionServer hrs = rst.getRegionServer();
2920      if (hrs.isStopping() || hrs.isStopped()) {
2921        LOG.info("A region server is stopped or stopping:" + hrs);
2922      } else {
2923        nonStoppedServers++;
2924      }
2925    }
2926    for (int i = nonStoppedServers; i < num; ++i) {
2927      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2928      startedServer = true;
2929    }
2930    return startedServer;
2931  }
2932
2933  /**
2934   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
2935   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2936   * @param c                     Initial configuration
2937   * @param differentiatingSuffix Suffix to differentiate this user from others.
2938   * @return A new configuration instance with a different user set into it.
2939   */
2940  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2941    throws IOException {
2942    FileSystem currentfs = FileSystem.get(c);
2943    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2944      return User.getCurrent();
2945    }
2946    // Else distributed filesystem. Make a new instance per daemon. Below
2947    // code is taken from the AppendTestUtil over in hdfs.
2948    String username = User.getCurrent().getName() + differentiatingSuffix;
2949    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2950    return user;
2951  }
2952
2953  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2954    throws IOException {
2955    NavigableSet<String> online = new TreeSet<>();
2956    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2957      try {
2958        for (RegionInfo region : ProtobufUtil
2959          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2960          online.add(region.getRegionNameAsString());
2961        }
2962      } catch (RegionServerStoppedException e) {
2963        // That's fine.
2964      }
2965    }
2966    return online;
2967  }
2968
2969  /**
2970   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
2971   * linger. Here is the exception you'll see:
2972   *
2973   * <pre>
2974   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2975   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2976   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2977   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2978   * </pre>
2979   *
2980   * @param stream A DFSClient.DFSOutputStream.
2981   */
2982  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2983    try {
2984      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2985      for (Class<?> clazz : clazzes) {
2986        String className = clazz.getSimpleName();
2987        if (className.equals("DFSOutputStream")) {
2988          if (clazz.isInstance(stream)) {
2989            Field maxRecoveryErrorCountField =
2990              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2991            maxRecoveryErrorCountField.setAccessible(true);
2992            maxRecoveryErrorCountField.setInt(stream, max);
2993            break;
2994          }
2995        }
2996      }
2997    } catch (Exception e) {
2998      LOG.info("Could not set max recovery field", e);
2999    }
3000  }
3001
3002  /**
3003   * Uses directly the assignment manager to assign the region. and waits until the specified region
3004   * has completed assignment.
3005   * @return true if the region is assigned false otherwise.
3006   */
3007  public boolean assignRegion(final RegionInfo regionInfo)
3008    throws IOException, InterruptedException {
3009    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3010    am.assign(regionInfo);
3011    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3012  }
3013
3014  /**
3015   * Move region to destination server and wait till region is completely moved and online
3016   * @param destRegion region to move
3017   * @param destServer destination server of the region
3018   */
3019  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3020    throws InterruptedException, IOException {
3021    HMaster master = getMiniHBaseCluster().getMaster();
3022    // TODO: Here we start the move. The move can take a while.
3023    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3024    while (true) {
3025      ServerName serverName =
3026        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3027      if (serverName != null && serverName.equals(destServer)) {
3028        assertRegionOnServer(destRegion, serverName, 2000);
3029        break;
3030      }
3031      Thread.sleep(10);
3032    }
3033  }
3034
3035  /**
3036   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3037   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3038   * master has been informed and updated hbase:meta with the regions deployed server.
3039   * @param tableName the table name
3040   */
3041  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3042    waitUntilAllRegionsAssigned(tableName,
3043      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3044  }
3045
3046  /**
3047   * Waith until all system table's regions get assigned
3048   */
3049  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3050    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3051  }
3052
3053  /**
3054   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3055   * timeout. This means all regions have been deployed, master has been informed and updated
3056   * hbase:meta with the regions deployed server.
3057   * @param tableName the table name
3058   * @param timeout   timeout, in milliseconds
3059   */
3060  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3061    throws IOException {
3062    if (!TableName.isMetaTableName(tableName)) {
3063      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3064        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3065          + timeout + "ms");
3066        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3067          @Override
3068          public String explainFailure() throws IOException {
3069            return explainTableAvailability(tableName);
3070          }
3071
3072          @Override
3073          public boolean evaluate() throws IOException {
3074            Scan scan = new Scan();
3075            scan.addFamily(HConstants.CATALOG_FAMILY);
3076            boolean tableFound = false;
3077            try (ResultScanner s = meta.getScanner(scan)) {
3078              for (Result r; (r = s.next()) != null;) {
3079                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3080                RegionInfo info = RegionInfo.parseFromOrNull(b);
3081                if (info != null && info.getTable().equals(tableName)) {
3082                  // Get server hosting this region from catalog family. Return false if no server
3083                  // hosting this region, or if the server hosting this region was recently killed
3084                  // (for fault tolerance testing).
3085                  tableFound = true;
3086                  byte[] server =
3087                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3088                  if (server == null) {
3089                    return false;
3090                  } else {
3091                    byte[] startCode =
3092                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3093                    ServerName serverName =
3094                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3095                        + Bytes.toLong(startCode));
3096                    if (
3097                      !getHBaseClusterInterface().isDistributedCluster()
3098                        && getHBaseCluster().isKilledRS(serverName)
3099                    ) {
3100                      return false;
3101                    }
3102                  }
3103                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3104                    return false;
3105                  }
3106                }
3107              }
3108            }
3109            if (!tableFound) {
3110              LOG.warn(
3111                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3112            }
3113            return tableFound;
3114          }
3115        });
3116      }
3117    }
3118    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3119    // check from the master state if we are using a mini cluster
3120    if (!getHBaseClusterInterface().isDistributedCluster()) {
3121      // So, all regions are in the meta table but make sure master knows of the assignments before
3122      // returning -- sometimes this can lag.
3123      HMaster master = getHBaseCluster().getMaster();
3124      final RegionStates states = master.getAssignmentManager().getRegionStates();
3125      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3126        @Override
3127        public String explainFailure() throws IOException {
3128          return explainTableAvailability(tableName);
3129        }
3130
3131        @Override
3132        public boolean evaluate() throws IOException {
3133          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3134          return hris != null && !hris.isEmpty();
3135        }
3136      });
3137    }
3138    LOG.info("All regions for table " + tableName + " assigned.");
3139  }
3140
3141  /**
3142   * Do a small get/scan against one store. This is required because store has no actual methods of
3143   * querying itself, and relies on StoreScanner.
3144   */
3145  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3146    Scan scan = new Scan(get);
3147    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3148      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3149      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3150      // readpoint 0.
3151      0);
3152
3153    List<Cell> result = new ArrayList<>();
3154    scanner.next(result);
3155    if (!result.isEmpty()) {
3156      // verify that we are on the row we want:
3157      Cell kv = result.get(0);
3158      if (!CellUtil.matchingRows(kv, get.getRow())) {
3159        result.clear();
3160      }
3161    }
3162    scanner.close();
3163    return result;
3164  }
3165
3166  /**
3167   * Create region split keys between startkey and endKey
3168   * @param numRegions the number of regions to be created. it has to be greater than 3.
3169   * @return resulting split keys
3170   */
3171  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3172    assertTrue(numRegions > 3);
3173    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3174    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3175    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3176    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3177    return result;
3178  }
3179
3180  /**
3181   * Do a small get/scan against one store. This is required because store has no actual methods of
3182   * querying itself, and relies on StoreScanner.
3183   */
3184  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3185    throws IOException {
3186    Get get = new Get(row);
3187    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3188    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3189
3190    return getFromStoreFile(store, get);
3191  }
3192
3193  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3194    final List<? extends Cell> actual) {
3195    final int eLen = expected.size();
3196    final int aLen = actual.size();
3197    final int minLen = Math.min(eLen, aLen);
3198
3199    int i = 0;
3200    while (
3201      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3202    ) {
3203      i++;
3204    }
3205
3206    if (additionalMsg == null) {
3207      additionalMsg = "";
3208    }
3209    if (!additionalMsg.isEmpty()) {
3210      additionalMsg = ". " + additionalMsg;
3211    }
3212
3213    if (eLen != aLen || i != minLen) {
3214      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3215        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3216        + " (length " + aLen + ")" + additionalMsg);
3217    }
3218  }
3219
3220  public static <T> String safeGetAsStr(List<T> lst, int i) {
3221    if (0 <= i && i < lst.size()) {
3222      return lst.get(i).toString();
3223    } else {
3224      return "<out_of_range>";
3225    }
3226  }
3227
3228  public String getRpcConnnectionURI() throws UnknownHostException {
3229    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3230  }
3231
3232  public String getZkConnectionURI() {
3233    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3234      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3235      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3236  }
3237
3238  /**
3239   * Get the zk based cluster key for this cluster.
3240   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3241   *             connection info of a cluster. Keep here only for compatibility.
3242   * @see #getRpcConnnectionURI()
3243   * @see #getZkConnectionURI()
3244   */
3245  @Deprecated
3246  public String getClusterKey() {
3247    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3248      + ":"
3249      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3250  }
3251
3252  /**
3253   * Creates a random table with the given parameters
3254   */
3255  public Table createRandomTable(TableName tableName, final Collection<String> families,
3256    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3257    final int numRowsPerFlush) throws IOException, InterruptedException {
3258    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3259      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3260      + maxVersions + "\n");
3261
3262    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3263    final int numCF = families.size();
3264    final byte[][] cfBytes = new byte[numCF][];
3265    {
3266      int cfIndex = 0;
3267      for (String cf : families) {
3268        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3269      }
3270    }
3271
3272    final int actualStartKey = 0;
3273    final int actualEndKey = Integer.MAX_VALUE;
3274    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3275    final int splitStartKey = actualStartKey + keysPerRegion;
3276    final int splitEndKey = actualEndKey - keysPerRegion;
3277    final String keyFormat = "%08x";
3278    final Table table = createTable(tableName, cfBytes, maxVersions,
3279      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3280      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3281
3282    if (hbaseCluster != null) {
3283      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3284    }
3285
3286    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3287
3288    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3289      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3290        final byte[] row = Bytes.toBytes(
3291          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3292
3293        Put put = new Put(row);
3294        Delete del = new Delete(row);
3295        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3296          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3297          final long ts = rand.nextInt();
3298          final byte[] qual = Bytes.toBytes("col" + iCol);
3299          if (rand.nextBoolean()) {
3300            final byte[] value =
3301              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3302                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3303            put.addColumn(cf, qual, ts, value);
3304          } else if (rand.nextDouble() < 0.8) {
3305            del.addColumn(cf, qual, ts);
3306          } else {
3307            del.addColumns(cf, qual, ts);
3308          }
3309        }
3310
3311        if (!put.isEmpty()) {
3312          mutator.mutate(put);
3313        }
3314
3315        if (!del.isEmpty()) {
3316          mutator.mutate(del);
3317        }
3318      }
3319      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3320      mutator.flush();
3321      if (hbaseCluster != null) {
3322        getMiniHBaseCluster().flushcache(table.getName());
3323      }
3324    }
3325    mutator.close();
3326
3327    return table;
3328  }
3329
3330  public static int randomFreePort() {
3331    return HBaseCommonTestingUtil.randomFreePort();
3332  }
3333
3334  public static String randomMultiCastAddress() {
3335    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3336  }
3337
3338  public static void waitForHostPort(String host, int port) throws IOException {
3339    final int maxTimeMs = 10000;
3340    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3341    IOException savedException = null;
3342    LOG.info("Waiting for server at " + host + ":" + port);
3343    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3344      try {
3345        Socket sock = new Socket(InetAddress.getByName(host), port);
3346        sock.close();
3347        savedException = null;
3348        LOG.info("Server at " + host + ":" + port + " is available");
3349        break;
3350      } catch (UnknownHostException e) {
3351        throw new IOException("Failed to look up " + host, e);
3352      } catch (IOException e) {
3353        savedException = e;
3354      }
3355      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3356    }
3357
3358    if (savedException != null) {
3359      throw savedException;
3360    }
3361  }
3362
3363  public static int getMetaRSPort(Connection connection) throws IOException {
3364    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3365      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3366    }
3367  }
3368
3369  /**
3370   * Due to async racing issue, a region may not be in the online region list of a region server
3371   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3372   */
3373  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3374    final long timeout) throws IOException, InterruptedException {
3375    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3376    while (true) {
3377      List<RegionInfo> regions = getAdmin().getRegions(server);
3378      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3379      long now = EnvironmentEdgeManager.currentTime();
3380      if (now > timeoutTime) break;
3381      Thread.sleep(10);
3382    }
3383    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3384  }
3385
3386  /**
3387   * Check to make sure the region is open on the specified region server, but not on any other one.
3388   */
3389  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3390    final long timeout) throws IOException, InterruptedException {
3391    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3392    while (true) {
3393      List<RegionInfo> regions = getAdmin().getRegions(server);
3394      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3395        List<JVMClusterUtil.RegionServerThread> rsThreads =
3396          getHBaseCluster().getLiveRegionServerThreads();
3397        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3398          HRegionServer rs = rsThread.getRegionServer();
3399          if (server.equals(rs.getServerName())) {
3400            continue;
3401          }
3402          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3403          for (HRegion r : hrs) {
3404            assertTrue("Region should not be double assigned",
3405              r.getRegionInfo().getRegionId() != hri.getRegionId());
3406          }
3407        }
3408        return; // good, we are happy
3409      }
3410      long now = EnvironmentEdgeManager.currentTime();
3411      if (now > timeoutTime) break;
3412      Thread.sleep(10);
3413    }
3414    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3415  }
3416
3417  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3418    TableDescriptor td =
3419      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3420    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3421    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3422  }
3423
3424  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3425    BlockCache blockCache) throws IOException {
3426    TableDescriptor td =
3427      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3428    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3429    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3430  }
3431
3432  public static void setFileSystemURI(String fsURI) {
3433    FS_URI = fsURI;
3434  }
3435
3436  /**
3437   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3438   */
3439  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3440    return new ExplainingPredicate<IOException>() {
3441      @Override
3442      public String explainFailure() throws IOException {
3443        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3444        return "found in transition: " + am.getRegionsInTransition().toString();
3445      }
3446
3447      @Override
3448      public boolean evaluate() throws IOException {
3449        HMaster master = getMiniHBaseCluster().getMaster();
3450        if (master == null) return false;
3451        AssignmentManager am = master.getAssignmentManager();
3452        if (am == null) return false;
3453        return !am.hasRegionsInTransition();
3454      }
3455    };
3456  }
3457
3458  /**
3459   * Returns a {@link Predicate} for checking that there are no procedure to region transition in
3460   * master
3461   */
3462  public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() {
3463    return new ExplainingPredicate<IOException>() {
3464      @Override
3465      public String explainFailure() throws IOException {
3466        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3467        return "Number of procedure scheduled for region transit: "
3468          + am.getRegionTransitScheduledCount();
3469      }
3470
3471      @Override
3472      public boolean evaluate() throws IOException {
3473        HMaster master = getMiniHBaseCluster().getMaster();
3474        if (master == null) {
3475          return false;
3476        }
3477        AssignmentManager am = master.getAssignmentManager();
3478        if (am == null) {
3479          return false;
3480        }
3481        return am.getRegionTransitScheduledCount() == 0;
3482      }
3483    };
3484  }
3485
3486  /**
3487   * Returns a {@link Predicate} for checking that table is enabled
3488   */
3489  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3490    return new ExplainingPredicate<IOException>() {
3491      @Override
3492      public String explainFailure() throws IOException {
3493        return explainTableState(tableName, TableState.State.ENABLED);
3494      }
3495
3496      @Override
3497      public boolean evaluate() throws IOException {
3498        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3499      }
3500    };
3501  }
3502
3503  /**
3504   * Returns a {@link Predicate} for checking that table is enabled
3505   */
3506  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3507    return new ExplainingPredicate<IOException>() {
3508      @Override
3509      public String explainFailure() throws IOException {
3510        return explainTableState(tableName, TableState.State.DISABLED);
3511      }
3512
3513      @Override
3514      public boolean evaluate() throws IOException {
3515        return getAdmin().isTableDisabled(tableName);
3516      }
3517    };
3518  }
3519
3520  /**
3521   * Returns a {@link Predicate} for checking that table is enabled
3522   */
3523  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3524    return new ExplainingPredicate<IOException>() {
3525      @Override
3526      public String explainFailure() throws IOException {
3527        return explainTableAvailability(tableName);
3528      }
3529
3530      @Override
3531      public boolean evaluate() throws IOException {
3532        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3533        if (tableAvailable) {
3534          try (Table table = getConnection().getTable(tableName)) {
3535            TableDescriptor htd = table.getDescriptor();
3536            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3537              .getAllRegionLocations()) {
3538              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3539                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3540                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3541              for (byte[] family : htd.getColumnFamilyNames()) {
3542                scan.addFamily(family);
3543              }
3544              try (ResultScanner scanner = table.getScanner(scan)) {
3545                scanner.next();
3546              }
3547            }
3548          }
3549        }
3550        return tableAvailable;
3551      }
3552    };
3553  }
3554
3555  /**
3556   * Wait until no regions in transition.
3557   * @param timeout How long to wait.
3558   */
3559  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3560    waitFor(timeout, predicateNoRegionsInTransition());
3561  }
3562
3563  /**
3564   * Wait until no regions in transition.
3565   * @param timeout How long to wait.
3566   */
3567  public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException {
3568    waitFor(timeout, predicateNoRegionTransitScheduled());
3569  }
3570
3571  /**
3572   * Wait until no regions in transition. (time limit 15min)
3573   */
3574  public void waitUntilNoRegionsInTransition() throws IOException {
3575    waitUntilNoRegionsInTransition(15 * 60000);
3576  }
3577
3578  /**
3579   * Wait until no TRSP is present
3580   */
3581  public void waitUntilNoRegionTransitScheduled() throws IOException {
3582    waitUntilNoRegionTransitScheduled(15 * 60000);
3583  }
3584
3585  /**
3586   * Wait until labels is ready in VisibilityLabelsCache.
3587   */
3588  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3589    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3590    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3591
3592      @Override
3593      public boolean evaluate() {
3594        for (String label : labels) {
3595          if (labelsCache.getLabelOrdinal(label) == 0) {
3596            return false;
3597          }
3598        }
3599        return true;
3600      }
3601
3602      @Override
3603      public String explainFailure() {
3604        for (String label : labels) {
3605          if (labelsCache.getLabelOrdinal(label) == 0) {
3606            return label + " is not available yet";
3607          }
3608        }
3609        return "";
3610      }
3611    });
3612  }
3613
3614  /**
3615   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3616   * available.
3617   * @return the list of column descriptors
3618   */
3619  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3620    return generateColumnDescriptors("");
3621  }
3622
3623  /**
3624   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3625   * available.
3626   * @param prefix family names prefix
3627   * @return the list of column descriptors
3628   */
3629  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3630    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3631    long familyId = 0;
3632    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3633      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3634        for (BloomType bloomType : BloomType.values()) {
3635          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3636          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3637            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3638          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3639          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3640          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3641          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3642          familyId++;
3643        }
3644      }
3645    }
3646    return columnFamilyDescriptors;
3647  }
3648
3649  /**
3650   * Get supported compression algorithms.
3651   * @return supported compression algorithms.
3652   */
3653  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3654    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3655    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3656    for (String algoName : allAlgos) {
3657      try {
3658        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3659        algo.getCompressor();
3660        supportedAlgos.add(algo);
3661      } catch (Throwable t) {
3662        // this algo is not available
3663      }
3664    }
3665    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3666  }
3667
3668  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3669    Scan scan = new Scan().withStartRow(row);
3670    scan.setReadType(ReadType.PREAD);
3671    scan.setCaching(1);
3672    scan.setReversed(true);
3673    scan.addFamily(family);
3674    try (RegionScanner scanner = r.getScanner(scan)) {
3675      List<Cell> cells = new ArrayList<>(1);
3676      scanner.next(cells);
3677      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3678        return null;
3679      }
3680      return Result.create(cells);
3681    }
3682  }
3683
3684  private boolean isTargetTable(final byte[] inRow, Cell c) {
3685    String inputRowString = Bytes.toString(inRow);
3686    int i = inputRowString.indexOf(HConstants.DELIMITER);
3687    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3688    int o = outputRowString.indexOf(HConstants.DELIMITER);
3689    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3690  }
3691
3692  /**
3693   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3694   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3695   * kerby KDC server and utility for using it,
3696   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3697   * less baggage. It came in in HBASE-5291.
3698   */
3699  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3700    Properties conf = MiniKdc.createConf();
3701    conf.put(MiniKdc.DEBUG, true);
3702    MiniKdc kdc = null;
3703    File dir = null;
3704    // There is time lag between selecting a port and trying to bind with it. It's possible that
3705    // another service captures the port in between which'll result in BindException.
3706    boolean bindException;
3707    int numTries = 0;
3708    do {
3709      try {
3710        bindException = false;
3711        dir = new File(getDataTestDir("kdc").toUri().getPath());
3712        kdc = new MiniKdc(conf, dir);
3713        kdc.start();
3714      } catch (BindException e) {
3715        FileUtils.deleteDirectory(dir); // clean directory
3716        numTries++;
3717        if (numTries == 3) {
3718          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3719          throw e;
3720        }
3721        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3722        bindException = true;
3723      }
3724    } while (bindException);
3725    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3726    return kdc;
3727  }
3728
3729  public int getNumHFiles(final TableName tableName, final byte[] family) {
3730    int numHFiles = 0;
3731    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3732      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3733    }
3734    return numHFiles;
3735  }
3736
3737  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3738    final byte[] family) {
3739    int numHFiles = 0;
3740    for (Region region : rs.getRegions(tableName)) {
3741      numHFiles += region.getStore(family).getStorefilesCount();
3742    }
3743    return numHFiles;
3744  }
3745
3746  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3747    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3748    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3749    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3750    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3751    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3752        it2 = rtdFamilies.iterator(); it.hasNext();) {
3753      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3754    }
3755  }
3756
3757  /**
3758   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3759   * invocations.
3760   */
3761  public static void await(final long sleepMillis, final BooleanSupplier condition)
3762    throws InterruptedException {
3763    try {
3764      while (!condition.getAsBoolean()) {
3765        Thread.sleep(sleepMillis);
3766      }
3767    } catch (RuntimeException e) {
3768      if (e.getCause() instanceof AssertionError) {
3769        throw (AssertionError) e.getCause();
3770      }
3771      throw e;
3772    }
3773  }
3774
3775  public void createRegionDir(RegionInfo hri) throws IOException {
3776    Path rootDir = getDataTestDir();
3777    Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable());
3778    Path regionDir = new Path(tableDir, hri.getEncodedName());
3779    FileSystem fs = getTestFileSystem();
3780    if (!fs.exists(regionDir)) {
3781      fs.mkdirs(regionDir);
3782    }
3783  }
3784
3785  public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem)
3786    throws IOException {
3787    Path tableDir =
3788      CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable());
3789    HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir,
3790      regionInfo);
3791  }
3792}