001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.io.UncheckedIOException;
024import java.lang.reflect.Field;
025import java.lang.reflect.Modifier;
026import java.net.BindException;
027import java.net.DatagramSocket;
028import java.net.InetAddress;
029import java.net.ServerSocket;
030import java.net.Socket;
031import java.net.UnknownHostException;
032import java.nio.charset.StandardCharsets;
033import java.security.MessageDigest;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Properties;
044import java.util.Random;
045import java.util.Set;
046import java.util.TreeSet;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.atomic.AtomicReference;
049import java.util.function.BooleanSupplier;
050import org.apache.commons.io.FileUtils;
051import org.apache.commons.lang3.RandomStringUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
056import org.apache.hadoop.hbase.Waiter.Predicate;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.AsyncClusterConnection;
059import org.apache.hadoop.hbase.client.BufferedMutator;
060import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Connection;
064import org.apache.hadoop.hbase.client.ConnectionFactory;
065import org.apache.hadoop.hbase.client.Consistency;
066import org.apache.hadoop.hbase.client.Delete;
067import org.apache.hadoop.hbase.client.Durability;
068import org.apache.hadoop.hbase.client.Get;
069import org.apache.hadoop.hbase.client.Hbck;
070import org.apache.hadoop.hbase.client.MasterRegistry;
071import org.apache.hadoop.hbase.client.Put;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionInfoBuilder;
074import org.apache.hadoop.hbase.client.RegionLocator;
075import org.apache.hadoop.hbase.client.Result;
076import org.apache.hadoop.hbase.client.ResultScanner;
077import org.apache.hadoop.hbase.client.Scan;
078import org.apache.hadoop.hbase.client.Scan.ReadType;
079import org.apache.hadoop.hbase.client.Table;
080import org.apache.hadoop.hbase.client.TableDescriptor;
081import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
082import org.apache.hadoop.hbase.client.TableState;
083import org.apache.hadoop.hbase.fs.HFileSystem;
084import org.apache.hadoop.hbase.io.compress.Compression;
085import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
086import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
087import org.apache.hadoop.hbase.io.hfile.BlockCache;
088import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
089import org.apache.hadoop.hbase.io.hfile.HFile;
090import org.apache.hadoop.hbase.ipc.RpcServerInterface;
091import org.apache.hadoop.hbase.logging.Log4jUtils;
092import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
093import org.apache.hadoop.hbase.master.HMaster;
094import org.apache.hadoop.hbase.master.RegionState;
095import org.apache.hadoop.hbase.master.ServerManager;
096import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
097import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
098import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
099import org.apache.hadoop.hbase.master.assignment.RegionStates;
100import org.apache.hadoop.hbase.mob.MobFileCache;
101import org.apache.hadoop.hbase.regionserver.BloomType;
102import org.apache.hadoop.hbase.regionserver.ChunkCreator;
103import org.apache.hadoop.hbase.regionserver.HRegion;
104import org.apache.hadoop.hbase.regionserver.HRegionServer;
105import org.apache.hadoop.hbase.regionserver.HStore;
106import org.apache.hadoop.hbase.regionserver.InternalScanner;
107import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
108import org.apache.hadoop.hbase.regionserver.Region;
109import org.apache.hadoop.hbase.regionserver.RegionScanner;
110import org.apache.hadoop.hbase.regionserver.RegionServerServices;
111import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
112import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
113import org.apache.hadoop.hbase.security.User;
114import org.apache.hadoop.hbase.security.UserProvider;
115import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
116import org.apache.hadoop.hbase.util.Bytes;
117import org.apache.hadoop.hbase.util.CommonFSUtils;
118import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
119import org.apache.hadoop.hbase.util.FSUtils;
120import org.apache.hadoop.hbase.util.JVMClusterUtil;
121import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
122import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
123import org.apache.hadoop.hbase.util.Pair;
124import org.apache.hadoop.hbase.util.ReflectionUtils;
125import org.apache.hadoop.hbase.util.RegionSplitter;
126import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
127import org.apache.hadoop.hbase.util.RetryCounter;
128import org.apache.hadoop.hbase.util.Threads;
129import org.apache.hadoop.hbase.wal.WAL;
130import org.apache.hadoop.hbase.wal.WALFactory;
131import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
132import org.apache.hadoop.hbase.zookeeper.ZKConfig;
133import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
134import org.apache.hadoop.hdfs.DFSClient;
135import org.apache.hadoop.hdfs.DistributedFileSystem;
136import org.apache.hadoop.hdfs.MiniDFSCluster;
137import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
138import org.apache.hadoop.mapred.JobConf;
139import org.apache.hadoop.mapred.MiniMRCluster;
140import org.apache.hadoop.mapred.TaskLog;
141import org.apache.hadoop.minikdc.MiniKdc;
142import org.apache.yetus.audience.InterfaceAudience;
143import org.apache.zookeeper.WatchedEvent;
144import org.apache.zookeeper.ZooKeeper;
145import org.apache.zookeeper.ZooKeeper.States;
146
147import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
148
149import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
150
151/**
152 * Facility for testing HBase. Replacement for
153 * old HBaseTestCase and HBaseClusterTestCase functionality.
 * Create an instance and keep it around for testing HBase.  This class is
155 * meant to be your one-stop shop for anything you might need testing.  Manages
156 * one cluster at a time only. Managed cluster can be an in-process
157 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
158 * Not all methods work with the real cluster.
159 * Depends on log4j being on classpath and
160 * hbase-site.xml for logging and test-run configuration.  It does not set
161 * logging levels.
162 * In the configuration properties, default values for master-info-port and
163 * region-server-port are overridden such that a random port will be assigned (thus
164 * avoiding port contention if another local HBase instance is already running).
165 * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
166 * setting it to true.
167 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
168 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
169 */
170@InterfaceAudience.Public
171@Deprecated
172public class HBaseTestingUtility extends HBaseZKTestingUtility {
173
  /**
   * System property key to get test directory value. Name is as it is because mini dfs has
   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
   * used in mini dfs.
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  /** Configuration key overriding how many regions per server a pre-split test table gets. */
  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split
   * table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;


  /** Configuration key controlling whether test tables are created pre-split. */
  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  /** Default for {@link #PRESPLIT_TEST_TABLE_KEY}: pre-split test tables. */
  public static final boolean PRESPLIT_TEST_TABLE = true;

  /** The mini dfs cluster managed by this utility, or null when none is running. */
  private MiniDFSCluster dfsCluster = null;

  /** The managed HBase cluster (mini or distributed); volatile for cross-thread visibility. */
  private volatile HBaseCluster hbaseCluster = null;
  /** The mini map-reduce cluster, or null when none is running. */
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  /** Directory used for hadoop logging; recorded when dirs/properties are created. */
  private String hadoopLogDir;

  /** Directory on test filesystem where we put the data for this instance of
    * HBaseTestingUtility*/
  private Path dataTestDirOnTestFS = null;

  /** Holder for the shared AsyncClusterConnection; presumably set lazily by an accessor
    * elsewhere in this class — confirm against its users. */
  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** For unit tests parameterized with the two booleans (memstoreTS, usetags). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
216
217  /**
218   * Checks to see if a specific port is available.
219   *
220   * @param port the port number to check for availability
221   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
222   */
223  public static boolean available(int port) {
224    ServerSocket ss = null;
225    DatagramSocket ds = null;
226    try {
227      ss = new ServerSocket(port);
228      ss.setReuseAddress(true);
229      ds = new DatagramSocket(port);
230      ds.setReuseAddress(true);
231      return true;
232    } catch (IOException e) {
233      // Do nothing
234    } finally {
235      if (ds != null) {
236        ds.close();
237      }
238
239      if (ss != null) {
240        try {
241          ss.close();
242        } catch (IOException e) {
243          /* should not be thrown */
244        }
245      }
246    }
247
248    return false;
249  }
250
251  /**
252   * Create all combinations of Bloom filters and compression algorithms for
253   * testing.
254   */
255  private static List<Object[]> bloomAndCompressionCombinations() {
256    List<Object[]> configurations = new ArrayList<>();
257    for (Compression.Algorithm comprAlgo :
258         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
259      for (BloomType bloomType : BloomType.values()) {
260        configurations.add(new Object[] { comprAlgo, bloomType });
261      }
262    }
263    return Collections.unmodifiableList(configurations);
264  }
265
266  /**
267   * Create combination of memstoreTS and tags
268   */
269  private static List<Object[]> memStoreTSAndTagsCombination() {
270    List<Object[]> configurations = new ArrayList<>();
271    configurations.add(new Object[] { false, false });
272    configurations.add(new Object[] { false, true });
273    configurations.add(new Object[] { true, false });
274    configurations.add(new Object[] { true, true });
275    return Collections.unmodifiableList(configurations);
276  }
277
278  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
279    List<Object[]> configurations = new ArrayList<>();
280    configurations.add(new Object[] { false, false, true });
281    configurations.add(new Object[] { false, false, false });
282    configurations.add(new Object[] { false, true, true });
283    configurations.add(new Object[] { false, true, false });
284    configurations.add(new Object[] { true, false, true });
285    configurations.add(new Object[] { true, false, false });
286    configurations.add(new Object[] { true, true, true });
287    configurations.add(new Object[] { true, true, false });
288    return Collections.unmodifiableList(configurations);
289  }
290
  /** All combinations of bloom type and compression algorithm, for parameterized tests. */
  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
      bloomAndCompressionCombinations();
293
294
  /**
   * <p>Create an HBaseTestingUtility using a default configuration.
   *
   * <p>Initially, all tmp files are written to a local test data directory.
   * Once {@link #startMiniDFSCluster} is called, either directly or via
   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
   */
  public HBaseTestingUtility() {
    // Delegate to the main constructor with a freshly-created default HBase configuration.
    this(HBaseConfiguration.create());
  }
305
  /**
   * <p>Create an HBaseTestingUtility using a given configuration.
   *
   * <p>Initially, all tmp files are written to a local test data directory.
   * Once {@link #startMiniDFSCluster} is called, either directly or via
   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
   *
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtility(Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS","file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    // Disable stream-capability enforcement while on the local filesystem; it is re-enabled
    // once a real DFS is started (see the conf.unset in setFs()).
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
    // If the value for random ports isn't set set it to true, thus making
    // tests opt-out for random port assignment
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }
340
  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   * Delegates to {@link #closeRegionAndWAL(HRegion)} via a cast.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion)r);
  }
347
348  /**
349   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
350   */
351  public static void closeRegionAndWAL(final HRegion r) throws IOException {
352    if (r == null) return;
353    r.close();
354    if (r.getWAL() == null) return;
355    r.getWAL().close();
356  }
357
  /**
   * Returns this class's instance of {@link Configuration}.  Be careful how
   * you use the returned Configuration since {@link Connection} instances
   * can be shared.  The Map of Connections is keyed by the Configuration.  If
   * say, a Connection was being used against a cluster that had been shutdown,
   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
   * be wholesome.  Rather than use the return direct, it's usually best to
   * make a copy and use that.  Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    // Pure delegation; the override exists to carry the sharing caveat documented above.
    return super.getConfiguration();
  }
373
  /**
   * Set the HBase cluster this utility instance operates on.
   * @param hbaseCluster the cluster (mini or distributed) to manage
   */
  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }
377
378  /**
379   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
380   * Give it a random name so can have many concurrent tests running if
381   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
382   * System property, as it's what minidfscluster bases
383   * it data dir on.  Moding a System property is not the way to do concurrent
384   * instances -- another instance could grab the temporary
385   * value unintentionally -- but not anything can do about it at moment;
386   * single instance only is how the minidfscluster works.
387   *
388   * We also create the underlying directory names for
389   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
390   *  in the conf, and as a system property for hadoop.tmp.dir (We do not create them!).
391   *
392   * @return The calculated data test build directory, if newly-created.
393   */
394  @Override
395  protected Path setupDataTestDir() {
396    Path testPath = super.setupDataTestDir();
397    if (null == testPath) {
398      return null;
399    }
400
401    createSubDirAndSystemProperty(
402      "hadoop.log.dir",
403      testPath, "hadoop-log-dir");
404
405    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
406    //  we want our own value to ensure uniqueness on the same machine
407    createSubDirAndSystemProperty(
408      "hadoop.tmp.dir",
409      testPath, "hadoop-tmp-dir");
410
411    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
412    createSubDir(
413      "mapreduce.cluster.local.dir",
414      testPath, "mapred-local-dir");
415    return testPath;
416  }
417
418  private void createSubDirAndSystemProperty(
419    String propertyName, Path parent, String subDirName){
420
421    String sysValue = System.getProperty(propertyName);
422
423    if (sysValue != null) {
424      // There is already a value set. So we do nothing but hope
425      //  that there will be no conflicts
426      LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
427        sysValue + " so I do NOT create it in " + parent);
428      String confValue = conf.get(propertyName);
429      if (confValue != null && !confValue.endsWith(sysValue)){
430       LOG.warn(
431         propertyName + " property value differs in configuration and system: "+
432         "Configuration="+confValue+" while System="+sysValue+
433         " Erasing configuration value by system value."
434       );
435      }
436      conf.set(propertyName, sysValue);
437    } else {
438      // Ok, it's not set, so we create it as a subdirectory
439      createSubDir(propertyName, parent, subDirName);
440      System.setProperty(propertyName, conf.get(propertyName));
441    }
442  }
443
444  /**
445   * @return Where to write test data on the test filesystem; Returns working directory
446   * for the test filesystem by default
447   * @see #setupDataTestDirOnTestFS()
448   * @see #getTestFileSystem()
449   */
450  private Path getBaseTestDirOnTestFS() throws IOException {
451    FileSystem fs = getTestFileSystem();
452    return new Path(fs.getWorkingDirectory(), "test-data");
453  }
454
455  /**
456   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
457   * to write temporary test data. Call this method after setting up the mini dfs cluster
458   * if the test relies on it.
459   * @return a unique path in the test filesystem
460   */
461  public Path getDataTestDirOnTestFS() throws IOException {
462    if (dataTestDirOnTestFS == null) {
463      setupDataTestDirOnTestFS();
464    }
465
466    return dataTestDirOnTestFS;
467  }
468
  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @param subdirName name of the subdir to create under the base test dir
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }
479
480  /**
481   * Sets up a path in test filesystem to be used by tests.
482   * Creates a new directory if not already setup.
483   */
484  private void setupDataTestDirOnTestFS() throws IOException {
485    if (dataTestDirOnTestFS != null) {
486      LOG.warn("Data test on test fs dir already setup in "
487          + dataTestDirOnTestFS.toString());
488      return;
489    }
490    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
491  }
492
493  /**
494   * Sets up a new path in test filesystem to be used by tests.
495   */
496  private Path getNewDataTestDirOnTestFS() throws IOException {
497    //The file system can be either local, mini dfs, or if the configuration
498    //is supplied externally, it can be an external cluster FS. If it is a local
499    //file system, the tests should use getBaseTestDir, otherwise, we can use
500    //the working directory, and create a unique sub dir there
501    FileSystem fs = getTestFileSystem();
502    Path newDataTestDir;
503    String randomStr = getRandomUUID().toString();
504    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
505      newDataTestDir = new Path(getDataTestDir(), randomStr);
506      File dataTestDir = new File(newDataTestDir.toString());
507      if (deleteOnExit()) dataTestDir.deleteOnExit();
508    } else {
509      Path base = getBaseTestDirOnTestFS();
510      newDataTestDir = new Path(base, randomStr);
511      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
512    }
513    return newDataTestDir;
514  }
515
516  /**
517   * Cleans the test data directory on the test filesystem.
518   * @return True if we removed the test dirs
519   * @throws IOException
520   */
521  public boolean cleanupDataTestDirOnTestFS() throws IOException {
522    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
523    if (ret)
524      dataTestDirOnTestFS = null;
525    return ret;
526  }
527
  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @param subdirName name of the subdirectory to delete recursively
   * @return True if we removed child
   * @throws IOException if the delete on the test filesystem fails
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }
537
  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    // No explicit host names: all DNs get the same (default) host name.
    return startMiniDFSCluster(servers, null);
  }
