001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.File;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.io.UncheckedIOException;
030import java.lang.reflect.Field;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.ExecutionException;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.BooleanSupplier;
057import org.apache.commons.io.FileUtils;
058import org.apache.commons.lang3.RandomStringUtils;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
063import org.apache.hadoop.hbase.Waiter.Predicate;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncAdmin;
066import org.apache.hadoop.hbase.client.AsyncClusterConnection;
067import org.apache.hadoop.hbase.client.BufferedMutator;
068import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Consistency;
074import org.apache.hadoop.hbase.client.Delete;
075import org.apache.hadoop.hbase.client.Durability;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.MasterRegistry;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionInfoBuilder;
082import org.apache.hadoop.hbase.client.RegionLocator;
083import org.apache.hadoop.hbase.client.Result;
084import org.apache.hadoop.hbase.client.ResultScanner;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
092import org.apache.hadoop.hbase.fs.HFileSystem;
093import org.apache.hadoop.hbase.io.compress.Compression;
094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
096import org.apache.hadoop.hbase.io.hfile.BlockCache;
097import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
098import org.apache.hadoop.hbase.io.hfile.HFile;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
101import org.apache.hadoop.hbase.master.HMaster;
102import org.apache.hadoop.hbase.master.MasterFileSystem;
103import org.apache.hadoop.hbase.master.RegionState;
104import org.apache.hadoop.hbase.master.ServerManager;
105import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
107import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
108import org.apache.hadoop.hbase.master.assignment.RegionStates;
109import org.apache.hadoop.hbase.mob.MobFileCache;
110import org.apache.hadoop.hbase.regionserver.BloomType;
111import org.apache.hadoop.hbase.regionserver.ChunkCreator;
112import org.apache.hadoop.hbase.regionserver.HRegion;
113import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
114import org.apache.hadoop.hbase.regionserver.HRegionServer;
115import org.apache.hadoop.hbase.regionserver.HStore;
116import org.apache.hadoop.hbase.regionserver.InternalScanner;
117import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
118import org.apache.hadoop.hbase.regionserver.Region;
119import org.apache.hadoop.hbase.regionserver.RegionScanner;
120import org.apache.hadoop.hbase.regionserver.RegionServerServices;
121import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
122import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
123import org.apache.hadoop.hbase.security.User;
124import org.apache.hadoop.hbase.security.UserProvider;
125import org.apache.hadoop.hbase.security.access.AccessController;
126import org.apache.hadoop.hbase.security.access.PermissionStorage;
127import org.apache.hadoop.hbase.security.access.SecureTestUtil;
128import org.apache.hadoop.hbase.security.token.TokenProvider;
129import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
130import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
131import org.apache.hadoop.hbase.util.Bytes;
132import org.apache.hadoop.hbase.util.CommonFSUtils;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
134import org.apache.hadoop.hbase.util.FSUtils;
135import org.apache.hadoop.hbase.util.JVM;
136import org.apache.hadoop.hbase.util.JVMClusterUtil;
137import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
138import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
139import org.apache.hadoop.hbase.util.Pair;
140import org.apache.hadoop.hbase.util.RetryCounter;
141import org.apache.hadoop.hbase.util.Threads;
142import org.apache.hadoop.hbase.wal.WAL;
143import org.apache.hadoop.hbase.wal.WALFactory;
144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
145import org.apache.hadoop.hbase.zookeeper.ZKConfig;
146import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
147import org.apache.hadoop.hdfs.DFSClient;
148import org.apache.hadoop.hdfs.DFSConfigKeys;
149import org.apache.hadoop.hdfs.DistributedFileSystem;
150import org.apache.hadoop.hdfs.MiniDFSCluster;
151import org.apache.hadoop.hdfs.server.datanode.DataNode;
152import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
153import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
154import org.apache.hadoop.mapred.JobConf;
155import org.apache.hadoop.mapred.MiniMRCluster;
156import org.apache.hadoop.minikdc.MiniKdc;
157import org.apache.yetus.audience.InterfaceAudience;
158import org.apache.yetus.audience.InterfaceStability;
159import org.apache.zookeeper.WatchedEvent;
160import org.apache.zookeeper.ZooKeeper;
161import org.apache.zookeeper.ZooKeeper.States;
162
163import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
164
165import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
166
167/**
168 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
169 * functionality. Create an instance and keep it around testing HBase.
170 * <p/>
171 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
172 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
173 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
174 * cluster.
175 * <p/>
176 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
177 * <p/>
178 * It does not set logging levels.
179 * <p/>
180 * In the configuration properties, default values for master-info-port and region-server-port are
181 * overridden such that a random port will be assigned (thus avoiding port contention if another
182 * local HBase instance is already running).
183 * <p/>
184 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
185 * setting it to true.
186 */
187@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
188@InterfaceStability.Evolving
189public class HBaseTestingUtil extends HBaseZKTestingUtil {
190
  /** Default number of regions per region server when pre-splitting tables in tests. */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  // Managed mini HDFS cluster, if one has been started; null otherwise.
  private MiniDFSCluster dfsCluster = null;
  // Background thread working around HBASE-27148; see FsDatasetAsyncDiskServiceFixer below.
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  // The managed HBase cluster (in-process or distributed); volatile as it is read across threads.
  private volatile HBaseClusterInterface hbaseCluster = null;
  // Managed map-reduce mini cluster, if started.
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  // Value of "hadoop.log.dir" as set up by createDirsAndSetProperties().
  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  // Shared async cluster connection; lazily created and swapped atomically.
  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
216
217  /**
218   * Checks to see if a specific port is available.
219   * @param port the port number to check for availability
220   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
221   */
222  public static boolean available(int port) {
223    ServerSocket ss = null;
224    DatagramSocket ds = null;
225    try {
226      ss = new ServerSocket(port);
227      ss.setReuseAddress(true);
228      ds = new DatagramSocket(port);
229      ds.setReuseAddress(true);
230      return true;
231    } catch (IOException e) {
232      // Do nothing
233    } finally {
234      if (ds != null) {
235        ds.close();
236      }
237
238      if (ss != null) {
239        try {
240          ss.close();
241        } catch (IOException e) {
242          /* should not be thrown */
243        }
244      }
245    }
246
247    return false;
248  }
249
250  /**
251   * Create all combinations of Bloom filters and compression algorithms for testing.
252   */
253  private static List<Object[]> bloomAndCompressionCombinations() {
254    List<Object[]> configurations = new ArrayList<>();
255    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
256      for (BloomType bloomType : BloomType.values()) {
257        configurations.add(new Object[] { comprAlgo, bloomType });
258      }
259    }
260    return Collections.unmodifiableList(configurations);
261  }
262
263  /**
264   * Create combination of memstoreTS and tags
265   */
266  private static List<Object[]> memStoreTSAndTagsCombination() {
267    List<Object[]> configurations = new ArrayList<>();
268    configurations.add(new Object[] { false, false });
269    configurations.add(new Object[] { false, true });
270    configurations.add(new Object[] { true, false });
271    configurations.add(new Object[] { true, true });
272    return Collections.unmodifiableList(configurations);
273  }
274
275  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
276    List<Object[]> configurations = new ArrayList<>();
277    configurations.add(new Object[] { false, false, true });
278    configurations.add(new Object[] { false, false, false });
279    configurations.add(new Object[] { false, true, true });
280    configurations.add(new Object[] { false, true, false });
281    configurations.add(new Object[] { true, false, true });
282    configurations.add(new Object[] { true, false, false });
283    configurations.add(new Object[] { true, true, true });
284    configurations.add(new Object[] { true, true, false });
285    return Collections.unmodifiableList(configurations);
286  }
287
  /** All {compression, bloom-type} combinations, for parameterized tests. */
  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();
290
291  /**
292   * <p>
293   * Create an HBaseTestingUtility using a default configuration.
294   * <p>
295   * Initially, all tmp files are written to a local test data directory. Once
296   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
297   * data will be written to the DFS directory instead.
298   */
299  public HBaseTestingUtil() {
300    this(HBaseConfiguration.create());
301  }
302
303  /**
304   * <p>
305   * Create an HBaseTestingUtility using a given configuration.
306   * <p>
307   * Initially, all tmp files are written to a local test data directory. Once
308   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
309   * data will be written to the DFS directory instead.
310   * @param conf The configuration to use for further operations
311   */
312  public HBaseTestingUtil(@Nullable Configuration conf) {
313    super(conf);
314
315    // a hbase checksum verification failure will cause unit tests to fail
316    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
317
318    // Save this for when setting default file:// breaks things
319    if (this.conf.get("fs.defaultFS") != null) {
320      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
321    }
322    if (this.conf.get(HConstants.HBASE_DIR) != null) {
323      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
324    }
325    // Every cluster is a local cluster until we start DFS
326    // Note that conf could be null, but this.conf will not be
327    String dataTestDir = getDataTestDir().toString();
328    this.conf.set("fs.defaultFS", "file:///");
329    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
330    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
331    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
332    // If the value for random ports isn't set set it to true, thus making
333    // tests opt-out for random port assignment
334    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
335      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
336  }
337
338  /**
339   * Close both the region {@code r} and it's underlying WAL. For use in tests.
340   */
341  public static void closeRegionAndWAL(final Region r) throws IOException {
342    closeRegionAndWAL((HRegion) r);
343  }
344
345  /**
346   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
347   */
348  public static void closeRegionAndWAL(final HRegion r) throws IOException {
349    if (r == null) return;
350    r.close();
351    if (r.getWAL() == null) return;
352    r.getWAL().close();
353  }
354
355  /**
356   * Start mini secure cluster with given kdc and principals.
357   * @param kdc              Mini kdc server
358   * @param servicePrincipal Service principal without realm.
359   * @param spnegoPrincipal  Spnego principal without realm.
360   * @return Handler to shutdown the cluster
361   */
362  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
363    String spnegoPrincipal) throws Exception {
364    Configuration conf = getConfiguration();
365
366    SecureTestUtil.enableSecurity(conf);
367    VisibilityTestUtil.enableVisiblityLabels(conf);
368    SecureTestUtil.verifyConfiguration(conf);
369
370    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
371      AccessController.class.getName() + ',' + TokenProvider.class.getName());
372
373    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
374      spnegoPrincipal + '@' + kdc.getRealm());
375
376    startMiniCluster();
377    try {
378      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
379    } catch (Exception e) {
380      shutdownMiniCluster();
381      throw e;
382    }
383
384    return this::shutdownMiniCluster;
385  }
386
387  /**
388   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
389   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
390   * by the Configuration. If say, a Connection was being used against a cluster that had been
391   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
392   * Rather than use the return direct, its usually best to make a copy and use that. Do
393   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
394   * @return Instance of Configuration.
395   */
396  @Override
397  public Configuration getConfiguration() {
398    return super.getConfiguration();
399  }
400
  /**
   * Set the cluster instance this utility should operate against.
   * @param hbaseCluster the cluster to use for subsequent operations
   */
  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }
