001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.io.UncheckedIOException;
024import java.lang.reflect.Field;
025import java.lang.reflect.Modifier;
026import java.net.BindException;
027import java.net.DatagramSocket;
028import java.net.InetAddress;
029import java.net.ServerSocket;
030import java.net.Socket;
031import java.net.UnknownHostException;
032import java.nio.charset.StandardCharsets;
033import java.security.MessageDigest;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Properties;
044import java.util.Random;
045import java.util.Set;
046import java.util.TreeSet;
047import java.util.concurrent.ThreadLocalRandom;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicReference;
050import java.util.function.BooleanSupplier;
051import org.apache.commons.io.FileUtils;
052import org.apache.commons.lang3.RandomStringUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
057import org.apache.hadoop.hbase.Waiter.Predicate;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.AsyncClusterConnection;
060import org.apache.hadoop.hbase.client.BufferedMutator;
061import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
064import org.apache.hadoop.hbase.client.Connection;
065import org.apache.hadoop.hbase.client.ConnectionFactory;
066import org.apache.hadoop.hbase.client.Consistency;
067import org.apache.hadoop.hbase.client.Delete;
068import org.apache.hadoop.hbase.client.Durability;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Hbck;
071import org.apache.hadoop.hbase.client.MasterRegistry;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.client.RegionLocator;
076import org.apache.hadoop.hbase.client.Result;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.Scan.ReadType;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.client.TableState;
084import org.apache.hadoop.hbase.fs.HFileSystem;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.BlockCache;
089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
090import org.apache.hadoop.hbase.io.hfile.HFile;
091import org.apache.hadoop.hbase.ipc.RpcServerInterface;
092import org.apache.hadoop.hbase.logging.Log4jUtils;
093import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
094import org.apache.hadoop.hbase.master.HMaster;
095import org.apache.hadoop.hbase.master.RegionState;
096import org.apache.hadoop.hbase.master.ServerManager;
097import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
098import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
099import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
100import org.apache.hadoop.hbase.master.assignment.RegionStates;
101import org.apache.hadoop.hbase.mob.MobFileCache;
102import org.apache.hadoop.hbase.regionserver.BloomType;
103import org.apache.hadoop.hbase.regionserver.ChunkCreator;
104import org.apache.hadoop.hbase.regionserver.HRegion;
105import org.apache.hadoop.hbase.regionserver.HRegionServer;
106import org.apache.hadoop.hbase.regionserver.HStore;
107import org.apache.hadoop.hbase.regionserver.InternalScanner;
108import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
109import org.apache.hadoop.hbase.regionserver.Region;
110import org.apache.hadoop.hbase.regionserver.RegionScanner;
111import org.apache.hadoop.hbase.regionserver.RegionServerServices;
112import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
113import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
114import org.apache.hadoop.hbase.security.User;
115import org.apache.hadoop.hbase.security.UserProvider;
116import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
117import org.apache.hadoop.hbase.util.Bytes;
118import org.apache.hadoop.hbase.util.CommonFSUtils;
119import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
120import org.apache.hadoop.hbase.util.FSUtils;
121import org.apache.hadoop.hbase.util.JVMClusterUtil;
122import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
123import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
124import org.apache.hadoop.hbase.util.Pair;
125import org.apache.hadoop.hbase.util.ReflectionUtils;
126import org.apache.hadoop.hbase.util.RegionSplitter;
127import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
128import org.apache.hadoop.hbase.util.RetryCounter;
129import org.apache.hadoop.hbase.util.Threads;
130import org.apache.hadoop.hbase.wal.WAL;
131import org.apache.hadoop.hbase.wal.WALFactory;
132import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
133import org.apache.hadoop.hbase.zookeeper.ZKConfig;
134import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
135import org.apache.hadoop.hdfs.DFSClient;
136import org.apache.hadoop.hdfs.DistributedFileSystem;
137import org.apache.hadoop.hdfs.MiniDFSCluster;
138import org.apache.hadoop.hdfs.server.datanode.DataNode;
139import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
140import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
141import org.apache.hadoop.mapred.JobConf;
142import org.apache.hadoop.mapred.MiniMRCluster;
143import org.apache.hadoop.mapred.TaskLog;
144import org.apache.hadoop.minikdc.MiniKdc;
145import org.apache.yetus.audience.InterfaceAudience;
146import org.apache.zookeeper.WatchedEvent;
147import org.apache.zookeeper.ZooKeeper;
148import org.apache.zookeeper.ZooKeeper.States;
149
150import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
151
152import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
153
154/**
155 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
156 * functionality. Create an instance and keep it around testing HBase. This class is meant to be
157 * your one-stop shop for anything you might need testing. Manages one cluster at a time only.
158 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type
159 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j
160 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set
161 * logging levels. In the configuration properties, default values for master-info-port and
162 * region-server-port are overridden such that a random port will be assigned (thus avoiding port
163 * contention if another local HBase instance is already running).
164 * <p>
165 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
166 * setting it to true.
167 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
168 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
169 */
170@InterfaceAudience.Public
171@Deprecated
172public class HBaseTestingUtility extends HBaseZKTestingUtility {
173
174  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
175  /**
176   * The default number of regions per regionserver when creating a pre-split table.
177   */
178  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
179
180  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
181  public static final boolean PRESPLIT_TEST_TABLE = true;
182
183  private MiniDFSCluster dfsCluster = null;
184  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
185
186  private volatile HBaseCluster hbaseCluster = null;
187  private MiniMRCluster mrCluster = null;
188
189  /** If there is a mini cluster running for this testing utility instance. */
190  private volatile boolean miniClusterRunning;
191
192  private String hadoopLogDir;
193
194  /**
195   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
196   */
197  private Path dataTestDirOnTestFS = null;
198
199  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
200
201  /** Filesystem URI used for map-reduce mini-cluster setup */
202  private static String FS_URI;
203
204  /** This is for unit tests parameterized with a single boolean. */
205  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
206
207  /**
208   * Checks to see if a specific port is available.
209   * @param port the port number to check for availability
210   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
211   */
212  public static boolean available(int port) {
213    ServerSocket ss = null;
214    DatagramSocket ds = null;
215    try {
216      ss = new ServerSocket(port);
217      ss.setReuseAddress(true);
218      ds = new DatagramSocket(port);
219      ds.setReuseAddress(true);
220      return true;
221    } catch (IOException e) {
222      // Do nothing
223    } finally {
224      if (ds != null) {
225        ds.close();
226      }
227
228      if (ss != null) {
229        try {
230          ss.close();
231        } catch (IOException e) {
232          /* should not be thrown */
233        }
234      }
235    }
236
237    return false;
238  }
239
240  /**
241   * Create all combinations of Bloom filters and compression algorithms for testing.
242   */
243  private static List<Object[]> bloomAndCompressionCombinations() {
244    List<Object[]> configurations = new ArrayList<>();
245    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
246      for (BloomType bloomType : BloomType.values()) {
247        configurations.add(new Object[] { comprAlgo, bloomType });
248      }
249    }
250    return Collections.unmodifiableList(configurations);
251  }
252
253  /**
254   * Create combination of memstoreTS and tags
255   */
256  private static List<Object[]> memStoreTSAndTagsCombination() {
257    List<Object[]> configurations = new ArrayList<>();
258    configurations.add(new Object[] { false, false });
259    configurations.add(new Object[] { false, true });
260    configurations.add(new Object[] { true, false });
261    configurations.add(new Object[] { true, true });
262    return Collections.unmodifiableList(configurations);
263  }
264
265  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
266    List<Object[]> configurations = new ArrayList<>();
267    configurations.add(new Object[] { false, false, true });
268    configurations.add(new Object[] { false, false, false });
269    configurations.add(new Object[] { false, true, true });
270    configurations.add(new Object[] { false, true, false });
271    configurations.add(new Object[] { true, false, true });
272    configurations.add(new Object[] { true, false, false });
273    configurations.add(new Object[] { true, true, true });
274    configurations.add(new Object[] { true, true, false });
275    return Collections.unmodifiableList(configurations);
276  }
277
278  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
279    bloomAndCompressionCombinations();
280
281  /**
282   * <p>
283   * Create an HBaseTestingUtility using a default configuration.
284   * <p>
285   * Initially, all tmp files are written to a local test data directory. Once
286   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
287   * data will be written to the DFS directory instead.
288   */
289  public HBaseTestingUtility() {
290    this(HBaseConfiguration.create());
291  }
292
293  /**
294   * <p>
295   * Create an HBaseTestingUtility using a given configuration.
296   * <p>
297   * Initially, all tmp files are written to a local test data directory. Once
298   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
299   * data will be written to the DFS directory instead.
300   * @param conf The configuration to use for further operations
301   */
302  public HBaseTestingUtility(Configuration conf) {
303    super(conf);
304
305    // a hbase checksum verification failure will cause unit tests to fail
306    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
307
308    // Save this for when setting default file:// breaks things
309    if (this.conf.get("fs.defaultFS") != null) {
310      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
311    }
312    if (this.conf.get(HConstants.HBASE_DIR) != null) {
313      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
314    }
315    // Every cluster is a local cluster until we start DFS
316    // Note that conf could be null, but this.conf will not be
317    String dataTestDir = getDataTestDir().toString();
318    this.conf.set("fs.defaultFS", "file:///");
319    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
320    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
321    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
322    // If the value for random ports isn't set set it to true, thus making
323    // tests opt-out for random port assignment
324    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
325      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
326  }
327
328  /**
329   * Close both the region {@code r} and it's underlying WAL. For use in tests.
330   */
331  public static void closeRegionAndWAL(final Region r) throws IOException {
332    closeRegionAndWAL((HRegion) r);
333  }
334
335  /**
336   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
337   */
338  public static void closeRegionAndWAL(final HRegion r) throws IOException {
339    if (r == null) return;
340    r.close();
341    if (r.getWAL() == null) return;
342    r.getWAL().close();
343  }
344
345  /**
346   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
347   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
348   * by the Configuration. If say, a Connection was being used against a cluster that had been
349   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
350   * Rather than use the return direct, its usually best to make a copy and use that. Do
351   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
352   * @return Instance of Configuration.
353   */
354  @Override
355  public Configuration getConfiguration() {
356    return super.getConfiguration();
357  }
358
359  public void setHBaseCluster(HBaseCluster hbaseCluster) {
360    this.hbaseCluster = hbaseCluster;
361  }
362
363  /**
364   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
365   * have many concurrent tests running if we need to. Moding a System property is not the way to do
366   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
367   * not anything can do about it at moment; single instance only is how the minidfscluster works.
368   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
369   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
370   * (We do not create them!).
371   * @return The calculated data test build directory, if newly-created.
372   */
373  @Override
374  protected Path setupDataTestDir() {
375    Path testPath = super.setupDataTestDir();
376    if (null == testPath) {
377      return null;
378    }
379
380    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
381
382    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
383    // we want our own value to ensure uniqueness on the same machine
384    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
385
386    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
387    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
388    return testPath;
389  }
390
391  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
392
393    String sysValue = System.getProperty(propertyName);
394
395    if (sysValue != null) {
396      // There is already a value set. So we do nothing but hope
397      // that there will be no conflicts
398      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
399        + " so I do NOT create it in " + parent);
400      String confValue = conf.get(propertyName);
401      if (confValue != null && !confValue.endsWith(sysValue)) {
402        LOG.warn(propertyName + " property value differs in configuration and system: "
403          + "Configuration=" + confValue + " while System=" + sysValue
404          + " Erasing configuration value by system value.");
405      }
406      conf.set(propertyName, sysValue);
407    } else {
408      // Ok, it's not set, so we create it as a subdirectory
409      createSubDir(propertyName, parent, subDirName);
410      System.setProperty(propertyName, conf.get(propertyName));
411    }
412  }
413
414  /**
415   * @return Where to write test data on the test filesystem; Returns working directory for the test
416   *         filesystem by default
417   * @see #setupDataTestDirOnTestFS()
418   * @see #getTestFileSystem()
419   */
420  private Path getBaseTestDirOnTestFS() throws IOException {
421    FileSystem fs = getTestFileSystem();
422    return new Path(fs.getWorkingDirectory(), "test-data");
423  }
424
425  /**
426   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
427   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
428   * on it.
429   * @return a unique path in the test filesystem
430   */
431  public Path getDataTestDirOnTestFS() throws IOException {
432    if (dataTestDirOnTestFS == null) {
433      setupDataTestDirOnTestFS();
434    }
435
436    return dataTestDirOnTestFS;
437  }
438
439  /**
440   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
441   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
442   * on it.
443   * @return a unique path in the test filesystem
444   * @param subdirName name of the subdir to create under the base test dir
445   */
446  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
447    return new Path(getDataTestDirOnTestFS(), subdirName);
448  }
449
450  /**
451   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
452   * setup.
453   */
454  private void setupDataTestDirOnTestFS() throws IOException {
455    if (dataTestDirOnTestFS != null) {
456      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
457      return;
458    }
459    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
460  }
461
462  /**
463   * Sets up a new path in test filesystem to be used by tests.
464   */
465  private Path getNewDataTestDirOnTestFS() throws IOException {
466    // The file system can be either local, mini dfs, or if the configuration
467    // is supplied externally, it can be an external cluster FS. If it is a local
468    // file system, the tests should use getBaseTestDir, otherwise, we can use
469    // the working directory, and create a unique sub dir there
470    FileSystem fs = getTestFileSystem();
471    Path newDataTestDir;
472    String randomStr = getRandomUUID().toString();
473    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
474      newDataTestDir = new Path(getDataTestDir(), randomStr);
475      File dataTestDir = new File(newDataTestDir.toString());
476      if (deleteOnExit()) dataTestDir.deleteOnExit();
477    } else {
478      Path base = getBaseTestDirOnTestFS();
479      newDataTestDir = new Path(base, randomStr);
480      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
481    }
482    return newDataTestDir;
483  }
484
485  /**
486   * Cleans the test data directory on the test filesystem.
487   * @return True if we removed the test dirs
488   */
489  public boolean cleanupDataTestDirOnTestFS() throws IOException {
490    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
491    if (ret) dataTestDirOnTestFS = null;
492    return ret;
493  }
494
495  /**
496   * Cleans a subdirectory under the test data directory on the test filesystem.
497   * @return True if we removed child
498   */
499  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
500    Path cpath = getDataTestDirOnTestFS(subdirName);
501    return getTestFileSystem().delete(cpath, true);
502  }
503
504  // Workaround to avoid IllegalThreadStateException
505  // See HBASE-27148 for more details
506  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
507
508    private volatile boolean stopped = false;
509
510    private final MiniDFSCluster cluster;
511
512    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
513      super("FsDatasetAsyncDiskServiceFixer");
514      setDaemon(true);
515      this.cluster = cluster;
516    }
517
518    @Override
519    public void run() {
520      while (!stopped) {
521        try {
522          Thread.sleep(30000);
523        } catch (InterruptedException e) {
524          Thread.currentThread().interrupt();
525          continue;
526        }
527        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
528        // timeout of the thread pool executor is 60 seconds by default.
529        try {
530          for (DataNode dn : cluster.getDataNodes()) {
531            FsDatasetSpi<?> dataset = dn.getFSDataset();
532            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
533            service.setAccessible(true);
534            Object asyncDiskService = service.get(dataset);
535            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
536            group.setAccessible(true);
537            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
538            if (threadGroup.isDaemon()) {
539              threadGroup.setDaemon(false);
540            }
541          }
542        } catch (NoSuchFieldException e) {
543          LOG.debug("NoSuchFieldException: " + e.getMessage()
544            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
545            + "See HBASE-27595 for details.");
546        } catch (Exception e) {
547          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
548        }
549      }
550    }
551
552    void shutdown() {
553      stopped = true;
554      interrupt();
555    }
556  }
557
558  /**
559   * Start a minidfscluster.
560   * @param servers How many DNs to start.
561   * @see #shutdownMiniDFSCluster()
562   * @return The mini dfs cluster created.
563   */
564  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
565    return startMiniDFSCluster(servers, null);
566  }
567
568  /**
569   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
570   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
571   * instances of the datanodes will have the same host name.
572   * @param hosts hostnames DNs to run on.
573   * @see #shutdownMiniDFSCluster()
574   * @return The mini dfs cluster created.
575   */
576  public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception {
577    if (hosts != null && hosts.length != 0) {
578      return startMiniDFSCluster(hosts.length, hosts);
579    } else {
580      return startMiniDFSCluster(1, null);
581    }
582  }
583
584  /**
585   * Start a minidfscluster. Can only create one.
586   * @param servers How many DNs to start.
587   * @param hosts   hostnames DNs to run on.
588   * @see #shutdownMiniDFSCluster()
589   * @return The mini dfs cluster created.
590   */
591  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception {
592    return startMiniDFSCluster(servers, null, hosts);
593  }
594
595  private void setFs() throws IOException {
596    if (this.dfsCluster == null) {
597      LOG.info("Skipping setting fs because dfsCluster is null");
598      return;
599    }
600    FileSystem fs = this.dfsCluster.getFileSystem();
601    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
602
603    // re-enable this check with dfs
604    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
605  }
606
607  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
608    throws Exception {
609    createDirsAndSetProperties();
610    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
611
612    this.dfsCluster =
613      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
614    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
615    this.dfsClusterFixer.start();
616    // Set this just-started cluster as our filesystem.
617    setFs();
618
619    // Wait for the cluster to be totally up
620    this.dfsCluster.waitClusterUp();
621
622    // reset the test directory for test file system
623    dataTestDirOnTestFS = null;
624    String dataTestDir = getDataTestDir().toString();
625    conf.set(HConstants.HBASE_DIR, dataTestDir);
626    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
627
628    return this.dfsCluster;
629  }
630
631  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
632    createDirsAndSetProperties();
633    dfsCluster =
634      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
635    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
636    this.dfsClusterFixer.start();
637    return dfsCluster;
638  }
639
640  /**
641   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
642   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
643   * the conf.
644   *
645   * <pre>
646   * Configuration conf = TEST_UTIL.getConfiguration();
647   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
648   *   Map.Entry&lt;String, String&gt; e = i.next();
649   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
650   * }
651   * </pre>
652   */
653  private void createDirsAndSetProperties() throws IOException {
654    setupClusterTestDir();
655    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
656    createDirAndSetProperty("test.cache.data");
657    createDirAndSetProperty("hadoop.tmp.dir");
658    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
659    createDirAndSetProperty("mapreduce.cluster.local.dir");
660    createDirAndSetProperty("mapreduce.cluster.temp.dir");
661    enableShortCircuit();
662
663    Path root = getDataTestDirOnTestFS("hadoop");
664    conf.set(MapreduceTestingShim.getMROutputDirProp(),
665      new Path(root, "mapred-output-dir").toString());
666    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
667    conf.set("mapreduce.jobtracker.staging.root.dir",
668      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
669    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
670    conf.set("yarn.app.mapreduce.am.staging-dir",
671      new Path(root, "mapreduce-am-staging-root-dir").toString());
672
673    // Frustrate yarn's and hdfs's attempts at writing /tmp.
674    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
675    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
676    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
677    createDirAndSetProperty("yarn.nodemanager.log-dirs");
678    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
679    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
680    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
681    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
682    createDirAndSetProperty("dfs.journalnode.edits.dir");
683    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
684    createDirAndSetProperty("nfs.dump.dir");
685    createDirAndSetProperty("java.io.tmpdir");
686    createDirAndSetProperty("dfs.journalnode.edits.dir");
687    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
688    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
689  }
690
691  /**
692   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
693   * Default to false.
694   */
695  public boolean isNewVersionBehaviorEnabled() {
696    final String propName = "hbase.tests.new.version.behavior";
697    String v = System.getProperty(propName);
698    if (v != null) {
699      return Boolean.parseBoolean(v);
700    }
701    return false;
702  }
703
704  /**
705   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
706   * allows to specify this parameter on the command line. If not set, default is true.
707   */
708  public boolean isReadShortCircuitOn() {
709    final String propName = "hbase.tests.use.shortcircuit.reads";
710    String readOnProp = System.getProperty(propName);
711    if (readOnProp != null) {
712      return Boolean.parseBoolean(readOnProp);
713    } else {
714      return conf.getBoolean(propName, false);
715    }
716  }
717
718  /**
719   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
720   * including skipping the hdfs checksum checks.
721   */
722  private void enableShortCircuit() {
723    if (isReadShortCircuitOn()) {
724      String curUser = System.getProperty("user.name");
725      LOG.info("read short circuit is ON for user " + curUser);
726      // read short circuit, for hdfs
727      conf.set("dfs.block.local-path-access.user", curUser);
728      // read short circuit, for hbase
729      conf.setBoolean("dfs.client.read.shortcircuit", true);
730      // Skip checking checksum, for the hdfs client and the datanode
731      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
732    } else {
733      LOG.info("read short circuit is OFF");
734    }
735  }
736
737  private String createDirAndSetProperty(final String property) {
738    return createDirAndSetProperty(property, property);
739  }
740
741  private String createDirAndSetProperty(final String relPath, String property) {
742    String path = getDataTestDir(relPath).toString();
743    System.setProperty(property, path);
744    conf.set(property, path);
745    new File(path).mkdirs();
746    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
747    return path;
748  }
749
750  /**
751   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
752   */
753  public void shutdownMiniDFSCluster() throws IOException {
754    if (this.dfsCluster != null) {
755      // The below throws an exception per dn, AsynchronousCloseException.
756      this.dfsCluster.shutdown();
757      dfsCluster = null;
758      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
759      // have a fixer
760      if (dfsClusterFixer != null) {
761        this.dfsClusterFixer.shutdown();
762        dfsClusterFixer = null;
763      }
764      dataTestDirOnTestFS = null;
765      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
766    }
767  }
768
769  /**
770   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
771   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
772   * @param createWALDir Whether to create a new WAL directory.
773   * @return The mini HBase cluster created.
774   * @see #shutdownMiniCluster()
775   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
776   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
777   * @see #startMiniCluster(StartMiniClusterOption)
778   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
779   */
780  @Deprecated
781  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
782    StartMiniClusterOption option =
783      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
784    return startMiniCluster(option);
785  }
786
787  /**
788   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
789   * defined in {@link StartMiniClusterOption.Builder}.
790   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
791   * @param createRootDir Whether to create a new root or data directory path.
792   * @return The mini HBase cluster created.
793   * @see #shutdownMiniCluster()
794   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
795   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
796   * @see #startMiniCluster(StartMiniClusterOption)
797   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
798   */
799  @Deprecated
800  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception {
801    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
802      .numDataNodes(numSlaves).createRootDir(createRootDir).build();
803    return startMiniCluster(option);
804  }
805
806  /**
807   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
808   * defined in {@link StartMiniClusterOption.Builder}.
809   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
810   * @param createRootDir Whether to create a new root or data directory path.
811   * @param createWALDir  Whether to create a new WAL directory.
812   * @return The mini HBase cluster created.
813   * @see #shutdownMiniCluster()
814   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
815   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
816   * @see #startMiniCluster(StartMiniClusterOption)
817   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
818   */
819  @Deprecated
820  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
821    boolean createWALDir) throws Exception {
822    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
823      .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build();
824    return startMiniCluster(option);
825  }
826
827  /**
828   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
829   * defined in {@link StartMiniClusterOption.Builder}.
830   * @param numMasters    Master node number.
831   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
832   * @param createRootDir Whether to create a new root or data directory path.
833   * @return The mini HBase cluster created.
834   * @see #shutdownMiniCluster()
835   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
836   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
837   * @see #startMiniCluster(StartMiniClusterOption)
838   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
839   */
840  @Deprecated
841  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
842    throws Exception {
843    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
844      .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build();
845    return startMiniCluster(option);
846  }
847
848  /**
849   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
850   * defined in {@link StartMiniClusterOption.Builder}.
851   * @param numMasters Master node number.
852   * @param numSlaves  Slave node number, for both HBase region server and HDFS data node.
853   * @return The mini HBase cluster created.
854   * @see #shutdownMiniCluster()
855   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
856   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
857   * @see #startMiniCluster(StartMiniClusterOption)
858   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
859   */
860  @Deprecated
861  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
862    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
863      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
864    return startMiniCluster(option);
865  }
866
867  /**
868   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
869   * defined in {@link StartMiniClusterOption.Builder}.
870   * @param numMasters    Master node number.
871   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
872   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
873   *                      HDFS data node number.
874   * @param createRootDir Whether to create a new root or data directory path.
875   * @return The mini HBase cluster created.
876   * @see #shutdownMiniCluster()
877   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
878   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
879   * @see #startMiniCluster(StartMiniClusterOption)
880   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
881   */
882  @Deprecated
883  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
884    boolean createRootDir) throws Exception {
885    StartMiniClusterOption option =
886      StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves)
887        .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
888    return startMiniCluster(option);
889  }
890
891  /**
892   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
893   * defined in {@link StartMiniClusterOption.Builder}.
894   * @param numMasters    Master node number.
895   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
896   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
897   *                      HDFS data node number.
898   * @return The mini HBase cluster created.
899   * @see #shutdownMiniCluster()
900   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
901   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
902   * @see #startMiniCluster(StartMiniClusterOption)
903   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
904   */
905  @Deprecated
906  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
907    throws Exception {
908    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
909      .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
910    return startMiniCluster(option);
911  }
912
913  /**
914   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
915   * defined in {@link StartMiniClusterOption.Builder}.
916   * @param numMasters       Master node number.
917   * @param numRegionServers Number of region servers.
918   * @param numDataNodes     Number of datanodes.
919   * @return The mini HBase cluster created.
920   * @see #shutdownMiniCluster()
921   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
922   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
923   * @see #startMiniCluster(StartMiniClusterOption)
924   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
925   */
926  @Deprecated
927  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
928    throws Exception {
929    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
930      .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build();
931    return startMiniCluster(option);
932  }
933
934  /**
935   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
936   * defined in {@link StartMiniClusterOption.Builder}.
937   * @param numMasters    Master node number.
938   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
939   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
940   *                      HDFS data node number.
941   * @param masterClass   The class to use as HMaster, or null for default.
942   * @param rsClass       The class to use as HRegionServer, or null for default.
943   * @return The mini HBase cluster created.
944   * @see #shutdownMiniCluster()
945   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
946   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
947   * @see #startMiniCluster(StartMiniClusterOption)
948   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
949   */
950  @Deprecated
951  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
952    Class<? extends HMaster> masterClass,
953    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
954    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
955      .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves)
956      .dataNodeHosts(dataNodeHosts).build();
957    return startMiniCluster(option);
958  }
959
960  /**
961   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
962   * defined in {@link StartMiniClusterOption.Builder}.
963   * @param numMasters       Master node number.
964   * @param numRegionServers Number of region servers.
965   * @param numDataNodes     Number of datanodes.
966   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
967   *                         overwrite HDFS data node number.
968   * @param masterClass      The class to use as HMaster, or null for default.
969   * @param rsClass          The class to use as HRegionServer, or null for default.
970   * @return The mini HBase cluster created.
971   * @see #shutdownMiniCluster()
972   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
973   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
974   * @see #startMiniCluster(StartMiniClusterOption)
975   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
976   */
977  @Deprecated
978  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
979    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
980    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
981    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
982      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
983      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build();
984    return startMiniCluster(option);
985  }
986
987  /**
988   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
989   * defined in {@link StartMiniClusterOption.Builder}.
990   * @param numMasters       Master node number.
991   * @param numRegionServers Number of region servers.
992   * @param numDataNodes     Number of datanodes.
993   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
994   *                         overwrite HDFS data node number.
995   * @param masterClass      The class to use as HMaster, or null for default.
996   * @param rsClass          The class to use as HRegionServer, or null for default.
997   * @param createRootDir    Whether to create a new root or data directory path.
998   * @param createWALDir     Whether to create a new WAL directory.
999   * @return The mini HBase cluster created.
1000   * @see #shutdownMiniCluster()
1001   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1002   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
1003   * @see #startMiniCluster(StartMiniClusterOption)
1004   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1005   */
1006  @Deprecated
1007  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1008    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1009    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1010    boolean createWALDir) throws Exception {
1011    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1012      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
1013      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir)
1014      .createWALDir(createWALDir).build();
1015    return startMiniCluster(option);
1016  }
1017
1018  /**
1019   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
1020   * other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1021   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1022   * @see #startMiniCluster(StartMiniClusterOption option)
1023   * @see #shutdownMiniDFSCluster()
1024   */
1025  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1026    StartMiniClusterOption option =
1027      StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1028    return startMiniCluster(option);
1029  }
1030
1031  /**
1032   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
1033   * value can be found in {@link StartMiniClusterOption.Builder}.
1034   * @see #startMiniCluster(StartMiniClusterOption option)
1035   * @see #shutdownMiniDFSCluster()
1036   */
1037  public MiniHBaseCluster startMiniCluster() throws Exception {
1038    return startMiniCluster(StartMiniClusterOption.builder().build());
1039  }
1040
1041  /**
1042   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
1043   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
1044   * under System property test.build.data, to be cleaned up on exit.
1045   * @see #shutdownMiniDFSCluster()
1046   */
1047  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1048    LOG.info("Starting up minicluster with option: {}", option);
1049
1050    // If we already put up a cluster, fail.
1051    if (miniClusterRunning) {
1052      throw new IllegalStateException("A mini-cluster is already running");
1053    }
1054    miniClusterRunning = true;
1055
1056    setupClusterTestDir();
1057
1058    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1059    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1060    if (dfsCluster == null) {
1061      LOG.info("STARTING DFS");
1062      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1063    } else {
1064      LOG.info("NOT STARTING DFS");
1065    }
1066
1067    // Start up a zk cluster.
1068    if (getZkCluster() == null) {
1069      startMiniZKCluster(option.getNumZkServers());
1070    }
1071
1072    // Start the MiniHBaseCluster
1073    return startMiniHBaseCluster(option);
1074  }
1075
1076  /**
1077   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1078   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1079   * @return Reference to the hbase mini hbase cluster.
1080   * @see #startMiniCluster(StartMiniClusterOption)
1081   * @see #shutdownMiniHBaseCluster()
1082   */
1083  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1084    throws IOException, InterruptedException {
1085    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1086    createRootDir(option.isCreateRootDir());
1087    if (option.isCreateWALDir()) {
1088      createWALRootDir();
1089    }
1090    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1091    // for tests that do not read hbase-defaults.xml
1092    setHBaseFsTmpDir();
1093
1094    // These settings will make the server waits until this exact number of
1095    // regions servers are connected.
1096    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1097      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1098    }
1099    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1100      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1101    }
1102
1103    Configuration c = new Configuration(this.conf);
1104    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1105      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1106      option.getMasterClass(), option.getRsClass());
1107    // Populate the master address configuration from mini cluster configuration.
1108    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1109    // Don't leave here till we've done a successful scan of the hbase:meta
1110    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1111      ResultScanner s = t.getScanner(new Scan())) {
1112      for (;;) {
1113        if (s.next() == null) {
1114          break;
1115        }
1116      }
1117    }
1118
1119    getAdmin(); // create immediately the hbaseAdmin
1120    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1121
1122    return (MiniHBaseCluster) hbaseCluster;
1123  }
1124
1125  /**
1126   * Starts up mini hbase cluster using default options. Default options can be found in
1127   * {@link StartMiniClusterOption.Builder}.
1128   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1129   * @see #shutdownMiniHBaseCluster()
1130   */
1131  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1132    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1133  }
1134
1135  /**
1136   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1137   * {@link #startMiniCluster()}. All other options will use default values, defined in
1138   * {@link StartMiniClusterOption.Builder}.
1139   * @param numMasters       Master node number.
1140   * @param numRegionServers Number of region servers.
1141   * @return The mini HBase cluster created.
1142   * @see #shutdownMiniHBaseCluster()
1143   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1144   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1145   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1146   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1147   */
1148  @Deprecated
1149  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1150    throws IOException, InterruptedException {
1151    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1152      .numRegionServers(numRegionServers).build();
1153    return startMiniHBaseCluster(option);
1154  }
1155
1156  /**
1157   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1158   * {@link #startMiniCluster()}. All other options will use default values, defined in
1159   * {@link StartMiniClusterOption.Builder}.
1160   * @param numMasters       Master node number.
1161   * @param numRegionServers Number of region servers.
1162   * @param rsPorts          Ports that RegionServer should use.
1163   * @return The mini HBase cluster created.
1164   * @see #shutdownMiniHBaseCluster()
1165   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1166   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1167   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1168   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1169   */
1170  @Deprecated
1171  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1172    List<Integer> rsPorts) throws IOException, InterruptedException {
1173    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1174      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1175    return startMiniHBaseCluster(option);
1176  }
1177
1178  /**
1179   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1180   * {@link #startMiniCluster()}. All other options will use default values, defined in
1181   * {@link StartMiniClusterOption.Builder}.
1182   * @param numMasters       Master node number.
1183   * @param numRegionServers Number of region servers.
1184   * @param rsPorts          Ports that RegionServer should use.
1185   * @param masterClass      The class to use as HMaster, or null for default.
1186   * @param rsClass          The class to use as HRegionServer, or null for default.
1187   * @param createRootDir    Whether to create a new root or data directory path.
1188   * @param createWALDir     Whether to create a new WAL directory.
1189   * @return The mini HBase cluster created.
1190   * @see #shutdownMiniHBaseCluster()
1191   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1192   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1193   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1194   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1195   */
1196  @Deprecated
1197  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1198    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1199    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1200    boolean createWALDir) throws IOException, InterruptedException {
1201    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1202      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1203      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1204    return startMiniHBaseCluster(option);
1205  }
1206
1207  /**
1208   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1209   * want to keep dfs/zk up and just stop/start hbase.
1210   * @param servers number of region servers
1211   */
1212  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1213    this.restartHBaseCluster(servers, null);
1214  }
1215
1216  public void restartHBaseCluster(int servers, List<Integer> ports)
1217    throws IOException, InterruptedException {
1218    StartMiniClusterOption option =
1219      StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1220    restartHBaseCluster(option);
1221    invalidateConnection();
1222  }
1223
1224  public void restartHBaseCluster(StartMiniClusterOption option)
1225    throws IOException, InterruptedException {
1226    closeConnection();
1227    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
1228      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1229      option.getMasterClass(), option.getRsClass());
1230    // Don't leave here till we've done a successful scan of the hbase:meta
1231    Connection conn = ConnectionFactory.createConnection(this.conf);
1232    Table t = conn.getTable(TableName.META_TABLE_NAME);
1233    ResultScanner s = t.getScanner(new Scan());
1234    while (s.next() != null) {
1235      // do nothing
1236    }
1237    LOG.info("HBase has been restarted");
1238    s.close();
1239    t.close();
1240    conn.close();
1241  }
1242
1243  /**
1244   * @return Current mini hbase cluster. Only has something in it after a call to
1245   *         {@link #startMiniCluster()}.
1246   * @see #startMiniCluster()
1247   */
1248  public MiniHBaseCluster getMiniHBaseCluster() {
1249    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1250      return (MiniHBaseCluster) this.hbaseCluster;
1251    }
1252    throw new RuntimeException(
1253      hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName());
1254  }
1255
1256  /**
1257   * Stops mini hbase, zk, and hdfs clusters.
1258   * @see #startMiniCluster(int)
1259   */
1260  public void shutdownMiniCluster() throws IOException {
1261    LOG.info("Shutting down minicluster");
1262    shutdownMiniHBaseCluster();
1263    shutdownMiniDFSCluster();
1264    shutdownMiniZKCluster();
1265
1266    cleanupTestDir();
1267    miniClusterRunning = false;
1268    LOG.info("Minicluster is down");
1269  }
1270
1271  /**
1272   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1273   * @throws java.io.IOException in case command is unsuccessful
1274   */
1275  public void shutdownMiniHBaseCluster() throws IOException {
1276    cleanup();
1277    if (this.hbaseCluster != null) {
1278      this.hbaseCluster.shutdown();
1279      // Wait till hbase is down before going on to shutdown zk.
1280      this.hbaseCluster.waitUntilShutDown();
1281      this.hbaseCluster = null;
1282    }
1283    if (zooKeeperWatcher != null) {
1284      zooKeeperWatcher.close();
1285      zooKeeperWatcher = null;
1286    }
1287  }
1288
1289  /**
1290   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1291   * @throws java.io.IOException throws in case command is unsuccessful
1292   */
1293  public void killMiniHBaseCluster() throws IOException {
1294    cleanup();
1295    if (this.hbaseCluster != null) {
1296      getMiniHBaseCluster().killAll();
1297      this.hbaseCluster = null;
1298    }
1299    if (zooKeeperWatcher != null) {
1300      zooKeeperWatcher.close();
1301      zooKeeperWatcher = null;
1302    }
1303  }
1304
1305  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1306  private void cleanup() throws IOException {
1307    closeConnection();
1308    // unset the configuration for MIN and MAX RS to start
1309    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1310    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1311  }
1312
1313  /**
1314   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1315   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1316   * If false, previous path is used. Note: this does not cause the root dir to be created.
1317   * @return Fully qualified path for the default hbase root dir
1318   */
1319  public Path getDefaultRootDirPath(boolean create) throws IOException {
1320    if (!create) {
1321      return getDataTestDirOnTestFS();
1322    } else {
1323      return getNewDataTestDirOnTestFS();
1324    }
1325  }
1326
1327  /**
1328   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that
1329   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1330   * @return Fully qualified path for the default hbase root dir
1331   */
1332  public Path getDefaultRootDirPath() throws IOException {
1333    return getDefaultRootDirPath(false);
1334  }
1335
1336  /**
1337   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1338   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1339   * startup. You'd only use this method if you were doing manual operation.
1340   * @param create This flag decides whether to get a new root or data directory path or not, if it
1341   *               has been fetched already. Note : Directory will be made irrespective of whether
1342   *               path has been fetched or not. If directory already exists, it will be overwritten
1343   * @return Fully qualified path to hbase root dir
1344   */
1345  public Path createRootDir(boolean create) throws IOException {
1346    FileSystem fs = FileSystem.get(this.conf);
1347    Path hbaseRootdir = getDefaultRootDirPath(create);
1348    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1349    fs.mkdirs(hbaseRootdir);
1350    FSUtils.setVersion(fs, hbaseRootdir);
1351    return hbaseRootdir;
1352  }
1353
1354  /**
1355   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that
1356   * <code>create</code> flag is false.
1357   * @return Fully qualified path to hbase root dir
1358   */
1359  public Path createRootDir() throws IOException {
1360    return createRootDir(false);
1361  }
1362
1363  /**
1364   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1365   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1366   * this method if you were doing manual operation.
1367   * @return Fully qualified path to hbase root dir
1368   */
1369  public Path createWALRootDir() throws IOException {
1370    FileSystem fs = FileSystem.get(this.conf);
1371    Path walDir = getNewDataTestDirOnTestFS();
1372    CommonFSUtils.setWALRootDir(this.conf, walDir);
1373    fs.mkdirs(walDir);
1374    return walDir;
1375  }
1376
1377  private void setHBaseFsTmpDir() throws IOException {
1378    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1379    if (hbaseFsTmpDirInString == null) {
1380      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1381      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1382    } else {
1383      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1384    }
1385  }
1386
1387  /**
1388   * Flushes all caches in the mini hbase cluster
1389   */
1390  public void flush() throws IOException {
1391    getMiniHBaseCluster().flushcache();
1392  }
1393
1394  /**
1395   * Flushes all caches in the mini hbase cluster
1396   */
1397  public void flush(TableName tableName) throws IOException {
1398    getMiniHBaseCluster().flushcache(tableName);
1399  }
1400
1401  /**
1402   * Compact all regions in the mini hbase cluster
1403   */
1404  public void compact(boolean major) throws IOException {
1405    getMiniHBaseCluster().compact(major);
1406  }
1407
1408  /**
1409   * Compact all of a table's reagion in the mini hbase cluster
1410   */
1411  public void compact(TableName tableName, boolean major) throws IOException {
1412    getMiniHBaseCluster().compact(tableName, major);
1413  }
1414
1415  /**
1416   * Create a table.
1417   * @return A Table instance for the created table.
1418   */
1419  public Table createTable(TableName tableName, String family) throws IOException {
1420    return createTable(tableName, new String[] { family });
1421  }
1422
1423  /**
1424   * Create a table.
1425   * @return A Table instance for the created table.
1426   */
1427  public Table createTable(TableName tableName, String[] families) throws IOException {
1428    List<byte[]> fams = new ArrayList<>(families.length);
1429    for (String family : families) {
1430      fams.add(Bytes.toBytes(family));
1431    }
1432    return createTable(tableName, fams.toArray(new byte[0][]));
1433  }
1434
1435  /**
1436   * Create a table.
1437   * @return A Table instance for the created table.
1438   */
1439  public Table createTable(TableName tableName, byte[] family) throws IOException {
1440    return createTable(tableName, new byte[][] { family });
1441  }
1442
1443  /**
1444   * Create a table with multiple regions.
1445   * @return A Table instance for the created table.
1446   */
1447  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1448    throws IOException {
1449    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1450    byte[] startKey = Bytes.toBytes("aaaaa");
1451    byte[] endKey = Bytes.toBytes("zzzzz");
1452    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1453
1454    return createTable(tableName, new byte[][] { family }, splitKeys);
1455  }
1456
1457  /**
1458   * Create a table.
1459   * @return A Table instance for the created table.
1460   */
1461  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1462    return createTable(tableName, families, (byte[][]) null);
1463  }
1464
1465  /**
1466   * Create a table with multiple regions.
1467   * @return A Table instance for the created table.
1468   */
1469  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1470    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1471  }
1472
1473  /**
1474   * Create a table with multiple regions.
1475   * @param replicaCount replica count.
1476   * @return A Table instance for the created table.
1477   */
1478  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1479    throws IOException {
1480    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1481  }
1482
1483  /**
1484   * Create a table.
1485   * @return A Table instance for the created table.
1486   */
1487  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1488    throws IOException {
1489    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1490  }
1491
1492  /**
1493   * Create a table.
1494   * @param tableName    the table name
1495   * @param families     the families
1496   * @param splitKeys    the splitkeys
1497   * @param replicaCount the region replica count
1498   * @return A Table instance for the created table.
1499   * @throws IOException throws IOException
1500   */
1501  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1502    int replicaCount) throws IOException {
1503    return createTable(tableName, families, splitKeys, replicaCount,
1504      new Configuration(getConfiguration()));
1505  }
1506
1507  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1508    byte[] endKey, int numRegions) throws IOException {
1509    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1510
1511    getAdmin().createTable(desc, startKey, endKey, numRegions);
1512    // HBaseAdmin only waits for regions to appear in hbase:meta we
1513    // should wait until they are assigned
1514    waitUntilAllRegionsAssigned(tableName);
1515    return getConnection().getTable(tableName);
1516  }
1517
1518  /**
1519   * Create a table.
1520   * @param c Configuration to use
1521   * @return A Table instance for the created table.
1522   */
1523  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1524    throws IOException {
1525    return createTable(htd, families, null, c);
1526  }
1527
1528  /**
1529   * Create a table.
1530   * @param htd       table descriptor
1531   * @param families  array of column families
1532   * @param splitKeys array of split keys
1533   * @param c         Configuration to use
1534   * @return A Table instance for the created table.
1535   * @throws IOException if getAdmin or createTable fails
1536   */
1537  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1538    Configuration c) throws IOException {
1539    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1540    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1541    // on is interfering.
1542    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1543  }
1544
1545  /**
1546   * Create a table.
1547   * @param htd       table descriptor
1548   * @param families  array of column families
1549   * @param splitKeys array of split keys
1550   * @param type      Bloom type
1551   * @param blockSize block size
1552   * @param c         Configuration to use
1553   * @return A Table instance for the created table.
1554   * @throws IOException if getAdmin or createTable fails
1555   */
1556
1557  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1558    BloomType type, int blockSize, Configuration c) throws IOException {
1559    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1560    for (byte[] family : families) {
1561      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1562        .setBloomFilterType(type).setBlocksize(blockSize);
1563      if (isNewVersionBehaviorEnabled()) {
1564        cfdb.setNewVersionBehavior(true);
1565      }
1566      builder.setColumnFamily(cfdb.build());
1567    }
1568    TableDescriptor td = builder.build();
1569    if (splitKeys != null) {
1570      getAdmin().createTable(td, splitKeys);
1571    } else {
1572      getAdmin().createTable(td);
1573    }
1574    // HBaseAdmin only waits for regions to appear in hbase:meta
1575    // we should wait until they are assigned
1576    waitUntilAllRegionsAssigned(td.getTableName());
1577    return getConnection().getTable(td.getTableName());
1578  }
1579
1580  /**
1581   * Create a table.
1582   * @param htd       table descriptor
1583   * @param splitRows array of split keys
1584   * @return A Table instance for the created table.
1585   */
1586  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1587    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1588    if (isNewVersionBehaviorEnabled()) {
1589      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1590        builder.setColumnFamily(
1591          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1592      }
1593    }
1594    if (splitRows != null) {
1595      getAdmin().createTable(builder.build(), splitRows);
1596    } else {
1597      getAdmin().createTable(builder.build());
1598    }
1599    // HBaseAdmin only waits for regions to appear in hbase:meta
1600    // we should wait until they are assigned
1601    waitUntilAllRegionsAssigned(htd.getTableName());
1602    return getConnection().getTable(htd.getTableName());
1603  }
1604
1605  /**
1606   * Create a table.
1607   * @param tableName    the table name
1608   * @param families     the families
1609   * @param splitKeys    the split keys
1610   * @param replicaCount the replica count
1611   * @param c            Configuration to use
1612   * @return A Table instance for the created table.
1613   */
1614  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1615    int replicaCount, final Configuration c) throws IOException {
1616    TableDescriptor htd =
1617      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1618    return createTable(htd, families, splitKeys, c);
1619  }
1620
1621  /**
1622   * Create a table.
1623   * @return A Table instance for the created table.
1624   */
1625  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1626    return createTable(tableName, new byte[][] { family }, numVersions);
1627  }
1628
1629  /**
1630   * Create a table.
1631   * @return A Table instance for the created table.
1632   */
1633  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1634    throws IOException {
1635    return createTable(tableName, families, numVersions, (byte[][]) null);
1636  }
1637
1638  /**
1639   * Create a table.
1640   * @return A Table instance for the created table.
1641   */
1642  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1643    byte[][] splitKeys) throws IOException {
1644    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1645    for (byte[] family : families) {
1646      ColumnFamilyDescriptorBuilder cfBuilder =
1647        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1648      if (isNewVersionBehaviorEnabled()) {
1649        cfBuilder.setNewVersionBehavior(true);
1650      }
1651      builder.setColumnFamily(cfBuilder.build());
1652    }
1653    if (splitKeys != null) {
1654      getAdmin().createTable(builder.build(), splitKeys);
1655    } else {
1656      getAdmin().createTable(builder.build());
1657    }
1658    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1659    // assigned
1660    waitUntilAllRegionsAssigned(tableName);
1661    return getConnection().getTable(tableName);
1662  }
1663
1664  /**
1665   * Create a table with multiple regions.
1666   * @return A Table instance for the created table.
1667   */
1668  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1669    throws IOException {
1670    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1671  }
1672
1673  /**
1674   * Create a table.
1675   * @return A Table instance for the created table.
1676   */
1677  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1678    throws IOException {
1679    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1680    for (byte[] family : families) {
1681      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1682        .setMaxVersions(numVersions).setBlocksize(blockSize);
1683      if (isNewVersionBehaviorEnabled()) {
1684        cfBuilder.setNewVersionBehavior(true);
1685      }
1686      builder.setColumnFamily(cfBuilder.build());
1687    }
1688    getAdmin().createTable(builder.build());
1689    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1690    // assigned
1691    waitUntilAllRegionsAssigned(tableName);
1692    return getConnection().getTable(tableName);
1693  }
1694
1695  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1696    String cpName) throws IOException {
1697    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1698    for (byte[] family : families) {
1699      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1700        .setMaxVersions(numVersions).setBlocksize(blockSize);
1701      if (isNewVersionBehaviorEnabled()) {
1702        cfBuilder.setNewVersionBehavior(true);
1703      }
1704      builder.setColumnFamily(cfBuilder.build());
1705    }
1706    if (cpName != null) {
1707      builder.setCoprocessor(cpName);
1708    }
1709    getAdmin().createTable(builder.build());
1710    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1711    // assigned
1712    waitUntilAllRegionsAssigned(tableName);
1713    return getConnection().getTable(tableName);
1714  }
1715
1716  /**
1717   * Create a table.
1718   * @return A Table instance for the created table.
1719   */
1720  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1721    throws IOException {
1722    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1723    int i = 0;
1724    for (byte[] family : families) {
1725      ColumnFamilyDescriptorBuilder cfBuilder =
1726        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1727      if (isNewVersionBehaviorEnabled()) {
1728        cfBuilder.setNewVersionBehavior(true);
1729      }
1730      builder.setColumnFamily(cfBuilder.build());
1731      i++;
1732    }
1733    getAdmin().createTable(builder.build());
1734    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1735    // assigned
1736    waitUntilAllRegionsAssigned(tableName);
1737    return getConnection().getTable(tableName);
1738  }
1739
1740  /**
1741   * Create a table.
1742   * @return A Table instance for the created table.
1743   */
1744  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1745    throws IOException {
1746    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1747    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1748    if (isNewVersionBehaviorEnabled()) {
1749      cfBuilder.setNewVersionBehavior(true);
1750    }
1751    builder.setColumnFamily(cfBuilder.build());
1752    getAdmin().createTable(builder.build(), splitRows);
1753    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1754    // assigned
1755    waitUntilAllRegionsAssigned(tableName);
1756    return getConnection().getTable(tableName);
1757  }
1758
1759  /**
1760   * Create a table with multiple regions.
1761   * @return A Table instance for the created table.
1762   */
1763  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1764    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1765  }
1766
1767  /**
1768   * Modify a table, synchronous.
1769   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1770   *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1771   * @see Admin#modifyTable(TableDescriptor)
1772   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1773   */
1774  @Deprecated
1775  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1776    throws IOException, InterruptedException {
1777    admin.modifyTable(desc);
1778  }
1779
1780  /**
1781   * Set the number of Region replicas.
1782   */
1783  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1784    throws IOException, InterruptedException {
1785    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1786      .setRegionReplication(replicaCount).build();
1787    admin.modifyTable(desc);
1788  }
1789
1790  /**
1791   * Drop an existing table
1792   * @param tableName existing table
1793   */
1794  public void deleteTable(TableName tableName) throws IOException {
1795    try {
1796      getAdmin().disableTable(tableName);
1797    } catch (TableNotEnabledException e) {
1798      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1799    }
1800    getAdmin().deleteTable(tableName);
1801  }
1802
1803  /**
1804   * Drop an existing table
1805   * @param tableName existing table
1806   */
1807  public void deleteTableIfAny(TableName tableName) throws IOException {
1808    try {
1809      deleteTable(tableName);
1810    } catch (TableNotFoundException e) {
1811      // ignore
1812    }
1813  }
1814
1815  // ==========================================================================
1816  // Canned table and table descriptor creation
1817
1818  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1819  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1820  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1821  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1822  private static final int MAXVERSIONS = 3;
1823
1824  public static final char FIRST_CHAR = 'a';
1825  public static final char LAST_CHAR = 'z';
1826  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1827  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1828
1829  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1830    return createModifyableTableDescriptor(TableName.valueOf(name),
1831      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1832      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1833  }
1834
1835  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1836    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1837    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1838    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1839      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1840        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1841        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1842      if (isNewVersionBehaviorEnabled()) {
1843        cfBuilder.setNewVersionBehavior(true);
1844      }
1845      builder.setColumnFamily(cfBuilder.build());
1846    }
1847    return builder.build();
1848  }
1849
1850  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1851    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1852    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1853    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1854      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1855        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1856        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1857      if (isNewVersionBehaviorEnabled()) {
1858        cfBuilder.setNewVersionBehavior(true);
1859      }
1860      builder.setColumnFamily(cfBuilder.build());
1861    }
1862    return builder;
1863  }
1864
1865  /**
1866   * Create a table of name <code>name</code>.
1867   * @param name Name to give table.
1868   * @return Column descriptor.
1869   */
1870  public TableDescriptor createTableDescriptor(final TableName name) {
1871    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1872      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1873  }
1874
1875  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1876    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1877  }
1878
1879  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1880    int maxVersions) {
1881    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1882    for (byte[] family : families) {
1883      ColumnFamilyDescriptorBuilder cfBuilder =
1884        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1885      if (isNewVersionBehaviorEnabled()) {
1886        cfBuilder.setNewVersionBehavior(true);
1887      }
1888      builder.setColumnFamily(cfBuilder.build());
1889    }
1890    return builder.build();
1891  }
1892
1893  /**
1894   * Create an HRegion that writes to the local tmp dirs
1895   * @param desc     a table descriptor indicating which table the region belongs to
1896   * @param startKey the start boundary of the region
1897   * @param endKey   the end boundary of the region
1898   * @return a region that writes to local dir for testing
1899   */
1900  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1901    throws IOException {
1902    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1903      .setEndKey(endKey).build();
1904    return createLocalHRegion(hri, desc);
1905  }
1906
1907  /**
1908   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1909   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1910   */
1911  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1912    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1913  }
1914
1915  /**
1916   * Create an HRegion that writes to the local tmp dirs with specified wal
1917   * @param info regioninfo
1918   * @param conf configuration
1919   * @param desc table descriptor
1920   * @param wal  wal for this region.
1921   * @return created hregion
1922   */
1923  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1924    WAL wal) throws IOException {
1925    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1926  }
1927
1928  /**
1929   * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1930   * when done.
1931   */
1932  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1933    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1934    throws IOException {
1935    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1936      durability, wal, null, families);
1937  }
1938
1939  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1940    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1941    boolean[] compactedMemStore, byte[]... families) throws IOException {
1942    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1943    builder.setReadOnly(isReadOnly);
1944    int i = 0;
1945    for (byte[] family : families) {
1946      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1947      if (compactedMemStore != null && i < compactedMemStore.length) {
1948        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1949      } else {
1950        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1951
1952      }
1953      i++;
1954      // Set default to be three versions.
1955      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1956      builder.setColumnFamily(cfBuilder.build());
1957    }
1958    builder.setDurability(durability);
1959    RegionInfo info =
1960      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1961    return createLocalHRegion(info, conf, builder.build(), wal);
1962  }
1963
1964  //
1965  // ==========================================================================
1966
1967  /**
1968   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1969   * read.
1970   * @param tableName existing table
1971   * @return HTable to that new table
1972   */
1973  public Table deleteTableData(TableName tableName) throws IOException {
1974    Table table = getConnection().getTable(tableName);
1975    Scan scan = new Scan();
1976    ResultScanner resScan = table.getScanner(scan);
1977    for (Result res : resScan) {
1978      Delete del = new Delete(res.getRow());
1979      table.delete(del);
1980    }
1981    resScan = table.getScanner(scan);
1982    resScan.close();
1983    return table;
1984  }
1985
1986  /**
1987   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1988   * table.
1989   * @param tableName       table which must exist.
1990   * @param preserveRegions keep the existing split points
1991   * @return HTable for the new table
1992   */
1993  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1994    throws IOException {
1995    Admin admin = getAdmin();
1996    if (!admin.isTableDisabled(tableName)) {
1997      admin.disableTable(tableName);
1998    }
1999    admin.truncateTable(tableName, preserveRegions);
2000    return getConnection().getTable(tableName);
2001  }
2002
2003  /**
2004   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2005   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
2006   * preserve regions of existing table.
2007   * @param tableName table which must exist.
2008   * @return HTable for the new table
2009   */
2010  public Table truncateTable(final TableName tableName) throws IOException {
2011    return truncateTable(tableName, false);
2012  }
2013
2014  /**
2015   * Load table with rows from 'aaa' to 'zzz'.
2016   * @param t Table
2017   * @param f Family
2018   * @return Count of rows loaded.
2019   */
2020  public int loadTable(final Table t, final byte[] f) throws IOException {
2021    return loadTable(t, new byte[][] { f });
2022  }
2023
2024  /**
2025   * Load table with rows from 'aaa' to 'zzz'.
2026   * @param t Table
2027   * @param f Family
2028   * @return Count of rows loaded.
2029   */
2030  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2031    return loadTable(t, new byte[][] { f }, null, writeToWAL);
2032  }
2033
2034  /**
2035   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2036   * @param t Table
2037   * @param f Array of Families to load
2038   * @return Count of rows loaded.
2039   */
2040  public int loadTable(final Table t, final byte[][] f) throws IOException {
2041    return loadTable(t, f, null);
2042  }
2043
2044  /**
2045   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2046   * @param t     Table
2047   * @param f     Array of Families to load
2048   * @param value the values of the cells. If null is passed, the row key is used as value
2049   * @return Count of rows loaded.
2050   */
2051  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2052    return loadTable(t, f, value, true);
2053  }
2054
2055  /**
2056   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2057   * @param t     Table
2058   * @param f     Array of Families to load
2059   * @param value the values of the cells. If null is passed, the row key is used as value
2060   * @return Count of rows loaded.
2061   */
2062  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
2063    throws IOException {
2064    List<Put> puts = new ArrayList<>();
2065    for (byte[] row : HBaseTestingUtility.ROWS) {
2066      Put put = new Put(row);
2067      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2068      for (int i = 0; i < f.length; i++) {
2069        byte[] value1 = value != null ? value : row;
2070        put.addColumn(f[i], f[i], value1);
2071      }
2072      puts.add(put);
2073    }
2074    t.put(puts);
2075    return puts.size();
2076  }
2077
2078  /**
2079   * A tracker for tracking and validating table rows generated with
2080   * {@link HBaseTestingUtility#loadTable(Table, byte[])}
2081   */
2082  public static class SeenRowTracker {
2083    int dim = 'z' - 'a' + 1;
2084    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
2085    byte[] startRow;
2086    byte[] stopRow;
2087
2088    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2089      this.startRow = startRow;
2090      this.stopRow = stopRow;
2091    }
2092
2093    void reset() {
2094      for (byte[] row : ROWS) {
2095        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2096      }
2097    }
2098
2099    int i(byte b) {
2100      return b - 'a';
2101    }
2102
2103    public void addRow(byte[] row) {
2104      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2105    }
2106
2107    /**
2108     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
2109     * rows none
2110     */
2111    public void validate() {
2112      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2113        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2114          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2115            int count = seenRows[i(b1)][i(b2)][i(b3)];
2116            int expectedCount = 0;
2117            if (
2118              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
2119                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
2120            ) {
2121              expectedCount = 1;
2122            }
2123            if (count != expectedCount) {
2124              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
2125              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
2126                + "instead of " + expectedCount);
2127            }
2128          }
2129        }
2130      }
2131    }
2132  }
2133
2134  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2135    return loadRegion(r, f, false);
2136  }
2137
2138  public int loadRegion(final Region r, final byte[] f) throws IOException {
2139    return loadRegion((HRegion) r, f);
2140  }
2141
2142  /**
2143   * Load region with rows from 'aaa' to 'zzz'.
2144   * @param r     Region
2145   * @param f     Family
2146   * @param flush flush the cache if true
2147   * @return Count of rows loaded.
2148   */
2149  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
2150    byte[] k = new byte[3];
2151    int rowCount = 0;
2152    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2153      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2154        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2155          k[0] = b1;
2156          k[1] = b2;
2157          k[2] = b3;
2158          Put put = new Put(k);
2159          put.setDurability(Durability.SKIP_WAL);
2160          put.addColumn(f, null, k);
2161          if (r.getWAL() == null) {
2162            put.setDurability(Durability.SKIP_WAL);
2163          }
2164          int preRowCount = rowCount;
2165          int pause = 10;
2166          int maxPause = 1000;
2167          while (rowCount == preRowCount) {
2168            try {
2169              r.put(put);
2170              rowCount++;
2171            } catch (RegionTooBusyException e) {
2172              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2173              Threads.sleep(pause);
2174            }
2175          }
2176        }
2177      }
2178      if (flush) {
2179        r.flush(true);
2180      }
2181    }
2182    return rowCount;
2183  }
2184
2185  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2186    throws IOException {
2187    for (int i = startRow; i < endRow; i++) {
2188      byte[] data = Bytes.toBytes(String.valueOf(i));
2189      Put put = new Put(data);
2190      put.addColumn(f, null, data);
2191      t.put(put);
2192    }
2193  }
2194
2195  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2196    throws IOException {
2197    byte[] row = new byte[rowSize];
2198    for (int i = 0; i < totalRows; i++) {
2199      Bytes.random(row);
2200      Put put = new Put(row);
2201      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2202      t.put(put);
2203    }
2204  }
2205
2206  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2207    int replicaId) throws IOException {
2208    for (int i = startRow; i < endRow; i++) {
2209      String failMsg = "Failed verification of row :" + i;
2210      byte[] data = Bytes.toBytes(String.valueOf(i));
2211      Get get = new Get(data);
2212      get.setReplicaId(replicaId);
2213      get.setConsistency(Consistency.TIMELINE);
2214      Result result = table.get(get);
2215      if (!result.containsColumn(f, null)) {
2216        throw new AssertionError(failMsg);
2217      }
2218      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2219      Cell cell = result.getColumnLatestCell(f, null);
2220      if (
2221        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2222          cell.getValueLength())
2223      ) {
2224        throw new AssertionError(failMsg);
2225      }
2226    }
2227  }
2228
2229  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2230    throws IOException {
2231    verifyNumericRows((HRegion) region, f, startRow, endRow);
2232  }
2233
2234  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2235    throws IOException {
2236    verifyNumericRows(region, f, startRow, endRow, true);
2237  }
2238
2239  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2240    final boolean present) throws IOException {
2241    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2242  }
2243
2244  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2245    final boolean present) throws IOException {
2246    for (int i = startRow; i < endRow; i++) {
2247      String failMsg = "Failed verification of row :" + i;
2248      byte[] data = Bytes.toBytes(String.valueOf(i));
2249      Result result = region.get(new Get(data));
2250
2251      boolean hasResult = result != null && !result.isEmpty();
2252      if (present != hasResult) {
2253        throw new AssertionError(
2254          failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2255      }
2256      if (!present) continue;
2257
2258      if (!result.containsColumn(f, null)) {
2259        throw new AssertionError(failMsg);
2260      }
2261      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2262      Cell cell = result.getColumnLatestCell(f, null);
2263      if (
2264        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2265          cell.getValueLength())
2266      ) {
2267        throw new AssertionError(failMsg);
2268      }
2269    }
2270  }
2271
2272  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2273    throws IOException {
2274    for (int i = startRow; i < endRow; i++) {
2275      byte[] data = Bytes.toBytes(String.valueOf(i));
2276      Delete delete = new Delete(data);
2277      delete.addFamily(f);
2278      t.delete(delete);
2279    }
2280  }
2281
2282  /**
2283   * Return the number of rows in the given table.
2284   * @param table to count rows
2285   * @return count of rows
2286   */
2287  public static int countRows(final Table table) throws IOException {
2288    return countRows(table, new Scan());
2289  }
2290
2291  public static int countRows(final Table table, final Scan scan) throws IOException {
2292    try (ResultScanner results = table.getScanner(scan)) {
2293      int count = 0;
2294      while (results.next() != null) {
2295        count++;
2296      }
2297      return count;
2298    }
2299  }
2300
2301  public int countRows(final Table table, final byte[]... families) throws IOException {
2302    Scan scan = new Scan();
2303    for (byte[] family : families) {
2304      scan.addFamily(family);
2305    }
2306    return countRows(table, scan);
2307  }
2308
2309  /**
2310   * Return the number of rows in the given table.
2311   */
2312  public int countRows(final TableName tableName) throws IOException {
2313    Table table = getConnection().getTable(tableName);
2314    try {
2315      return countRows(table);
2316    } finally {
2317      table.close();
2318    }
2319  }
2320
2321  public int countRows(final Region region) throws IOException {
2322    return countRows(region, new Scan());
2323  }
2324
2325  public int countRows(final Region region, final Scan scan) throws IOException {
2326    InternalScanner scanner = region.getScanner(scan);
2327    try {
2328      return countRows(scanner);
2329    } finally {
2330      scanner.close();
2331    }
2332  }
2333
2334  public int countRows(final InternalScanner scanner) throws IOException {
2335    int scannedCount = 0;
2336    List<Cell> results = new ArrayList<>();
2337    boolean hasMore = true;
2338    while (hasMore) {
2339      hasMore = scanner.next(results);
2340      scannedCount += results.size();
2341      results.clear();
2342    }
2343    return scannedCount;
2344  }
2345
2346  /**
2347   * Return an md5 digest of the entire contents of a table.
2348   */
2349  public String checksumRows(final Table table) throws Exception {
2350
2351    Scan scan = new Scan();
2352    ResultScanner results = table.getScanner(scan);
2353    MessageDigest digest = MessageDigest.getInstance("MD5");
2354    for (Result res : results) {
2355      digest.update(res.getRow());
2356    }
2357    results.close();
2358    return digest.toString();
2359  }
2360
2361  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2362  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2363  static {
2364    int i = 0;
2365    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2366      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2367        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2368          ROWS[i][0] = b1;
2369          ROWS[i][1] = b2;
2370          ROWS[i][2] = b3;
2371          i++;
2372        }
2373      }
2374    }
2375  }
2376
2377  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2378    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2379    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2380    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2381    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2382    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2383    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2384
2385  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2386    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2387    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2388    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2389    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2390    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2391    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2392
2393  /**
2394   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2395   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2396   * @return list of region info for regions added to meta
2397   */
2398  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2399    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2400    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2401    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2402    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2403    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2404      TableState.State.ENABLED);
2405    // add custom ones
2406    for (int i = 0; i < startKeys.length; i++) {
2407      int j = (i + 1) % startKeys.length;
2408      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2409        .setEndKey(startKeys[j]).build();
2410      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2411      newRegions.add(hri);
2412    }
2413
2414    meta.close();
2415    return newRegions;
2416  }
2417
2418  /**
2419   * Create an unmanaged WAL. Be sure to close it when you're through.
2420   */
2421  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2422    throws IOException {
2423    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2424    // unless I pass along via the conf.
2425    Configuration confForWAL = new Configuration(conf);
2426    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2427    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2428  }
2429
2430  /**
2431   * Create a region with it's own WAL. Be sure to call
2432   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2433   */
2434  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2435    final Configuration conf, final TableDescriptor htd) throws IOException {
2436    return createRegionAndWAL(info, rootDir, conf, htd, true);
2437  }
2438
2439  /**
2440   * Create a region with it's own WAL. Be sure to call
2441   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2442   */
2443  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2444    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2445    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2446    region.setBlockCache(blockCache);
2447    region.initialize();
2448    return region;
2449  }
2450
2451  /**
2452   * Create a region with it's own WAL. Be sure to call
2453   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2454   */
2455  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2456    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2457    throws IOException {
2458    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2459    region.setMobFileCache(mobFileCache);
2460    region.initialize();
2461    return region;
2462  }
2463
2464  /**
2465   * Create a region with it's own WAL. Be sure to call
2466   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2467   */
2468  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2469    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2470    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2471      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2472    WAL wal = createWal(conf, rootDir, info);
2473    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2474  }
2475
2476  /**
2477   * Returns all rows from the hbase:meta table.
2478   * @throws IOException When reading the rows fails.
2479   */
2480  public List<byte[]> getMetaTableRows() throws IOException {
2481    // TODO: Redo using MetaTableAccessor class
2482    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2483    List<byte[]> rows = new ArrayList<>();
2484    ResultScanner s = t.getScanner(new Scan());
2485    for (Result result : s) {
2486      LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()));
2487      rows.add(result.getRow());
2488    }
2489    s.close();
2490    t.close();
2491    return rows;
2492  }
2493
2494  /**
2495   * Returns all rows from the hbase:meta table for a given user table
2496   * @throws IOException When reading the rows fails.
2497   */
2498  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2499    // TODO: Redo using MetaTableAccessor.
2500    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2501    List<byte[]> rows = new ArrayList<>();
2502    ResultScanner s = t.getScanner(new Scan());
2503    for (Result result : s) {
2504      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2505      if (info == null) {
2506        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2507        // TODO figure out what to do for this new hosed case.
2508        continue;
2509      }
2510
2511      if (info.getTable().equals(tableName)) {
2512        LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info);
2513        rows.add(result.getRow());
2514      }
2515    }
2516    s.close();
2517    t.close();
2518    return rows;
2519  }
2520
2521  /**
2522   * Returns all regions of the specified table
2523   * @param tableName the table name
2524   * @return all regions of the specified table
2525   * @throws IOException when getting the regions fails.
2526   */
2527  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2528    try (Admin admin = getConnection().getAdmin()) {
2529      return admin.getRegions(tableName);
2530    }
2531  }
2532
2533  /**
2534   * Find any other region server which is different from the one identified by parameter
2535   * @return another region server
2536   */
2537  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2538    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2539      if (!(rst.getRegionServer() == rs)) {
2540        return rst.getRegionServer();
2541      }
2542    }
2543    return null;
2544  }
2545
2546  /**
2547   * Tool to get the reference to the region server object that holds the region of the specified
2548   * user table.
2549   * @param tableName user table to lookup in hbase:meta
2550   * @return region server that holds it, null if the row doesn't exist
2551   */
2552  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2553    throws IOException, InterruptedException {
2554    List<RegionInfo> regions = getRegions(tableName);
2555    if (regions == null || regions.isEmpty()) {
2556      return null;
2557    }
2558    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2559
2560    byte[] firstRegionName =
2561      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2562        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2563
2564    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2565    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2566      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2567    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2568      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2569    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2570    while (retrier.shouldRetry()) {
2571      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2572      if (index != -1) {
2573        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2574      }
2575      // Came back -1. Region may not be online yet. Sleep a while.
2576      retrier.sleepUntilNextRetry();
2577    }
2578    return null;
2579  }
2580
2581  /**
2582   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2583   * @throws IOException When starting the cluster fails.
2584   */
2585  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2586    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2587    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2588      "99.0");
2589    startMiniMapReduceCluster(2);
2590    return mrCluster;
2591  }
2592
2593  /**
2594   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2595   * internal static LOG_DIR variable.
2596   */
2597  private void forceChangeTaskLogDir() {
2598    Field logDirField;
2599    try {
2600      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2601      logDirField.setAccessible(true);
2602
2603      Field modifiersField = ReflectionUtils.getModifiersField();
2604      modifiersField.setAccessible(true);
2605      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2606
2607      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2608    } catch (SecurityException e) {
2609      throw new RuntimeException(e);
2610    } catch (NoSuchFieldException e) {
2611      throw new RuntimeException(e);
2612    } catch (IllegalArgumentException e) {
2613      throw new RuntimeException(e);
2614    } catch (IllegalAccessException e) {
2615      throw new RuntimeException(e);
2616    }
2617  }
2618
2619  /**
2620   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2621   * filesystem.
2622   * @param servers The number of <code>TaskTracker</code>'s to start.
2623   * @throws IOException When starting the cluster fails.
2624   */
2625  private void startMiniMapReduceCluster(final int servers) throws IOException {
2626    if (mrCluster != null) {
2627      throw new IllegalStateException("MiniMRCluster is already running");
2628    }
2629    LOG.info("Starting mini mapreduce cluster...");
2630    setupClusterTestDir();
2631    createDirsAndSetProperties();
2632
2633    forceChangeTaskLogDir();
2634
2635    //// hadoop2 specific settings
2636    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2637    // we up the VM usable so that processes don't get killed.
2638    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2639
2640    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2641    // this avoids the problem by disabling speculative task execution in tests.
2642    conf.setBoolean("mapreduce.map.speculative", false);
2643    conf.setBoolean("mapreduce.reduce.speculative", false);
2644    ////
2645
2646    // Allow the user to override FS URI for this map-reduce cluster to use.
2647    mrCluster =
2648      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2649        1, null, null, new JobConf(this.conf));
2650    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2651    if (jobConf == null) {
2652      jobConf = mrCluster.createJobConf();
2653    }
2654    // Hadoop MiniMR overwrites this while it should not
2655    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2656    LOG.info("Mini mapreduce cluster started");
2657
2658    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2659    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2660    // necessary config properties here. YARN-129 required adding a few properties.
2661    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2662    // this for mrv2 support; mr1 ignores this
2663    conf.set("mapreduce.framework.name", "yarn");
2664    conf.setBoolean("yarn.is.minicluster", true);
2665    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2666    if (rmAddress != null) {
2667      conf.set("yarn.resourcemanager.address", rmAddress);
2668    }
2669    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2670    if (historyAddress != null) {
2671      conf.set("mapreduce.jobhistory.address", historyAddress);
2672    }
2673    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2674    if (schedulerAddress != null) {
2675      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2676    }
2677    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2678    if (mrJobHistoryWebappAddress != null) {
2679      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2680    }
2681    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2682    if (yarnRMWebappAddress != null) {
2683      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2684    }
2685  }
2686
2687  /**
2688   * Stops the previously started <code>MiniMRCluster</code>.
2689   */
2690  public void shutdownMiniMapReduceCluster() {
2691    if (mrCluster != null) {
2692      LOG.info("Stopping mini mapreduce cluster...");
2693      mrCluster.shutdown();
2694      mrCluster = null;
2695      LOG.info("Mini mapreduce cluster stopped");
2696    }
2697    // Restore configuration to point to local jobtracker
2698    conf.set("mapreduce.jobtracker.address", "local");
2699  }
2700
2701  /**
2702   * Create a stubbed out RegionServerService, mainly for getting FS.
2703   */
2704  public RegionServerServices createMockRegionServerService() throws IOException {
2705    return createMockRegionServerService((ServerName) null);
2706  }
2707
2708  /**
2709   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2710   * TestTokenAuthentication
2711   */
2712  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2713    throws IOException {
2714    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2715    rss.setFileSystem(getTestFileSystem());
2716    rss.setRpcServer(rpc);
2717    return rss;
2718  }
2719
2720  /**
2721   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2722   * TestOpenRegionHandler
2723   */
2724  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2725    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2726    rss.setFileSystem(getTestFileSystem());
2727    return rss;
2728  }
2729
2730  /**
2731   * Switches the logger for the given class to DEBUG level.
2732   * @param clazz The class for which to switch to debug logging.
2733   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2734   *             HBase only uses log4j. You should do this by your own as it you know which log
2735   *             framework you are using then set the log level to debug is very easy.
2736   */
2737  @Deprecated
2738  public void enableDebug(Class<?> clazz) {
2739    Log4jUtils.enableDebug(clazz);
2740  }
2741
2742  /**
2743   * Expire the Master's session
2744   */
2745  public void expireMasterSession() throws Exception {
2746    HMaster master = getMiniHBaseCluster().getMaster();
2747    expireSession(master.getZooKeeper(), false);
2748  }
2749
2750  /**
2751   * Expire a region server's session
2752   * @param index which RS
2753   */
2754  public void expireRegionServerSession(int index) throws Exception {
2755    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2756    expireSession(rs.getZooKeeper(), false);
2757    decrementMinRegionServerCount();
2758  }
2759
2760  private void decrementMinRegionServerCount() {
2761    // decrement the count for this.conf, for newly spwaned master
2762    // this.hbaseCluster shares this configuration too
2763    decrementMinRegionServerCount(getConfiguration());
2764
2765    // each master thread keeps a copy of configuration
2766    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2767      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2768    }
2769  }
2770
2771  private void decrementMinRegionServerCount(Configuration conf) {
2772    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2773    if (currentCount != -1) {
2774      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2775    }
2776  }
2777
2778  public void expireSession(ZKWatcher nodeZK) throws Exception {
2779    expireSession(nodeZK, false);
2780  }
2781
2782  /**
2783   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2784   * http://hbase.apache.org/book.html#trouble.zookeeper There are issues when doing this: [1]
2785   * http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html [2]
2786   * https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2787   * @param nodeZK      - the ZK watcher to expire
2788   * @param checkStatus - true to check if we can create a Table with the current configuration.
2789   */
2790  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2791    Configuration c = new Configuration(this.conf);
2792    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2793    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2794    byte[] password = zk.getSessionPasswd();
2795    long sessionID = zk.getSessionId();
2796
2797    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2798    // so we create a first watcher to be sure that the
2799    // event was sent. We expect that if our watcher receives the event
2800    // other watchers on the same machine will get is as well.
2801    // When we ask to close the connection, ZK does not close it before
2802    // we receive all the events, so don't have to capture the event, just
2803    // closing the connection should be enough.
2804    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2805      @Override
2806      public void process(WatchedEvent watchedEvent) {
2807        LOG.info("Monitor ZKW received event=" + watchedEvent);
2808      }
2809    }, sessionID, password);
2810
2811    // Making it expire
2812    ZooKeeper newZK =
2813      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2814
2815    // ensure that we have connection to the server before closing down, otherwise
2816    // the close session event will be eaten out before we start CONNECTING state
2817    long start = EnvironmentEdgeManager.currentTime();
2818    while (
2819      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2820    ) {
2821      Thread.sleep(1);
2822    }
2823    newZK.close();
2824    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2825
2826    // Now closing & waiting to be sure that the clients get it.
2827    monitor.close();
2828
2829    if (checkStatus) {
2830      getConnection().getTable(TableName.META_TABLE_NAME).close();
2831    }
2832  }
2833
2834  /**
2835   * Get the Mini HBase cluster.
2836   * @return hbase cluster
2837   * @see #getHBaseClusterInterface()
2838   */
2839  public MiniHBaseCluster getHBaseCluster() {
2840    return getMiniHBaseCluster();
2841  }
2842
2843  /**
2844   * Returns the HBaseCluster instance.
2845   * <p>
2846   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2847   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2848   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2849   * instead w/o the need to type-cast.
2850   */
2851  public HBaseCluster getHBaseClusterInterface() {
2852    // implementation note: we should rename this method as #getHBaseCluster(),
2853    // but this would require refactoring 90+ calls.
2854    return hbaseCluster;
2855  }
2856
2857  /**
2858   * Resets the connections so that the next time getConnection() is called, a new connection is
2859   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2860   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
2861   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
2862   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
2863   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
2864   * bigger refactor.
2865   */
2866  public void invalidateConnection() throws IOException {
2867    closeConnection();
2868    // Update the master addresses if they changed.
2869    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2870    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2871    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2872      masterConfigBefore, masterConfAfter);
2873    conf.set(HConstants.MASTER_ADDRS_KEY,
2874      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2875  }
2876
2877  /**
2878   * Get a shared Connection to the cluster. this method is thread safe.
2879   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2880   */
2881  public Connection getConnection() throws IOException {
2882    return getAsyncConnection().toConnection();
2883  }
2884
2885  /**
2886   * Get a assigned Connection to the cluster. this method is thread safe.
2887   * @param user assigned user
2888   * @return A Connection with assigned user.
2889   */
2890  public Connection getConnection(User user) throws IOException {
2891    return getAsyncConnection(user).toConnection();
2892  }
2893
2894  /**
2895   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2896   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2897   *         of cluster.
2898   */
2899  public AsyncClusterConnection getAsyncConnection() throws IOException {
2900    try {
2901      return asyncConnection.updateAndGet(connection -> {
2902        if (connection == null) {
2903          try {
2904            User user = UserProvider.instantiate(conf).getCurrent();
2905            connection = getAsyncConnection(user);
2906          } catch (IOException ioe) {
2907            throw new UncheckedIOException("Failed to create connection", ioe);
2908          }
2909        }
2910        return connection;
2911      });
2912    } catch (UncheckedIOException exception) {
2913      throw exception.getCause();
2914    }
2915  }
2916
2917  /**
2918   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2919   * @param user assigned user
2920   * @return An AsyncClusterConnection with assigned user.
2921   */
2922  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2923    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2924  }
2925
2926  public void closeConnection() throws IOException {
2927    if (hbaseAdmin != null) {
2928      Closeables.close(hbaseAdmin, true);
2929      hbaseAdmin = null;
2930    }
2931    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2932    if (asyncConnection != null) {
2933      Closeables.close(asyncConnection, true);
2934    }
2935  }
2936
2937  /**
2938   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2939   * it has no effect, it will be closed automatically when the cluster shutdowns
2940   */
2941  public Admin getAdmin() throws IOException {
2942    if (hbaseAdmin == null) {
2943      this.hbaseAdmin = getConnection().getAdmin();
2944    }
2945    return hbaseAdmin;
2946  }
2947
2948  private Admin hbaseAdmin = null;
2949
2950  /**
2951   * Returns an {@link Hbck} instance. Needs be closed when done.
2952   */
2953  public Hbck getHbck() throws IOException {
2954    return getConnection().getHbck();
2955  }
2956
2957  /**
2958   * Unassign the named region.
2959   * @param regionName The region to unassign.
2960   */
2961  public void unassignRegion(String regionName) throws IOException {
2962    unassignRegion(Bytes.toBytes(regionName));
2963  }
2964
2965  /**
2966   * Unassign the named region.
2967   * @param regionName The region to unassign.
2968   */
2969  public void unassignRegion(byte[] regionName) throws IOException {
2970    getAdmin().unassign(regionName, true);
2971  }
2972
2973  /**
2974   * Closes the region containing the given row.
2975   * @param row   The row to find the containing region.
2976   * @param table The table to find the region.
2977   */
2978  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2979    unassignRegionByRow(Bytes.toBytes(row), table);
2980  }
2981
2982  /**
2983   * Closes the region containing the given row.
2984   * @param row   The row to find the containing region.
2985   * @param table The table to find the region.
2986   */
2987  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2988    HRegionLocation hrl = table.getRegionLocation(row);
2989    unassignRegion(hrl.getRegion().getRegionName());
2990  }
2991
2992  /**
2993   * Retrieves a splittable region randomly from tableName
2994   * @param tableName   name of table
2995   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2996   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2997   */
2998  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2999    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3000    int regCount = regions.size();
3001    Set<Integer> attempted = new HashSet<>();
3002    int idx;
3003    int attempts = 0;
3004    do {
3005      regions = getHBaseCluster().getRegions(tableName);
3006      if (regCount != regions.size()) {
3007        // if there was region movement, clear attempted Set
3008        attempted.clear();
3009      }
3010      regCount = regions.size();
3011      // There are chances that before we get the region for the table from an RS the region may
3012      // be going for CLOSE. This may be because online schema change is enabled
3013      if (regCount > 0) {
3014        idx = ThreadLocalRandom.current().nextInt(regCount);
3015        // if we have just tried this region, there is no need to try again
3016        if (attempted.contains(idx)) {
3017          continue;
3018        }
3019        HRegion region = regions.get(idx);
3020        if (region.checkSplit().isPresent()) {
3021          return region;
3022        }
3023        attempted.add(idx);
3024      }
3025      attempts++;
3026    } while (maxAttempts == -1 || attempts < maxAttempts);
3027    return null;
3028  }
3029
3030  public MiniDFSCluster getDFSCluster() {
3031    return dfsCluster;
3032  }
3033
3034  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3035    setDFSCluster(cluster, true);
3036  }
3037
3038  /**
3039   * Set the MiniDFSCluster
3040   * @param cluster     cluster to use
3041   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
3042   *                    is set.
3043   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3044   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
3045   */
3046  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3047    throws IllegalStateException, IOException {
3048    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3049      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3050    }
3051    this.dfsCluster = cluster;
3052    this.setFs();
3053  }
3054
3055  public FileSystem getTestFileSystem() throws IOException {
3056    return HFileSystem.get(conf);
3057  }
3058
3059  /**
3060   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
3061   * (30 seconds).
3062   * @param table Table to wait on.
3063   */
3064  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
3065    waitTableAvailable(table.getName(), 30000);
3066  }
3067
3068  public void waitTableAvailable(TableName table, long timeoutMillis)
3069    throws InterruptedException, IOException {
3070    waitFor(timeoutMillis, predicateTableAvailable(table));
3071  }
3072
3073  /**
3074   * Wait until all regions in a table have been assigned
3075   * @param table         Table to wait on.
3076   * @param timeoutMillis Timeout.
3077   */
3078  public void waitTableAvailable(byte[] table, long timeoutMillis)
3079    throws InterruptedException, IOException {
3080    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3081  }
3082
3083  public String explainTableAvailability(TableName tableName) throws IOException {
3084    StringBuilder msg =
3085      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3086    if (getHBaseCluster().getMaster().isAlive()) {
3087      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3088        .getRegionStates().getRegionAssignments();
3089      final List<Pair<RegionInfo, ServerName>> metaLocations =
3090        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3091      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3092        RegionInfo hri = metaLocation.getFirst();
3093        ServerName sn = metaLocation.getSecond();
3094        if (!assignments.containsKey(hri)) {
3095          msg.append(", region ").append(hri)
3096            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3097        } else if (sn == null) {
3098          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3099        } else if (!sn.equals(assignments.get(hri))) {
3100          msg.append(",  region ").append(hri)
3101            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3102            .append(" <> ").append(assignments.get(hri));
3103        }
3104      }
3105    }
3106    return msg.toString();
3107  }
3108
3109  public String explainTableState(final TableName table, TableState.State state)
3110    throws IOException {
3111    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3112    if (tableState == null) {
3113      return "TableState in META: No table state in META for table " + table
3114        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3115    } else if (!tableState.inStates(state)) {
3116      return "TableState in META: Not " + state + " state, but " + tableState;
3117    } else {
3118      return "TableState in META: OK";
3119    }
3120  }
3121
3122  public TableState findLastTableState(final TableName table) throws IOException {
3123    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3124    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3125      @Override
3126      public boolean visit(Result r) throws IOException {
3127        if (!Arrays.equals(r.getRow(), table.getName())) {
3128          return false;
3129        }
3130        TableState state = CatalogFamilyFormat.getTableState(r);
3131        if (state != null) {
3132          lastTableState.set(state);
3133        }
3134        return true;
3135      }
3136    };
3137    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
3138      Integer.MAX_VALUE, visitor);
3139    return lastTableState.get();
3140  }
3141
3142  /**
3143   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3144   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
3145   * table.
3146   * @param table the table to wait on.
3147   * @throws InterruptedException if interrupted while waiting
3148   * @throws IOException          if an IO problem is encountered
3149   */
3150  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
3151    waitTableEnabled(table, 30000);
3152  }
3153
3154  /**
3155   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3156   * have been all assigned.
3157   * @see #waitTableEnabled(TableName, long)
3158   * @param table         Table to wait on.
3159   * @param timeoutMillis Time to wait on it being marked enabled.
3160   */
3161  public void waitTableEnabled(byte[] table, long timeoutMillis)
3162    throws InterruptedException, IOException {
3163    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3164  }
3165
3166  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
3167    waitFor(timeoutMillis, predicateTableEnabled(table));
3168  }
3169
3170  /**
3171   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
3172   * after default period (30 seconds)
3173   * @param table Table to wait on.
3174   */
3175  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
3176    waitTableDisabled(table, 30000);
3177  }
3178
3179  public void waitTableDisabled(TableName table, long millisTimeout)
3180    throws InterruptedException, IOException {
3181    waitFor(millisTimeout, predicateTableDisabled(table));
3182  }
3183
3184  /**
3185   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
3186   * @param table         Table to wait on.
3187   * @param timeoutMillis Time to wait on it being marked disabled.
3188   */
3189  public void waitTableDisabled(byte[] table, long timeoutMillis)
3190    throws InterruptedException, IOException {
3191    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3192  }
3193
3194  /**
3195   * Make sure that at least the specified number of region servers are running
3196   * @param num minimum number of region servers that should be running
3197   * @return true if we started some servers
3198   */
3199  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
3200    boolean startedServer = false;
3201    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3202    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
3203      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3204      startedServer = true;
3205    }
3206
3207    return startedServer;
3208  }
3209
3210  /**
3211   * Make sure that at least the specified number of region servers are running. We don't count the
3212   * ones that are currently stopping or are stopped.
3213   * @param num minimum number of region servers that should be running
3214   * @return true if we started some servers
3215   */
3216  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
3217    boolean startedServer = ensureSomeRegionServersAvailable(num);
3218
3219    int nonStoppedServers = 0;
3220    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
3221
3222      HRegionServer hrs = rst.getRegionServer();
3223      if (hrs.isStopping() || hrs.isStopped()) {
3224        LOG.info("A region server is stopped or stopping:" + hrs);
3225      } else {
3226        nonStoppedServers++;
3227      }
3228    }
3229    for (int i = nonStoppedServers; i < num; ++i) {
3230      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3231      startedServer = true;
3232    }
3233    return startedServer;
3234  }
3235
3236  /**
3237   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
3238   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
3239   * @param c                     Initial configuration
3240   * @param differentiatingSuffix Suffix to differentiate this user from others.
3241   * @return A new configuration instance with a different user set into it.
3242   */
3243  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
3244    throws IOException {
3245    FileSystem currentfs = FileSystem.get(c);
3246    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3247      return User.getCurrent();
3248    }
3249    // Else distributed filesystem. Make a new instance per daemon. Below
3250    // code is taken from the AppendTestUtil over in hdfs.
3251    String username = User.getCurrent().getName() + differentiatingSuffix;
3252    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
3253    return user;
3254  }
3255
3256  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3257    throws IOException {
3258    NavigableSet<String> online = new TreeSet<>();
3259    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3260      try {
3261        for (RegionInfo region : ProtobufUtil
3262          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3263          online.add(region.getRegionNameAsString());
3264        }
3265      } catch (RegionServerStoppedException e) {
3266        // That's fine.
3267      }
3268    }
3269    return online;
3270  }
3271
3272  /**
3273   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
3274   * linger. Here is the exception you'll see:
3275   *
3276   * <pre>
3277   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3278   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3279   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3280   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3281   * </pre>
3282   *
3283   * @param stream A DFSClient.DFSOutputStream.
3284   */
3285  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3286    try {
3287      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3288      for (Class<?> clazz : clazzes) {
3289        String className = clazz.getSimpleName();
3290        if (className.equals("DFSOutputStream")) {
3291          if (clazz.isInstance(stream)) {
3292            Field maxRecoveryErrorCountField =
3293              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3294            maxRecoveryErrorCountField.setAccessible(true);
3295            maxRecoveryErrorCountField.setInt(stream, max);
3296            break;
3297          }
3298        }
3299      }
3300    } catch (Exception e) {
3301      LOG.info("Could not set max recovery field", e);
3302    }
3303  }
3304
3305  /**
3306   * Uses directly the assignment manager to assign the region. and waits until the specified region
3307   * has completed assignment.
3308   * @return true if the region is assigned false otherwise.
3309   */
3310  public boolean assignRegion(final RegionInfo regionInfo)
3311    throws IOException, InterruptedException {
3312    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3313    am.assign(regionInfo);
3314    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3315  }
3316
3317  /**
3318   * Move region to destination server and wait till region is completely moved and online
3319   * @param destRegion region to move
3320   * @param destServer destination server of the region
3321   */
3322  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3323    throws InterruptedException, IOException {
3324    HMaster master = getMiniHBaseCluster().getMaster();
3325    // TODO: Here we start the move. The move can take a while.
3326    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3327    while (true) {
3328      ServerName serverName =
3329        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3330      if (serverName != null && serverName.equals(destServer)) {
3331        assertRegionOnServer(destRegion, serverName, 2000);
3332        break;
3333      }
3334      Thread.sleep(10);
3335    }
3336  }
3337
3338  /**
3339   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3340   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3341   * master has been informed and updated hbase:meta with the regions deployed server.
3342   * @param tableName the table name
3343   */
3344  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3345    waitUntilAllRegionsAssigned(tableName,
3346      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3347  }
3348
3349  /**
3350   * Waith until all system table's regions get assigned
3351   */
3352  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3353    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3354  }
3355
3356  /**
3357   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3358   * timeout. This means all regions have been deployed, master has been informed and updated
3359   * hbase:meta with the regions deployed server.
3360   * @param tableName the table name
3361   * @param timeout   timeout, in milliseconds
3362   */
3363  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3364    throws IOException {
3365    if (!TableName.isMetaTableName(tableName)) {
3366      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3367        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3368          + timeout + "ms");
3369        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3370          @Override
3371          public String explainFailure() throws IOException {
3372            return explainTableAvailability(tableName);
3373          }
3374
3375          @Override
3376          public boolean evaluate() throws IOException {
3377            Scan scan = new Scan();
3378            scan.addFamily(HConstants.CATALOG_FAMILY);
3379            boolean tableFound = false;
3380            try (ResultScanner s = meta.getScanner(scan)) {
3381              for (Result r; (r = s.next()) != null;) {
3382                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3383                RegionInfo info = RegionInfo.parseFromOrNull(b);
3384                if (info != null && info.getTable().equals(tableName)) {
3385                  // Get server hosting this region from catalog family. Return false if no server
3386                  // hosting this region, or if the server hosting this region was recently killed
3387                  // (for fault tolerance testing).
3388                  tableFound = true;
3389                  byte[] server =
3390                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3391                  if (server == null) {
3392                    return false;
3393                  } else {
3394                    byte[] startCode =
3395                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3396                    ServerName serverName =
3397                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3398                        + Bytes.toLong(startCode));
3399                    if (
3400                      !getHBaseClusterInterface().isDistributedCluster()
3401                        && getHBaseCluster().isKilledRS(serverName)
3402                    ) {
3403                      return false;
3404                    }
3405                  }
3406                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3407                    return false;
3408                  }
3409                }
3410              }
3411            }
3412            if (!tableFound) {
3413              LOG.warn(
3414                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3415            }
3416            return tableFound;
3417          }
3418        });
3419      }
3420    }
3421    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3422    // check from the master state if we are using a mini cluster
3423    if (!getHBaseClusterInterface().isDistributedCluster()) {
3424      // So, all regions are in the meta table but make sure master knows of the assignments before
3425      // returning -- sometimes this can lag.
3426      HMaster master = getHBaseCluster().getMaster();
3427      final RegionStates states = master.getAssignmentManager().getRegionStates();
3428      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3429        @Override
3430        public String explainFailure() throws IOException {
3431          return explainTableAvailability(tableName);
3432        }
3433
3434        @Override
3435        public boolean evaluate() throws IOException {
3436          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3437          return hris != null && !hris.isEmpty();
3438        }
3439      });
3440    }
3441    LOG.info("All regions for table " + tableName + " assigned.");
3442  }
3443
3444  /**
3445   * Do a small get/scan against one store. This is required because store has no actual methods of
3446   * querying itself, and relies on StoreScanner.
3447   */
3448  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3449    Scan scan = new Scan(get);
3450    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3451      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3452      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3453      // readpoint 0.
3454      0);
3455
3456    List<Cell> result = new ArrayList<>();
3457    scanner.next(result);
3458    if (!result.isEmpty()) {
3459      // verify that we are on the row we want:
3460      Cell kv = result.get(0);
3461      if (!CellUtil.matchingRows(kv, get.getRow())) {
3462        result.clear();
3463      }
3464    }
3465    scanner.close();
3466    return result;
3467  }
3468
3469  /**
3470   * Create region split keys between startkey and endKey
3471   * @param numRegions the number of regions to be created. it has to be greater than 3.
3472   * @return resulting split keys
3473   */
3474  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3475    if (numRegions <= 3) {
3476      throw new AssertionError();
3477    }
3478    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3479    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3480    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3481    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3482    return result;
3483  }
3484
3485  /**
3486   * Do a small get/scan against one store. This is required because store has no actual methods of
3487   * querying itself, and relies on StoreScanner.
3488   */
3489  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3490    throws IOException {
3491    Get get = new Get(row);
3492    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3493    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3494
3495    return getFromStoreFile(store, get);
3496  }
3497
3498  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3499    final List<? extends Cell> actual) {
3500    final int eLen = expected.size();
3501    final int aLen = actual.size();
3502    final int minLen = Math.min(eLen, aLen);
3503
3504    int i;
3505    for (i = 0; i < minLen
3506      && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) {
3507    }
3508
3509    if (additionalMsg == null) {
3510      additionalMsg = "";
3511    }
3512    if (!additionalMsg.isEmpty()) {
3513      additionalMsg = ". " + additionalMsg;
3514    }
3515
3516    if (eLen != aLen || i != minLen) {
3517      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3518        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3519        + " (length " + aLen + ")" + additionalMsg);
3520    }
3521  }
3522
3523  public static <T> String safeGetAsStr(List<T> lst, int i) {
3524    if (0 <= i && i < lst.size()) {
3525      return lst.get(i).toString();
3526    } else {
3527      return "<out_of_range>";
3528    }
3529  }
3530
3531  public String getClusterKey() {
3532    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3533      + ":"
3534      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3535  }
3536
3537  /** Creates a random table with the given parameters */
3538  public Table createRandomTable(TableName tableName, final Collection<String> families,
3539    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3540    final int numRowsPerFlush) throws IOException, InterruptedException {
3541
3542    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3543      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3544      + maxVersions + "\n");
3545
3546    final int numCF = families.size();
3547    final byte[][] cfBytes = new byte[numCF][];
3548    {
3549      int cfIndex = 0;
3550      for (String cf : families) {
3551        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3552      }
3553    }
3554
3555    final int actualStartKey = 0;
3556    final int actualEndKey = Integer.MAX_VALUE;
3557    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3558    final int splitStartKey = actualStartKey + keysPerRegion;
3559    final int splitEndKey = actualEndKey - keysPerRegion;
3560    final String keyFormat = "%08x";
3561    final Table table = createTable(tableName, cfBytes, maxVersions,
3562      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3563      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3564
3565    if (hbaseCluster != null) {
3566      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3567    }
3568
3569    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3570
3571    final Random rand = ThreadLocalRandom.current();
3572    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3573      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3574        final byte[] row = Bytes.toBytes(
3575          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3576
3577        Put put = new Put(row);
3578        Delete del = new Delete(row);
3579        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3580          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3581          final long ts = rand.nextInt();
3582          final byte[] qual = Bytes.toBytes("col" + iCol);
3583          if (rand.nextBoolean()) {
3584            final byte[] value =
3585              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3586                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3587            put.addColumn(cf, qual, ts, value);
3588          } else if (rand.nextDouble() < 0.8) {
3589            del.addColumn(cf, qual, ts);
3590          } else {
3591            del.addColumns(cf, qual, ts);
3592          }
3593        }
3594
3595        if (!put.isEmpty()) {
3596          mutator.mutate(put);
3597        }
3598
3599        if (!del.isEmpty()) {
3600          mutator.mutate(del);
3601        }
3602      }
3603      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3604      mutator.flush();
3605      if (hbaseCluster != null) {
3606        getMiniHBaseCluster().flushcache(table.getName());
3607      }
3608    }
3609    mutator.close();
3610
3611    return table;
3612  }
3613
3614  public static int randomFreePort() {
3615    return HBaseCommonTestingUtility.randomFreePort();
3616  }
3617
3618  public static String randomMultiCastAddress() {
3619    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3620  }
3621
3622  public static void waitForHostPort(String host, int port) throws IOException {
3623    final int maxTimeMs = 10000;
3624    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3625    IOException savedException = null;
3626    LOG.info("Waiting for server at " + host + ":" + port);
3627    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3628      try {
3629        Socket sock = new Socket(InetAddress.getByName(host), port);
3630        sock.close();
3631        savedException = null;
3632        LOG.info("Server at " + host + ":" + port + " is available");
3633        break;
3634      } catch (UnknownHostException e) {
3635        throw new IOException("Failed to look up " + host, e);
3636      } catch (IOException e) {
3637        savedException = e;
3638      }
3639      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3640    }
3641
3642    if (savedException != null) {
3643      throw savedException;
3644    }
3645  }
3646
3647  /**
3648   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3649   * continues.
3650   * @return the number of regions the table was split into
3651   */
3652  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3653    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3654    throws IOException {
3655    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3656      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3657  }
3658
3659  /**
3660   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3661   * continues.
3662   * @return the number of regions the table was split into
3663   */
3664  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3665    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3666    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3667    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3668    builder.setDurability(durability);
3669    builder.setRegionReplication(regionReplication);
3670    ColumnFamilyDescriptorBuilder cfBuilder =
3671      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3672    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3673    cfBuilder.setCompressionType(compression);
3674    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3675      numRegionsPerServer);
3676  }
3677
3678  /**
3679   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3680   * continues.
3681   * @return the number of regions the table was split into
3682   */
3683  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3684    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3685    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3686    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3687    builder.setDurability(durability);
3688    builder.setRegionReplication(regionReplication);
3689    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3690    for (int i = 0; i < columnFamilies.length; i++) {
3691      ColumnFamilyDescriptorBuilder cfBuilder =
3692        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3693      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3694      cfBuilder.setCompressionType(compression);
3695      hcds[i] = cfBuilder.build();
3696    }
3697    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3698  }
3699
3700  /**
3701   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3702   * continues.
3703   * @return the number of regions the table was split into
3704   */
3705  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3706    ColumnFamilyDescriptor hcd) throws IOException {
3707    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3708  }
3709
3710  /**
3711   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3712   * continues.
3713   * @return the number of regions the table was split into
3714   */
3715  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3716    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3717    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3718      numRegionsPerServer);
3719  }
3720
3721  /**
3722   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3723   * continues.
3724   * @return the number of regions the table was split into
3725   */
3726  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3727    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3728    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3729      numRegionsPerServer);
3730  }
3731
3732  /**
3733   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3734   * continues.
3735   * @return the number of regions the table was split into
3736   */
3737  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3738    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3739    throws IOException {
3740    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3741    for (ColumnFamilyDescriptor cd : cds) {
3742      if (!td.hasColumnFamily(cd.getName())) {
3743        builder.setColumnFamily(cd);
3744      }
3745    }
3746    td = builder.build();
3747    int totalNumberOfRegions = 0;
3748    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3749    Admin admin = unmanagedConnection.getAdmin();
3750
3751    try {
3752      // create a table a pre-splits regions.
3753      // The number of splits is set as:
3754      // region servers * regions per region server).
3755      int numberOfServers = admin.getRegionServers().size();
3756      if (numberOfServers == 0) {
3757        throw new IllegalStateException("No live regionservers");
3758      }
3759
3760      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3761      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3762        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3763        + numRegionsPerServer + ")");
3764
3765      byte[][] splits = splitter.split(totalNumberOfRegions);
3766
3767      admin.createTable(td, splits);
3768    } catch (MasterNotRunningException e) {
3769      LOG.error("Master not running", e);
3770      throw new IOException(e);
3771    } catch (TableExistsException e) {
3772      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3773    } finally {
3774      admin.close();
3775      unmanagedConnection.close();
3776    }
3777    return totalNumberOfRegions;
3778  }
3779
3780  public static int getMetaRSPort(Connection connection) throws IOException {
3781    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3782      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3783    }
3784  }
3785
3786  /**
3787   * Due to async racing issue, a region may not be in the online region list of a region server
3788   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3789   */
3790  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3791    final long timeout) throws IOException, InterruptedException {
3792    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3793    while (true) {
3794      List<RegionInfo> regions = getAdmin().getRegions(server);
3795      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3796      long now = EnvironmentEdgeManager.currentTime();
3797      if (now > timeoutTime) break;
3798      Thread.sleep(10);
3799    }
3800    throw new AssertionError(
3801      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3802  }
3803
3804  /**
3805   * Check to make sure the region is open on the specified region server, but not on any other one.
3806   */
3807  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3808    final long timeout) throws IOException, InterruptedException {
3809    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3810    while (true) {
3811      List<RegionInfo> regions = getAdmin().getRegions(server);
3812      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3813        List<JVMClusterUtil.RegionServerThread> rsThreads =
3814          getHBaseCluster().getLiveRegionServerThreads();
3815        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3816          HRegionServer rs = rsThread.getRegionServer();
3817          if (server.equals(rs.getServerName())) {
3818            continue;
3819          }
3820          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3821          for (HRegion r : hrs) {
3822            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3823              throw new AssertionError("Region should not be double assigned");
3824            }
3825          }
3826        }
3827        return; // good, we are happy
3828      }
3829      long now = EnvironmentEdgeManager.currentTime();
3830      if (now > timeoutTime) break;
3831      Thread.sleep(10);
3832    }
3833    throw new AssertionError(
3834      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3835  }
3836
3837  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3838    TableDescriptor td =
3839      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3840    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3841    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3842  }
3843
3844  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3845    BlockCache blockCache) throws IOException {
3846    TableDescriptor td =
3847      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3848    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3849    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3850  }
3851
3852  public static void setFileSystemURI(String fsURI) {
3853    FS_URI = fsURI;
3854  }
3855
3856  /**
3857   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3858   */
3859  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3860    return new ExplainingPredicate<IOException>() {
3861      @Override
3862      public String explainFailure() throws IOException {
3863        final RegionStates regionStates =
3864          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3865        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3866      }
3867
3868      @Override
3869      public boolean evaluate() throws IOException {
3870        HMaster master = getMiniHBaseCluster().getMaster();
3871        if (master == null) return false;
3872        AssignmentManager am = master.getAssignmentManager();
3873        if (am == null) return false;
3874        return !am.hasRegionsInTransition();
3875      }
3876    };
3877  }
3878
3879  /**
3880   * Returns a {@link Predicate} for checking that table is enabled
3881   */
3882  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3883    return new ExplainingPredicate<IOException>() {
3884      @Override
3885      public String explainFailure() throws IOException {
3886        return explainTableState(tableName, TableState.State.ENABLED);
3887      }
3888
3889      @Override
3890      public boolean evaluate() throws IOException {
3891        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3892      }
3893    };
3894  }
3895
3896  /**
3897   * Returns a {@link Predicate} for checking that table is enabled
3898   */
3899  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3900    return new ExplainingPredicate<IOException>() {
3901      @Override
3902      public String explainFailure() throws IOException {
3903        return explainTableState(tableName, TableState.State.DISABLED);
3904      }
3905
3906      @Override
3907      public boolean evaluate() throws IOException {
3908        return getAdmin().isTableDisabled(tableName);
3909      }
3910    };
3911  }
3912
3913  /**
3914   * Returns a {@link Predicate} for checking that table is enabled
3915   */
3916  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3917    return new ExplainingPredicate<IOException>() {
3918      @Override
3919      public String explainFailure() throws IOException {
3920        return explainTableAvailability(tableName);
3921      }
3922
3923      @Override
3924      public boolean evaluate() throws IOException {
3925        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3926        if (tableAvailable) {
3927          try (Table table = getConnection().getTable(tableName)) {
3928            TableDescriptor htd = table.getDescriptor();
3929            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3930              .getAllRegionLocations()) {
3931              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3932                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3933                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3934              for (byte[] family : htd.getColumnFamilyNames()) {
3935                scan.addFamily(family);
3936              }
3937              try (ResultScanner scanner = table.getScanner(scan)) {
3938                scanner.next();
3939              }
3940            }
3941          }
3942        }
3943        return tableAvailable;
3944      }
3945    };
3946  }
3947
3948  /**
3949   * Wait until no regions in transition.
3950   * @param timeout How long to wait.
3951   */
3952  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3953    waitFor(timeout, predicateNoRegionsInTransition());
3954  }
3955
3956  /**
3957   * Wait until no regions in transition. (time limit 15min)
3958   */
3959  public void waitUntilNoRegionsInTransition() throws IOException {
3960    waitUntilNoRegionsInTransition(15 * 60000);
3961  }
3962
3963  /**
3964   * Wait until labels is ready in VisibilityLabelsCache.
3965   */
3966  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3967    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3968    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3969
3970      @Override
3971      public boolean evaluate() {
3972        for (String label : labels) {
3973          if (labelsCache.getLabelOrdinal(label) == 0) {
3974            return false;
3975          }
3976        }
3977        return true;
3978      }
3979
3980      @Override
3981      public String explainFailure() {
3982        for (String label : labels) {
3983          if (labelsCache.getLabelOrdinal(label) == 0) {
3984            return label + " is not available yet";
3985          }
3986        }
3987        return "";
3988      }
3989    });
3990  }
3991
3992  /**
3993   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3994   * available.
3995   * @return the list of column descriptors
3996   */
3997  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3998    return generateColumnDescriptors("");
3999  }
4000
4001  /**
4002   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4003   * available.
4004   * @param prefix family names prefix
4005   * @return the list of column descriptors
4006   */
4007  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4008    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4009    long familyId = 0;
4010    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
4011      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
4012        for (BloomType bloomType : BloomType.values()) {
4013          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4014          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4015            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4016          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4017          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4018          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4019          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4020          familyId++;
4021        }
4022      }
4023    }
4024    return columnFamilyDescriptors;
4025  }
4026
4027  /**
4028   * Get supported compression algorithms.
4029   * @return supported compression algorithms.
4030   */
4031  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4032    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4033    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4034    for (String algoName : allAlgos) {
4035      try {
4036        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4037        algo.getCompressor();
4038        supportedAlgos.add(algo);
4039      } catch (Throwable t) {
4040        // this algo is not available
4041      }
4042    }
4043    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4044  }
4045
4046  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4047    Scan scan = new Scan().withStartRow(row);
4048    scan.setReadType(ReadType.PREAD);
4049    scan.setCaching(1);
4050    scan.setReversed(true);
4051    scan.addFamily(family);
4052    try (RegionScanner scanner = r.getScanner(scan)) {
4053      List<Cell> cells = new ArrayList<>(1);
4054      scanner.next(cells);
4055      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4056        return null;
4057      }
4058      return Result.create(cells);
4059    }
4060  }
4061
4062  private boolean isTargetTable(final byte[] inRow, Cell c) {
4063    String inputRowString = Bytes.toString(inRow);
4064    int i = inputRowString.indexOf(HConstants.DELIMITER);
4065    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4066    int o = outputRowString.indexOf(HConstants.DELIMITER);
4067    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4068  }
4069
4070  /**
4071   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
4072   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
4073   * kerby KDC server and utility for using it,
4074   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4075   * less baggage. It came in in HBASE-5291.
4076   */
4077  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4078    Properties conf = MiniKdc.createConf();
4079    conf.put(MiniKdc.DEBUG, true);
4080    MiniKdc kdc = null;
4081    File dir = null;
4082    // There is time lag between selecting a port and trying to bind with it. It's possible that
4083    // another service captures the port in between which'll result in BindException.
4084    boolean bindException;
4085    int numTries = 0;
4086    do {
4087      try {
4088        bindException = false;
4089        dir = new File(getDataTestDir("kdc").toUri().getPath());
4090        kdc = new MiniKdc(conf, dir);
4091        kdc.start();
4092      } catch (BindException e) {
4093        FileUtils.deleteDirectory(dir); // clean directory
4094        numTries++;
4095        if (numTries == 3) {
4096          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4097          throw e;
4098        }
4099        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4100        bindException = true;
4101      }
4102    } while (bindException);
4103    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4104    return kdc;
4105  }
4106
4107  public int getNumHFiles(final TableName tableName, final byte[] family) {
4108    int numHFiles = 0;
4109    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4110      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
4111    }
4112    return numHFiles;
4113  }
4114
4115  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4116    final byte[] family) {
4117    int numHFiles = 0;
4118    for (Region region : rs.getRegions(tableName)) {
4119      numHFiles += region.getStore(family).getStorefilesCount();
4120    }
4121    return numHFiles;
4122  }
4123
4124  private void assertEquals(String message, int expected, int actual) {
4125    if (expected == actual) {
4126      return;
4127    }
4128    String formatted = "";
4129    if (message != null && !"".equals(message)) {
4130      formatted = message + " ";
4131    }
4132    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4133  }
4134
4135  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4136    if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) {
4137      throw new AssertionError();
4138    }
4139    assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode());
4140    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4141    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4142    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4143    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4144        it2 = rtdFamilies.iterator(); it.hasNext();) {
4145      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4146    }
4147  }
4148
4149  /**
4150   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4151   * invocations.
4152   */
4153  public static void await(final long sleepMillis, final BooleanSupplier condition)
4154    throws InterruptedException {
4155    try {
4156      while (!condition.getAsBoolean()) {
4157        Thread.sleep(sleepMillis);
4158      }
4159    } catch (RuntimeException e) {
4160      if (e.getCause() instanceof AssertionError) {
4161        throw (AssertionError) e.getCause();
4162      }
4163      throw e;
4164    }
4165  }
4166}