548
549  /**
550   * Start a minidfscluster.
551   * This is useful if you want to run datanode on distinct hosts for things
552   * like HDFS block location verification.
553   * If you start MiniDFSCluster without host names, all instances of the
554   * datanodes will have the same host name.
555   * @param hosts hostnames DNs to run on.
556   * @throws Exception
557   * @see #shutdownMiniDFSCluster()
558   * @return The mini dfs cluster created.
559   */
560  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
561  throws Exception {
562    if ( hosts != null && hosts.length != 0) {
563      return startMiniDFSCluster(hosts.length, hosts);
564    } else {
565      return startMiniDFSCluster(1, null);
566    }
567  }
568
  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param hosts hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
  throws Exception {
    // Delegate with default rack assignments.
    return startMiniDFSCluster(servers, null, hosts);
  }
582
583  private void setFs() throws IOException {
584    if(this.dfsCluster == null){
585      LOG.info("Skipping setting fs because dfsCluster is null");
586      return;
587    }
588    FileSystem fs = this.dfsCluster.getFileSystem();
589    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
590
591    // re-enable this check with dfs
592    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
593  }
594
  /**
   * Start a minidfscluster with the given number of datanodes, rack names and host names.
   * @param servers how many DNs to start
   * @param racks rack names for the DNs, or null for defaults
   * @param hosts host names for the DNs, or null for defaults
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
      throws Exception {
    // Route all of hadoop's tmp-ish directories under the test dir before starting anything.
    createDirsAndSetProperties();
    // Skip fsync on edit-log writes to speed the cluster up; safe for tests only.
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
      "ERROR");

    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
        true, null, racks, hosts, null);

    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    //reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }
622
  /**
   * Start a minidfscluster for WAL tests with 5 datanodes on the given namenode port.
   * @param namenodePort port for the namenode (0 presumably means an ephemeral port --
   *        confirm against the MiniDFSCluster constructor javadoc)
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
      "ERROR");
    // NOTE(review): the boolean args differ from startMiniDFSCluster(int, String[], String[]);
    // the 4th arg is false here -- check MiniDFSCluster's constructor for its exact meaning.
    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
        null, null, null);
    return dfsCluster;
  }
633
634  /**
635   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
636   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
637   * the conf.
638   * <pre>
639   * Configuration conf = TEST_UTIL.getConfiguration();
640   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
641   *   Map.Entry&lt;String, String&gt; e = i.next();
642   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
643   * }
644   * </pre>
645   */
646  private void createDirsAndSetProperties() throws IOException {
647    setupClusterTestDir();
648    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
649    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
650    createDirAndSetProperty("test.cache.data");
651    createDirAndSetProperty("hadoop.tmp.dir");
652    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
653    createDirAndSetProperty("mapreduce.cluster.local.dir");
654    createDirAndSetProperty("mapreduce.cluster.temp.dir");
655    enableShortCircuit();
656
657    Path root = getDataTestDirOnTestFS("hadoop");
658    conf.set(MapreduceTestingShim.getMROutputDirProp(),
659      new Path(root, "mapred-output-dir").toString());
660    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
661    conf.set("mapreduce.jobtracker.staging.root.dir",
662      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
663    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
664    conf.set("yarn.app.mapreduce.am.staging-dir",
665      new Path(root, "mapreduce-am-staging-root-dir").toString());
666
667    // Frustrate yarn's and hdfs's attempts at writing /tmp.
668    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
669    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
670    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
671    createDirAndSetProperty("yarn.nodemanager.log-dirs");
672    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
673    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
674    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
675    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
676    createDirAndSetProperty("dfs.journalnode.edits.dir");
677    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
678    createDirAndSetProperty("nfs.dump.dir");
679    createDirAndSetProperty("java.io.tmpdir");
680    createDirAndSetProperty("dfs.journalnode.edits.dir");
681    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
682    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
683  }
684
685  /**
686   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
687   *  new column families. Default to false.
688   */
689  public boolean isNewVersionBehaviorEnabled(){
690    final String propName = "hbase.tests.new.version.behavior";
691    String v = System.getProperty(propName);
692    if (v != null){
693      return Boolean.parseBoolean(v);
694    }
695    return false;
696  }
697
698  /**
699   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
700   *  This allows to specify this parameter on the command line.
701   *   If not set, default is true.
702   */
703  public boolean isReadShortCircuitOn(){
704    final String propName = "hbase.tests.use.shortcircuit.reads";
705    String readOnProp = System.getProperty(propName);
706    if (readOnProp != null){
707      return  Boolean.parseBoolean(readOnProp);
708    } else {
709      return conf.getBoolean(propName, false);
710    }
711  }
712
713  /** Enable the short circuit read, unless configured differently.
714   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
715   */
716  private void enableShortCircuit() {
717    if (isReadShortCircuitOn()) {
718      String curUser = System.getProperty("user.name");
719      LOG.info("read short circuit is ON for user " + curUser);
720      // read short circuit, for hdfs
721      conf.set("dfs.block.local-path-access.user", curUser);
722      // read short circuit, for hbase
723      conf.setBoolean("dfs.client.read.shortcircuit", true);
724      // Skip checking checksum, for the hdfs client and the datanode
725      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
726    } else {
727      LOG.info("read short circuit is OFF");
728    }
729  }
730
  /**
   * Create a directory named {@code property} under the data test dir and register its path
   * under the same name in both the system properties and the conf.
   * @param property name used both as the subdirectory name and as the property key
   * @return the path created and registered
   */
  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }
734
735  private String createDirAndSetProperty(final String relPath, String property) {
736    String path = getDataTestDir(relPath).toString();
737    System.setProperty(property, path);
738    conf.set(property, path);
739    new File(path).mkdirs();
740    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
741    return path;
742  }
743
744  /**
745   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
746   * or does nothing.
747   * @throws IOException
748   */
749  public void shutdownMiniDFSCluster() throws IOException {
750    if (this.dfsCluster != null) {
751      // The below throws an exception per dn, AsynchronousCloseException.
752      this.dfsCluster.shutdown();
753      dfsCluster = null;
754      dataTestDirOnTestFS = null;
755      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
756    }
757  }
758
759  /**
760   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
761   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
762   * @param createWALDir Whether to create a new WAL directory.
763   * @return The mini HBase cluster created.
764   * @see #shutdownMiniCluster()
765   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
766   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
767   * @see #startMiniCluster(StartMiniClusterOption)
768   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
769   */
770  @Deprecated
771  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
772    StartMiniClusterOption option = StartMiniClusterOption.builder()
773        .createWALDir(createWALDir).build();
774    return startMiniCluster(option);
775  }
776
777  /**
778   * Start up a minicluster of hbase, dfs, and zookeeper.
779   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
780   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
781   * @param createRootDir Whether to create a new root or data directory path.
782   * @return The mini HBase cluster created.
783   * @see #shutdownMiniCluster()
784   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
785   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
786   * @see #startMiniCluster(StartMiniClusterOption)
787   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
788   */
789  @Deprecated
790  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir)
791  throws Exception {
792    StartMiniClusterOption option = StartMiniClusterOption.builder()
793        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir).build();
794    return startMiniCluster(option);
795  }
796
797  /**
798   * Start up a minicluster of hbase, dfs, and zookeeper.
799   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
800   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
801   * @param createRootDir Whether to create a new root or data directory path.
802   * @param createWALDir Whether to create a new WAL directory.
803   * @return The mini HBase cluster created.
804   * @see #shutdownMiniCluster()
805   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
806   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
807   * @see #startMiniCluster(StartMiniClusterOption)
808   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
809   */
810  @Deprecated
811  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
812      boolean createWALDir) throws Exception {
813    StartMiniClusterOption option = StartMiniClusterOption.builder()
814        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir)
815        .createWALDir(createWALDir).build();
816    return startMiniCluster(option);
817  }
818
819  /**
820   * Start up a minicluster of hbase, dfs, and zookeeper.
821   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
822   * @param numMasters Master node number.
823   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
824   * @param createRootDir Whether to create a new root or data directory path.
825   * @return The mini HBase cluster created.
826   * @see #shutdownMiniCluster()
827   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
828   *  {@link #startMiniCluster(StartMiniClusterOption)} instead.
829   * @see #startMiniCluster(StartMiniClusterOption)
830   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
831   */
832  @Deprecated
833  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
834    throws Exception {
835    StartMiniClusterOption option = StartMiniClusterOption.builder()
836        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
837        .numDataNodes(numSlaves).build();
838    return startMiniCluster(option);
839  }
840
841  /**
842   * Start up a minicluster of hbase, dfs, and zookeeper.
843   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
844   * @param numMasters Master node number.
845   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
846   * @return The mini HBase cluster created.
847   * @see #shutdownMiniCluster()
848   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
849   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
850   * @see #startMiniCluster(StartMiniClusterOption)
851   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
852   */
853  @Deprecated
854  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
855    StartMiniClusterOption option = StartMiniClusterOption.builder()
856        .numMasters(numMasters).numRegionServers(numSlaves).numDataNodes(numSlaves).build();
857    return startMiniCluster(option);
858  }
859
860  /**
861   * Start up a minicluster of hbase, dfs, and zookeeper.
862   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
863   * @param numMasters Master node number.
864   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
865   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
866   *                      HDFS data node number.
867   * @param createRootDir Whether to create a new root or data directory path.
868   * @return The mini HBase cluster created.
869   * @see #shutdownMiniCluster()
870   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
871   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
872   * @see #startMiniCluster(StartMiniClusterOption)
873   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
874   */
875  @Deprecated
876  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
877      boolean createRootDir) throws Exception {
878    StartMiniClusterOption option = StartMiniClusterOption.builder()
879        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
880        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
881    return startMiniCluster(option);
882  }
883
884  /**
885   * Start up a minicluster of hbase, dfs, and zookeeper.
886   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
887   * @param numMasters Master node number.
888   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
889   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
890   *                      HDFS data node number.
891   * @return The mini HBase cluster created.
892   * @see #shutdownMiniCluster()
893   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
894   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
895   * @see #startMiniCluster(StartMiniClusterOption)
896   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
897   */
898  @Deprecated
899  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
900      throws Exception {
901    StartMiniClusterOption option = StartMiniClusterOption.builder()
902        .numMasters(numMasters).numRegionServers(numSlaves)
903        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
904    return startMiniCluster(option);
905  }
906
907  /**
908   * Start up a minicluster of hbase, dfs, and zookeeper.
909   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
910   * @param numMasters Master node number.
911   * @param numRegionServers Number of region servers.
912   * @param numDataNodes Number of datanodes.
913   * @return The mini HBase cluster created.
914   * @see #shutdownMiniCluster()
915   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
916   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
917   * @see #startMiniCluster(StartMiniClusterOption)
918   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
919   */
920  @Deprecated
921  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
922      throws Exception {
923    StartMiniClusterOption option = StartMiniClusterOption.builder()
924        .numMasters(numMasters).numRegionServers(numRegionServers).numDataNodes(numDataNodes)
925        .build();
926    return startMiniCluster(option);
927  }
928
929  /**
930   * Start up a minicluster of hbase, dfs, and zookeeper.
931   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
932   * @param numMasters Master node number.
933   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
934   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
935   *                      HDFS data node number.
936   * @param masterClass The class to use as HMaster, or null for default.
937   * @param rsClass The class to use as HRegionServer, or null for default.
938   * @return The mini HBase cluster created.
939   * @see #shutdownMiniCluster()
940   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
941   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
942   * @see #startMiniCluster(StartMiniClusterOption)
943   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
944   */
945  @Deprecated
946  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
947      Class<? extends HMaster> masterClass,
948      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
949      throws Exception {
950    StartMiniClusterOption option = StartMiniClusterOption.builder()
951        .numMasters(numMasters).masterClass(masterClass)
952        .numRegionServers(numSlaves).rsClass(rsClass)
953        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts)
954        .build();
955    return startMiniCluster(option);
956  }
957
958  /**
959   * Start up a minicluster of hbase, dfs, and zookeeper.
960   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
961   * @param numMasters Master node number.
962   * @param numRegionServers Number of region servers.
963   * @param numDataNodes Number of datanodes.
964   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
965   *                      HDFS data node number.
966   * @param masterClass The class to use as HMaster, or null for default.
967   * @param rsClass The class to use as HRegionServer, or null for default.
968   * @return The mini HBase cluster created.
969   * @see #shutdownMiniCluster()
970   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
971   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
972   * @see #startMiniCluster(StartMiniClusterOption)
973   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
974   */
975  @Deprecated
976  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
977      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
978      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
979    throws Exception {
980    StartMiniClusterOption option = StartMiniClusterOption.builder()
981        .numMasters(numMasters).masterClass(masterClass)
982        .numRegionServers(numRegionServers).rsClass(rsClass)
983        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
984        .build();
985    return startMiniCluster(option);
986  }
987
988  /**
989   * Start up a minicluster of hbase, dfs, and zookeeper.
990   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
991   * @param numMasters Master node number.
992   * @param numRegionServers Number of region servers.
993   * @param numDataNodes Number of datanodes.
994   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
995   *                      HDFS data node number.
996   * @param masterClass The class to use as HMaster, or null for default.
997   * @param rsClass The class to use as HRegionServer, or null for default.
998   * @param createRootDir Whether to create a new root or data directory path.
999   * @param createWALDir Whether to create a new WAL directory.
1000   * @return The mini HBase cluster created.
1001   * @see #shutdownMiniCluster()
1002   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1003   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
1004   * @see #startMiniCluster(StartMiniClusterOption)
1005   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1006   */
1007  @Deprecated
1008  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1009      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1010      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1011      boolean createWALDir) throws Exception {
1012    StartMiniClusterOption option = StartMiniClusterOption.builder()
1013        .numMasters(numMasters).masterClass(masterClass)
1014        .numRegionServers(numRegionServers).rsClass(rsClass)
1015        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
1016        .createRootDir(createRootDir).createWALDir(createWALDir)
1017        .build();
1018    return startMiniCluster(option);
1019  }
1020
1021  /**
1022   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number.
1023   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1024   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1025   * @see #startMiniCluster(StartMiniClusterOption option)
1026   * @see #shutdownMiniDFSCluster()
1027   */
1028  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1029    StartMiniClusterOption option = StartMiniClusterOption.builder()
1030        .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1031    return startMiniCluster(option);
1032  }
1033
1034  /**
1035   * Start up a minicluster of hbase, dfs and zookeeper all using default options.
1036   * Option default value can be found in {@link StartMiniClusterOption.Builder}.
1037   * @see #startMiniCluster(StartMiniClusterOption option)
1038   * @see #shutdownMiniDFSCluster()
1039   */
1040  public MiniHBaseCluster startMiniCluster() throws Exception {
1041    return startMiniCluster(StartMiniClusterOption.builder().build());
1042  }
1043
1044  /**
1045   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed.
1046   * It modifies Configuration.  It homes the cluster data directory under a random
1047   * subdirectory in a directory under System property test.build.data, to be cleaned up on exit.
1048   * @see #shutdownMiniDFSCluster()
1049   */
1050  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1051    LOG.info("Starting up minicluster with option: {}", option);
1052
1053    // If we already put up a cluster, fail.
1054    if (miniClusterRunning) {
1055      throw new IllegalStateException("A mini-cluster is already running");
1056    }
1057    miniClusterRunning = true;
1058
1059    setupClusterTestDir();
1060    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
1061
1062    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1063    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1064    if (dfsCluster == null) {
1065      LOG.info("STARTING DFS");
1066      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1067    } else {
1068      LOG.info("NOT STARTING DFS");
1069    }
1070
1071    // Start up a zk cluster.
1072    if (getZkCluster() == null) {
1073      startMiniZKCluster(option.getNumZkServers());
1074    }
1075
1076    // Start the MiniHBaseCluster
1077    return startMiniHBaseCluster(option);
1078  }
1079
1080  /**
1081   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1082   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1083   * @return Reference to the hbase mini hbase cluster.
1084   * @see #startMiniCluster(StartMiniClusterOption)
1085   * @see #shutdownMiniHBaseCluster()
1086   */
1087  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1088    throws IOException, InterruptedException {
1089    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1090    createRootDir(option.isCreateRootDir());
1091    if (option.isCreateWALDir()) {
1092      createWALRootDir();
1093    }
1094    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1095    // for tests that do not read hbase-defaults.xml
1096    setHBaseFsTmpDir();
1097
1098    // These settings will make the server waits until this exact number of
1099    // regions servers are connected.
1100    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1101      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1102    }
1103    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1104      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1105    }
1106
1107    // Avoid log flooded with chore execution time, see HBASE-24646 for more details.
1108    Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
1109
1110    Configuration c = new Configuration(this.conf);
1111    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1112      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1113      option.getMasterClass(), option.getRsClass());
1114    // Populate the master address configuration from mini cluster configuration.
1115    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1116    // Don't leave here till we've done a successful scan of the hbase:meta
1117    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1118      ResultScanner s = t.getScanner(new Scan())) {
1119      for (;;) {
1120        if (s.next() == null) {
1121          break;
1122        }
1123      }
1124    }
1125
1126
1127    getAdmin(); // create immediately the hbaseAdmin
1128    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1129
1130    return (MiniHBaseCluster) hbaseCluster;
1131  }
1132
1133  /**
1134   * Starts up mini hbase cluster using default options.
1135   * Default options can be found in {@link StartMiniClusterOption.Builder}.
1136   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1137   * @see #shutdownMiniHBaseCluster()
1138   */
1139  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1140    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1141  }
1142
1143  /**
1144   * Starts up mini hbase cluster.
1145   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1146   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1147   * @param numMasters Master node number.
1148   * @param numRegionServers Number of region servers.
1149   * @return The mini HBase cluster created.
1150   * @see #shutdownMiniHBaseCluster()
1151   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1152   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1153   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1154   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1155   */
1156  @Deprecated
1157  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1158      throws IOException, InterruptedException {
1159    StartMiniClusterOption option = StartMiniClusterOption.builder()
1160        .numMasters(numMasters).numRegionServers(numRegionServers).build();
1161    return startMiniHBaseCluster(option);
1162  }
1163
1164  /**
1165   * Starts up mini hbase cluster.
1166   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1167   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1168   * @param numMasters Master node number.
1169   * @param numRegionServers Number of region servers.
1170   * @param rsPorts Ports that RegionServer should use.
1171   * @return The mini HBase cluster created.
1172   * @see #shutdownMiniHBaseCluster()
1173   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1174   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1175   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1176   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1177   */
1178  @Deprecated
1179  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1180      List<Integer> rsPorts) throws IOException, InterruptedException {
1181    StartMiniClusterOption option = StartMiniClusterOption.builder()
1182        .numMasters(numMasters).numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1183    return startMiniHBaseCluster(option);
1184  }
1185
1186  /**
1187   * Starts up mini hbase cluster.
1188   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1189   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1190   * @param numMasters Master node number.
1191   * @param numRegionServers Number of region servers.
1192   * @param rsPorts Ports that RegionServer should use.
1193   * @param masterClass The class to use as HMaster, or null for default.
1194   * @param rsClass The class to use as HRegionServer, or null for default.
1195   * @param createRootDir Whether to create a new root or data directory path.
1196   * @param createWALDir Whether to create a new WAL directory.
1197   * @return The mini HBase cluster created.
1198   * @see #shutdownMiniHBaseCluster()
1199   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1200   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1201   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1202   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1203   */
1204  @Deprecated
1205  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1206      List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1207      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1208      boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1209    StartMiniClusterOption option = StartMiniClusterOption.builder()
1210        .numMasters(numMasters).masterClass(masterClass)
1211        .numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1212        .createRootDir(createRootDir).createWALDir(createWALDir).build();
1213    return startMiniHBaseCluster(option);
1214  }
1215
1216  /**
1217   * Starts the hbase cluster up again after shutting it down previously in a
1218   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1219   * @param servers number of region servers
1220   */
1221  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1222    this.restartHBaseCluster(servers, null);
1223  }
1224
1225  public void restartHBaseCluster(int servers, List<Integer> ports)
1226      throws IOException, InterruptedException {
1227    StartMiniClusterOption option =
1228        StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1229    restartHBaseCluster(option);
1230    invalidateConnection();
1231  }
1232
1233  public void restartHBaseCluster(StartMiniClusterOption option)
1234      throws IOException, InterruptedException {
1235    closeConnection();
1236    this.hbaseCluster =
1237        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
1238            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
1239            option.getRsClass());
1240    // Don't leave here till we've done a successful scan of the hbase:meta
1241    Connection conn = ConnectionFactory.createConnection(this.conf);
1242    Table t = conn.getTable(TableName.META_TABLE_NAME);
1243    ResultScanner s = t.getScanner(new Scan());
1244    while (s.next() != null) {
1245      // do nothing
1246    }
1247    LOG.info("HBase has been restarted");
1248    s.close();
1249    t.close();
1250    conn.close();
1251  }
1252
1253  /**
1254   * @return Current mini hbase cluster. Only has something in it after a call
1255   * to {@link #startMiniCluster()}.
1256   * @see #startMiniCluster()
1257   */
1258  public MiniHBaseCluster getMiniHBaseCluster() {
1259    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1260      return (MiniHBaseCluster)this.hbaseCluster;
1261    }
1262    throw new RuntimeException(hbaseCluster + " not an instance of " +
1263                               MiniHBaseCluster.class.getName());
1264  }
1265
1266  /**
1267   * Stops mini hbase, zk, and hdfs clusters.
1268   * @see #startMiniCluster(int)
1269   */
1270  public void shutdownMiniCluster() throws IOException {
1271    LOG.info("Shutting down minicluster");
1272    shutdownMiniHBaseCluster();
1273    shutdownMiniDFSCluster();
1274    shutdownMiniZKCluster();
1275
1276    cleanupTestDir();
1277    miniClusterRunning = false;
1278    LOG.info("Minicluster is down");
1279  }
1280
1281  /**
1282   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1283   * @throws java.io.IOException in case command is unsuccessful
1284   */
1285  public void shutdownMiniHBaseCluster() throws IOException {
1286    cleanup();
1287    if (this.hbaseCluster != null) {
1288      this.hbaseCluster.shutdown();
1289      // Wait till hbase is down before going on to shutdown zk.
1290      this.hbaseCluster.waitUntilShutDown();
1291      this.hbaseCluster = null;
1292    }
1293    if (zooKeeperWatcher != null) {
1294      zooKeeperWatcher.close();
1295      zooKeeperWatcher = null;
1296    }
1297  }
1298
1299  /**
1300   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1301   * @throws java.io.IOException throws in case command is unsuccessful
1302   */
1303  public void killMiniHBaseCluster() throws IOException {
1304    cleanup();
1305    if (this.hbaseCluster != null) {
1306      getMiniHBaseCluster().killAll();
1307      this.hbaseCluster = null;
1308    }
1309    if (zooKeeperWatcher != null) {
1310      zooKeeperWatcher.close();
1311      zooKeeperWatcher = null;
1312    }
1313  }
1314
  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  // Resetting to -1 lets the next startMiniHBaseCluster() derive fresh values from its options.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }
1322
1323  /**
1324   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1325   * is true, a new root directory path is fetched irrespective of whether it has been fetched
1326   * before or not. If false, previous path is used.
1327   * Note: this does not cause the root dir to be created.
1328   * @return Fully qualified path for the default hbase root dir
1329   * @throws IOException
1330   */
1331  public Path getDefaultRootDirPath(boolean create) throws IOException {
1332    if (!create) {
1333      return getDataTestDirOnTestFS();
1334    } else {
1335      return getNewDataTestDirOnTestFS();
1336    }
1337  }
1338
1339  /**
1340   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
1341   * except that <code>create</code> flag is false.
1342   * Note: this does not cause the root dir to be created.
1343   * @return Fully qualified path for the default hbase root dir
1344   * @throws IOException
1345   */
1346  public Path getDefaultRootDirPath() throws IOException {
1347    return getDefaultRootDirPath(false);
1348  }
1349
1350  /**
1351   * Creates an hbase rootdir in user home directory.  Also creates hbase
1352   * version file.  Normally you won't make use of this method.  Root hbasedir
1353   * is created for you as part of mini cluster startup.  You'd only use this
1354   * method if you were doing manual operation.
1355   * @param create This flag decides whether to get a new
1356   * root or data directory path or not, if it has been fetched already.
1357   * Note : Directory will be made irrespective of whether path has been fetched or not.
1358   * If directory already exists, it will be overwritten
1359   * @return Fully qualified path to hbase root dir
1360   * @throws IOException
1361   */
1362  public Path createRootDir(boolean create) throws IOException {
1363    FileSystem fs = FileSystem.get(this.conf);
1364    Path hbaseRootdir = getDefaultRootDirPath(create);
1365    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1366    fs.mkdirs(hbaseRootdir);
1367    FSUtils.setVersion(fs, hbaseRootdir);
1368    return hbaseRootdir;
1369  }
1370
1371  /**
1372   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
1373   * except that <code>create</code> flag is false.
1374   * @return Fully qualified path to hbase root dir
1375   * @throws IOException
1376   */
1377  public Path createRootDir() throws IOException {
1378    return createRootDir(false);
1379  }
1380
1381  /**
1382   * Creates a hbase walDir in the user's home directory.
1383   * Normally you won't make use of this method. Root hbaseWALDir
1384   * is created for you as part of mini cluster startup. You'd only use this
1385   * method if you were doing manual operation.
1386   *
1387   * @return Fully qualified path to hbase root dir
1388   * @throws IOException
1389  */
1390  public Path createWALRootDir() throws IOException {
1391    FileSystem fs = FileSystem.get(this.conf);
1392    Path walDir = getNewDataTestDirOnTestFS();
1393    CommonFSUtils.setWALRootDir(this.conf, walDir);
1394    fs.mkdirs(walDir);
1395    return walDir;
1396  }
1397
1398  private void setHBaseFsTmpDir() throws IOException {
1399    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1400    if (hbaseFsTmpDirInString == null) {
1401      this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1402      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1403    } else {
1404      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1405    }
1406  }
1407
1408  /**
1409   * Flushes all caches in the mini hbase cluster
1410   */
1411  public void flush() throws IOException {
1412    getMiniHBaseCluster().flushcache();
1413  }
1414
1415  /**
1416   * Flushes all caches in the mini hbase cluster
1417   */
1418  public void flush(TableName tableName) throws IOException {
1419    getMiniHBaseCluster().flushcache(tableName);
1420  }
1421
1422  /**
1423   * Compact all regions in the mini hbase cluster
1424   */
1425  public void compact(boolean major) throws IOException {
1426    getMiniHBaseCluster().compact(major);
1427  }
1428
1429  /**
1430   * Compact all of a table's reagion in the mini hbase cluster
1431   */
1432  public void compact(TableName tableName, boolean major) throws IOException {
1433    getMiniHBaseCluster().compact(tableName, major);
1434  }
1435
1436  /**
1437   * Create a table.
1438   * @param tableName
1439   * @param family
1440   * @return A Table instance for the created table.
1441   * @throws IOException
1442   */
1443  public Table createTable(TableName tableName, String family)
1444  throws IOException{
1445    return createTable(tableName, new String[]{family});
1446  }
1447
1448  /**
1449   * Create a table.
1450   * @param tableName
1451   * @param families
1452   * @return A Table instance for the created table.
1453   * @throws IOException
1454   */
1455  public Table createTable(TableName tableName, String[] families)
1456  throws IOException {
1457    List<byte[]> fams = new ArrayList<>(families.length);
1458    for (String family : families) {
1459      fams.add(Bytes.toBytes(family));
1460    }
1461    return createTable(tableName, fams.toArray(new byte[0][]));
1462  }
1463
1464  /**
1465   * Create a table.
1466   * @param tableName
1467   * @param family
1468   * @return A Table instance for the created table.
1469   * @throws IOException
1470   */
1471  public Table createTable(TableName tableName, byte[] family)
1472  throws IOException{
1473    return createTable(tableName, new byte[][]{family});
1474  }
1475
1476  /**
1477   * Create a table with multiple regions.
1478   * @param tableName
1479   * @param family
1480   * @param numRegions
1481   * @return A Table instance for the created table.
1482   * @throws IOException
1483   */
1484  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1485      throws IOException {
1486    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1487    byte[] startKey = Bytes.toBytes("aaaaa");
1488    byte[] endKey = Bytes.toBytes("zzzzz");
1489    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1490
1491    return createTable(tableName, new byte[][] { family }, splitKeys);
1492  }
1493
1494  /**
1495   * Create a table.
1496   * @param tableName
1497   * @param families
1498   * @return A Table instance for the created table.
1499   * @throws IOException
1500   */
1501  public Table createTable(TableName tableName, byte[][] families)
1502  throws IOException {
1503    return createTable(tableName, families, (byte[][]) null);
1504  }
1505
1506  /**
1507   * Create a table with multiple regions.
1508   * @param tableName
1509   * @param families
1510   * @return A Table instance for the created table.
1511   * @throws IOException
1512   */
1513  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1514    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1515  }
1516
1517  /**
1518   * Create a table with multiple regions.
1519   * @param tableName
1520   * @param replicaCount replica count.
1521   * @param families
1522   * @return A Table instance for the created table.
1523   * @throws IOException
1524   */
1525  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1526    throws IOException {
1527    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1528  }
1529
1530  /**
1531   * Create a table.
1532   * @param tableName
1533   * @param families
1534   * @param splitKeys
1535   * @return A Table instance for the created table.
1536   * @throws IOException
1537   */
1538  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1539      throws IOException {
1540    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1541  }
1542
1543  /**
1544   * Create a table.
1545   * @param tableName the table name
1546   * @param families the families
1547   * @param splitKeys the splitkeys
1548   * @param replicaCount the region replica count
1549   * @return A Table instance for the created table.
1550   * @throws IOException throws IOException
1551   */
1552  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1553      int replicaCount) throws IOException {
1554    return createTable(tableName, families, splitKeys, replicaCount,
1555      new Configuration(getConfiguration()));
1556  }
1557
  /**
   * Create a table split into {@code numRegions} regions between {@code startKey} and
   * {@code endKey}, with every family keeping {@code numVersions} versions.
   * @param tableName name of the table to create
   * @param families column families
   * @param numVersions max versions per family
   * @param startKey first region boundary
   * @param endKey last region boundary
   * @param numRegions number of regions to create
   * @return A Table instance for the created table.
   * @throws IOException if table creation fails
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
    byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }
1568
1569  /**
1570   * Create a table.
1571   * @param c Configuration to use
1572   * @return A Table instance for the created table.
1573   */
1574  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1575    throws IOException {
1576    return createTable(htd, families, null, c);
1577  }
1578
1579  /**
1580   * Create a table.
1581   * @param htd table descriptor
1582   * @param families array of column families
1583   * @param splitKeys array of split keys
1584   * @param c Configuration to use
1585   * @return A Table instance for the created table.
1586   * @throws IOException if getAdmin or createTable fails
1587   */
1588  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1589      Configuration c) throws IOException {
1590    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1591    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1592    // on is interfering.
1593    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1594  }
1595
1596  /**
1597   * Create a table.
1598   * @param htd table descriptor
1599   * @param families array of column families
1600   * @param splitKeys array of split keys
1601   * @param type Bloom type
1602   * @param blockSize block size
1603   * @param c Configuration to use
1604   * @return A Table instance for the created table.
1605   * @throws IOException if getAdmin or createTable fails
1606   */
1607
1608  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1609      BloomType type, int blockSize, Configuration c) throws IOException {
1610    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1611    for (byte[] family : families) {
1612      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1613        .setBloomFilterType(type)
1614        .setBlocksize(blockSize);
1615      if (isNewVersionBehaviorEnabled()) {
1616          cfdb.setNewVersionBehavior(true);
1617      }
1618      builder.setColumnFamily(cfdb.build());
1619    }
1620    TableDescriptor td = builder.build();
1621    if (splitKeys != null) {
1622      getAdmin().createTable(td, splitKeys);
1623    } else {
1624      getAdmin().createTable(td);
1625    }
1626    // HBaseAdmin only waits for regions to appear in hbase:meta
1627    // we should wait until they are assigned
1628    waitUntilAllRegionsAssigned(td.getTableName());
1629    return getConnection().getTable(td.getTableName());
1630  }
1631
1632  /**
1633   * Create a table.
1634   * @param htd table descriptor
1635   * @param splitRows array of split keys
1636   * @return A Table instance for the created table.
1637   * @throws IOException
1638   */
1639  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1640      throws IOException {
1641    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1642    if (isNewVersionBehaviorEnabled()) {
1643      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1644         builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
1645           .setNewVersionBehavior(true).build());
1646      }
1647    }
1648    if (splitRows != null) {
1649      getAdmin().createTable(builder.build(), splitRows);
1650    } else {
1651      getAdmin().createTable(builder.build());
1652    }
1653    // HBaseAdmin only waits for regions to appear in hbase:meta
1654    // we should wait until they are assigned
1655    waitUntilAllRegionsAssigned(htd.getTableName());
1656    return getConnection().getTable(htd.getTableName());
1657  }
1658
1659  /**
1660   * Create a table.
1661   * @param tableName the table name
1662   * @param families the families
1663   * @param splitKeys the split keys
1664   * @param replicaCount the replica count
1665   * @param c Configuration to use
1666   * @return A Table instance for the created table.
1667   */
1668  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1669    int replicaCount, final Configuration c) throws IOException {
1670    TableDescriptor htd =
1671      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1672    return createTable(htd, families, splitKeys, c);
1673  }
1674
1675  /**
1676   * Create a table.
1677   * @return A Table instance for the created table.
1678   */
1679  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1680    return createTable(tableName, new byte[][] { family }, numVersions);
1681  }
1682
1683  /**
1684   * Create a table.
1685   * @return A Table instance for the created table.
1686   */
1687  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1688      throws IOException {
1689    return createTable(tableName, families, numVersions, (byte[][]) null);
1690  }
1691
1692  /**
1693   * Create a table.
1694   * @return A Table instance for the created table.
1695   */
1696  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1697      byte[][] splitKeys) throws IOException {
1698    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1699    for (byte[] family : families) {
1700      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1701        .setMaxVersions(numVersions);
1702      if (isNewVersionBehaviorEnabled()) {
1703        cfBuilder.setNewVersionBehavior(true);
1704      }
1705      builder.setColumnFamily(cfBuilder.build());
1706    }
1707    if (splitKeys != null) {
1708      getAdmin().createTable(builder.build(), splitKeys);
1709    } else {
1710      getAdmin().createTable(builder.build());
1711    }
1712    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1713    // assigned
1714    waitUntilAllRegionsAssigned(tableName);
1715    return getConnection().getTable(tableName);
1716  }
1717
1718  /**
1719   * Create a table with multiple regions.
1720   * @return A Table instance for the created table.
1721   */
1722  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1723      throws IOException {
1724    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1725  }
1726
1727  /**
1728   * Create a table.
1729   * @return A Table instance for the created table.
1730   */
1731  public Table createTable(TableName tableName, byte[][] families,
1732    int numVersions, int blockSize) throws IOException {
1733    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1734    for (byte[] family : families) {
1735      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1736        .setMaxVersions(numVersions).setBlocksize(blockSize);
1737      if (isNewVersionBehaviorEnabled()) {
1738        cfBuilder.setNewVersionBehavior(true);
1739      }
1740      builder.setColumnFamily(cfBuilder.build());
1741    }
1742    getAdmin().createTable(builder.build());
1743    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1744    // assigned
1745    waitUntilAllRegionsAssigned(tableName);
1746    return getConnection().getTable(tableName);
1747  }
1748
  /**
   * Create a table whose families all keep {@code numVersions} versions and use the given block
   * size, optionally loading a coprocessor.
   * @param tableName name of the table to create
   * @param families column families
   * @param numVersions max versions for every family
   * @param blockSize block size for every family
   * @param cpName coprocessor class name to load, or null for none
   * @return A Table instance for the created table.
   * @throws IOException if table creation fails
   */
  public Table createTable(TableName tableName, byte[][] families,
      int numVersions, int blockSize, String cpName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setMaxVersions(numVersions).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    if (cpName != null) {
      builder.setCoprocessor(cpName);
    }
    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }
1769
1770  /**
1771   * Create a table.
1772   * @return A Table instance for the created table.
1773   */
1774  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1775    throws IOException {
1776    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1777    int i = 0;
1778    for (byte[] family : families) {
1779      ColumnFamilyDescriptorBuilder cfBuilder =
1780        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1781      if (isNewVersionBehaviorEnabled()) {
1782        cfBuilder.setNewVersionBehavior(true);
1783      }
1784      builder.setColumnFamily(cfBuilder.build());
1785      i++;
1786    }
1787    getAdmin().createTable(builder.build());
1788    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1789    // assigned
1790    waitUntilAllRegionsAssigned(tableName);
1791    return getConnection().getTable(tableName);
1792  }
1793
1794  /**
1795   * Create a table.
1796   * @return A Table instance for the created table.
1797   */
1798  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1799    throws IOException {
1800    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1801    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1802    if (isNewVersionBehaviorEnabled()) {
1803      cfBuilder.setNewVersionBehavior(true);
1804    }
1805    builder.setColumnFamily(cfBuilder.build());
1806    getAdmin().createTable(builder.build(), splitRows);
1807    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1808    // assigned
1809    waitUntilAllRegionsAssigned(tableName);
1810    return getConnection().getTable(tableName);
1811  }
1812
1813  /**
1814   * Create a table with multiple regions.
1815   * @return A Table instance for the created table.
1816   */
1817  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1818    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1819  }
1820
1821  /**
1822   * Modify a table, synchronous.
1823   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1824   *   {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1825   * @see Admin#modifyTable(TableDescriptor)
1826   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1827   */
1828  @Deprecated
1829  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1830      throws IOException, InterruptedException {
1831    admin.modifyTable(desc);
1832  }
1833
1834  /**
1835   * Set the number of Region replicas.
1836   */
1837  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1838    throws IOException, InterruptedException {
1839    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1840      .setRegionReplication(replicaCount).build();
1841    admin.modifyTable(desc);
1842  }
1843
1844  /**
1845   * Drop an existing table
1846   * @param tableName existing table
1847   */
1848  public void deleteTable(TableName tableName) throws IOException {
1849    try {
1850      getAdmin().disableTable(tableName);
1851    } catch (TableNotEnabledException e) {
1852      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1853    }
1854    getAdmin().deleteTable(tableName);
1855  }
1856
1857  /**
1858   * Drop an existing table
1859   * @param tableName existing table
1860   */
1861  public void deleteTableIfAny(TableName tableName) throws IOException {
1862    try {
1863      deleteTable(tableName);
1864    } catch (TableNotFoundException e) {
1865      // ignore
1866    }
1867  }
1868
1869  // ==========================================================================
1870  // Canned table and table descriptor creation
1871
1872  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1873  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1874  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1875  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1876  private static final int MAXVERSIONS = 3;
1877
1878  public static final char FIRST_CHAR = 'a';
1879  public static final char LAST_CHAR = 'z';
1880  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1881  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1882
  /**
   * Create a still-modifiable descriptor builder for a table with the canned families
   * (fam1/fam2/fam3) and default min/max versions, TTL and keep-deleted-cells settings.
   * @param name table name
   * @return a TableDescriptorBuilder the caller may customize further
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
    return createModifyableTableDescriptor(TableName.valueOf(name),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }
1888
1889  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1890    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1891    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1892    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1893      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1894        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1895        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1896      if (isNewVersionBehaviorEnabled()) {
1897        cfBuilder.setNewVersionBehavior(true);
1898      }
1899      builder.setColumnFamily(cfBuilder.build());
1900    }
1901    return builder.build();
1902  }
1903
1904  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1905    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1906    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1907    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1908      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1909        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1910        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1911      if (isNewVersionBehaviorEnabled()) {
1912        cfBuilder.setNewVersionBehavior(true);
1913      }
1914      builder.setColumnFamily(cfBuilder.build());
1915    }
1916    return builder;
1917  }
1918
1919  /**
1920   * Create a table of name <code>name</code>.
1921   * @param name Name to give table.
1922   * @return Column descriptor.
1923   */
1924  public TableDescriptor createTableDescriptor(final TableName name) {
1925    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1926      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1927  }
1928
  /**
   * Create a single-family table descriptor keeping one version.
   * @param tableName table name
   * @param family the single column family
   * @return the built table descriptor
   */
  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
    return createTableDescriptor(tableName, new byte[][] { family }, 1);
  }
1932
1933  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1934    int maxVersions) {
1935    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1936    for (byte[] family : families) {
1937      ColumnFamilyDescriptorBuilder cfBuilder =
1938        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1939      if (isNewVersionBehaviorEnabled()) {
1940        cfBuilder.setNewVersionBehavior(true);
1941      }
1942      builder.setColumnFamily(cfBuilder.build());
1943    }
1944    return builder.build();
1945  }
1946
1947  /**
1948   * Create an HRegion that writes to the local tmp dirs
1949   * @param desc a table descriptor indicating which table the region belongs to
1950   * @param startKey the start boundary of the region
1951   * @param endKey the end boundary of the region
1952   * @return a region that writes to local dir for testing
1953   */
1954  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1955    throws IOException {
1956    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1957      .setEndKey(endKey).build();
1958    return createLocalHRegion(hri, desc);
1959  }
1960
1961  /**
1962   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1963   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1964   */
1965  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1966    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1967  }
1968
1969  /**
1970   * Create an HRegion that writes to the local tmp dirs with specified wal
1971   * @param info regioninfo
1972   * @param conf configuration
1973   * @param desc table descriptor
1974   * @param wal wal for this region.
1975   * @return created hregion
1976   * @throws IOException
1977   */
1978  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1979      WAL wal) throws IOException {
1980    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1981  }
1982
1983  /**
1984   * @param tableName
1985   * @param startKey
1986   * @param stopKey
1987   * @param isReadOnly
1988   * @param families
1989   * @return A region on which you must call
1990   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
1991   * @throws IOException
1992   */
1993  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1994      Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1995      throws IOException {
1996    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1997        durability, wal, null, families);
1998  }
1999
  /**
   * Create an HRegion that writes to the local tmp dirs, with per-family in-memory compaction
   * flags.
   * @param tableName name of the table the region belongs to
   * @param startKey start boundary of the region
   * @param stopKey end boundary of the region
   * @param conf configuration to use
   * @param isReadOnly whether the table is read only
   * @param durability durability setting for the table
   * @param wal WAL for this region
   * @param compactedMemStore if non-null, families whose index falls within its length get BASIC
   *   in-memory compaction; all other families get NONE
   * @param families column families
   * @return the created region
   * @throws IOException if region creation fails
   */
  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
    boolean[] compactedMemStore, byte[]... families) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setReadOnly(isReadOnly);
    int i = 0;
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      // NOTE(review): only the LENGTH of compactedMemStore is consulted here — the boolean
      // values at each index are never read, so a family gets BASIC compaction merely by
      // having an index inside the array. Confirm this is intended before changing.
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);

      }
      i++;
      // Set default to be three versions.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
      builder.setColumnFamily(cfBuilder.build());
    }
    builder.setDurability(durability);
    RegionInfo info =
      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
    return createLocalHRegion(info, conf, builder.build(), wal);
  }
2024
2025  //
2026  // ==========================================================================
2027
2028  /**
2029   * Provide an existing table name to truncate.
2030   * Scans the table and issues a delete for each row read.
2031   * @param tableName existing table
2032   * @return HTable to that new table
2033   * @throws IOException
2034   */
2035  public Table deleteTableData(TableName tableName) throws IOException {
2036    Table table = getConnection().getTable(tableName);
2037    Scan scan = new Scan();
2038    ResultScanner resScan = table.getScanner(scan);
2039    for(Result res : resScan) {
2040      Delete del = new Delete(res.getRow());
2041      table.delete(del);
2042    }
2043    resScan = table.getScanner(scan);
2044    resScan.close();
2045    return table;
2046  }
2047
2048  /**
2049   * Truncate a table using the admin command.
2050   * Effectively disables, deletes, and recreates the table.
2051   * @param tableName table which must exist.
2052   * @param preserveRegions keep the existing split points
2053   * @return HTable for the new table
2054   */
2055  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
2056      IOException {
2057    Admin admin = getAdmin();
2058    if (!admin.isTableDisabled(tableName)) {
2059      admin.disableTable(tableName);
2060    }
2061    admin.truncateTable(tableName, preserveRegions);
2062    return getConnection().getTable(tableName);
2063  }
2064
2065  /**
2066   * Truncate a table using the admin command.
2067   * Effectively disables, deletes, and recreates the table.
2068   * For previous behavior of issuing row deletes, see
2069   * deleteTableData.
2070   * Expressly does not preserve regions of existing table.
2071   * @param tableName table which must exist.
2072   * @return HTable for the new table
2073   */
2074  public Table truncateTable(final TableName tableName) throws IOException {
2075    return truncateTable(tableName, false);
2076  }
2077
2078  /**
2079   * Load table with rows from 'aaa' to 'zzz'.
2080   * @param t Table
2081   * @param f Family
2082   * @return Count of rows loaded.
2083   * @throws IOException
2084   */
2085  public int loadTable(final Table t, final byte[] f) throws IOException {
2086    return loadTable(t, new byte[][] {f});
2087  }
2088
2089  /**
2090   * Load table with rows from 'aaa' to 'zzz'.
2091   * @param t Table
2092   * @param f Family
2093   * @return Count of rows loaded.
2094   * @throws IOException
2095   */
2096  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2097    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2098  }
2099
2100  /**
2101   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2102   * @param t Table
2103   * @param f Array of Families to load
2104   * @return Count of rows loaded.
2105   * @throws IOException
2106   */
2107  public int loadTable(final Table t, final byte[][] f) throws IOException {
2108    return loadTable(t, f, null);
2109  }
2110
2111  /**
2112   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2113   * @param t Table
2114   * @param f Array of Families to load
2115   * @param value the values of the cells. If null is passed, the row key is used as value
2116   * @return Count of rows loaded.
2117   * @throws IOException
2118   */
2119  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2120    return loadTable(t, f, value, true);
2121  }
2122
2123  /**
2124   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2125   * @param t Table
2126   * @param f Array of Families to load
2127   * @param value the values of the cells. If null is passed, the row key is used as value
2128   * @return Count of rows loaded.
2129   * @throws IOException
2130   */
2131  public int loadTable(final Table t, final byte[][] f, byte[] value,
2132      boolean writeToWAL) throws IOException {
2133    List<Put> puts = new ArrayList<>();
2134    for (byte[] row : HBaseTestingUtility.ROWS) {
2135      Put put = new Put(row);
2136      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2137      for (int i = 0; i < f.length; i++) {
2138        byte[] value1 = value != null ? value : row;
2139        put.addColumn(f[i], f[i], value1);
2140      }
2141      puts.add(put);
2142    }
2143    t.put(puts);
2144    return puts.size();
2145  }
2146
2147  /** A tracker for tracking and validating table rows
2148   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2149   */
2150  public static class SeenRowTracker {
2151    int dim = 'z' - 'a' + 1;
2152    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2153    byte[] startRow;
2154    byte[] stopRow;
2155
2156    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2157      this.startRow = startRow;
2158      this.stopRow = stopRow;
2159    }
2160
2161    void reset() {
2162      for (byte[] row : ROWS) {
2163        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2164      }
2165    }
2166
2167    int i(byte b) {
2168      return b - 'a';
2169    }
2170
2171    public void addRow(byte[] row) {
2172      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2173    }
2174
2175    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2176     * all other rows none
2177     */
2178    public void validate() {
2179      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2180        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2181          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2182            int count = seenRows[i(b1)][i(b2)][i(b3)];
2183            int expectedCount = 0;
2184            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2185                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2186              expectedCount = 1;
2187            }
2188            if (count != expectedCount) {
2189              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2190              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2191                  "instead of " + expectedCount);
2192            }
2193          }
2194        }
2195      }
2196    }
2197  }
2198
  /**
   * Load region with rows from 'aaa' to 'zzz', without flushing the cache.
   * @param r Region
   * @param f Family
   * @return Count of rows loaded.
   */
  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
    return loadRegion(r, f, false);
  }