404
405  /**
406   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
407   * have many concurrent tests running if we need to. Moding a System property is not the way to do
408   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
409   * not anything can do about it at moment; single instance only is how the minidfscluster works.
410   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
411   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
412   * (We do not create them!).
413   * @return The calculated data test build directory, if newly-created.
414   */
415  @Override
416  protected Path setupDataTestDir() {
417    Path testPath = super.setupDataTestDir();
418    if (null == testPath) {
419      return null;
420    }
421
422    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
423
424    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
425    // we want our own value to ensure uniqueness on the same machine
426    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
427
428    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
429    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
430    return testPath;
431  }
432
433  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
434
435    String sysValue = System.getProperty(propertyName);
436
437    if (sysValue != null) {
438      // There is already a value set. So we do nothing but hope
439      // that there will be no conflicts
440      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
441        + " so I do NOT create it in " + parent);
442      String confValue = conf.get(propertyName);
443      if (confValue != null && !confValue.endsWith(sysValue)) {
444        LOG.warn(propertyName + " property value differs in configuration and system: "
445          + "Configuration=" + confValue + " while System=" + sysValue
446          + " Erasing configuration value by system value.");
447      }
448      conf.set(propertyName, sysValue);
449    } else {
450      // Ok, it's not set, so we create it as a subdirectory
451      createSubDir(propertyName, parent, subDirName);
452      System.setProperty(propertyName, conf.get(propertyName));
453    }
454  }
455
456  /**
457   * @return Where to write test data on the test filesystem; Returns working directory for the test
458   *         filesystem by default
459   * @see #setupDataTestDirOnTestFS()
460   * @see #getTestFileSystem()
461   */
462  private Path getBaseTestDirOnTestFS() throws IOException {
463    FileSystem fs = getTestFileSystem();
464    return new Path(fs.getWorkingDirectory(), "test-data");
465  }
466
467  /**
468   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
469   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
470   * on it.
471   * @return a unique path in the test filesystem
472   */
473  public Path getDataTestDirOnTestFS() throws IOException {
474    if (dataTestDirOnTestFS == null) {
475      setupDataTestDirOnTestFS();
476    }
477
478    return dataTestDirOnTestFS;
479  }
480
481  /**
482   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
483   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
484   * on it.
485   * @return a unique path in the test filesystem
486   * @param subdirName name of the subdir to create under the base test dir
487   */
488  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
489    return new Path(getDataTestDirOnTestFS(), subdirName);
490  }
491
492  /**
493   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
494   * setup.
495   */
496  private void setupDataTestDirOnTestFS() throws IOException {
497    if (dataTestDirOnTestFS != null) {
498      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
499      return;
500    }
501    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
502  }
503
504  /**
505   * Sets up a new path in test filesystem to be used by tests.
506   */
507  private Path getNewDataTestDirOnTestFS() throws IOException {
508    // The file system can be either local, mini dfs, or if the configuration
509    // is supplied externally, it can be an external cluster FS. If it is a local
510    // file system, the tests should use getBaseTestDir, otherwise, we can use
511    // the working directory, and create a unique sub dir there
512    FileSystem fs = getTestFileSystem();
513    Path newDataTestDir;
514    String randomStr = getRandomUUID().toString();
515    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
516      newDataTestDir = new Path(getDataTestDir(), randomStr);
517      File dataTestDir = new File(newDataTestDir.toString());
518      if (deleteOnExit()) dataTestDir.deleteOnExit();
519    } else {
520      Path base = getBaseTestDirOnTestFS();
521      newDataTestDir = new Path(base, randomStr);
522      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
523    }
524    return newDataTestDir;
525  }
526
527  /**
528   * Cleans the test data directory on the test filesystem.
529   * @return True if we removed the test dirs
530   */
531  public boolean cleanupDataTestDirOnTestFS() throws IOException {
532    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
533    if (ret) {
534      dataTestDirOnTestFS = null;
535    }
536    return ret;
537  }
538
539  /**
540   * Cleans a subdirectory under the test data directory on the test filesystem.
541   * @return True if we removed child
542   */
543  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
544    Path cpath = getDataTestDirOnTestFS(subdirName);
545    return getTestFileSystem().delete(cpath, true);
546  }
547
548  /**
549   * Start a minidfscluster.
550   * @param servers How many DNs to start.
551   * @see #shutdownMiniDFSCluster()
552   * @return The mini dfs cluster created.
553   */
554  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
555    return startMiniDFSCluster(servers, null);
556  }
557
558  /**
559   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
560   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
561   * instances of the datanodes will have the same host name.
562   * @param hosts hostnames DNs to run on.
563   * @see #shutdownMiniDFSCluster()
564   * @return The mini dfs cluster created.
565   */
566  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
567    if (hosts != null && hosts.length != 0) {
568      return startMiniDFSCluster(hosts.length, hosts);
569    } else {
570      return startMiniDFSCluster(1, null);
571    }
572  }
573
574  /**
575   * Start a minidfscluster. Can only create one.
576   * @param servers How many DNs to start.
577   * @param hosts   hostnames DNs to run on.
578   * @see #shutdownMiniDFSCluster()
579   * @return The mini dfs cluster created.
580   */
581  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
582    return startMiniDFSCluster(servers, null, hosts);
583  }
584
585  private void setFs() throws IOException {
586    if (this.dfsCluster == null) {
587      LOG.info("Skipping setting fs because dfsCluster is null");
588      return;
589    }
590    FileSystem fs = this.dfsCluster.getFileSystem();
591    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
592
593    // re-enable this check with dfs
594    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
595  }
596
  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    // Set by shutdown() to make the polling loop exit.
    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and re-check the stop condition.
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            // Reach into the dataset's private asyncDiskService.threadGroup via reflection and
            // clear its daemon flag.
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          // Newer Hadoop versions changed the internal field layout; nothing to fix there.
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    /** Signal the fixer loop to exit and interrupt any in-progress sleep. */
    void shutdown() {
      stopped = true;
      interrupt();
    }
  }
650
  /**
   * Start a minidfscluster with the given number of datanodes, rack assignments and host names.
   * @param servers How many DNs to start.
   * @param racks   rack names for the datanodes, or null for no rack assignments.
   * @param hosts   hostnames for the datanodes, or null for default host names.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    // Skip fsync on edit logs; speeds up tests.
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    // Workaround for HBASE-27148; see FsDatasetAsyncDiskServiceFixer.
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }
674
  /**
   * Start a mini DFS cluster with 5 datanodes for WAL tests, with the namenode bound to the given
   * port. Unlike {@link #startMiniDFSCluster(int)}, this does NOT make the new cluster the
   * default filesystem (no call to setFs()).
   * @param namenodePort the port the namenode should bind to
   * @return the started mini dfs cluster
   */
  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    // Workaround for HBASE-27148; see FsDatasetAsyncDiskServiceFixer.
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }
683
684  /**
685   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
686   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
687   * the conf.
688   *
689   * <pre>
690   * Configuration conf = TEST_UTIL.getConfiguration();
691   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
692   *   Map.Entry&lt;String, String&gt; e = i.next();
693   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
694   * }
695   * </pre>
696   */
697  private void createDirsAndSetProperties() throws IOException {
698    setupClusterTestDir();
699    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
700    createDirAndSetProperty("test.cache.data");
701    createDirAndSetProperty("hadoop.tmp.dir");
702    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
703    createDirAndSetProperty("mapreduce.cluster.local.dir");
704    createDirAndSetProperty("mapreduce.cluster.temp.dir");
705    enableShortCircuit();
706
707    Path root = getDataTestDirOnTestFS("hadoop");
708    conf.set(MapreduceTestingShim.getMROutputDirProp(),
709      new Path(root, "mapred-output-dir").toString());
710    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
711    conf.set("mapreduce.jobtracker.staging.root.dir",
712      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
713    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
714    conf.set("yarn.app.mapreduce.am.staging-dir",
715      new Path(root, "mapreduce-am-staging-root-dir").toString());
716
717    // Frustrate yarn's and hdfs's attempts at writing /tmp.
718    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
719    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
720    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
721    createDirAndSetProperty("yarn.nodemanager.log-dirs");
722    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
723    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
724    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
725    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
726    createDirAndSetProperty("dfs.journalnode.edits.dir");
727    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
728    createDirAndSetProperty("nfs.dump.dir");
729    createDirAndSetProperty("java.io.tmpdir");
730    createDirAndSetProperty("dfs.journalnode.edits.dir");
731    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
732    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
733
734    // disable metrics logger since it depend on commons-logging internal classes and we do not want
735    // commons-logging on our classpath
736    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
737    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
738  }
739
740  /**
741   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
742   * Default to false.
743   */
744  public boolean isNewVersionBehaviorEnabled() {
745    final String propName = "hbase.tests.new.version.behavior";
746    String v = System.getProperty(propName);
747    if (v != null) {
748      return Boolean.parseBoolean(v);
749    }
750    return false;
751  }
752
753  /**
754   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
755   * allows to specify this parameter on the command line. If not set, default is true.
756   */
757  public boolean isReadShortCircuitOn() {
758    final String propName = "hbase.tests.use.shortcircuit.reads";
759    String readOnProp = System.getProperty(propName);
760    if (readOnProp != null) {
761      return Boolean.parseBoolean(readOnProp);
762    } else {
763      return conf.getBoolean(propName, false);
764    }
765  }
766
767  /**
768   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
769   * including skipping the hdfs checksum checks.
770   */
771  private void enableShortCircuit() {
772    if (isReadShortCircuitOn()) {
773      String curUser = System.getProperty("user.name");
774      LOG.info("read short circuit is ON for user " + curUser);
775      // read short circuit, for hdfs
776      conf.set("dfs.block.local-path-access.user", curUser);
777      // read short circuit, for hbase
778      conf.setBoolean("dfs.client.read.shortcircuit", true);
779      // Skip checking checksum, for the hdfs client and the datanode
780      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
781    } else {
782      LOG.info("read short circuit is OFF");
783    }
784  }
785
  /** Shorthand for {@link #createDirAndSetProperty(String, String)} with relPath == property. */
  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }
789
790  private String createDirAndSetProperty(final String relPath, String property) {
791    String path = getDataTestDir(relPath).toString();
792    System.setProperty(property, path);
793    conf.set(property, path);
794    new File(path).mkdirs();
795    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
796    return path;
797  }
798
799  /**
800   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
801   */
802  public void shutdownMiniDFSCluster() throws IOException {
803    if (this.dfsCluster != null) {
804      // The below throws an exception per dn, AsynchronousCloseException.
805      this.dfsCluster.shutdown();
806      dfsCluster = null;
807      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
808      // have a fixer
809      if (dfsClusterFixer != null) {
810        this.dfsClusterFixer.shutdown();
811        dfsClusterFixer = null;
812      }
813      dataTestDirOnTestFS = null;
814      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
815    }
816  }
817
818  /**
819   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
820   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
821   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
822   * @see #startMiniCluster(StartTestingClusterOption option)
823   * @see #shutdownMiniDFSCluster()
824   */
825  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
826    StartTestingClusterOption option = StartTestingClusterOption.builder()
827      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
828    return startMiniCluster(option);
829  }
830
831  /**
832   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
833   * value can be found in {@link StartTestingClusterOption.Builder}.
834   * @see #startMiniCluster(StartTestingClusterOption option)
835   * @see #shutdownMiniDFSCluster()
836   */
837  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
838    return startMiniCluster(StartTestingClusterOption.builder().build());
839  }
840
841  /**
842   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
843   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
844   * under System property test.build.data, to be cleaned up on exit.
845   * @see #shutdownMiniDFSCluster()
846   */
847  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
848    throws Exception {
849    LOG.info("Starting up minicluster with option: {}", option);
850
851    // If we already put up a cluster, fail.
852    if (miniClusterRunning) {
853      throw new IllegalStateException("A mini-cluster is already running");
854    }
855    miniClusterRunning = true;
856
857    setupClusterTestDir();
858
859    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
860    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
861    if (dfsCluster == null) {
862      LOG.info("STARTING DFS");
863      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
864    } else {
865      LOG.info("NOT STARTING DFS");
866    }
867
868    // Start up a zk cluster.
869    if (getZkCluster() == null) {
870      startMiniZKCluster(option.getNumZkServers());
871    }
872
873    // Start the MiniHBaseCluster
874    return startMiniHBaseCluster(option);
875  }
876
877  /**
878   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
879   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
880   * @return Reference to the hbase mini hbase cluster.
881   * @see #startMiniCluster(StartTestingClusterOption)
882   * @see #shutdownMiniHBaseCluster()
883   */
884  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
885    throws IOException, InterruptedException {
886    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
887    createRootDir(option.isCreateRootDir());
888    if (option.isCreateWALDir()) {
889      createWALRootDir();
890    }
891    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
892    // for tests that do not read hbase-defaults.xml
893    setHBaseFsTmpDir();
894
895    // These settings will make the server waits until this exact number of
896    // regions servers are connected.
897    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
898      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
899    }
900    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
901      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
902    }
903
904    Configuration c = new Configuration(this.conf);
905    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
906      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
907      option.getMasterClass(), option.getRsClass());
908    // Populate the master address configuration from mini cluster configuration.
909    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
910    // Don't leave here till we've done a successful scan of the hbase:meta
911    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
912      ResultScanner s = t.getScanner(new Scan())) {
913      for (;;) {
914        if (s.next() == null) {
915          break;
916        }
917      }
918    }
919
920    getAdmin(); // create immediately the hbaseAdmin
921    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
922
923    return (SingleProcessHBaseCluster) hbaseCluster;
924  }
925
926  /**
927   * Starts up mini hbase cluster using default options. Default options can be found in
928   * {@link StartTestingClusterOption.Builder}.
929   * @see #startMiniHBaseCluster(StartTestingClusterOption)
930   * @see #shutdownMiniHBaseCluster()
931   */
932  public SingleProcessHBaseCluster startMiniHBaseCluster()
933    throws IOException, InterruptedException {
934    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
935  }
936
937  /**
938   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
939   * {@link #startMiniCluster()}. All other options will use default values, defined in
940   * {@link StartTestingClusterOption.Builder}.
941   * @param numMasters       Master node number.
942   * @param numRegionServers Number of region servers.
943   * @return The mini HBase cluster created.
944   * @see #shutdownMiniHBaseCluster()
945   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
946   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
947   * @see #startMiniHBaseCluster(StartTestingClusterOption)
948   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
949   */
950  @Deprecated
951  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
952    throws IOException, InterruptedException {
953    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
954      .numRegionServers(numRegionServers).build();
955    return startMiniHBaseCluster(option);
956  }
957
958  /**
959   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
960   * {@link #startMiniCluster()}. All other options will use default values, defined in
961   * {@link StartTestingClusterOption.Builder}.
962   * @param numMasters       Master node number.
963   * @param numRegionServers Number of region servers.
964   * @param rsPorts          Ports that RegionServer should use.
965   * @return The mini HBase cluster created.
966   * @see #shutdownMiniHBaseCluster()
967   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
968   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
969   * @see #startMiniHBaseCluster(StartTestingClusterOption)
970   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
971   */
972  @Deprecated
973  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
974    List<Integer> rsPorts) throws IOException, InterruptedException {
975    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
976      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
977    return startMiniHBaseCluster(option);
978  }
979
980  /**
981   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
982   * {@link #startMiniCluster()}. All other options will use default values, defined in
983   * {@link StartTestingClusterOption.Builder}.
984   * @param numMasters       Master node number.
985   * @param numRegionServers Number of region servers.
986   * @param rsPorts          Ports that RegionServer should use.
987   * @param masterClass      The class to use as HMaster, or null for default.
988   * @param rsClass          The class to use as HRegionServer, or null for default.
989   * @param createRootDir    Whether to create a new root or data directory path.
990   * @param createWALDir     Whether to create a new WAL directory.
991   * @return The mini HBase cluster created.
992   * @see #shutdownMiniHBaseCluster()
993   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
994   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
995   * @see #startMiniHBaseCluster(StartTestingClusterOption)
996   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
997   */
998  @Deprecated
999  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1000    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1001    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1002    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1003    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
1004      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1005      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1006    return startMiniHBaseCluster(option);
1007  }
1008
1009  /**
1010   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1011   * want to keep dfs/zk up and just stop/start hbase.
1012   * @param servers number of region servers
1013   */
1014  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1015    this.restartHBaseCluster(servers, null);
1016  }
1017
1018  public void restartHBaseCluster(int servers, List<Integer> ports)
1019    throws IOException, InterruptedException {
1020    StartTestingClusterOption option =
1021      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1022    restartHBaseCluster(option);
1023    invalidateConnection();
1024  }
1025
1026  public void restartHBaseCluster(StartTestingClusterOption option)
1027    throws IOException, InterruptedException {
1028    closeConnection();
1029    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
1030      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1031      option.getMasterClass(), option.getRsClass());
1032    // Don't leave here till we've done a successful scan of the hbase:meta
1033    Connection conn = ConnectionFactory.createConnection(this.conf);
1034    Table t = conn.getTable(TableName.META_TABLE_NAME);
1035    ResultScanner s = t.getScanner(new Scan());
1036    while (s.next() != null) {
1037      // do nothing
1038    }
1039    LOG.info("HBase has been restarted");
1040    s.close();
1041    t.close();
1042    conn.close();
1043  }
1044
1045  /**
1046   * Returns current mini hbase cluster. Only has something in it after a call to
1047   * {@link #startMiniCluster()}.
1048   * @see #startMiniCluster()
1049   */
1050  public SingleProcessHBaseCluster getMiniHBaseCluster() {
1051    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
1052      return (SingleProcessHBaseCluster) this.hbaseCluster;
1053    }
1054    throw new RuntimeException(
1055      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
1056  }
1057
1058  /**
1059   * Stops mini hbase, zk, and hdfs clusters.
1060   * @see #startMiniCluster(int)
1061   */
1062  public void shutdownMiniCluster() throws IOException {
1063    LOG.info("Shutting down minicluster");
1064    shutdownMiniHBaseCluster();
1065    shutdownMiniDFSCluster();
1066    shutdownMiniZKCluster();
1067
1068    cleanupTestDir();
1069    miniClusterRunning = false;
1070    LOG.info("Minicluster is down");
1071  }
1072
1073  /**
1074   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1075   * @throws java.io.IOException in case command is unsuccessful
1076   */
1077  public void shutdownMiniHBaseCluster() throws IOException {
1078    cleanup();
1079    if (this.hbaseCluster != null) {
1080      this.hbaseCluster.shutdown();
1081      // Wait till hbase is down before going on to shutdown zk.
1082      this.hbaseCluster.waitUntilShutDown();
1083      this.hbaseCluster = null;
1084    }
1085    if (zooKeeperWatcher != null) {
1086      zooKeeperWatcher.close();
1087      zooKeeperWatcher = null;
1088    }
1089  }
1090
1091  /**
1092   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1093   * @throws java.io.IOException throws in case command is unsuccessful
1094   */
1095  public void killMiniHBaseCluster() throws IOException {
1096    cleanup();
1097    if (this.hbaseCluster != null) {
1098      getMiniHBaseCluster().killAll();
1099      this.hbaseCluster = null;
1100    }
1101    if (zooKeeperWatcher != null) {
1102      zooKeeperWatcher.close();
1103      zooKeeperWatcher = null;
1104    }
1105  }
1106
  /**
   * Close hbase admin and the current connection, and reset the MIN/MAX regionserver-count
   * configs so a subsequent cluster start does not inherit them.
   */
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }
1114
1115  /**
1116   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1117   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1118   * If false, previous path is used. Note: this does not cause the root dir to be created.
1119   * @return Fully qualified path for the default hbase root dir
1120   */
1121  public Path getDefaultRootDirPath(boolean create) throws IOException {
1122    if (!create) {
1123      return getDataTestDirOnTestFS();
1124    } else {
1125      return getNewDataTestDirOnTestFS();
1126    }
1127  }
1128
1129  /**
1130   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1131   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1132   * @return Fully qualified path for the default hbase root dir
1133   */
1134  public Path getDefaultRootDirPath() throws IOException {
1135    return getDefaultRootDirPath(false);
1136  }
1137
1138  /**
1139   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1140   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1141   * startup. You'd only use this method if you were doing manual operation.
1142   * @param create This flag decides whether to get a new root or data directory path or not, if it
1143   *               has been fetched already. Note : Directory will be made irrespective of whether
1144   *               path has been fetched or not. If directory already exists, it will be overwritten
1145   * @return Fully qualified path to hbase root dir
1146   */
1147  public Path createRootDir(boolean create) throws IOException {
1148    FileSystem fs = FileSystem.get(this.conf);
1149    Path hbaseRootdir = getDefaultRootDirPath(create);
1150    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1151    fs.mkdirs(hbaseRootdir);
1152    FSUtils.setVersion(fs, hbaseRootdir);
1153    return hbaseRootdir;
1154  }
1155
1156  /**
1157   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1158   * flag is false.
1159   * @return Fully qualified path to hbase root dir
1160   */
1161  public Path createRootDir() throws IOException {
1162    return createRootDir(false);
1163  }
1164
1165  /**
1166   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1167   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1168   * this method if you were doing manual operation.
1169   * @return Fully qualified path to hbase root dir
1170   */
1171  public Path createWALRootDir() throws IOException {
1172    FileSystem fs = FileSystem.get(this.conf);
1173    Path walDir = getNewDataTestDirOnTestFS();
1174    CommonFSUtils.setWALRootDir(this.conf, walDir);
1175    fs.mkdirs(walDir);
1176    return walDir;
1177  }
1178
1179  private void setHBaseFsTmpDir() throws IOException {
1180    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1181    if (hbaseFsTmpDirInString == null) {
1182      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1183      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1184    } else {
1185      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1186    }
1187  }
1188
1189  /**
1190   * Flushes all caches in the mini hbase cluster
1191   */
1192  public void flush() throws IOException {
1193    getMiniHBaseCluster().flushcache();
1194  }
1195
1196  /**
1197   * Flushes all caches in the mini hbase cluster
1198   */
1199  public void flush(TableName tableName) throws IOException {
1200    getMiniHBaseCluster().flushcache(tableName);
1201  }
1202
1203  /**
1204   * Compact all regions in the mini hbase cluster
1205   */
1206  public void compact(boolean major) throws IOException {
1207    getMiniHBaseCluster().compact(major);
1208  }
1209
1210  /**
1211   * Compact all of a table's reagion in the mini hbase cluster
1212   */
1213  public void compact(TableName tableName, boolean major) throws IOException {
1214    getMiniHBaseCluster().compact(tableName, major);
1215  }
1216
1217  /**
1218   * Create a table.
1219   * @return A Table instance for the created table.
1220   */
1221  public Table createTable(TableName tableName, String family) throws IOException {
1222    return createTable(tableName, new String[] { family });
1223  }
1224
1225  /**
1226   * Create a table.
1227   * @return A Table instance for the created table.
1228   */
1229  public Table createTable(TableName tableName, String[] families) throws IOException {
1230    List<byte[]> fams = new ArrayList<>(families.length);
1231    for (String family : families) {
1232      fams.add(Bytes.toBytes(family));
1233    }
1234    return createTable(tableName, fams.toArray(new byte[0][]));
1235  }
1236
1237  /**
1238   * Create a table.
1239   * @return A Table instance for the created table.
1240   */
1241  public Table createTable(TableName tableName, byte[] family) throws IOException {
1242    return createTable(tableName, new byte[][] { family });
1243  }
1244
1245  /**
1246   * Create a table with multiple regions.
1247   * @return A Table instance for the created table.
1248   */
1249  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1250    throws IOException {
1251    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1252    byte[] startKey = Bytes.toBytes("aaaaa");
1253    byte[] endKey = Bytes.toBytes("zzzzz");
1254    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1255
1256    return createTable(tableName, new byte[][] { family }, splitKeys);
1257  }
1258
1259  /**
1260   * Create a table.
1261   * @return A Table instance for the created table.
1262   */
1263  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1264    return createTable(tableName, families, (byte[][]) null);
1265  }
1266
1267  /**
1268   * Create a table with multiple regions.
1269   * @return A Table instance for the created table.
1270   */
1271  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1272    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1273  }
1274
1275  /**
1276   * Create a table with multiple regions.
1277   * @param replicaCount replica count.
1278   * @return A Table instance for the created table.
1279   */
1280  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1281    throws IOException {
1282    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1283  }
1284
1285  /**
1286   * Create a table.
1287   * @return A Table instance for the created table.
1288   */
1289  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1290    throws IOException {
1291    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1292  }
1293
1294  /**
1295   * Create a table.
1296   * @param tableName    the table name
1297   * @param families     the families
1298   * @param splitKeys    the splitkeys
1299   * @param replicaCount the region replica count
1300   * @return A Table instance for the created table.
1301   * @throws IOException throws IOException
1302   */
1303  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1304    int replicaCount) throws IOException {
1305    return createTable(tableName, families, splitKeys, replicaCount,
1306      new Configuration(getConfiguration()));
1307  }
1308
  /**
   * Create a table keeping {@code numVersions} versions, pre-split into {@code numRegions} regions
   * between {@code startKey} and {@code endKey}, and wait for all regions to be assigned.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
    byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }
1319
1320  /**
1321   * Create a table.
1322   * @param c Configuration to use
1323   * @return A Table instance for the created table.
1324   */
1325  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1326    throws IOException {
1327    return createTable(htd, families, null, c);
1328  }
1329
1330  /**
1331   * Create a table.
1332   * @param htd       table descriptor
1333   * @param families  array of column families
1334   * @param splitKeys array of split keys
1335   * @param c         Configuration to use
1336   * @return A Table instance for the created table.
1337   * @throws IOException if getAdmin or createTable fails
1338   */
1339  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1340    Configuration c) throws IOException {
1341    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1342    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1343    // on is interfering.
1344    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1345  }
1346
1347  /**
1348   * Create a table.
1349   * @param htd       table descriptor
1350   * @param families  array of column families
1351   * @param splitKeys array of split keys
1352   * @param type      Bloom type
1353   * @param blockSize block size
1354   * @param c         Configuration to use
1355   * @return A Table instance for the created table.
1356   * @throws IOException if getAdmin or createTable fails
1357   */
1358
1359  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1360    BloomType type, int blockSize, Configuration c) throws IOException {
1361    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1362    for (byte[] family : families) {
1363      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1364        .setBloomFilterType(type).setBlocksize(blockSize);
1365      if (isNewVersionBehaviorEnabled()) {
1366        cfdb.setNewVersionBehavior(true);
1367      }
1368      builder.setColumnFamily(cfdb.build());
1369    }
1370    TableDescriptor td = builder.build();
1371    if (splitKeys != null) {
1372      getAdmin().createTable(td, splitKeys);
1373    } else {
1374      getAdmin().createTable(td);
1375    }
1376    // HBaseAdmin only waits for regions to appear in hbase:meta
1377    // we should wait until they are assigned
1378    waitUntilAllRegionsAssigned(td.getTableName());
1379    return getConnection().getTable(td.getTableName());
1380  }
1381
1382  /**
1383   * Create a table.
1384   * @param htd       table descriptor
1385   * @param splitRows array of split keys
1386   * @return A Table instance for the created table.
1387   */
1388  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1389    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1390    if (isNewVersionBehaviorEnabled()) {
1391      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1392        builder.setColumnFamily(
1393          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1394      }
1395    }
1396    if (splitRows != null) {
1397      getAdmin().createTable(builder.build(), splitRows);
1398    } else {
1399      getAdmin().createTable(builder.build());
1400    }
1401    // HBaseAdmin only waits for regions to appear in hbase:meta
1402    // we should wait until they are assigned
1403    waitUntilAllRegionsAssigned(htd.getTableName());
1404    return getConnection().getTable(htd.getTableName());
1405  }
1406
1407  /**
1408   * Create a table.
1409   * @param tableName    the table name
1410   * @param families     the families
1411   * @param splitKeys    the split keys
1412   * @param replicaCount the replica count
1413   * @param c            Configuration to use
1414   * @return A Table instance for the created table.
1415   */
1416  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1417    int replicaCount, final Configuration c) throws IOException {
1418    TableDescriptor htd =
1419      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1420    return createTable(htd, families, splitKeys, c);
1421  }
1422
1423  /**
1424   * Create a table.
1425   * @return A Table instance for the created table.
1426   */
1427  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1428    return createTable(tableName, new byte[][] { family }, numVersions);
1429  }
1430
1431  /**
1432   * Create a table.
1433   * @return A Table instance for the created table.
1434   */
1435  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1436    throws IOException {
1437    return createTable(tableName, families, numVersions, (byte[][]) null);
1438  }
1439
1440  /**
1441   * Create a table.
1442   * @return A Table instance for the created table.
1443   */
1444  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1445    byte[][] splitKeys) throws IOException {
1446    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1447    for (byte[] family : families) {
1448      ColumnFamilyDescriptorBuilder cfBuilder =
1449        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1450      if (isNewVersionBehaviorEnabled()) {
1451        cfBuilder.setNewVersionBehavior(true);
1452      }
1453      builder.setColumnFamily(cfBuilder.build());
1454    }
1455    if (splitKeys != null) {
1456      getAdmin().createTable(builder.build(), splitKeys);
1457    } else {
1458      getAdmin().createTable(builder.build());
1459    }
1460    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1461    // assigned
1462    waitUntilAllRegionsAssigned(tableName);
1463    return getConnection().getTable(tableName);
1464  }
1465
1466  /**
1467   * Create a table with multiple regions.
1468   * @return A Table instance for the created table.
1469   */
1470  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1471    throws IOException {
1472    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1473  }
1474
1475  /**
1476   * Create a table.
1477   * @return A Table instance for the created table.
1478   */
1479  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1480    throws IOException {
1481    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1482    for (byte[] family : families) {
1483      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1484        .setMaxVersions(numVersions).setBlocksize(blockSize);
1485      if (isNewVersionBehaviorEnabled()) {
1486        cfBuilder.setNewVersionBehavior(true);
1487      }
1488      builder.setColumnFamily(cfBuilder.build());
1489    }
1490    getAdmin().createTable(builder.build());
1491    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1492    // assigned
1493    waitUntilAllRegionsAssigned(tableName);
1494    return getConnection().getTable(tableName);
1495  }
1496
1497  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1498    String cpName) throws IOException {
1499    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1500    for (byte[] family : families) {
1501      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1502        .setMaxVersions(numVersions).setBlocksize(blockSize);
1503      if (isNewVersionBehaviorEnabled()) {
1504        cfBuilder.setNewVersionBehavior(true);
1505      }
1506      builder.setColumnFamily(cfBuilder.build());
1507    }
1508    if (cpName != null) {
1509      builder.setCoprocessor(cpName);
1510    }
1511    getAdmin().createTable(builder.build());
1512    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1513    // assigned
1514    waitUntilAllRegionsAssigned(tableName);
1515    return getConnection().getTable(tableName);
1516  }
1517
1518  /**
1519   * Create a table.
1520   * @return A Table instance for the created table.
1521   */
1522  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1523    throws IOException {
1524    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1525    int i = 0;
1526    for (byte[] family : families) {
1527      ColumnFamilyDescriptorBuilder cfBuilder =
1528        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1529      if (isNewVersionBehaviorEnabled()) {
1530        cfBuilder.setNewVersionBehavior(true);
1531      }
1532      builder.setColumnFamily(cfBuilder.build());
1533      i++;
1534    }
1535    getAdmin().createTable(builder.build());
1536    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1537    // assigned
1538    waitUntilAllRegionsAssigned(tableName);
1539    return getConnection().getTable(tableName);
1540  }
1541
1542  /**
1543   * Create a table.
1544   * @return A Table instance for the created table.
1545   */
1546  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1547    throws IOException {
1548    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1549    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1550    if (isNewVersionBehaviorEnabled()) {
1551      cfBuilder.setNewVersionBehavior(true);
1552    }
1553    builder.setColumnFamily(cfBuilder.build());
1554    getAdmin().createTable(builder.build(), splitRows);
1555    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1556    // assigned
1557    waitUntilAllRegionsAssigned(tableName);
1558    return getConnection().getTable(tableName);
1559  }
1560
1561  /**
1562   * Create a table with multiple regions.
1563   * @return A Table instance for the created table.
1564   */
1565  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1566    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1567  }
1568
1569  /**
1570   * Set the number of Region replicas.
1571   */
1572  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1573    throws IOException, InterruptedException {
1574    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1575      .setRegionReplication(replicaCount).build();
1576    admin.modifyTable(desc);
1577  }
1578
1579  /**
1580   * Set the number of Region replicas.
1581   */
1582  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1583    throws ExecutionException, IOException, InterruptedException {
1584    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1585      .setRegionReplication(replicaCount).build();
1586    admin.modifyTable(desc).get();
1587  }
1588
1589  /**
1590   * Drop an existing table
1591   * @param tableName existing table
1592   */
1593  public void deleteTable(TableName tableName) throws IOException {
1594    try {
1595      getAdmin().disableTable(tableName);
1596    } catch (TableNotEnabledException e) {
1597      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1598    }
1599    getAdmin().deleteTable(tableName);
1600  }
1601
1602  /**
1603   * Drop an existing table
1604   * @param tableName existing table
1605   */
1606  public void deleteTableIfAny(TableName tableName) throws IOException {
1607    try {
1608      deleteTable(tableName);
1609    } catch (TableNotFoundException e) {
1610      // ignore
1611    }
1612  }
1613
1614  // ==========================================================================
1615  // Canned table and table descriptor creation
1616
1617  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1618  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1619  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1620  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1621  private static final int MAXVERSIONS = 3;
1622
1623  public static final char FIRST_CHAR = 'a';
1624  public static final char LAST_CHAR = 'z';
1625  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1626  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1627
1628  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1629    return createModifyableTableDescriptor(TableName.valueOf(name),
1630      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1631      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1632  }
1633
1634  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1635    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1636    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1637    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1638      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1639        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1640        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1641      if (isNewVersionBehaviorEnabled()) {
1642        cfBuilder.setNewVersionBehavior(true);
1643      }
1644      builder.setColumnFamily(cfBuilder.build());
1645    }
1646    return builder.build();
1647  }
1648
1649  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1650    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1651    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1652    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1653      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1654        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1655        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1656      if (isNewVersionBehaviorEnabled()) {
1657        cfBuilder.setNewVersionBehavior(true);
1658      }
1659      builder.setColumnFamily(cfBuilder.build());
1660    }
1661    return builder;
1662  }
1663
1664  /**
1665   * Create a table of name <code>name</code>.
1666   * @param name Name to give table.
1667   * @return Column descriptor.
1668   */
1669  public TableDescriptor createTableDescriptor(final TableName name) {
1670    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1671      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1672  }
1673
1674  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1675    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1676  }
1677
1678  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1679    int maxVersions) {
1680    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1681    for (byte[] family : families) {
1682      ColumnFamilyDescriptorBuilder cfBuilder =
1683        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1684      if (isNewVersionBehaviorEnabled()) {
1685        cfBuilder.setNewVersionBehavior(true);
1686      }
1687      builder.setColumnFamily(cfBuilder.build());
1688    }
1689    return builder.build();
1690  }
1691
1692  /**
1693   * Create an HRegion that writes to the local tmp dirs
1694   * @param desc     a table descriptor indicating which table the region belongs to
1695   * @param startKey the start boundary of the region
1696   * @param endKey   the end boundary of the region
1697   * @return a region that writes to local dir for testing
1698   */
1699  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1700    throws IOException {
1701    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1702      .setEndKey(endKey).build();
1703    return createLocalHRegion(hri, desc);
1704  }
1705
1706  /**
1707   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1708   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1709   */
1710  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1711    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1712  }
1713
1714  /**
1715   * Create an HRegion that writes to the local tmp dirs with specified wal
1716   * @param info regioninfo
1717   * @param conf configuration
1718   * @param desc table descriptor
1719   * @param wal  wal for this region.
1720   * @return created hregion
1721   */
1722  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1723    WAL wal) throws IOException {
1724    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1725      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1726    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1727  }
1728
1729  /**
1730   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1731   *         when done.
1732   */
1733  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1734    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1735    throws IOException {
1736    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1737      durability, wal, null, families);
1738  }
1739
  /**
   * Create a local HRegion whose families may opt in to BASIC in-memory compaction.
   * @param compactedMemStore when non-null, families whose index falls inside the array get
   *                          {@code MemoryCompactionPolicy.BASIC}; all others get {@code NONE}
   * @return created hregion; close with {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
   */
  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
    boolean[] compactedMemStore, byte[]... families) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setReadOnly(isReadOnly);
    int i = 0;
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      // NOTE(review): only the PRESENCE of an entry at index i is checked here — the boolean
      // value stored in compactedMemStore[i] is never read. Confirm this is intended.
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);

      }
      i++;
      // Keep effectively unlimited versions (Integer.MAX_VALUE).
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
      builder.setColumnFamily(cfBuilder.build());
    }
    builder.setDurability(durability);
    RegionInfo info =
      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
    return createLocalHRegion(info, conf, builder.build(), wal);
  }
