001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.io.UncheckedIOException;
024import java.lang.reflect.Field;
025import java.lang.reflect.Modifier;
026import java.net.BindException;
027import java.net.DatagramSocket;
028import java.net.InetAddress;
029import java.net.ServerSocket;
030import java.net.Socket;
031import java.net.UnknownHostException;
032import java.nio.charset.StandardCharsets;
033import java.security.MessageDigest;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Properties;
044import java.util.Random;
045import java.util.Set;
046import java.util.TreeSet;
047import java.util.concurrent.ThreadLocalRandom;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicReference;
050import java.util.function.BooleanSupplier;
051import org.apache.commons.io.FileUtils;
052import org.apache.commons.lang3.RandomStringUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
057import org.apache.hadoop.hbase.Waiter.Predicate;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.AsyncClusterConnection;
060import org.apache.hadoop.hbase.client.BufferedMutator;
061import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
064import org.apache.hadoop.hbase.client.Connection;
065import org.apache.hadoop.hbase.client.ConnectionFactory;
066import org.apache.hadoop.hbase.client.Consistency;
067import org.apache.hadoop.hbase.client.Delete;
068import org.apache.hadoop.hbase.client.Durability;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Hbck;
071import org.apache.hadoop.hbase.client.MasterRegistry;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.client.RegionLocator;
076import org.apache.hadoop.hbase.client.Result;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.Scan.ReadType;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.client.TableState;
084import org.apache.hadoop.hbase.fs.HFileSystem;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.BlockCache;
089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
090import org.apache.hadoop.hbase.io.hfile.HFile;
091import org.apache.hadoop.hbase.ipc.RpcServerInterface;
092import org.apache.hadoop.hbase.keymeta.KeymetaAdminClient;
093import org.apache.hadoop.hbase.logging.Log4jUtils;
094import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
095import org.apache.hadoop.hbase.master.HMaster;
096import org.apache.hadoop.hbase.master.RegionState;
097import org.apache.hadoop.hbase.master.ServerManager;
098import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
099import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
100import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
101import org.apache.hadoop.hbase.master.assignment.RegionStates;
102import org.apache.hadoop.hbase.mob.MobFileCache;
103import org.apache.hadoop.hbase.regionserver.BloomType;
104import org.apache.hadoop.hbase.regionserver.ChunkCreator;
105import org.apache.hadoop.hbase.regionserver.HRegion;
106import org.apache.hadoop.hbase.regionserver.HRegionServer;
107import org.apache.hadoop.hbase.regionserver.HStore;
108import org.apache.hadoop.hbase.regionserver.InternalScanner;
109import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
110import org.apache.hadoop.hbase.regionserver.Region;
111import org.apache.hadoop.hbase.regionserver.RegionScanner;
112import org.apache.hadoop.hbase.regionserver.RegionServerServices;
113import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
114import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
115import org.apache.hadoop.hbase.security.User;
116import org.apache.hadoop.hbase.security.UserProvider;
117import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
118import org.apache.hadoop.hbase.util.Bytes;
119import org.apache.hadoop.hbase.util.CommonFSUtils;
120import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
121import org.apache.hadoop.hbase.util.FSUtils;
122import org.apache.hadoop.hbase.util.JVMClusterUtil;
123import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
124import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
125import org.apache.hadoop.hbase.util.Pair;
126import org.apache.hadoop.hbase.util.ReflectionUtils;
127import org.apache.hadoop.hbase.util.RegionSplitter;
128import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
129import org.apache.hadoop.hbase.util.RetryCounter;
130import org.apache.hadoop.hbase.util.Threads;
131import org.apache.hadoop.hbase.wal.WAL;
132import org.apache.hadoop.hbase.wal.WALFactory;
133import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
134import org.apache.hadoop.hbase.zookeeper.ZKConfig;
135import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
136import org.apache.hadoop.hdfs.DFSClient;
137import org.apache.hadoop.hdfs.DFSConfigKeys;
138import org.apache.hadoop.hdfs.DistributedFileSystem;
139import org.apache.hadoop.hdfs.MiniDFSCluster;
140import org.apache.hadoop.hdfs.server.datanode.DataNode;
141import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
142import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
143import org.apache.hadoop.mapred.JobConf;
144import org.apache.hadoop.mapred.MiniMRCluster;
145import org.apache.hadoop.mapred.TaskLog;
146import org.apache.hadoop.minikdc.MiniKdc;
147import org.apache.yetus.audience.InterfaceAudience;
148import org.apache.zookeeper.WatchedEvent;
149import org.apache.zookeeper.ZooKeeper;
150import org.apache.zookeeper.ZooKeeper.States;
151
152import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
153
154import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
155
156/**
157 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
158 * functionality. Create an instance and keep it around testing HBase. This class is meant to be
159 * your one-stop shop for anything you might need testing. Manages one cluster at a time only.
160 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type
161 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j
162 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set
163 * logging levels. In the configuration properties, default values for master-info-port and
164 * region-server-port are overridden such that a random port will be assigned (thus avoiding port
165 * contention if another local HBase instance is already running).
166 * <p>
167 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
168 * setting it to true.
169 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
170 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
171 */
172@InterfaceAudience.Public
173@Deprecated
174public class HBaseTestingUtility extends HBaseZKTestingUtility {
175
176  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
177  /**
178   * The default number of regions per regionserver when creating a pre-split table.
179   */
180  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
181
182  private MiniDFSCluster dfsCluster = null;
183  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
184
185  private volatile HBaseCluster hbaseCluster = null;
186  private MiniMRCluster mrCluster = null;
187
188  /** If there is a mini cluster running for this testing utility instance. */
189  private volatile boolean miniClusterRunning;
190
191  private String hadoopLogDir;
192
193  /**
194   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
195   */
196  private Path dataTestDirOnTestFS = null;
197
198  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
199
200  /** Filesystem URI used for map-reduce mini-cluster setup */
201  private static String FS_URI;
202
203  /** This is for unit tests parameterized with a single boolean. */
204  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
205
206  private Admin hbaseAdmin = null;
207
208  /**
209   * Checks to see if a specific port is available.
210   * @param port the port number to check for availability
211   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
212   */
213  public static boolean available(int port) {
214    ServerSocket ss = null;
215    DatagramSocket ds = null;
216    try {
217      ss = new ServerSocket(port);
218      ss.setReuseAddress(true);
219      ds = new DatagramSocket(port);
220      ds.setReuseAddress(true);
221      return true;
222    } catch (IOException e) {
223      // Do nothing
224    } finally {
225      if (ds != null) {
226        ds.close();
227      }
228
229      if (ss != null) {
230        try {
231          ss.close();
232        } catch (IOException e) {
233          /* should not be thrown */
234        }
235      }
236    }
237
238    return false;
239  }
240
241  /**
242   * Create all combinations of Bloom filters and compression algorithms for testing.
243   */
244  private static List<Object[]> bloomAndCompressionCombinations() {
245    List<Object[]> configurations = new ArrayList<>();
246    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
247      for (BloomType bloomType : BloomType.values()) {
248        configurations.add(new Object[] { comprAlgo, bloomType });
249      }
250    }
251    return Collections.unmodifiableList(configurations);
252  }
253
254  /**
255   * Create combination of memstoreTS and tags
256   */
257  private static List<Object[]> memStoreTSAndTagsCombination() {
258    List<Object[]> configurations = new ArrayList<>();
259    configurations.add(new Object[] { false, false });
260    configurations.add(new Object[] { false, true });
261    configurations.add(new Object[] { true, false });
262    configurations.add(new Object[] { true, true });
263    return Collections.unmodifiableList(configurations);
264  }
265
266  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
267    List<Object[]> configurations = new ArrayList<>();
268    configurations.add(new Object[] { false, false, true });
269    configurations.add(new Object[] { false, false, false });
270    configurations.add(new Object[] { false, true, true });
271    configurations.add(new Object[] { false, true, false });
272    configurations.add(new Object[] { true, false, true });
273    configurations.add(new Object[] { true, false, false });
274    configurations.add(new Object[] { true, true, true });
275    configurations.add(new Object[] { true, true, false });
276    return Collections.unmodifiableList(configurations);
277  }
278
279  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
280    bloomAndCompressionCombinations();
281
282  /**
283   * <p>
284   * Create an HBaseTestingUtility using a default configuration.
285   * <p>
286   * Initially, all tmp files are written to a local test data directory. Once
287   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
288   * data will be written to the DFS directory instead.
289   */
290  public HBaseTestingUtility() {
291    this(HBaseConfiguration.create());
292  }
293
294  /**
295   * <p>
296   * Create an HBaseTestingUtility using a given configuration.
297   * <p>
298   * Initially, all tmp files are written to a local test data directory. Once
299   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
300   * data will be written to the DFS directory instead.
301   * @param conf The configuration to use for further operations
302   */
303  public HBaseTestingUtility(Configuration conf) {
304    super(conf);
305
306    // a hbase checksum verification failure will cause unit tests to fail
307    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
308
309    // Save this for when setting default file:// breaks things
310    if (this.conf.get("fs.defaultFS") != null) {
311      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
312    }
313    if (this.conf.get(HConstants.HBASE_DIR) != null) {
314      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
315    }
316    // Every cluster is a local cluster until we start DFS
317    // Note that conf could be null, but this.conf will not be
318    String dataTestDir = getDataTestDir().toString();
319    this.conf.set("fs.defaultFS", "file:///");
320    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
321    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
322    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
323    // If the value for random ports isn't set set it to true, thus making
324    // tests opt-out for random port assignment
325    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
326      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
327  }
328
329  /**
330   * Close both the region {@code r} and it's underlying WAL. For use in tests.
331   */
332  public static void closeRegionAndWAL(final Region r) throws IOException {
333    closeRegionAndWAL((HRegion) r);
334  }
335
336  /**
337   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
338   */
339  public static void closeRegionAndWAL(final HRegion r) throws IOException {
340    if (r == null) return;
341    r.close();
342    if (r.getWAL() == null) return;
343    r.getWAL().close();
344  }
345
346  /**
347   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
348   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
349   * by the Configuration. If say, a Connection was being used against a cluster that had been
350   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
351   * Rather than use the return direct, its usually best to make a copy and use that. Do
352   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
353   * @return Instance of Configuration.
354   */
355  @Override
356  public Configuration getConfiguration() {
357    return super.getConfiguration();
358  }
359
360  public void setHBaseCluster(HBaseCluster hbaseCluster) {
361    this.hbaseCluster = hbaseCluster;
362  }
363
364  /**
365   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
366   * have many concurrent tests running if we need to. Moding a System property is not the way to do
367   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
368   * not anything can do about it at moment; single instance only is how the minidfscluster works.
369   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
370   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
371   * (We do not create them!).
372   * @return The calculated data test build directory, if newly-created.
373   */
374  @Override
375  protected Path setupDataTestDir() {
376    Path testPath = super.setupDataTestDir();
377    if (null == testPath) {
378      return null;
379    }
380
381    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
382
383    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
384    // we want our own value to ensure uniqueness on the same machine
385    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
386
387    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
388    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
389    return testPath;
390  }
391
392  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
393
394    String sysValue = System.getProperty(propertyName);
395
396    if (sysValue != null) {
397      // There is already a value set. So we do nothing but hope
398      // that there will be no conflicts
399      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
400        + " so I do NOT create it in " + parent);
401      String confValue = conf.get(propertyName);
402      if (confValue != null && !confValue.endsWith(sysValue)) {
403        LOG.warn(propertyName + " property value differs in configuration and system: "
404          + "Configuration=" + confValue + " while System=" + sysValue
405          + " Erasing configuration value by system value.");
406      }
407      conf.set(propertyName, sysValue);
408    } else {
409      // Ok, it's not set, so we create it as a subdirectory
410      createSubDir(propertyName, parent, subDirName);
411      System.setProperty(propertyName, conf.get(propertyName));
412    }
413  }
414
415  /**
416   * @return Where to write test data on the test filesystem; Returns working directory for the test
417   *         filesystem by default
418   * @see #setupDataTestDirOnTestFS()
419   * @see #getTestFileSystem()
420   */
421  private Path getBaseTestDirOnTestFS() throws IOException {
422    FileSystem fs = getTestFileSystem();
423    return new Path(fs.getWorkingDirectory(), "test-data");
424  }
425
426  /**
427   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
428   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
429   * on it.
430   * @return a unique path in the test filesystem
431   */
432  public Path getDataTestDirOnTestFS() throws IOException {
433    if (dataTestDirOnTestFS == null) {
434      setupDataTestDirOnTestFS();
435    }
436
437    return dataTestDirOnTestFS;
438  }
439
440  /**
441   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
442   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
443   * on it.
444   * @return a unique path in the test filesystem
445   * @param subdirName name of the subdir to create under the base test dir
446   */
447  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
448    return new Path(getDataTestDirOnTestFS(), subdirName);
449  }
450
451  /**
452   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
453   * setup.
454   */
455  private void setupDataTestDirOnTestFS() throws IOException {
456    if (dataTestDirOnTestFS != null) {
457      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
458      return;
459    }
460    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
461  }
462
463  /**
464   * Sets up a new path in test filesystem to be used by tests.
465   */
466  private Path getNewDataTestDirOnTestFS() throws IOException {
467    // The file system can be either local, mini dfs, or if the configuration
468    // is supplied externally, it can be an external cluster FS. If it is a local
469    // file system, the tests should use getBaseTestDir, otherwise, we can use
470    // the working directory, and create a unique sub dir there
471    FileSystem fs = getTestFileSystem();
472    Path newDataTestDir;
473    String randomStr = getRandomUUID().toString();
474    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
475      newDataTestDir = new Path(getDataTestDir(), randomStr);
476      File dataTestDir = new File(newDataTestDir.toString());
477      if (deleteOnExit()) dataTestDir.deleteOnExit();
478    } else {
479      Path base = getBaseTestDirOnTestFS();
480      newDataTestDir = new Path(base, randomStr);
481      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
482    }
483    return newDataTestDir;
484  }
485
486  /**
487   * Cleans the test data directory on the test filesystem.
488   * @return True if we removed the test dirs
489   */
490  public boolean cleanupDataTestDirOnTestFS() throws IOException {
491    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
492    if (ret) dataTestDirOnTestFS = null;
493    return ret;
494  }
495
496  /**
497   * Cleans a subdirectory under the test data directory on the test filesystem.
498   * @return True if we removed child
499   */
500  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
501    Path cpath = getDataTestDirOnTestFS(subdirName);
502    return getTestFileSystem().delete(cpath, true);
503  }
504
505  // Workaround to avoid IllegalThreadStateException
506  // See HBASE-27148 for more details
507  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
508
509    private volatile boolean stopped = false;
510
511    private final MiniDFSCluster cluster;
512
513    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
514      super("FsDatasetAsyncDiskServiceFixer");
515      setDaemon(true);
516      this.cluster = cluster;
517    }
518
519    @Override
520    public void run() {
521      while (!stopped) {
522        try {
523          Thread.sleep(30000);
524        } catch (InterruptedException e) {
525          Thread.currentThread().interrupt();
526          continue;
527        }
528        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
529        // timeout of the thread pool executor is 60 seconds by default.
530        try {
531          for (DataNode dn : cluster.getDataNodes()) {
532            FsDatasetSpi<?> dataset = dn.getFSDataset();
533            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
534            service.setAccessible(true);
535            Object asyncDiskService = service.get(dataset);
536            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
537            group.setAccessible(true);
538            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
539            if (threadGroup.isDaemon()) {
540              threadGroup.setDaemon(false);
541            }
542          }
543        } catch (NoSuchFieldException e) {
544          LOG.debug("NoSuchFieldException: " + e.getMessage()
545            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
546            + "See HBASE-27595 for details.");
547        } catch (Exception e) {
548          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
549        }
550      }
551    }
552
553    void shutdown() {
554      stopped = true;
555      interrupt();
556    }
557  }
558
559  /**
560   * Start a minidfscluster.
561   * @param servers How many DNs to start.
562   * @see #shutdownMiniDFSCluster()
563   * @return The mini dfs cluster created.
564   */
565  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
566    return startMiniDFSCluster(servers, null);
567  }
568
569  /**
570   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
571   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
572   * instances of the datanodes will have the same host name.
573   * @param hosts hostnames DNs to run on.
574   * @see #shutdownMiniDFSCluster()
575   * @return The mini dfs cluster created.
576   */
577  public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception {
578    if (hosts != null && hosts.length != 0) {
579      return startMiniDFSCluster(hosts.length, hosts);
580    } else {
581      return startMiniDFSCluster(1, null);
582    }
583  }
584
585  /**
586   * Start a minidfscluster. Can only create one.
587   * @param servers How many DNs to start.
588   * @param hosts   hostnames DNs to run on.
589   * @see #shutdownMiniDFSCluster()
590   * @return The mini dfs cluster created.
591   */
592  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception {
593    return startMiniDFSCluster(servers, null, hosts);
594  }
595
596  private void setFs() throws IOException {
597    if (this.dfsCluster == null) {
598      LOG.info("Skipping setting fs because dfsCluster is null");
599      return;
600    }
601    FileSystem fs = this.dfsCluster.getFileSystem();
602    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
603
604    // re-enable this check with dfs
605    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
606  }
607
608  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
609    throws Exception {
610    createDirsAndSetProperties();
611    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
612
613    this.dfsCluster =
614      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
615    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
616    this.dfsClusterFixer.start();
617    // Set this just-started cluster as our filesystem.
618    setFs();
619
620    // Wait for the cluster to be totally up
621    this.dfsCluster.waitClusterUp();
622
623    // reset the test directory for test file system
624    dataTestDirOnTestFS = null;
625    String dataTestDir = getDataTestDir().toString();
626    conf.set(HConstants.HBASE_DIR, dataTestDir);
627    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
628
629    return this.dfsCluster;
630  }
631
632  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
633    createDirsAndSetProperties();
634    dfsCluster =
635      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
636    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
637    this.dfsClusterFixer.start();
638    return dfsCluster;
639  }
640
641  /**
642   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
643   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
644   * the conf.
645   *
646   * <pre>
647   * Configuration conf = TEST_UTIL.getConfiguration();
648   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
649   *   Map.Entry&lt;String, String&gt; e = i.next();
650   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
651   * }
652   * </pre>
653   */
654  private void createDirsAndSetProperties() throws IOException {
655    setupClusterTestDir();
656    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
657    createDirAndSetProperty("test.cache.data");
658    createDirAndSetProperty("hadoop.tmp.dir");
659    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
660    createDirAndSetProperty("mapreduce.cluster.local.dir");
661    createDirAndSetProperty("mapreduce.cluster.temp.dir");
662    enableShortCircuit();
663
664    Path root = getDataTestDirOnTestFS("hadoop");
665    conf.set(MapreduceTestingShim.getMROutputDirProp(),
666      new Path(root, "mapred-output-dir").toString());
667    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
668    conf.set("mapreduce.jobtracker.staging.root.dir",
669      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
670    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
671    conf.set("yarn.app.mapreduce.am.staging-dir",
672      new Path(root, "mapreduce-am-staging-root-dir").toString());
673
674    // Frustrate yarn's and hdfs's attempts at writing /tmp.
675    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
676    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
677    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
678    createDirAndSetProperty("yarn.nodemanager.log-dirs");
679    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
680    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
681    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
682    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
683    createDirAndSetProperty("dfs.journalnode.edits.dir");
684    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
685    createDirAndSetProperty("nfs.dump.dir");
686    createDirAndSetProperty("java.io.tmpdir");
687    createDirAndSetProperty("dfs.journalnode.edits.dir");
688    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
689    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
690
691    // disable metrics logger since it depend on commons-logging internal classes and we do not want
692    // commons-logging on our classpath
693    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
694    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
695  }
696
697  /**
698   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
699   * Default to false.
700   */
701  public boolean isNewVersionBehaviorEnabled() {
702    final String propName = "hbase.tests.new.version.behavior";
703    String v = System.getProperty(propName);
704    if (v != null) {
705      return Boolean.parseBoolean(v);
706    }
707    return false;
708  }
709
710  /**
711   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
712   * allows to specify this parameter on the command line. If not set, default is true.
713   */
714  public boolean isReadShortCircuitOn() {
715    final String propName = "hbase.tests.use.shortcircuit.reads";
716    String readOnProp = System.getProperty(propName);
717    if (readOnProp != null) {
718      return Boolean.parseBoolean(readOnProp);
719    } else {
720      return conf.getBoolean(propName, false);
721    }
722  }
723
724  /**
725   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
726   * including skipping the hdfs checksum checks.
727   */
728  private void enableShortCircuit() {
729    if (isReadShortCircuitOn()) {
730      String curUser = System.getProperty("user.name");
731      LOG.info("read short circuit is ON for user " + curUser);
732      // read short circuit, for hdfs
733      conf.set("dfs.block.local-path-access.user", curUser);
734      // read short circuit, for hbase
735      conf.setBoolean("dfs.client.read.shortcircuit", true);
736      // Skip checking checksum, for the hdfs client and the datanode
737      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
738    } else {
739      LOG.info("read short circuit is OFF");
740    }
741  }
742
743  private String createDirAndSetProperty(final String property) {
744    return createDirAndSetProperty(property, property);
745  }
746
747  private String createDirAndSetProperty(final String relPath, String property) {
748    String path = getDataTestDir(relPath).toString();
749    System.setProperty(property, path);
750    conf.set(property, path);
751    new File(path).mkdirs();
752    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
753    return path;
754  }
755
756  /**
757   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
758   */
759  public void shutdownMiniDFSCluster() throws IOException {
760    if (this.dfsCluster != null) {
761      // The below throws an exception per dn, AsynchronousCloseException.
762      this.dfsCluster.shutdown();
763      dfsCluster = null;
764      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
765      // have a fixer
766      if (dfsClusterFixer != null) {
767        this.dfsClusterFixer.shutdown();
768        dfsClusterFixer = null;
769      }
770      dataTestDirOnTestFS = null;
771      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
772    }
773  }
774
775  /**
776   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
777   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
778   * @param createWALDir Whether to create a new WAL directory.
779   * @return The mini HBase cluster created.
780   * @see #shutdownMiniCluster()
781   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
782   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
783   * @see #startMiniCluster(StartMiniClusterOption)
784   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
785   */
786  @Deprecated
787  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
788    StartMiniClusterOption option =
789      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
790    return startMiniCluster(option);
791  }
792
793  /**
794   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
795   * defined in {@link StartMiniClusterOption.Builder}.
796   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
797   * @param createRootDir Whether to create a new root or data directory path.
798   * @return The mini HBase cluster created.
799   * @see #shutdownMiniCluster()
800   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
801   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
802   * @see #startMiniCluster(StartMiniClusterOption)
803   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
804   */
805  @Deprecated
806  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception {
807    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
808      .numDataNodes(numSlaves).createRootDir(createRootDir).build();
809    return startMiniCluster(option);
810  }
811
812  /**
813   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
814   * defined in {@link StartMiniClusterOption.Builder}.
815   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
816   * @param createRootDir Whether to create a new root or data directory path.
817   * @param createWALDir  Whether to create a new WAL directory.
818   * @return The mini HBase cluster created.
819   * @see #shutdownMiniCluster()
820   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
821   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
822   * @see #startMiniCluster(StartMiniClusterOption)
823   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
824   */
825  @Deprecated
826  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
827    boolean createWALDir) throws Exception {
828    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
829      .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build();
830    return startMiniCluster(option);
831  }
832
833  /**
834   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
835   * defined in {@link StartMiniClusterOption.Builder}.
836   * @param numMasters    Master node number.
837   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
838   * @param createRootDir Whether to create a new root or data directory path.
839   * @return The mini HBase cluster created.
840   * @see #shutdownMiniCluster()
841   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
842   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
843   * @see #startMiniCluster(StartMiniClusterOption)
844   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
845   */
846  @Deprecated
847  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
848    throws Exception {
849    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
850      .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build();
851    return startMiniCluster(option);
852  }
853
854  /**
855   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
856   * defined in {@link StartMiniClusterOption.Builder}.
857   * @param numMasters Master node number.
858   * @param numSlaves  Slave node number, for both HBase region server and HDFS data node.
859   * @return The mini HBase cluster created.
860   * @see #shutdownMiniCluster()
861   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
862   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
863   * @see #startMiniCluster(StartMiniClusterOption)
864   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
865   */
866  @Deprecated
867  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
868    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
869      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
870    return startMiniCluster(option);
871  }
872
873  /**
874   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
875   * defined in {@link StartMiniClusterOption.Builder}.
876   * @param numMasters    Master node number.
877   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
878   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
879   *                      HDFS data node number.
880   * @param createRootDir Whether to create a new root or data directory path.
881   * @return The mini HBase cluster created.
882   * @see #shutdownMiniCluster()
883   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
884   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
885   * @see #startMiniCluster(StartMiniClusterOption)
886   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
887   */
888  @Deprecated
889  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
890    boolean createRootDir) throws Exception {
891    StartMiniClusterOption option =
892      StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves)
893        .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
894    return startMiniCluster(option);
895  }
896
897  /**
898   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
899   * defined in {@link StartMiniClusterOption.Builder}.
900   * @param numMasters    Master node number.
901   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
902   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
903   *                      HDFS data node number.
904   * @return The mini HBase cluster created.
905   * @see #shutdownMiniCluster()
906   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
907   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
908   * @see #startMiniCluster(StartMiniClusterOption)
909   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
910   */
911  @Deprecated
912  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
913    throws Exception {
914    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
915      .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
916    return startMiniCluster(option);
917  }
918
919  /**
920   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
921   * defined in {@link StartMiniClusterOption.Builder}.
922   * @param numMasters       Master node number.
923   * @param numRegionServers Number of region servers.
924   * @param numDataNodes     Number of datanodes.
925   * @return The mini HBase cluster created.
926   * @see #shutdownMiniCluster()
927   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
928   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
929   * @see #startMiniCluster(StartMiniClusterOption)
930   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
931   */
932  @Deprecated
933  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
934    throws Exception {
935    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
936      .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build();
937    return startMiniCluster(option);
938  }
939
940  /**
941   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
942   * defined in {@link StartMiniClusterOption.Builder}.
943   * @param numMasters    Master node number.
944   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
945   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
946   *                      HDFS data node number.
947   * @param masterClass   The class to use as HMaster, or null for default.
948   * @param rsClass       The class to use as HRegionServer, or null for default.
949   * @return The mini HBase cluster created.
950   * @see #shutdownMiniCluster()
951   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
952   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
953   * @see #startMiniCluster(StartMiniClusterOption)
954   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
955   */
956  @Deprecated
957  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
958    Class<? extends HMaster> masterClass,
959    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
960    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
961      .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves)
962      .dataNodeHosts(dataNodeHosts).build();
963    return startMiniCluster(option);
964  }
965
966  /**
967   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
968   * defined in {@link StartMiniClusterOption.Builder}.
969   * @param numMasters       Master node number.
970   * @param numRegionServers Number of region servers.
971   * @param numDataNodes     Number of datanodes.
972   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
973   *                         overwrite HDFS data node number.
974   * @param masterClass      The class to use as HMaster, or null for default.
975   * @param rsClass          The class to use as HRegionServer, or null for default.
976   * @return The mini HBase cluster created.
977   * @see #shutdownMiniCluster()
978   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
979   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
980   * @see #startMiniCluster(StartMiniClusterOption)
981   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
982   */
983  @Deprecated
984  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
985    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
986    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
987    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
988      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
989      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build();
990    return startMiniCluster(option);
991  }
992
993  /**
994   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
995   * defined in {@link StartMiniClusterOption.Builder}.
996   * @param numMasters       Master node number.
997   * @param numRegionServers Number of region servers.
998   * @param numDataNodes     Number of datanodes.
999   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
1000   *                         overwrite HDFS data node number.
1001   * @param masterClass      The class to use as HMaster, or null for default.
1002   * @param rsClass          The class to use as HRegionServer, or null for default.
1003   * @param createRootDir    Whether to create a new root or data directory path.
1004   * @param createWALDir     Whether to create a new WAL directory.
1005   * @return The mini HBase cluster created.
1006   * @see #shutdownMiniCluster()
1007   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1008   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
1009   * @see #startMiniCluster(StartMiniClusterOption)
1010   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1011   */
1012  @Deprecated
1013  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1014    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1015    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1016    boolean createWALDir) throws Exception {
1017    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1018      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
1019      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir)
1020      .createWALDir(createWALDir).build();
1021    return startMiniCluster(option);
1022  }
1023
1024  /**
1025   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
1026   * other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1027   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1028   * @see #startMiniCluster(StartMiniClusterOption option)
1029   * @see #shutdownMiniDFSCluster()
1030   */
1031  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1032    StartMiniClusterOption option =
1033      StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1034    return startMiniCluster(option);
1035  }
1036
1037  /**
1038   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
1039   * value can be found in {@link StartMiniClusterOption.Builder}.
1040   * @see #startMiniCluster(StartMiniClusterOption option)
1041   * @see #shutdownMiniDFSCluster()
1042   */
1043  public MiniHBaseCluster startMiniCluster() throws Exception {
1044    return startMiniCluster(StartMiniClusterOption.builder().build());
1045  }
1046
1047  /**
1048   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
1049   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
1050   * under System property test.build.data, to be cleaned up on exit.
1051   * @see #shutdownMiniDFSCluster()
1052   */
1053  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1054    LOG.info("Starting up minicluster with option: {}", option);
1055
1056    // If we already put up a cluster, fail.
1057    if (miniClusterRunning) {
1058      throw new IllegalStateException("A mini-cluster is already running");
1059    }
1060    miniClusterRunning = true;
1061
1062    setupClusterTestDir();
1063
1064    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1065    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1066    if (dfsCluster == null) {
1067      LOG.info("STARTING DFS");
1068      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1069    } else {
1070      LOG.info("NOT STARTING DFS");
1071    }
1072
1073    // Start up a zk cluster.
1074    if (getZkCluster() == null) {
1075      startMiniZKCluster(option.getNumZkServers());
1076    }
1077
1078    // Start the MiniHBaseCluster
1079    return startMiniHBaseCluster(option);
1080  }
1081
1082  /**
1083   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1084   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1085   * @return Reference to the hbase mini hbase cluster.
1086   * @see #startMiniCluster(StartMiniClusterOption)
1087   * @see #shutdownMiniHBaseCluster()
1088   */
1089  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1090    throws IOException, InterruptedException {
1091    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1092    createRootDir(option.isCreateRootDir());
1093    if (option.isCreateWALDir()) {
1094      createWALRootDir();
1095    }
1096    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1097    // for tests that do not read hbase-defaults.xml
1098    setHBaseFsTmpDir();
1099
1100    // These settings will make the server waits until this exact number of
1101    // regions servers are connected.
1102    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1103      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1104    }
1105    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1106      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1107    }
1108
1109    Configuration c = new Configuration(this.conf);
1110    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1111      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1112      option.getMasterClass(), option.getRsClass());
1113    // Populate the master address configuration from mini cluster configuration.
1114    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1115    // Don't leave here till we've done a successful scan of the hbase:meta
1116    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1117      ResultScanner s = t.getScanner(new Scan())) {
1118      for (;;) {
1119        if (s.next() == null) {
1120          break;
1121        }
1122      }
1123    }
1124
1125    getAdmin(); // create immediately the hbaseAdmin
1126    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1127
1128    return (MiniHBaseCluster) hbaseCluster;
1129  }
1130
1131  /**
1132   * Starts up mini hbase cluster using default options. Default options can be found in
1133   * {@link StartMiniClusterOption.Builder}.
1134   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1135   * @see #shutdownMiniHBaseCluster()
1136   */
1137  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1138    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1139  }
1140
1141  /**
1142   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1143   * {@link #startMiniCluster()}. All other options will use default values, defined in
1144   * {@link StartMiniClusterOption.Builder}.
1145   * @param numMasters       Master node number.
1146   * @param numRegionServers Number of region servers.
1147   * @return The mini HBase cluster created.
1148   * @see #shutdownMiniHBaseCluster()
1149   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1150   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1151   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1152   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1153   */
1154  @Deprecated
1155  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1156    throws IOException, InterruptedException {
1157    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1158      .numRegionServers(numRegionServers).build();
1159    return startMiniHBaseCluster(option);
1160  }
1161
1162  /**
1163   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1164   * {@link #startMiniCluster()}. All other options will use default values, defined in
1165   * {@link StartMiniClusterOption.Builder}.
1166   * @param numMasters       Master node number.
1167   * @param numRegionServers Number of region servers.
1168   * @param rsPorts          Ports that RegionServer should use.
1169   * @return The mini HBase cluster created.
1170   * @see #shutdownMiniHBaseCluster()
1171   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1172   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1173   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1174   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1175   */
1176  @Deprecated
1177  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1178    List<Integer> rsPorts) throws IOException, InterruptedException {
1179    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1180      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1181    return startMiniHBaseCluster(option);
1182  }
1183
1184  /**
1185   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1186   * {@link #startMiniCluster()}. All other options will use default values, defined in
1187   * {@link StartMiniClusterOption.Builder}.
1188   * @param numMasters       Master node number.
1189   * @param numRegionServers Number of region servers.
1190   * @param rsPorts          Ports that RegionServer should use.
1191   * @param masterClass      The class to use as HMaster, or null for default.
1192   * @param rsClass          The class to use as HRegionServer, or null for default.
1193   * @param createRootDir    Whether to create a new root or data directory path.
1194   * @param createWALDir     Whether to create a new WAL directory.
1195   * @return The mini HBase cluster created.
1196   * @see #shutdownMiniHBaseCluster()
1197   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1198   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1199   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1200   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1201   */
1202  @Deprecated
1203  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1204    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1205    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1206    boolean createWALDir) throws IOException, InterruptedException {
1207    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1208      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1209      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1210    return startMiniHBaseCluster(option);
1211  }
1212
1213  /**
1214   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1215   * want to keep dfs/zk up and just stop/start hbase.
1216   * @param servers number of region servers
1217   */
1218  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1219    this.restartHBaseCluster(servers, null);
1220  }
1221
1222  public void restartHBaseCluster(int servers, List<Integer> ports)
1223    throws IOException, InterruptedException {
1224    StartMiniClusterOption option =
1225      StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1226    restartHBaseCluster(option);
1227    invalidateConnection();
1228  }
1229
1230  public void restartHBaseCluster(StartMiniClusterOption option)
1231    throws IOException, InterruptedException {
1232    closeConnection();
1233    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
1234      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1235      option.getMasterClass(), option.getRsClass());
1236    // Don't leave here till we've done a successful scan of the hbase:meta
1237    Connection conn = ConnectionFactory.createConnection(this.conf);
1238    Table t = conn.getTable(TableName.META_TABLE_NAME);
1239    ResultScanner s = t.getScanner(new Scan());
1240    while (s.next() != null) {
1241      // do nothing
1242    }
1243    LOG.info("HBase has been restarted");
1244    s.close();
1245    t.close();
1246    conn.close();
1247  }
1248
1249  /**
1250   * @return Current mini hbase cluster. Only has something in it after a call to
1251   *         {@link #startMiniCluster()}.
1252   * @see #startMiniCluster()
1253   */
1254  public MiniHBaseCluster getMiniHBaseCluster() {
1255    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1256      return (MiniHBaseCluster) this.hbaseCluster;
1257    }
1258    throw new RuntimeException(
1259      hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName());
1260  }
1261
1262  /**
1263   * Stops mini hbase, zk, and hdfs clusters.
1264   * @see #startMiniCluster(int)
1265   */
1266  public void shutdownMiniCluster() throws IOException {
1267    LOG.info("Shutting down minicluster");
1268    shutdownMiniHBaseCluster();
1269    shutdownMiniDFSCluster();
1270    shutdownMiniZKCluster();
1271
1272    cleanupTestDir();
1273    miniClusterRunning = false;
1274    LOG.info("Minicluster is down");
1275  }
1276
1277  /**
1278   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1279   * @throws java.io.IOException in case command is unsuccessful
1280   */
1281  public void shutdownMiniHBaseCluster() throws IOException {
1282    cleanup();
1283    if (this.hbaseCluster != null) {
1284      this.hbaseCluster.shutdown();
1285      // Wait till hbase is down before going on to shutdown zk.
1286      this.hbaseCluster.waitUntilShutDown();
1287      this.hbaseCluster = null;
1288    }
1289    if (zooKeeperWatcher != null) {
1290      zooKeeperWatcher.close();
1291      zooKeeperWatcher = null;
1292    }
1293  }
1294
1295  /**
1296   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1297   * @throws java.io.IOException throws in case command is unsuccessful
1298   */
1299  public void killMiniHBaseCluster() throws IOException {
1300    cleanup();
1301    if (this.hbaseCluster != null) {
1302      getMiniHBaseCluster().killAll();
1303      this.hbaseCluster = null;
1304    }
1305    if (zooKeeperWatcher != null) {
1306      zooKeeperWatcher.close();
1307      zooKeeperWatcher = null;
1308    }
1309  }
1310
1311  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1312  private void cleanup() throws IOException {
1313    closeConnection();
1314    // unset the configuration for MIN and MAX RS to start
1315    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1316    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1317  }
1318
1319  /**
1320   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1321   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1322   * If false, previous path is used. Note: this does not cause the root dir to be created.
1323   * @return Fully qualified path for the default hbase root dir
1324   */
1325  public Path getDefaultRootDirPath(boolean create) throws IOException {
1326    if (!create) {
1327      return getDataTestDirOnTestFS();
1328    } else {
1329      return getNewDataTestDirOnTestFS();
1330    }
1331  }
1332
1333  /**
1334   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that
1335   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1336   * @return Fully qualified path for the default hbase root dir
1337   */
1338  public Path getDefaultRootDirPath() throws IOException {
1339    return getDefaultRootDirPath(false);
1340  }
1341
1342  /**
1343   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1344   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1345   * startup. You'd only use this method if you were doing manual operation.
1346   * @param create This flag decides whether to get a new root or data directory path or not, if it
1347   *               has been fetched already. Note : Directory will be made irrespective of whether
1348   *               path has been fetched or not. If directory already exists, it will be overwritten
1349   * @return Fully qualified path to hbase root dir
1350   */
1351  public Path createRootDir(boolean create) throws IOException {
1352    FileSystem fs = FileSystem.get(this.conf);
1353    Path hbaseRootdir = getDefaultRootDirPath(create);
1354    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1355    fs.mkdirs(hbaseRootdir);
1356    FSUtils.setVersion(fs, hbaseRootdir);
1357    return hbaseRootdir;
1358  }
1359
1360  /**
1361   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that
1362   * <code>create</code> flag is false.
1363   * @return Fully qualified path to hbase root dir
1364   */
1365  public Path createRootDir() throws IOException {
1366    return createRootDir(false);
1367  }
1368
1369  /**
1370   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1371   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1372   * this method if you were doing manual operation.
1373   * @return Fully qualified path to hbase root dir
1374   */
1375  public Path createWALRootDir() throws IOException {
1376    FileSystem fs = FileSystem.get(this.conf);
1377    Path walDir = getNewDataTestDirOnTestFS();
1378    CommonFSUtils.setWALRootDir(this.conf, walDir);
1379    fs.mkdirs(walDir);
1380    return walDir;
1381  }
1382
1383  private void setHBaseFsTmpDir() throws IOException {
1384    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1385    if (hbaseFsTmpDirInString == null) {
1386      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1387      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1388    } else {
1389      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1390    }
1391  }
1392
1393  /**
1394   * Flushes all caches in the mini hbase cluster
1395   */
1396  public void flush() throws IOException {
1397    getMiniHBaseCluster().flushcache();
1398  }
1399
1400  /**
1401   * Flushes all caches in the mini hbase cluster
1402   */
1403  public void flush(TableName tableName) throws IOException {
1404    getMiniHBaseCluster().flushcache(tableName);
1405  }
1406
1407  /**
1408   * Compact all regions in the mini hbase cluster
1409   */
1410  public void compact(boolean major) throws IOException {
1411    getMiniHBaseCluster().compact(major);
1412  }
1413
1414  /**
1415   * Compact all of a table's reagion in the mini hbase cluster
1416   */
1417  public void compact(TableName tableName, boolean major) throws IOException {
1418    getMiniHBaseCluster().compact(tableName, major);
1419  }
1420
1421  /**
1422   * Create a table.
1423   * @return A Table instance for the created table.
1424   */
1425  public Table createTable(TableName tableName, String family) throws IOException {
1426    return createTable(tableName, new String[] { family });
1427  }
1428
1429  /**
1430   * Create a table.
1431   * @return A Table instance for the created table.
1432   */
1433  public Table createTable(TableName tableName, String[] families) throws IOException {
1434    List<byte[]> fams = new ArrayList<>(families.length);
1435    for (String family : families) {
1436      fams.add(Bytes.toBytes(family));
1437    }
1438    return createTable(tableName, fams.toArray(new byte[0][]));
1439  }
1440
1441  /**
1442   * Create a table.
1443   * @return A Table instance for the created table.
1444   */
1445  public Table createTable(TableName tableName, byte[] family) throws IOException {
1446    return createTable(tableName, new byte[][] { family });
1447  }
1448
1449  /**
1450   * Create a table with multiple regions.
1451   * @return A Table instance for the created table.
1452   */
1453  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1454    throws IOException {
1455    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1456    byte[] startKey = Bytes.toBytes("aaaaa");
1457    byte[] endKey = Bytes.toBytes("zzzzz");
1458    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1459
1460    return createTable(tableName, new byte[][] { family }, splitKeys);
1461  }
1462
1463  /**
1464   * Create a table.
1465   * @return A Table instance for the created table.
1466   */
1467  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1468    return createTable(tableName, families, (byte[][]) null);
1469  }
1470
1471  /**
1472   * Create a table with multiple regions.
1473   * @return A Table instance for the created table.
1474   */
1475  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1476    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1477  }
1478
1479  /**
1480   * Create a table with multiple regions.
1481   * @param replicaCount replica count.
1482   * @return A Table instance for the created table.
1483   */
1484  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1485    throws IOException {
1486    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1487  }
1488
1489  /**
1490   * Create a table.
1491   * @return A Table instance for the created table.
1492   */
1493  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1494    throws IOException {
1495    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1496  }
1497
1498  /**
1499   * Create a table.
1500   * @param tableName    the table name
1501   * @param families     the families
1502   * @param splitKeys    the splitkeys
1503   * @param replicaCount the region replica count
1504   * @return A Table instance for the created table.
1505   * @throws IOException throws IOException
1506   */
1507  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1508    int replicaCount) throws IOException {
1509    return createTable(tableName, families, splitKeys, replicaCount,
1510      new Configuration(getConfiguration()));
1511  }
1512
1513  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1514    byte[] endKey, int numRegions) throws IOException {
1515    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1516
1517    getAdmin().createTable(desc, startKey, endKey, numRegions);
1518    // HBaseAdmin only waits for regions to appear in hbase:meta we
1519    // should wait until they are assigned
1520    waitUntilAllRegionsAssigned(tableName);
1521    return getConnection().getTable(tableName);
1522  }
1523
1524  /**
1525   * Create a table.
1526   * @param c Configuration to use
1527   * @return A Table instance for the created table.
1528   */
1529  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1530    throws IOException {
1531    return createTable(htd, families, null, c);
1532  }
1533
1534  /**
1535   * Create a table.
1536   * @param htd       table descriptor
1537   * @param families  array of column families
1538   * @param splitKeys array of split keys
1539   * @param c         Configuration to use
1540   * @return A Table instance for the created table.
1541   * @throws IOException if getAdmin or createTable fails
1542   */
1543  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1544    Configuration c) throws IOException {
1545    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1546    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1547    // on is interfering.
1548    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1549  }
1550
1551  /**
1552   * Create a table.
1553   * @param htd       table descriptor
1554   * @param families  array of column families
1555   * @param splitKeys array of split keys
1556   * @param type      Bloom type
1557   * @param blockSize block size
1558   * @param c         Configuration to use
1559   * @return A Table instance for the created table.
1560   * @throws IOException if getAdmin or createTable fails
1561   */
1562
1563  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1564    BloomType type, int blockSize, Configuration c) throws IOException {
1565    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1566    for (byte[] family : families) {
1567      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1568        .setBloomFilterType(type).setBlocksize(blockSize);
1569      if (isNewVersionBehaviorEnabled()) {
1570        cfdb.setNewVersionBehavior(true);
1571      }
1572      builder.setColumnFamily(cfdb.build());
1573    }
1574    TableDescriptor td = builder.build();
1575    if (splitKeys != null) {
1576      getAdmin().createTable(td, splitKeys);
1577    } else {
1578      getAdmin().createTable(td);
1579    }
1580    // HBaseAdmin only waits for regions to appear in hbase:meta
1581    // we should wait until they are assigned
1582    waitUntilAllRegionsAssigned(td.getTableName());
1583    return getConnection().getTable(td.getTableName());
1584  }
1585
1586  /**
1587   * Create a table.
1588   * @param htd       table descriptor
1589   * @param splitRows array of split keys
1590   * @return A Table instance for the created table.
1591   */
1592  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1593    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1594    if (isNewVersionBehaviorEnabled()) {
1595      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1596        builder.setColumnFamily(
1597          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1598      }
1599    }
1600    if (splitRows != null) {
1601      getAdmin().createTable(builder.build(), splitRows);
1602    } else {
1603      getAdmin().createTable(builder.build());
1604    }
1605    // HBaseAdmin only waits for regions to appear in hbase:meta
1606    // we should wait until they are assigned
1607    waitUntilAllRegionsAssigned(htd.getTableName());
1608    return getConnection().getTable(htd.getTableName());
1609  }
1610
1611  /**
1612   * Create a table.
1613   * @param tableName    the table name
1614   * @param families     the families
1615   * @param splitKeys    the split keys
1616   * @param replicaCount the replica count
1617   * @param c            Configuration to use
1618   * @return A Table instance for the created table.
1619   */
1620  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1621    int replicaCount, final Configuration c) throws IOException {
1622    TableDescriptor htd =
1623      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1624    return createTable(htd, families, splitKeys, c);
1625  }
1626
1627  /**
1628   * Create a table.
1629   * @return A Table instance for the created table.
1630   */
1631  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1632    return createTable(tableName, new byte[][] { family }, numVersions);
1633  }
1634
1635  /**
1636   * Create a table.
1637   * @return A Table instance for the created table.
1638   */
1639  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1640    throws IOException {
1641    return createTable(tableName, families, numVersions, (byte[][]) null);
1642  }
1643
1644  /**
1645   * Create a table.
1646   * @return A Table instance for the created table.
1647   */
1648  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1649    byte[][] splitKeys) throws IOException {
1650    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1651    for (byte[] family : families) {
1652      ColumnFamilyDescriptorBuilder cfBuilder =
1653        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1654      if (isNewVersionBehaviorEnabled()) {
1655        cfBuilder.setNewVersionBehavior(true);
1656      }
1657      builder.setColumnFamily(cfBuilder.build());
1658    }
1659    if (splitKeys != null) {
1660      getAdmin().createTable(builder.build(), splitKeys);
1661    } else {
1662      getAdmin().createTable(builder.build());
1663    }
1664    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1665    // assigned
1666    waitUntilAllRegionsAssigned(tableName);
1667    return getConnection().getTable(tableName);
1668  }
1669
1670  /**
1671   * Create a table with multiple regions.
1672   * @return A Table instance for the created table.
1673   */
1674  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1675    throws IOException {
1676    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1677  }
1678
1679  /**
1680   * Create a table.
1681   * @return A Table instance for the created table.
1682   */
1683  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1684    throws IOException {
1685    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1686    for (byte[] family : families) {
1687      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1688        .setMaxVersions(numVersions).setBlocksize(blockSize);
1689      if (isNewVersionBehaviorEnabled()) {
1690        cfBuilder.setNewVersionBehavior(true);
1691      }
1692      builder.setColumnFamily(cfBuilder.build());
1693    }
1694    getAdmin().createTable(builder.build());
1695    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1696    // assigned
1697    waitUntilAllRegionsAssigned(tableName);
1698    return getConnection().getTable(tableName);
1699  }
1700
1701  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1702    String cpName) throws IOException {
1703    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1704    for (byte[] family : families) {
1705      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1706        .setMaxVersions(numVersions).setBlocksize(blockSize);
1707      if (isNewVersionBehaviorEnabled()) {
1708        cfBuilder.setNewVersionBehavior(true);
1709      }
1710      builder.setColumnFamily(cfBuilder.build());
1711    }
1712    if (cpName != null) {
1713      builder.setCoprocessor(cpName);
1714    }
1715    getAdmin().createTable(builder.build());
1716    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1717    // assigned
1718    waitUntilAllRegionsAssigned(tableName);
1719    return getConnection().getTable(tableName);
1720  }
1721
1722  /**
1723   * Create a table.
1724   * @return A Table instance for the created table.
1725   */
1726  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1727    throws IOException {
1728    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1729    int i = 0;
1730    for (byte[] family : families) {
1731      ColumnFamilyDescriptorBuilder cfBuilder =
1732        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1733      if (isNewVersionBehaviorEnabled()) {
1734        cfBuilder.setNewVersionBehavior(true);
1735      }
1736      builder.setColumnFamily(cfBuilder.build());
1737      i++;
1738    }
1739    getAdmin().createTable(builder.build());
1740    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1741    // assigned
1742    waitUntilAllRegionsAssigned(tableName);
1743    return getConnection().getTable(tableName);
1744  }
1745
1746  /**
1747   * Create a table.
1748   * @return A Table instance for the created table.
1749   */
1750  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1751    throws IOException {
1752    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1753    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1754    if (isNewVersionBehaviorEnabled()) {
1755      cfBuilder.setNewVersionBehavior(true);
1756    }
1757    builder.setColumnFamily(cfBuilder.build());
1758    getAdmin().createTable(builder.build(), splitRows);
1759    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1760    // assigned
1761    waitUntilAllRegionsAssigned(tableName);
1762    return getConnection().getTable(tableName);
1763  }
1764
1765  /**
1766   * Create a table with multiple regions.
1767   * @return A Table instance for the created table.
1768   */
1769  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1770    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1771  }
1772
1773  /**
1774   * Modify a table, synchronous.
1775   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1776   *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1777   * @see Admin#modifyTable(TableDescriptor)
1778   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1779   */
1780  @Deprecated
1781  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1782    throws IOException, InterruptedException {
1783    admin.modifyTable(desc);
1784  }
1785
1786  /**
1787   * Set the number of Region replicas.
1788   */
1789  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1790    throws IOException, InterruptedException {
1791    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1792      .setRegionReplication(replicaCount).build();
1793    admin.modifyTable(desc);
1794  }
1795
1796  /**
1797   * Drop an existing table
1798   * @param tableName existing table
1799   */
1800  public void deleteTable(TableName tableName) throws IOException {
1801    try {
1802      getAdmin().disableTable(tableName);
1803    } catch (TableNotEnabledException e) {
1804      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1805    }
1806    getAdmin().deleteTable(tableName);
1807  }
1808
1809  /**
1810   * Drop an existing table
1811   * @param tableName existing table
1812   */
1813  public void deleteTableIfAny(TableName tableName) throws IOException {
1814    try {
1815      deleteTable(tableName);
1816    } catch (TableNotFoundException e) {
1817      // ignore
1818    }
1819  }
1820
1821  // ==========================================================================
1822  // Canned table and table descriptor creation
1823
1824  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1825  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1826  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1827  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1828  private static final int MAXVERSIONS = 3;
1829
1830  public static final char FIRST_CHAR = 'a';
1831  public static final char LAST_CHAR = 'z';
1832  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1833  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1834
1835  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1836    return createModifyableTableDescriptor(TableName.valueOf(name),
1837      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1838      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1839  }
1840
1841  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1842    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1843    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1844    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1845      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1846        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1847        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1848      if (isNewVersionBehaviorEnabled()) {
1849        cfBuilder.setNewVersionBehavior(true);
1850      }
1851      builder.setColumnFamily(cfBuilder.build());
1852    }
1853    return builder.build();
1854  }
1855
1856  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1857    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1858    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1859    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1860      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1861        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1862        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1863      if (isNewVersionBehaviorEnabled()) {
1864        cfBuilder.setNewVersionBehavior(true);
1865      }
1866      builder.setColumnFamily(cfBuilder.build());
1867    }
1868    return builder;
1869  }
1870
1871  /**
1872   * Create a table of name <code>name</code>.
1873   * @param name Name to give table.
1874   * @return Column descriptor.
1875   */
1876  public TableDescriptor createTableDescriptor(final TableName name) {
1877    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1878      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1879  }
1880
1881  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1882    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1883  }
1884
1885  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1886    int maxVersions) {
1887    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1888    for (byte[] family : families) {
1889      ColumnFamilyDescriptorBuilder cfBuilder =
1890        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1891      if (isNewVersionBehaviorEnabled()) {
1892        cfBuilder.setNewVersionBehavior(true);
1893      }
1894      builder.setColumnFamily(cfBuilder.build());
1895    }
1896    return builder.build();
1897  }
1898
1899  /**
1900   * Create an HRegion that writes to the local tmp dirs
1901   * @param desc     a table descriptor indicating which table the region belongs to
1902   * @param startKey the start boundary of the region
1903   * @param endKey   the end boundary of the region
1904   * @return a region that writes to local dir for testing
1905   */
1906  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1907    throws IOException {
1908    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1909      .setEndKey(endKey).build();
1910    return createLocalHRegion(hri, desc);
1911  }
1912
1913  /**
1914   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1915   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1916   */
1917  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1918    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1919  }
1920
1921  /**
1922   * Create an HRegion that writes to the local tmp dirs with specified wal
1923   * @param info regioninfo
1924   * @param conf configuration
1925   * @param desc table descriptor
1926   * @param wal  wal for this region.
1927   * @return created hregion
1928   */
1929  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1930    WAL wal) throws IOException {
1931    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1932  }
1933
1934  /**
1935   * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1936   * when done.
1937   */
1938  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1939    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1940    throws IOException {
1941    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1942      durability, wal, null, families);
1943  }
1944
1945  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1946    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1947    boolean[] compactedMemStore, byte[]... families) throws IOException {
1948    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1949    builder.setReadOnly(isReadOnly);
1950    int i = 0;
1951    for (byte[] family : families) {
1952      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1953      if (compactedMemStore != null && i < compactedMemStore.length) {
1954        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1955      } else {
1956        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1957
1958      }
1959      i++;
1960      // Set default to be three versions.
1961      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1962      builder.setColumnFamily(cfBuilder.build());
1963    }
1964    builder.setDurability(durability);
1965    RegionInfo info =
1966      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1967    return createLocalHRegion(info, conf, builder.build(), wal);
1968  }
1969
1970  //
1971  // ==========================================================================
1972
1973  /**
1974   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1975   * read.
1976   * @param tableName existing table
1977   * @return HTable to that new table
1978   */
1979  public Table deleteTableData(TableName tableName) throws IOException {
1980    Table table = getConnection().getTable(tableName);
1981    Scan scan = new Scan();
1982    ResultScanner resScan = table.getScanner(scan);
1983    for (Result res : resScan) {
1984      Delete del = new Delete(res.getRow());
1985      table.delete(del);
1986    }
1987    resScan = table.getScanner(scan);
1988    resScan.close();
1989    return table;
1990  }
1991
1992  /**
1993   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1994   * table.
1995   * @param tableName       table which must exist.
1996   * @param preserveRegions keep the existing split points
1997   * @return HTable for the new table
1998   */
1999  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
2000    throws IOException {
2001    Admin admin = getAdmin();
2002    if (!admin.isTableDisabled(tableName)) {
2003      admin.disableTable(tableName);
2004    }
2005    admin.truncateTable(tableName, preserveRegions);
2006    return getConnection().getTable(tableName);
2007  }
2008
2009  /**
2010   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2011   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
2012   * preserve regions of existing table.
2013   * @param tableName table which must exist.
2014   * @return HTable for the new table
2015   */
2016  public Table truncateTable(final TableName tableName) throws IOException {
2017    return truncateTable(tableName, false);
2018  }
2019
2020  /**
2021   * Load table with rows from 'aaa' to 'zzz'.
2022   * @param t Table
2023   * @param f Family
2024   * @return Count of rows loaded.
2025   */
2026  public int loadTable(final Table t, final byte[] f) throws IOException {
2027    return loadTable(t, new byte[][] { f });
2028  }
2029
2030  /**
2031   * Load table with rows from 'aaa' to 'zzz'.
2032   * @param t Table
2033   * @param f Family
2034   * @return Count of rows loaded.
2035   */
2036  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2037    return loadTable(t, new byte[][] { f }, null, writeToWAL);
2038  }
2039
2040  /**
2041   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2042   * @param t Table
2043   * @param f Array of Families to load
2044   * @return Count of rows loaded.
2045   */
2046  public int loadTable(final Table t, final byte[][] f) throws IOException {
2047    return loadTable(t, f, null);
2048  }
2049
2050  /**
2051   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2052   * @param t     Table
2053   * @param f     Array of Families to load
2054   * @param value the values of the cells. If null is passed, the row key is used as value
2055   * @return Count of rows loaded.
2056   */
2057  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2058    return loadTable(t, f, value, true);
2059  }
2060
2061  /**
2062   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2063   * @param t     Table
2064   * @param f     Array of Families to load
2065   * @param value the values of the cells. If null is passed, the row key is used as value
2066   * @return Count of rows loaded.
2067   */
2068  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
2069    throws IOException {
2070    List<Put> puts = new ArrayList<>();
2071    for (byte[] row : HBaseTestingUtility.ROWS) {
2072      Put put = new Put(row);
2073      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2074      for (int i = 0; i < f.length; i++) {
2075        byte[] value1 = value != null ? value : row;
2076        put.addColumn(f[i], f[i], value1);
2077      }
2078      puts.add(put);
2079    }
2080    t.put(puts);
2081    return puts.size();
2082  }
2083
2084  /**
2085   * A tracker for tracking and validating table rows generated with
2086   * {@link HBaseTestingUtility#loadTable(Table, byte[])}
2087   */
2088  public static class SeenRowTracker {
2089    int dim = 'z' - 'a' + 1;
2090    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
2091    byte[] startRow;
2092    byte[] stopRow;
2093
2094    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2095      this.startRow = startRow;
2096      this.stopRow = stopRow;
2097    }
2098
2099    void reset() {
2100      for (byte[] row : ROWS) {
2101        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2102      }
2103    }
2104
2105    int i(byte b) {
2106      return b - 'a';
2107    }
2108
2109    public void addRow(byte[] row) {
2110      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2111    }
2112
2113    /**
2114     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
2115     * rows none
2116     */
2117    public void validate() {
2118      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2119        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2120          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2121            int count = seenRows[i(b1)][i(b2)][i(b3)];
2122            int expectedCount = 0;
2123            if (
2124              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
2125                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
2126            ) {
2127              expectedCount = 1;
2128            }
2129            if (count != expectedCount) {
2130              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
2131              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
2132                + "instead of " + expectedCount);
2133            }
2134          }
2135        }
2136      }
2137    }
2138  }
2139
2140  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2141    return loadRegion(r, f, false);
2142  }
2143
2144  public int loadRegion(final Region r, final byte[] f) throws IOException {
2145    return loadRegion((HRegion) r, f);
2146  }
2147
2148  /**
2149   * Load region with rows from 'aaa' to 'zzz'.
2150   * @param r     Region
2151   * @param f     Family
2152   * @param flush flush the cache if true
2153   * @return Count of rows loaded.
2154   */
2155  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
2156    byte[] k = new byte[3];
2157    int rowCount = 0;
2158    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2159      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2160        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2161          k[0] = b1;
2162          k[1] = b2;
2163          k[2] = b3;
2164          Put put = new Put(k);
2165          put.setDurability(Durability.SKIP_WAL);
2166          put.addColumn(f, null, k);
2167          if (r.getWAL() == null) {
2168            put.setDurability(Durability.SKIP_WAL);
2169          }
2170          int preRowCount = rowCount;
2171          int pause = 10;
2172          int maxPause = 1000;
2173          while (rowCount == preRowCount) {
2174            try {
2175              r.put(put);
2176              rowCount++;
2177            } catch (RegionTooBusyException e) {
2178              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2179              Threads.sleep(pause);
2180            }
2181          }
2182        }
2183      }
2184      if (flush) {
2185        r.flush(true);
2186      }
2187    }
2188    return rowCount;
2189  }
2190
2191  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2192    throws IOException {
2193    for (int i = startRow; i < endRow; i++) {
2194      byte[] data = Bytes.toBytes(String.valueOf(i));
2195      Put put = new Put(data);
2196      put.addColumn(f, null, data);
2197      t.put(put);
2198    }
2199  }
2200
2201  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2202    throws IOException {
2203    byte[] row = new byte[rowSize];
2204    for (int i = 0; i < totalRows; i++) {
2205      Bytes.random(row);
2206      Put put = new Put(row);
2207      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2208      t.put(put);
2209    }
2210  }
2211
2212  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2213    int replicaId) throws IOException {
2214    for (int i = startRow; i < endRow; i++) {
2215      String failMsg = "Failed verification of row :" + i;
2216      byte[] data = Bytes.toBytes(String.valueOf(i));
2217      Get get = new Get(data);
2218      get.setReplicaId(replicaId);
2219      get.setConsistency(Consistency.TIMELINE);
2220      Result result = table.get(get);
2221      if (!result.containsColumn(f, null)) {
2222        throw new AssertionError(failMsg);
2223      }
2224      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2225      Cell cell = result.getColumnLatestCell(f, null);
2226      if (
2227        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2228          cell.getValueLength())
2229      ) {
2230        throw new AssertionError(failMsg);
2231      }
2232    }
2233  }
2234
2235  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2236    throws IOException {
2237    verifyNumericRows((HRegion) region, f, startRow, endRow);
2238  }
2239
2240  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2241    throws IOException {
2242    verifyNumericRows(region, f, startRow, endRow, true);
2243  }
2244
2245  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2246    final boolean present) throws IOException {
2247    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2248  }
2249
2250  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2251    final boolean present) throws IOException {
2252    for (int i = startRow; i < endRow; i++) {
2253      String failMsg = "Failed verification of row :" + i;
2254      byte[] data = Bytes.toBytes(String.valueOf(i));
2255      Result result = region.get(new Get(data));
2256
2257      boolean hasResult = result != null && !result.isEmpty();
2258      if (present != hasResult) {
2259        throw new AssertionError(
2260          failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2261      }
2262      if (!present) continue;
2263
2264      if (!result.containsColumn(f, null)) {
2265        throw new AssertionError(failMsg);
2266      }
2267      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2268      Cell cell = result.getColumnLatestCell(f, null);
2269      if (
2270        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2271          cell.getValueLength())
2272      ) {
2273        throw new AssertionError(failMsg);
2274      }
2275    }
2276  }
2277
2278  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2279    throws IOException {
2280    for (int i = startRow; i < endRow; i++) {
2281      byte[] data = Bytes.toBytes(String.valueOf(i));
2282      Delete delete = new Delete(data);
2283      delete.addFamily(f);
2284      t.delete(delete);
2285    }
2286  }
2287
2288  /**
2289   * Return the number of rows in the given table.
2290   * @param table to count rows
2291   * @return count of rows
2292   */
2293  public static int countRows(final Table table) throws IOException {
2294    return countRows(table, new Scan());
2295  }
2296
2297  public static int countRows(final Table table, final Scan scan) throws IOException {
2298    try (ResultScanner results = table.getScanner(scan)) {
2299      int count = 0;
2300      while (results.next() != null) {
2301        count++;
2302      }
2303      return count;
2304    }
2305  }
2306
2307  public int countRows(final Table table, final byte[]... families) throws IOException {
2308    Scan scan = new Scan();
2309    for (byte[] family : families) {
2310      scan.addFamily(family);
2311    }
2312    return countRows(table, scan);
2313  }
2314
2315  /**
2316   * Return the number of rows in the given table.
2317   */
2318  public int countRows(final TableName tableName) throws IOException {
2319    Table table = getConnection().getTable(tableName);
2320    try {
2321      return countRows(table);
2322    } finally {
2323      table.close();
2324    }
2325  }
2326
2327  public int countRows(final Region region) throws IOException {
2328    return countRows(region, new Scan());
2329  }
2330
2331  public int countRows(final Region region, final Scan scan) throws IOException {
2332    InternalScanner scanner = region.getScanner(scan);
2333    try {
2334      return countRows(scanner);
2335    } finally {
2336      scanner.close();
2337    }
2338  }
2339
2340  public int countRows(final InternalScanner scanner) throws IOException {
2341    int scannedCount = 0;
2342    List<Cell> results = new ArrayList<>();
2343    boolean hasMore = true;
2344    while (hasMore) {
2345      hasMore = scanner.next(results);
2346      scannedCount += results.size();
2347      results.clear();
2348    }
2349    return scannedCount;
2350  }
2351
2352  /**
2353   * Return an md5 digest of the entire contents of a table.
2354   */
2355  public String checksumRows(final Table table) throws Exception {
2356
2357    Scan scan = new Scan();
2358    ResultScanner results = table.getScanner(scan);
2359    MessageDigest digest = MessageDigest.getInstance("MD5");
2360    for (Result res : results) {
2361      digest.update(res.getRow());
2362    }
2363    results.close();
2364    return digest.toString();
2365  }
2366
2367  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2368  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2369  static {
2370    int i = 0;
2371    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2372      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2373        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2374          ROWS[i][0] = b1;
2375          ROWS[i][1] = b2;
2376          ROWS[i][2] = b3;
2377          i++;
2378        }
2379      }
2380    }
2381  }
2382
2383  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2384    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2385    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2386    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2387    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2388    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2389    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2390
2391  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2392    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2393    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2394    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2395    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2396    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2397    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2398
2399  /**
2400   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2401   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2402   * @return list of region info for regions added to meta
2403   */
2404  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2405    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2406    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2407    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2408    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2409    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2410      TableState.State.ENABLED);
2411    // add custom ones
2412    for (int i = 0; i < startKeys.length; i++) {
2413      int j = (i + 1) % startKeys.length;
2414      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2415        .setEndKey(startKeys[j]).build();
2416      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2417      newRegions.add(hri);
2418    }
2419
2420    meta.close();
2421    return newRegions;
2422  }
2423
2424  /**
2425   * Create an unmanaged WAL. Be sure to close it when you're through.
2426   */
2427  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2428    throws IOException {
2429    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2430    // unless I pass along via the conf.
2431    Configuration confForWAL = new Configuration(conf);
2432    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2433    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2434  }
2435
2436  /**
2437   * Create a region with it's own WAL. Be sure to call
2438   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2439   */
2440  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2441    final Configuration conf, final TableDescriptor htd) throws IOException {
2442    return createRegionAndWAL(info, rootDir, conf, htd, true);
2443  }
2444
2445  /**
2446   * Create a region with it's own WAL. Be sure to call
2447   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2448   */
2449  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2450    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2451    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2452    region.setBlockCache(blockCache);
2453    region.initialize();
2454    return region;
2455  }
2456
2457  /**
2458   * Create a region with it's own WAL. Be sure to call
2459   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2460   */
2461  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2462    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2463    throws IOException {
2464    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2465    region.setMobFileCache(mobFileCache);
2466    region.initialize();
2467    return region;
2468  }
2469
2470  /**
2471   * Create a region with it's own WAL. Be sure to call
2472   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2473   */
2474  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2475    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2476    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2477      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2478    WAL wal = createWal(conf, rootDir, info);
2479    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2480  }
2481
2482  /**
2483   * Returns all rows from the hbase:meta table.
2484   * @throws IOException When reading the rows fails.
2485   */
2486  public List<byte[]> getMetaTableRows() throws IOException {
2487    // TODO: Redo using MetaTableAccessor class
2488    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2489    List<byte[]> rows = new ArrayList<>();
2490    ResultScanner s = t.getScanner(new Scan());
2491    for (Result result : s) {
2492      LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()));
2493      rows.add(result.getRow());
2494    }
2495    s.close();
2496    t.close();
2497    return rows;
2498  }
2499
2500  /**
2501   * Returns all rows from the hbase:meta table for a given user table
2502   * @throws IOException When reading the rows fails.
2503   */
2504  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2505    // TODO: Redo using MetaTableAccessor.
2506    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2507    List<byte[]> rows = new ArrayList<>();
2508    ResultScanner s = t.getScanner(new Scan());
2509    for (Result result : s) {
2510      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2511      if (info == null) {
2512        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2513        // TODO figure out what to do for this new hosed case.
2514        continue;
2515      }
2516
2517      if (info.getTable().equals(tableName)) {
2518        LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info);
2519        rows.add(result.getRow());
2520      }
2521    }
2522    s.close();
2523    t.close();
2524    return rows;
2525  }
2526
2527  /**
2528   * Returns all regions of the specified table
2529   * @param tableName the table name
2530   * @return all regions of the specified table
2531   * @throws IOException when getting the regions fails.
2532   */
2533  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2534    try (Admin admin = getConnection().getAdmin()) {
2535      return admin.getRegions(tableName);
2536    }
2537  }
2538
2539  /**
2540   * Find any other region server which is different from the one identified by parameter
2541   * @return another region server
2542   */
2543  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2544    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2545      if (!(rst.getRegionServer() == rs)) {
2546        return rst.getRegionServer();
2547      }
2548    }
2549    return null;
2550  }
2551
2552  /**
2553   * Tool to get the reference to the region server object that holds the region of the specified
2554   * user table.
2555   * @param tableName user table to lookup in hbase:meta
2556   * @return region server that holds it, null if the row doesn't exist
2557   */
2558  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2559    throws IOException, InterruptedException {
2560    List<RegionInfo> regions = getRegions(tableName);
2561    if (regions == null || regions.isEmpty()) {
2562      return null;
2563    }
2564    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2565
2566    byte[] firstRegionName =
2567      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2568        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2569
2570    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2571    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2572      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2573    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2574      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2575    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2576    while (retrier.shouldRetry()) {
2577      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2578      if (index != -1) {
2579        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2580      }
2581      // Came back -1. Region may not be online yet. Sleep a while.
2582      retrier.sleepUntilNextRetry();
2583    }
2584    return null;
2585  }
2586
2587  /**
2588   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2589   * @throws IOException When starting the cluster fails.
2590   */
2591  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2592    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2593    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2594      "99.0");
2595    startMiniMapReduceCluster(2);
2596    return mrCluster;
2597  }
2598
2599  /**
2600   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2601   * internal static LOG_DIR variable.
2602   */
2603  private void forceChangeTaskLogDir() {
2604    Field logDirField;
2605    try {
2606      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2607      logDirField.setAccessible(true);
2608
2609      Field modifiersField = ReflectionUtils.getModifiersField();
2610      modifiersField.setAccessible(true);
2611      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2612
2613      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2614    } catch (SecurityException e) {
2615      throw new RuntimeException(e);
2616    } catch (NoSuchFieldException e) {
2617      throw new RuntimeException(e);
2618    } catch (IllegalArgumentException e) {
2619      throw new RuntimeException(e);
2620    } catch (IllegalAccessException e) {
2621      throw new RuntimeException(e);
2622    }
2623  }
2624
2625  /**
2626   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2627   * filesystem.
2628   * @param servers The number of <code>TaskTracker</code>'s to start.
2629   * @throws IOException When starting the cluster fails.
2630   */
2631  private void startMiniMapReduceCluster(final int servers) throws IOException {
2632    if (mrCluster != null) {
2633      throw new IllegalStateException("MiniMRCluster is already running");
2634    }
2635    LOG.info("Starting mini mapreduce cluster...");
2636    setupClusterTestDir();
2637    createDirsAndSetProperties();
2638
2639    forceChangeTaskLogDir();
2640
2641    //// hadoop2 specific settings
2642    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2643    // we up the VM usable so that processes don't get killed.
2644    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2645
2646    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2647    // this avoids the problem by disabling speculative task execution in tests.
2648    conf.setBoolean("mapreduce.map.speculative", false);
2649    conf.setBoolean("mapreduce.reduce.speculative", false);
2650    ////
2651
2652    // Allow the user to override FS URI for this map-reduce cluster to use.
2653    mrCluster =
2654      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2655        1, null, null, new JobConf(this.conf));
2656    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2657    if (jobConf == null) {
2658      jobConf = mrCluster.createJobConf();
2659    }
2660    // Hadoop MiniMR overwrites this while it should not
2661    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2662    LOG.info("Mini mapreduce cluster started");
2663
2664    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2665    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2666    // necessary config properties here. YARN-129 required adding a few properties.
2667    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2668    // this for mrv2 support; mr1 ignores this
2669    conf.set("mapreduce.framework.name", "yarn");
2670    conf.setBoolean("yarn.is.minicluster", true);
2671    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2672    if (rmAddress != null) {
2673      conf.set("yarn.resourcemanager.address", rmAddress);
2674    }
2675    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2676    if (historyAddress != null) {
2677      conf.set("mapreduce.jobhistory.address", historyAddress);
2678    }
2679    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2680    if (schedulerAddress != null) {
2681      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2682    }
2683    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2684    if (mrJobHistoryWebappAddress != null) {
2685      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2686    }
2687    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2688    if (yarnRMWebappAddress != null) {
2689      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2690    }
2691  }
2692
2693  /**
2694   * Stops the previously started <code>MiniMRCluster</code>.
2695   */
2696  public void shutdownMiniMapReduceCluster() {
2697    if (mrCluster != null) {
2698      LOG.info("Stopping mini mapreduce cluster...");
2699      mrCluster.shutdown();
2700      mrCluster = null;
2701      LOG.info("Mini mapreduce cluster stopped");
2702    }
2703    // Restore configuration to point to local jobtracker
2704    conf.set("mapreduce.jobtracker.address", "local");
2705  }
2706
2707  /**
2708   * Create a stubbed out RegionServerService, mainly for getting FS.
2709   */
2710  public RegionServerServices createMockRegionServerService() throws IOException {
2711    return createMockRegionServerService((ServerName) null);
2712  }
2713
2714  /**
2715   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2716   * TestTokenAuthentication
2717   */
2718  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2719    throws IOException {
2720    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2721    rss.setFileSystem(getTestFileSystem());
2722    rss.setRpcServer(rpc);
2723    return rss;
2724  }
2725
2726  /**
2727   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2728   * TestOpenRegionHandler
2729   */
2730  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2731    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2732    rss.setFileSystem(getTestFileSystem());
2733    return rss;
2734  }
2735
2736  /**
2737   * Switches the logger for the given class to DEBUG level.
2738   * @param clazz The class for which to switch to debug logging.
2739   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2740   *             HBase only uses log4j. You should do this by your own as it you know which log
2741   *             framework you are using then set the log level to debug is very easy.
2742   */
2743  @Deprecated
2744  public void enableDebug(Class<?> clazz) {
2745    Log4jUtils.enableDebug(clazz);
2746  }
2747
2748  /**
2749   * Expire the Master's session
2750   */
2751  public void expireMasterSession() throws Exception {
2752    HMaster master = getMiniHBaseCluster().getMaster();
2753    expireSession(master.getZooKeeper(), false);
2754  }
2755
2756  /**
2757   * Expire a region server's session
2758   * @param index which RS
2759   */
2760  public void expireRegionServerSession(int index) throws Exception {
2761    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2762    expireSession(rs.getZooKeeper(), false);
2763    decrementMinRegionServerCount();
2764  }
2765
2766  private void decrementMinRegionServerCount() {
2767    // decrement the count for this.conf, for newly spwaned master
2768    // this.hbaseCluster shares this configuration too
2769    decrementMinRegionServerCount(getConfiguration());
2770
2771    // each master thread keeps a copy of configuration
2772    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2773      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2774    }
2775  }
2776
2777  private void decrementMinRegionServerCount(Configuration conf) {
2778    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2779    if (currentCount != -1) {
2780      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2781    }
2782  }
2783
2784  public void expireSession(ZKWatcher nodeZK) throws Exception {
2785    expireSession(nodeZK, false);
2786  }
2787
2788  /**
2789   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2790   * http://hbase.apache.org/book.html#trouble.zookeeper There are issues when doing this: [1]
2791   * http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html [2]
2792   * https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2793   * @param nodeZK      - the ZK watcher to expire
2794   * @param checkStatus - true to check if we can create a Table with the current configuration.
2795   */
2796  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2797    Configuration c = new Configuration(this.conf);
2798    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2799    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2800    byte[] password = zk.getSessionPasswd();
2801    long sessionID = zk.getSessionId();
2802
2803    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2804    // so we create a first watcher to be sure that the
2805    // event was sent. We expect that if our watcher receives the event
2806    // other watchers on the same machine will get is as well.
2807    // When we ask to close the connection, ZK does not close it before
2808    // we receive all the events, so don't have to capture the event, just
2809    // closing the connection should be enough.
2810    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2811      @Override
2812      public void process(WatchedEvent watchedEvent) {
2813        LOG.info("Monitor ZKW received event=" + watchedEvent);
2814      }
2815    }, sessionID, password);
2816
2817    // Making it expire
2818    ZooKeeper newZK =
2819      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2820
2821    // ensure that we have connection to the server before closing down, otherwise
2822    // the close session event will be eaten out before we start CONNECTING state
2823    long start = EnvironmentEdgeManager.currentTime();
2824    while (
2825      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2826    ) {
2827      Thread.sleep(1);
2828    }
2829    newZK.close();
2830    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2831
2832    // Now closing & waiting to be sure that the clients get it.
2833    monitor.close();
2834
2835    if (checkStatus) {
2836      getConnection().getTable(TableName.META_TABLE_NAME).close();
2837    }
2838  }
2839
2840  /**
2841   * Get the Mini HBase cluster.
2842   * @return hbase cluster
2843   * @see #getHBaseClusterInterface()
2844   */
2845  public MiniHBaseCluster getHBaseCluster() {
2846    return getMiniHBaseCluster();
2847  }
2848
2849  /**
2850   * Returns the HBaseCluster instance.
2851   * <p>
2852   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2853   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2854   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2855   * instead w/o the need to type-cast.
2856   */
2857  public HBaseCluster getHBaseClusterInterface() {
2858    // implementation note: we should rename this method as #getHBaseCluster(),
2859    // but this would require refactoring 90+ calls.
2860    return hbaseCluster;
2861  }
2862
2863  /**
2864   * Resets the connections so that the next time getConnection() is called, a new connection is
2865   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2866   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
2867   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
2868   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
2869   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
2870   * bigger refactor.
2871   */
2872  public void invalidateConnection() throws IOException {
2873    closeConnection();
2874    // Update the master addresses if they changed.
2875    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2876    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2877    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2878      masterConfigBefore, masterConfAfter);
2879    conf.set(HConstants.MASTER_ADDRS_KEY,
2880      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2881  }
2882
2883  /**
2884   * Get a shared Connection to the cluster. this method is thread safe.
2885   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2886   */
2887  public Connection getConnection() throws IOException {
2888    return getAsyncConnection().toConnection();
2889  }
2890
2891  /**
2892   * Get a assigned Connection to the cluster. this method is thread safe.
2893   * @param user assigned user
2894   * @return A Connection with assigned user.
2895   */
2896  public Connection getConnection(User user) throws IOException {
2897    return getAsyncConnection(user).toConnection();
2898  }
2899
2900  /**
2901   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2902   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2903   *         of cluster.
2904   */
2905  public AsyncClusterConnection getAsyncConnection() throws IOException {
2906    try {
2907      return asyncConnection.updateAndGet(connection -> {
2908        if (connection == null) {
2909          try {
2910            User user = UserProvider.instantiate(conf).getCurrent();
2911            connection = getAsyncConnection(user);
2912          } catch (IOException ioe) {
2913            throw new UncheckedIOException("Failed to create connection", ioe);
2914          }
2915        }
2916        return connection;
2917      });
2918    } catch (UncheckedIOException exception) {
2919      throw exception.getCause();
2920    }
2921  }
2922
2923  /**
2924   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2925   * @param user assigned user
2926   * @return An AsyncClusterConnection with assigned user.
2927   */
2928  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2929    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2930  }
2931
2932  public void closeConnection() throws IOException {
2933    if (hbaseAdmin != null) {
2934      Closeables.close(hbaseAdmin, true);
2935      hbaseAdmin = null;
2936    }
2937    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2938    if (asyncConnection != null) {
2939      Closeables.close(asyncConnection, true);
2940    }
2941  }
2942
2943  /**
2944   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2945   * it has no effect, it will be closed automatically when the cluster shutdowns
2946   */
2947  public Admin getAdmin() throws IOException {
2948    if (hbaseAdmin == null) {
2949      this.hbaseAdmin = getConnection().getAdmin();
2950    }
2951    return hbaseAdmin;
2952  }
2953
2954  public KeymetaAdminClient getKeymetaAdmin() throws IOException {
2955    return new KeymetaAdminClient(getConnection());
2956  }
2957
2958  /**
2959   * Returns an {@link Hbck} instance. Needs be closed when done.
2960   */
2961  public Hbck getHbck() throws IOException {
2962    return getConnection().getHbck();
2963  }
2964
2965  /**
2966   * Unassign the named region.
2967   * @param regionName The region to unassign.
2968   */
2969  public void unassignRegion(String regionName) throws IOException {
2970    unassignRegion(Bytes.toBytes(regionName));
2971  }
2972
2973  /**
2974   * Unassign the named region.
2975   * @param regionName The region to unassign.
2976   */
2977  public void unassignRegion(byte[] regionName) throws IOException {
2978    getAdmin().unassign(regionName, true);
2979  }
2980
2981  /**
2982   * Closes the region containing the given row.
2983   * @param row   The row to find the containing region.
2984   * @param table The table to find the region.
2985   */
2986  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2987    unassignRegionByRow(Bytes.toBytes(row), table);
2988  }
2989
2990  /**
2991   * Closes the region containing the given row.
2992   * @param row   The row to find the containing region.
2993   * @param table The table to find the region.
2994   */
2995  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2996    HRegionLocation hrl = table.getRegionLocation(row);
2997    unassignRegion(hrl.getRegion().getRegionName());
2998  }
2999
3000  /**
3001   * Retrieves a splittable region randomly from tableName
3002   * @param tableName   name of table
3003   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3004   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3005   */
3006  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3007    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3008    int regCount = regions.size();
3009    Set<Integer> attempted = new HashSet<>();
3010    int idx;
3011    int attempts = 0;
3012    do {
3013      regions = getHBaseCluster().getRegions(tableName);
3014      if (regCount != regions.size()) {
3015        // if there was region movement, clear attempted Set
3016        attempted.clear();
3017      }
3018      regCount = regions.size();
3019      // There are chances that before we get the region for the table from an RS the region may
3020      // be going for CLOSE. This may be because online schema change is enabled
3021      if (regCount > 0) {
3022        idx = ThreadLocalRandom.current().nextInt(regCount);
3023        // if we have just tried this region, there is no need to try again
3024        if (attempted.contains(idx)) {
3025          continue;
3026        }
3027        HRegion region = regions.get(idx);
3028        if (region.checkSplit().isPresent()) {
3029          return region;
3030        }
3031        attempted.add(idx);
3032      }
3033      attempts++;
3034    } while (maxAttempts == -1 || attempts < maxAttempts);
3035    return null;
3036  }
3037
3038  public MiniDFSCluster getDFSCluster() {
3039    return dfsCluster;
3040  }
3041
3042  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3043    setDFSCluster(cluster, true);
3044  }
3045
3046  /**
3047   * Set the MiniDFSCluster
3048   * @param cluster     cluster to use
3049   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
3050   *                    is set.
3051   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3052   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
3053   */
3054  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3055    throws IllegalStateException, IOException {
3056    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3057      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3058    }
3059    this.dfsCluster = cluster;
3060    this.setFs();
3061  }
3062
3063  public FileSystem getTestFileSystem() throws IOException {
3064    return HFileSystem.get(conf);
3065  }
3066
3067  /**
3068   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
3069   * (30 seconds).
3070   * @param table Table to wait on.
3071   */
3072  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
3073    waitTableAvailable(table.getName(), 30000);
3074  }
3075
3076  public void waitTableAvailable(TableName table, long timeoutMillis)
3077    throws InterruptedException, IOException {
3078    waitFor(timeoutMillis, predicateTableAvailable(table));
3079  }
3080
3081  /**
3082   * Wait until all regions in a table have been assigned
3083   * @param table         Table to wait on.
3084   * @param timeoutMillis Timeout.
3085   */
3086  public void waitTableAvailable(byte[] table, long timeoutMillis)
3087    throws InterruptedException, IOException {
3088    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3089  }
3090
3091  public String explainTableAvailability(TableName tableName) throws IOException {
3092    StringBuilder msg =
3093      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3094    if (getHBaseCluster().getMaster().isAlive()) {
3095      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3096        .getRegionStates().getRegionAssignments();
3097      final List<Pair<RegionInfo, ServerName>> metaLocations =
3098        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3099      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3100        RegionInfo hri = metaLocation.getFirst();
3101        ServerName sn = metaLocation.getSecond();
3102        if (!assignments.containsKey(hri)) {
3103          msg.append(", region ").append(hri)
3104            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3105        } else if (sn == null) {
3106          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3107        } else if (!sn.equals(assignments.get(hri))) {
3108          msg.append(",  region ").append(hri)
3109            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3110            .append(" <> ").append(assignments.get(hri));
3111        }
3112      }
3113    }
3114    return msg.toString();
3115  }
3116
3117  public String explainTableState(final TableName table, TableState.State state)
3118    throws IOException {
3119    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3120    if (tableState == null) {
3121      return "TableState in META: No table state in META for table " + table
3122        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3123    } else if (!tableState.inStates(state)) {
3124      return "TableState in META: Not " + state + " state, but " + tableState;
3125    } else {
3126      return "TableState in META: OK";
3127    }
3128  }
3129
3130  public TableState findLastTableState(final TableName table) throws IOException {
3131    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3132    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3133      @Override
3134      public boolean visit(Result r) throws IOException {
3135        if (!Arrays.equals(r.getRow(), table.getName())) {
3136          return false;
3137        }
3138        TableState state = CatalogFamilyFormat.getTableState(r);
3139        if (state != null) {
3140          lastTableState.set(state);
3141        }
3142        return true;
3143      }
3144    };
3145    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
3146      Integer.MAX_VALUE, visitor);
3147    return lastTableState.get();
3148  }
3149
3150  /**
3151   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3152   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
3153   * table.
3154   * @param table the table to wait on.
3155   * @throws InterruptedException if interrupted while waiting
3156   * @throws IOException          if an IO problem is encountered
3157   */
3158  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
3159    waitTableEnabled(table, 30000);
3160  }
3161
3162  /**
3163   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3164   * have been all assigned.
3165   * @see #waitTableEnabled(TableName, long)
3166   * @param table         Table to wait on.
3167   * @param timeoutMillis Time to wait on it being marked enabled.
3168   */
3169  public void waitTableEnabled(byte[] table, long timeoutMillis)
3170    throws InterruptedException, IOException {
3171    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3172  }
3173
3174  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
3175    waitFor(timeoutMillis, predicateTableEnabled(table));
3176  }
3177
3178  /**
3179   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
3180   * after default period (30 seconds)
3181   * @param table Table to wait on.
3182   */
3183  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
3184    waitTableDisabled(table, 30000);
3185  }
3186
3187  public void waitTableDisabled(TableName table, long millisTimeout)
3188    throws InterruptedException, IOException {
3189    waitFor(millisTimeout, predicateTableDisabled(table));
3190  }
3191
3192  /**
3193   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
3194   * @param table         Table to wait on.
3195   * @param timeoutMillis Time to wait on it being marked disabled.
3196   */
3197  public void waitTableDisabled(byte[] table, long timeoutMillis)
3198    throws InterruptedException, IOException {
3199    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3200  }
3201
3202  /**
3203   * Make sure that at least the specified number of region servers are running
3204   * @param num minimum number of region servers that should be running
3205   * @return true if we started some servers
3206   */
3207  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
3208    boolean startedServer = false;
3209    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3210    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
3211      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3212      startedServer = true;
3213    }
3214
3215    return startedServer;
3216  }
3217
3218  /**
3219   * Make sure that at least the specified number of region servers are running. We don't count the
3220   * ones that are currently stopping or are stopped.
3221   * @param num minimum number of region servers that should be running
3222   * @return true if we started some servers
3223   */
3224  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
3225    boolean startedServer = ensureSomeRegionServersAvailable(num);
3226
3227    int nonStoppedServers = 0;
3228    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
3229
3230      HRegionServer hrs = rst.getRegionServer();
3231      if (hrs.isStopping() || hrs.isStopped()) {
3232        LOG.info("A region server is stopped or stopping:" + hrs);
3233      } else {
3234        nonStoppedServers++;
3235      }
3236    }
3237    for (int i = nonStoppedServers; i < num; ++i) {
3238      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3239      startedServer = true;
3240    }
3241    return startedServer;
3242  }
3243
3244  /**
3245   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
3246   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
3247   * @param c                     Initial configuration
3248   * @param differentiatingSuffix Suffix to differentiate this user from others.
3249   * @return A new configuration instance with a different user set into it.
3250   */
3251  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
3252    throws IOException {
3253    FileSystem currentfs = FileSystem.get(c);
3254    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3255      return User.getCurrent();
3256    }
3257    // Else distributed filesystem. Make a new instance per daemon. Below
3258    // code is taken from the AppendTestUtil over in hdfs.
3259    String username = User.getCurrent().getName() + differentiatingSuffix;
3260    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
3261    return user;
3262  }
3263
3264  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3265    throws IOException {
3266    NavigableSet<String> online = new TreeSet<>();
3267    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3268      try {
3269        for (RegionInfo region : ProtobufUtil
3270          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3271          online.add(region.getRegionNameAsString());
3272        }
3273      } catch (RegionServerStoppedException e) {
3274        // That's fine.
3275      }
3276    }
3277    return online;
3278  }
3279
3280  /**
3281   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
3282   * linger. Here is the exception you'll see:
3283   *
3284   * <pre>
3285   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3286   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3287   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3288   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3289   * </pre>
3290   *
3291   * @param stream A DFSClient.DFSOutputStream.
3292   */
3293  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3294    try {
3295      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3296      for (Class<?> clazz : clazzes) {
3297        String className = clazz.getSimpleName();
3298        if (className.equals("DFSOutputStream")) {
3299          if (clazz.isInstance(stream)) {
3300            Field maxRecoveryErrorCountField =
3301              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3302            maxRecoveryErrorCountField.setAccessible(true);
3303            maxRecoveryErrorCountField.setInt(stream, max);
3304            break;
3305          }
3306        }
3307      }
3308    } catch (Exception e) {
3309      LOG.info("Could not set max recovery field", e);
3310    }
3311  }
3312
3313  /**
3314   * Uses directly the assignment manager to assign the region. and waits until the specified region
3315   * has completed assignment.
3316   * @return true if the region is assigned false otherwise.
3317   */
3318  public boolean assignRegion(final RegionInfo regionInfo)
3319    throws IOException, InterruptedException {
3320    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3321    am.assign(regionInfo);
3322    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3323  }
3324
3325  /**
3326   * Move region to destination server and wait till region is completely moved and online
3327   * @param destRegion region to move
3328   * @param destServer destination server of the region
3329   */
3330  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3331    throws InterruptedException, IOException {
3332    HMaster master = getMiniHBaseCluster().getMaster();
3333    // TODO: Here we start the move. The move can take a while.
3334    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3335    while (true) {
3336      ServerName serverName =
3337        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3338      if (serverName != null && serverName.equals(destServer)) {
3339        assertRegionOnServer(destRegion, serverName, 2000);
3340        break;
3341      }
3342      Thread.sleep(10);
3343    }
3344  }
3345
3346  /**
3347   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3348   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3349   * master has been informed and updated hbase:meta with the regions deployed server.
3350   * @param tableName the table name
3351   */
3352  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3353    waitUntilAllRegionsAssigned(tableName,
3354      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3355  }
3356
3357  /**
3358   * Waith until all system table's regions get assigned
3359   */
3360  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3361    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3362  }
3363
3364  /**
3365   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3366   * timeout. This means all regions have been deployed, master has been informed and updated
3367   * hbase:meta with the regions deployed server.
3368   * @param tableName the table name
3369   * @param timeout   timeout, in milliseconds
3370   */
3371  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3372    throws IOException {
3373    if (!TableName.isMetaTableName(tableName)) {
3374      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3375        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3376          + timeout + "ms");
3377        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3378          @Override
3379          public String explainFailure() throws IOException {
3380            return explainTableAvailability(tableName);
3381          }
3382
3383          @Override
3384          public boolean evaluate() throws IOException {
3385            Scan scan = new Scan();
3386            scan.addFamily(HConstants.CATALOG_FAMILY);
3387            boolean tableFound = false;
3388            try (ResultScanner s = meta.getScanner(scan)) {
3389              for (Result r; (r = s.next()) != null;) {
3390                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3391                RegionInfo info = RegionInfo.parseFromOrNull(b);
3392                if (info != null && info.getTable().equals(tableName)) {
3393                  // Get server hosting this region from catalog family. Return false if no server
3394                  // hosting this region, or if the server hosting this region was recently killed
3395                  // (for fault tolerance testing).
3396                  tableFound = true;
3397                  byte[] server =
3398                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3399                  if (server == null) {
3400                    return false;
3401                  } else {
3402                    byte[] startCode =
3403                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3404                    ServerName serverName =
3405                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3406                        + Bytes.toLong(startCode));
3407                    if (
3408                      !getHBaseClusterInterface().isDistributedCluster()
3409                        && getHBaseCluster().isKilledRS(serverName)
3410                    ) {
3411                      return false;
3412                    }
3413                  }
3414                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3415                    return false;
3416                  }
3417                }
3418              }
3419            }
3420            if (!tableFound) {
3421              LOG.warn(
3422                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3423            }
3424            return tableFound;
3425          }
3426        });
3427      }
3428    }
3429    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3430    // check from the master state if we are using a mini cluster
3431    if (!getHBaseClusterInterface().isDistributedCluster()) {
3432      // So, all regions are in the meta table but make sure master knows of the assignments before
3433      // returning -- sometimes this can lag.
3434      HMaster master = getHBaseCluster().getMaster();
3435      final RegionStates states = master.getAssignmentManager().getRegionStates();
3436      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3437        @Override
3438        public String explainFailure() throws IOException {
3439          return explainTableAvailability(tableName);
3440        }
3441
3442        @Override
3443        public boolean evaluate() throws IOException {
3444          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3445          return hris != null && !hris.isEmpty();
3446        }
3447      });
3448    }
3449    LOG.info("All regions for table " + tableName + " assigned.");
3450  }
3451
3452  /**
3453   * Do a small get/scan against one store. This is required because store has no actual methods of
3454   * querying itself, and relies on StoreScanner.
3455   */
3456  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3457    Scan scan = new Scan(get);
3458    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3459      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3460      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3461      // readpoint 0.
3462      0);
3463
3464    List<Cell> result = new ArrayList<>();
3465    scanner.next(result);
3466    if (!result.isEmpty()) {
3467      // verify that we are on the row we want:
3468      Cell kv = result.get(0);
3469      if (!CellUtil.matchingRows(kv, get.getRow())) {
3470        result.clear();
3471      }
3472    }
3473    scanner.close();
3474    return result;
3475  }
3476
3477  /**
3478   * Create region split keys between startkey and endKey
3479   * @param numRegions the number of regions to be created. it has to be greater than 3.
3480   * @return resulting split keys
3481   */
3482  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3483    if (numRegions <= 3) {
3484      throw new AssertionError();
3485    }
3486    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3487    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3488    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3489    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3490    return result;
3491  }
3492
3493  /**
3494   * Do a small get/scan against one store. This is required because store has no actual methods of
3495   * querying itself, and relies on StoreScanner.
3496   */
3497  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3498    throws IOException {
3499    Get get = new Get(row);
3500    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3501    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3502
3503    return getFromStoreFile(store, get);
3504  }
3505
3506  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3507    final List<? extends Cell> actual) {
3508    final int eLen = expected.size();
3509    final int aLen = actual.size();
3510    final int minLen = Math.min(eLen, aLen);
3511
3512    int i;
3513    for (i = 0; i < minLen
3514      && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) {
3515    }
3516
3517    if (additionalMsg == null) {
3518      additionalMsg = "";
3519    }
3520    if (!additionalMsg.isEmpty()) {
3521      additionalMsg = ". " + additionalMsg;
3522    }
3523
3524    if (eLen != aLen || i != minLen) {
3525      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3526        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3527        + " (length " + aLen + ")" + additionalMsg);
3528    }
3529  }
3530
3531  public static <T> String safeGetAsStr(List<T> lst, int i) {
3532    if (0 <= i && i < lst.size()) {
3533      return lst.get(i).toString();
3534    } else {
3535      return "<out_of_range>";
3536    }
3537  }
3538
3539  public String getRpcConnnectionURI() throws UnknownHostException {
3540    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3541  }
3542
3543  public String getZkConnectionURI() {
3544    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3545      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3546      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3547  }
3548
3549  /**
3550   * Get the zk based cluster key for this cluster.
3551   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3552   *             connection info of a cluster. Keep here only for compatibility.
3553   * @see #getRpcConnnectionURI()
3554   * @see #getZkConnectionURI()
3555   */
3556  @Deprecated
3557  public String getClusterKey() {
3558    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3559      + ":"
3560      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3561  }
3562
3563  /** Creates a random table with the given parameters */
3564  public Table createRandomTable(TableName tableName, final Collection<String> families,
3565    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3566    final int numRowsPerFlush) throws IOException, InterruptedException {
3567
3568    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3569      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3570      + maxVersions + "\n");
3571
3572    final int numCF = families.size();
3573    final byte[][] cfBytes = new byte[numCF][];
3574    {
3575      int cfIndex = 0;
3576      for (String cf : families) {
3577        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3578      }
3579    }
3580
3581    final int actualStartKey = 0;
3582    final int actualEndKey = Integer.MAX_VALUE;
3583    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3584    final int splitStartKey = actualStartKey + keysPerRegion;
3585    final int splitEndKey = actualEndKey - keysPerRegion;
3586    final String keyFormat = "%08x";
3587    final Table table = createTable(tableName, cfBytes, maxVersions,
3588      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3589      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3590
3591    if (hbaseCluster != null) {
3592      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3593    }
3594
3595    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3596
3597    final Random rand = ThreadLocalRandom.current();
3598    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3599      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3600        final byte[] row = Bytes.toBytes(
3601          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3602
3603        Put put = new Put(row);
3604        Delete del = new Delete(row);
3605        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3606          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3607          final long ts = rand.nextInt();
3608          final byte[] qual = Bytes.toBytes("col" + iCol);
3609          if (rand.nextBoolean()) {
3610            final byte[] value =
3611              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3612                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3613            put.addColumn(cf, qual, ts, value);
3614          } else if (rand.nextDouble() < 0.8) {
3615            del.addColumn(cf, qual, ts);
3616          } else {
3617            del.addColumns(cf, qual, ts);
3618          }
3619        }
3620
3621        if (!put.isEmpty()) {
3622          mutator.mutate(put);
3623        }
3624
3625        if (!del.isEmpty()) {
3626          mutator.mutate(del);
3627        }
3628      }
3629      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3630      mutator.flush();
3631      if (hbaseCluster != null) {
3632        getMiniHBaseCluster().flushcache(table.getName());
3633      }
3634    }
3635    mutator.close();
3636
3637    return table;
3638  }
3639
3640  public static int randomFreePort() {
3641    return HBaseCommonTestingUtility.randomFreePort();
3642  }
3643
3644  public static String randomMultiCastAddress() {
3645    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3646  }
3647
3648  public static void waitForHostPort(String host, int port) throws IOException {
3649    final int maxTimeMs = 10000;
3650    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3651    IOException savedException = null;
3652    LOG.info("Waiting for server at " + host + ":" + port);
3653    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3654      try {
3655        Socket sock = new Socket(InetAddress.getByName(host), port);
3656        sock.close();
3657        savedException = null;
3658        LOG.info("Server at " + host + ":" + port + " is available");
3659        break;
3660      } catch (UnknownHostException e) {
3661        throw new IOException("Failed to look up " + host, e);
3662      } catch (IOException e) {
3663        savedException = e;
3664      }
3665      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3666    }
3667
3668    if (savedException != null) {
3669      throw savedException;
3670    }
3671  }
3672
3673  /**
3674   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3675   * continues.
3676   * @return the number of regions the table was split into
3677   */
3678  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3679    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3680    throws IOException {
3681    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3682      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3683  }
3684
3685  /**
3686   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3687   * continues.
3688   * @return the number of regions the table was split into
3689   */
3690  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3691    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3692    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3693    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3694    builder.setDurability(durability);
3695    builder.setRegionReplication(regionReplication);
3696    ColumnFamilyDescriptorBuilder cfBuilder =
3697      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3698    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3699    cfBuilder.setCompressionType(compression);
3700    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3701      numRegionsPerServer);
3702  }
3703
3704  /**
3705   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3706   * continues.
3707   * @return the number of regions the table was split into
3708   */
3709  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3710    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3711    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3712    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3713    builder.setDurability(durability);
3714    builder.setRegionReplication(regionReplication);
3715    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3716    for (int i = 0; i < columnFamilies.length; i++) {
3717      ColumnFamilyDescriptorBuilder cfBuilder =
3718        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3719      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3720      cfBuilder.setCompressionType(compression);
3721      hcds[i] = cfBuilder.build();
3722    }
3723    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3724  }
3725
3726  /**
3727   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3728   * continues.
3729   * @return the number of regions the table was split into
3730   */
3731  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3732    ColumnFamilyDescriptor hcd) throws IOException {
3733    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3734  }
3735
3736  /**
3737   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3738   * continues.
3739   * @return the number of regions the table was split into
3740   */
3741  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3742    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3743    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3744      numRegionsPerServer);
3745  }
3746
3747  /**
3748   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3749   * continues.
3750   * @return the number of regions the table was split into
3751   */
3752  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3753    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3754    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3755      numRegionsPerServer);
3756  }
3757
3758  /**
3759   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3760   * continues.
3761   * @return the number of regions the table was split into
3762   */
3763  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3764    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3765    throws IOException {
3766    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3767    for (ColumnFamilyDescriptor cd : cds) {
3768      if (!td.hasColumnFamily(cd.getName())) {
3769        builder.setColumnFamily(cd);
3770      }
3771    }
3772    td = builder.build();
3773    int totalNumberOfRegions = 0;
3774    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3775    Admin admin = unmanagedConnection.getAdmin();
3776
3777    try {
3778      // create a table a pre-splits regions.
3779      // The number of splits is set as:
3780      // region servers * regions per region server).
3781      int numberOfServers = admin.getRegionServers().size();
3782      if (numberOfServers == 0) {
3783        throw new IllegalStateException("No live regionservers");
3784      }
3785
3786      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3787      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3788        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3789        + numRegionsPerServer + ")");
3790
3791      byte[][] splits = splitter.split(totalNumberOfRegions);
3792
3793      admin.createTable(td, splits);
3794    } catch (MasterNotRunningException e) {
3795      LOG.error("Master not running", e);
3796      throw new IOException(e);
3797    } catch (TableExistsException e) {
3798      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3799    } finally {
3800      admin.close();
3801      unmanagedConnection.close();
3802    }
3803    return totalNumberOfRegions;
3804  }
3805
3806  public static int getMetaRSPort(Connection connection) throws IOException {
3807    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3808      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3809    }
3810  }
3811
3812  /**
3813   * Due to async racing issue, a region may not be in the online region list of a region server
3814   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3815   */
3816  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3817    final long timeout) throws IOException, InterruptedException {
3818    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3819    while (true) {
3820      List<RegionInfo> regions = getAdmin().getRegions(server);
3821      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3822      long now = EnvironmentEdgeManager.currentTime();
3823      if (now > timeoutTime) break;
3824      Thread.sleep(10);
3825    }
3826    throw new AssertionError(
3827      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3828  }
3829
3830  /**
3831   * Check to make sure the region is open on the specified region server, but not on any other one.
3832   */
3833  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3834    final long timeout) throws IOException, InterruptedException {
3835    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3836    while (true) {
3837      List<RegionInfo> regions = getAdmin().getRegions(server);
3838      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3839        List<JVMClusterUtil.RegionServerThread> rsThreads =
3840          getHBaseCluster().getLiveRegionServerThreads();
3841        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3842          HRegionServer rs = rsThread.getRegionServer();
3843          if (server.equals(rs.getServerName())) {
3844            continue;
3845          }
3846          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3847          for (HRegion r : hrs) {
3848            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3849              throw new AssertionError("Region should not be double assigned");
3850            }
3851          }
3852        }
3853        return; // good, we are happy
3854      }
3855      long now = EnvironmentEdgeManager.currentTime();
3856      if (now > timeoutTime) break;
3857      Thread.sleep(10);
3858    }
3859    throw new AssertionError(
3860      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3861  }
3862
3863  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3864    TableDescriptor td =
3865      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3866    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3867    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3868  }
3869
3870  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3871    BlockCache blockCache) throws IOException {
3872    TableDescriptor td =
3873      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3874    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3875    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3876  }
3877
3878  public static void setFileSystemURI(String fsURI) {
3879    FS_URI = fsURI;
3880  }
3881
3882  /**
3883   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3884   */
3885  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3886    return new ExplainingPredicate<IOException>() {
3887      @Override
3888      public String explainFailure() throws IOException {
3889        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3890        return "found in transition: " + am.getRegionsInTransition().toString();
3891      }
3892
3893      @Override
3894      public boolean evaluate() throws IOException {
3895        HMaster master = getMiniHBaseCluster().getMaster();
3896        if (master == null) return false;
3897        AssignmentManager am = master.getAssignmentManager();
3898        if (am == null) return false;
3899        return !am.hasRegionsInTransition();
3900      }
3901    };
3902  }
3903
3904  /**
3905   * Returns a {@link Predicate} for checking that table is enabled
3906   */
3907  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3908    return new ExplainingPredicate<IOException>() {
3909      @Override
3910      public String explainFailure() throws IOException {
3911        return explainTableState(tableName, TableState.State.ENABLED);
3912      }
3913
3914      @Override
3915      public boolean evaluate() throws IOException {
3916        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3917      }
3918    };
3919  }
3920
3921  /**
3922   * Returns a {@link Predicate} for checking that table is enabled
3923   */
3924  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3925    return new ExplainingPredicate<IOException>() {
3926      @Override
3927      public String explainFailure() throws IOException {
3928        return explainTableState(tableName, TableState.State.DISABLED);
3929      }
3930
3931      @Override
3932      public boolean evaluate() throws IOException {
3933        return getAdmin().isTableDisabled(tableName);
3934      }
3935    };
3936  }
3937
3938  /**
3939   * Returns a {@link Predicate} for checking that table is enabled
3940   */
3941  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3942    return new ExplainingPredicate<IOException>() {
3943      @Override
3944      public String explainFailure() throws IOException {
3945        return explainTableAvailability(tableName);
3946      }
3947
3948      @Override
3949      public boolean evaluate() throws IOException {
3950        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3951        if (tableAvailable) {
3952          try (Table table = getConnection().getTable(tableName)) {
3953            TableDescriptor htd = table.getDescriptor();
3954            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3955              .getAllRegionLocations()) {
3956              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3957                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3958                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3959              for (byte[] family : htd.getColumnFamilyNames()) {
3960                scan.addFamily(family);
3961              }
3962              try (ResultScanner scanner = table.getScanner(scan)) {
3963                scanner.next();
3964              }
3965            }
3966          }
3967        }
3968        return tableAvailable;
3969      }
3970    };
3971  }
3972
3973  /**
3974   * Wait until no regions in transition.
3975   * @param timeout How long to wait.
3976   */
3977  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3978    waitFor(timeout, predicateNoRegionsInTransition());
3979  }
3980
3981  /**
3982   * Wait until no regions in transition. (time limit 15min)
3983   */
3984  public void waitUntilNoRegionsInTransition() throws IOException {
3985    waitUntilNoRegionsInTransition(15 * 60000);
3986  }
3987
3988  /**
3989   * Wait until labels is ready in VisibilityLabelsCache.
3990   */
3991  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3992    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3993    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3994
3995      @Override
3996      public boolean evaluate() {
3997        for (String label : labels) {
3998          if (labelsCache.getLabelOrdinal(label) == 0) {
3999            return false;
4000          }
4001        }
4002        return true;
4003      }
4004
4005      @Override
4006      public String explainFailure() {
4007        for (String label : labels) {
4008          if (labelsCache.getLabelOrdinal(label) == 0) {
4009            return label + " is not available yet";
4010          }
4011        }
4012        return "";
4013      }
4014    });
4015  }
4016
4017  /**
4018   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4019   * available.
4020   * @return the list of column descriptors
4021   */
4022  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4023    return generateColumnDescriptors("");
4024  }
4025
4026  /**
4027   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4028   * available.
4029   * @param prefix family names prefix
4030   * @return the list of column descriptors
4031   */
4032  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4033    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4034    long familyId = 0;
4035    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
4036      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
4037        for (BloomType bloomType : BloomType.values()) {
4038          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4039          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4040            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4041          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4042          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4043          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4044          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4045          familyId++;
4046        }
4047      }
4048    }
4049    return columnFamilyDescriptors;
4050  }
4051
4052  /**
4053   * Get supported compression algorithms.
4054   * @return supported compression algorithms.
4055   */
4056  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4057    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4058    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4059    for (String algoName : allAlgos) {
4060      try {
4061        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4062        algo.getCompressor();
4063        supportedAlgos.add(algo);
4064      } catch (Throwable t) {
4065        // this algo is not available
4066      }
4067    }
4068    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4069  }
4070
4071  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4072    Scan scan = new Scan().withStartRow(row);
4073    scan.setReadType(ReadType.PREAD);
4074    scan.setCaching(1);
4075    scan.setReversed(true);
4076    scan.addFamily(family);
4077    try (RegionScanner scanner = r.getScanner(scan)) {
4078      List<Cell> cells = new ArrayList<>(1);
4079      scanner.next(cells);
4080      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4081        return null;
4082      }
4083      return Result.create(cells);
4084    }
4085  }
4086
4087  private boolean isTargetTable(final byte[] inRow, Cell c) {
4088    String inputRowString = Bytes.toString(inRow);
4089    int i = inputRowString.indexOf(HConstants.DELIMITER);
4090    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4091    int o = outputRowString.indexOf(HConstants.DELIMITER);
4092    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4093  }
4094
4095  /**
4096   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
4097   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
4098   * kerby KDC server and utility for using it,
4099   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4100   * less baggage. It came in in HBASE-5291.
4101   */
4102  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4103    Properties conf = MiniKdc.createConf();
4104    conf.put(MiniKdc.DEBUG, true);
4105    MiniKdc kdc = null;
4106    File dir = null;
4107    // There is time lag between selecting a port and trying to bind with it. It's possible that
4108    // another service captures the port in between which'll result in BindException.
4109    boolean bindException;
4110    int numTries = 0;
4111    do {
4112      try {
4113        bindException = false;
4114        dir = new File(getDataTestDir("kdc").toUri().getPath());
4115        kdc = new MiniKdc(conf, dir);
4116        kdc.start();
4117      } catch (BindException e) {
4118        FileUtils.deleteDirectory(dir); // clean directory
4119        numTries++;
4120        if (numTries == 3) {
4121          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4122          throw e;
4123        }
4124        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4125        bindException = true;
4126      }
4127    } while (bindException);
4128    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4129    return kdc;
4130  }
4131
4132  public int getNumHFiles(final TableName tableName, final byte[] family) {
4133    int numHFiles = 0;
4134    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4135      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
4136    }
4137    return numHFiles;
4138  }
4139
4140  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4141    final byte[] family) {
4142    int numHFiles = 0;
4143    for (Region region : rs.getRegions(tableName)) {
4144      numHFiles += region.getStore(family).getStorefilesCount();
4145    }
4146    return numHFiles;
4147  }
4148
4149  private void assertEquals(String message, int expected, int actual) {
4150    if (expected == actual) {
4151      return;
4152    }
4153    String formatted = "";
4154    if (message != null && !"".equals(message)) {
4155      formatted = message + " ";
4156    }
4157    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4158  }
4159
4160  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4161    if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) {
4162      throw new AssertionError();
4163    }
4164    assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode());
4165    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4166    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4167    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4168    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4169        it2 = rtdFamilies.iterator(); it.hasNext();) {
4170      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4171    }
4172  }
4173
4174  /**
4175   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4176   * invocations.
4177   */
4178  public static void await(final long sleepMillis, final BooleanSupplier condition)
4179    throws InterruptedException {
4180    try {
4181      while (!condition.getAsBoolean()) {
4182        Thread.sleep(sleepMillis);
4183      }
4184    } catch (RuntimeException e) {
4185      if (e.getCause() instanceof AssertionError) {
4186        throw (AssertionError) e.getCause();
4187      }
4188      throw e;
4189    }
4190  }
4191}