2202
  /**
   * Load region with rows from 'aaa' to 'zzz'. {@link Region} overload that casts and
   * delegates to {@link #loadRegion(HRegion, byte[])}.
   * @param r Region (must be an HRegion instance)
   * @param f Family
   * @return Count of rows loaded.
   */
  public int loadRegion(final Region r, final byte[] f) throws IOException {
    return loadRegion((HRegion)r, f);
  }
2206
2207  /**
2208   * Load region with rows from 'aaa' to 'zzz'.
2209   * @param r Region
2210   * @param f Family
2211   * @param flush flush the cache if true
2212   * @return Count of rows loaded.
2213   * @throws IOException
2214   */
2215  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2216  throws IOException {
2217    byte[] k = new byte[3];
2218    int rowCount = 0;
2219    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2220      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2221        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2222          k[0] = b1;
2223          k[1] = b2;
2224          k[2] = b3;
2225          Put put = new Put(k);
2226          put.setDurability(Durability.SKIP_WAL);
2227          put.addColumn(f, null, k);
2228          if (r.getWAL() == null) {
2229            put.setDurability(Durability.SKIP_WAL);
2230          }
2231          int preRowCount = rowCount;
2232          int pause = 10;
2233          int maxPause = 1000;
2234          while (rowCount == preRowCount) {
2235            try {
2236              r.put(put);
2237              rowCount++;
2238            } catch (RegionTooBusyException e) {
2239              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2240              Threads.sleep(pause);
2241            }
2242          }
2243        }
2244      }
2245      if (flush) {
2246        r.flush(true);
2247      }
2248    }
2249    return rowCount;
2250  }
2251
2252  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2253      throws IOException {
2254    for (int i = startRow; i < endRow; i++) {
2255      byte[] data = Bytes.toBytes(String.valueOf(i));
2256      Put put = new Put(data);
2257      put.addColumn(f, null, data);
2258      t.put(put);
2259    }
2260  }
2261
2262  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2263      throws IOException {
2264    Random r = new Random();
2265    byte[] row = new byte[rowSize];
2266    for (int i = 0; i < totalRows; i++) {
2267      r.nextBytes(row);
2268      Put put = new Put(row);
2269      put.addColumn(f, new byte[]{0}, new byte[]{0});
2270      t.put(put);
2271    }
2272  }
2273
2274  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2275      int replicaId)
2276      throws IOException {
2277    for (int i = startRow; i < endRow; i++) {
2278      String failMsg = "Failed verification of row :" + i;
2279      byte[] data = Bytes.toBytes(String.valueOf(i));
2280      Get get = new Get(data);
2281      get.setReplicaId(replicaId);
2282      get.setConsistency(Consistency.TIMELINE);
2283      Result result = table.get(get);
2284      if (!result.containsColumn(f, null)) {
2285        throw new AssertionError(failMsg);
2286      }
2287      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2288      Cell cell = result.getColumnLatestCell(f, null);
2289      if (!Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2290        cell.getValueLength())) {
2291        throw new AssertionError(failMsg);
2292      }
2293    }
2294  }
2295
  /**
   * Verify rows [startRow, endRow) exist in the region; {@link Region} overload that
   * casts and delegates to {@link #verifyNumericRows(HRegion, byte[], int, int)}.
   */
  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
      throws IOException {
    verifyNumericRows((HRegion)region, f, startRow, endRow);
  }
2300
  /**
   * Verify rows [startRow, endRow) are present in the region; delegates to
   * {@link #verifyNumericRows(HRegion, byte[], int, int, boolean)} with present=true.
   */
  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
      throws IOException {
    verifyNumericRows(region, f, startRow, endRow, true);
  }
2305
  /**
   * Verify rows [startRow, endRow) presence/absence in the region; {@link Region}
   * overload that casts and delegates to the {@link HRegion} version.
   */
  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
      final boolean present) throws IOException {
    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
  }
2310
2311  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2312      final boolean present) throws IOException {
2313    for (int i = startRow; i < endRow; i++) {
2314      String failMsg = "Failed verification of row :" + i;
2315      byte[] data = Bytes.toBytes(String.valueOf(i));
2316      Result result = region.get(new Get(data));
2317
2318      boolean hasResult = result != null && !result.isEmpty();
2319      if (present != hasResult) {
2320        throw new AssertionError(
2321          failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2322      }
2323      if (!present) continue;
2324
2325      if (!result.containsColumn(f, null)) {
2326        throw new AssertionError(failMsg);
2327      }
2328      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2329      Cell cell = result.getColumnLatestCell(f, null);
2330      if (!Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2331        cell.getValueLength())) {
2332        throw new AssertionError(failMsg);
2333      }
2334    }
2335  }
2336
2337  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2338      throws IOException {
2339    for (int i = startRow; i < endRow; i++) {
2340      byte[] data = Bytes.toBytes(String.valueOf(i));
2341      Delete delete = new Delete(data);
2342      delete.addFamily(f);
2343      t.delete(delete);
2344    }
2345  }
2346
2347  /**
2348   * Return the number of rows in the given table.
2349   * @param table to count rows
2350   * @return count of rows
2351   */
2352  public static int countRows(final Table table) throws IOException {
2353    return countRows(table, new Scan());
2354  }
2355
2356  public static int countRows(final Table table, final Scan scan) throws IOException {
2357    try (ResultScanner results = table.getScanner(scan)) {
2358      int count = 0;
2359      while (results.next() != null) {
2360        count++;
2361      }
2362      return count;
2363    }
2364  }
2365
2366  public int countRows(final Table table, final byte[]... families) throws IOException {
2367    Scan scan = new Scan();
2368    for (byte[] family: families) {
2369      scan.addFamily(family);
2370    }
2371    return countRows(table, scan);
2372  }
2373
2374  /**
2375   * Return the number of rows in the given table.
2376   */
2377  public int countRows(final TableName tableName) throws IOException {
2378    Table table = getConnection().getTable(tableName);
2379    try {
2380      return countRows(table);
2381    } finally {
2382      table.close();
2383    }
2384  }
2385
  /** Count the rows in {@code region} using a default (all-families) Scan. */
  public int countRows(final Region region) throws IOException {
    return countRows(region, new Scan());
  }
2389
2390  public int countRows(final Region region, final Scan scan) throws IOException {
2391    InternalScanner scanner = region.getScanner(scan);
2392    try {
2393      return countRows(scanner);
2394    } finally {
2395      scanner.close();
2396    }
2397  }
2398
2399  public int countRows(final InternalScanner scanner) throws IOException {
2400    int scannedCount = 0;
2401    List<Cell> results = new ArrayList<>();
2402    boolean hasMore = true;
2403    while (hasMore) {
2404      hasMore = scanner.next(results);
2405      scannedCount += results.size();
2406      results.clear();
2407    }
2408    return scannedCount;
2409  }
2410
2411  /**
2412   * Return an md5 digest of the entire contents of a table.
2413   */
2414  public String checksumRows(final Table table) throws Exception {
2415
2416    Scan scan = new Scan();
2417    ResultScanner results = table.getScanner(scan);
2418    MessageDigest digest = MessageDigest.getInstance("MD5");
2419    for (Result res : results) {
2420      digest.update(res.getRow());
2421    }
2422    results.close();
2423    return digest.toString();
2424  }
2425
2426  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2427  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2428  static {
2429    int i = 0;
2430    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2431      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2432        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2433          ROWS[i][0] = b1;
2434          ROWS[i][1] = b2;
2435          ROWS[i][2] = b3;
2436          i++;
2437        }
2438      }
2439    }
2440  }
2441
  /**
   * Split keys: an empty start key followed by "bbb".."yyy" (25 entries in total),
   * matching the 'aaa'..'zzz' row space of {@link #ROWS}.
   */
  public static final byte[][] KEYS = {
    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
  };
2453
  /**
   * Split keys "bbb".."zzz" (25 entries) with no leading empty key; per the name,
   * intended for table creation via the admin API, which supplies the implicit
   * empty boundary keys itself.
   */
  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
      Bytes.toBytes("bbb"),
      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
  };
2465
2466  /**
2467   * Create rows in hbase:meta for regions of the specified table with the specified
2468   * start keys.  The first startKey should be a 0 length byte array if you
2469   * want to form a proper range of regions.
2470   * @param conf
2471   * @param htd
2472   * @param startKeys
2473   * @return list of region info for regions added to meta
2474   * @throws IOException
2475   */
2476  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2477      final TableDescriptor htd, byte [][] startKeys)
2478  throws IOException {
2479    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2480    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2481    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2482    MetaTableAccessor
2483        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
2484    // add custom ones
2485    for (int i = 0; i < startKeys.length; i++) {
2486      int j = (i + 1) % startKeys.length;
2487      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2488          .setStartKey(startKeys[i])
2489          .setEndKey(startKeys[j])
2490          .build();
2491      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2492      newRegions.add(hri);
2493    }
2494
2495    meta.close();
2496    return newRegions;
2497  }
2498
2499  /**
2500   * Create an unmanaged WAL. Be sure to close it when you're through.
2501   */
2502  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2503      throws IOException {
2504    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2505    // unless I pass along via the conf.
2506    Configuration confForWAL = new Configuration(conf);
2507    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2508    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2509  }
2510
2511
2512  /**
2513   * Create a region with it's own WAL. Be sure to call
2514   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2515   */
2516  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2517      final Configuration conf, final TableDescriptor htd) throws IOException {
2518    return createRegionAndWAL(info, rootDir, conf, htd, true);
2519  }
2520
2521  /**
2522   * Create a region with it's own WAL. Be sure to call
2523   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2524   */
2525  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2526      final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
2527      throws IOException {
2528    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2529    region.setBlockCache(blockCache);
2530    region.initialize();
2531    return region;
2532  }
2533  /**
2534   * Create a region with it's own WAL. Be sure to call
2535   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2536   */
2537  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2538      final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2539      throws IOException {
2540    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2541    region.setMobFileCache(mobFileCache);
2542    region.initialize();
2543    return region;
2544  }
2545
2546  /**
2547   * Create a region with it's own WAL. Be sure to call
2548   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2549   */
2550  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2551      final Configuration conf, final TableDescriptor htd, boolean initialize)
2552      throws IOException {
2553    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
2554      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2555    WAL wal = createWal(conf, rootDir, info);
2556    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2557  }
2558
2559  /**
2560   * Returns all rows from the hbase:meta table.
2561   *
2562   * @throws IOException When reading the rows fails.
2563   */
2564  public List<byte[]> getMetaTableRows() throws IOException {
2565    // TODO: Redo using MetaTableAccessor class
2566    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2567    List<byte[]> rows = new ArrayList<>();
2568    ResultScanner s = t.getScanner(new Scan());
2569    for (Result result : s) {
2570      LOG.info("getMetaTableRows: row -> " +
2571        Bytes.toStringBinary(result.getRow()));
2572      rows.add(result.getRow());
2573    }
2574    s.close();
2575    t.close();
2576    return rows;
2577  }
2578
2579  /**
2580   * Returns all rows from the hbase:meta table for a given user table
2581   *
2582   * @throws IOException When reading the rows fails.
2583   */
2584  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2585    // TODO: Redo using MetaTableAccessor.
2586    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2587    List<byte[]> rows = new ArrayList<>();
2588    ResultScanner s = t.getScanner(new Scan());
2589    for (Result result : s) {
2590      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2591      if (info == null) {
2592        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2593        // TODO figure out what to do for this new hosed case.
2594        continue;
2595      }
2596
2597      if (info.getTable().equals(tableName)) {
2598        LOG.info("getMetaTableRows: row -> " +
2599            Bytes.toStringBinary(result.getRow()) + info);
2600        rows.add(result.getRow());
2601      }
2602    }
2603    s.close();
2604    t.close();
2605    return rows;
2606  }
2607
2608  /**
2609   * Returns all regions of the specified table
2610   *
2611   * @param tableName the table name
2612   * @return all regions of the specified table
2613   * @throws IOException when getting the regions fails.
2614   */
2615  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2616    try (Admin admin = getConnection().getAdmin()) {
2617      return admin.getRegions(tableName);
2618    }
2619  }
2620
2621  /*
2622   * Find any other region server which is different from the one identified by parameter
2623   * @param rs
2624   * @return another region server
2625   */
2626  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2627    for (JVMClusterUtil.RegionServerThread rst :
2628      getMiniHBaseCluster().getRegionServerThreads()) {
2629      if (!(rst.getRegionServer() == rs)) {
2630        return rst.getRegionServer();
2631      }
2632    }
2633    return null;
2634  }
2635
2636  /**
2637   * Tool to get the reference to the region server object that holds the
2638   * region of the specified user table.
2639   * @param tableName user table to lookup in hbase:meta
2640   * @return region server that holds it, null if the row doesn't exist
2641   * @throws IOException
2642   * @throws InterruptedException
2643   */
2644  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2645      throws IOException, InterruptedException {
2646    List<RegionInfo> regions = getRegions(tableName);
2647    if (regions == null || regions.isEmpty()) {
2648      return null;
2649    }
2650    LOG.debug("Found " + regions.size() + " regions for table " +
2651        tableName);
2652
2653    byte[] firstRegionName = regions.stream()
2654        .filter(r -> !r.isOffline())
2655        .map(RegionInfo::getRegionName)
2656        .findFirst()
2657        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2658
2659    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2660    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2661      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2662    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2663      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2664    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2665    while(retrier.shouldRetry()) {
2666      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2667      if (index != -1) {
2668        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2669      }
2670      // Came back -1.  Region may not be online yet.  Sleep a while.
2671      retrier.sleepUntilNextRetry();
2672    }
2673    return null;
2674  }
2675
2676  /**
2677   * Starts a <code>MiniMRCluster</code> with a default number of
2678   * <code>TaskTracker</code>'s.
2679   *
2680   * @throws IOException When starting the cluster fails.
2681   */
2682  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2683    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2684    conf.setIfUnset(
2685        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2686        "99.0");
2687    startMiniMapReduceCluster(2);
2688    return mrCluster;
2689  }
2690
2691  /**
2692   * Tasktracker has a bug where changing the hadoop.log.dir system property
2693   * will not change its internal static LOG_DIR variable.
2694   */
2695  private void forceChangeTaskLogDir() {
2696    Field logDirField;
2697    try {
2698      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2699      logDirField.setAccessible(true);
2700
2701      Field modifiersField = ReflectionUtils.getModifiersField();
2702      modifiersField.setAccessible(true);
2703      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2704
2705      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2706    } catch (SecurityException e) {
2707      throw new RuntimeException(e);
2708    } catch (NoSuchFieldException e) {
2709      throw new RuntimeException(e);
2710    } catch (IllegalArgumentException e) {
2711      throw new RuntimeException(e);
2712    } catch (IllegalAccessException e) {
2713      throw new RuntimeException(e);
2714    }
2715  }
2716
2717  /**
2718   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2719   * filesystem.
2720   * @param servers  The number of <code>TaskTracker</code>'s to start.
2721   * @throws IOException When starting the cluster fails.
2722   */
2723  private void startMiniMapReduceCluster(final int servers) throws IOException {
2724    if (mrCluster != null) {
2725      throw new IllegalStateException("MiniMRCluster is already running");
2726    }
2727    LOG.info("Starting mini mapreduce cluster...");
2728    setupClusterTestDir();
2729    createDirsAndSetProperties();
2730
2731    forceChangeTaskLogDir();
2732
2733    //// hadoop2 specific settings
2734    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2735    // we up the VM usable so that processes don't get killed.
2736    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2737
2738    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2739    // this avoids the problem by disabling speculative task execution in tests.
2740    conf.setBoolean("mapreduce.map.speculative", false);
2741    conf.setBoolean("mapreduce.reduce.speculative", false);
2742    ////
2743
2744    // Allow the user to override FS URI for this map-reduce cluster to use.
2745    mrCluster = new MiniMRCluster(servers,
2746      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2747      null, null, new JobConf(this.conf));
2748    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2749    if (jobConf == null) {
2750      jobConf = mrCluster.createJobConf();
2751    }
2752
2753    jobConf.set("mapreduce.cluster.local.dir",
2754      conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2755    LOG.info("Mini mapreduce cluster started");
2756
2757    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2758    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2759    // necessary config properties here.  YARN-129 required adding a few properties.
2760    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2761    // this for mrv2 support; mr1 ignores this
2762    conf.set("mapreduce.framework.name", "yarn");
2763    conf.setBoolean("yarn.is.minicluster", true);
2764    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2765    if (rmAddress != null) {
2766      conf.set("yarn.resourcemanager.address", rmAddress);
2767    }
2768    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2769    if (historyAddress != null) {
2770      conf.set("mapreduce.jobhistory.address", historyAddress);
2771    }
2772    String schedulerAddress =
2773      jobConf.get("yarn.resourcemanager.scheduler.address");
2774    if (schedulerAddress != null) {
2775      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2776    }
2777    String mrJobHistoryWebappAddress =
2778      jobConf.get("mapreduce.jobhistory.webapp.address");
2779    if (mrJobHistoryWebappAddress != null) {
2780      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2781    }
2782    String yarnRMWebappAddress =
2783      jobConf.get("yarn.resourcemanager.webapp.address");
2784    if (yarnRMWebappAddress != null) {
2785      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2786    }
2787  }
2788
2789  /**
2790   * Stops the previously started <code>MiniMRCluster</code>.
2791   */
2792  public void shutdownMiniMapReduceCluster() {
2793    if (mrCluster != null) {
2794      LOG.info("Stopping mini mapreduce cluster...");
2795      mrCluster.shutdown();
2796      mrCluster = null;
2797      LOG.info("Mini mapreduce cluster stopped");
2798    }
2799    // Restore configuration to point to local jobtracker
2800    conf.set("mapreduce.jobtracker.address", "local");
2801  }
2802
2803  /**
2804   * Create a stubbed out RegionServerService, mainly for getting FS.
2805   */
2806  public RegionServerServices createMockRegionServerService() throws IOException {
2807    return createMockRegionServerService((ServerName)null);
2808  }
2809
2810  /**
2811   * Create a stubbed out RegionServerService, mainly for getting FS.
2812   * This version is used by TestTokenAuthentication
2813   */
2814  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2815      IOException {
2816    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2817    rss.setFileSystem(getTestFileSystem());
2818    rss.setRpcServer(rpc);
2819    return rss;
2820  }
2821
2822  /**
2823   * Create a stubbed out RegionServerService, mainly for getting FS.
2824   * This version is used by TestOpenRegionHandler
2825   */
2826  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2827    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2828    rss.setFileSystem(getTestFileSystem());
2829    return rss;
2830  }
2831
2832  /**
2833   * Switches the logger for the given class to DEBUG level.
2834   * @param clazz The class for which to switch to debug logging.
2835   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2836   *             HBase only uses log4j. You should do this by your own as it you know which log
2837   *             framework you are using then set the log level to debug is very easy.
2838   */
2839  @Deprecated
2840  public void enableDebug(Class<?> clazz) {
2841    Log4jUtils.enableDebug(clazz);
2842  }
2843
2844  /**
2845   * Expire the Master's session
2846   * @throws Exception
2847   */
2848  public void expireMasterSession() throws Exception {
2849    HMaster master = getMiniHBaseCluster().getMaster();
2850    expireSession(master.getZooKeeper(), false);
2851  }
2852
2853  /**
2854   * Expire a region server's session
2855   * @param index which RS
2856   */
2857  public void expireRegionServerSession(int index) throws Exception {
2858    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2859    expireSession(rs.getZooKeeper(), false);
2860    decrementMinRegionServerCount();
2861  }
2862
2863  private void decrementMinRegionServerCount() {
2864    // decrement the count for this.conf, for newly spwaned master
2865    // this.hbaseCluster shares this configuration too
2866    decrementMinRegionServerCount(getConfiguration());
2867
2868    // each master thread keeps a copy of configuration
2869    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2870      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2871    }
2872  }
2873
2874  private void decrementMinRegionServerCount(Configuration conf) {
2875    int currentCount = conf.getInt(
2876        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2877    if (currentCount != -1) {
2878      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2879          Math.max(currentCount - 1, 1));
2880    }
2881  }
2882
  /** Expire the given ZooKeeper session without checking cluster status afterwards. */
  public void expireSession(ZKWatcher nodeZK) throws Exception {
   expireSession(nodeZK, false);
  }