1764
1765  //
1766  // ==========================================================================
1767
1768  /**
1769   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1770   * read.
1771   * @param tableName existing table
1772   * @return HTable to that new table
1773   */
1774  public Table deleteTableData(TableName tableName) throws IOException {
1775    Table table = getConnection().getTable(tableName);
1776    Scan scan = new Scan();
1777    ResultScanner resScan = table.getScanner(scan);
1778    for (Result res : resScan) {
1779      Delete del = new Delete(res.getRow());
1780      table.delete(del);
1781    }
1782    resScan = table.getScanner(scan);
1783    resScan.close();
1784    return table;
1785  }
1786
1787  /**
1788   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1789   * table.
1790   * @param tableName       table which must exist.
1791   * @param preserveRegions keep the existing split points
1792   * @return HTable for the new table
1793   */
1794  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1795    throws IOException {
1796    Admin admin = getAdmin();
1797    if (!admin.isTableDisabled(tableName)) {
1798      admin.disableTable(tableName);
1799    }
1800    admin.truncateTable(tableName, preserveRegions);
1801    return getConnection().getTable(tableName);
1802  }
1803
1804  /**
1805   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1806   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1807   * preserve regions of existing table.
1808   * @param tableName table which must exist.
1809   * @return HTable for the new table
1810   */
1811  public Table truncateTable(final TableName tableName) throws IOException {
1812    return truncateTable(tableName, false);
1813  }
1814
1815  /**
1816   * Load table with rows from 'aaa' to 'zzz'.
1817   * @param t Table
1818   * @param f Family
1819   * @return Count of rows loaded.
1820   */
1821  public int loadTable(final Table t, final byte[] f) throws IOException {
1822    return loadTable(t, new byte[][] { f });
1823  }
1824
1825  /**
1826   * Load table with rows from 'aaa' to 'zzz'.
1827   * @param t Table
1828   * @param f Family
1829   * @return Count of rows loaded.
1830   */
1831  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1832    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1833  }
1834
1835  /**
1836   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1837   * @param t Table
1838   * @param f Array of Families to load
1839   * @return Count of rows loaded.
1840   */
1841  public int loadTable(final Table t, final byte[][] f) throws IOException {
1842    return loadTable(t, f, null);
1843  }
1844
1845  /**
1846   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1847   * @param t     Table
1848   * @param f     Array of Families to load
1849   * @param value the values of the cells. If null is passed, the row key is used as value
1850   * @return Count of rows loaded.
1851   */
1852  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1853    return loadTable(t, f, value, true);
1854  }
1855
1856  /**
1857   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1858   * @param t     Table
1859   * @param f     Array of Families to load
1860   * @param value the values of the cells. If null is passed, the row key is used as value
1861   * @return Count of rows loaded.
1862   */
1863  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1864    throws IOException {
1865    List<Put> puts = new ArrayList<>();
1866    for (byte[] row : HBaseTestingUtil.ROWS) {
1867      Put put = new Put(row);
1868      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1869      for (int i = 0; i < f.length; i++) {
1870        byte[] value1 = value != null ? value : row;
1871        put.addColumn(f[i], f[i], value1);
1872      }
1873      puts.add(put);
1874    }
1875    t.put(puts);
1876    return puts.size();
1877  }
1878
1879  /**
1880   * A tracker for tracking and validating table rows generated with
1881   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1882   */
1883  public static class SeenRowTracker {
1884    int dim = 'z' - 'a' + 1;
1885    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1886    byte[] startRow;
1887    byte[] stopRow;
1888
1889    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1890      this.startRow = startRow;
1891      this.stopRow = stopRow;
1892    }
1893
1894    void reset() {
1895      for (byte[] row : ROWS) {
1896        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1897      }
1898    }
1899
1900    int i(byte b) {
1901      return b - 'a';
1902    }
1903
1904    public void addRow(byte[] row) {
1905      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1906    }
1907
1908    /**
1909     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
1910     * rows none
1911     */
1912    public void validate() {
1913      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1914        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1915          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1916            int count = seenRows[i(b1)][i(b2)][i(b3)];
1917            int expectedCount = 0;
1918            if (
1919              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1920                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1921            ) {
1922              expectedCount = 1;
1923            }
1924            if (count != expectedCount) {
1925              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1926              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1927                + "instead of " + expectedCount);
1928            }
1929          }
1930        }
1931      }
1932    }
1933  }
1934
1935  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1936    return loadRegion(r, f, false);
1937  }
1938
1939  public int loadRegion(final Region r, final byte[] f) throws IOException {
1940    return loadRegion((HRegion) r, f);
1941  }
1942
1943  /**
1944   * Load region with rows from 'aaa' to 'zzz'.
1945   * @param r     Region
1946   * @param f     Family
1947   * @param flush flush the cache if true
1948   * @return Count of rows loaded.
1949   */
1950  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1951    byte[] k = new byte[3];
1952    int rowCount = 0;
1953    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1954      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1955        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1956          k[0] = b1;
1957          k[1] = b2;
1958          k[2] = b3;
1959          Put put = new Put(k);
1960          put.setDurability(Durability.SKIP_WAL);
1961          put.addColumn(f, null, k);
1962          if (r.getWAL() == null) {
1963            put.setDurability(Durability.SKIP_WAL);
1964          }
1965          int preRowCount = rowCount;
1966          int pause = 10;
1967          int maxPause = 1000;
1968          while (rowCount == preRowCount) {
1969            try {
1970              r.put(put);
1971              rowCount++;
1972            } catch (RegionTooBusyException e) {
1973              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1974              Threads.sleep(pause);
1975            }
1976          }
1977        }
1978      }
1979      if (flush) {
1980        r.flush(true);
1981      }
1982    }
1983    return rowCount;
1984  }
1985
1986  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1987    throws IOException {
1988    for (int i = startRow; i < endRow; i++) {
1989      byte[] data = Bytes.toBytes(String.valueOf(i));
1990      Put put = new Put(data);
1991      put.addColumn(f, null, data);
1992      t.put(put);
1993    }
1994  }
1995
1996  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1997    throws IOException {
1998    for (int i = 0; i < totalRows; i++) {
1999      byte[] row = new byte[rowSize];
2000      Bytes.random(row);
2001      Put put = new Put(row);
2002      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2003      t.put(put);
2004    }
2005  }
2006
2007  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2008    int replicaId) throws IOException {
2009    for (int i = startRow; i < endRow; i++) {
2010      String failMsg = "Failed verification of row :" + i;
2011      byte[] data = Bytes.toBytes(String.valueOf(i));
2012      Get get = new Get(data);
2013      get.setReplicaId(replicaId);
2014      get.setConsistency(Consistency.TIMELINE);
2015      Result result = table.get(get);
2016      assertTrue(failMsg, result.containsColumn(f, null));
2017      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2018      Cell cell = result.getColumnLatestCell(f, null);
2019      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2020        cell.getValueOffset(), cell.getValueLength()));
2021    }
2022  }
2023
2024  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2025    throws IOException {
2026    verifyNumericRows((HRegion) region, f, startRow, endRow);
2027  }
2028
2029  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2030    throws IOException {
2031    verifyNumericRows(region, f, startRow, endRow, true);
2032  }
2033
2034  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2035    final boolean present) throws IOException {
2036    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2037  }
2038
2039  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2040    final boolean present) throws IOException {
2041    for (int i = startRow; i < endRow; i++) {
2042      String failMsg = "Failed verification of row :" + i;
2043      byte[] data = Bytes.toBytes(String.valueOf(i));
2044      Result result = region.get(new Get(data));
2045
2046      boolean hasResult = result != null && !result.isEmpty();
2047      assertEquals(failMsg + result, present, hasResult);
2048      if (!present) continue;
2049
2050      assertTrue(failMsg, result.containsColumn(f, null));
2051      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2052      Cell cell = result.getColumnLatestCell(f, null);
2053      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2054        cell.getValueOffset(), cell.getValueLength()));
2055    }
2056  }
2057
2058  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2059    throws IOException {
2060    for (int i = startRow; i < endRow; i++) {
2061      byte[] data = Bytes.toBytes(String.valueOf(i));
2062      Delete delete = new Delete(data);
2063      delete.addFamily(f);
2064      t.delete(delete);
2065    }
2066  }
2067
2068  /**
2069   * Return the number of rows in the given table.
2070   * @param table to count rows
2071   * @return count of rows
2072   */
2073  public static int countRows(final Table table) throws IOException {
2074    return countRows(table, new Scan());
2075  }
2076
2077  public static int countRows(final Table table, final Scan scan) throws IOException {
2078    try (ResultScanner results = table.getScanner(scan)) {
2079      int count = 0;
2080      while (results.next() != null) {
2081        count++;
2082      }
2083      return count;
2084    }
2085  }
2086
2087  public static int countRows(final Table table, final byte[]... families) throws IOException {
2088    Scan scan = new Scan();
2089    for (byte[] family : families) {
2090      scan.addFamily(family);
2091    }
2092    return countRows(table, scan);
2093  }
2094
2095  /**
2096   * Return the number of rows in the given table.
2097   */
2098  public int countRows(final TableName tableName) throws IOException {
2099    try (Table table = getConnection().getTable(tableName)) {
2100      return countRows(table);
2101    }
2102  }
2103
2104  public static int countRows(final Region region) throws IOException {
2105    return countRows(region, new Scan());
2106  }
2107
2108  public static int countRows(final Region region, final Scan scan) throws IOException {
2109    try (InternalScanner scanner = region.getScanner(scan)) {
2110      return countRows(scanner);
2111    }
2112  }
2113
2114  public static int countRows(final InternalScanner scanner) throws IOException {
2115    int scannedCount = 0;
2116    List<Cell> results = new ArrayList<>();
2117    boolean hasMore = true;
2118    while (hasMore) {
2119      hasMore = scanner.next(results);
2120      scannedCount += results.size();
2121      results.clear();
2122    }
2123    return scannedCount;
2124  }
2125
2126  /**
2127   * Return an md5 digest of the entire contents of a table.
2128   */
2129  public String checksumRows(final Table table) throws Exception {
2130    MessageDigest digest = MessageDigest.getInstance("MD5");
2131    try (ResultScanner results = table.getScanner(new Scan())) {
2132      for (Result res : results) {
2133        digest.update(res.getRow());
2134      }
2135    }
2136    return digest.toString();
2137  }
2138
  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
  // Populate ROWS with every three-letter key from "aaa" through "zzz", in lexicographic order.
  static {
    int i = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          ROWS[i][0] = b1;
          ROWS[i][1] = b2;
          ROWS[i][2] = b3;
          i++;
        }
      }
    }
  }
