/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for the old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase. This class is meant to
 * be your one-stop shop for anything you might need while testing. It manages one cluster at a
 * time only. The managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed
 * cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * Depends on log4j being on the classpath and on hbase-site.xml for logging and test-run
 * configuration. It does not set logging levels. In the configuration properties, default values
 * for master-info-port and region-server-port are overridden such that a random port will be
 * assigned (thus avoiding port contention if another local HBase instance is already running).
 * <p>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
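 * <p>
 * A minimal usage sketch (illustrative only; TEST_UTIL is a hypothetical field name and the
 * surrounding test harness is assumed):
 *
 * <pre>
 * HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 * TEST_UTIL.startMiniCluster();
 * try (Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
 *   // exercise the cluster here
 * }
 * TEST_UTIL.shutdownMiniCluster();
 * </pre>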
 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
 */
@InterfaceAudience.Public
@Deprecated
public class HBaseTestingUtility extends HBaseZKTestingUtility {

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /**
   * For unit tests parameterized over memstore-TS and tags settings; each entry is a pair of
   * booleans. A hypothetical JUnit usage sketch (the method and class wiring are illustrative):
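   * <pre>
   * &#64;Parameterized.Parameters
   * public static List&lt;Object[]&gt; params() {
   *   return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
   * }
   * </pre>
   */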
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
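   * <p>
   * A sketch of probing for a free port (the starting value and loop are illustrative, not part of
   * this class):
   *
   * <pre>
   * int port = 50000;
   * while (!HBaseTestingUtility.available(port)) {
   *   port++;
   * }
   * </pre>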
   * @param port the port number to check for availability
   * @return {@code true} if the port is available, or {@code false} if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create combinations of memstoreTS and tags settings.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtility using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtility using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtility(Configuration conf) {
    super(conf);

    // an HBase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true, making random port assignment the
    // default; tests must explicitly opt out of it.
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration, since {@link Connection} instances can be shared. The Map of Connections is
   * keyed by the Configuration. If say, a Connection was being used against a cluster that had
   * been shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be
   * wholesome. Rather than use the returned Configuration directly, it is usually best to make a
   * copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but nothing can be done about it at the moment; single instance only is how
   * the minidfscluster works. We also create the underlying directory names for hadoop.log.dir,
   * mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the conf, and as a
   * system property for hadoop.tmp.dir (we do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set, so we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the test
   *         filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
   * setup.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up at " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) dataTestDirOnTestFS = null;
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
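            // A daemon ThreadGroup is destroyed automatically once its last thread exits, and
            // creating new threads in a destroyed group throws IllegalThreadStateException; this
            // is what the HBASE-27148 workaround guards against by clearing the daemon flag.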
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version is > 3.2.3 or 3.3.4. "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames for the DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames for the DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from a system property or the conf.
   * This allows the parameter to be specified on the command line. If neither is set, the default
   * is false (matching the conf lookup below).
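   * <p>
   * A sketch of enabling it (TEST_UTIL is an illustrative instance name; the property can equally
   * be passed as a -D flag on the JVM command line):
   *
   * <pre>
   * System.setProperty("hbase.tests.use.shortcircuit.reads", "true");
   * assert TEST_UTIL.isReadShortCircuitOn();
   * </pre>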
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)}, or does
   * nothing if no cluster is running.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
   * @param createWALDir Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param createRootDir Whether to create a new root or data directory path.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
      .numDataNodes(numSlaves).createRootDir(createRootDir).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param createRootDir Whether to create a new root or data directory path.
   * @param createWALDir  Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
    boolean createWALDir) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
      .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters    Master node number.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param createRootDir Whether to create a new root or data directory path.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
    throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numSlaves  Slave node number, for both HBase region server and HDFS data node.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters    Master node number.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its length overrides
   *                      the HDFS data node count.
   * @param createRootDir Whether to create a new root or data directory path.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
    boolean createRootDir) throws Exception {
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves)
        .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters    Master node number.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its length overrides
   *                      the HDFS data node count.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
    throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param numDataNodes     Number of datanodes.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
    throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters    Master node number.
   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its length overrides
   *                      the HDFS data node count.
   * @param masterClass   The class to use as HMaster, or null for default.
   * @param rsClass       The class to use as HRegionServer, or null for default.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves)
      .dataNodeHosts(dataNodeHosts).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param numDataNodes     Number of datanodes.
   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its length
   *                         overrides the HDFS data node count.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param numDataNodes     Number of datanodes.
   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its length
   *                         overrides the HDFS data node count.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
    boolean createWALDir) throws Exception {
    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir)
      .createWALDir(createWALDir).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
   * other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartMiniClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * value can be found in {@link StartMiniClusterOption.Builder}.
   * @see #startMiniCluster(StartMiniClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartMiniClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
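   * <p>
   * A sketch of option-based startup (the specific option values are illustrative):
   *
   * <pre>
   * StartMiniClusterOption option =
   *   StartMiniClusterOption.builder().numMasters(1).numRegionServers(3).numDataNodes(3).build();
   * TEST_UTIL.startMiniCluster(option);
   * </pre>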
   * @see #shutdownMiniDFSCluster()
   */
  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
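   * <p>
   * A stepped-startup sketch (assuming the DFS and ZK helpers from this utility and its parent
   * class; exact arguments are illustrative):
   *
   * <pre>
   * TEST_UTIL.startMiniDFSCluster(3);
   * TEST_UTIL.startMiniZKCluster(1);
   * TEST_UTIL.startMiniHBaseCluster(StartMiniClusterOption.builder().build());
   * </pre>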
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers have connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
1117    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1118
1119    return (MiniHBaseCluster) hbaseCluster;
1120  }
1121
1122  /**
1123   * Starts up mini hbase cluster using default options. Default options can be found in
1124   * {@link StartMiniClusterOption.Builder}.
1125   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1126   * @see #shutdownMiniHBaseCluster()
1127   */
1128  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1129    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1130  }
1131
1132  /**
1133   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1134   * {@link #startMiniCluster()}. All other options will use default values, defined in
1135   * {@link StartMiniClusterOption.Builder}.
1136   * @param numMasters       Master node number.
1137   * @param numRegionServers Number of region servers.
1138   * @return The mini HBase cluster created.
1139   * @see #shutdownMiniHBaseCluster()
1140   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1141   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1142   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1143   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1144   */
1145  @Deprecated
1146  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1147    throws IOException, InterruptedException {
1148    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1149      .numRegionServers(numRegionServers).build();
1150    return startMiniHBaseCluster(option);
1151  }
1152
1153  /**
1154   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1155   * {@link #startMiniCluster()}. All other options will use default values, defined in
1156   * {@link StartMiniClusterOption.Builder}.
1157   * @param numMasters       Master node number.
1158   * @param numRegionServers Number of region servers.
1159   * @param rsPorts          Ports that RegionServer should use.
1160   * @return The mini HBase cluster created.
1161   * @see #shutdownMiniHBaseCluster()
1162   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1163   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1164   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1165   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1166   */
1167  @Deprecated
1168  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1169    List<Integer> rsPorts) throws IOException, InterruptedException {
1170    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1171      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1172    return startMiniHBaseCluster(option);
1173  }
1174
1175  /**
1176   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1177   * {@link #startMiniCluster()}. All other options will use default values, defined in
1178   * {@link StartMiniClusterOption.Builder}.
1179   * @param numMasters       Master node number.
1180   * @param numRegionServers Number of region servers.
1181   * @param rsPorts          Ports that RegionServer should use.
1182   * @param masterClass      The class to use as HMaster, or null for default.
1183   * @param rsClass          The class to use as HRegionServer, or null for default.
1184   * @param createRootDir    Whether to create a new root or data directory path.
1185   * @param createWALDir     Whether to create a new WAL directory.
1186   * @return The mini HBase cluster created.
1187   * @see #shutdownMiniHBaseCluster()
1188   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1189   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1190   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1191   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1192   */
1193  @Deprecated
1194  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1195    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1196    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1197    boolean createWALDir) throws IOException, InterruptedException {
1198    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1199      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1200      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1201    return startMiniHBaseCluster(option);
1202  }
1203
1204  /**
1205   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1206   * want to keep dfs/zk up and just stop/start hbase.
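   * <p>
   * A minimal usage sketch (assumes a utility instance, here called {@code TEST_UTIL}, whose
   * mini cluster was started earlier with {@link #startMiniCluster()}):
   * </p>
   *
   * <pre>
   * TEST_UTIL.shutdownMiniHBaseCluster(); // hbase goes down, dfs/zk stay up
   * TEST_UTIL.restartHBaseCluster(3); // bring hbase back with three region servers
   * </pre>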
1207   * @param servers number of region servers
1208   */
1209  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1210    this.restartHBaseCluster(servers, null);
1211  }
1212
1213  public void restartHBaseCluster(int servers, List<Integer> ports)
1214    throws IOException, InterruptedException {
1215    StartMiniClusterOption option =
1216      StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1217    restartHBaseCluster(option);
1218    invalidateConnection();
1219  }
1220
1221  public void restartHBaseCluster(StartMiniClusterOption option)
1222    throws IOException, InterruptedException {
1223    closeConnection();
1224    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
1225      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1226      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
1238  }
1239
1240  /**
1241   * @return Current mini hbase cluster. Only has something in it after a call to
1242   *         {@link #startMiniCluster()}.
1243   * @see #startMiniCluster()
1244   */
1245  public MiniHBaseCluster getMiniHBaseCluster() {
1246    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1247      return (MiniHBaseCluster) this.hbaseCluster;
1248    }
1249    throw new RuntimeException(
1250      hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName());
1251  }
1252
1253  /**
1254   * Stops mini hbase, zk, and hdfs clusters.
1255   * @see #startMiniCluster(int)
1256   */
1257  public void shutdownMiniCluster() throws IOException {
1258    LOG.info("Shutting down minicluster");
1259    shutdownMiniHBaseCluster();
1260    shutdownMiniDFSCluster();
1261    shutdownMiniZKCluster();
1262
1263    cleanupTestDir();
1264    miniClusterRunning = false;
1265    LOG.info("Minicluster is down");
1266  }
1267
1268  /**
   * Shutdown HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
1271   */
1272  public void shutdownMiniHBaseCluster() throws IOException {
1273    cleanup();
1274    if (this.hbaseCluster != null) {
1275      this.hbaseCluster.shutdown();
1276      // Wait till hbase is down before going on to shutdown zk.
1277      this.hbaseCluster.waitUntilShutDown();
1278      this.hbaseCluster = null;
1279    }
1280    if (zooKeeperWatcher != null) {
1281      zooKeeperWatcher.close();
1282      zooKeeperWatcher = null;
1283    }
1284  }
1285
1286  /**
   * Abruptly shutdown the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
1289   */
1290  public void killMiniHBaseCluster() throws IOException {
1291    cleanup();
1292    if (this.hbaseCluster != null) {
1293      getMiniHBaseCluster().killAll();
1294      this.hbaseCluster = null;
1295    }
1296    if (zooKeeperWatcher != null) {
1297      zooKeeperWatcher.close();
1298      zooKeeperWatcher = null;
1299    }
1300  }
1301
1302  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1303  private void cleanup() throws IOException {
1304    closeConnection();
1305    // unset the configuration for MIN and MAX RS to start
1306    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1307    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1308  }
1309
1310  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before. If
   * false, the previous path is used. Note: this does not cause the root dir to be created.
1314   * @return Fully qualified path for the default hbase root dir
1315   */
1316  public Path getDefaultRootDirPath(boolean create) throws IOException {
1317    if (!create) {
1318      return getDataTestDirOnTestFS();
1319    } else {
1320      return getNewDataTestDirOnTestFS();
1321    }
1322  }
1323
1324  /**
   * Same as {@link HBaseTestingUtility#getDefaultRootDirPath(boolean)} except that the
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1327   * @return Fully qualified path for the default hbase root dir
1328   */
1329  public Path getDefaultRootDirPath() throws IOException {
1330    return getDefaultRootDirPath(false);
1331  }
1332
1333  /**
   * Creates an hbase rootdir in the user home directory. Also creates the hbase version file.
   * Normally you won't make use of this method; the root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operations.
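   * <p>
   * A minimal sketch of manual use ({@code TEST_UTIL} is illustrative):
   * </p>
   *
   * <pre>
   * Path hbaseRootDir = TEST_UTIL.createRootDir(true); // fresh dir plus the hbase version file
   * </pre>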
   * @param create This flag decides whether a new root or data directory path should be fetched,
   *               even if one has been fetched before. Note: the directory will be created
   *               regardless of whether the path has been fetched before; if it already exists, it
   *               will be overwritten.
1340   * @return Fully qualified path to hbase root dir
1341   */
1342  public Path createRootDir(boolean create) throws IOException {
1343    FileSystem fs = FileSystem.get(this.conf);
1344    Path hbaseRootdir = getDefaultRootDirPath(create);
1345    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1346    fs.mkdirs(hbaseRootdir);
1347    FSUtils.setVersion(fs, hbaseRootdir);
1348    return hbaseRootdir;
1349  }
1350
1351  /**
   * Same as {@link HBaseTestingUtility#createRootDir(boolean)} except that the
   * <code>create</code> flag is false.
1354   * @return Fully qualified path to hbase root dir
1355   */
1356  public Path createRootDir() throws IOException {
1357    return createRootDir(false);
1358  }
1359
1360  /**
1361   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1362   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1363   * this method if you were doing manual operation.
1364   * @return Fully qualified path to hbase root dir
1365   */
1366  public Path createWALRootDir() throws IOException {
1367    FileSystem fs = FileSystem.get(this.conf);
1368    Path walDir = getNewDataTestDirOnTestFS();
1369    CommonFSUtils.setWALRootDir(this.conf, walDir);
1370    fs.mkdirs(walDir);
1371    return walDir;
1372  }
1373
1374  private void setHBaseFsTmpDir() throws IOException {
1375    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1376    if (hbaseFsTmpDirInString == null) {
1377      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1378      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1379    } else {
1380      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1381    }
1382  }
1383
1384  /**
1385   * Flushes all caches in the mini hbase cluster
1386   */
1387  public void flush() throws IOException {
1388    getMiniHBaseCluster().flushcache();
1389  }
1390
1391  /**
   * Flushes all caches of the given table in the mini hbase cluster
1393   */
1394  public void flush(TableName tableName) throws IOException {
1395    getMiniHBaseCluster().flushcache(tableName);
1396  }
1397
1398  /**
1399   * Compact all regions in the mini hbase cluster
1400   */
1401  public void compact(boolean major) throws IOException {
1402    getMiniHBaseCluster().compact(major);
1403  }
1404
1405  /**
   * Compact all of a table's regions in the mini hbase cluster
1407   */
1408  public void compact(TableName tableName, boolean major) throws IOException {
1409    getMiniHBaseCluster().compact(tableName, major);
1410  }
1411
1412  /**
1413   * Create a table.
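   * <p>
   * For example (sketch; assumes a running mini cluster and a {@code TEST_UTIL} instance):
   * </p>
   *
   * <pre>
   * try (Table table = TEST_UTIL.createTable(TableName.valueOf("t1"), "cf")) {
   *   table.put(new Put(Bytes.toBytes("r1")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"),
   *     Bytes.toBytes("v")));
   * }
   * </pre>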
1414   * @return A Table instance for the created table.
1415   */
1416  public Table createTable(TableName tableName, String family) throws IOException {
1417    return createTable(tableName, new String[] { family });
1418  }
1419
1420  /**
1421   * Create a table.
1422   * @return A Table instance for the created table.
1423   */
1424  public Table createTable(TableName tableName, String[] families) throws IOException {
1425    List<byte[]> fams = new ArrayList<>(families.length);
1426    for (String family : families) {
1427      fams.add(Bytes.toBytes(family));
1428    }
1429    return createTable(tableName, fams.toArray(new byte[0][]));
1430  }
1431
1432  /**
1433   * Create a table.
1434   * @return A Table instance for the created table.
1435   */
1436  public Table createTable(TableName tableName, byte[] family) throws IOException {
1437    return createTable(tableName, new byte[][] { family });
1438  }
1439
1440  /**
1441   * Create a table with multiple regions.
1442   * @return A Table instance for the created table.
1443   */
1444  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1445    throws IOException {
    if (numRegions < 3) {
      throw new IOException("Must create at least 3 regions");
    }
1447    byte[] startKey = Bytes.toBytes("aaaaa");
1448    byte[] endKey = Bytes.toBytes("zzzzz");
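    // Bytes.split returns the two boundary keys plus (numRegions - 3) intermediate keys, i.e.
    // numRegions - 1 split keys in total; with the regions before "aaaaa" and after "zzzzz"
    // included, createTable ends up with exactly numRegions regions.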
1449    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1450
1451    return createTable(tableName, new byte[][] { family }, splitKeys);
1452  }
1453
1454  /**
1455   * Create a table.
1456   * @return A Table instance for the created table.
1457   */
1458  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1459    return createTable(tableName, families, (byte[][]) null);
1460  }
1461
1462  /**
1463   * Create a table with multiple regions.
1464   * @return A Table instance for the created table.
1465   */
1466  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1467    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1468  }
1469
1470  /**
1471   * Create a table with multiple regions.
1472   * @param replicaCount replica count.
1473   * @return A Table instance for the created table.
1474   */
1475  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1476    throws IOException {
1477    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1478  }
1479
1480  /**
1481   * Create a table.
1482   * @return A Table instance for the created table.
1483   */
1484  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1485    throws IOException {
1486    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1487  }
1488
1489  /**
1490   * Create a table.
1491   * @param tableName    the table name
1492   * @param families     the families
1493   * @param splitKeys    the splitkeys
1494   * @param replicaCount the region replica count
1495   * @return A Table instance for the created table.
   * @throws IOException if table creation fails
1497   */
1498  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1499    int replicaCount) throws IOException {
1500    return createTable(tableName, families, splitKeys, replicaCount,
1501      new Configuration(getConfiguration()));
1502  }
1503
1504  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1505    byte[] endKey, int numRegions) throws IOException {
1506    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1507
1508    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we
    // should wait until they are assigned.
1511    waitUntilAllRegionsAssigned(tableName);
1512    return getConnection().getTable(tableName);
1513  }
1514
1515  /**
1516   * Create a table.
1517   * @param c Configuration to use
1518   * @return A Table instance for the created table.
1519   */
1520  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1521    throws IOException {
1522    return createTable(htd, families, null, c);
1523  }
1524
1525  /**
1526   * Create a table.
1527   * @param htd       table descriptor
1528   * @param families  array of column families
1529   * @param splitKeys array of split keys
1530   * @param c         Configuration to use
1531   * @return A Table instance for the created table.
1532   * @throws IOException if getAdmin or createTable fails
1533   */
1534  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1535    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95); tests have hard-coded counts of what
    // to expect in the block cache, etc., and blooms being on interferes with those counts.
1539    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1540  }
1541
1542  /**
1543   * Create a table.
1544   * @param htd       table descriptor
1545   * @param families  array of column families
1546   * @param splitKeys array of split keys
1547   * @param type      Bloom type
1548   * @param blockSize block size
1549   * @param c         Configuration to use
1550   * @return A Table instance for the created table.
1551   * @throws IOException if getAdmin or createTable fails
1552   */
1554  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1555    BloomType type, int blockSize, Configuration c) throws IOException {
1556    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1557    for (byte[] family : families) {
1558      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1559        .setBloomFilterType(type).setBlocksize(blockSize);
1560      if (isNewVersionBehaviorEnabled()) {
1561        cfdb.setNewVersionBehavior(true);
1562      }
1563      builder.setColumnFamily(cfdb.build());
1564    }
1565    TableDescriptor td = builder.build();
1566    if (splitKeys != null) {
1567      getAdmin().createTable(td, splitKeys);
1568    } else {
1569      getAdmin().createTable(td);
1570    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned.
1573    waitUntilAllRegionsAssigned(td.getTableName());
1574    return getConnection().getTable(td.getTableName());
1575  }
1576
1577  /**
1578   * Create a table.
1579   * @param htd       table descriptor
1580   * @param splitRows array of split keys
1581   * @return A Table instance for the created table.
1582   */
1583  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1584    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1585    if (isNewVersionBehaviorEnabled()) {
1586      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1587        builder.setColumnFamily(
1588          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1589      }
1590    }
1591    if (splitRows != null) {
1592      getAdmin().createTable(builder.build(), splitRows);
1593    } else {
1594      getAdmin().createTable(builder.build());
1595    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned.
1598    waitUntilAllRegionsAssigned(htd.getTableName());
1599    return getConnection().getTable(htd.getTableName());
1600  }
1601
1602  /**
1603   * Create a table.
1604   * @param tableName    the table name
1605   * @param families     the families
1606   * @param splitKeys    the split keys
1607   * @param replicaCount the replica count
1608   * @param c            Configuration to use
1609   * @return A Table instance for the created table.
1610   */
1611  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1612    int replicaCount, final Configuration c) throws IOException {
1613    TableDescriptor htd =
1614      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1615    return createTable(htd, families, splitKeys, c);
1616  }
1617
1618  /**
1619   * Create a table.
1620   * @return A Table instance for the created table.
1621   */
1622  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1623    return createTable(tableName, new byte[][] { family }, numVersions);
1624  }
1625
1626  /**
1627   * Create a table.
1628   * @return A Table instance for the created table.
1629   */
1630  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1631    throws IOException {
1632    return createTable(tableName, families, numVersions, (byte[][]) null);
1633  }
1634
1635  /**
1636   * Create a table.
1637   * @return A Table instance for the created table.
1638   */
1639  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1640    byte[][] splitKeys) throws IOException {
1641    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1642    for (byte[] family : families) {
1643      ColumnFamilyDescriptorBuilder cfBuilder =
1644        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1645      if (isNewVersionBehaviorEnabled()) {
1646        cfBuilder.setNewVersionBehavior(true);
1647      }
1648      builder.setColumnFamily(cfBuilder.build());
1649    }
1650    if (splitKeys != null) {
1651      getAdmin().createTable(builder.build(), splitKeys);
1652    } else {
1653      getAdmin().createTable(builder.build());
1654    }
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1657    waitUntilAllRegionsAssigned(tableName);
1658    return getConnection().getTable(tableName);
1659  }
1660
1661  /**
1662   * Create a table with multiple regions.
1663   * @return A Table instance for the created table.
1664   */
1665  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1666    throws IOException {
1667    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1668  }
1669
1670  /**
1671   * Create a table.
1672   * @return A Table instance for the created table.
1673   */
1674  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1675    throws IOException {
1676    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1677    for (byte[] family : families) {
1678      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1679        .setMaxVersions(numVersions).setBlocksize(blockSize);
1680      if (isNewVersionBehaviorEnabled()) {
1681        cfBuilder.setNewVersionBehavior(true);
1682      }
1683      builder.setColumnFamily(cfBuilder.build());
1684    }
1685    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1688    waitUntilAllRegionsAssigned(tableName);
1689    return getConnection().getTable(tableName);
1690  }
1691
1692  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1693    String cpName) throws IOException {
1694    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1695    for (byte[] family : families) {
1696      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1697        .setMaxVersions(numVersions).setBlocksize(blockSize);
1698      if (isNewVersionBehaviorEnabled()) {
1699        cfBuilder.setNewVersionBehavior(true);
1700      }
1701      builder.setColumnFamily(cfBuilder.build());
1702    }
1703    if (cpName != null) {
1704      builder.setCoprocessor(cpName);
1705    }
1706    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1709    waitUntilAllRegionsAssigned(tableName);
1710    return getConnection().getTable(tableName);
1711  }
1712
1713  /**
1714   * Create a table.
1715   * @return A Table instance for the created table.
1716   */
1717  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1718    throws IOException {
1719    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1720    int i = 0;
1721    for (byte[] family : families) {
1722      ColumnFamilyDescriptorBuilder cfBuilder =
1723        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1724      if (isNewVersionBehaviorEnabled()) {
1725        cfBuilder.setNewVersionBehavior(true);
1726      }
1727      builder.setColumnFamily(cfBuilder.build());
1728      i++;
1729    }
1730    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1733    waitUntilAllRegionsAssigned(tableName);
1734    return getConnection().getTable(tableName);
1735  }
1736
1737  /**
1738   * Create a table.
1739   * @return A Table instance for the created table.
1740   */
1741  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1742    throws IOException {
1743    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1744    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1745    if (isNewVersionBehaviorEnabled()) {
1746      cfBuilder.setNewVersionBehavior(true);
1747    }
1748    builder.setColumnFamily(cfBuilder.build());
1749    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned.
1752    waitUntilAllRegionsAssigned(tableName);
1753    return getConnection().getTable(tableName);
1754  }
1755
1756  /**
1757   * Create a table with multiple regions.
1758   * @return A Table instance for the created table.
1759   */
1760  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1761    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1762  }
1763
1764  /**
1765   * Modify a table, synchronous.
1766   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1767   *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1768   * @see Admin#modifyTable(TableDescriptor)
1769   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1770   */
1771  @Deprecated
1772  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1773    throws IOException, InterruptedException {
1774    admin.modifyTable(desc);
1775  }
1776
1777  /**
1778   * Set the number of Region replicas.
1779   */
1780  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1781    throws IOException, InterruptedException {
1782    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1783      .setRegionReplication(replicaCount).build();
1784    admin.modifyTable(desc);
1785  }
1786
1787  /**
1788   * Drop an existing table
1789   * @param tableName existing table
1790   */
1791  public void deleteTable(TableName tableName) throws IOException {
1792    try {
1793      getAdmin().disableTable(tableName);
1794    } catch (TableNotEnabledException e) {
1795      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1796    }
1797    getAdmin().deleteTable(tableName);
1798  }
1799
1800  /**
1801   * Drop an existing table
1802   * @param tableName existing table
1803   */
1804  public void deleteTableIfAny(TableName tableName) throws IOException {
1805    try {
1806      deleteTable(tableName);
1807    } catch (TableNotFoundException e) {
1808      // ignore
1809    }
1810  }
1811
1812  // ==========================================================================
1813  // Canned table and table descriptor creation
1814
1815  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1816  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1817  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1818  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1819  private static final int MAXVERSIONS = 3;
1820
1821  public static final char FIRST_CHAR = 'a';
1822  public static final char LAST_CHAR = 'z';
1823  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1824  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1825
1826  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1827    return createModifyableTableDescriptor(TableName.valueOf(name),
1828      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1829      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1830  }
1831
1832  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1833    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1834    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1835    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1836      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1837        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1838        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1839      if (isNewVersionBehaviorEnabled()) {
1840        cfBuilder.setNewVersionBehavior(true);
1841      }
1842      builder.setColumnFamily(cfBuilder.build());
1843    }
1844    return builder.build();
1845  }
1846
1847  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1848    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1849    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1850    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1851      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1852        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1853        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1854      if (isNewVersionBehaviorEnabled()) {
1855        cfBuilder.setNewVersionBehavior(true);
1856      }
1857      builder.setColumnFamily(cfBuilder.build());
1858    }
1859    return builder;
1860  }
1861
1862  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
1866   */
1867  public TableDescriptor createTableDescriptor(final TableName name) {
1868    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1869      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1870  }
1871
1872  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1873    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1874  }
1875
1876  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1877    int maxVersions) {
1878    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1879    for (byte[] family : families) {
1880      ColumnFamilyDescriptorBuilder cfBuilder =
1881        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1882      if (isNewVersionBehaviorEnabled()) {
1883        cfBuilder.setNewVersionBehavior(true);
1884      }
1885      builder.setColumnFamily(cfBuilder.build());
1886    }
1887    return builder.build();
1888  }
1889
1890  /**
1891   * Create an HRegion that writes to the local tmp dirs
1892   * @param desc     a table descriptor indicating which table the region belongs to
1893   * @param startKey the start boundary of the region
1894   * @param endKey   the end boundary of the region
1895   * @return a region that writes to local dir for testing
1896   */
1897  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1898    throws IOException {
1899    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1900      .setEndKey(endKey).build();
1901    return createLocalHRegion(hri, desc);
1902  }
1903
1904  /**
1905   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1906   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1907   */
1908  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1909    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1910  }
1911
1912  /**
1913   * Create an HRegion that writes to the local tmp dirs with specified wal
1914   * @param info regioninfo
1915   * @param conf configuration
1916   * @param desc table descriptor
1917   * @param wal  wal for this region.
1918   * @return created hregion
1919   */
1920  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1921    WAL wal) throws IOException {
1922    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1923  }
1924
1925  /**
1926   * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1927   * when done.
1928   */
1929  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1930    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1931    throws IOException {
1932    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1933      durability, wal, null, families);
1934  }
1935
1936  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1937    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1938    boolean[] compactedMemStore, byte[]... families) throws IOException {
1939    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1940    builder.setReadOnly(isReadOnly);
1941    int i = 0;
1942    for (byte[] family : families) {
1943      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1944      if (compactedMemStore != null && i < compactedMemStore.length) {
1945        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1946      } else {
1947        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1948
1949      }
1950      i++;
1951      // Set default to be three versions.
1952      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1953      builder.setColumnFamily(cfBuilder.build());
1954    }
1955    builder.setDurability(durability);
1956    RegionInfo info =
1957      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1958    return createLocalHRegion(info, conf, builder.build(), wal);
1959  }
1960
1961  //
1962  // ==========================================================================
1963
1964  /**
1965   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1966   * read.
1967   * @param tableName existing table
   * @return Table for the given table name
1969   */
1970  public Table deleteTableData(TableName tableName) throws IOException {
1971    Table table = getConnection().getTable(tableName);
    try (ResultScanner resScan = table.getScanner(new Scan())) {
      for (Result res : resScan) {
        table.delete(new Delete(res.getRow()));
      }
    }
1980    return table;
1981  }
1982
1983  /**
1984   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1985   * table.
1986   * @param tableName       table which must exist.
1987   * @param preserveRegions keep the existing split points
   * @return Table for the new table
1989   */
1990  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1991    throws IOException {
1992    Admin admin = getAdmin();
1993    if (!admin.isTableDisabled(tableName)) {
1994      admin.disableTable(tableName);
1995    }
1996    admin.truncateTable(tableName, preserveRegions);
1997    return getConnection().getTable(tableName);
1998  }
1999
2000  /**
2001   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2002   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
2003   * preserve regions of existing table.
2004   * @param tableName table which must exist.
   * @return Table for the new table
2006   */
2007  public Table truncateTable(final TableName tableName) throws IOException {
2008    return truncateTable(tableName, false);
2009  }
2010
2011  /**
2012   * Load table with rows from 'aaa' to 'zzz'.
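   * <p>
   * For example, a load followed by a count check (sketch; {@code table} is illustrative):
   * </p>
   *
   * <pre>
   * int loaded = TEST_UTIL.loadTable(table, Bytes.toBytes("cf"));
   * assertEquals(loaded, HBaseTestingUtility.countRows(table));
   * </pre>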
2013   * @param t Table
2014   * @param f Family
2015   * @return Count of rows loaded.
2016   */
2017  public int loadTable(final Table t, final byte[] f) throws IOException {
2018    return loadTable(t, new byte[][] { f });
2019  }
2020
2021  /**
2022   * Load table with rows from 'aaa' to 'zzz'.
   * @param t          Table
   * @param f          Family
   * @param writeToWAL whether the puts should be written to the WAL
   * @return Count of rows loaded.
2026   */
2027  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2028    return loadTable(t, new byte[][] { f }, null, writeToWAL);
2029  }
2030
2031  /**
2032   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2033   * @param t Table
2034   * @param f Array of Families to load
2035   * @return Count of rows loaded.
2036   */
2037  public int loadTable(final Table t, final byte[][] f) throws IOException {
2038    return loadTable(t, f, null);
2039  }
2040
2041  /**
2042   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2043   * @param t     Table
2044   * @param f     Array of Families to load
2045   * @param value the values of the cells. If null is passed, the row key is used as value
2046   * @return Count of rows loaded.
2047   */
2048  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2049    return loadTable(t, f, value, true);
2050  }
2051
2052  /**
2053   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2054   * @param t     Table
2055   * @param f     Array of Families to load
2056   * @param value the values of the cells. If null is passed, the row key is used as value
2057   * @return Count of rows loaded.
2058   */
2059  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
2060    throws IOException {
2061    List<Put> puts = new ArrayList<>();
2062    for (byte[] row : HBaseTestingUtility.ROWS) {
2063      Put put = new Put(row);
2064      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2065      for (int i = 0; i < f.length; i++) {
2066        byte[] value1 = value != null ? value : row;
2067        put.addColumn(f[i], f[i], value1);
2068      }
2069      puts.add(put);
2070    }
2071    t.put(puts);
2072    return puts.size();
2073  }
2074
2075  /**
2076   * A tracker for tracking and validating table rows generated with
2077   * {@link HBaseTestingUtility#loadTable(Table, byte[])}
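   * <p>
   * A minimal sketch of the intended flow ({@code scanner} is an illustrative ResultScanner over
   * the loaded table):
   * </p>
   *
   * <pre>
   * SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"));
   * for (Result r : scanner) {
   *   tracker.addRow(r.getRow());
   * }
   * tracker.validate(); // throws RuntimeException on any miscount
   * </pre>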
2078   */
2079  public static class SeenRowTracker {
2080    int dim = 'z' - 'a' + 1;
2081    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
2082    byte[] startRow;
2083    byte[] stopRow;
2084
2085    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2086      this.startRow = startRow;
2087      this.stopRow = stopRow;
2088    }
2089
2090    void reset() {
2091      for (byte[] row : ROWS) {
2092        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2093      }
2094    }
2095
2096    int i(byte b) {
2097      return b - 'a';
2098    }
2099
2100    public void addRow(byte[] row) {
2101      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2102    }
2103
2104    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and that no
     * other rows are seen.
2107     */
2108    public void validate() {
2109      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2110        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2111          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2112            int count = seenRows[i(b1)][i(b2)][i(b3)];
2113            int expectedCount = 0;
2114            if (
2115              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
2116                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
2117            ) {
2118              expectedCount = 1;
2119            }
2120            if (count != expectedCount) {
2121              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
              throw new RuntimeException(
                "Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
2124            }
2125          }
2126        }
2127      }
2128    }
2129  }
2130
2131  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2132    return loadRegion(r, f, false);
2133  }
2134
2135  public int loadRegion(final Region r, final byte[] f) throws IOException {
2136    return loadRegion((HRegion) r, f);
2137  }
2138
2139  /**
2140   * Load region with rows from 'aaa' to 'zzz'.
2141   * @param r     Region
2142   * @param f     Family
2143   * @param flush flush the cache if true
2144   * @return Count of rows loaded.
2145   */
2146  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
2147    byte[] k = new byte[3];
2148    int rowCount = 0;
2149    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2150      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2151        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2152          k[0] = b1;
2153          k[1] = b2;
2154          k[2] = b3;
          Put put = new Put(k);
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
2161          int preRowCount = rowCount;
2162          int pause = 10;
2163          int maxPause = 1000;
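          // Retry with exponential backoff, doubling the pause (capped at maxPause ms) until
          // the region accepts the put, e.g. when it is momentarily too busy to take writes.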
2164          while (rowCount == preRowCount) {
2165            try {
2166              r.put(put);
2167              rowCount++;
2168            } catch (RegionTooBusyException e) {
2169              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2170              Threads.sleep(pause);
2171            }
2172          }
2173        }
2174      }
2175      if (flush) {
2176        r.flush(true);
2177      }
2178    }
2179    return rowCount;
2180  }
2181
2182  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2183    throws IOException {
2184    for (int i = startRow; i < endRow; i++) {
2185      byte[] data = Bytes.toBytes(String.valueOf(i));
2186      Put put = new Put(data);
2187      put.addColumn(f, null, data);
2188      t.put(put);
2189    }
2190  }
2191
2192  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2193    throws IOException {
2194    byte[] row = new byte[rowSize];
2195    for (int i = 0; i < totalRows; i++) {
2196      Bytes.random(row);
2197      Put put = new Put(row);
2198      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2199      t.put(put);
2200    }
2201  }
2202
2203  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2204    int replicaId) throws IOException {
2205    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2207      byte[] data = Bytes.toBytes(String.valueOf(i));
2208      Get get = new Get(data);
2209      get.setReplicaId(replicaId);
2210      get.setConsistency(Consistency.TIMELINE);
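      // TIMELINE consistency lets the read be served by the (possibly stale) replica chosen
      // above, rather than only by the primary region.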
2211      Result result = table.get(get);
2212      if (!result.containsColumn(f, null)) {
2213        throw new AssertionError(failMsg);
2214      }
2215      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2216      Cell cell = result.getColumnLatestCell(f, null);
2217      if (
2218        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2219          cell.getValueLength())
2220      ) {
2221        throw new AssertionError(failMsg);
2222      }
2223    }
2224  }
2225
2226  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2227    throws IOException {
2228    verifyNumericRows((HRegion) region, f, startRow, endRow);
2229  }
2230
2231  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2232    throws IOException {
2233    verifyNumericRows(region, f, startRow, endRow, true);
2234  }
2235
2236  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2237    final boolean present) throws IOException {
2238    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2239  }
2240
2241  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2242    final boolean present) throws IOException {
2243    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row: " + i;
2245      byte[] data = Bytes.toBytes(String.valueOf(i));
2246      Result result = region.get(new Get(data));
2247
2248      boolean hasResult = result != null && !result.isEmpty();
2249      if (present != hasResult) {
        throw new AssertionError(
          failMsg + " " + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2252      }
2253      if (!present) continue;
2254
2255      if (!result.containsColumn(f, null)) {
2256        throw new AssertionError(failMsg);
2257      }
2258      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2259      Cell cell = result.getColumnLatestCell(f, null);
2260      if (
2261        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2262          cell.getValueLength())
2263      ) {
2264        throw new AssertionError(failMsg);
2265      }
2266    }
2267  }
2268
2269  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2270    throws IOException {
2271    for (int i = startRow; i < endRow; i++) {
2272      byte[] data = Bytes.toBytes(String.valueOf(i));
2273      Delete delete = new Delete(data);
2274      delete.addFamily(f);
2275      t.delete(delete);
2276    }
2277  }
2278
2279  /**
2280   * Return the number of rows in the given table.
2281   * @param table to count rows
2282   * @return count of rows
2283   */
2284  public static int countRows(final Table table) throws IOException {
2285    return countRows(table, new Scan());
2286  }
2287
2288  public static int countRows(final Table table, final Scan scan) throws IOException {
2289    try (ResultScanner results = table.getScanner(scan)) {
2290      int count = 0;
2291      while (results.next() != null) {
2292        count++;
2293      }
2294      return count;
2295    }
2296  }
2297
2298  public int countRows(final Table table, final byte[]... families) throws IOException {
2299    Scan scan = new Scan();
2300    for (byte[] family : families) {
2301      scan.addFamily(family);
2302    }
2303    return countRows(table, scan);
2304  }
2305
2306  /**
2307   * Return the number of rows in the given table.
2308   */
2309  public int countRows(final TableName tableName) throws IOException {
    try (Table table = getConnection().getTable(tableName)) {
      return countRows(table);
    }
2316  }
2317
2318  public int countRows(final Region region) throws IOException {
2319    return countRows(region, new Scan());
2320  }
2321
2322  public int countRows(final Region region, final Scan scan) throws IOException {
    try (InternalScanner scanner = region.getScanner(scan)) {
      return countRows(scanner);
    }
2329  }
2330
2331  public int countRows(final InternalScanner scanner) throws IOException {
2332    int scannedCount = 0;
2333    List<Cell> results = new ArrayList<>();
2334    boolean hasMore = true;
2335    while (hasMore) {
2336      hasMore = scanner.next(results);
2337      scannedCount += results.size();
2338      results.clear();
2339    }
2340    return scannedCount;
2341  }
2342
2343  /**
2344   * Return an md5 digest of the entire contents of a table.
2345   */
2346  public String checksumRows(final Table table) throws Exception {
    Scan scan = new Scan();
    MessageDigest digest = MessageDigest.getInstance("MD5");
    try (ResultScanner results = table.getScanner(scan)) {
      for (Result res : results) {
        digest.update(res.getRow());
      }
    }
    // MessageDigest#toString() does not render the digest, so hex-encode the bytes instead.
    return Bytes.toHex(digest.digest());
2356  }
2357
2358  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2359  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2360  static {
2361    int i = 0;
2362    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2363      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2364        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2365          ROWS[i][0] = b1;
2366          ROWS[i][1] = b2;
2367          ROWS[i][2] = b3;
2368          i++;
2369        }
2370      }
2371    }
2372  }
2373
2374  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2375    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2376    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2377    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2378    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2379    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2380    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2381
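  /**
   * Twenty-five split keys, 'bbb' through 'zzz'; passing these to createTable yields 26 regions:
   * the open-ended first region plus one region per split key.
   */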
2382  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2383    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2384    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2385    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2386    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2387    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2388    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2389
2390  /**
2391   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2392   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2393   * @return list of region info for regions added to meta
2394   */
2395  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2396    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2398    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2399    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2400    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2401      TableState.State.ENABLED);
2402    // add custom ones
2403    for (int i = 0; i < startKeys.length; i++) {
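      // Wrap around so the last region's end key is the first start key; with a leading empty
      // start key this yields the usual open-ended first and last regions.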
2404      int j = (i + 1) % startKeys.length;
2405      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2406        .setEndKey(startKeys[j]).build();
2407      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2408      newRegions.add(hri);
2409    }
2412    return newRegions;
2413  }
2414
2415  /**
2416   * Create an unmanaged WAL. Be sure to close it when you're through.
2417   */
2418  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2419    throws IOException {
2420    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless we pass it along via the conf.
2422    Configuration confForWAL = new Configuration(conf);
2423    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2424    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2425  }
2426
2427  /**
   * Create a region with its own WAL. Be sure to call
2429   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
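   * <p>
   * Typical pattern (sketch):
   * </p>
   *
   * <pre>
   * HRegion region = createRegionAndWAL(info, rootDir, conf, htd);
   * try {
   *   // exercise the region
   * } finally {
   *   closeRegionAndWAL(region);
   * }
   * </pre>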
2430   */
2431  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2432    final Configuration conf, final TableDescriptor htd) throws IOException {
2433    return createRegionAndWAL(info, rootDir, conf, htd, true);
2434  }
2435
2436  /**
   * Create a region with its own WAL. Be sure to call
2438   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2439   */
2440  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2441    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2442    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2443    region.setBlockCache(blockCache);
2444    region.initialize();
2445    return region;
2446  }
2447
2448  /**
   * Create a region with its own WAL. Be sure to call
2450   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2451   */
2452  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2453    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2454    throws IOException {
2455    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2456    region.setMobFileCache(mobFileCache);
2457    region.initialize();
2458    return region;
2459  }
2460
2461  /**
   * Create a region with its own WAL. Be sure to call
2463   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2464   */
2465  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2466    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2467    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2468      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2469    WAL wal = createWal(conf, rootDir, info);
2470    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2471  }
2472
2473  /**
2474   * Returns all rows from the hbase:meta table.
2475   * @throws IOException When reading the rows fails.
2476   */
2477  public List<byte[]> getMetaTableRows() throws IOException {
2478    // TODO: Redo using MetaTableAccessor class
    List<byte[]> rows = new ArrayList<>();
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (Result result : s) {
        LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()));
        rows.add(result.getRow());
      }
    }
    return rows;
2489  }
2490
2491  /**
2492   * Returns all rows from the hbase:meta table for a given user table
2493   * @throws IOException When reading the rows fails.
2494   */
2495  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2496    // TODO: Redo using MetaTableAccessor.
    List<byte[]> rows = new ArrayList<>();
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (Result result : s) {
        RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
        if (info == null) {
          LOG.error("No region info for row " + Bytes.toString(result.getRow()));
          // TODO figure out what to do for this new hosed case.
          continue;
        }

        if (info.getTable().equals(tableName)) {
          LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info);
          rows.add(result.getRow());
        }
      }
    }
    return rows;
2515    return rows;
2516  }
2517
2518  /**
2519   * Returns all regions of the specified table
2520   * @param tableName the table name
2521   * @return all regions of the specified table
2522   * @throws IOException when getting the regions fails.
2523   */
2524  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2525    try (Admin admin = getConnection().getAdmin()) {
2526      return admin.getRegions(tableName);
2527    }
2528  }
2529
2530  /**
   * Find any region server other than the one passed in as the parameter
2532   * @return another region server
2533   */
2534  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2535    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2537        return rst.getRegionServer();
2538      }
2539    }
2540    return null;
2541  }
2542
2543  /**
   * Get a reference to the region server that holds the first region of the specified user
   * table.
   * @param tableName user table to lookup in hbase:meta
   * @return region server that holds the first region, or null if no online region is found
2548   */
2549  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2550    throws IOException, InterruptedException {
2551    List<RegionInfo> regions = getRegions(tableName);
2552    if (regions == null || regions.isEmpty()) {
2553      return null;
2554    }
2555    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2556
2557    byte[] firstRegionName =
2558      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2559        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2560
2561    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2562    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2563      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2564    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2565      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // The configured client pause is in milliseconds, so retry on that unit.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2567    while (retrier.shouldRetry()) {
2568      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2569      if (index != -1) {
2570        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2571      }
2572      // Came back -1. Region may not be online yet. Sleep a while.
2573      retrier.sleepUntilNextRetry();
2574    }
2575    return null;
2576  }
2577
2578  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
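   * <p>
   * A typical usage sketch (TEST_UTIL is an illustrative utility instance; assumes the mini DFS
   * cluster is already running so the default filesystem resolves):
   *
   * <pre>
   * MiniMRCluster mr = TEST_UTIL.startMiniMapReduceCluster();
   * try {
   *   // run MapReduce jobs against TEST_UTIL.getConfiguration()
   * } finally {
   *   TEST_UTIL.shutdownMiniMapReduceCluster();
   * }
   * </pre>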
2580   * @throws IOException When starting the cluster fails.
2581   */
2582  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2583    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2584    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2585      "99.0");
2586    startMiniMapReduceCluster(2);
2587    return mrCluster;
2588  }
2589
2590  /**
   * TaskTracker has a bug where changing the hadoop.log.dir system property does not change its
   * internal static LOG_DIR variable.
2593   */
2594  private void forceChangeTaskLogDir() {
2595    Field logDirField;
2596    try {
2597      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2598      logDirField.setAccessible(true);
2599
2600      Field modifiersField = ReflectionUtils.getModifiersField();
2601      modifiersField.setAccessible(true);
2602      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2603
2604      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
    } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
      | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
2614  }
2615
2616  /**
2617   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2618   * filesystem.
   * @param servers The number of <code>TaskTracker</code>s to start.
2620   * @throws IOException When starting the cluster fails.
2621   */
2622  private void startMiniMapReduceCluster(final int servers) throws IOException {
2623    if (mrCluster != null) {
2624      throw new IllegalStateException("MiniMRCluster is already running");
2625    }
2626    LOG.info("Starting mini mapreduce cluster...");
2627    setupClusterTestDir();
2628    createDirsAndSetProperties();
2629
2630    forceChangeTaskLogDir();
2631
2632    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We raise the usable VM ratio so that processes don't get killed.
2635    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2636
2637    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2638    // this avoids the problem by disabling speculative task execution in tests.
2639    conf.setBoolean("mapreduce.map.speculative", false);
2640    conf.setBoolean("mapreduce.reduce.speculative", false);
2641    ////
2642
2643    // Allow the user to override FS URI for this map-reduce cluster to use.
2644    mrCluster =
2645      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2646        1, null, null, new JobConf(this.conf));
2647    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2648    if (jobConf == null) {
2649      jobConf = mrCluster.createJobConf();
2650    }
2651    // Hadoop MiniMR overwrites this while it should not
2652    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2653    LOG.info("Mini mapreduce cluster started");
2654
2655    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2656    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2657    // necessary config properties here. YARN-129 required adding a few properties.
2658    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2659    // this for mrv2 support; mr1 ignores this
2660    conf.set("mapreduce.framework.name", "yarn");
2661    conf.setBoolean("yarn.is.minicluster", true);
2662    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2663    if (rmAddress != null) {
2664      conf.set("yarn.resourcemanager.address", rmAddress);
2665    }
2666    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2667    if (historyAddress != null) {
2668      conf.set("mapreduce.jobhistory.address", historyAddress);
2669    }
2670    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2671    if (schedulerAddress != null) {
2672      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2673    }
2674    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2675    if (mrJobHistoryWebappAddress != null) {
2676      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2677    }
2678    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2679    if (yarnRMWebappAddress != null) {
2680      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2681    }
2682  }
2683
2684  /**
2685   * Stops the previously started <code>MiniMRCluster</code>.
2686   */
2687  public void shutdownMiniMapReduceCluster() {
2688    if (mrCluster != null) {
2689      LOG.info("Stopping mini mapreduce cluster...");
2690      mrCluster.shutdown();
2691      mrCluster = null;
2692      LOG.info("Mini mapreduce cluster stopped");
2693    }
2694    // Restore configuration to point to local jobtracker
2695    conf.set("mapreduce.jobtracker.address", "local");
2696  }
2697
2698  /**
2699   * Create a stubbed out RegionServerService, mainly for getting FS.
2700   */
2701  public RegionServerServices createMockRegionServerService() throws IOException {
2702    return createMockRegionServerService((ServerName) null);
2703  }
2704
2705  /**
2706   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2707   * TestTokenAuthentication
2708   */
2709  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2710    throws IOException {
2711    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2712    rss.setFileSystem(getTestFileSystem());
2713    rss.setRpcServer(rpc);
2714    return rss;
2715  }
2716
2717  /**
2718   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2719   * TestOpenRegionHandler
2720   */
2721  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2722    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2723    rss.setFileSystem(getTestFileSystem());
2724    return rss;
2725  }
2726
2727  /**
2728   * Switches the logger for the given class to DEBUG level.
2729   * @param clazz The class for which to switch to debug logging.
   * @deprecated In 2.3.0, will be removed in 4.0.0. Only changing the log level on log4j is
   *             supported now, as HBase only uses log4j. Do this yourself: you know which
   *             logging framework you are using, so setting its log level to debug is easy.
2733   */
2734  @Deprecated
2735  public void enableDebug(Class<?> clazz) {
2736    Log4jUtils.enableDebug(clazz);
2737  }
2738
2739  /**
2740   * Expire the Master's session
2741   */
2742  public void expireMasterSession() throws Exception {
2743    HMaster master = getMiniHBaseCluster().getMaster();
2744    expireSession(master.getZooKeeper(), false);
2745  }
2746
2747  /**
2748   * Expire a region server's session
2749   * @param index which RS
2750   */
2751  public void expireRegionServerSession(int index) throws Exception {
2752    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2753    expireSession(rs.getZooKeeper(), false);
2754    decrementMinRegionServerCount();
2755  }
2756
2757  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for a newly spawned master
2759    // this.hbaseCluster shares this configuration too
2760    decrementMinRegionServerCount(getConfiguration());
2761
2762    // each master thread keeps a copy of configuration
2763    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2764      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2765    }
2766  }
2767
2768  private void decrementMinRegionServerCount(Configuration conf) {
2769    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2770    if (currentCount != -1) {
2771      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2772    }
2773  }
2774
2775  public void expireSession(ZKWatcher nodeZK) throws Exception {
2776    expireSession(nodeZK, false);
2777  }
2778
2779  /**
   * Expire a ZooKeeper session as recommended in the ZooKeeper documentation
   * (http://hbase.apache.org/book.html#trouble.zookeeper). There are known issues when doing
   * this: [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html [2]
   * https://issues.apache.org/jira/browse/ZOOKEEPER-1105
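   * <p>
   * A minimal sketch (TEST_UTIL is illustrative; see {@link #expireMasterSession()} and
   * {@link #expireRegionServerSession(int)} for the common cases):
   *
   * <pre>
   * ZKWatcher watcher = TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper();
   * TEST_UTIL.expireSession(watcher, false);
   * </pre>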
2784   * @param nodeZK      - the ZK watcher to expire
2785   * @param checkStatus - true to check if we can create a Table with the current configuration.
2786   */
2787  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2788    Configuration c = new Configuration(this.conf);
2789    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2790    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2791    byte[] password = zk.getSessionPasswd();
2792    long sessionID = zk.getSessionId();
2793
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event,
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event;
    // just closing the connection should be enough.
2801    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2802      @Override
2803      public void process(WatchedEvent watchedEvent) {
2804        LOG.info("Monitor ZKW received event=" + watchedEvent);
2805      }
2806    }, sessionID, password);
2807
2808    // Making it expire
2809    ZooKeeper newZK =
2810      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2811
    // Ensure that we have a connection to the server before closing down; otherwise
    // the session-close event may be consumed while we are still in the CONNECTING state.
2814    long start = EnvironmentEdgeManager.currentTime();
2815    while (
2816      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2817    ) {
2818      Thread.sleep(1);
2819    }
2820    newZK.close();
2821    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2822
2823    // Now closing & waiting to be sure that the clients get it.
2824    monitor.close();
2825
2826    if (checkStatus) {
2827      getConnection().getTable(TableName.META_TABLE_NAME).close();
2828    }
2829  }
2830
2831  /**
2832   * Get the Mini HBase cluster.
2833   * @return hbase cluster
2834   * @see #getHBaseClusterInterface()
2835   */
2836  public MiniHBaseCluster getHBaseCluster() {
2837    return getMiniHBaseCluster();
2838  }
2839
2840  /**
2841   * Returns the HBaseCluster instance.
2842   * <p>
   * The returned object can be any subclass of HBaseCluster, and tests referring to it should not
   * assume that the cluster is a mini cluster or a distributed one. If the test only works on a
   * mini cluster, the specific method {@link #getMiniHBaseCluster()} can be used instead, without
   * the need to type-cast.
2847   */
2848  public HBaseCluster getHBaseClusterInterface() {
2849    // implementation note: we should rename this method as #getHBaseCluster(),
2850    // but this would require refactoring 90+ calls.
2851    return hbaseCluster;
2852  }
2853
2854  /**
2855   * Resets the connections so that the next time getConnection() is called, a new connection is
2856   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2857   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
2858   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
2859   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
2860   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
2861   * bigger refactor.
2862   */
2863  public void invalidateConnection() throws IOException {
2864    closeConnection();
2865    // Update the master addresses if they changed.
2866    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2867    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2868    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2869      masterConfigBefore, masterConfAfter);
2870    conf.set(HConstants.MASTER_ADDRS_KEY,
2871      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2872  }
2873
2874  /**
   * Get a shared Connection to the cluster. This method is thread safe.
   * @return A Connection that can be shared. Don't close it; it will be closed on cluster
   *         shutdown.
2877   */
2878  public Connection getConnection() throws IOException {
2879    return getAsyncConnection().toConnection();
2880  }
2881
2882  /**
   * Get a Connection to the cluster for the given user. This method is thread safe.
   * @param user the user to connect as
   * @return A Connection for the given user.
2886   */
2887  public Connection getConnection(User user) throws IOException {
2888    return getAsyncConnection(user).toConnection();
2889  }
2890
2891  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
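   * <p>
   * A minimal sketch (TEST_UTIL is an illustrative utility instance; do not close the returned
   * connection yourself):
   *
   * <pre>
   * AsyncClusterConnection conn = TEST_UTIL.getAsyncConnection();
   * Connection syncView = conn.toConnection();
   * </pre>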
   * @return An AsyncClusterConnection that can be shared. Don't close it; it will be closed on
   *         cluster shutdown.
2895   */
2896  public AsyncClusterConnection getAsyncConnection() throws IOException {
2897    try {
2898      return asyncConnection.updateAndGet(connection -> {
2899        if (connection == null) {
2900          try {
2901            User user = UserProvider.instantiate(conf).getCurrent();
2902            connection = getAsyncConnection(user);
2903          } catch (IOException ioe) {
2904            throw new UncheckedIOException("Failed to create connection", ioe);
2905          }
2906        }
2907        return connection;
2908      });
2909    } catch (UncheckedIOException exception) {
2910      throw exception.getCause();
2911    }
2912  }
2913
2914  /**
   * Get an AsyncClusterConnection to the cluster for the given user. This method is thread safe.
   * @param user the user to connect as
   * @return An AsyncClusterConnection for the given user.
2918   */
2919  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2920    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2921  }
2922
2923  public void closeConnection() throws IOException {
2924    if (hbaseAdmin != null) {
2925      Closeables.close(hbaseAdmin, true);
2926      hbaseAdmin = null;
2927    }
2928    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2929    if (asyncConnection != null) {
2930      Closeables.close(asyncConnection, true);
2931    }
2932  }
2933
2934  /**
   * Returns an Admin instance shared between users of this HBaseTestingUtility instance. Closing
   * it has no effect; it will be closed automatically when the cluster shuts down.
2937   */
2938  public Admin getAdmin() throws IOException {
2939    if (hbaseAdmin == null) {
2940      this.hbaseAdmin = getConnection().getAdmin();
2941    }
2942    return hbaseAdmin;
2943  }
2944
2945  private Admin hbaseAdmin = null;
2946
2947  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
2949   */
2950  public Hbck getHbck() throws IOException {
2951    return getConnection().getHbck();
2952  }
2953
2954  /**
2955   * Unassign the named region.
2956   * @param regionName The region to unassign.
2957   */
2958  public void unassignRegion(String regionName) throws IOException {
2959    unassignRegion(Bytes.toBytes(regionName));
2960  }
2961
2962  /**
2963   * Unassign the named region.
2964   * @param regionName The region to unassign.
2965   */
2966  public void unassignRegion(byte[] regionName) throws IOException {
2967    getAdmin().unassign(regionName, true);
2968  }
2969
2970  /**
2971   * Closes the region containing the given row.
2972   * @param row   The row to find the containing region.
2973   * @param table The table to find the region.
2974   */
2975  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2976    unassignRegionByRow(Bytes.toBytes(row), table);
2977  }
2978
2979  /**
2980   * Closes the region containing the given row.
2981   * @param row   The row to find the containing region.
2982   * @param table The table to find the region.
2983   */
2984  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2985    HRegionLocation hrl = table.getRegionLocation(row);
2986    unassignRegion(hrl.getRegion().getRegionName());
2987  }
2988
2989  /**
   * Retrieves a random splittable region from the given table.
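   * <p>
   * A usage sketch (TEST_UTIL and tableName are illustrative):
   *
   * <pre>
   * HRegion region = TEST_UTIL.getSplittableRegion(tableName, 100);
   * if (region != null) {
   *   // checkSplit() reported a split point, so a split request should succeed
   * }
   * </pre>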
2991   * @param tableName   name of table
   * @param maxAttempts maximum number of attempts; -1 means unlimited
2993   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2994   */
2995  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2996    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2997    int regCount = regions.size();
2998    Set<Integer> attempted = new HashSet<>();
2999    int idx;
3000    int attempts = 0;
3001    do {
3002      regions = getHBaseCluster().getRegions(tableName);
3003      if (regCount != regions.size()) {
3004        // if there was region movement, clear attempted Set
3005        attempted.clear();
3006      }
3007      regCount = regions.size();
      // The region may be moving to CLOSE between the time we fetch the region list from an RS
      // and the time we inspect it, e.g. because online schema change is enabled.
3010      if (regCount > 0) {
3011        idx = ThreadLocalRandom.current().nextInt(regCount);
3012        // if we have just tried this region, there is no need to try again
3013        if (attempted.contains(idx)) {
3014          continue;
3015        }
3016        HRegion region = regions.get(idx);
3017        if (region.checkSplit().isPresent()) {
3018          return region;
3019        }
3020        attempted.add(idx);
3021      }
3022      attempts++;
3023    } while (maxAttempts == -1 || attempts < maxAttempts);
3024    return null;
3025  }
3026
3027  public MiniDFSCluster getDFSCluster() {
3028    return dfsCluster;
3029  }
3030
3031  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3032    setDFSCluster(cluster, true);
3033  }
3034
3035  /**
3036   * Set the MiniDFSCluster
3037   * @param cluster     cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
   *                    it is set.
3040   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3041   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
3042   */
3043  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3044    throws IllegalStateException, IOException {
3045    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3046      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3047    }
3048    this.dfsCluster = cluster;
3049    this.setFs();
3050  }
3051
3052  public FileSystem getTestFileSystem() throws IOException {
3053    return HFileSystem.get(conf);
3054  }
3055
3056  /**
   * Wait until all regions in a table have been assigned. Waits up to the default timeout (30
   * seconds) before giving up.
3059   * @param table Table to wait on.
3060   */
3061  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
3062    waitTableAvailable(table.getName(), 30000);
3063  }
3064
3065  public void waitTableAvailable(TableName table, long timeoutMillis)
3066    throws InterruptedException, IOException {
3067    waitFor(timeoutMillis, predicateTableAvailable(table));
3068  }
3069
3070  /**
3071   * Wait until all regions in a table have been assigned
3072   * @param table         Table to wait on.
3073   * @param timeoutMillis Timeout.
3074   */
3075  public void waitTableAvailable(byte[] table, long timeoutMillis)
3076    throws InterruptedException, IOException {
3077    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3078  }
3079
3080  public String explainTableAvailability(TableName tableName) throws IOException {
3081    StringBuilder msg =
3082      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3083    if (getHBaseCluster().getMaster().isAlive()) {
3084      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3085        .getRegionStates().getRegionAssignments();
3086      final List<Pair<RegionInfo, ServerName>> metaLocations =
3087        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3088      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3089        RegionInfo hri = metaLocation.getFirst();
3090        ServerName sn = metaLocation.getSecond();
3091        if (!assignments.containsKey(hri)) {
3092          msg.append(", region ").append(hri)
3093            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3094        } else if (sn == null) {
3095          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3096        } else if (!sn.equals(assignments.get(hri))) {
3097          msg.append(",  region ").append(hri)
3098            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3099            .append(" <> ").append(assignments.get(hri));
3100        }
3101      }
3102    }
3103    return msg.toString();
3104  }
3105
3106  public String explainTableState(final TableName table, TableState.State state)
3107    throws IOException {
3108    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3109    if (tableState == null) {
3110      return "TableState in META: No table state in META for table " + table
3111        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3112    } else if (!tableState.inStates(state)) {
3113      return "TableState in META: Not " + state + " state, but " + tableState;
3114    } else {
3115      return "TableState in META: OK";
3116    }
3117  }
3118
3119  public TableState findLastTableState(final TableName table) throws IOException {
3120    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3121    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3122      @Override
3123      public boolean visit(Result r) throws IOException {
3124        if (!Arrays.equals(r.getRow(), table.getName())) {
3125          return false;
3126        }
3127        TableState state = CatalogFamilyFormat.getTableState(r);
3128        if (state != null) {
3129          lastTableState.set(state);
3130        }
3131        return true;
3132      }
3133    };
3134    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
3135      Integer.MAX_VALUE, visitor);
3136    return lastTableState.get();
3137  }
3138
3139  /**
   * Waits for a table to be 'enabled'. Enabled means the table is set as 'enabled' and all its
   * regions have been assigned. Will time out after the default period (30 seconds). Tolerates a
   * nonexistent table.
3143   * @param table the table to wait on.
3144   * @throws InterruptedException if interrupted while waiting
3145   * @throws IOException          if an IO problem is encountered
3146   */
3147  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
3148    waitTableEnabled(table, 30000);
3149  }
3150
3151  /**
   * Waits for a table to be 'enabled'. Enabled means the table is set as 'enabled' and all its
   * regions have been assigned.
3154   * @see #waitTableEnabled(TableName, long)
3155   * @param table         Table to wait on.
3156   * @param timeoutMillis Time to wait on it being marked enabled.
3157   */
3158  public void waitTableEnabled(byte[] table, long timeoutMillis)
3159    throws InterruptedException, IOException {
3160    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3161  }
3162
3163  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
3164    waitFor(timeoutMillis, predicateTableEnabled(table));
3165  }
3166
3167  /**
   * Waits for a table to be 'disabled'. Disabled means the table is set as 'disabled'. Will time
   * out after the default period (30 seconds).
3170   * @param table Table to wait on.
3171   */
3172  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
3173    waitTableDisabled(table, 30000);
3174  }
3175
3176  public void waitTableDisabled(TableName table, long millisTimeout)
3177    throws InterruptedException, IOException {
3178    waitFor(millisTimeout, predicateTableDisabled(table));
3179  }
3180
3181  /**
   * Waits for a table to be 'disabled'. Disabled means the table is set as 'disabled'.
3183   * @param table         Table to wait on.
3184   * @param timeoutMillis Time to wait on it being marked disabled.
3185   */
3186  public void waitTableDisabled(byte[] table, long timeoutMillis)
3187    throws InterruptedException, IOException {
3188    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3189  }
3190
3191  /**
3192   * Make sure that at least the specified number of region servers are running
3193   * @param num minimum number of region servers that should be running
3194   * @return true if we started some servers
3195   */
3196  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
3197    boolean startedServer = false;
3198    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3199    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
3200      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3201      startedServer = true;
3202    }
3203
3204    return startedServer;
3205  }
3206
3207  /**
3208   * Make sure that at least the specified number of region servers are running. We don't count the
3209   * ones that are currently stopping or are stopped.
3210   * @param num minimum number of region servers that should be running
3211   * @return true if we started some servers
3212   */
3213  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
3214    boolean startedServer = ensureSomeRegionServersAvailable(num);
3215
3216    int nonStoppedServers = 0;
3217    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
3218
3219      HRegionServer hrs = rst.getRegionServer();
3220      if (hrs.isStopping() || hrs.isStopped()) {
3221        LOG.info("A region server is stopped or stopping:" + hrs);
3222      } else {
3223        nonStoppedServers++;
3224      }
3225    }
3226    for (int i = nonStoppedServers; i < num; ++i) {
3227      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3228      startedServer = true;
3229    }
3230    return startedServer;
3231  }
3232
3233  /**
   * This method clones the passed <code>c</code> configuration, setting a new user into the
   * clone. Use it when getting new instances of FileSystem. Only works for DistributedFileSystem
   * without Kerberos.
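   * <p>
   * A usage sketch (the suffix is illustrative):
   *
   * <pre>
   * User user = HBaseTestingUtility.getDifferentUser(conf, ".hrs");
   * // run filesystem operations as this user via user.runAs(...)
   * </pre>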
3236   * @param c                     Initial configuration
3237   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A User whose name differs from the current user's by the given suffix.
3239   */
3240  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
3241    throws IOException {
3242    FileSystem currentfs = FileSystem.get(c);
3243    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3244      return User.getCurrent();
3245    }
3246    // Else distributed filesystem. Make a new instance per daemon. Below
3247    // code is taken from the AppendTestUtil over in hdfs.
3248    String username = User.getCurrent().getName() + differentiatingSuffix;
3249    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
3250    return user;
3251  }
3252
3253  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3254    throws IOException {
3255    NavigableSet<String> online = new TreeSet<>();
3256    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3257      try {
3258        for (RegionInfo region : ProtobufUtil
3259          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3260          online.add(region.getRegionNameAsString());
3261        }
3262      } catch (RegionServerStoppedException e) {
3263        // That's fine.
3264      }
3265    }
3266    return online;
3267  }
3268
3269  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it's hard-coded to 5 and makes
   * tests linger. Here is the exception you'll see:
3272   *
3273   * <pre>
3274   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3275   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3276   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3277   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3278   * </pre>
3279   *
3280   * @param stream A DFSClient.DFSOutputStream.
3281   */
3282  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3283    try {
3284      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3285      for (Class<?> clazz : clazzes) {
3286        String className = clazz.getSimpleName();
3287        if (className.equals("DFSOutputStream")) {
3288          if (clazz.isInstance(stream)) {
3289            Field maxRecoveryErrorCountField =
3290              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3291            maxRecoveryErrorCountField.setAccessible(true);
3292            maxRecoveryErrorCountField.setInt(stream, max);
3293            break;
3294          }
3295        }
3296      }
3297    } catch (Exception e) {
3298      LOG.info("Could not set max recovery field", e);
3299    }
3300  }
3301
3302  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
3306   */
3307  public boolean assignRegion(final RegionInfo regionInfo)
3308    throws IOException, InterruptedException {
3309    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3310    am.assign(regionInfo);
3311    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3312  }
3313
3314  /**
3315   * Move region to destination server and wait till region is completely moved and online
3316   * @param destRegion region to move
3317   * @param destServer destination server of the region
3318   */
3319  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3320    throws InterruptedException, IOException {
3321    HMaster master = getMiniHBaseCluster().getMaster();
3322    // TODO: Here we start the move. The move can take a while.
3323    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3324    while (true) {
3325      ServerName serverName =
3326        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3327      if (serverName != null && serverName.equals(destServer)) {
3328        assertRegionOnServer(destRegion, serverName, 2000);
3329        break;
3330      }
3331      Thread.sleep(10);
3332    }
3333  }
3334
3335  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been
   * deployed, the master has been informed, and hbase:meta has been updated with each region's
   * deployed server.
3339   * @param tableName the table name
3340   */
3341  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3342    waitUntilAllRegionsAssigned(tableName,
3343      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3344  }
3345
3346  /**
   * Wait until all system tables' regions get assigned
3348   */
3349  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3350    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3351  }
3352
3353  /**
3354   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, the master has been informed, and
   * hbase:meta has been updated with each region's deployed server.
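   * <p>
   * A typical test-flow sketch (TEST_UTIL, the descriptor, and the split keys are illustrative):
   *
   * <pre>
   * TEST_UTIL.getAdmin().createTable(tableDescriptor, splitKeys);
   * TEST_UTIL.waitUntilAllRegionsAssigned(tableDescriptor.getTableName(), 60000);
   * </pre>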
3357   * @param tableName the table name
3358   * @param timeout   timeout, in milliseconds
3359   */
3360  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3361    throws IOException {
3362    if (!TableName.isMetaTableName(tableName)) {
3363      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3364        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3365          + timeout + "ms");
3366        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3367          @Override
3368          public String explainFailure() throws IOException {
3369            return explainTableAvailability(tableName);
3370          }
3371
3372          @Override
3373          public boolean evaluate() throws IOException {
3374            Scan scan = new Scan();
3375            scan.addFamily(HConstants.CATALOG_FAMILY);
3376            boolean tableFound = false;
3377            try (ResultScanner s = meta.getScanner(scan)) {
3378              for (Result r; (r = s.next()) != null;) {
3379                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3380                RegionInfo info = RegionInfo.parseFromOrNull(b);
3381                if (info != null && info.getTable().equals(tableName)) {
3382                  // Get server hosting this region from catalog family. Return false if no server
3383                  // hosting this region, or if the server hosting this region was recently killed
3384                  // (for fault tolerance testing).
3385                  tableFound = true;
3386                  byte[] server =
3387                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3388                  if (server == null) {
3389                    return false;
3390                  } else {
3391                    byte[] startCode =
3392                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3393                    ServerName serverName =
3394                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3395                        + Bytes.toLong(startCode));
3396                    if (
3397                      !getHBaseClusterInterface().isDistributedCluster()
3398                        && getHBaseCluster().isKilledRS(serverName)
3399                    ) {
3400                      return false;
3401                    }
3402                  }
3403                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3404                    return false;
3405                  }
3406                }
3407              }
3408            }
3409            if (!tableFound) {
3410              LOG.warn(
3411                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3412            }
3413            return tableFound;
3414          }
3415        });
3416      }
3417    }
3418    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3419    // check from the master state if we are using a mini cluster
3420    if (!getHBaseClusterInterface().isDistributedCluster()) {
3421      // So, all regions are in the meta table but make sure master knows of the assignments before
3422      // returning -- sometimes this can lag.
3423      HMaster master = getHBaseCluster().getMaster();
3424      final RegionStates states = master.getAssignmentManager().getRegionStates();
3425      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3426        @Override
3427        public String explainFailure() throws IOException {
3428          return explainTableAvailability(tableName);
3429        }
3430
3431        @Override
3432        public boolean evaluate() throws IOException {
3433          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3434          return hris != null && !hris.isEmpty();
3435        }
3436      });
3437    }
3438    LOG.info("All regions for table " + tableName + " assigned.");
3439  }
3440
3441  /**
3442   * Do a small get/scan against one store. This is required because store has no actual methods of
3443   * querying itself, and relies on StoreScanner.
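   * <p>
   * A minimal sketch (store, row, family and qualifier are illustrative):
   *
   * <pre>
   * Get get = new Get(row);
   * get.addColumn(family, qualifier);
   * List&lt;Cell&gt; cells = HBaseTestingUtility.getFromStoreFile(store, get);
   * </pre>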
3444   */
3445  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3446    Scan scan = new Scan(get);
3447    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3448      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3449      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3450      // readpoint 0.
3451      0);
3452
3453    List<Cell> result = new ArrayList<>();
3454    scanner.next(result);
3455    if (!result.isEmpty()) {
3456      // verify that we are on the row we want:
3457      Cell kv = result.get(0);
3458      if (!CellUtil.matchingRows(kv, get.getRow())) {
3459        result.clear();
3460      }
3461    }
3462    scanner.close();
3463    return result;
3464  }
3465
3466  /**
   * Create region split keys between startKey and endKey.
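   * <p>
   * For example, numRegions = 5 yields five start keys: the empty byte array, startKey, two
   * intermediate keys produced by Bytes.split, and endKey; the last region then runs from endKey
   * to the end of the table.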
   * @param numRegions the number of regions to be created. It must be greater than 3.
3469   * @return resulting split keys
3470   */
3471  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
    if (numRegions <= 3) {
      throw new AssertionError("numRegions must be greater than 3, got " + numRegions);
    }
3475    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3476    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3477    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3478    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3479    return result;
3480  }
3481
3482  /**
3483   * Do a small get/scan against one store. This is required because store has no actual methods of
3484   * querying itself, and relies on StoreScanner.
3485   */
3486  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3487    throws IOException {
3488    Get get = new Get(row);
3489    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3490    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3491
3492    return getFromStoreFile(store, get);
3493  }
3494
3495  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3496    final List<? extends Cell> actual) {
3497    final int eLen = expected.size();
3498    final int aLen = actual.size();
3499    final int minLen = Math.min(eLen, aLen);
3500
3501    int i;
3502    for (i = 0; i < minLen
3503      && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) {
3504    }
3505
3506    if (additionalMsg == null) {
3507      additionalMsg = "";
3508    }
3509    if (!additionalMsg.isEmpty()) {
3510      additionalMsg = ". " + additionalMsg;
3511    }
3512
3513    if (eLen != aLen || i != minLen) {
3514      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3515        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3516        + " (length " + aLen + ")" + additionalMsg);
3517    }
3518  }
3519
3520  public static <T> String safeGetAsStr(List<T> lst, int i) {
3521    if (0 <= i && i < lst.size()) {
3522      return lst.get(i).toString();
3523    } else {
3524      return "<out_of_range>";
3525    }
3526  }
3527
3528  public String getRpcConnnectionURI() throws UnknownHostException {
3529    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3530  }
3531
3532  public String getZkConnectionURI() {
3533    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3534      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3535      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3536  }
3537
3538  /**
3539   * Get the zk based cluster key for this cluster.
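   * <p>
   * The returned key has the form <code>quorum:clientPort:znodeParent</code>, e.g.
   * <code>localhost:2181:/hbase</code> (an illustrative value).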
3540   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3541   *             connection info of a cluster. Keep here only for compatibility.
3542   * @see #getRpcConnnectionURI()
3543   * @see #getZkConnectionURI()
3544   */
3545  @Deprecated
3546  public String getClusterKey() {
3547    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3548      + ":"
3549      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3550  }
3551
3552  /** Creates a random table with the given parameters */
3553  public Table createRandomTable(TableName tableName, final Collection<String> families,
3554    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3555    final int numRowsPerFlush) throws IOException, InterruptedException {
3556
3557    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3558      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3559      + maxVersions + "\n");
3560
3561    final int numCF = families.size();
3562    final byte[][] cfBytes = new byte[numCF][];
3563    {
3564      int cfIndex = 0;
3565      for (String cf : families) {
3566        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3567      }
3568    }
3569
3570    final int actualStartKey = 0;
3571    final int actualEndKey = Integer.MAX_VALUE;
3572    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3573    final int splitStartKey = actualStartKey + keysPerRegion;
3574    final int splitEndKey = actualEndKey - keysPerRegion;
3575    final String keyFormat = "%08x";
3576    final Table table = createTable(tableName, cfBytes, maxVersions,
3577      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3578      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3579
3580    if (hbaseCluster != null) {
3581      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3582    }
3583
3584    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3585
3586    final Random rand = ThreadLocalRandom.current();
3587    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3588      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3589        final byte[] row = Bytes.toBytes(
3590          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3591
3592        Put put = new Put(row);
3593        Delete del = new Delete(row);
3594        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3595          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3596          final long ts = rand.nextInt();
3597          final byte[] qual = Bytes.toBytes("col" + iCol);
3598          if (rand.nextBoolean()) {
3599            final byte[] value =
3600              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3601                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3602            put.addColumn(cf, qual, ts, value);
3603          } else if (rand.nextDouble() < 0.8) {
3604            del.addColumn(cf, qual, ts);
3605          } else {
3606            del.addColumns(cf, qual, ts);
3607          }
3608        }
3609
3610        if (!put.isEmpty()) {
3611          mutator.mutate(put);
3612        }
3613
3614        if (!del.isEmpty()) {
3615          mutator.mutate(del);
3616        }
3617      }
3618      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3619      mutator.flush();
3620      if (hbaseCluster != null) {
3621        getMiniHBaseCluster().flushcache(table.getName());
3622      }
3623    }
3624    mutator.close();
3625
3626    return table;
3627  }
3628
3629  public static int randomFreePort() {
3630    return HBaseCommonTestingUtility.randomFreePort();
3631  }
3632
3633  public static String randomMultiCastAddress() {
3634    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3635  }
3636
3637  public static void waitForHostPort(String host, int port) throws IOException {
3638    final int maxTimeMs = 10000;
3639    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3640    IOException savedException = null;
3641    LOG.info("Waiting for server at " + host + ":" + port);
3642    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3643      try {
3644        Socket sock = new Socket(InetAddress.getByName(host), port);
3645        sock.close();
3646        savedException = null;
3647        LOG.info("Server at " + host + ":" + port + " is available");
3648        break;
3649      } catch (UnknownHostException e) {
3650        throw new IOException("Failed to look up " + host, e);
3651      } catch (IOException e) {
3652        savedException = e;
3653      }
3654      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3655    }
3656
3657    if (savedException != null) {
3658      throw savedException;
3659    }
3660  }
3661
3662  /**
3663   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3664   * continues.
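   * <p>
   * A usage sketch (the table and column family names are illustrative):
   *
   * <pre>
   * int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
   *   TableName.valueOf("loadtest"), Bytes.toBytes("cf"), Algorithm.NONE,
   *   DataBlockEncoding.NONE);
   * </pre>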
3665   * @return the number of regions the table was split into
3666   */
3667  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3668    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3669    throws IOException {
3670    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3671      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3672  }
3673
3674  /**
3675   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3676   * continues.
3677   * @return the number of regions the table was split into
3678   */
3679  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3680    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3681    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3682    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3683    builder.setDurability(durability);
3684    builder.setRegionReplication(regionReplication);
3685    ColumnFamilyDescriptorBuilder cfBuilder =
3686      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3687    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3688    cfBuilder.setCompressionType(compression);
3689    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3690      numRegionsPerServer);
3691  }
3692
3693  /**
3694   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3695   * continues.
3696   * @return the number of regions the table was split into
3697   */
3698  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3699    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3700    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3701    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3702    builder.setDurability(durability);
3703    builder.setRegionReplication(regionReplication);
3704    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3705    for (int i = 0; i < columnFamilies.length; i++) {
3706      ColumnFamilyDescriptorBuilder cfBuilder =
3707        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3708      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3709      cfBuilder.setCompressionType(compression);
3710      hcds[i] = cfBuilder.build();
3711    }
3712    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3713  }
3714
3715  /**
3716   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3717   * continues.
3718   * @return the number of regions the table was split into
3719   */
3720  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3721    ColumnFamilyDescriptor hcd) throws IOException {
3722    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3723  }
3724
3725  /**
3726   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3727   * continues.
3728   * @return the number of regions the table was split into
3729   */
3730  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3731    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3732    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3733      numRegionsPerServer);
3734  }
3735
3736  /**
3737   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3738   * continues.
3739   * @return the number of regions the table was split into
3740   */
3741  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3742    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3743    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3744      numRegionsPerServer);
3745  }
3746
3747  /**
3748   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3749   * continues.
3750   * @return the number of regions the table was split into
3751   */
3752  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3753    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3754    throws IOException {
3755    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3756    for (ColumnFamilyDescriptor cd : cds) {
3757      if (!td.hasColumnFamily(cd.getName())) {
3758        builder.setColumnFamily(cd);
3759      }
3760    }
3761    td = builder.build();
3762    int totalNumberOfRegions = 0;
3763    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3764    Admin admin = unmanagedConnection.getAdmin();
3765
3766    try {
      // Create a table with pre-split regions. The number of splits is:
      // (number of live region servers) * (regions per region server).
3770      int numberOfServers = admin.getRegionServers().size();
3771      if (numberOfServers == 0) {
3772        throw new IllegalStateException("No live regionservers");
3773      }
3774
3775      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3776      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3777        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3778        + numRegionsPerServer + ")");
3779
3780      byte[][] splits = splitter.split(totalNumberOfRegions);
3781
3782      admin.createTable(td, splits);
3783    } catch (MasterNotRunningException e) {
3784      LOG.error("Master not running", e);
3785      throw new IOException(e);
3786    } catch (TableExistsException e) {
3787      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3788    } finally {
3789      admin.close();
3790      unmanagedConnection.close();
3791    }
3792    return totalNumberOfRegions;
3793  }
3794
3795  public static int getMetaRSPort(Connection connection) throws IOException {
3796    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3797      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3798    }
3799  }
3800
3801  /**
   * Due to an async race, a region may not yet be in a region server's online region list even
   * after the assignment znode is deleted and the new assignment is recorded in the master.
3804   */
3805  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3806    final long timeout) throws IOException, InterruptedException {
3807    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3808    while (true) {
3809      List<RegionInfo> regions = getAdmin().getRegions(server);
3810      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3811      long now = EnvironmentEdgeManager.currentTime();
3812      if (now > timeoutTime) break;
3813      Thread.sleep(10);
3814    }
3815    throw new AssertionError(
3816      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3817  }
3818
3819  /**
3820   * Check to make sure the region is open on the specified region server, but not on any other one.
3821   */
3822  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3823    final long timeout) throws IOException, InterruptedException {
3824    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3825    while (true) {
3826      List<RegionInfo> regions = getAdmin().getRegions(server);
3827      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3828        List<JVMClusterUtil.RegionServerThread> rsThreads =
3829          getHBaseCluster().getLiveRegionServerThreads();
3830        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3831          HRegionServer rs = rsThread.getRegionServer();
3832          if (server.equals(rs.getServerName())) {
3833            continue;
3834          }
3835          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3836          for (HRegion r : hrs) {
3837            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3838              throw new AssertionError("Region should not be double assigned");
3839            }
3840          }
3841        }
3842        return; // good, we are happy
3843      }
3844      long now = EnvironmentEdgeManager.currentTime();
3845      if (now > timeoutTime) break;
3846      Thread.sleep(10);
3847    }
3848    throw new AssertionError(
3849      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3850  }
3851
3852  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3853    TableDescriptor td =
3854      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3855    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3856    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3857  }
3858
3859  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3860    BlockCache blockCache) throws IOException {
3861    TableDescriptor td =
3862      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3863    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3864    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3865  }
3866
3867  public static void setFileSystemURI(String fsURI) {
3868    FS_URI = fsURI;
3869  }
3870
3871  /**
   * Returns a {@link Predicate} for checking that there are no regions in transition on the
   * master.
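   * <p>
   * Predicates like this one are meant to be consumed by {@code waitFor}; a sketch, assuming
   * {@code TEST_UTIL} is an instance of this class:
   *
   * <pre>{@code
   * TEST_UTIL.waitFor(60000, TEST_UTIL.predicateNoRegionsInTransition());
   * }</pre>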
3873   */
3874  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3875    return new ExplainingPredicate<IOException>() {
3876      @Override
3877      public String explainFailure() throws IOException {
3878        final RegionStates regionStates =
3879          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3880        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3881      }
3882
3883      @Override
3884      public boolean evaluate() throws IOException {
3885        HMaster master = getMiniHBaseCluster().getMaster();
3886        if (master == null) return false;
3887        AssignmentManager am = master.getAssignmentManager();
3888        if (am == null) return false;
3889        return !am.hasRegionsInTransition();
3890      }
3891    };
3892  }
3893
3894  /**
   * Returns a {@link Predicate} for checking that the given table is enabled.
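   * <p>
   * A sketch of typical use with {@code waitFor} (assumes {@code TEST_UTIL} is an instance of
   * this class; the disabled/available variants below are used the same way):
   *
   * <pre>{@code
   * TEST_UTIL.waitFor(30000, TEST_UTIL.predicateTableEnabled(tableName));
   * }</pre>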
3896   */
3897  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3898    return new ExplainingPredicate<IOException>() {
3899      @Override
3900      public String explainFailure() throws IOException {
3901        return explainTableState(tableName, TableState.State.ENABLED);
3902      }
3903
3904      @Override
3905      public boolean evaluate() throws IOException {
3906        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3907      }
3908    };
3909  }
3910
3911  /**
   * Returns a {@link Predicate} for checking that the given table is disabled.
3913   */
3914  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3915    return new ExplainingPredicate<IOException>() {
3916      @Override
3917      public String explainFailure() throws IOException {
3918        return explainTableState(tableName, TableState.State.DISABLED);
3919      }
3920
3921      @Override
3922      public boolean evaluate() throws IOException {
3923        return getAdmin().isTableDisabled(tableName);
3924      }
3925    };
3926  }
3927
3928  /**
   * Returns a {@link Predicate} for checking that the given table is available.
3930   */
3931  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3932    return new ExplainingPredicate<IOException>() {
3933      @Override
3934      public String explainFailure() throws IOException {
3935        return explainTableAvailability(tableName);
3936      }
3937
3938      @Override
3939      public boolean evaluate() throws IOException {
3940        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          // Probe each region with a one-row scan to confirm it is actually readable, not just
          // reported available by the master.
3942          try (Table table = getConnection().getTable(tableName)) {
3943            TableDescriptor htd = table.getDescriptor();
3944            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3945              .getAllRegionLocations()) {
3946              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3947                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3948                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3949              for (byte[] family : htd.getColumnFamilyNames()) {
3950                scan.addFamily(family);
3951              }
3952              try (ResultScanner scanner = table.getScanner(scan)) {
3953                scanner.next();
3954              }
3955            }
3956          }
3957        }
3958        return tableAvailable;
3959      }
3960    };
3961  }
3962
3963  /**
   * Wait until there are no regions in transition.
3965   * @param timeout How long to wait.
3966   */
3967  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3968    waitFor(timeout, predicateNoRegionsInTransition());
3969  }
3970
3971  /**
   * Wait until there are no regions in transition, with a time limit of 15 minutes.
3973   */
3974  public void waitUntilNoRegionsInTransition() throws IOException {
3975    waitUntilNoRegionsInTransition(15 * 60000);
3976  }
3977
3978  /**
   * Wait until the given labels are available in the {@code VisibilityLabelsCache}.
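   * <p>
   * A sketch, assuming the labels were just added via the VisibilityClient API:
   *
   * <pre>{@code
   * TEST_UTIL.waitLabelAvailable(10000, "secret", "confidential");
   * }</pre>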
3980   */
3981  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3982    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3983    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3984
3985      @Override
3986      public boolean evaluate() {
3987        for (String label : labels) {
3988          if (labelsCache.getLabelOrdinal(label) == 0) {
3989            return false;
3990          }
3991        }
3992        return true;
3993      }
3994
3995      @Override
3996      public String explainFailure() {
3997        for (String label : labels) {
3998          if (labelsCache.getLabelOrdinal(label) == 0) {
3999            return label + " is not available yet";
4000          }
4001        }
4002        return "";
4003      }
4004    });
4005  }
4006
4007  /**
   * Create a set of column family descriptors covering every combination of the available
   * compression algorithms, data block encodings, and bloom filter types.
4010   * @return the list of column descriptors
4011   */
4012  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4013    return generateColumnDescriptors("");
4014  }
4015
4016  /**
   * Create a set of column family descriptors covering every combination of the available
   * compression algorithms, data block encodings, and bloom filter types.
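   * <p>
   * A sketch of typical use, building a table descriptor that exercises every combination
   * (assumes {@code tableName} is a {@code TableName}):
   *
   * <pre>{@code
   * TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
   * for (ColumnFamilyDescriptor cfd : generateColumnDescriptors("test")) {
   *   builder.setColumnFamily(cfd);
   * }
   * TableDescriptor td = builder.build();
   * }</pre>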
   * @param prefix prefix for the generated family names
4020   * @return the list of column descriptors
4021   */
4022  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4023    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4024    long familyId = 0;
4025    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
4026      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
4027        for (BloomType bloomType : BloomType.values()) {
4028          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4029          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4030            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4031          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4032          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4033          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4034          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4035          familyId++;
4036        }
4037      }
4038    }
4039    return columnFamilyDescriptors;
4040  }
4041
4042  /**
4043   * Get supported compression algorithms.
4044   * @return supported compression algorithms.
4045   */
4046  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4047    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4048    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4049    for (String algoName : allAlgos) {
4050      try {
4051        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4052        algo.getCompressor();
4053        supportedAlgos.add(algo);
4054      } catch (Throwable t) {
        // Compressor could not be created; this codec is not available in this environment.
4056      }
4057    }
4058    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4059  }
4060
  /**
   * Returns the closest row at or before the given row in the given family, found via a reversed
   * one-row scan. For the meta region, returns null if the nearest preceding row belongs to a
   * different table.
   */
  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4062    Scan scan = new Scan().withStartRow(row);
4063    scan.setReadType(ReadType.PREAD);
4064    scan.setCaching(1);
4065    scan.setReversed(true);
4066    scan.addFamily(family);
4067    try (RegionScanner scanner = r.getScanner(scan)) {
4068      List<Cell> cells = new ArrayList<>(1);
4069      scanner.next(cells);
      if (r.getRegionInfo().isMetaRegion()) {
        // Guard against an empty scan result before dereferencing cells.get(0).
        if (cells.isEmpty() || !isTargetTable(row, cells.get(0))) {
          return null;
        }
      }
4073      return Result.create(cells);
4074    }
4075  }
4076
  /**
   * Returns true if the row found by the reversed scan belongs to the same table as the requested
   * row, by comparing the table-name component (everything before the first delimiter) of the two
   * meta row keys.
   */
  private boolean isTargetTable(final byte[] inRow, Cell c) {
4078    String inputRowString = Bytes.toString(inRow);
4079    int i = inputRowString.indexOf(HConstants.DELIMITER);
4080    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4081    int o = outputRowString.indexOf(HConstants.DELIMITER);
4082    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4083  }
4084
4085  /**
4086   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. Note that there is also the
   * easier-to-use kerby KDC server and a utility for using it,
   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred
   * since it carries less baggage; it came in with HBASE-5291.
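   * <p>
   * A usage sketch (assumes {@code TEST_UTIL} is an instance of this utility; the principal name
   * is illustrative):
   *
   * <pre>{@code
   * MiniKdc kdc = TEST_UTIL.setupMiniKdc(keytabFile);
   * try {
   *   kdc.createPrincipal(keytabFile, "hbase/localhost");
   * } finally {
   *   kdc.stop();
   * }
   * }</pre>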
4091   */
4092  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4093    Properties conf = MiniKdc.createConf();
4094    conf.put(MiniKdc.DEBUG, true);
4095    MiniKdc kdc = null;
4096    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. Another service may
    // grab the port in between, which results in a BindException.
4099    boolean bindException;
4100    int numTries = 0;
4101    do {
4102      try {
4103        bindException = false;
4104        dir = new File(getDataTestDir("kdc").toUri().getPath());
4105        kdc = new MiniKdc(conf, dir);
4106        kdc.start();
4107      } catch (BindException e) {
4108        FileUtils.deleteDirectory(dir); // clean directory
4109        numTries++;
4110        if (numTries == 3) {
4111          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4112          throw e;
4113        }
4114        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4115        bindException = true;
4116      }
4117    } while (bindException);
4118    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4119    return kdc;
4120  }
4121
  /**
   * Returns the total number of HFiles for the given table and family across all region servers
   * in the mini cluster.
   */
  public int getNumHFiles(final TableName tableName, final byte[] family) {
4123    int numHFiles = 0;
4124    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4125      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
4126    }
4127    return numHFiles;
4128  }
4129
  /**
   * Returns the number of HFiles for the given table and family on the given region server.
   */
  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4131    final byte[] family) {
4132    int numHFiles = 0;
4133    for (Region region : rs.getRegions(tableName)) {
4134      numHFiles += region.getStore(family).getStorefilesCount();
4135    }
4136    return numHFiles;
4137  }
4138
4139  private void assertEquals(String message, int expected, int actual) {
4140    if (expected == actual) {
4141      return;
4142    }
4143    String formatted = "";
4144    if (message != null && !"".equals(message)) {
4145      formatted = message + " ";
4146    }
4147    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4148  }
4149
  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
    assertEquals("Table descriptor values differ;", ltd.getValues().hashCode(),
      rtd.getValues().hashCode());
4155    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4156    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4157    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4158    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4159        it2 = rtdFamilies.iterator(); it.hasNext();) {
4160      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4161    }
4162  }
4163
4164  /**
4165   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4166   * invocations.
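   * <p>
   * A sketch, assuming {@code flushed} is an {@code AtomicBoolean} flipped by the code under
   * test:
   *
   * <pre>{@code
   * await(100, flushed::get);
   * }</pre>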
4167   */
4168  public static void await(final long sleepMillis, final BooleanSupplier condition)
4169    throws InterruptedException {
4170    try {
4171      while (!condition.getAsBoolean()) {
4172        Thread.sleep(sleepMillis);
4173      }
4174    } catch (RuntimeException e) {
4175      if (e.getCause() instanceof AssertionError) {
4176        throw (AssertionError) e.getCause();
4177      }
4178      throw e;
4179    }
4180  }
4181}