2886
2887  /**
2888   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2889   * http://hbase.apache.org/book.html#trouble.zookeeper
2890   * There are issues when doing this:
2891   * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2892   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2893   *
2894   * @param nodeZK - the ZK watcher to expire
2895   * @param checkStatus - true to check if we can create a Table with the
2896   *                    current configuration.
2897   */
2898  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
2899    throws Exception {
2900    Configuration c = new Configuration(this.conf);
2901    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2902    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2903    byte[] password = zk.getSessionPasswd();
2904    long sessionID = zk.getSessionId();
2905
2906    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2907    //  so we create a first watcher to be sure that the
2908    //  event was sent. We expect that if our watcher receives the event
2909    //  other watchers on the same machine will get is as well.
2910    // When we ask to close the connection, ZK does not close it before
2911    //  we receive all the events, so don't have to capture the event, just
2912    //  closing the connection should be enough.
2913    ZooKeeper monitor = new ZooKeeper(quorumServers,
2914      1000, new org.apache.zookeeper.Watcher(){
2915      @Override
2916      public void process(WatchedEvent watchedEvent) {
2917        LOG.info("Monitor ZKW received event="+watchedEvent);
2918      }
2919    } , sessionID, password);
2920
2921    // Making it expire
2922    ZooKeeper newZK = new ZooKeeper(quorumServers,
2923        1000, EmptyWatcher.instance, sessionID, password);
2924
2925    //ensure that we have connection to the server before closing down, otherwise
2926    //the close session event will be eaten out before we start CONNECTING state
2927    long start = EnvironmentEdgeManager.currentTime();
2928    while (newZK.getState() != States.CONNECTED
2929         && EnvironmentEdgeManager.currentTime() - start < 1000) {
2930       Thread.sleep(1);
2931    }
2932    newZK.close();
2933    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2934
2935    // Now closing & waiting to be sure that the clients get it.
2936    monitor.close();
2937
2938    if (checkStatus) {
2939      getConnection().getTable(TableName.META_TABLE_NAME).close();
2940    }
2941  }
2942
2943  /**
2944   * Get the Mini HBase cluster.
2945   *
2946   * @return hbase cluster
2947   * @see #getHBaseClusterInterface()
2948   */
2949  public MiniHBaseCluster getHBaseCluster() {
2950    return getMiniHBaseCluster();
2951  }
2952
2953  /**
2954   * Returns the HBaseCluster instance.
2955   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
2956   * tests referring this should not assume that the cluster is a mini cluster or a
2957   * distributed one. If the test only works on a mini cluster, then specific
2958   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
2959   * need to type-cast.
2960   */
2961  public HBaseCluster getHBaseClusterInterface() {
2962    //implementation note: we should rename this method as #getHBaseCluster(),
2963    //but this would require refactoring 90+ calls.
2964    return hbaseCluster;
2965  }
2966
2967  /**
2968   * Resets the connections so that the next time getConnection() is called, a new connection is
2969   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2970   * the connection is not valid anymore.
2971   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2972   *   written, not all start() stop() calls go through this class. Most tests directly operate on
2973   *   the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
2974   *   maintain the connection state automatically. Cleaning this is a much bigger refactor.
2975   */
2976  public void invalidateConnection() throws IOException {
2977    closeConnection();
2978    // Update the master addresses if they changed.
2979    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2980    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2981    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2982        masterConfigBefore, masterConfAfter);
2983    conf.set(HConstants.MASTER_ADDRS_KEY,
2984        getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2985  }
2986
2987  /**
2988   * Get a shared Connection to the cluster.
2989   * this method is thread safe.
2990   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2991   */
2992  public Connection getConnection() throws IOException {
2993    return getAsyncConnection().toConnection();
2994  }
2995
2996  /**
2997   * Get a assigned Connection to the cluster.
2998   * this method is thread safe.
2999   * @param user assigned user
3000   * @return A Connection with assigned user.
3001   */
3002  public Connection getConnection(User user) throws IOException {
3003    return getAsyncConnection(user).toConnection();
3004  }
3005
3006  /**
3007   * Get a shared AsyncClusterConnection to the cluster.
3008   * this method is thread safe.
3009   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
3010   */
3011  public AsyncClusterConnection getAsyncConnection() throws IOException {
3012    try {
3013      return asyncConnection.updateAndGet(connection -> {
3014        if (connection == null) {
3015          try {
3016            User user = UserProvider.instantiate(conf).getCurrent();
3017            connection = getAsyncConnection(user);
3018          } catch(IOException ioe) {
3019            throw new UncheckedIOException("Failed to create connection", ioe);
3020          }
3021        }
3022        return connection;
3023      });
3024    } catch (UncheckedIOException exception) {
3025      throw exception.getCause();
3026    }
3027  }
3028
3029  /**
3030   * Get a assigned AsyncClusterConnection to the cluster.
3031   * this method is thread safe.
3032   * @param user assigned user
3033   * @return An AsyncClusterConnection with assigned user.
3034   */
3035  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
3036    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
3037  }
3038
3039  public void closeConnection() throws IOException {
3040    if (hbaseAdmin != null) {
3041      Closeables.close(hbaseAdmin, true);
3042      hbaseAdmin = null;
3043    }
3044    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
3045    if (asyncConnection != null) {
3046      Closeables.close(asyncConnection, true);
3047    }
3048  }
3049
3050  /**
3051   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
3052   * Closing it has no effect, it will be closed automatically when the cluster shutdowns
3053   */
3054  public Admin getAdmin() throws IOException {
3055    if (hbaseAdmin == null){
3056      this.hbaseAdmin = getConnection().getAdmin();
3057    }
3058    return hbaseAdmin;
3059  }
3060
3061  private Admin hbaseAdmin = null;
3062
3063  /**
3064   * Returns an {@link Hbck} instance. Needs be closed when done.
3065   */
3066  public Hbck getHbck() throws IOException {
3067    return getConnection().getHbck();
3068  }
3069
3070  /**
3071   * Unassign the named region.
3072   *
3073   * @param regionName  The region to unassign.
3074   */
3075  public void unassignRegion(String regionName) throws IOException {
3076    unassignRegion(Bytes.toBytes(regionName));
3077  }
3078
3079  /**
3080   * Unassign the named region.
3081   *
3082   * @param regionName  The region to unassign.
3083   */
3084  public void unassignRegion(byte[] regionName) throws IOException {
3085    getAdmin().unassign(regionName, true);
3086  }
3087
3088  /**
3089   * Closes the region containing the given row.
3090   *
3091   * @param row  The row to find the containing region.
3092   * @param table  The table to find the region.
3093   */
3094  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
3095    unassignRegionByRow(Bytes.toBytes(row), table);
3096  }
3097
3098  /**
3099   * Closes the region containing the given row.
3100   *
3101   * @param row  The row to find the containing region.
3102   * @param table  The table to find the region.
3103   * @throws IOException
3104   */
3105  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3106    HRegionLocation hrl = table.getRegionLocation(row);
3107    unassignRegion(hrl.getRegion().getRegionName());
3108  }
3109
3110  /**
3111   * Retrieves a splittable region randomly from tableName
3112   * @param tableName name of table
3113   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3114   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3115   */
3116  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3117    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3118    int regCount = regions.size();
3119    Set<Integer> attempted = new HashSet<>();
3120    int idx;
3121    int attempts = 0;
3122    do {
3123      regions = getHBaseCluster().getRegions(tableName);
3124      if (regCount != regions.size()) {
3125        // if there was region movement, clear attempted Set
3126        attempted.clear();
3127      }
3128      regCount = regions.size();
3129      // There are chances that before we get the region for the table from an RS the region may
3130      // be going for CLOSE.  This may be because online schema change is enabled
3131      if (regCount > 0) {
3132        idx = random.nextInt(regCount);
3133        // if we have just tried this region, there is no need to try again
3134        if (attempted.contains(idx)) {
3135          continue;
3136        }
3137        HRegion region = regions.get(idx);
3138        if (region.checkSplit().isPresent()) {
3139          return region;
3140        }
3141        attempted.add(idx);
3142      }
3143      attempts++;
3144    } while (maxAttempts == -1 || attempts < maxAttempts);
3145    return null;
3146  }
3147
3148  public MiniDFSCluster getDFSCluster() {
3149    return dfsCluster;
3150  }
3151
3152  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3153    setDFSCluster(cluster, true);
3154  }
3155
3156  /**
3157   * Set the MiniDFSCluster
3158   * @param cluster cluster to use
3159   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before
3160   * it is set.
3161   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3162   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3163   */
3164  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3165      throws IllegalStateException, IOException {
3166    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3167      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3168    }
3169    this.dfsCluster = cluster;
3170    this.setFs();
3171  }
3172
3173  public FileSystem getTestFileSystem() throws IOException {
3174    return HFileSystem.get(conf);
3175  }
3176
3177  /**
3178   * Wait until all regions in a table have been assigned.  Waits default timeout before giving up
3179   * (30 seconds).
3180   * @param table Table to wait on.
3181   * @throws InterruptedException
3182   * @throws IOException
3183   */
3184  public void waitTableAvailable(TableName table)
3185      throws InterruptedException, IOException {
3186    waitTableAvailable(table.getName(), 30000);
3187  }
3188
3189  public void waitTableAvailable(TableName table, long timeoutMillis)
3190      throws InterruptedException, IOException {
3191    waitFor(timeoutMillis, predicateTableAvailable(table));
3192  }
3193
3194  /**
3195   * Wait until all regions in a table have been assigned
3196   * @param table Table to wait on.
3197   * @param timeoutMillis Timeout.
3198   */
3199  public void waitTableAvailable(byte[] table, long timeoutMillis)
3200      throws InterruptedException, IOException {
3201    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3202  }
3203
3204  public String explainTableAvailability(TableName tableName) throws IOException {
3205    StringBuilder msg =
3206      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3207    if (getHBaseCluster().getMaster().isAlive()) {
3208      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3209        .getRegionStates().getRegionAssignments();
3210      final List<Pair<RegionInfo, ServerName>> metaLocations =
3211        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3212      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3213        RegionInfo hri = metaLocation.getFirst();
3214        ServerName sn = metaLocation.getSecond();
3215        if (!assignments.containsKey(hri)) {
3216          msg.append(", region ").append(hri)
3217            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3218        } else if (sn == null) {
3219          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3220        } else if (!sn.equals(assignments.get(hri))) {
3221          msg.append(",  region ").append(hri)
3222            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3223            .append(" <> ").append(assignments.get(hri));
3224        }
3225      }
3226    }
3227    return msg.toString();
3228  }
3229
3230  public String explainTableState(final TableName table, TableState.State state)
3231      throws IOException {
3232    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3233    if (tableState == null) {
3234      return "TableState in META: No table state in META for table " + table +
3235        " last state in meta (including deleted is " + findLastTableState(table) + ")";
3236    } else if (!tableState.inStates(state)) {
3237      return "TableState in META: Not " + state + " state, but " + tableState;
3238    } else {
3239      return "TableState in META: OK";
3240    }
3241  }
3242
3243  public TableState findLastTableState(final TableName table) throws IOException {
3244    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3245    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3246      @Override
3247      public boolean visit(Result r) throws IOException {
3248        if (!Arrays.equals(r.getRow(), table.getName())) {
3249          return false;
3250        }
3251        TableState state = CatalogFamilyFormat.getTableState(r);
3252        if (state != null) {
3253          lastTableState.set(state);
3254        }
3255        return true;
3256      }
3257    };
3258    MetaTableAccessor.scanMeta(getConnection(), null, null,
3259      ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
3260    return lastTableState.get();
3261  }
3262
3263  /**
3264   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3265   * regions have been all assigned.  Will timeout after default period (30 seconds)
3266   * Tolerates nonexistent table.
3267   * @param table the table to wait on.
3268   * @throws InterruptedException if interrupted while waiting
3269   * @throws IOException if an IO problem is encountered
3270   */
3271  public void waitTableEnabled(TableName table)
3272      throws InterruptedException, IOException {
3273    waitTableEnabled(table, 30000);
3274  }
3275
3276  /**
3277   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3278   * regions have been all assigned.
3279   * @see #waitTableEnabled(TableName, long)
3280   * @param table Table to wait on.
3281   * @param timeoutMillis Time to wait on it being marked enabled.
3282   * @throws InterruptedException
3283   * @throws IOException
3284   */
3285  public void waitTableEnabled(byte[] table, long timeoutMillis)
3286  throws InterruptedException, IOException {
3287    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3288  }
3289
3290  public void waitTableEnabled(TableName table, long timeoutMillis)
3291  throws IOException {
3292    waitFor(timeoutMillis, predicateTableEnabled(table));
3293  }
3294
3295  /**
3296   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3297   * Will timeout after default period (30 seconds)
3298   * @param table Table to wait on.
3299   * @throws InterruptedException
3300   * @throws IOException
3301   */
3302  public void waitTableDisabled(byte[] table)
3303          throws InterruptedException, IOException {
3304    waitTableDisabled(table, 30000);
3305  }
3306
3307  public void waitTableDisabled(TableName table, long millisTimeout)
3308          throws InterruptedException, IOException {
3309    waitFor(millisTimeout, predicateTableDisabled(table));
3310  }
3311
3312  /**
3313   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3314   * @param table Table to wait on.
3315   * @param timeoutMillis Time to wait on it being marked disabled.
3316   * @throws InterruptedException
3317   * @throws IOException
3318   */
3319  public void waitTableDisabled(byte[] table, long timeoutMillis)
3320          throws InterruptedException, IOException {
3321    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3322  }
3323
3324  /**
3325   * Make sure that at least the specified number of region servers
3326   * are running
3327   * @param num minimum number of region servers that should be running
3328   * @return true if we started some servers
3329   * @throws IOException
3330   */
3331  public boolean ensureSomeRegionServersAvailable(final int num)
3332      throws IOException {
3333    boolean startedServer = false;
3334    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3335    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3336      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3337      startedServer = true;
3338    }
3339
3340    return startedServer;
3341  }
3342
3343
3344  /**
3345   * Make sure that at least the specified number of region servers
3346   * are running. We don't count the ones that are currently stopping or are
3347   * stopped.
3348   * @param num minimum number of region servers that should be running
3349   * @return true if we started some servers
3350   * @throws IOException
3351   */
3352  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3353    throws IOException {
3354    boolean startedServer = ensureSomeRegionServersAvailable(num);
3355
3356    int nonStoppedServers = 0;
3357    for (JVMClusterUtil.RegionServerThread rst :
3358      getMiniHBaseCluster().getRegionServerThreads()) {
3359
3360      HRegionServer hrs = rst.getRegionServer();
3361      if (hrs.isStopping() || hrs.isStopped()) {
3362        LOG.info("A region server is stopped or stopping:"+hrs);
3363      } else {
3364        nonStoppedServers++;
3365      }
3366    }
3367    for (int i=nonStoppedServers; i<num; ++i) {
3368      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3369      startedServer = true;
3370    }
3371    return startedServer;
3372  }
3373
3374
3375  /**
3376   * This method clones the passed <code>c</code> configuration setting a new
3377   * user into the clone.  Use it getting new instances of FileSystem.  Only
3378   * works for DistributedFileSystem w/o Kerberos.
3379   * @param c Initial configuration
3380   * @param differentiatingSuffix Suffix to differentiate this user from others.
3381   * @return A new configuration instance with a different user set into it.
3382   * @throws IOException
3383   */
3384  public static User getDifferentUser(final Configuration c,
3385    final String differentiatingSuffix)
3386  throws IOException {
3387    FileSystem currentfs = FileSystem.get(c);
3388    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3389      return User.getCurrent();
3390    }
3391    // Else distributed filesystem.  Make a new instance per daemon.  Below
3392    // code is taken from the AppendTestUtil over in hdfs.
3393    String username = User.getCurrent().getName() +
3394      differentiatingSuffix;
3395    User user = User.createUserForTesting(c, username,
3396        new String[]{"supergroup"});
3397    return user;
3398  }
3399
3400  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3401      throws IOException {
3402    NavigableSet<String> online = new TreeSet<>();
3403    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3404      try {
3405        for (RegionInfo region :
3406            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3407          online.add(region.getRegionNameAsString());
3408        }
3409      } catch (RegionServerStoppedException e) {
3410        // That's fine.
3411      }
3412    }
3413    return online;
3414  }
3415
3416  /**
3417   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
3418   * makes tests linger.  Here is the exception you'll see:
3419   * <pre>
3420   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3421   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3422   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3423   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3424   * </pre>
3425   * @param stream A DFSClient.DFSOutputStream.
3426   * @param max
3427   * @throws NoSuchFieldException
3428   * @throws SecurityException
3429   * @throws IllegalAccessException
3430   * @throws IllegalArgumentException
3431   */
3432  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3433      final int max) {
3434    try {
3435      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3436      for (Class<?> clazz: clazzes) {
3437        String className = clazz.getSimpleName();
3438        if (className.equals("DFSOutputStream")) {
3439          if (clazz.isInstance(stream)) {
3440            Field maxRecoveryErrorCountField =
3441              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3442            maxRecoveryErrorCountField.setAccessible(true);
3443            maxRecoveryErrorCountField.setInt(stream, max);
3444            break;
3445          }
3446        }
3447      }
3448    } catch (Exception e) {
3449      LOG.info("Could not set max recovery field", e);
3450    }
3451  }
3452
3453  /**
3454   * Uses directly the assignment manager to assign the region. and waits until the specified region
3455   * has completed assignment.
3456   * @return true if the region is assigned false otherwise.
3457   */
3458  public boolean assignRegion(final RegionInfo regionInfo)
3459      throws IOException, InterruptedException {
3460    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3461    am.assign(regionInfo);
3462    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3463  }
3464
3465  /**
3466   * Move region to destination server and wait till region is completely moved and online
3467   *
3468   * @param destRegion region to move
3469   * @param destServer destination server of the region
3470   * @throws InterruptedException
3471   * @throws IOException
3472   */
3473  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3474      throws InterruptedException, IOException {
3475    HMaster master = getMiniHBaseCluster().getMaster();
3476    // TODO: Here we start the move. The move can take a while.
3477    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3478    while (true) {
3479      ServerName serverName = master.getAssignmentManager().getRegionStates()
3480          .getRegionServerOfRegion(destRegion);
3481      if (serverName != null && serverName.equals(destServer)) {
3482        assertRegionOnServer(destRegion, serverName, 2000);
3483        break;
3484      }
3485      Thread.sleep(10);
3486    }
3487  }
3488
3489  /**
3490   * Wait until all regions for a table in hbase:meta have a non-empty
3491   * info:server, up to a configuable timeout value (default is 60 seconds)
3492   * This means all regions have been deployed,
3493   * master has been informed and updated hbase:meta with the regions deployed
3494   * server.
3495   * @param tableName the table name
3496   * @throws IOException
3497   */
3498  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3499    waitUntilAllRegionsAssigned(tableName,
3500      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3501  }
3502
3503  /**
3504   * Waith until all system table's regions get assigned
3505   * @throws IOException
3506   */
3507  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3508    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3509  }
3510
3511  /**
3512   * Wait until all regions for a table in hbase:meta have a non-empty
3513   * info:server, or until timeout.  This means all regions have been deployed,
3514   * master has been informed and updated hbase:meta with the regions deployed
3515   * server.
3516   * @param tableName the table name
3517   * @param timeout timeout, in milliseconds
3518   * @throws IOException
3519   */
3520  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3521      throws IOException {
3522    if (!TableName.isMetaTableName(tableName)) {
3523      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3524        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3525            timeout + "ms");
3526        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3527          @Override
3528          public String explainFailure() throws IOException {
3529            return explainTableAvailability(tableName);
3530          }
3531
3532          @Override
3533          public boolean evaluate() throws IOException {
3534            Scan scan = new Scan();
3535            scan.addFamily(HConstants.CATALOG_FAMILY);
3536            boolean tableFound = false;
3537            try (ResultScanner s = meta.getScanner(scan)) {
3538              for (Result r; (r = s.next()) != null;) {
3539                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3540                RegionInfo info = RegionInfo.parseFromOrNull(b);
3541                if (info != null && info.getTable().equals(tableName)) {
3542                  // Get server hosting this region from catalog family. Return false if no server
3543                  // hosting this region, or if the server hosting this region was recently killed
3544                  // (for fault tolerance testing).
3545                  tableFound = true;
3546                  byte[] server =
3547                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3548                  if (server == null) {
3549                    return false;
3550                  } else {
3551                    byte[] startCode =
3552                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3553                    ServerName serverName =
3554                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3555                            Bytes.toLong(startCode));
3556                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3557                        getHBaseCluster().isKilledRS(serverName)) {
3558                      return false;
3559                    }
3560                  }
3561                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3562                    return false;
3563                  }
3564                }
3565              }
3566            }
3567            if (!tableFound) {
3568              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3569            }
3570            return tableFound;
3571          }
3572        });
3573      }
3574    }
3575    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3576    // check from the master state if we are using a mini cluster
3577    if (!getHBaseClusterInterface().isDistributedCluster()) {
3578      // So, all regions are in the meta table but make sure master knows of the assignments before
3579      // returning -- sometimes this can lag.
3580      HMaster master = getHBaseCluster().getMaster();
3581      final RegionStates states = master.getAssignmentManager().getRegionStates();
3582      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3583        @Override
3584        public String explainFailure() throws IOException {
3585          return explainTableAvailability(tableName);
3586        }
3587
3588        @Override
3589        public boolean evaluate() throws IOException {
3590          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3591          return hris != null && !hris.isEmpty();
3592        }
3593      });
3594    }
3595    LOG.info("All regions for table " + tableName + " assigned.");
3596  }
3597
3598  /**
3599   * Do a small get/scan against one store. This is required because store
3600   * has no actual methods of querying itself, and relies on StoreScanner.
3601   */
3602  public static List<Cell> getFromStoreFile(HStore store,
3603                                                Get get) throws IOException {
3604    Scan scan = new Scan(get);
3605    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3606        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3607        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3608        // readpoint 0.
3609        0);
3610
3611    List<Cell> result = new ArrayList<>();
3612    scanner.next(result);
3613    if (!result.isEmpty()) {
3614      // verify that we are on the row we want:
3615      Cell kv = result.get(0);
3616      if (!CellUtil.matchingRows(kv, get.getRow())) {
3617        result.clear();
3618      }
3619    }
3620    scanner.close();
3621    return result;
3622  }
3623
3624  /**
3625   * Create region split keys between startkey and endKey
3626   *
3627   * @param startKey
3628   * @param endKey
3629   * @param numRegions the number of regions to be created. it has to be greater than 3.
3630   * @return resulting split keys
3631   */
3632  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3633    if (numRegions <= 3) {
3634      throw new AssertionError();
3635    }
3636    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3637    byte [][] result = new byte[tmpSplitKeys.length+1][];
3638    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3639    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3640    return result;
3641  }
3642
3643  /**
3644   * Do a small get/scan against one store. This is required because store
3645   * has no actual methods of querying itself, and relies on StoreScanner.
3646   */
3647  public static List<Cell> getFromStoreFile(HStore store,
3648                                                byte [] row,
3649                                                NavigableSet<byte[]> columns
3650                                                ) throws IOException {
3651    Get get = new Get(row);
3652    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3653    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3654
3655    return getFromStoreFile(store,get);
3656  }
3657
3658  public static void assertKVListsEqual(String additionalMsg,
3659      final List<? extends Cell> expected,
3660      final List<? extends Cell> actual) {
3661    final int eLen = expected.size();
3662    final int aLen = actual.size();
3663    final int minLen = Math.min(eLen, aLen);
3664
3665    int i;
3666    for (i = 0; i < minLen
3667        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3668        ++i) {}
3669
3670    if (additionalMsg == null) {
3671      additionalMsg = "";
3672    }
3673    if (!additionalMsg.isEmpty()) {
3674      additionalMsg = ". " + additionalMsg;
3675    }
3676
3677    if (eLen != aLen || i != minLen) {
3678      throw new AssertionError(
3679          "Expected and actual KV arrays differ at position " + i + ": " +
3680          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3681          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3682    }
3683  }
3684
3685  public static <T> String safeGetAsStr(List<T> lst, int i) {
3686    if (0 <= i && i < lst.size()) {
3687      return lst.get(i).toString();
3688    } else {
3689      return "<out_of_range>";
3690    }
3691  }
3692
3693  public String getClusterKey() {
3694    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3695        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3696        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3697            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3698  }
3699
  /**
   * Creates a random table with the given parameters.
   * Writes {@code numFlushes} batches of random puts/deletes, flushing after
   * each batch, so each region ends up with roughly that many store files.
   * @param tableName table to create
   * @param families column family names
   * @param maxVersions max versions kept per cell
   * @param numColsPerRow columns written per row
   * @param numFlushes number of write/flush cycles
   * @param numRegions number of regions to pre-split the table into
   * @param numRowsPerFlush rows written in each cycle
   * @return the created Table
   */
  public Table createRandomTable(TableName tableName,
      final Collection<String> families,
      final int maxVersions,
      final int numColsPerRow,
      final int numFlushes,
      final int numRegions,
      final int numRowsPerFlush)
      throws IOException, InterruptedException {

    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
        " regions, " + numFlushes + " storefiles per region, " +
        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
        "\n");

    // Seed derived from the table name so reruns for the same table are reproducible.
    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
    final int numCF = families.size();
    final byte[][] cfBytes = new byte[numCF][];
    {
      int cfIndex = 0;
      for (String cf : families) {
        cfBytes[cfIndex++] = Bytes.toBytes(cf);
      }
    }

    // Row keys are zero-padded hex ints; split points spread evenly over the key space.
    final int actualStartKey = 0;
    final int actualEndKey = Integer.MAX_VALUE;
    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
    final int splitStartKey = actualStartKey + keysPerRegion;
    final int splitEndKey = actualEndKey - keysPerRegion;
    final String keyFormat = "%08x";
    final Table table = createTable(tableName, cfBytes,
        maxVersions,
        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
        numRegions);

    if (hbaseCluster != null) {
      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
    }

    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);

    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
        final byte[] row = Bytes.toBytes(String.format(keyFormat,
            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));

        Put put = new Put(row);
        Delete del = new Delete(row);
        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
          final byte[] cf = cfBytes[rand.nextInt(numCF)];
          final long ts = rand.nextInt();
          final byte[] qual = Bytes.toBytes("col" + iCol);
          // Randomly mix puts, single-version deletes, and all-version deletes.
          if (rand.nextBoolean()) {
            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
                ts + "_random_" + rand.nextLong());
            put.addColumn(cf, qual, ts, value);
          } else if (rand.nextDouble() < 0.8) {
            del.addColumn(cf, qual, ts);
          } else {
            del.addColumns(cf, qual, ts);
          }
        }

        if (!put.isEmpty()) {
          mutator.mutate(put);
        }

        if (!del.isEmpty()) {
          mutator.mutate(del);
        }
      }
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      mutator.flush();
      if (hbaseCluster != null) {
        getMiniHBaseCluster().flushcache(table.getName());
      }
    }
    mutator.close();

    return table;
  }