2154
  // Region boundary keys spanning the key space; starts with the empty (first-region) key
  // and runs "bbb".."yyy".
  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2162
  // Split keys in the shape Admin.createTable(desc, splitKeys) expects: no leading empty key,
  // running "bbb".."zzz".
  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2170
2171  /**
2172   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2173   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2174   * @return list of region info for regions added to meta
2175   */
2176  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2177    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2178    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2179      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2180      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2181      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2182        TableState.State.ENABLED);
2183      // add custom ones
2184      for (int i = 0; i < startKeys.length; i++) {
2185        int j = (i + 1) % startKeys.length;
2186        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2187          .setEndKey(startKeys[j]).build();
2188        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2189        newRegions.add(hri);
2190      }
2191      return newRegions;
2192    }
2193  }
2194
2195  /**
2196   * Create an unmanaged WAL. Be sure to close it when you're through.
2197   */
2198  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2199    throws IOException {
2200    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2201    // unless I pass along via the conf.
2202    Configuration confForWAL = new Configuration(conf);
2203    CommonFSUtils.setRootDir(confForWAL, rootDir);
2204    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
2205      .getWAL(hri);
2206  }
2207
2208  /**
2209   * Create a region with it's own WAL. Be sure to call
2210   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2211   */
2212  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2213    final Configuration conf, final TableDescriptor htd) throws IOException {
2214    return createRegionAndWAL(info, rootDir, conf, htd, true);
2215  }
2216
2217  /**
2218   * Create a region with it's own WAL. Be sure to call
2219   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2220   */
2221  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2222    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2223    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2224    region.setBlockCache(blockCache);
2225    region.initialize();
2226    return region;
2227  }
2228
2229  /**
2230   * Create a region with it's own WAL. Be sure to call
2231   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2232   */
2233  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2234    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2235    throws IOException {
2236    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2237    region.setMobFileCache(mobFileCache);
2238    region.initialize();
2239    return region;
2240  }
2241
2242  /**
2243   * Create a region with it's own WAL. Be sure to call
2244   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2245   */
2246  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2247    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2248    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2249      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2250    WAL wal = createWal(conf, rootDir, info);
2251    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2252  }
2253
2254  /**
2255   * Find any other region server which is different from the one identified by parameter
2256   * @return another region server
2257   */
2258  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2259    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2260      if (!(rst.getRegionServer() == rs)) {
2261        return rst.getRegionServer();
2262      }
2263    }
2264    return null;
2265  }
2266
2267  /**
2268   * Tool to get the reference to the region server object that holds the region of the specified
2269   * user table.
2270   * @param tableName user table to lookup in hbase:meta
2271   * @return region server that holds it, null if the row doesn't exist
2272   */
2273  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2274    throws IOException, InterruptedException {
2275    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2276    if (regions == null || regions.isEmpty()) {
2277      return null;
2278    }
2279    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2280
2281    byte[] firstRegionName =
2282      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2283        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2284
2285    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2286    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2287      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2288    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2289      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2290    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2291    while (retrier.shouldRetry()) {
2292      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2293      if (index != -1) {
2294        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2295      }
2296      // Came back -1. Region may not be online yet. Sleep a while.
2297      retrier.sleepUntilNextRetry();
2298    }
2299    return null;
2300  }
2301
2302  /**
2303   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2304   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2305   * MiniMRCluster instances with different log dirs.
2306   * @throws IOException When starting the cluster fails.
2307   */
2308  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2309    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2310    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2311      "99.0");
2312    startMiniMapReduceCluster(2);
2313    return mrCluster;
2314  }
2315
2316  /**
2317   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2318   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2319   * multiple MiniMRCluster instances with different log dirs.
2320   * @param servers The number of <code>TaskTracker</code>'s to start.
2321   * @throws IOException When starting the cluster fails.
2322   */
2323  private void startMiniMapReduceCluster(final int servers) throws IOException {
2324    if (mrCluster != null) {
2325      throw new IllegalStateException("MiniMRCluster is already running");
2326    }
2327    LOG.info("Starting mini mapreduce cluster...");
2328    setupClusterTestDir();
2329    createDirsAndSetProperties();
2330
2331    //// hadoop2 specific settings
2332    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2333    // we up the VM usable so that processes don't get killed.
2334    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2335
2336    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2337    // this avoids the problem by disabling speculative task execution in tests.
2338    conf.setBoolean("mapreduce.map.speculative", false);
2339    conf.setBoolean("mapreduce.reduce.speculative", false);
2340    ////
2341
2342    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2343    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2344    if (JVM.getJVMSpecVersion() >= 17) {
2345      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2346      conf.set("yarn.app.mapreduce.am.command-opts",
2347        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2348    }
2349
2350    // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem
2351    // instance and cause trouble in HBase. See HBASE-29802 for more details.
2352    JobConf mrClusterConf = new JobConf(conf);
2353    mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true);
2354    // Allow the user to override FS URI for this map-reduce cluster to use.
2355    mrCluster =
2356      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2357        1, null, null, mrClusterConf);
2358    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2359    if (jobConf == null) {
2360      jobConf = mrCluster.createJobConf();
2361    }
2362
2363    // Hadoop MiniMR overwrites this while it should not
2364    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2365    LOG.info("Mini mapreduce cluster started");
2366
2367    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2368    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2369    // necessary config properties here. YARN-129 required adding a few properties.
2370    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2371    // this for mrv2 support; mr1 ignores this
2372    conf.set("mapreduce.framework.name", "yarn");
2373    conf.setBoolean("yarn.is.minicluster", true);
2374    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2375    if (rmAddress != null) {
2376      conf.set("yarn.resourcemanager.address", rmAddress);
2377    }
2378    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2379    if (historyAddress != null) {
2380      conf.set("mapreduce.jobhistory.address", historyAddress);
2381    }
2382    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2383    if (schedulerAddress != null) {
2384      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2385    }
2386    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2387    if (mrJobHistoryWebappAddress != null) {
2388      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2389    }
2390    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2391    if (yarnRMWebappAddress != null) {
2392      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2393    }
2394  }
2395
2396  /**
2397   * Stops the previously started <code>MiniMRCluster</code>.
2398   */
2399  public void shutdownMiniMapReduceCluster() {
2400    if (mrCluster != null) {
2401      LOG.info("Stopping mini mapreduce cluster...");
2402      mrCluster.shutdown();
2403      mrCluster = null;
2404      LOG.info("Mini mapreduce cluster stopped");
2405    }
2406    // Restore configuration to point to local jobtracker
2407    conf.set("mapreduce.jobtracker.address", "local");
2408  }
2409
2410  /**
2411   * Create a stubbed out RegionServerService, mainly for getting FS.
2412   */
2413  public RegionServerServices createMockRegionServerService() throws IOException {
2414    return createMockRegionServerService((ServerName) null);
2415  }
2416
2417  /**
2418   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2419   * TestTokenAuthentication
2420   */
2421  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2422    throws IOException {
2423    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2424    rss.setFileSystem(getTestFileSystem());
2425    rss.setRpcServer(rpc);
2426    return rss;
2427  }
2428
2429  /**
2430   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2431   * TestOpenRegionHandler
2432   */
2433  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2434    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2435    rss.setFileSystem(getTestFileSystem());
2436    return rss;
2437  }
2438
2439  /**
2440   * Expire the Master's session
2441   */
2442  public void expireMasterSession() throws Exception {
2443    HMaster master = getMiniHBaseCluster().getMaster();
2444    expireSession(master.getZooKeeper(), false);
2445  }
2446
2447  /**
2448   * Expire a region server's session
2449   * @param index which RS
2450   */
2451  public void expireRegionServerSession(int index) throws Exception {
2452    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2453    expireSession(rs.getZooKeeper(), false);
2454    decrementMinRegionServerCount();
2455  }
2456
2457  private void decrementMinRegionServerCount() {
2458    // decrement the count for this.conf, for newly spwaned master
2459    // this.hbaseCluster shares this configuration too
2460    decrementMinRegionServerCount(getConfiguration());
2461
2462    // each master thread keeps a copy of configuration
2463    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2464      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2465    }
2466  }
2467
2468  private void decrementMinRegionServerCount(Configuration conf) {
2469    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2470    if (currentCount != -1) {
2471      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2472    }
2473  }
2474
  // Convenience overload: expire the session without verifying cluster status afterwards.
  public void expireSession(ZKWatcher nodeZK) throws Exception {
    expireSession(nodeZK, false);
  }
