001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.File;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.io.UncheckedIOException;
030import java.lang.reflect.Field;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.ExecutionException;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicReference;
056import java.util.function.BooleanSupplier;
057import org.apache.commons.io.FileUtils;
058import org.apache.commons.lang3.RandomStringUtils;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
063import org.apache.hadoop.hbase.Waiter.Predicate;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncAdmin;
066import org.apache.hadoop.hbase.client.AsyncClusterConnection;
067import org.apache.hadoop.hbase.client.BufferedMutator;
068import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Consistency;
074import org.apache.hadoop.hbase.client.Delete;
075import org.apache.hadoop.hbase.client.Durability;
076import org.apache.hadoop.hbase.client.Get;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.MasterRegistry;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionInfoBuilder;
082import org.apache.hadoop.hbase.client.RegionLocator;
083import org.apache.hadoop.hbase.client.Result;
084import org.apache.hadoop.hbase.client.ResultScanner;
085import org.apache.hadoop.hbase.client.Scan;
086import org.apache.hadoop.hbase.client.Scan.ReadType;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
092import org.apache.hadoop.hbase.fs.HFileSystem;
093import org.apache.hadoop.hbase.io.compress.Compression;
094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
096import org.apache.hadoop.hbase.io.hfile.BlockCache;
097import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
098import org.apache.hadoop.hbase.io.hfile.HFile;
099import org.apache.hadoop.hbase.ipc.RpcServerInterface;
100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
101import org.apache.hadoop.hbase.master.HMaster;
102import org.apache.hadoop.hbase.master.MasterFileSystem;
103import org.apache.hadoop.hbase.master.RegionState;
104import org.apache.hadoop.hbase.master.ServerManager;
105import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
107import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
108import org.apache.hadoop.hbase.master.assignment.RegionStates;
109import org.apache.hadoop.hbase.mob.MobFileCache;
110import org.apache.hadoop.hbase.regionserver.BloomType;
111import org.apache.hadoop.hbase.regionserver.ChunkCreator;
112import org.apache.hadoop.hbase.regionserver.HRegion;
113import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
114import org.apache.hadoop.hbase.regionserver.HRegionServer;
115import org.apache.hadoop.hbase.regionserver.HStore;
116import org.apache.hadoop.hbase.regionserver.InternalScanner;
117import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
118import org.apache.hadoop.hbase.regionserver.Region;
119import org.apache.hadoop.hbase.regionserver.RegionScanner;
120import org.apache.hadoop.hbase.regionserver.RegionServerServices;
121import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
122import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
123import org.apache.hadoop.hbase.security.User;
124import org.apache.hadoop.hbase.security.UserProvider;
125import org.apache.hadoop.hbase.security.access.AccessController;
126import org.apache.hadoop.hbase.security.access.PermissionStorage;
127import org.apache.hadoop.hbase.security.access.SecureTestUtil;
128import org.apache.hadoop.hbase.security.token.TokenProvider;
129import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
130import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
131import org.apache.hadoop.hbase.util.Bytes;
132import org.apache.hadoop.hbase.util.CommonFSUtils;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
134import org.apache.hadoop.hbase.util.FSUtils;
135import org.apache.hadoop.hbase.util.JVM;
136import org.apache.hadoop.hbase.util.JVMClusterUtil;
137import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
138import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
139import org.apache.hadoop.hbase.util.Pair;
140import org.apache.hadoop.hbase.util.RetryCounter;
141import org.apache.hadoop.hbase.util.Threads;
142import org.apache.hadoop.hbase.wal.WAL;
143import org.apache.hadoop.hbase.wal.WALFactory;
144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
145import org.apache.hadoop.hbase.zookeeper.ZKConfig;
146import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
147import org.apache.hadoop.hdfs.DFSClient;
148import org.apache.hadoop.hdfs.DFSConfigKeys;
149import org.apache.hadoop.hdfs.DistributedFileSystem;
150import org.apache.hadoop.hdfs.MiniDFSCluster;
151import org.apache.hadoop.hdfs.server.datanode.DataNode;
152import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
153import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
154import org.apache.hadoop.mapred.JobConf;
155import org.apache.hadoop.mapred.MiniMRCluster;
156import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
157import org.apache.hadoop.minikdc.MiniKdc;
158import org.apache.yetus.audience.InterfaceAudience;
159import org.apache.yetus.audience.InterfaceStability;
160import org.apache.zookeeper.WatchedEvent;
161import org.apache.zookeeper.ZooKeeper;
162import org.apache.zookeeper.ZooKeeper.States;
163
164import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
165
166import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
167
168/**
169 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
170 * functionality. Create an instance and keep it around testing HBase.
171 * <p/>
172 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
173 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
174 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
175 * cluster.
176 * <p/>
177 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
178 * <p/>
179 * It does not set logging levels.
180 * <p/>
181 * In the configuration properties, default values for master-info-port and region-server-port are
182 * overridden such that a random port will be assigned (thus avoiding port contention if another
183 * local HBase instance is already running).
184 * <p/>
185 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
186 * setting it to true.
187 */
188@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
189@InterfaceStability.Evolving
190public class HBaseTestingUtil extends HBaseZKTestingUtil {
191
192  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
193
194  private MiniDFSCluster dfsCluster = null;
195  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
196
197  private volatile HBaseClusterInterface hbaseCluster = null;
198  private MiniMRCluster mrCluster = null;
199
200  /** If there is a mini cluster running for this testing utility instance. */
201  private volatile boolean miniClusterRunning;
202
203  private String hadoopLogDir;
204
205  /**
206   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
207   */
208  private Path dataTestDirOnTestFS = null;
209
210  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
211
212  /** Filesystem URI used for map-reduce mini-cluster setup */
213  private static String FS_URI;
214
215  /** This is for unit tests parameterized with a single boolean. */
216  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
217
218  static {
219    // JmxCacheBuster may cause dead lock in test environment. As on master side, the table/region
220    // related metrics updating will finally lead to a meta access, so if meta is not online yet, we
221    // will block when updating while holding the metrics lock. But when we assign meta, there are
222    // bunch of places where we need to register a new metrics thus need to get the metrics lock,
223    // and then lead to a dead lock and cause the test to hang forever.
224    // The code is in hadoop so there is no easy way for us to fix, so here we just stop
225    // JmxCacheBuster to stabilize our tests first. See HBASE-30118 for more details and future
226    // plans.
227    JmxCacheBuster.stop();
228  }
229
230  /**
231   * Checks to see if a specific port is available.
232   * @param port the port number to check for availability
233   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
234   */
235  public static boolean available(int port) {
236    ServerSocket ss = null;
237    DatagramSocket ds = null;
238    try {
239      ss = new ServerSocket(port);
240      ss.setReuseAddress(true);
241      ds = new DatagramSocket(port);
242      ds.setReuseAddress(true);
243      return true;
244    } catch (IOException e) {
245      // Do nothing
246    } finally {
247      if (ds != null) {
248        ds.close();
249      }
250
251      if (ss != null) {
252        try {
253          ss.close();
254        } catch (IOException e) {
255          /* should not be thrown */
256        }
257      }
258    }
259
260    return false;
261  }
262
263  /**
264   * Create all combinations of Bloom filters and compression algorithms for testing.
265   */
266  private static List<Object[]> bloomAndCompressionCombinations() {
267    List<Object[]> configurations = new ArrayList<>();
268    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
269      for (BloomType bloomType : BloomType.values()) {
270        configurations.add(new Object[] { comprAlgo, bloomType });
271      }
272    }
273    return Collections.unmodifiableList(configurations);
274  }
275
276  /**
277   * Create combination of memstoreTS and tags
278   */
279  private static List<Object[]> memStoreTSAndTagsCombination() {
280    List<Object[]> configurations = new ArrayList<>();
281    configurations.add(new Object[] { false, false });
282    configurations.add(new Object[] { false, true });
283    configurations.add(new Object[] { true, false });
284    configurations.add(new Object[] { true, true });
285    return Collections.unmodifiableList(configurations);
286  }
287
288  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
289    List<Object[]> configurations = new ArrayList<>();
290    configurations.add(new Object[] { false, false, true });
291    configurations.add(new Object[] { false, false, false });
292    configurations.add(new Object[] { false, true, true });
293    configurations.add(new Object[] { false, true, false });
294    configurations.add(new Object[] { true, false, true });
295    configurations.add(new Object[] { true, false, false });
296    configurations.add(new Object[] { true, true, true });
297    configurations.add(new Object[] { true, true, false });
298    return Collections.unmodifiableList(configurations);
299  }
300
301  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
302    bloomAndCompressionCombinations();
303
304  /**
305   * <p>
306   * Create an HBaseTestingUtility using a default configuration.
307   * <p>
308   * Initially, all tmp files are written to a local test data directory. Once
309   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
310   * data will be written to the DFS directory instead.
311   */
312  public HBaseTestingUtil() {
313    this(HBaseConfiguration.create());
314  }
315
316  /**
317   * <p>
318   * Create an HBaseTestingUtility using a given configuration.
319   * <p>
320   * Initially, all tmp files are written to a local test data directory. Once
321   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
322   * data will be written to the DFS directory instead.
323   * @param conf The configuration to use for further operations
324   */
325  public HBaseTestingUtil(@Nullable Configuration conf) {
326    super(conf);
327
328    // a hbase checksum verification failure will cause unit tests to fail
329    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
330
331    // Save this for when setting default file:// breaks things
332    if (this.conf.get("fs.defaultFS") != null) {
333      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
334    }
335    if (this.conf.get(HConstants.HBASE_DIR) != null) {
336      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
337    }
338    // Every cluster is a local cluster until we start DFS
339    // Note that conf could be null, but this.conf will not be
340    String dataTestDir = getDataTestDir().toString();
341    this.conf.set("fs.defaultFS", "file:///");
342    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
343    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
344    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
345    // If the value for random ports isn't set set it to true, thus making
346    // tests opt-out for random port assignment
347    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
348      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
349  }
350
351  /**
352   * Close both the region {@code r} and it's underlying WAL. For use in tests.
353   */
354  public static void closeRegionAndWAL(final Region r) throws IOException {
355    closeRegionAndWAL((HRegion) r);
356  }
357
358  /**
359   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
360   */
361  public static void closeRegionAndWAL(final HRegion r) throws IOException {
362    if (r == null) return;
363    r.close();
364    if (r.getWAL() == null) return;
365    r.getWAL().close();
366  }
367
368  /**
369   * Start mini secure cluster with given kdc and principals.
370   * @param kdc              Mini kdc server
371   * @param servicePrincipal Service principal without realm.
372   * @param spnegoPrincipal  Spnego principal without realm.
373   * @return Handler to shutdown the cluster
374   */
375  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
376    String spnegoPrincipal) throws Exception {
377    Configuration conf = getConfiguration();
378
379    SecureTestUtil.enableSecurity(conf);
380    VisibilityTestUtil.enableVisiblityLabels(conf);
381    SecureTestUtil.verifyConfiguration(conf);
382
383    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
384      AccessController.class.getName() + ',' + TokenProvider.class.getName());
385
386    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
387      spnegoPrincipal + '@' + kdc.getRealm());
388
389    startMiniCluster();
390    try {
391      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
392    } catch (Exception e) {
393      shutdownMiniCluster();
394      throw e;
395    }
396
397    return this::shutdownMiniCluster;
398  }
399
400  /**
401   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
402   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
403   * by the Configuration. If say, a Connection was being used against a cluster that had been
404   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
405   * Rather than use the return direct, its usually best to make a copy and use that. Do
406   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
407   * @return Instance of Configuration.
408   */
409  @Override
410  public Configuration getConfiguration() {
411    return super.getConfiguration();
412  }
413
414  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
415    this.hbaseCluster = hbaseCluster;
416  }
417
418  /**
419   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
420   * have many concurrent tests running if we need to. Moding a System property is not the way to do
421   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
422   * not anything can do about it at moment; single instance only is how the minidfscluster works.
423   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
424   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
425   * (We do not create them!).
426   * @return The calculated data test build directory, if newly-created.
427   */
428  @Override
429  protected Path setupDataTestDir() {
430    Path testPath = super.setupDataTestDir();
431    if (null == testPath) {
432      return null;
433    }
434
435    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
436
437    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
438    // we want our own value to ensure uniqueness on the same machine
439    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
440
441    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
442    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
443    return testPath;
444  }
445
446  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
447
448    String sysValue = System.getProperty(propertyName);
449
450    if (sysValue != null) {
451      // There is already a value set. So we do nothing but hope
452      // that there will be no conflicts
453      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
454        + " so I do NOT create it in " + parent);
455      String confValue = conf.get(propertyName);
456      if (confValue != null && !confValue.endsWith(sysValue)) {
457        LOG.warn(propertyName + " property value differs in configuration and system: "
458          + "Configuration=" + confValue + " while System=" + sysValue
459          + " Erasing configuration value by system value.");
460      }
461      conf.set(propertyName, sysValue);
462    } else {
463      // Ok, it's not set, so we create it as a subdirectory
464      createSubDir(propertyName, parent, subDirName);
465      System.setProperty(propertyName, conf.get(propertyName));
466    }
467  }
468
469  /**
470   * @return Where to write test data on the test filesystem; Returns working directory for the test
471   *         filesystem by default
472   * @see #setupDataTestDirOnTestFS()
473   * @see #getTestFileSystem()
474   */
475  private Path getBaseTestDirOnTestFS() throws IOException {
476    FileSystem fs = getTestFileSystem();
477    return new Path(fs.getWorkingDirectory(), "test-data");
478  }
479
480  /**
481   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
482   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
483   * on it.
484   * @return a unique path in the test filesystem
485   */
486  public Path getDataTestDirOnTestFS() throws IOException {
487    if (dataTestDirOnTestFS == null) {
488      setupDataTestDirOnTestFS();
489    }
490
491    return dataTestDirOnTestFS;
492  }
493
494  /**
495   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
496   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
497   * on it.
498   * @return a unique path in the test filesystem
499   * @param subdirName name of the subdir to create under the base test dir
500   */
501  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
502    return new Path(getDataTestDirOnTestFS(), subdirName);
503  }
504
505  /**
506   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
507   * setup.
508   */
509  private void setupDataTestDirOnTestFS() throws IOException {
510    if (dataTestDirOnTestFS != null) {
511      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
512      return;
513    }
514    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
515  }
516
517  /**
518   * Sets up a new path in test filesystem to be used by tests.
519   */
520  private Path getNewDataTestDirOnTestFS() throws IOException {
521    // The file system can be either local, mini dfs, or if the configuration
522    // is supplied externally, it can be an external cluster FS. If it is a local
523    // file system, the tests should use getBaseTestDir, otherwise, we can use
524    // the working directory, and create a unique sub dir there
525    FileSystem fs = getTestFileSystem();
526    Path newDataTestDir;
527    String randomStr = getRandomUUID().toString();
528    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
529      newDataTestDir = new Path(getDataTestDir(), randomStr);
530      File dataTestDir = new File(newDataTestDir.toString());
531      if (deleteOnExit()) dataTestDir.deleteOnExit();
532    } else {
533      Path base = getBaseTestDirOnTestFS();
534      newDataTestDir = new Path(base, randomStr);
535      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
536    }
537    return newDataTestDir;
538  }
539
540  /**
541   * Cleans the test data directory on the test filesystem.
542   * @return True if we removed the test dirs
543   */
544  public boolean cleanupDataTestDirOnTestFS() throws IOException {
545    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
546    if (ret) {
547      dataTestDirOnTestFS = null;
548    }
549    return ret;
550  }
551
552  /**
553   * Cleans a subdirectory under the test data directory on the test filesystem.
554   * @return True if we removed child
555   */
556  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
557    Path cpath = getDataTestDirOnTestFS(subdirName);
558    return getTestFileSystem().delete(cpath, true);
559  }
560
561  /**
562   * Start a minidfscluster.
563   * @param servers How many DNs to start.
564   * @see #shutdownMiniDFSCluster()
565   * @return The mini dfs cluster created.
566   */
567  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
568    return startMiniDFSCluster(servers, null);
569  }
570
571  /**
572   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
573   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
574   * instances of the datanodes will have the same host name.
575   * @param hosts hostnames DNs to run on.
576   * @see #shutdownMiniDFSCluster()
577   * @return The mini dfs cluster created.
578   */
579  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
580    if (hosts != null && hosts.length != 0) {
581      return startMiniDFSCluster(hosts.length, hosts);
582    } else {
583      return startMiniDFSCluster(1, null);
584    }
585  }
586
587  /**
588   * Start a minidfscluster. Can only create one.
589   * @param servers How many DNs to start.
590   * @param hosts   hostnames DNs to run on.
591   * @see #shutdownMiniDFSCluster()
592   * @return The mini dfs cluster created.
593   */
594  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
595    return startMiniDFSCluster(servers, null, hosts);
596  }
597
598  private void setFs() throws IOException {
599    if (this.dfsCluster == null) {
600      LOG.info("Skipping setting fs because dfsCluster is null");
601      return;
602    }
603    FileSystem fs = this.dfsCluster.getFileSystem();
604    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
605
606    // re-enable this check with dfs
607    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
608  }
609
610  // Workaround to avoid IllegalThreadStateException
611  // See HBASE-27148 for more details
612  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
613
614    private volatile boolean stopped = false;
615
616    private final MiniDFSCluster cluster;
617
618    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
619      super("FsDatasetAsyncDiskServiceFixer");
620      setDaemon(true);
621      this.cluster = cluster;
622    }
623
624    @Override
625    public void run() {
626      while (!stopped) {
627        try {
628          Thread.sleep(30000);
629        } catch (InterruptedException e) {
630          Thread.currentThread().interrupt();
631          continue;
632        }
633        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
634        // timeout of the thread pool executor is 60 seconds by default.
635        try {
636          for (DataNode dn : cluster.getDataNodes()) {
637            FsDatasetSpi<?> dataset = dn.getFSDataset();
638            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
639            service.setAccessible(true);
640            Object asyncDiskService = service.get(dataset);
641            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
642            group.setAccessible(true);
643            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
644            if (threadGroup.isDaemon()) {
645              threadGroup.setDaemon(false);
646            }
647          }
648        } catch (NoSuchFieldException e) {
649          LOG.debug("NoSuchFieldException: " + e.getMessage()
650            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
651            + "See HBASE-27595 for details.");
652        } catch (Exception e) {
653          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
654        }
655      }
656    }
657
658    void shutdown() {
659      stopped = true;
660      interrupt();
661    }
662  }
663
664  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
665    throws Exception {
666    createDirsAndSetProperties();
667    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
668
669    this.dfsCluster =
670      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
671    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
672    this.dfsClusterFixer.start();
673    // Set this just-started cluster as our filesystem.
674    setFs();
675
676    // Wait for the cluster to be totally up
677    this.dfsCluster.waitClusterUp();
678
679    // reset the test directory for test file system
680    dataTestDirOnTestFS = null;
681    String dataTestDir = getDataTestDir().toString();
682    conf.set(HConstants.HBASE_DIR, dataTestDir);
683    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
684
685    return this.dfsCluster;
686  }
687
688  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
689    createDirsAndSetProperties();
690    dfsCluster =
691      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
692    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
693    this.dfsClusterFixer.start();
694    return dfsCluster;
695  }
696
697  /**
698   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
699   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
700   * the conf.
701   *
702   * <pre>
703   * Configuration conf = TEST_UTIL.getConfiguration();
704   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
705   *   Map.Entry&lt;String, String&gt; e = i.next();
706   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
707   * }
708   * </pre>
709   */
710  private void createDirsAndSetProperties() throws IOException {
711    setupClusterTestDir();
712    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
713    createDirAndSetProperty("test.cache.data");
714    createDirAndSetProperty("hadoop.tmp.dir");
715    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
716    createDirAndSetProperty("mapreduce.cluster.local.dir");
717    createDirAndSetProperty("mapreduce.cluster.temp.dir");
718    enableShortCircuit();
719
720    Path root = getDataTestDirOnTestFS("hadoop");
721    conf.set(MapreduceTestingShim.getMROutputDirProp(),
722      new Path(root, "mapred-output-dir").toString());
723    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
724    conf.set("mapreduce.jobtracker.staging.root.dir",
725      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
726    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
727    conf.set("yarn.app.mapreduce.am.staging-dir",
728      new Path(root, "mapreduce-am-staging-root-dir").toString());
729
730    // Frustrate yarn's and hdfs's attempts at writing /tmp.
731    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
732    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
733    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
734    createDirAndSetProperty("yarn.nodemanager.log-dirs");
735    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
736    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
737    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
738    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
739    createDirAndSetProperty("dfs.journalnode.edits.dir");
740    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
741    createDirAndSetProperty("nfs.dump.dir");
742    createDirAndSetProperty("java.io.tmpdir");
743    createDirAndSetProperty("dfs.journalnode.edits.dir");
744    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
745    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
746
747    // disable metrics logger since it depend on commons-logging internal classes and we do not want
748    // commons-logging on our classpath
749    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
750    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
751  }
752
753  /**
754   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
755   * Default to false.
756   */
757  public boolean isNewVersionBehaviorEnabled() {
758    final String propName = "hbase.tests.new.version.behavior";
759    String v = System.getProperty(propName);
760    if (v != null) {
761      return Boolean.parseBoolean(v);
762    }
763    return false;
764  }
765
766  /**
767   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
768   * allows to specify this parameter on the command line. If not set, default is true.
769   */
770  public boolean isReadShortCircuitOn() {
771    final String propName = "hbase.tests.use.shortcircuit.reads";
772    String readOnProp = System.getProperty(propName);
773    if (readOnProp != null) {
774      return Boolean.parseBoolean(readOnProp);
775    } else {
776      return conf.getBoolean(propName, false);
777    }
778  }
779
780  /**
781   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
782   * including skipping the hdfs checksum checks.
783   */
784  private void enableShortCircuit() {
785    if (isReadShortCircuitOn()) {
786      String curUser = System.getProperty("user.name");
787      LOG.info("read short circuit is ON for user " + curUser);
788      // read short circuit, for hdfs
789      conf.set("dfs.block.local-path-access.user", curUser);
790      // read short circuit, for hbase
791      conf.setBoolean("dfs.client.read.shortcircuit", true);
792      // Skip checking checksum, for the hdfs client and the datanode
793      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
794    } else {
795      LOG.info("read short circuit is OFF");
796    }
797  }
798
799  private String createDirAndSetProperty(final String property) {
800    return createDirAndSetProperty(property, property);
801  }
802
803  private String createDirAndSetProperty(final String relPath, String property) {
804    String path = getDataTestDir(relPath).toString();
805    System.setProperty(property, path);
806    conf.set(property, path);
807    new File(path).mkdirs();
808    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
809    return path;
810  }
811
812  /**
813   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
814   */
815  public void shutdownMiniDFSCluster() throws IOException {
816    if (this.dfsCluster != null) {
817      // The below throws an exception per dn, AsynchronousCloseException.
818      this.dfsCluster.shutdown();
819      dfsCluster = null;
820      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
821      // have a fixer
822      if (dfsClusterFixer != null) {
823        this.dfsClusterFixer.shutdown();
824        dfsClusterFixer = null;
825      }
826      dataTestDirOnTestFS = null;
827      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
828    }
829  }
830
831  /**
832   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
833   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
834   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
835   * @see #startMiniCluster(StartTestingClusterOption option)
836   * @see #shutdownMiniDFSCluster()
837   */
838  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
839    StartTestingClusterOption option = StartTestingClusterOption.builder()
840      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
841    return startMiniCluster(option);
842  }
843
844  /**
845   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
846   * value can be found in {@link StartTestingClusterOption.Builder}.
847   * @see #startMiniCluster(StartTestingClusterOption option)
848   * @see #shutdownMiniDFSCluster()
849   */
850  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
851    return startMiniCluster(StartTestingClusterOption.builder().build());
852  }
853
854  /**
855   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
856   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
857   * under System property test.build.data, to be cleaned up on exit.
858   * @see #shutdownMiniDFSCluster()
859   */
860  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
861    throws Exception {
862    LOG.info("Starting up minicluster with option: {}", option);
863
864    // If we already put up a cluster, fail.
865    if (miniClusterRunning) {
866      throw new IllegalStateException("A mini-cluster is already running");
867    }
868    miniClusterRunning = true;
869
870    setupClusterTestDir();
871
872    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
873    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
874    if (dfsCluster == null) {
875      LOG.info("STARTING DFS");
876      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
877    } else {
878      LOG.info("NOT STARTING DFS");
879    }
880
881    // Start up a zk cluster.
882    if (getZkCluster() == null) {
883      startMiniZKCluster(option.getNumZkServers());
884    }
885
886    // Start the MiniHBaseCluster
887    return startMiniHBaseCluster(option);
888  }
889
890  /**
891   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
892   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
893   * @return Reference to the hbase mini hbase cluster.
894   * @see #startMiniCluster(StartTestingClusterOption)
895   * @see #shutdownMiniHBaseCluster()
896   */
897  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
898    throws IOException, InterruptedException {
899    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
900    createRootDir(option.isCreateRootDir());
901    if (option.isCreateWALDir()) {
902      createWALRootDir();
903    }
904    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
905    // for tests that do not read hbase-defaults.xml
906    setHBaseFsTmpDir();
907
908    // These settings will make the server waits until this exact number of
909    // regions servers are connected.
910    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
911      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
912    }
913    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
914      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
915    }
916
917    Configuration c = new Configuration(this.conf);
918    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
919      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
920      option.getMasterClass(), option.getRsClass());
921    // Populate the master address configuration from mini cluster configuration.
922    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
923    // Don't leave here till we've done a successful scan of the hbase:meta
924    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
925      ResultScanner s = t.getScanner(new Scan())) {
926      for (;;) {
927        if (s.next() == null) {
928          break;
929        }
930      }
931    }
932
933    getAdmin(); // create immediately the hbaseAdmin
934    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
935
936    return (SingleProcessHBaseCluster) hbaseCluster;
937  }
938
939  /**
940   * Starts up mini hbase cluster using default options. Default options can be found in
941   * {@link StartTestingClusterOption.Builder}.
942   * @see #startMiniHBaseCluster(StartTestingClusterOption)
943   * @see #shutdownMiniHBaseCluster()
944   */
945  public SingleProcessHBaseCluster startMiniHBaseCluster()
946    throws IOException, InterruptedException {
947    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
948  }
949
950  /**
951   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
952   * {@link #startMiniCluster()}. All other options will use default values, defined in
953   * {@link StartTestingClusterOption.Builder}.
954   * @param numMasters       Master node number.
955   * @param numRegionServers Number of region servers.
956   * @return The mini HBase cluster created.
957   * @see #shutdownMiniHBaseCluster()
958   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
959   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
960   * @see #startMiniHBaseCluster(StartTestingClusterOption)
961   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
962   */
963  @Deprecated
964  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
965    throws IOException, InterruptedException {
966    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
967      .numRegionServers(numRegionServers).build();
968    return startMiniHBaseCluster(option);
969  }
970
971  /**
972   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
973   * {@link #startMiniCluster()}. All other options will use default values, defined in
974   * {@link StartTestingClusterOption.Builder}.
975   * @param numMasters       Master node number.
976   * @param numRegionServers Number of region servers.
977   * @param rsPorts          Ports that RegionServer should use.
978   * @return The mini HBase cluster created.
979   * @see #shutdownMiniHBaseCluster()
980   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
981   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
982   * @see #startMiniHBaseCluster(StartTestingClusterOption)
983   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
984   */
985  @Deprecated
986  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
987    List<Integer> rsPorts) throws IOException, InterruptedException {
988    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
989      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
990    return startMiniHBaseCluster(option);
991  }
992
993  /**
994   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
995   * {@link #startMiniCluster()}. All other options will use default values, defined in
996   * {@link StartTestingClusterOption.Builder}.
997   * @param numMasters       Master node number.
998   * @param numRegionServers Number of region servers.
999   * @param rsPorts          Ports that RegionServer should use.
1000   * @param masterClass      The class to use as HMaster, or null for default.
1001   * @param rsClass          The class to use as HRegionServer, or null for default.
1002   * @param createRootDir    Whether to create a new root or data directory path.
1003   * @param createWALDir     Whether to create a new WAL directory.
1004   * @return The mini HBase cluster created.
1005   * @see #shutdownMiniHBaseCluster()
1006   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1007   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
1008   * @see #startMiniHBaseCluster(StartTestingClusterOption)
1009   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1010   */
1011  @Deprecated
1012  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1013    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1014    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1015    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1016    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
1017      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1018      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1019    return startMiniHBaseCluster(option);
1020  }
1021
1022  /**
1023   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1024   * want to keep dfs/zk up and just stop/start hbase.
1025   * @param servers number of region servers
1026   */
1027  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1028    this.restartHBaseCluster(servers, null);
1029  }
1030
1031  public void restartHBaseCluster(int servers, List<Integer> ports)
1032    throws IOException, InterruptedException {
1033    StartTestingClusterOption option =
1034      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1035    restartHBaseCluster(option);
1036    invalidateConnection();
1037  }
1038
1039  public void restartHBaseCluster(StartTestingClusterOption option)
1040    throws IOException, InterruptedException {
1041    closeConnection();
1042    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
1043      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1044      option.getMasterClass(), option.getRsClass());
1045    // Don't leave here till we've done a successful scan of the hbase:meta
1046    Connection conn = ConnectionFactory.createConnection(this.conf);
1047    Table t = conn.getTable(TableName.META_TABLE_NAME);
1048    ResultScanner s = t.getScanner(new Scan());
1049    while (s.next() != null) {
1050      // do nothing
1051    }
1052    LOG.info("HBase has been restarted");
1053    s.close();
1054    t.close();
1055    conn.close();
1056  }
1057
1058  /**
1059   * Returns current mini hbase cluster. Only has something in it after a call to
1060   * {@link #startMiniCluster()}.
1061   * @see #startMiniCluster()
1062   */
1063  public SingleProcessHBaseCluster getMiniHBaseCluster() {
1064    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
1065      return (SingleProcessHBaseCluster) this.hbaseCluster;
1066    }
1067    throw new RuntimeException(
1068      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
1069  }
1070
1071  /**
1072   * Stops mini hbase, zk, and hdfs clusters.
1073   * @see #startMiniCluster(int)
1074   */
1075  public void shutdownMiniCluster() throws IOException {
1076    LOG.info("Shutting down minicluster");
1077    shutdownMiniHBaseCluster();
1078    shutdownMiniDFSCluster();
1079    shutdownMiniZKCluster();
1080
1081    cleanupTestDir();
1082    miniClusterRunning = false;
1083    LOG.info("Minicluster is down");
1084  }
1085
1086  /**
1087   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1088   * @throws java.io.IOException in case command is unsuccessful
1089   */
1090  public void shutdownMiniHBaseCluster() throws IOException {
1091    cleanup();
1092    if (this.hbaseCluster != null) {
1093      this.hbaseCluster.shutdown();
1094      // Wait till hbase is down before going on to shutdown zk.
1095      this.hbaseCluster.waitUntilShutDown();
1096      this.hbaseCluster = null;
1097    }
1098    if (zooKeeperWatcher != null) {
1099      zooKeeperWatcher.close();
1100      zooKeeperWatcher = null;
1101    }
1102  }
1103
1104  /**
1105   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1106   * @throws java.io.IOException throws in case command is unsuccessful
1107   */
1108  public void killMiniHBaseCluster() throws IOException {
1109    cleanup();
1110    if (this.hbaseCluster != null) {
1111      getMiniHBaseCluster().killAll();
1112      this.hbaseCluster = null;
1113    }
1114    if (zooKeeperWatcher != null) {
1115      zooKeeperWatcher.close();
1116      zooKeeperWatcher = null;
1117    }
1118  }
1119
1120  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1121  private void cleanup() throws IOException {
1122    closeConnection();
1123    // unset the configuration for MIN and MAX RS to start
1124    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1125    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1126  }
1127
1128  /**
1129   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1130   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1131   * If false, previous path is used. Note: this does not cause the root dir to be created.
1132   * @return Fully qualified path for the default hbase root dir
1133   */
1134  public Path getDefaultRootDirPath(boolean create) throws IOException {
1135    if (!create) {
1136      return getDataTestDirOnTestFS();
1137    } else {
1138      return getNewDataTestDirOnTestFS();
1139    }
1140  }
1141
1142  /**
1143   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1144   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1145   * @return Fully qualified path for the default hbase root dir
1146   */
1147  public Path getDefaultRootDirPath() throws IOException {
1148    return getDefaultRootDirPath(false);
1149  }
1150
1151  /**
1152   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1153   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1154   * startup. You'd only use this method if you were doing manual operation.
1155   * @param create This flag decides whether to get a new root or data directory path or not, if it
1156   *               has been fetched already. Note : Directory will be made irrespective of whether
1157   *               path has been fetched or not. If directory already exists, it will be overwritten
1158   * @return Fully qualified path to hbase root dir
1159   */
1160  public Path createRootDir(boolean create) throws IOException {
1161    FileSystem fs = FileSystem.get(this.conf);
1162    Path hbaseRootdir = getDefaultRootDirPath(create);
1163    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1164    fs.mkdirs(hbaseRootdir);
1165    FSUtils.setVersion(fs, hbaseRootdir);
1166    return hbaseRootdir;
1167  }
1168
1169  /**
1170   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1171   * flag is false.
1172   * @return Fully qualified path to hbase root dir
1173   */
1174  public Path createRootDir() throws IOException {
1175    return createRootDir(false);
1176  }
1177
1178  /**
1179   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1180   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1181   * this method if you were doing manual operation.
1182   * @return Fully qualified path to hbase root dir
1183   */
1184  public Path createWALRootDir() throws IOException {
1185    FileSystem fs = FileSystem.get(this.conf);
1186    Path walDir = getNewDataTestDirOnTestFS();
1187    CommonFSUtils.setWALRootDir(this.conf, walDir);
1188    fs.mkdirs(walDir);
1189    return walDir;
1190  }
1191
1192  private void setHBaseFsTmpDir() throws IOException {
1193    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1194    if (hbaseFsTmpDirInString == null) {
1195      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1196      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1197    } else {
1198      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1199    }
1200  }
1201
1202  /**
1203   * Flushes all caches in the mini hbase cluster
1204   */
1205  public void flush() throws IOException {
1206    getMiniHBaseCluster().flushcache();
1207  }
1208
1209  /**
1210   * Flushes all caches in the mini hbase cluster
1211   */
1212  public void flush(TableName tableName) throws IOException {
1213    getMiniHBaseCluster().flushcache(tableName);
1214  }
1215
1216  /**
1217   * Compact all regions in the mini hbase cluster
1218   */
1219  public void compact(boolean major) throws IOException {
1220    getMiniHBaseCluster().compact(major);
1221  }
1222
1223  /**
1224   * Compact all of a table's reagion in the mini hbase cluster
1225   */
1226  public void compact(TableName tableName, boolean major) throws IOException {
1227    getMiniHBaseCluster().compact(tableName, major);
1228  }
1229
1230  /**
1231   * Create a table.
1232   * @return A Table instance for the created table.
1233   */
1234  public Table createTable(TableName tableName, String family) throws IOException {
1235    return createTable(tableName, new String[] { family });
1236  }
1237
1238  /**
1239   * Create a table.
1240   * @return A Table instance for the created table.
1241   */
1242  public Table createTable(TableName tableName, String[] families) throws IOException {
1243    List<byte[]> fams = new ArrayList<>(families.length);
1244    for (String family : families) {
1245      fams.add(Bytes.toBytes(family));
1246    }
1247    return createTable(tableName, fams.toArray(new byte[0][]));
1248  }
1249
1250  /**
1251   * Create a table.
1252   * @return A Table instance for the created table.
1253   */
1254  public Table createTable(TableName tableName, byte[] family) throws IOException {
1255    return createTable(tableName, new byte[][] { family });
1256  }
1257
1258  /**
1259   * Create a table with multiple regions.
1260   * @return A Table instance for the created table.
1261   */
1262  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1263    throws IOException {
1264    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1265    byte[] startKey = Bytes.toBytes("aaaaa");
1266    byte[] endKey = Bytes.toBytes("zzzzz");
1267    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1268
1269    return createTable(tableName, new byte[][] { family }, splitKeys);
1270  }
1271
1272  /**
1273   * Create a table.
1274   * @return A Table instance for the created table.
1275   */
1276  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1277    return createTable(tableName, families, (byte[][]) null);
1278  }
1279
1280  /**
1281   * Create a table with multiple regions.
1282   * @return A Table instance for the created table.
1283   */
1284  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1285    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1286  }
1287
1288  /**
1289   * Create a table with multiple regions.
1290   * @param replicaCount replica count.
1291   * @return A Table instance for the created table.
1292   */
1293  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1294    throws IOException {
1295    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1296  }
1297
1298  /**
1299   * Create a table.
1300   * @return A Table instance for the created table.
1301   */
1302  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1303    throws IOException {
1304    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1305  }
1306
1307  /**
1308   * Create a table.
1309   * @param tableName    the table name
1310   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException if table creation fails
1315   */
1316  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1317    int replicaCount) throws IOException {
1318    return createTable(tableName, families, splitKeys, replicaCount,
1319      new Configuration(getConfiguration()));
1320  }
1321
1322  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1323    byte[] endKey, int numRegions) throws IOException {
1324    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1325
1326    getAdmin().createTable(desc, startKey, endKey, numRegions);
1327    // HBaseAdmin only waits for regions to appear in hbase:meta we
1328    // should wait until they are assigned
1329    waitUntilAllRegionsAssigned(tableName);
1330    return getConnection().getTable(tableName);
1331  }
1332
1333  /**
1334   * Create a table.
1335   * @param c Configuration to use
1336   * @return A Table instance for the created table.
1337   */
1338  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1339    throws IOException {
1340    return createTable(htd, families, null, c);
1341  }
1342
1343  /**
1344   * Create a table.
1345   * @param htd       table descriptor
1346   * @param families  array of column families
1347   * @param splitKeys array of split keys
1348   * @param c         Configuration to use
1349   * @return A Table instance for the created table.
1350   * @throws IOException if getAdmin or createTable fails
1351   */
1352  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1353    Configuration c) throws IOException {
1354    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1355    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1356    // on is interfering.
1357    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1358  }
1359
1360  /**
1361   * Create a table.
1362   * @param htd       table descriptor
1363   * @param families  array of column families
1364   * @param splitKeys array of split keys
1365   * @param type      Bloom type
1366   * @param blockSize block size
1367   * @param c         Configuration to use
1368   * @return A Table instance for the created table.
1369   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1373    BloomType type, int blockSize, Configuration c) throws IOException {
1374    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1375    for (byte[] family : families) {
1376      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1377        .setBloomFilterType(type).setBlocksize(blockSize);
1378      if (isNewVersionBehaviorEnabled()) {
1379        cfdb.setNewVersionBehavior(true);
1380      }
1381      builder.setColumnFamily(cfdb.build());
1382    }
1383    TableDescriptor td = builder.build();
1384    if (splitKeys != null) {
1385      getAdmin().createTable(td, splitKeys);
1386    } else {
1387      getAdmin().createTable(td);
1388    }
1389    // HBaseAdmin only waits for regions to appear in hbase:meta
1390    // we should wait until they are assigned
1391    waitUntilAllRegionsAssigned(td.getTableName());
1392    return getConnection().getTable(td.getTableName());
1393  }
1394
1395  /**
1396   * Create a table.
1397   * @param htd       table descriptor
1398   * @param splitRows array of split keys
1399   * @return A Table instance for the created table.
1400   */
1401  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1402    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1403    if (isNewVersionBehaviorEnabled()) {
1404      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1405        builder.setColumnFamily(
1406          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1407      }
1408    }
1409    if (splitRows != null) {
1410      getAdmin().createTable(builder.build(), splitRows);
1411    } else {
1412      getAdmin().createTable(builder.build());
1413    }
1414    // HBaseAdmin only waits for regions to appear in hbase:meta
1415    // we should wait until they are assigned
1416    waitUntilAllRegionsAssigned(htd.getTableName());
1417    return getConnection().getTable(htd.getTableName());
1418  }
1419
1420  /**
1421   * Create a table.
1422   * @param tableName    the table name
1423   * @param families     the families
1424   * @param splitKeys    the split keys
1425   * @param replicaCount the replica count
1426   * @param c            Configuration to use
1427   * @return A Table instance for the created table.
1428   */
1429  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1430    int replicaCount, final Configuration c) throws IOException {
1431    TableDescriptor htd =
1432      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1433    return createTable(htd, families, splitKeys, c);
1434  }
1435
1436  /**
1437   * Create a table.
1438   * @return A Table instance for the created table.
1439   */
1440  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1441    return createTable(tableName, new byte[][] { family }, numVersions);
1442  }
1443
1444  /**
1445   * Create a table.
1446   * @return A Table instance for the created table.
1447   */
1448  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1449    throws IOException {
1450    return createTable(tableName, families, numVersions, (byte[][]) null);
1451  }
1452
1453  /**
1454   * Create a table.
1455   * @return A Table instance for the created table.
1456   */
1457  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1458    byte[][] splitKeys) throws IOException {
1459    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1460    for (byte[] family : families) {
1461      ColumnFamilyDescriptorBuilder cfBuilder =
1462        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1463      if (isNewVersionBehaviorEnabled()) {
1464        cfBuilder.setNewVersionBehavior(true);
1465      }
1466      builder.setColumnFamily(cfBuilder.build());
1467    }
1468    if (splitKeys != null) {
1469      getAdmin().createTable(builder.build(), splitKeys);
1470    } else {
1471      getAdmin().createTable(builder.build());
1472    }
1473    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1474    // assigned
1475    waitUntilAllRegionsAssigned(tableName);
1476    return getConnection().getTable(tableName);
1477  }
1478
1479  /**
1480   * Create a table with multiple regions.
1481   * @return A Table instance for the created table.
1482   */
1483  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1484    throws IOException {
1485    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1486  }
1487
1488  /**
1489   * Create a table.
1490   * @return A Table instance for the created table.
1491   */
1492  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1493    throws IOException {
1494    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1495    for (byte[] family : families) {
1496      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1497        .setMaxVersions(numVersions).setBlocksize(blockSize);
1498      if (isNewVersionBehaviorEnabled()) {
1499        cfBuilder.setNewVersionBehavior(true);
1500      }
1501      builder.setColumnFamily(cfBuilder.build());
1502    }
1503    getAdmin().createTable(builder.build());
1504    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1505    // assigned
1506    waitUntilAllRegionsAssigned(tableName);
1507    return getConnection().getTable(tableName);
1508  }
1509
1510  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1511    String cpName) throws IOException {
1512    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1513    for (byte[] family : families) {
1514      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1515        .setMaxVersions(numVersions).setBlocksize(blockSize);
1516      if (isNewVersionBehaviorEnabled()) {
1517        cfBuilder.setNewVersionBehavior(true);
1518      }
1519      builder.setColumnFamily(cfBuilder.build());
1520    }
1521    if (cpName != null) {
1522      builder.setCoprocessor(cpName);
1523    }
1524    getAdmin().createTable(builder.build());
1525    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1526    // assigned
1527    waitUntilAllRegionsAssigned(tableName);
1528    return getConnection().getTable(tableName);
1529  }
1530
1531  /**
1532   * Create a table.
1533   * @return A Table instance for the created table.
1534   */
1535  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1536    throws IOException {
1537    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1538    int i = 0;
1539    for (byte[] family : families) {
1540      ColumnFamilyDescriptorBuilder cfBuilder =
1541        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1542      if (isNewVersionBehaviorEnabled()) {
1543        cfBuilder.setNewVersionBehavior(true);
1544      }
1545      builder.setColumnFamily(cfBuilder.build());
1546      i++;
1547    }
1548    getAdmin().createTable(builder.build());
1549    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1550    // assigned
1551    waitUntilAllRegionsAssigned(tableName);
1552    return getConnection().getTable(tableName);
1553  }
1554
1555  /**
1556   * Create a table.
1557   * @return A Table instance for the created table.
1558   */
1559  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1560    throws IOException {
1561    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1562    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1563    if (isNewVersionBehaviorEnabled()) {
1564      cfBuilder.setNewVersionBehavior(true);
1565    }
1566    builder.setColumnFamily(cfBuilder.build());
1567    getAdmin().createTable(builder.build(), splitRows);
1568    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1569    // assigned
1570    waitUntilAllRegionsAssigned(tableName);
1571    return getConnection().getTable(tableName);
1572  }
1573
1574  /**
1575   * Create a table with multiple regions.
1576   * @return A Table instance for the created table.
1577   */
1578  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1579    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1580  }
1581
1582  /**
1583   * Set the number of Region replicas.
1584   */
1585  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1586    throws IOException, InterruptedException {
1587    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1588      .setRegionReplication(replicaCount).build();
1589    admin.modifyTable(desc);
1590  }
1591
1592  /**
1593   * Set the number of Region replicas.
1594   */
1595  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1596    throws ExecutionException, IOException, InterruptedException {
1597    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1598      .setRegionReplication(replicaCount).build();
1599    admin.modifyTable(desc).get();
1600  }
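
  // For example, to bump an existing test table to three region replicas (a sketch; the
  // table is assumed to already exist and TEST_UTIL is a running HBaseTestingUtil):
  //
  //   HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), tableName, 3);
  //   // the descriptor is rewritten with the new replication count via a table modification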
1601
1602  /**
1603   * Drop an existing table
1604   * @param tableName existing table
1605   */
1606  public void deleteTable(TableName tableName) throws IOException {
1607    try {
1608      getAdmin().disableTable(tableName);
1609    } catch (TableNotEnabledException e) {
1610      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1611    }
1612    getAdmin().deleteTable(tableName);
1613  }
1614
1615  /**
1616   * Drop an existing table
1617   * @param tableName existing table
1618   */
1619  public void deleteTableIfAny(TableName tableName) throws IOException {
1620    try {
1621      deleteTable(tableName);
1622    } catch (TableNotFoundException e) {
1623      // ignore
1624    }
1625  }
1626
1627  // ==========================================================================
1628  // Canned table and table descriptor creation
1629
1630  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1631  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1632  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1633  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1634  private static final int MAXVERSIONS = 3;
1635
1636  public static final char FIRST_CHAR = 'a';
1637  public static final char LAST_CHAR = 'z';
1638  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1639  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1640
1641  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1642    return createModifyableTableDescriptor(TableName.valueOf(name),
1643      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1644      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1645  }
1646
1647  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1648    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1649    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1650    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1651      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1652        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1653        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1654      if (isNewVersionBehaviorEnabled()) {
1655        cfBuilder.setNewVersionBehavior(true);
1656      }
1657      builder.setColumnFamily(cfBuilder.build());
1658    }
1659    return builder.build();
1660  }
1661
1662  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1663    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1664    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1665    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1666      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1667        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1668        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1669      if (isNewVersionBehaviorEnabled()) {
1670        cfBuilder.setNewVersionBehavior(true);
1671      }
1672      builder.setColumnFamily(cfBuilder.build());
1673    }
1674    return builder;
1675  }
1676
1677  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give the table.
   * @return Table descriptor.
1681   */
1682  public TableDescriptor createTableDescriptor(final TableName name) {
1683    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1684      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1685  }
1686
1687  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1688    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1689  }
1690
1691  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1692    int maxVersions) {
1693    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1694    for (byte[] family : families) {
1695      ColumnFamilyDescriptorBuilder cfBuilder =
1696        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1697      if (isNewVersionBehaviorEnabled()) {
1698        cfBuilder.setNewVersionBehavior(true);
1699      }
1700      builder.setColumnFamily(cfBuilder.build());
1701    }
1702    return builder.build();
1703  }
1704
1705  /**
1706   * Create an HRegion that writes to the local tmp dirs
1707   * @param desc     a table descriptor indicating which table the region belongs to
1708   * @param startKey the start boundary of the region
1709   * @param endKey   the end boundary of the region
1710   * @return a region that writes to local dir for testing
1711   */
1712  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1713    throws IOException {
1714    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1715      .setEndKey(endKey).build();
1716    return createLocalHRegion(hri, desc);
1717  }
1718
1719  /**
1720   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1721   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1722   */
1723  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1724    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1725  }
1726
1727  /**
1728   * Create an HRegion that writes to the local tmp dirs with specified wal
1729   * @param info regioninfo
1730   * @param conf configuration
1731   * @param desc table descriptor
1732   * @param wal  wal for this region.
1733   * @return created hregion
1734   */
1735  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1736    WAL wal) throws IOException {
1737    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1738      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1739    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1740  }
1741
1742  /**
1743   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1744   *         when done.
1745   */
1746  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1747    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1748    throws IOException {
1749    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1750      durability, wal, null, families);
1751  }
1752
1753  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1754    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1755    boolean[] compactedMemStore, byte[]... families) throws IOException {
1756    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1757    builder.setReadOnly(isReadOnly);
1758    int i = 0;
1759    for (byte[] family : families) {
1760      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1761      if (compactedMemStore != null && i < compactedMemStore.length) {
1762        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
      i++;
      // Keep all versions so tests can read back every cell they write.
1769      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1770      builder.setColumnFamily(cfBuilder.build());
1771    }
1772    builder.setDurability(durability);
1773    RegionInfo info =
1774      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1775    return createLocalHRegion(info, conf, builder.build(), wal);
1776  }
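
  // Typical lifecycle for the createLocalHRegion* helpers above (a sketch; the table name,
  // family and the TEST_UTIL instance are hypothetical):
  //
  //   TableDescriptor td =
  //     TEST_UTIL.createTableDescriptor(TableName.valueOf("testLocal"), Bytes.toBytes("cf"));
  //   HRegion region = TEST_UTIL.createLocalHRegion(td, HConstants.EMPTY_BYTE_ARRAY,
  //     HConstants.EMPTY_BYTE_ARRAY);
  //   try {
  //     region.put(new Put(Bytes.toBytes("row"))
  //       .addColumn(Bytes.toBytes("cf"), null, Bytes.toBytes("value")));
  //   } finally {
  //     HBaseTestingUtil.closeRegionAndWAL(region); // closes the region and its WAL
  //   }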
1777
1778  //
1779  // ==========================================================================
1780
1781  /**
1782   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1783   * read.
1784   * @param tableName existing table
   * @return Table for the (now empty) table
1786   */
1787  public Table deleteTableData(TableName tableName) throws IOException {
1788    Table table = getConnection().getTable(tableName);
1789    Scan scan = new Scan();
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        Delete del = new Delete(res.getRow());
        table.delete(del);
      }
    }
1797    return table;
1798  }
1799
1800  /**
1801   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1802   * table.
1803   * @param tableName       table which must exist.
1804   * @param preserveRegions keep the existing split points
   * @return Table for the truncated table
1806   */
1807  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1808    throws IOException {
1809    Admin admin = getAdmin();
1810    if (!admin.isTableDisabled(tableName)) {
1811      admin.disableTable(tableName);
1812    }
1813    admin.truncateTable(tableName, preserveRegions);
1814    return getConnection().getTable(tableName);
1815  }
1816
1817  /**
1818   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1819   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1820   * preserve regions of existing table.
1821   * @param tableName table which must exist.
   * @return Table for the truncated table
1823   */
1824  public Table truncateTable(final TableName tableName) throws IOException {
1825    return truncateTable(tableName, false);
1826  }
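
  // For example, to wipe a test table between test methods while keeping its split points
  // (a sketch; the table name and TEST_UTIL are hypothetical):
  //
  //   TEST_UTIL.truncateTable(TableName.valueOf("testTable"), true);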
1827
1828  /**
1829   * Load table with rows from 'aaa' to 'zzz'.
1830   * @param t Table
1831   * @param f Family
1832   * @return Count of rows loaded.
1833   */
1834  public int loadTable(final Table t, final byte[] f) throws IOException {
1835    return loadTable(t, new byte[][] { f });
1836  }
1837
1838  /**
1839   * Load table with rows from 'aaa' to 'zzz'.
1840   * @param t Table
1841   * @param f Family
1842   * @return Count of rows loaded.
1843   */
1844  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1845    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1846  }
1847
1848  /**
1849   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1850   * @param t Table
1851   * @param f Array of Families to load
1852   * @return Count of rows loaded.
1853   */
1854  public int loadTable(final Table t, final byte[][] f) throws IOException {
1855    return loadTable(t, f, null);
1856  }
1857
1858  /**
1859   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1860   * @param t     Table
1861   * @param f     Array of Families to load
1862   * @param value the values of the cells. If null is passed, the row key is used as value
1863   * @return Count of rows loaded.
1864   */
1865  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1866    return loadTable(t, f, value, true);
1867  }
1868
1869  /**
1870   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1871   * @param t     Table
1872   * @param f     Array of Families to load
1873   * @param value the values of the cells. If null is passed, the row key is used as value
1874   * @return Count of rows loaded.
1875   */
1876  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1877    throws IOException {
1878    List<Put> puts = new ArrayList<>();
1879    for (byte[] row : HBaseTestingUtil.ROWS) {
1880      Put put = new Put(row);
1881      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1882      for (int i = 0; i < f.length; i++) {
1883        byte[] value1 = value != null ? value : row;
1884        put.addColumn(f[i], f[i], value1);
1885      }
1886      puts.add(put);
1887    }
1888    t.put(puts);
1889    return puts.size();
1890  }
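
  // A sketch of loading and verifying a table with the helpers above (names are hypothetical):
  //
  //   Table table = TEST_UTIL.createTable(TableName.valueOf("testLoad"), Bytes.toBytes("cf"));
  //   int loaded = TEST_UTIL.loadTable(table, Bytes.toBytes("cf"));
  //   // 26^3 = 17576 rows, one per three-letter key from 'aaa' to 'zzz'
  //   assertEquals(loaded, HBaseTestingUtil.countRows(table));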
1891
1892  /**
1893   * A tracker for tracking and validating table rows generated with
1894   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1895   */
1896  public static class SeenRowTracker {
1897    int dim = 'z' - 'a' + 1;
1898    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1899    byte[] startRow;
1900    byte[] stopRow;
1901
1902    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1903      this.startRow = startRow;
1904      this.stopRow = stopRow;
1905    }
1906
1907    void reset() {
1908      for (byte[] row : ROWS) {
1909        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1910      }
1911    }
1912
1913    int i(byte b) {
1914      return b - 'a';
1915    }
1916
1917    public void addRow(byte[] row) {
1918      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1919    }
1920
1921    /**
     * Validate that every row between startRow (inclusive) and stopRow (exclusive) was seen exactly
     * once, and that no other row was seen at all.
1924     */
1925    public void validate() {
1926      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1927        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1928          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1929            int count = seenRows[i(b1)][i(b2)][i(b3)];
1930            int expectedCount = 0;
1931            if (
1932              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1933                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1934            ) {
1935              expectedCount = 1;
1936            }
1937            if (count != expectedCount) {
1938              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1939              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1940                + "instead of " + expectedCount);
1941            }
1942          }
1943        }
1944      }
1945    }
1946  }
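
  // Example use of SeenRowTracker against data written by loadTable (a sketch; the scan
  // bounds and the table handle are hypothetical):
  //
  //   SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));
  //   Scan scan = new Scan().withStartRow(Bytes.toBytes("bbb")).withStopRow(Bytes.toBytes("yyy"));
  //   try (ResultScanner scanner = table.getScanner(scan)) {
  //     for (Result r : scanner) {
  //       tracker.addRow(r.getRow());
  //     }
  //   }
  //   tracker.validate(); // throws if any row in [bbb, yyy) was missed or seen more than once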
1947
1948  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1949    return loadRegion(r, f, false);
1950  }
1951
1952  public int loadRegion(final Region r, final byte[] f) throws IOException {
1953    return loadRegion((HRegion) r, f);
1954  }
1955
1956  /**
1957   * Load region with rows from 'aaa' to 'zzz'.
1958   * @param r     Region
1959   * @param f     Family
1960   * @param flush flush the cache if true
1961   * @return Count of rows loaded.
1962   */
1963  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1964    byte[] k = new byte[3];
1965    int rowCount = 0;
1966    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1967      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1968        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1969          k[0] = b1;
1970          k[1] = b2;
1971          k[2] = b3;
          Put put = new Put(k);
          // Always skip the WAL; this helper only exists to fill test regions quickly.
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
1978          int preRowCount = rowCount;
1979          int pause = 10;
1980          int maxPause = 1000;
1981          while (rowCount == preRowCount) {
1982            try {
1983              r.put(put);
1984              rowCount++;
1985            } catch (RegionTooBusyException e) {
1986              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1987              Threads.sleep(pause);
1988            }
1989          }
1990        }
1991      }
1992      if (flush) {
1993        r.flush(true);
1994      }
1995    }
1996    return rowCount;
1997  }
1998
1999  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2000    throws IOException {
2001    for (int i = startRow; i < endRow; i++) {
2002      byte[] data = Bytes.toBytes(String.valueOf(i));
2003      Put put = new Put(data);
2004      put.addColumn(f, null, data);
2005      t.put(put);
2006    }
2007  }
2008
2009  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2010    throws IOException {
2011    for (int i = 0; i < totalRows; i++) {
2012      byte[] row = new byte[rowSize];
2013      Bytes.random(row);
2014      Put put = new Put(row);
2015      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2016      t.put(put);
2017    }
2018  }
2019
2020  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2021    int replicaId) throws IOException {
2022    for (int i = startRow; i < endRow; i++) {
2023      String failMsg = "Failed verification of row :" + i;
2024      byte[] data = Bytes.toBytes(String.valueOf(i));
2025      Get get = new Get(data);
2026      get.setReplicaId(replicaId);
2027      get.setConsistency(Consistency.TIMELINE);
2028      Result result = table.get(get);
2029      assertTrue(result.containsColumn(f, null), failMsg);
2030      assertEquals(1, result.getColumnCells(f, null).size(), failMsg);
2031      Cell cell = result.getColumnLatestCell(f, null);
2032      assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2033        cell.getValueLength()), failMsg);
2034    }
2035  }
2036
2037  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2038    throws IOException {
2039    verifyNumericRows((HRegion) region, f, startRow, endRow);
2040  }
2041
2042  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2043    throws IOException {
2044    verifyNumericRows(region, f, startRow, endRow, true);
2045  }
2046
2047  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2048    final boolean present) throws IOException {
2049    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2050  }
2051
2052  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2053    final boolean present) throws IOException {
2054    for (int i = startRow; i < endRow; i++) {
2055      String failMsg = "Failed verification of row :" + i;
2056      byte[] data = Bytes.toBytes(String.valueOf(i));
2057      Result result = region.get(new Get(data));
2058
2059      boolean hasResult = result != null && !result.isEmpty();
2060      assertEquals(present, hasResult, failMsg + result);
2061      if (!present) continue;
2062
2063      assertTrue(result.containsColumn(f, null), failMsg);
2064      assertEquals(1, result.getColumnCells(f, null).size(), failMsg);
2065      Cell cell = result.getColumnLatestCell(f, null);
2066      assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2067        cell.getValueLength()), failMsg);
2068    }
2069  }
2070
2071  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2072    throws IOException {
2073    for (int i = startRow; i < endRow; i++) {
2074      byte[] data = Bytes.toBytes(String.valueOf(i));
2075      Delete delete = new Delete(data);
2076      delete.addFamily(f);
2077      t.delete(delete);
2078    }
2079  }
2080
2081  /**
2082   * Return the number of rows in the given table.
   * @param table the table to count rows in
2084   * @return count of rows
2085   */
2086  public static int countRows(final Table table) throws IOException {
2087    return countRows(table, new Scan());
2088  }
2089
2090  public static int countRows(final Table table, final Scan scan) throws IOException {
2091    try (ResultScanner results = table.getScanner(scan)) {
2092      int count = 0;
2093      while (results.next() != null) {
2094        count++;
2095      }
2096      return count;
2097    }
2098  }
2099
2100  public static int countRows(final Table table, final byte[]... families) throws IOException {
2101    Scan scan = new Scan();
2102    for (byte[] family : families) {
2103      scan.addFamily(family);
2104    }
2105    return countRows(table, scan);
2106  }
2107
2108  /**
2109   * Return the number of rows in the given table.
2110   */
2111  public int countRows(final TableName tableName) throws IOException {
2112    try (Table table = getConnection().getTable(tableName)) {
2113      return countRows(table);
2114    }
2115  }
2116
2117  public static int countRows(final Region region) throws IOException {
2118    return countRows(region, new Scan());
2119  }
2120
2121  public static int countRows(final Region region, final Scan scan) throws IOException {
2122    try (InternalScanner scanner = region.getScanner(scan)) {
2123      return countRows(scanner);
2124    }
2125  }
2126
2127  public static int countRows(final InternalScanner scanner) throws IOException {
2128    int scannedCount = 0;
2129    List<Cell> results = new ArrayList<>();
2130    boolean hasMore = true;
2131    while (hasMore) {
2132      hasMore = scanner.next(results);
2133      scannedCount += results.size();
2134      results.clear();
2135    }
2136    return scannedCount;
2137  }
2138
2139  /**
2140   * Return an md5 digest of the entire contents of a table.
2141   */
2142  public String checksumRows(final Table table) throws Exception {
2143    MessageDigest digest = MessageDigest.getInstance("MD5");
2144    try (ResultScanner results = table.getScanner(new Scan())) {
2145      for (Result res : results) {
2146        digest.update(res.getRow());
2147      }
2148    }
    // MessageDigest#toString() does not include the digest bytes, so render the digest explicitly.
    return Bytes.toHex(digest.digest());
2150  }
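
  // For example, to assert that copying or restoring a table preserved its row keys
  // (a sketch; the table handles are hypothetical):
  //
  //   String before = TEST_UTIL.checksumRows(sourceTable);
  //   String after = TEST_UTIL.checksumRows(copiedTable);
  //   assertEquals(before, after);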
2151
2152  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2153  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2154  static {
2155    int i = 0;
2156    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2157      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2158        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2159          ROWS[i][0] = b1;
2160          ROWS[i][1] = b2;
2161          ROWS[i][2] = b3;
2162          i++;
2163        }
2164      }
2165    }
2166  }
2167
2168  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2169    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2170    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2171    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2172    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2173    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2174    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2175
2176  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2177    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2178    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2179    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2180    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2181    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2182    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2183
2184  /**
2185   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2186   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2187   * @return list of region info for regions added to meta
2188   */
2189  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2190    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2191    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2192      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2193      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2194      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2195        TableState.State.ENABLED);
2196      // add custom ones
2197      for (int i = 0; i < startKeys.length; i++) {
2198        int j = (i + 1) % startKeys.length;
2199        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2200          .setEndKey(startKeys[j]).build();
2201        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2202        newRegions.add(hri);
2203      }
2204      return newRegions;
2205    }
2206  }
2207
2208  /**
2209   * Create an unmanaged WAL. Be sure to close it when you're through.
2210   */
2211  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2212    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
    // unless we pass it along via the conf.
2215    Configuration confForWAL = new Configuration(conf);
2216    CommonFSUtils.setRootDir(confForWAL, rootDir);
2217    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
2218      .getWAL(hri);
2219  }
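
  // A minimal sketch of using the unmanaged WAL (conf, rootDir and regionInfo come from
  // whatever the test already set up):
  //
  //   WAL wal = HBaseTestingUtil.createWal(conf, rootDir, regionInfo);
  //   try {
  //     // hand the WAL to HRegion.createHRegion / openHRegion as needed
  //   } finally {
  //     wal.close(); // unmanaged, so the caller must close it
  //   }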
2220
2221  /**
   * Create a region with its own WAL. Be sure to call
2223   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2224   */
2225  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2226    final Configuration conf, final TableDescriptor htd) throws IOException {
2227    return createRegionAndWAL(info, rootDir, conf, htd, true);
2228  }
2229
2230  /**
   * Create a region with its own WAL. Be sure to call
2232   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2233   */
2234  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2235    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2236    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2237    region.setBlockCache(blockCache);
2238    region.initialize();
2239    return region;
2240  }
2241
2242  /**
   * Create a region with its own WAL. Be sure to call
2244   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2245   */
2246  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2247    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2248    throws IOException {
2249    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2250    region.setMobFileCache(mobFileCache);
2251    region.initialize();
2252    return region;
2253  }
2254
2255  /**
   * Create a region with its own WAL. Be sure to call
2257   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2258   */
2259  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2260    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2261    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2262      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2263    WAL wal = createWal(conf, rootDir, info);
2264    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2265  }
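
  // The boolean variant above is what the BlockCache/MobFileCache overloads build on: they
  // create the region uninitialized, attach the cache, then call initialize(). A direct-use
  // sketch (info, rootDir, conf and htd come from the test):
  //
  //   HRegion region = createRegionAndWAL(info, rootDir, conf, htd);
  //   try {
  //     // exercise the region
  //   } finally {
  //     HBaseTestingUtil.closeRegionAndWAL(region); // closes both the region and its WAL
  //   }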
2266
2267  /**
   * Find a region server other than the one passed in.
   * @return another region server, or null if there is no other
2270   */
2271  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2272    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2274        return rst.getRegionServer();
2275      }
2276    }
2277    return null;
2278  }
2279
2280  /**
2281   * Tool to get the reference to the region server object that holds the region of the specified
2282   * user table.
2283   * @param tableName user table to lookup in hbase:meta
   * @return region server hosting the first region of the table, or null if there are no regions
2285   */
2286  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2287    throws IOException, InterruptedException {
2288    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2289    if (regions == null || regions.isEmpty()) {
2290      return null;
2291    }
2292    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2293
2294    byte[] firstRegionName =
2295      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2296        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2297
2298    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2299    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2300      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2301    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2302      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2303    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2304    while (retrier.shouldRetry()) {
2305      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2306      if (index != -1) {
2307        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2308      }
2309      // Came back -1. Region may not be online yet. Sleep a while.
2310      retrier.sleepUntilNextRetry();
2311    }
2312    return null;
2313  }
2314
2315  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2317   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2318   * MiniMRCluster instances with different log dirs.
2319   * @throws IOException When starting the cluster fails.
2320   */
2321  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2322    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2323    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2324      "99.0");
2325    startMiniMapReduceCluster(2);
2326    return mrCluster;
2327  }
2328
2329  /**
2330   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2331   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2332   * multiple MiniMRCluster instances with different log dirs.
   * @param servers The number of <code>TaskTracker</code>s to start.
2334   * @throws IOException When starting the cluster fails.
2335   */
2336  private void startMiniMapReduceCluster(final int servers) throws IOException {
2337    if (mrCluster != null) {
2338      throw new IllegalStateException("MiniMRCluster is already running");
2339    }
2340    LOG.info("Starting mini mapreduce cluster...");
2341    setupClusterTestDir();
2342    createDirsAndSetProperties();
2343
2344    //// hadoop2 specific settings
2345    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We raise the allowed virtual memory so that processes don't get killed.
2347    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2348
2349    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2350    // this avoids the problem by disabling speculative task execution in tests.
2351    conf.setBoolean("mapreduce.map.speculative", false);
2352    conf.setBoolean("mapreduce.reduce.speculative", false);
2353    ////
2354
2355    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2356    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2357    if (JVM.getJVMSpecVersion() >= 17) {
2358      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2359      conf.set("yarn.app.mapreduce.am.command-opts",
2360        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2361    }
2362
2363    // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem
2364    // instance and cause trouble in HBase. See HBASE-29802 for more details.
2365    JobConf mrClusterConf = new JobConf(conf);
2366    mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true);
2367    // Allow the user to override FS URI for this map-reduce cluster to use.
2368    mrCluster =
2369      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2370        1, null, null, mrClusterConf);
2371    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2372    if (jobConf == null) {
2373      jobConf = mrCluster.createJobConf();
2374    }
2375
2376    // Hadoop MiniMR overwrites this while it should not
2377    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2378    LOG.info("Mini mapreduce cluster started");
2379
2380    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2381    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2382    // necessary config properties here. YARN-129 required adding a few properties.
2383    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2384    // this for mrv2 support; mr1 ignores this
2385    conf.set("mapreduce.framework.name", "yarn");
2386    conf.setBoolean("yarn.is.minicluster", true);
2387    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2388    if (rmAddress != null) {
2389      conf.set("yarn.resourcemanager.address", rmAddress);
2390    }
2391    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2392    if (historyAddress != null) {
2393      conf.set("mapreduce.jobhistory.address", historyAddress);
2394    }
2395    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2396    if (schedulerAddress != null) {
2397      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2398    }
2399    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2400    if (mrJobHistoryWebappAddress != null) {
2401      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2402    }
2403    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2404    if (yarnRMWebappAddress != null) {
2405      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2406    }
2407  }
2408
2409  /**
2410   * Stops the previously started <code>MiniMRCluster</code>.
2411   */
2412  public void shutdownMiniMapReduceCluster() {
2413    if (mrCluster != null) {
2414      LOG.info("Stopping mini mapreduce cluster...");
2415      mrCluster.shutdown();
2416      mrCluster = null;
2417      LOG.info("Mini mapreduce cluster stopped");
2418    }
2419    // Restore configuration to point to local jobtracker
2420    conf.set("mapreduce.jobtracker.address", "local");
2421  }
2422
2423  /**
2424   * Create a stubbed out RegionServerService, mainly for getting FS.
2425   */
2426  public RegionServerServices createMockRegionServerService() throws IOException {
2427    return createMockRegionServerService((ServerName) null);
2428  }
2429
2430  /**
2431   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2432   * TestTokenAuthentication
2433   */
2434  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2435    throws IOException {
2436    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2437    rss.setFileSystem(getTestFileSystem());
2438    rss.setRpcServer(rpc);
2439    return rss;
2440  }
2441
2442  /**
2443   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2444   * TestOpenRegionHandler
2445   */
2446  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2447    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2448    rss.setFileSystem(getTestFileSystem());
2449    return rss;
2450  }
2451
2452  /**
2453   * Expire the Master's session
2454   */
2455  public void expireMasterSession() throws Exception {
2456    HMaster master = getMiniHBaseCluster().getMaster();
2457    expireSession(master.getZooKeeper(), false);
2458  }
2459
2460  /**
2461   * Expire a region server's session
2462   * @param index which RS
2463   */
2464  public void expireRegionServerSession(int index) throws Exception {
2465    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2466    expireSession(rs.getZooKeeper(), false);
2467    decrementMinRegionServerCount();
2468  }
2469
2470  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for the newly spawned master
2472    // this.hbaseCluster shares this configuration too
2473    decrementMinRegionServerCount(getConfiguration());
2474
2475    // each master thread keeps a copy of configuration
2476    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2477      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2478    }
2479  }
2480
2481  private void decrementMinRegionServerCount(Configuration conf) {
2482    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2483    if (currentCount != -1) {
2484      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2485    }
2486  }
2487
2488  public void expireSession(ZKWatcher nodeZK) throws Exception {
2489    expireSession(nodeZK, false);
2490  }
2491
2492  /**
2493   * Expire a ZooKeeper session as recommended in ZooKeeper documentation <a href=
2494   * "https://hbase.apache.org/docs/troubleshooting#troubleshooting-zookeeper">Troubleshooting
2495   * ZooKeeper</a>
2496   * <p/>
2497   * There are issues when doing this:
2498   * <ol>
2499   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2500   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2501   * </ol>
2502   * @param nodeZK      - the ZK watcher to expire
2503   * @param checkStatus - true to check if we can create a Table with the current configuration.
2504   */
2505  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2506    Configuration c = new Configuration(this.conf);
2507    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2508    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2509    byte[] password = zk.getSessionPasswd();
2510    long sessionID = zk.getSessionId();
2511
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event,
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event;
    // just closing the connection should be enough.
2519    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2520      @Override
2521      public void process(WatchedEvent watchedEvent) {
2522        LOG.info("Monitor ZKW received event=" + watchedEvent);
2523      }
2524    }, sessionID, password);
2525
2526    // Making it expire
2527    ZooKeeper newZK =
2528      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2529
    // Ensure that we have a connection to the server before closing down, otherwise
    // the close-session event may be swallowed before we even reach the CONNECTING state.
2532    long start = EnvironmentEdgeManager.currentTime();
2533    while (
2534      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2535    ) {
2536      Thread.sleep(1);
2537    }
2538    newZK.close();
2539    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2540
2541    // Now closing & waiting to be sure that the clients get it.
2542    monitor.close();
2543
2544    if (checkStatus) {
2545      getConnection().getTable(TableName.META_TABLE_NAME).close();
2546    }
2547  }
2548
2549  /**
2550   * Get the Mini HBase cluster.
2551   * @return hbase cluster
2552   * @see #getHBaseClusterInterface()
2553   */
2554  public SingleProcessHBaseCluster getHBaseCluster() {
2555    return getMiniHBaseCluster();
2556  }
2557
2558  /**
2559   * Returns the HBaseCluster instance.
2560   * <p>
   * The returned object can be any of the subclasses of HBaseCluster, and tests referring to this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, the more specific {@link #getMiniHBaseCluster()} can be used
   * instead, without the need to type-cast.
2565   */
2566  public HBaseClusterInterface getHBaseClusterInterface() {
2567    // implementation note: we should rename this method as #getHBaseCluster(),
2568    // but this would require refactoring 90+ calls.
2569    return hbaseCluster;
2570  }
2571
2572  /**
2573   * Resets the connections so that the next time getConnection() is called, a new connection is
2574   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2575   * the connection is not valid anymore.
2576   * <p/>
2577   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2578   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2579   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2580   * the connection state automatically. Cleaning this is a much bigger refactor.
2581   */
2582  public void invalidateConnection() throws IOException {
2583    closeConnection();
2584    // Update the master addresses if they changed.
2585    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2586    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2587    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2588      masterConfigBefore, masterConfAfter);
2589    conf.set(HConstants.MASTER_ADDRS_KEY,
2590      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2591  }
2592
2593  /**
   * Get a shared Connection to the cluster. This method is thread safe.
2595   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2596   */
2597  public Connection getConnection() throws IOException {
2598    return getAsyncConnection().toConnection();
2599  }
2600
2601  /**
   * Get a Connection to the cluster for the given user. This method is thread safe.
   * @param user the user to create the connection for
   * @return A Connection for the given user.
2605   */
2606  public Connection getConnection(User user) throws IOException {
2607    return getAsyncConnection(user).toConnection();
2608  }
2609
2610  /**
2611   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
2612   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2613   *         of cluster.
2614   */
2615  public AsyncClusterConnection getAsyncConnection() throws IOException {
2616    try {
2617      return asyncConnection.updateAndGet(connection -> {
2618        if (connection == null) {
2619          try {
2620            User user = UserProvider.instantiate(conf).getCurrent();
2621            connection = getAsyncConnection(user);
2622          } catch (IOException ioe) {
2623            throw new UncheckedIOException("Failed to create connection", ioe);
2624          }
2625        }
2626        return connection;
2627      });
2628    } catch (UncheckedIOException exception) {
2629      throw exception.getCause();
2630    }
2631  }
2632
2633  /**
2634   * Get an AsyncClusterConnection to the cluster for the given user. This method is thread safe.
2635   * @param user the user to create the connection for
2636   * @return An AsyncClusterConnection for the given user.
2637   */
2638  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2639    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2640  }
2641
2642  public void closeConnection() throws IOException {
2643    if (hbaseAdmin != null) {
2644      Closeables.close(hbaseAdmin, true);
2645      hbaseAdmin = null;
2646    }
2647    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2648    if (asyncConnection != null) {
2649      Closeables.close(asyncConnection, true);
2650    }
2651  }
2652
2653  /**
2654   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2655   * it has no effect; it will be closed automatically when the cluster shuts down.
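   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * with a started mini cluster, and the table name is hypothetical):
   *
   * <pre>{@code
   * Admin admin = util.getAdmin();
   * TableName tn = TableName.valueOf("testTable");
   * if (admin.tableExists(tn)) {
   *   admin.disableTable(tn);
   *   admin.deleteTable(tn);
   * }
   * // no need to close admin; it is closed automatically on cluster shutdown
   * }</pre>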
2656   */
2657  public Admin getAdmin() throws IOException {
2658    if (hbaseAdmin == null) {
2659      this.hbaseAdmin = getConnection().getAdmin();
2660    }
2661    return hbaseAdmin;
2662  }
2663
2664  private Admin hbaseAdmin = null;
2665
2666  /**
2667   * Returns an {@link Hbck} instance. Needs to be closed when done.
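   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * with a started mini cluster):
   *
   * <pre>{@code
   * try (Hbck hbck = util.getHbck()) {
   *   // issue HbckService calls here; the try-with-resources closes the instance when done
   * }
   * }</pre>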
2668   */
2669  public Hbck getHbck() throws IOException {
2670    return getConnection().getHbck();
2671  }
2672
2673  /**
2674   * Unassign the named region.
2675   * @param regionName The region to unassign.
2676   */
2677  public void unassignRegion(String regionName) throws IOException {
2678    unassignRegion(Bytes.toBytes(regionName));
2679  }
2680
2681  /**
2682   * Unassign the named region.
2683   * @param regionName The region to unassign.
2684   */
2685  public void unassignRegion(byte[] regionName) throws IOException {
2686    getAdmin().unassign(regionName);
2687  }
2688
2689  /**
2690   * Unassigns the region containing the given row.
2691   * @param row   The row to find the containing region.
2692   * @param table The table to find the region.
2693   */
2694  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2695    unassignRegionByRow(Bytes.toBytes(row), table);
2696  }
2697
2698  /**
2699   * Unassigns the region containing the given row.
2700   * @param row   The row to find the containing region.
2701   * @param table The table to find the region.
2702   */
2703  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2704    HRegionLocation hrl = table.getRegionLocation(row);
2705    unassignRegion(hrl.getRegion().getRegionName());
2706  }
2707
2708  /**
2709   * Retrieves a splittable region randomly from the given table.
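   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * with a started mini cluster; the table name and attempt count are hypothetical):
   *
   * <pre>{@code
   * HRegion region = util.getSplittableRegion(TableName.valueOf("testTable"), 10);
   * if (region != null) {
   *   util.getAdmin().split(region.getRegionInfo().getTable());
   * }
   * }</pre>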
2710   * @param tableName   name of table
2711   * @param maxAttempts maximum number of attempts, unlimited if -1
2712   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2713   */
2714  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2715    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2716    int regCount = regions.size();
2717    Set<Integer> attempted = new HashSet<>();
2718    int idx;
2719    int attempts = 0;
2720    do {
2721      regions = getHBaseCluster().getRegions(tableName);
2722      if (regCount != regions.size()) {
2723        // if there was region movement, clear attempted Set
2724        attempted.clear();
2725      }
2726      regCount = regions.size();
2727      // There are chances that before we get the region for the table from an RS the region may
2728      // be going for CLOSE. This may be because online schema change is enabled
2729      if (regCount > 0) {
2730        idx = ThreadLocalRandom.current().nextInt(regCount);
2731        // if we have just tried this region, there is no need to try again
2732        if (attempted.contains(idx)) {
2733          continue;
2734        }
2735        HRegion region = regions.get(idx);
2736        if (region.checkSplit().isPresent()) {
2737          return region;
2738        }
2739        attempted.add(idx);
2740      }
2741      attempts++;
2742    } while (maxAttempts == -1 || attempts < maxAttempts);
2743    return null;
2744  }
2745
2746  public MiniDFSCluster getDFSCluster() {
2747    return dfsCluster;
2748  }
2749
2750  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2751    setDFSCluster(cluster, true);
2752  }
2753
2754  /**
2755   * Set the MiniDFSCluster
2756   * @param cluster     cluster to use
2757   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2758   *                    is set.
2759   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2760   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2761   */
2762  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2763    throws IllegalStateException, IOException {
2764    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2765      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2766    }
2767    this.dfsCluster = cluster;
2768    this.setFs();
2769  }
2770
2771  public FileSystem getTestFileSystem() throws IOException {
2772    return HFileSystem.get(conf);
2773  }
2774
2775  /**
2776   * Wait until all regions in a table have been assigned. Waits the default timeout (30 seconds)
2777   * before giving up.
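   * <p>
   * A minimal usage sketch (illustrative only; {@code util} is an instance of this class and
   * {@code tableDescriptor} is a hypothetical descriptor for a newly created table):
   *
   * <pre>{@code
   * util.getAdmin().createTable(tableDescriptor);
   * util.waitTableAvailable(tableDescriptor.getTableName());
   * }</pre>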
2778   * @param table Table to wait on.
2779   */
2780  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2781    waitTableAvailable(table.getName(), 30000);
2782  }
2783
2784  public void waitTableAvailable(TableName table, long timeoutMillis)
2785    throws InterruptedException, IOException {
2786    waitFor(timeoutMillis, predicateTableAvailable(table));
2787  }
2788
2789  /**
2790   * Wait until all regions in a table have been assigned
2791   * @param table         Table to wait on.
2792   * @param timeoutMillis Timeout.
2793   */
2794  public void waitTableAvailable(byte[] table, long timeoutMillis)
2795    throws InterruptedException, IOException {
2796    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2797  }
2798
2799  public String explainTableAvailability(TableName tableName) throws IOException {
2800    StringBuilder msg =
2801      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2802    if (getHBaseCluster().getMaster().isAlive()) {
2803      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2804        .getRegionStates().getRegionAssignments();
2805      final List<Pair<RegionInfo, ServerName>> metaLocations =
2806        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2807      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2808        RegionInfo hri = metaLocation.getFirst();
2809        ServerName sn = metaLocation.getSecond();
2810        if (!assignments.containsKey(hri)) {
2811          msg.append(", region ").append(hri)
2812            .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
2813        } else if (sn == null) {
2814          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
2815        } else if (!sn.equals(assignments.get(hri))) {
2816          msg.append(", region ").append(hri)
2817            .append(" assigned, but has different servers in meta and AM (").append(sn)
2818            .append(" <> ").append(assignments.get(hri)).append(")");
2819        }
2820      }
2821    }
2822    return msg.toString();
2823  }
2824
2825  public String explainTableState(final TableName table, TableState.State state)
2826    throws IOException {
2827    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2828    if (tableState == null) {
2829      return "TableState in META: No table state in META for table " + table
2830        + ", last state in meta (including deleted) is " + findLastTableState(table);
2831    } else if (!tableState.inStates(state)) {
2832      return "TableState in META: Not " + state + " state, but " + tableState;
2833    } else {
2834      return "TableState in META: OK";
2835    }
2836  }
2837
2838  @Nullable
2839  public TableState findLastTableState(final TableName table) throws IOException {
2840    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2841    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2842      @Override
2843      public boolean visit(Result r) throws IOException {
2844        if (!Arrays.equals(r.getRow(), table.getName())) {
2845          return false;
2846        }
2847        TableState state = CatalogFamilyFormat.getTableState(r);
2848        if (state != null) {
2849          lastTableState.set(state);
2850        }
2851        return true;
2852      }
2853    };
2854    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2855      Integer.MAX_VALUE, visitor);
2856    return lastTableState.get();
2857  }
2858
2859  /**
2860   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2861   * regions have all been assigned. Will time out after the default period (30 seconds). Tolerates
2862   * a nonexistent table.
2863   * @param table the table to wait on.
2864   * @throws InterruptedException if interrupted while waiting
2865   * @throws IOException          if an IO problem is encountered
2866   */
2867  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2868    waitTableEnabled(table, 30000);
2869  }
2870
2871  /**
2872   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2873   * regions have all been assigned.
2874   * @see #waitTableEnabled(TableName, long)
2875   * @param table         Table to wait on.
2876   * @param timeoutMillis Time to wait on it being marked enabled.
2877   */
2878  public void waitTableEnabled(byte[] table, long timeoutMillis)
2879    throws InterruptedException, IOException {
2880    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2881  }
2882
2883  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2884    waitFor(timeoutMillis, predicateTableEnabled(table));
2885  }
2886
2887  /**
2888   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
2889   * time out after the default period (30 seconds).
2890   * @param table Table to wait on.
2891   */
2892  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2893    waitTableDisabled(table, 30000);
2894  }
2895
2896  public void waitTableDisabled(TableName table, long millisTimeout)
2897    throws InterruptedException, IOException {
2898    waitFor(millisTimeout, predicateTableDisabled(table));
2899  }
2900
2901  /**
2902   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2903   * @param table         Table to wait on.
2904   * @param timeoutMillis Time to wait on it being marked disabled.
2905   */
2906  public void waitTableDisabled(byte[] table, long timeoutMillis)
2907    throws InterruptedException, IOException {
2908    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2909  }
2910
2911  /**
2912   * Make sure that at least the specified number of region servers are running
2913   * @param num minimum number of region servers that should be running
2914   * @return true if we started some servers
2915   */
2916  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2917    boolean startedServer = false;
2918    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2919    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2920      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2921      startedServer = true;
2922    }
2923
2924    return startedServer;
2925  }
2926
2927  /**
2928   * Make sure that at least the specified number of region servers are running. We don't count the
2929   * ones that are currently stopping or are stopped.
2930   * @param num minimum number of region servers that should be running
2931   * @return true if we started some servers
2932   */
2933  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2934    boolean startedServer = ensureSomeRegionServersAvailable(num);
2935
2936    int nonStoppedServers = 0;
2937    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2938
2939      HRegionServer hrs = rst.getRegionServer();
2940      if (hrs.isStopping() || hrs.isStopped()) {
2941        LOG.info("A region server is stopped or stopping: " + hrs);
2942      } else {
2943        nonStoppedServers++;
2944      }
2945    }
2946    for (int i = nonStoppedServers; i < num; ++i) {
2947      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2948      startedServer = true;
2949    }
2950    return startedServer;
2951  }
2952
2953  /**
2954   * This method clones the passed <code>c</code> configuration, setting a new user into the clone.
2955   * Use it when getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2956   * @param c                     Initial configuration
2957   * @param differentiatingSuffix Suffix to differentiate this user from others.
2958   * @return A new User based on the current user's name plus the differentiating suffix.
2959   */
2960  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2961    throws IOException {
2962    FileSystem currentfs = FileSystem.get(c);
2963    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2964      return User.getCurrent();
2965    }
2966    // Else distributed filesystem. Make a new instance per daemon. Below
2967    // code is taken from the AppendTestUtil over in hdfs.
2968    String username = User.getCurrent().getName() + differentiatingSuffix;
2969    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2970    return user;
2971  }
2972
2973  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2974    throws IOException {
2975    NavigableSet<String> online = new TreeSet<>();
2976    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2977      try {
2978        for (RegionInfo region : ProtobufUtil
2979          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2980          online.add(region.getRegionNameAsString());
2981        }
2982      } catch (RegionServerStoppedException e) {
2983        // That's fine.
2984      }
2985    }
2986    return online;
2987  }
2988
2989  /**
2990   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it is hard-coded to 5 and makes tests
2991   * linger. Here is the exception you'll see:
2992   *
2993   * <pre>
2994   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2995   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2996   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2997   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2998   * </pre>
2999   *
3000   * @param stream A DFSClient.DFSOutputStream.
3001   */
3002  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3003    try {
3004      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3005      for (Class<?> clazz : clazzes) {
3006        String className = clazz.getSimpleName();
3007        if (className.equals("DFSOutputStream")) {
3008          if (clazz.isInstance(stream)) {
3009            Field maxRecoveryErrorCountField =
3010              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3011            maxRecoveryErrorCountField.setAccessible(true);
3012            maxRecoveryErrorCountField.setInt(stream, max);
3013            break;
3014          }
3015        }
3016      }
3017    } catch (Exception e) {
3018      LOG.info("Could not set max recovery field", e);
3019    }
3020  }
3021
3022  /**
3023   * Uses the assignment manager directly to assign the region, and waits until the specified region
3024   * has completed assignment.
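   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * and {@code tableName} is a hypothetical, existing table):
   *
   * <pre>{@code
   * RegionInfo hri = util.getAdmin().getRegions(tableName).get(0);
   * assertTrue(util.assignRegion(hri));
   * }</pre>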
3025   * @return true if the region is assigned, false otherwise.
3026   */
3027  public boolean assignRegion(final RegionInfo regionInfo)
3028    throws IOException, InterruptedException {
3029    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3030    am.assign(regionInfo);
3031    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3032  }
3033
3034  /**
3035   * Move a region to the destination server and wait until the region is completely moved and online.
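   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * with a started mini cluster and {@code hri} is a hypothetical RegionInfo):
   *
   * <pre>{@code
   * ServerName dest = util.getMiniHBaseCluster().getRegionServer(0).getServerName();
   * util.moveRegionAndWait(hri, dest);
   * }</pre>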
3036   * @param destRegion region to move
3037   * @param destServer destination server of the region
3038   */
3039  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3040    throws InterruptedException, IOException {
3041    HMaster master = getMiniHBaseCluster().getMaster();
3042    // TODO: Here we start the move. The move can take a while.
3043    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3044    while (true) {
3045      ServerName serverName =
3046        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3047      if (serverName != null && serverName.equals(destServer)) {
3048        assertRegionOnServer(destRegion, serverName, 2000);
3049        break;
3050      }
3051      Thread.sleep(10);
3052    }
3053  }
3054
3055  /**
3056   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3057   * configurable timeout value (default is 60 seconds). This means all regions have been deployed,
3058   * the master has been informed, and hbase:meta has been updated with each region's deployed server.
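   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * and the table name is hypothetical):
   *
   * <pre>{@code
   * TableName tn = TableName.valueOf("testTable");
   * util.waitUntilAllRegionsAssigned(tn);
   * // the table's regions are now assigned and recorded in hbase:meta
   * }</pre>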
3059   * @param tableName the table name
3060   */
3061  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3062    waitUntilAllRegionsAssigned(tableName,
3063      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3064  }
3065
3066  /**
3067   * Wait until all system table regions get assigned.
3068   */
3069  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3070    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3071  }
3072
3073  /**
3074   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3075   * timeout. This means all regions have been deployed, the master has been informed, and
3076   * hbase:meta has been updated with each region's deployed server.
3077   * @param tableName the table name
3078   * @param timeout   timeout, in milliseconds
3079   */
3080  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3081    throws IOException {
3082    if (!TableName.isMetaTableName(tableName)) {
3083      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3084        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3085          + timeout + "ms");
3086        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3087          @Override
3088          public String explainFailure() throws IOException {
3089            return explainTableAvailability(tableName);
3090          }
3091
3092          @Override
3093          public boolean evaluate() throws IOException {
3094            Scan scan = new Scan();
3095            scan.addFamily(HConstants.CATALOG_FAMILY);
3096            boolean tableFound = false;
3097            try (ResultScanner s = meta.getScanner(scan)) {
3098              for (Result r; (r = s.next()) != null;) {
3099                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3100                RegionInfo info = RegionInfo.parseFromOrNull(b);
3101                if (info != null && info.getTable().equals(tableName)) {
3102                  // Get server hosting this region from catalog family. Return false if no server
3103                  // hosting this region, or if the server hosting this region was recently killed
3104                  // (for fault tolerance testing).
3105                  tableFound = true;
3106                  byte[] server =
3107                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3108                  if (server == null) {
3109                    return false;
3110                  } else {
3111                    byte[] startCode =
3112                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3113                    ServerName serverName =
3114                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3115                        + Bytes.toLong(startCode));
3116                    if (
3117                      !getHBaseClusterInterface().isDistributedCluster()
3118                        && getHBaseCluster().isKilledRS(serverName)
3119                    ) {
3120                      return false;
3121                    }
3122                  }
3123                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3124                    return false;
3125                  }
3126                }
3127              }
3128            }
3129            if (!tableFound) {
3130              LOG.warn(
3131                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3132            }
3133            return tableFound;
3134          }
3135        });
3136      }
3137    }
3138    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3139    // check from the master state if we are using a mini cluster
3140    if (!getHBaseClusterInterface().isDistributedCluster()) {
3141      // So, all regions are in the meta table but make sure master knows of the assignments before
3142      // returning -- sometimes this can lag.
3143      HMaster master = getHBaseCluster().getMaster();
3144      final RegionStates states = master.getAssignmentManager().getRegionStates();
3145      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3146        @Override
3147        public String explainFailure() throws IOException {
3148          return explainTableAvailability(tableName);
3149        }
3150
3151        @Override
3152        public boolean evaluate() throws IOException {
3153          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3154          return hris != null && !hris.isEmpty();
3155        }
3156      });
3157    }
3158    LOG.info("All regions for table " + tableName + " assigned.");
3159  }
3160
3161  /**
3162   * Do a small get/scan against one store. This is required because the store has no actual methods of
3163   * querying itself, and relies on StoreScanner.
3164   */
3165  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3166    Scan scan = new Scan(get);
3167    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3168      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3169      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3170      // readpoint 0.
3171      0);
3172
3173    List<Cell> result = new ArrayList<>();
3174    scanner.next(result);
3175    if (!result.isEmpty()) {
3176      // verify that we are on the row we want:
3177      Cell kv = result.get(0);
3178      if (!CellUtil.matchingRows(kv, get.getRow())) {
3179        result.clear();
3180      }
3181    }
3182    scanner.close();
3183    return result;
3184  }
3185
3186  /**
3187   * Create region split keys between startKey and endKey.
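   * <p>
   * A minimal usage sketch (illustrative only; the key range and region count are arbitrary):
   *
   * <pre>{@code
   * byte[][] startKeys = getRegionSplitStartKeys(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 10);
   * // startKeys[0] is HConstants.EMPTY_BYTE_ARRAY; one start key per region
   * }</pre>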
3188   * @param numRegions the number of regions to be created. It has to be greater than 3.
3189   * @return resulting split keys
3190   */
3191  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3192    assertTrue(numRegions > 3);
3193    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3194    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3195    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3196    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3197    return result;
3198  }
3199
3200  /**
3201   * Do a small get/scan against one store. This is required because the store has no actual methods of
3202   * querying itself, and relies on StoreScanner.
3203   */
3204  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3205    throws IOException {
3206    Get get = new Get(row);
3207    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3208    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3209
3210    return getFromStoreFile(store, get);
3211  }
3212
3213  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3214    final List<? extends Cell> actual) {
3215    final int eLen = expected.size();
3216    final int aLen = actual.size();
3217    final int minLen = Math.min(eLen, aLen);
3218
3219    int i = 0;
3220    while (
3221      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3222    ) {
3223      i++;
3224    }
3225
3226    if (additionalMsg == null) {
3227      additionalMsg = "";
3228    }
3229    if (!additionalMsg.isEmpty()) {
3230      additionalMsg = ". " + additionalMsg;
3231    }
3232
3233    if (eLen != aLen || i != minLen) {
3234      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3235        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3236        + " (length " + aLen + ")" + additionalMsg);
3237    }
3238  }
3239
3240  public static <T> String safeGetAsStr(List<T> lst, int i) {
3241    if (0 <= i && i < lst.size()) {
3242      return lst.get(i).toString();
3243    } else {
3244      return "<out_of_range>";
3245    }
3246  }
3247
3248  public String getRpcConnnectionURI() throws UnknownHostException {
3249    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3250  }
3251
3252  public String getZkConnectionURI() {
3253    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3254      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3255      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3256  }
3257
3258  /**
3259   * Get the zk based cluster key for this cluster.
3260   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3261   *             connection info of a cluster. Keep here only for compatibility.
3262   * @see #getRpcConnnectionURI()
3263   * @see #getZkConnectionURI()
3264   */
3265  @Deprecated
3266  public String getClusterKey() {
3267    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3268      + ":"
3269      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3270  }
3271
3272  /**
3273   * Creates a random table with the given parameters
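   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class
   * with a started mini cluster, and all parameter values are arbitrary):
   *
   * <pre>{@code
   * Table table = util.createRandomTable(TableName.valueOf("testTable"),
   *   Arrays.asList("cf1", "cf2"), 3, 10, 2, 5, 100);
   * }</pre>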
3274   */
3275  public Table createRandomTable(TableName tableName, final Collection<String> families,
3276    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3277    final int numRowsPerFlush) throws IOException, InterruptedException {
3278    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3279      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3280      + maxVersions + "\n");
3281
3282    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3283    final int numCF = families.size();
3284    final byte[][] cfBytes = new byte[numCF][];
3285    {
3286      int cfIndex = 0;
3287      for (String cf : families) {
3288        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3289      }
3290    }
3291
3292    final int actualStartKey = 0;
3293    final int actualEndKey = Integer.MAX_VALUE;
3294    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3295    final int splitStartKey = actualStartKey + keysPerRegion;
3296    final int splitEndKey = actualEndKey - keysPerRegion;
3297    final String keyFormat = "%08x";
3298    final Table table = createTable(tableName, cfBytes, maxVersions,
3299      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3300      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3301
3302    if (hbaseCluster != null) {
3303      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3304    }
3305
3306    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3307
3308    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3309      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3310        final byte[] row = Bytes.toBytes(
3311          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3312
3313        Put put = new Put(row);
3314        Delete del = new Delete(row);
3315        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3316          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3317          final long ts = rand.nextInt();
3318          final byte[] qual = Bytes.toBytes("col" + iCol);
3319          if (rand.nextBoolean()) {
3320            final byte[] value =
3321              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3322                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3323            put.addColumn(cf, qual, ts, value);
3324          } else if (rand.nextDouble() < 0.8) {
3325            del.addColumn(cf, qual, ts);
3326          } else {
3327            del.addColumns(cf, qual, ts);
3328          }
3329        }
3330
3331        if (!put.isEmpty()) {
3332          mutator.mutate(put);
3333        }
3334
3335        if (!del.isEmpty()) {
3336          mutator.mutate(del);
3337        }
3338      }
3339      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3340      mutator.flush();
3341      if (hbaseCluster != null) {
3342        getMiniHBaseCluster().flushcache(table.getName());
3343      }
3344    }
3345    mutator.close();
3346
3347    return table;
3348  }
3349
3350  public static int randomFreePort() {
3351    return HBaseCommonTestingUtil.randomFreePort();
3352  }
3353
3354  public static String randomMultiCastAddress() {
3355    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3356  }
3357
3358  public static void waitForHostPort(String host, int port) throws IOException {
3359    final int maxTimeMs = 10000;
3360    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3361    IOException savedException = null;
3362    LOG.info("Waiting for server at " + host + ":" + port);
3363    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3364      try {
3365        Socket sock = new Socket(InetAddress.getByName(host), port);
3366        sock.close();
3367        savedException = null;
3368        LOG.info("Server at " + host + ":" + port + " is available");
3369        break;
3370      } catch (UnknownHostException e) {
3371        throw new IOException("Failed to look up " + host, e);
3372      } catch (IOException e) {
3373        savedException = e;
3374      }
3375      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3376    }
3377
3378    if (savedException != null) {
3379      throw savedException;
3380    }
3381  }
3382
3383  public static int getMetaRSPort(Connection connection) throws IOException {
3384    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3385      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3386    }
3387  }
3388
3389  /**
3390   * Due to an async racing issue, a region may not be in the online region list of a region server
3391   * yet, even after the assignment znode is deleted and the new assignment is recorded in master.
3392   */
3393  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3394    final long timeout) throws IOException, InterruptedException {
3395    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3396    while (true) {
3397      List<RegionInfo> regions = getAdmin().getRegions(server);
3398      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3399      long now = EnvironmentEdgeManager.currentTime();
3400      if (now > timeoutTime) break;
3401      Thread.sleep(10);
3402    }
3403    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3404  }
3405
3406  /**
3407   * Check to make sure the region is open on the specified region server, but not on any other one.
3408   */
3409  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3410    final long timeout) throws IOException, InterruptedException {
3411    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3412    while (true) {
3413      List<RegionInfo> regions = getAdmin().getRegions(server);
3414      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3415        List<JVMClusterUtil.RegionServerThread> rsThreads =
3416          getHBaseCluster().getLiveRegionServerThreads();
3417        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3418          HRegionServer rs = rsThread.getRegionServer();
3419          if (server.equals(rs.getServerName())) {
3420            continue;
3421          }
3422          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3423          for (HRegion r : hrs) {
3424            assertTrue(r.getRegionInfo().getRegionId() != hri.getRegionId(),
3425              "Region should not be double assigned");
3426          }
3427        }
3428        return; // good, we are happy
3429      }
3430      long now = EnvironmentEdgeManager.currentTime();
3431      if (now > timeoutTime) break;
3432      Thread.sleep(10);
3433    }
3434    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3435  }
3436
3437  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3438    TableDescriptor td =
3439      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3440    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3441    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3442  }
3443
3444  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3445    BlockCache blockCache) throws IOException {
3446    TableDescriptor td =
3447      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3448    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3449    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3450  }
3451
3452  public static void setFileSystemURI(String fsURI) {
3453    FS_URI = fsURI;
3454  }
3455
3456  /**
3457   * Returns a {@link Predicate} for checking that there are no regions in transition in master
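   * <p>
   * A minimal usage sketch (illustrative only; the timeout value is arbitrary):
   *
   * <pre>{@code
   * waitFor(60000, predicateNoRegionsInTransition());
   * }</pre>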
3458   */
3459  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3460    return new ExplainingPredicate<IOException>() {
3461      @Override
3462      public String explainFailure() throws IOException {
3463        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3464        return "found in transition: " + am.getRegionsInTransition().toString();
3465      }
3466
3467      @Override
3468      public boolean evaluate() throws IOException {
3469        HMaster master = getMiniHBaseCluster().getMaster();
3470        if (master == null) return false;
3471        AssignmentManager am = master.getAssignmentManager();
3472        if (am == null) return false;
3473        return !am.hasRegionsInTransition();
3474      }
3475    };
3476  }
3477
3478  /**
3479   * Returns a {@link Predicate} for checking that there are no region transit procedures scheduled
3480   * in master
3481   */
3482  public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() {
3483    return new ExplainingPredicate<IOException>() {
3484      @Override
3485      public String explainFailure() throws IOException {
3486        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3487        return "Number of procedures scheduled for region transit: "
3488          + am.getRegionTransitScheduledCount();
3489      }
3490
3491      @Override
3492      public boolean evaluate() throws IOException {
3493        HMaster master = getMiniHBaseCluster().getMaster();
3494        if (master == null) {
3495          return false;
3496        }
3497        AssignmentManager am = master.getAssignmentManager();
3498        if (am == null) {
3499          return false;
3500        }
3501        return am.getRegionTransitScheduledCount() == 0;
3502      }
3503    };
3504  }
3505
3506  /**
3507   * Returns a {@link Predicate} for checking that table is enabled
3508   */
3509  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3510    return new ExplainingPredicate<IOException>() {
3511      @Override
3512      public String explainFailure() throws IOException {
3513        return explainTableState(tableName, TableState.State.ENABLED);
3514      }
3515
3516      @Override
3517      public boolean evaluate() throws IOException {
3518        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3519      }
3520    };
3521  }
3522
3523  /**
3524   * Returns a {@link Predicate} for checking that table is disabled
3525   */
3526  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3527    return new ExplainingPredicate<IOException>() {
3528      @Override
3529      public String explainFailure() throws IOException {
3530        return explainTableState(tableName, TableState.State.DISABLED);
3531      }
3532
3533      @Override
3534      public boolean evaluate() throws IOException {
3535        return getAdmin().isTableDisabled(tableName);
3536      }
3537    };
3538  }
3539
3540  /**
3541   * Returns a {@link Predicate} for checking that table is available
3542   */
3543  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3544    return new ExplainingPredicate<IOException>() {
3545      @Override
3546      public String explainFailure() throws IOException {
3547        return explainTableAvailability(tableName);
3548      }
3549
3550      @Override
3551      public boolean evaluate() throws IOException {
3552        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3553        if (tableAvailable) {
3554          try (Table table = getConnection().getTable(tableName)) {
3555            TableDescriptor htd = table.getDescriptor();
3556            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3557              .getAllRegionLocations()) {
3558              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3559                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3560                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3561              for (byte[] family : htd.getColumnFamilyNames()) {
3562                scan.addFamily(family);
3563              }
3564              try (ResultScanner scanner = table.getScanner(scan)) {
3565                scanner.next();
3566              }
3567            }
3568          }
3569        }
3570        return tableAvailable;
3571      }
3572    };
3573  }
3574
3575  /**
3576   * Wait until no regions in transition.
3577   * @param timeout How long to wait.
3578   */
3579  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3580    waitFor(timeout, predicateNoRegionsInTransition());
3581  }
3582
3583  /**
3584   * Wait until no region transit procedures are scheduled.
3585   * @param timeout How long to wait.
3586   */
3587  public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException {
3588    waitFor(timeout, predicateNoRegionTransitScheduled());
3589  }
3590
3591  /**
3592   * Wait until no regions in transition. (time limit 15min)
3593   */
3594  public void waitUntilNoRegionsInTransition() throws IOException {
3595    waitUntilNoRegionsInTransition(15 * 60000);
3596  }
3597
3598  /**
3599   * Wait until no TRSP is present
3600   */
3601  public void waitUntilNoRegionTransitScheduled() throws IOException {
3602    waitUntilNoRegionTransitScheduled(15 * 60000);
3603  }
3604
3605  /**
3606   * Wait until the labels are ready in VisibilityLabelsCache.
3607   */
3608  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3609    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3610    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3611
3612      @Override
3613      public boolean evaluate() {
3614        for (String label : labels) {
3615          if (labelsCache.getLabelOrdinal(label) == 0) {
3616            return false;
3617          }
3618        }
3619        return true;
3620      }
3621
3622      @Override
3623      public String explainFailure() {
3624        for (String label : labels) {
3625          if (labelsCache.getLabelOrdinal(label) == 0) {
3626            return label + " is not available yet";
3627          }
3628        }
3629        return "";
3630      }
3631    });
3632  }
3633
3634  /**
3635   * Create a set of column descriptors with all combinations of the available compression,
3636   * encoding, and bloom filter types.
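   * <p>
   * A minimal usage sketch (illustrative only; the table name is hypothetical):
   *
   * <pre>{@code
   * TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf("testTable"));
   * for (ColumnFamilyDescriptor cfd : generateColumnDescriptors()) {
   *   builder.setColumnFamily(cfd);
   * }
   * }</pre>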
3637   * @return the list of column descriptors
3638   */
3639  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3640    return generateColumnDescriptors("");
3641  }
3642
3643  /**
3644   * Create a set of column descriptors with all combinations of the available compression,
3645   * encoding, and bloom filter types.
3646   * @param prefix family names prefix
3647   * @return the list of column descriptors
3648   */
3649  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3650    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3651    long familyId = 0;
3652    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3653      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3654        for (BloomType bloomType : BloomType.values()) {
3655          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3656          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3657            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3658          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3659          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3660          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3661          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3662          familyId++;
3663        }
3664      }
3665    }
3666    return columnFamilyDescriptors;
3667  }
3668
3669  /**
3670   * Get supported compression algorithms.
3671   * @return supported compression algorithms.
3672   */
3673  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3674    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3675    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3676    for (String algoName : allAlgos) {
3677      try {
3678        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3679        algo.getCompressor();
3680        supportedAlgos.add(algo);
3681      } catch (Throwable t) {
3682        // this algo is not available
3683      }
3684    }
3685    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3686  }
3687
3688  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3689    Scan scan = new Scan().withStartRow(row);
3690    scan.setReadType(ReadType.PREAD);
3691    scan.setCaching(1);
3692    scan.setReversed(true);
3693    scan.addFamily(family);
3694    try (RegionScanner scanner = r.getScanner(scan)) {
3695      List<Cell> cells = new ArrayList<>(1);
3696      scanner.next(cells);
3697      if (r.getRegionInfo().isMetaRegion() && !cells.isEmpty() && !isTargetTable(row, cells.get(0))) {
3698        return null;
3699      }
3700      return Result.create(cells);
3701    }
3702  }
3703
3704  private boolean isTargetTable(final byte[] inRow, Cell c) {
3705    String inputRowString = Bytes.toString(inRow);
3706    int i = inputRowString.indexOf(HConstants.DELIMITER);
3707    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3708    int o = outputRowString.indexOf(HConstants.DELIMITER);
3709    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3710  }
3711
3712  /**
3713   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3714   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3715   * kerby KDC server and utility for using it,
3716   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3717   * less baggage. It came in with HBASE-5291.
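   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code util} is an instance of this class;
   * the keytab location and principal are hypothetical):
   *
   * <pre>{@code
   * File keytab = new File(util.getDataTestDir("keytab").toUri().getPath());
   * MiniKdc kdc = util.setupMiniKdc(keytab);
   * try {
   *   kdc.createPrincipal(keytab, "hbase/localhost");
   * } finally {
   *   kdc.stop();
   * }
   * }</pre>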
3718   */
3719  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3720    Properties conf = MiniKdc.createConf();
3721    conf.put(MiniKdc.DEBUG, true);
3722    MiniKdc kdc = null;
3723    File dir = null;
3724    // There is time lag between selecting a port and trying to bind with it. It's possible that
3725    // another service captures the port in between, which will result in a BindException.
3726    boolean bindException;
3727    int numTries = 0;
3728    do {
3729      try {
3730        bindException = false;
3731        dir = new File(getDataTestDir("kdc").toUri().getPath());
3732        kdc = new MiniKdc(conf, dir);
3733        kdc.start();
3734      } catch (BindException e) {
3735        FileUtils.deleteDirectory(dir); // clean directory
3736        numTries++;
3737        if (numTries == 3) {
3738          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3739          throw e;
3740        }
3741        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3742        bindException = true;
3743      }
3744    } while (bindException);
3745    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3746    return kdc;
3747  }
3748
3749  public int getNumHFiles(final TableName tableName, final byte[] family) {
3750    int numHFiles = 0;
3751    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3752      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3753    }
3754    return numHFiles;
3755  }
3756
3757  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3758    final byte[] family) {
3759    int numHFiles = 0;
3760    for (Region region : rs.getRegions(tableName)) {
3761      numHFiles += region.getStore(family).getStorefilesCount();
3762    }
3763    return numHFiles;
3764  }
3765
3766  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3767    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3768    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3769    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3770    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3771    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3772        it2 = rtdFamilies.iterator(); it.hasNext();) {
3773      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3774    }
3775  }
3776
3777  /**
3778   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3779   * invocations.
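   * <p>
   * A minimal usage sketch (illustrative only; {@code done} is a hypothetical flag set by
   * background work):
   *
   * <pre>{@code
   * AtomicBoolean done = new AtomicBoolean(false);
   * // ... kick off background work that eventually sets done ...
   * await(100, done::get);
   * }</pre>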
3780   */
3781  public static void await(final long sleepMillis, final BooleanSupplier condition)
3782    throws InterruptedException {
3783    try {
3784      while (!condition.getAsBoolean()) {
3785        Thread.sleep(sleepMillis);
3786      }
3787    } catch (RuntimeException e) {
3788      if (e.getCause() instanceof AssertionError) {
3789        throw (AssertionError) e.getCause();
3790      }
3791      throw e;
3792    }
3793  }
3794
3795  public void createRegionDir(RegionInfo hri) throws IOException {
3796    Path rootDir = getDataTestDir();
3797    Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable());
3798    Path regionDir = new Path(tableDir, hri.getEncodedName());
3799    FileSystem fs = getTestFileSystem();
3800    if (!fs.exists(regionDir)) {
3801      fs.mkdirs(regionDir);
3802    }
3803  }
3804
3805  public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem)
3806    throws IOException {
3807    Path tableDir =
3808      CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable());
3809    HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir,
3810      regionInfo);
3811  }
3812}