3784
  /** Returns a random free port; delegates to {@link HBaseCommonTestingUtility#randomFreePort()}. */
  public static int randomFreePort() {
    return HBaseCommonTestingUtility.randomFreePort();
  }
  /**
   * Returns a random IPv4 multicast address of the form {@code 226.1.1.x}.
   * NOTE(review): {@code x} ranges over [0, 253]; .254/.255 are never produced — confirm
   * this exclusion is intentional.
   */
  public static String randomMultiCastAddress() {
    return "226.1.1." + random.nextInt(254);
  }
3791
3792  public static void waitForHostPort(String host, int port)
3793      throws IOException {
3794    final int maxTimeMs = 10000;
3795    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3796    IOException savedException = null;
3797    LOG.info("Waiting for server at " + host + ":" + port);
3798    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3799      try {
3800        Socket sock = new Socket(InetAddress.getByName(host), port);
3801        sock.close();
3802        savedException = null;
3803        LOG.info("Server at " + host + ":" + port + " is available");
3804        break;
3805      } catch (UnknownHostException e) {
3806        throw new IOException("Failed to look up " + host, e);
3807      } catch (IOException e) {
3808        savedException = e;
3809      }
3810      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3811    }
3812
3813    if (savedException != null) {
3814      throw savedException;
3815    }
3816  }
3817
3818  /**
3819   * Creates a pre-split table for load testing. If the table already exists,
3820   * logs a warning and continues.
3821   * @return the number of regions the table was split into
3822   */
3823  public static int createPreSplitLoadTestTable(Configuration conf,
3824      TableName tableName, byte[] columnFamily, Algorithm compression,
3825      DataBlockEncoding dataBlockEncoding) throws IOException {
3826    return createPreSplitLoadTestTable(conf, tableName,
3827      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3828      Durability.USE_DEFAULT);
3829  }
3830  /**
3831   * Creates a pre-split table for load testing. If the table already exists,
3832   * logs a warning and continues.
3833   * @return the number of regions the table was split into
3834   */
3835  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3836    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3837    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3838    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3839    builder.setDurability(durability);
3840    builder.setRegionReplication(regionReplication);
3841    ColumnFamilyDescriptorBuilder cfBuilder =
3842      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3843    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3844    cfBuilder.setCompressionType(compression);
3845    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3846      numRegionsPerServer);
3847  }
3848
3849  /**
3850   * Creates a pre-split table for load testing. If the table already exists,
3851   * logs a warning and continues.
3852   * @return the number of regions the table was split into
3853   */
3854  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3855    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3856    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3857    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3858    builder.setDurability(durability);
3859    builder.setRegionReplication(regionReplication);
3860    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3861    for (int i = 0; i < columnFamilies.length; i++) {
3862      ColumnFamilyDescriptorBuilder cfBuilder =
3863        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3864      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3865      cfBuilder.setCompressionType(compression);
3866      hcds[i] = cfBuilder.build();
3867    }
3868    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3869  }
3870
3871  /**
3872   * Creates a pre-split table for load testing. If the table already exists,
3873   * logs a warning and continues.
3874   * @return the number of regions the table was split into
3875   */
3876  public static int createPreSplitLoadTestTable(Configuration conf,
3877      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
3878    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3879  }
3880
3881  /**
3882   * Creates a pre-split table for load testing. If the table already exists,
3883   * logs a warning and continues.
3884   * @return the number of regions the table was split into
3885   */
3886  public static int createPreSplitLoadTestTable(Configuration conf,
3887      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3888    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
3889        numRegionsPerServer);
3890  }
3891
3892  /**
3893   * Creates a pre-split table for load testing. If the table already exists,
3894   * logs a warning and continues.
3895   * @return the number of regions the table was split into
3896   */
3897  public static int createPreSplitLoadTestTable(Configuration conf,
3898      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
3899      int numRegionsPerServer) throws IOException {
3900    return createPreSplitLoadTestTable(conf, desc, hcds,
3901      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
3902  }
3903
3904  /**
3905   * Creates a pre-split table for load testing. If the table already exists,
3906   * logs a warning and continues.
3907   * @return the number of regions the table was split into
3908   */
3909  public static int createPreSplitLoadTestTable(Configuration conf,
3910      TableDescriptor td, ColumnFamilyDescriptor[] cds,
3911      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
3912    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3913    for (ColumnFamilyDescriptor cd : cds) {
3914      if (!td.hasColumnFamily(cd.getName())) {
3915        builder.setColumnFamily(cd);
3916      }
3917    }
3918    td = builder.build();
3919    int totalNumberOfRegions = 0;
3920    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3921    Admin admin = unmanagedConnection.getAdmin();
3922
3923    try {
3924      // create a table a pre-splits regions.
3925      // The number of splits is set as:
3926      //    region servers * regions per region server).
3927      int numberOfServers = admin.getRegionServers().size();
3928      if (numberOfServers == 0) {
3929        throw new IllegalStateException("No live regionservers");
3930      }
3931
3932      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3933      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3934          "pre-splitting table into " + totalNumberOfRegions + " regions " +
3935          "(regions per server: " + numRegionsPerServer + ")");
3936
3937      byte[][] splits = splitter.split(
3938          totalNumberOfRegions);
3939
3940      admin.createTable(td, splits);
3941    } catch (MasterNotRunningException e) {
3942      LOG.error("Master not running", e);
3943      throw new IOException(e);
3944    } catch (TableExistsException e) {
3945      LOG.warn("Table " + td.getTableName() +
3946          " already exists, continuing");
3947    } finally {
3948      admin.close();
3949      unmanagedConnection.close();
3950    }
3951    return totalNumberOfRegions;
3952  }
3953
  /**
   * Returns the port of the region server currently hosting the first {@code hbase:meta} region.
   * @param connection connection used to locate meta
   */
  public static int getMetaRSPort(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
    }
  }