2478
2479  /**
2480   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2481   * http://hbase.apache.org/book.html#trouble.zookeeper
2482   * <p/>
2483   * There are issues when doing this:
2484   * <ol>
2485   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2486   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2487   * </ol>
2488   * @param nodeZK      - the ZK watcher to expire
2489   * @param checkStatus - true to check if we can create a Table with the current configuration.
2490   */
2491  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2492    Configuration c = new Configuration(this.conf);
2493    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2494    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2495    byte[] password = zk.getSessionPasswd();
2496    long sessionID = zk.getSessionId();
2497
2498    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2499    // so we create a first watcher to be sure that the
2500    // event was sent. We expect that if our watcher receives the event
2501    // other watchers on the same machine will get is as well.
2502    // When we ask to close the connection, ZK does not close it before
2503    // we receive all the events, so don't have to capture the event, just
2504    // closing the connection should be enough.
2505    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2506      @Override
2507      public void process(WatchedEvent watchedEvent) {
2508        LOG.info("Monitor ZKW received event=" + watchedEvent);
2509      }
2510    }, sessionID, password);
2511
2512    // Making it expire
2513    ZooKeeper newZK =
2514      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2515
2516    // ensure that we have connection to the server before closing down, otherwise
2517    // the close session event will be eaten out before we start CONNECTING state
2518    long start = EnvironmentEdgeManager.currentTime();
2519    while (
2520      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2521    ) {
2522      Thread.sleep(1);
2523    }
2524    newZK.close();
2525    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2526
2527    // Now closing & waiting to be sure that the clients get it.
2528    monitor.close();
2529
2530    if (checkStatus) {
2531      getConnection().getTable(TableName.META_TABLE_NAME).close();
2532    }
2533  }
2534
2535  /**
2536   * Get the Mini HBase cluster.
2537   * @return hbase cluster
2538   * @see #getHBaseClusterInterface()
2539   */
2540  public SingleProcessHBaseCluster getHBaseCluster() {
2541    return getMiniHBaseCluster();
2542  }
2543
2544  /**
2545   * Returns the HBaseCluster instance.
2546   * <p>
2547   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2548   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2549   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2550   * instead w/o the need to type-cast.
2551   */
2552  public HBaseClusterInterface getHBaseClusterInterface() {
2553    // implementation note: we should rename this method as #getHBaseCluster(),
2554    // but this would require refactoring 90+ calls.
2555    return hbaseCluster;
2556  }
2557
2558  /**
2559   * Resets the connections so that the next time getConnection() is called, a new connection is
2560   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2561   * the connection is not valid anymore.
2562   * <p/>
2563   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2564   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2565   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2566   * the connection state automatically. Cleaning this is a much bigger refactor.
2567   */
2568  public void invalidateConnection() throws IOException {
2569    closeConnection();
2570    // Update the master addresses if they changed.
2571    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2572    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2573    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2574      masterConfigBefore, masterConfAfter);
2575    conf.set(HConstants.MASTER_ADDRS_KEY,
2576      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2577  }
2578
2579  /**
2580   * Get a shared Connection to the cluster. this method is thread safe.
2581   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2582   */
2583  public Connection getConnection() throws IOException {
2584    return getAsyncConnection().toConnection();
2585  }
2586
2587  /**
2588   * Get a assigned Connection to the cluster. this method is thread safe.
2589   * @param user assigned user
2590   * @return A Connection with assigned user.
2591   */
2592  public Connection getConnection(User user) throws IOException {
2593    return getAsyncConnection(user).toConnection();
2594  }
2595
2596  /**
2597   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2598   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2599   *         of cluster.
2600   */
2601  public AsyncClusterConnection getAsyncConnection() throws IOException {
2602    try {
2603      return asyncConnection.updateAndGet(connection -> {
2604        if (connection == null) {
2605          try {
2606            User user = UserProvider.instantiate(conf).getCurrent();
2607            connection = getAsyncConnection(user);
2608          } catch (IOException ioe) {
2609            throw new UncheckedIOException("Failed to create connection", ioe);
2610          }
2611        }
2612        return connection;
2613      });
2614    } catch (UncheckedIOException exception) {
2615      throw exception.getCause();
2616    }
2617  }
2618
2619  /**
2620   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2621   * @param user assigned user
2622   * @return An AsyncClusterConnection with assigned user.
2623   */
2624  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2625    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2626  }
2627
2628  public void closeConnection() throws IOException {
2629    if (hbaseAdmin != null) {
2630      Closeables.close(hbaseAdmin, true);
2631      hbaseAdmin = null;
2632    }
2633    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2634    if (asyncConnection != null) {
2635      Closeables.close(asyncConnection, true);
2636    }
2637  }
2638
2639  /**
2640   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2641   * it has no effect, it will be closed automatically when the cluster shutdowns
2642   */
2643  public Admin getAdmin() throws IOException {
2644    if (hbaseAdmin == null) {
2645      this.hbaseAdmin = getConnection().getAdmin();
2646    }
2647    return hbaseAdmin;
2648  }
2649
2650  private Admin hbaseAdmin = null;
2651
2652  /**
2653   * Returns an {@link Hbck} instance. Needs be closed when done.
2654   */
2655  public Hbck getHbck() throws IOException {
2656    return getConnection().getHbck();
2657  }
2658
2659  /**
2660   * Unassign the named region.
2661   * @param regionName The region to unassign.
2662   */
2663  public void unassignRegion(String regionName) throws IOException {
2664    unassignRegion(Bytes.toBytes(regionName));
2665  }
2666
2667  /**
2668   * Unassign the named region.
2669   * @param regionName The region to unassign.
2670   */
2671  public void unassignRegion(byte[] regionName) throws IOException {
2672    getAdmin().unassign(regionName);
2673  }
2674
2675  /**
2676   * Closes the region containing the given row.
2677   * @param row   The row to find the containing region.
2678   * @param table The table to find the region.
2679   */
2680  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2681    unassignRegionByRow(Bytes.toBytes(row), table);
2682  }
2683
2684  /**
2685   * Closes the region containing the given row.
2686   * @param row   The row to find the containing region.
2687   * @param table The table to find the region.
2688   */
2689  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2690    HRegionLocation hrl = table.getRegionLocation(row);
2691    unassignRegion(hrl.getRegion().getRegionName());
2692  }
2693
2694  /**
2695   * Retrieves a splittable region randomly from tableName
2696   * @param tableName   name of table
2697   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2698   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2699   */
2700  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2701    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2702    int regCount = regions.size();
2703    Set<Integer> attempted = new HashSet<>();
2704    int idx;
2705    int attempts = 0;
2706    do {
2707      regions = getHBaseCluster().getRegions(tableName);
2708      if (regCount != regions.size()) {
2709        // if there was region movement, clear attempted Set
2710        attempted.clear();
2711      }
2712      regCount = regions.size();
2713      // There are chances that before we get the region for the table from an RS the region may
2714      // be going for CLOSE. This may be because online schema change is enabled
2715      if (regCount > 0) {
2716        idx = ThreadLocalRandom.current().nextInt(regCount);
2717        // if we have just tried this region, there is no need to try again
2718        if (attempted.contains(idx)) {
2719          continue;
2720        }
2721        HRegion region = regions.get(idx);
2722        if (region.checkSplit().isPresent()) {
2723          return region;
2724        }
2725        attempted.add(idx);
2726      }
2727      attempts++;
2728    } while (maxAttempts == -1 || attempts < maxAttempts);
2729    return null;
2730  }
2731
  /** Returns the mini DFS cluster backing this utility, or null if none is running. */
  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }
2735
  // Convenience overload: requires any existing DFS cluster to be down before replacing it.
  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
    setDFSCluster(cluster, true);
  }
2739
2740  /**
2741   * Set the MiniDFSCluster
2742   * @param cluster     cluster to use
2743   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2744   *                    is set.
2745   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2746   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2747   */
2748  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2749    throws IllegalStateException, IOException {
2750    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2751      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2752    }
2753    this.dfsCluster = cluster;
2754    this.setFs();
2755  }
2756
  /** Returns a FileSystem for the test cluster, obtained via {@code HFileSystem.get(conf)}. */
  public FileSystem getTestFileSystem() throws IOException {
    return HFileSystem.get(conf);
  }
2760
2761  /**
2762   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
2763   * (30 seconds).
2764   * @param table Table to wait on.
2765   */
2766  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2767    waitTableAvailable(table.getName(), 30000);
2768  }
2769
  // Wait until all regions in the table are assigned, up to timeoutMillis.
  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }
2774
2775  /**
2776   * Wait until all regions in a table have been assigned
2777   * @param table         Table to wait on.
2778   * @param timeoutMillis Timeout.
2779   */
2780  public void waitTableAvailable(byte[] table, long timeoutMillis)
2781    throws InterruptedException, IOException {
2782    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2783  }
2784
2785  public String explainTableAvailability(TableName tableName) throws IOException {
2786    StringBuilder msg =
2787      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2788    if (getHBaseCluster().getMaster().isAlive()) {
2789      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2790        .getRegionStates().getRegionAssignments();
2791      final List<Pair<RegionInfo, ServerName>> metaLocations =
2792        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2793      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2794        RegionInfo hri = metaLocation.getFirst();
2795        ServerName sn = metaLocation.getSecond();
2796        if (!assignments.containsKey(hri)) {
2797          msg.append(", region ").append(hri)
2798            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2799        } else if (sn == null) {
2800          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2801        } else if (!sn.equals(assignments.get(hri))) {
2802          msg.append(",  region ").append(hri)
2803            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2804            .append(" <> ").append(assignments.get(hri));
2805        }
2806      }
2807    }
2808    return msg.toString();
2809  }
2810
2811  public String explainTableState(final TableName table, TableState.State state)
2812    throws IOException {
2813    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2814    if (tableState == null) {
2815      return "TableState in META: No table state in META for table " + table
2816        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2817    } else if (!tableState.inStates(state)) {
2818      return "TableState in META: Not " + state + " state, but " + tableState;
2819    } else {
2820      return "TableState in META: OK";
2821    }
2822  }
2823
2824  @Nullable
2825  public TableState findLastTableState(final TableName table) throws IOException {
2826    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2827    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2828      @Override
2829      public boolean visit(Result r) throws IOException {
2830        if (!Arrays.equals(r.getRow(), table.getName())) {
2831          return false;
2832        }
2833        TableState state = CatalogFamilyFormat.getTableState(r);
2834        if (state != null) {
2835          lastTableState.set(state);
2836        }
2837        return true;
2838      }
2839    };
2840    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2841      Integer.MAX_VALUE, visitor);
2842    return lastTableState.get();
2843  }
2844
2845  /**
2846   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2847   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
2848   * table.
2849   * @param table the table to wait on.
2850   * @throws InterruptedException if interrupted while waiting
2851   * @throws IOException          if an IO problem is encountered
2852   */
2853  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2854    waitTableEnabled(table, 30000);
2855  }
2856
2857  /**
2858   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2859   * have been all assigned.
2860   * @see #waitTableEnabled(TableName, long)
2861   * @param table         Table to wait on.
2862   * @param timeoutMillis Time to wait on it being marked enabled.
2863   */
2864  public void waitTableEnabled(byte[] table, long timeoutMillis)
2865    throws InterruptedException, IOException {
2866    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2867  }
2868
  // Wait until the table is marked enabled and fully assigned, up to timeoutMillis.
  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
    waitFor(timeoutMillis, predicateTableEnabled(table));
  }