3959
3960  /**
3961   * Due to async racing issue, a region may not be in the online region list of a region server
3962   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3963   */
3964  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3965    final long timeout) throws IOException, InterruptedException {
3966    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3967    while (true) {
3968      List<RegionInfo> regions = getAdmin().getRegions(server);
3969      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3970      long now = EnvironmentEdgeManager.currentTime();
3971      if (now > timeoutTime) break;
3972      Thread.sleep(10);
3973    }
3974    throw new AssertionError(
3975      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3976  }
3977
3978  /**
3979   * Check to make sure the region is open on the specified
3980   * region server, but not on any other one.
3981   */
3982  public void assertRegionOnlyOnServer(
3983      final RegionInfo hri, final ServerName server,
3984      final long timeout) throws IOException, InterruptedException {
3985    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3986    while (true) {
3987      List<RegionInfo> regions = getAdmin().getRegions(server);
3988      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3989        List<JVMClusterUtil.RegionServerThread> rsThreads =
3990          getHBaseCluster().getLiveRegionServerThreads();
3991        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3992          HRegionServer rs = rsThread.getRegionServer();
3993          if (server.equals(rs.getServerName())) {
3994            continue;
3995          }
3996          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3997          for (HRegion r: hrs) {
3998            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3999              throw new AssertionError("Region should not be double assigned");
4000            }
4001          }
4002        }
4003        return; // good, we are happy
4004      }
4005      long now = EnvironmentEdgeManager.currentTime();
4006      if (now > timeoutTime) break;
4007      Thread.sleep(10);
4008    }
4009    throw new AssertionError(
4010      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
4011  }
4012
4013  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
4014    TableDescriptor td =
4015        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4016    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4017    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
4018  }
4019
4020  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
4021      BlockCache blockCache) throws IOException {
4022    TableDescriptor td =
4023        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4024    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4025    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
4026  }
4027
  /**
   * Sets the static {@code FS_URI} value.
   * NOTE(review): consumers of {@code FS_URI} are outside this chunk — confirm intended usage.
   */
  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }
4031
4032  /**
4033   * Returns a {@link Predicate} for checking that there are no regions in transition in master
4034   */
4035  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
4036    return new ExplainingPredicate<IOException>() {
4037      @Override
4038      public String explainFailure() throws IOException {
4039        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4040            .getAssignmentManager().getRegionStates();
4041        return "found in transition: " + regionStates.getRegionsInTransition().toString();
4042      }
4043
4044      @Override
4045      public boolean evaluate() throws IOException {
4046        HMaster master = getMiniHBaseCluster().getMaster();
4047        if (master == null) return false;
4048        AssignmentManager am = master.getAssignmentManager();
4049        if (am == null) return false;
4050        return !am.hasRegionsInTransition();
4051      }
4052    };
4053  }
4054
4055  /**
4056   * Returns a {@link Predicate} for checking that table is enabled
4057   */
4058  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4059    return new ExplainingPredicate<IOException>() {
4060      @Override
4061      public String explainFailure() throws IOException {
4062        return explainTableState(tableName, TableState.State.ENABLED);
4063      }
4064
4065      @Override
4066      public boolean evaluate() throws IOException {
4067        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4068      }
4069    };
4070  }
4071
4072  /**
4073   * Returns a {@link Predicate} for checking that table is enabled
4074   */
4075  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4076    return new ExplainingPredicate<IOException>() {
4077      @Override
4078      public String explainFailure() throws IOException {
4079        return explainTableState(tableName, TableState.State.DISABLED);
4080      }
4081
4082      @Override
4083      public boolean evaluate() throws IOException {
4084        return getAdmin().isTableDisabled(tableName);
4085      }
4086    };
4087  }
4088
4089  /**
4090   * Returns a {@link Predicate} for checking that table is enabled
4091   */
4092  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4093    return new ExplainingPredicate<IOException>() {
4094      @Override
4095      public String explainFailure() throws IOException {
4096        return explainTableAvailability(tableName);
4097      }
4098
4099      @Override
4100      public boolean evaluate() throws IOException {
4101        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4102        if (tableAvailable) {
4103          try (Table table = getConnection().getTable(tableName)) {
4104            TableDescriptor htd = table.getDescriptor();
4105            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4106                .getAllRegionLocations()) {
4107              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
4108                  .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
4109                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4110              for (byte[] family : htd.getColumnFamilyNames()) {
4111                scan.addFamily(family);
4112              }
4113              try (ResultScanner scanner = table.getScanner(scan)) {
4114                scanner.next();
4115              }
4116            }
4117          }
4118        }
4119        return tableAvailable;
4120      }
4121    };
4122  }
4123
4124  /**
4125   * Wait until no regions in transition.
4126   * @param timeout How long to wait.
4127   * @throws IOException
4128   */
4129  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4130    waitFor(timeout, predicateNoRegionsInTransition());
4131  }
4132
4133  /**
4134   * Wait until no regions in transition. (time limit 15min)
4135   * @throws IOException
4136   */
4137  public void waitUntilNoRegionsInTransition() throws IOException {
4138    waitUntilNoRegionsInTransition(15 * 60000);
4139  }
4140
4141  /**
4142   * Wait until labels is ready in VisibilityLabelsCache.
4143   * @param timeoutMillis
4144   * @param labels
4145   */
4146  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4147    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4148    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4149
4150      @Override
4151      public boolean evaluate() {
4152        for (String label : labels) {
4153          if (labelsCache.getLabelOrdinal(label) == 0) {
4154            return false;
4155          }
4156        }
4157        return true;
4158      }
4159
4160      @Override
4161      public String explainFailure() {
4162        for (String label : labels) {
4163          if (labelsCache.getLabelOrdinal(label) == 0) {
4164            return label + " is not available yet";
4165          }
4166        }
4167        return "";
4168      }
4169    });
4170  }
4171
4172  /**
4173   * Create a set of column descriptors with the combination of compression,
4174   * encoding, bloom codecs available.
4175   * @return the list of column descriptors
4176   */
4177  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4178    return generateColumnDescriptors("");
4179  }
4180
4181  /**
4182   * Create a set of column descriptors with the combination of compression,
4183   * encoding, bloom codecs available.
4184   * @param prefix family names prefix
4185   * @return the list of column descriptors
4186   */
4187  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4188    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4189    long familyId = 0;
4190    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4191      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4192        for (BloomType bloomType: BloomType.values()) {
4193          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4194          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4195            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4196          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4197          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4198          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4199          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4200          familyId++;
4201        }
4202      }
4203    }
4204    return columnFamilyDescriptors;
4205  }
4206
4207  /**
4208   * Get supported compression algorithms.
4209   * @return supported compression algorithms.
4210   */
4211  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4212    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4213    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4214    for (String algoName : allAlgos) {
4215      try {
4216        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4217        algo.getCompressor();
4218        supportedAlgos.add(algo);
4219      } catch (Throwable t) {
4220        // this algo is not available
4221      }
4222    }
4223    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4224  }
4225
4226  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4227    Scan scan = new Scan().withStartRow(row);
4228    scan.setReadType(ReadType.PREAD);
4229    scan.setCaching(1);
4230    scan.setReversed(true);
4231    scan.addFamily(family);
4232    try (RegionScanner scanner = r.getScanner(scan)) {
4233      List<Cell> cells = new ArrayList<>(1);
4234      scanner.next(cells);
4235      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4236        return null;
4237      }
4238      return Result.create(cells);
4239    }
4240  }
4241
  /**
   * Returns true when the input row and the cell's row share the same prefix up to the first
   * {@link HConstants#DELIMITER} (i.e. refer to the same table in a meta row).
   * NOTE(review): assumes both rows contain the delimiter; an {@code indexOf} of -1 would make
   * {@code substring} throw — confirm callers only pass meta-style rows.
   */
  private boolean isTargetTable(final byte[] inRow, Cell c) {
    String inputRowString = Bytes.toString(inRow);
    int i = inputRowString.indexOf(HConstants.DELIMITER);
    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
    int o = outputRowString.indexOf(HConstants.DELIMITER);
    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
  }
4249
4250  /**
4251   * Sets up {@link MiniKdc} for testing security.
4252   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4253   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4254   * FYI, there is also the easier-to-use kerby KDC server and utility for using it,
4255   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4256   * less baggage. It came in in HBASE-5291.
4257   */
4258  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4259    Properties conf = MiniKdc.createConf();
4260    conf.put(MiniKdc.DEBUG, true);
4261    MiniKdc kdc = null;
4262    File dir = null;
4263    // There is time lag between selecting a port and trying to bind with it. It's possible that
4264    // another service captures the port in between which'll result in BindException.
4265    boolean bindException;
4266    int numTries = 0;
4267    do {
4268      try {
4269        bindException = false;
4270        dir = new File(getDataTestDir("kdc").toUri().getPath());
4271        kdc = new MiniKdc(conf, dir);
4272        kdc.start();
4273      } catch (BindException e) {
4274        FileUtils.deleteDirectory(dir);  // clean directory
4275        numTries++;
4276        if (numTries == 3) {
4277          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4278          throw e;
4279        }
4280        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4281        bindException = true;
4282      }
4283    } while (bindException);
4284    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4285    return kdc;
4286  }
4287
4288  public int getNumHFiles(final TableName tableName, final byte[] family) {
4289    int numHFiles = 0;
4290    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4291      numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4292                                    family);
4293    }
4294    return numHFiles;
4295  }
4296
4297  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4298                               final byte[] family) {
4299    int numHFiles = 0;
4300    for (Region region : rs.getRegions(tableName)) {
4301      numHFiles += region.getStore(family).getStorefilesCount();
4302    }
4303    return numHFiles;
4304  }
4305
4306  private void assertEquals(String message, int expected, int actual) {
4307    if (expected == actual) {
4308      return;
4309    }
4310    String formatted = "";
4311    if (message != null && !"".equals(message)) {
4312      formatted = message + " ";
4313    }
4314    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4315  }
4316
4317  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4318    if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) {
4319      throw new AssertionError();
4320    }
4321    assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode());
4322    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4323    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4324    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4325    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4326      it2 = rtdFamilies.iterator(); it.hasNext();) {
4327      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4328    }
4329  }
4330
4331  /**
4332   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4333   * invocations.
4334   */
4335  public static void await(final long sleepMillis, final BooleanSupplier condition)
4336    throws InterruptedException {
4337    try {
4338      while (!condition.getAsBoolean()) {
4339        Thread.sleep(sleepMillis);
4340      }
4341    } catch (RuntimeException e) {
4342      if (e.getCause() instanceof AssertionError) {
4343        throw (AssertionError) e.getCause();
4344      }
4345      throw e;
4346    }
4347  }
4348}