2872
2873  /**
2874   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
2875   * after default period (30 seconds)
2876   * @param table Table to wait on.
2877   */
2878  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2879    waitTableDisabled(table, 30000);
2880  }
2881
  // Wait until the table is marked disabled, up to millisTimeout.
  public void waitTableDisabled(TableName table, long millisTimeout)
    throws InterruptedException, IOException {
    waitFor(millisTimeout, predicateTableDisabled(table));
  }
2886
2887  /**
2888   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
2889   * @param table         Table to wait on.
2890   * @param timeoutMillis Time to wait on it being marked disabled.
2891   */
2892  public void waitTableDisabled(byte[] table, long timeoutMillis)
2893    throws InterruptedException, IOException {
2894    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2895  }
2896
2897  /**
2898   * Make sure that at least the specified number of region servers are running
2899   * @param num minimum number of region servers that should be running
2900   * @return true if we started some servers
2901   */
2902  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2903    boolean startedServer = false;
2904    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2905    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2906      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2907      startedServer = true;
2908    }
2909
2910    return startedServer;
2911  }
2912
2913  /**
2914   * Make sure that at least the specified number of region servers are running. We don't count the
2915   * ones that are currently stopping or are stopped.
2916   * @param num minimum number of region servers that should be running
2917   * @return true if we started some servers
2918   */
2919  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2920    boolean startedServer = ensureSomeRegionServersAvailable(num);
2921
2922    int nonStoppedServers = 0;
2923    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2924
2925      HRegionServer hrs = rst.getRegionServer();
2926      if (hrs.isStopping() || hrs.isStopped()) {
2927        LOG.info("A region server is stopped or stopping:" + hrs);
2928      } else {
2929        nonStoppedServers++;
2930      }
2931    }
2932    for (int i = nonStoppedServers; i < num; ++i) {
2933      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2934      startedServer = true;
2935    }
2936    return startedServer;
2937  }
2938
2939  /**
2940   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
2941   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2942   * @param c                     Initial configuration
2943   * @param differentiatingSuffix Suffix to differentiate this user from others.
2944   * @return A new configuration instance with a different user set into it.
2945   */
2946  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2947    throws IOException {
2948    FileSystem currentfs = FileSystem.get(c);
2949    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2950      return User.getCurrent();
2951    }
2952    // Else distributed filesystem. Make a new instance per daemon. Below
2953    // code is taken from the AppendTestUtil over in hdfs.
2954    String username = User.getCurrent().getName() + differentiatingSuffix;
2955    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2956    return user;
2957  }
2958
2959  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2960    throws IOException {
2961    NavigableSet<String> online = new TreeSet<>();
2962    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2963      try {
2964        for (RegionInfo region : ProtobufUtil
2965          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2966          online.add(region.getRegionNameAsString());
2967        }
2968      } catch (RegionServerStoppedException e) {
2969        // That's fine.
2970      }
2971    }
2972    return online;
2973  }
2974
2975  /**
2976   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
2977   * linger. Here is the exception you'll see:
2978   *
2979   * <pre>
2980   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2981   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2982   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2983   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2984   * </pre>
2985   *
2986   * @param stream A DFSClient.DFSOutputStream.
2987   */
2988  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2989    try {
2990      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2991      for (Class<?> clazz : clazzes) {
2992        String className = clazz.getSimpleName();
2993        if (className.equals("DFSOutputStream")) {
2994          if (clazz.isInstance(stream)) {
2995            Field maxRecoveryErrorCountField =
2996              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2997            maxRecoveryErrorCountField.setAccessible(true);
2998            maxRecoveryErrorCountField.setInt(stream, max);
2999            break;
3000          }
3001        }
3002      }
3003    } catch (Exception e) {
3004      LOG.info("Could not set max recovery field", e);
3005    }
3006  }
3007
3008  /**
3009   * Uses directly the assignment manager to assign the region. and waits until the specified region
3010   * has completed assignment.
3011   * @return true if the region is assigned false otherwise.
3012   */
3013  public boolean assignRegion(final RegionInfo regionInfo)
3014    throws IOException, InterruptedException {
3015    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3016    am.assign(regionInfo);
3017    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3018  }
3019
3020  /**
3021   * Move region to destination server and wait till region is completely moved and online
3022   * @param destRegion region to move
3023   * @param destServer destination server of the region
3024   */
3025  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3026    throws InterruptedException, IOException {
3027    HMaster master = getMiniHBaseCluster().getMaster();
3028    // TODO: Here we start the move. The move can take a while.
3029    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3030    while (true) {
3031      ServerName serverName =
3032        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3033      if (serverName != null && serverName.equals(destServer)) {
3034        assertRegionOnServer(destRegion, serverName, 2000);
3035        break;
3036      }
3037      Thread.sleep(10);
3038    }
3039  }
3040
3041  /**
3042   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been deployed,
3044   * master has been informed and updated hbase:meta with the regions deployed server.
3045   * @param tableName the table name
3046   */
3047  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3048    waitUntilAllRegionsAssigned(tableName,
3049      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3050  }
3051
3052  /**
   * Wait until all system tables' regions get assigned
3054   */
  public void waitUntilAllSystemRegionsAssigned() throws IOException {
    // Currently only hbase:meta is waited on here.
    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
  }
3058
3059  /**
3060   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3061   * timeout. This means all regions have been deployed, master has been informed and updated
3062   * hbase:meta with the regions deployed server.
3063   * @param tableName the table name
3064   * @param timeout   timeout, in milliseconds
3065   */
  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
    // For non-meta tables, poll hbase:meta until every region row of the table has a
    // non-empty info:server pointing at a live (not recently killed) server and state OPEN.
    if (!TableName.isMetaTableName(tableName)) {
      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
          + timeout + "ms");
        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
          @Override
          public String explainFailure() throws IOException {
            return explainTableAvailability(tableName);
          }

          @Override
          public boolean evaluate() throws IOException {
            // Full scan of the catalog family; filter to rows belonging to tableName.
            Scan scan = new Scan();
            scan.addFamily(HConstants.CATALOG_FAMILY);
            boolean tableFound = false;
            try (ResultScanner s = meta.getScanner(scan)) {
              for (Result r; (r = s.next()) != null;) {
                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                RegionInfo info = RegionInfo.parseFromOrNull(b);
                if (info != null && info.getTable().equals(tableName)) {
                  // Get server hosting this region from catalog family. Return false if no server
                  // hosting this region, or if the server hosting this region was recently killed
                  // (for fault tolerance testing).
                  tableFound = true;
                  byte[] server =
                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
                  if (server == null) {
                    return false;
                  } else {
                    byte[] startCode =
                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
                    // Rebuild the ServerName (host,port,startcode) from the catalog columns.
                    ServerName serverName =
                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
                        + Bytes.toLong(startCode));
                    if (
                      !getHBaseClusterInterface().isDistributedCluster()
                        && getHBaseCluster().isKilledRS(serverName)
                    ) {
                      return false;
                    }
                  }
                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
                    return false;
                  }
                }
              }
            }
            if (!tableFound) {
              LOG.warn(
                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
            }
            return tableFound;
          }
        });
      }
    }
    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
    // check from the master state if we are using a mini cluster
    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table but make sure master knows of the assignments before
      // returning -- sometimes this can lag.
      HMaster master = getHBaseCluster().getMaster();
      final RegionStates states = master.getAssignmentManager().getRegionStates();
      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
        @Override
        public String explainFailure() throws IOException {
          return explainTableAvailability(tableName);
        }

        @Override
        public boolean evaluate() throws IOException {
          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
          return hris != null && !hris.isEmpty();
        }
      });
    }
    LOG.info("All regions for table " + tableName + " assigned.");
  }
3146
3147  /**
3148   * Do a small get/scan against one store. This is required because store has no actual methods of
3149   * querying itself, and relies on StoreScanner.
3150   */
3151  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3152    Scan scan = new Scan(get);
3153    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3154      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3155      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3156      // readpoint 0.
3157      0);
3158
3159    List<Cell> result = new ArrayList<>();
3160    scanner.next(result);
3161    if (!result.isEmpty()) {
3162      // verify that we are on the row we want:
3163      Cell kv = result.get(0);
3164      if (!CellUtil.matchingRows(kv, get.getRow())) {
3165        result.clear();
3166      }
3167    }
3168    scanner.close();
3169    return result;
3170  }
3171
3172  /**
3173   * Create region split keys between startkey and endKey
3174   * @param numRegions the number of regions to be created. it has to be greater than 3.
3175   * @return resulting split keys
3176   */
3177  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3178    assertTrue(numRegions > 3);
3179    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3180    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3181    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3182    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3183    return result;
3184  }
3185
3186  /**
3187   * Do a small get/scan against one store. This is required because store has no actual methods of
3188   * querying itself, and relies on StoreScanner.
3189   */
3190  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3191    throws IOException {
3192    Get get = new Get(row);
3193    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3194    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3195
3196    return getFromStoreFile(store, get);
3197  }
3198
3199  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3200    final List<? extends Cell> actual) {
3201    final int eLen = expected.size();
3202    final int aLen = actual.size();
3203    final int minLen = Math.min(eLen, aLen);
3204
3205    int i = 0;
3206    while (
3207      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3208    ) {
3209      i++;
3210    }
3211
3212    if (additionalMsg == null) {
3213      additionalMsg = "";
3214    }
3215    if (!additionalMsg.isEmpty()) {
3216      additionalMsg = ". " + additionalMsg;
3217    }
3218
3219    if (eLen != aLen || i != minLen) {
3220      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3221        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3222        + " (length " + aLen + ")" + additionalMsg);
3223    }
3224  }
3225
3226  public static <T> String safeGetAsStr(List<T> lst, int i) {
3227    if (0 <= i && i < lst.size()) {
3228      return lst.get(i).toString();
3229    } else {
3230      return "<out_of_range>";
3231    }
3232  }
3233
  /**
   * Returns an {@code hbase+rpc://} connection URI pointing at this cluster's master address(es).
   * (The method name keeps its historical "Connnection" spelling for compatibility.)
   */
  public String getRpcConnnectionURI() throws UnknownHostException {
    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
  }
3237
  /**
   * Returns an {@code hbase+zk://} connection URI built from the ZooKeeper quorum, client port,
   * and parent znode. No separator is inserted before the znode parent here; presumably the
   * parent value itself begins with '/' — confirm against HConstants defaults.
   */
  public String getZkConnectionURI() {
    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }
3243
3244  /**
3245   * Get the zk based cluster key for this cluster.
3246   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3247   *             connection info of a cluster. Keep here only for compatibility.
3248   * @see #getRpcConnnectionURI()
3249   * @see #getZkConnectionURI()
3250   */
3251  @Deprecated
3252  public String getClusterKey() {
3253    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3254      + ":"
3255      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3256  }
3257
3258  /**
3259   * Creates a random table with the given parameters
3260   */
3261  public Table createRandomTable(TableName tableName, final Collection<String> families,
3262    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3263    final int numRowsPerFlush) throws IOException, InterruptedException {
3264    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3265      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3266      + maxVersions + "\n");
3267
3268    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3269    final int numCF = families.size();
3270    final byte[][] cfBytes = new byte[numCF][];
3271    {
3272      int cfIndex = 0;
3273      for (String cf : families) {
3274        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3275      }
3276    }
3277
3278    final int actualStartKey = 0;
3279    final int actualEndKey = Integer.MAX_VALUE;
3280    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3281    final int splitStartKey = actualStartKey + keysPerRegion;
3282    final int splitEndKey = actualEndKey - keysPerRegion;
3283    final String keyFormat = "%08x";
3284    final Table table = createTable(tableName, cfBytes, maxVersions,
3285      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3286      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3287
3288    if (hbaseCluster != null) {
3289      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3290    }
3291
3292    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3293
3294    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3295      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3296        final byte[] row = Bytes.toBytes(
3297          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3298
3299        Put put = new Put(row);
3300        Delete del = new Delete(row);
3301        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3302          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3303          final long ts = rand.nextInt();
3304          final byte[] qual = Bytes.toBytes("col" + iCol);
3305          if (rand.nextBoolean()) {
3306            final byte[] value =
3307              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3308                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3309            put.addColumn(cf, qual, ts, value);
3310          } else if (rand.nextDouble() < 0.8) {
3311            del.addColumn(cf, qual, ts);
3312          } else {
3313            del.addColumns(cf, qual, ts);
3314          }
3315        }
3316
3317        if (!put.isEmpty()) {
3318          mutator.mutate(put);
3319        }
3320
3321        if (!del.isEmpty()) {
3322          mutator.mutate(del);
3323        }
3324      }
3325      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3326      mutator.flush();
3327      if (hbaseCluster != null) {
3328        getMiniHBaseCluster().flushcache(table.getName());
3329      }
3330    }
3331    mutator.close();
3332
3333    return table;
3334  }
3335
  /** Returns a randomly chosen free port; delegates to {@link HBaseCommonTestingUtil}. */
  public static int randomFreePort() {
    return HBaseCommonTestingUtil.randomFreePort();
  }
3339
3340  public static String randomMultiCastAddress() {
3341    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3342  }
3343
3344  public static void waitForHostPort(String host, int port) throws IOException {
3345    final int maxTimeMs = 10000;
3346    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3347    IOException savedException = null;
3348    LOG.info("Waiting for server at " + host + ":" + port);
3349    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3350      try {
3351        Socket sock = new Socket(InetAddress.getByName(host), port);
3352        sock.close();
3353        savedException = null;
3354        LOG.info("Server at " + host + ":" + port + " is available");
3355        break;
3356      } catch (UnknownHostException e) {
3357        throw new IOException("Failed to look up " + host, e);
3358      } catch (IOException e) {
3359        savedException = e;
3360      }
3361      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3362    }
3363
3364    if (savedException != null) {
3365      throw savedException;
3366    }
3367  }
3368
  /** Returns the port of the server hosting the first region of hbase:meta. */
  public static int getMetaRSPort(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
    }
  }
3374
3375  /**
3376   * Due to async racing issue, a region may not be in the online region list of a region server
3377   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3378   */
3379  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3380    final long timeout) throws IOException, InterruptedException {
3381    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3382    while (true) {
3383      List<RegionInfo> regions = getAdmin().getRegions(server);
3384      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3385      long now = EnvironmentEdgeManager.currentTime();
3386      if (now > timeoutTime) break;
3387      Thread.sleep(10);
3388    }
3389    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3390  }
3391
3392  /**
3393   * Check to make sure the region is open on the specified region server, but not on any other one.
3394   */
3395  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3396    final long timeout) throws IOException, InterruptedException {
3397    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3398    while (true) {
3399      List<RegionInfo> regions = getAdmin().getRegions(server);
3400      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3401        List<JVMClusterUtil.RegionServerThread> rsThreads =
3402          getHBaseCluster().getLiveRegionServerThreads();
3403        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3404          HRegionServer rs = rsThread.getRegionServer();
3405          if (server.equals(rs.getServerName())) {
3406            continue;
3407          }
3408          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3409          for (HRegion r : hrs) {
3410            assertTrue("Region should not be double assigned",
3411              r.getRegionInfo().getRegionId() != hri.getRegionId());
3412          }
3413        }
3414        return; // good, we are happy
3415      }
3416      long now = EnvironmentEdgeManager.currentTime();
3417      if (now > timeoutTime) break;
3418      Thread.sleep(10);
3419    }
3420    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3421  }
3422
3423  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3424    TableDescriptor td =
3425      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3426    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3427    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3428  }
3429
3430  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3431    BlockCache blockCache) throws IOException {
3432    TableDescriptor td =
3433      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3434    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3435    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3436  }
3437
  /**
   * Sets the static {@code FS_URI} value. Presumably consumed when this utility builds
   * filesystem paths — confirm against FS_URI's readers elsewhere in the class.
   */
  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }
3441
3442  /**
3443   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3444   */
3445  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3446    return new ExplainingPredicate<IOException>() {
3447      @Override
3448      public String explainFailure() throws IOException {
3449        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3450        return "found in transition: " + am.getRegionsInTransition().toString();
3451      }
3452
3453      @Override
3454      public boolean evaluate() throws IOException {
3455        HMaster master = getMiniHBaseCluster().getMaster();
3456        if (master == null) return false;
3457        AssignmentManager am = master.getAssignmentManager();
3458        if (am == null) return false;
3459        return !am.hasRegionsInTransition();
3460      }
3461    };
3462  }
3463
3464  /**
3465   * Returns a {@link Predicate} for checking that there are no procedure to region transition in
3466   * master
3467   */
3468  public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() {
3469    return new ExplainingPredicate<IOException>() {
3470      @Override
3471      public String explainFailure() throws IOException {
3472        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3473        return "Number of procedure scheduled for region transit: "
3474          + am.getRegionTransitScheduledCount();
3475      }
3476
3477      @Override
3478      public boolean evaluate() throws IOException {
3479        HMaster master = getMiniHBaseCluster().getMaster();
3480        if (master == null) {
3481          return false;
3482        }
3483        AssignmentManager am = master.getAssignmentManager();
3484        if (am == null) {
3485          return false;
3486        }
3487        return am.getRegionTransitScheduledCount() == 0;
3488      }
3489    };
3490  }
3491
3492  /**
3493   * Returns a {@link Predicate} for checking that table is enabled
3494   */
3495  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3496    return new ExplainingPredicate<IOException>() {
3497      @Override
3498      public String explainFailure() throws IOException {
3499        return explainTableState(tableName, TableState.State.ENABLED);
3500      }
3501
3502      @Override
3503      public boolean evaluate() throws IOException {
3504        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3505      }
3506    };
3507  }
3508
3509  /**
   * Returns a {@link Predicate} for checking that table is disabled
3511   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.DISABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        // True once the admin reports the table in the DISABLED state.
        return getAdmin().isTableDisabled(tableName);
      }
    };
  }
3525
3526  /**
   * Returns a {@link Predicate} for checking that table is available
3528   */
  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          // Admin says available; additionally verify every region actually serves reads by
          // issuing a tiny one-row scan against each region's key range.
          try (Table table = getConnection().getTable(tableName)) {
            TableDescriptor htd = table.getDescriptor();
            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
              .getAllRegionLocations()) {
              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
              // Cover every family so each store is touched.
              for (byte[] family : htd.getColumnFamilyNames()) {
                scan.addFamily(family);
              }
              try (ResultScanner scanner = table.getScanner(scan)) {
                scanner.next();
              }
            }
          }
        }
        return tableAvailable;
      }
    };
  }
3560
3561  /**
3562   * Wait until no regions in transition.
3563   * @param timeout How long to wait.
3564   */
  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
    // Polls via waitFor until predicateNoRegionsInTransition() holds or timeout elapses.
    waitFor(timeout, predicateNoRegionsInTransition());
  }
3568
3569  /**
3570   * Wait until no regions in transition.
3571   * @param timeout How long to wait.
3572   */
  public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException {
    // Polls via waitFor until predicateNoRegionTransitScheduled() holds or timeout elapses.
    waitFor(timeout, predicateNoRegionTransitScheduled());
  }
3576
3577  /**
3578   * Wait until no regions in transition. (time limit 15min)
3579   */
  public void waitUntilNoRegionsInTransition() throws IOException {
    // 15 minutes, expressed in milliseconds.
    waitUntilNoRegionsInTransition(15 * 60000);
  }
3583
3584  /**
3585   * Wait until no TRSP is present
3586   */
  public void waitUntilNoRegionTransitScheduled() throws IOException {
    // 15 minutes, expressed in milliseconds.
    waitUntilNoRegionTransitScheduled(15 * 60000);
  }
3590
3591  /**
3592   * Wait until labels is ready in VisibilityLabelsCache.
3593   */
3594  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3595    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3596    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3597
3598      @Override
3599      public boolean evaluate() {
3600        for (String label : labels) {
3601          if (labelsCache.getLabelOrdinal(label) == 0) {
3602            return false;
3603          }
3604        }
3605        return true;
3606      }
3607
3608      @Override
3609      public String explainFailure() {
3610        for (String label : labels) {
3611          if (labelsCache.getLabelOrdinal(label) == 0) {
3612            return label + " is not available yet";
3613          }
3614        }
3615        return "";
3616      }
3617    });
3618  }
3619
3620  /**
3621   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3622   * available.
3623   * @return the list of column descriptors
3624   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
    // Delegate with an empty family-name prefix.
    return generateColumnDescriptors("");
  }
3628
3629  /**
3630   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3631   * available.
3632   * @param prefix family names prefix
3633   * @return the list of column descriptors
3634   */
3635  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3636    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3637    long familyId = 0;
3638    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3639      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3640        for (BloomType bloomType : BloomType.values()) {
3641          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3642          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3643            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3644          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3645          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3646          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3647          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3648          familyId++;
3649        }
3650      }
3651    }
3652    return columnFamilyDescriptors;
3653  }
3654
3655  /**
3656   * Get supported compression algorithms.
3657   * @return supported compression algorithms.
3658   */
3659  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3660    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3661    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3662    for (String algoName : allAlgos) {
3663      try {
3664        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3665        algo.getCompressor();
3666        supportedAlgos.add(algo);
3667      } catch (Throwable t) {
3668        // this algo is not available
3669      }
3670    }
3671    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3672  }
3673
3674  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3675    Scan scan = new Scan().withStartRow(row);
3676    scan.setReadType(ReadType.PREAD);
3677    scan.setCaching(1);
3678    scan.setReversed(true);
3679    scan.addFamily(family);
3680    try (RegionScanner scanner = r.getScanner(scan)) {
3681      List<Cell> cells = new ArrayList<>(1);
3682      scanner.next(cells);
3683      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3684        return null;
3685      }
3686      return Result.create(cells);
3687    }
3688  }
3689
3690  private boolean isTargetTable(final byte[] inRow, Cell c) {
3691    String inputRowString = Bytes.toString(inRow);
3692    int i = inputRowString.indexOf(HConstants.DELIMITER);
3693    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3694    int o = outputRowString.indexOf(HConstants.DELIMITER);
3695    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3696  }
3697
3698  /**
3699   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3700   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3701   * kerby KDC server and utility for using it,
3702   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3703   * less baggage. It came in in HBASE-5291.
3704   */
  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
    Properties conf = MiniKdc.createConf();
    conf.put(MiniKdc.DEBUG, true);
    MiniKdc kdc = null;
    File dir = null;
    // There is time lag between selecting a port and trying to bind with it. It's possible that
    // another service captures the port in between which'll result in BindException.
    boolean bindException;
    int numTries = 0;
    do {
      try {
        bindException = false;
        dir = new File(getDataTestDir("kdc").toUri().getPath());
        kdc = new MiniKdc(conf, dir);
        kdc.start();
      } catch (BindException e) {
        FileUtils.deleteDirectory(dir); // clean directory
        numTries++;
        // Give up after three consecutive BindExceptions; rethrow the last one.
        if (numTries == 3) {
          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
          throw e;
        }
        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
        bindException = true;
      }
    } while (bindException);
    // Publish the keytab path so HBase security test utilities can locate it.
    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
    return kdc;
  }
3734
3735  public int getNumHFiles(final TableName tableName, final byte[] family) {
3736    int numHFiles = 0;
3737    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3738      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3739    }
3740    return numHFiles;
3741  }
3742
3743  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3744    final byte[] family) {
3745    int numHFiles = 0;
3746    for (Region region : rs.getRegions(tableName)) {
3747      numHFiles += region.getStore(family).getStorefilesCount();
3748    }
3749    return numHFiles;
3750  }
3751
3752  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3753    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3754    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3755    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3756    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3757    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3758        it2 = rtdFamilies.iterator(); it.hasNext();) {
3759      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3760    }
3761  }
3762
3763  /**
3764   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3765   * invocations.
3766   */
3767  public static void await(final long sleepMillis, final BooleanSupplier condition)
3768    throws InterruptedException {
3769    try {
3770      while (!condition.getAsBoolean()) {
3771        Thread.sleep(sleepMillis);
3772      }
3773    } catch (RuntimeException e) {
3774      if (e.getCause() instanceof AssertionError) {
3775        throw (AssertionError) e.getCause();
3776      }
3777      throw e;
3778    }
3779  }
3780
3781  public void createRegionDir(RegionInfo hri) throws IOException {
3782    Path rootDir = getDataTestDir();
3783    Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable());
3784    Path regionDir = new Path(tableDir, hri.getEncodedName());
3785    FileSystem fs = getTestFileSystem();
3786    if (!fs.exists(regionDir)) {
3787      fs.mkdirs(regionDir);
3788    }
3789  }
3790
3791  public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem)
3792    throws IOException {
3793    Path tableDir =
3794      CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable());
3795    HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir,
3796      regionInfo);
3797  }
3798}