001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.UncheckedIOException;
029import java.lang.reflect.Field;
030import java.lang.reflect.Modifier;
031import java.net.BindException;
032import java.net.DatagramSocket;
033import java.net.InetAddress;
034import java.net.ServerSocket;
035import java.net.Socket;
036import java.net.UnknownHostException;
037import java.nio.charset.StandardCharsets;
038import java.security.MessageDigest;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.function.BooleanSupplier;
055import org.apache.commons.io.FileUtils;
056import org.apache.commons.lang3.RandomStringUtils;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.fs.FileSystem;
059import org.apache.hadoop.fs.Path;
060import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
061import org.apache.hadoop.hbase.Waiter.Predicate;
062import org.apache.hadoop.hbase.client.Admin;
063import org.apache.hadoop.hbase.client.AsyncClusterConnection;
064import org.apache.hadoop.hbase.client.BufferedMutator;
065import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
068import org.apache.hadoop.hbase.client.Connection;
069import org.apache.hadoop.hbase.client.ConnectionFactory;
070import org.apache.hadoop.hbase.client.Consistency;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Hbck;
075import org.apache.hadoop.hbase.client.MasterRegistry;
076import org.apache.hadoop.hbase.client.Put;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionInfoBuilder;
079import org.apache.hadoop.hbase.client.RegionLocator;
080import org.apache.hadoop.hbase.client.Result;
081import org.apache.hadoop.hbase.client.ResultScanner;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.Table;
084import org.apache.hadoop.hbase.client.TableDescriptor;
085import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
086import org.apache.hadoop.hbase.client.TableState;
087import org.apache.hadoop.hbase.fs.HFileSystem;
088import org.apache.hadoop.hbase.io.compress.Compression;
089import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
090import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
091import org.apache.hadoop.hbase.io.hfile.BlockCache;
092import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
093import org.apache.hadoop.hbase.io.hfile.HFile;
094import org.apache.hadoop.hbase.ipc.RpcServerInterface;
095import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
096import org.apache.hadoop.hbase.logging.Log4jUtils;
097import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
098import org.apache.hadoop.hbase.master.HMaster;
099import org.apache.hadoop.hbase.master.RegionState;
100import org.apache.hadoop.hbase.master.ServerManager;
101import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
102import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
103import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
104import org.apache.hadoop.hbase.master.assignment.RegionStates;
105import org.apache.hadoop.hbase.mob.MobFileCache;
106import org.apache.hadoop.hbase.regionserver.BloomType;
107import org.apache.hadoop.hbase.regionserver.ChunkCreator;
108import org.apache.hadoop.hbase.regionserver.HRegion;
109import org.apache.hadoop.hbase.regionserver.HRegionServer;
110import org.apache.hadoop.hbase.regionserver.HStore;
111import org.apache.hadoop.hbase.regionserver.InternalScanner;
112import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
113import org.apache.hadoop.hbase.regionserver.Region;
114import org.apache.hadoop.hbase.regionserver.RegionScanner;
115import org.apache.hadoop.hbase.regionserver.RegionServerServices;
116import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
117import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
118import org.apache.hadoop.hbase.security.User;
119import org.apache.hadoop.hbase.security.UserProvider;
120import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
121import org.apache.hadoop.hbase.trace.TraceUtil;
122import org.apache.hadoop.hbase.util.Bytes;
123import org.apache.hadoop.hbase.util.CommonFSUtils;
124import org.apache.hadoop.hbase.util.FSUtils;
125import org.apache.hadoop.hbase.util.JVMClusterUtil;
126import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
127import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
128import org.apache.hadoop.hbase.util.Pair;
129import org.apache.hadoop.hbase.util.RegionSplitter;
130import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
131import org.apache.hadoop.hbase.util.RetryCounter;
132import org.apache.hadoop.hbase.util.Threads;
133import org.apache.hadoop.hbase.wal.WAL;
134import org.apache.hadoop.hbase.wal.WALFactory;
135import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
136import org.apache.hadoop.hbase.zookeeper.ZKConfig;
137import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
138import org.apache.hadoop.hdfs.DFSClient;
139import org.apache.hadoop.hdfs.DistributedFileSystem;
140import org.apache.hadoop.hdfs.MiniDFSCluster;
141import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
142import org.apache.hadoop.mapred.JobConf;
143import org.apache.hadoop.mapred.MiniMRCluster;
144import org.apache.hadoop.mapred.TaskLog;
145import org.apache.hadoop.minikdc.MiniKdc;
146import org.apache.yetus.audience.InterfaceAudience;
147import org.apache.zookeeper.WatchedEvent;
148import org.apache.zookeeper.ZooKeeper;
149import org.apache.zookeeper.ZooKeeper.States;
150
151import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
152
153import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
154
155/**
156 * Facility for testing HBase. Replacement for
157 * old HBaseTestCase and HBaseClusterTestCase functionality.
158 * Create an instance and keep it around testing HBase.  This class is
159 * meant to be your one-stop shop for anything you might need testing.  Manages
160 * one cluster at a time only. Managed cluster can be an in-process
161 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
162 * Not all methods work with the real cluster.
163 * Depends on log4j being on classpath and
164 * hbase-site.xml for logging and test-run configuration.  It does not set
165 * logging levels.
166 * In the configuration properties, default values for master-info-port and
167 * region-server-port are overridden such that a random port will be assigned (thus
168 * avoiding port contention if another local HBase instance is already running).
169 * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
170 * setting it to true.
171 */
172@InterfaceAudience.Public
173@SuppressWarnings("deprecation")
174public class HBaseTestingUtility extends HBaseZKTestingUtility {
175
176  /**
177   * System property key to get test directory value. Name is as it is because mini dfs has
178   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
179   * used in mini dfs.
180   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
181   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
182   */
183  @Deprecated
184  private static final String TEST_DIRECTORY_KEY = "test.build.data";
185
186  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
187  /**
188   * The default number of regions per regionserver when creating a pre-split
189   * table.
190   */
191  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
192
193
194  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
195  public static final boolean PRESPLIT_TEST_TABLE = true;
196
197  private MiniDFSCluster dfsCluster = null;
198
199  private volatile HBaseCluster hbaseCluster = null;
200  private MiniMRCluster mrCluster = null;
201
202  /** If there is a mini cluster running for this testing utility instance. */
203  private volatile boolean miniClusterRunning;
204
205  private String hadoopLogDir;
206
207  /** Directory on test filesystem where we put the data for this instance of
208    * HBaseTestingUtility*/
209  private Path dataTestDirOnTestFS = null;
210
211  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
212
213  /** Filesystem URI used for map-reduce mini-cluster setup */
214  private static String FS_URI;
215
216  /** This is for unit tests parameterized with a single boolean. */
217  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
218
219  /**
220   * Checks to see if a specific port is available.
221   *
222   * @param port the port number to check for availability
223   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
224   */
225  public static boolean available(int port) {
226    ServerSocket ss = null;
227    DatagramSocket ds = null;
228    try {
229      ss = new ServerSocket(port);
230      ss.setReuseAddress(true);
231      ds = new DatagramSocket(port);
232      ds.setReuseAddress(true);
233      return true;
234    } catch (IOException e) {
235      // Do nothing
236    } finally {
237      if (ds != null) {
238        ds.close();
239      }
240
241      if (ss != null) {
242        try {
243          ss.close();
244        } catch (IOException e) {
245          /* should not be thrown */
246        }
247      }
248    }
249
250    return false;
251  }
252
253  /**
254   * Create all combinations of Bloom filters and compression algorithms for
255   * testing.
256   */
257  private static List<Object[]> bloomAndCompressionCombinations() {
258    List<Object[]> configurations = new ArrayList<>();
259    for (Compression.Algorithm comprAlgo :
260         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
261      for (BloomType bloomType : BloomType.values()) {
262        configurations.add(new Object[] { comprAlgo, bloomType });
263      }
264    }
265    return Collections.unmodifiableList(configurations);
266  }
267
268  /**
269   * Create combination of memstoreTS and tags
270   */
271  private static List<Object[]> memStoreTSAndTagsCombination() {
272    List<Object[]> configurations = new ArrayList<>();
273    configurations.add(new Object[] { false, false });
274    configurations.add(new Object[] { false, true });
275    configurations.add(new Object[] { true, false });
276    configurations.add(new Object[] { true, true });
277    return Collections.unmodifiableList(configurations);
278  }
279
280  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
281    List<Object[]> configurations = new ArrayList<>();
282    configurations.add(new Object[] { false, false, true });
283    configurations.add(new Object[] { false, false, false });
284    configurations.add(new Object[] { false, true, true });
285    configurations.add(new Object[] { false, true, false });
286    configurations.add(new Object[] { true, false, true });
287    configurations.add(new Object[] { true, false, false });
288    configurations.add(new Object[] { true, true, true });
289    configurations.add(new Object[] { true, true, false });
290    return Collections.unmodifiableList(configurations);
291  }
292
293  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
294      bloomAndCompressionCombinations();
295
296
297  /**
298   * <p>Create an HBaseTestingUtility using a default configuration.
299   *
300   * <p>Initially, all tmp files are written to a local test data directory.
301   * Once {@link #startMiniDFSCluster} is called, either directly or via
302   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
303   */
304  public HBaseTestingUtility() {
305    this(HBaseConfiguration.create());
306  }
307
308  /**
309   * <p>Create an HBaseTestingUtility using a given configuration.
310   *
311   * <p>Initially, all tmp files are written to a local test data directory.
312   * Once {@link #startMiniDFSCluster} is called, either directly or via
313   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
314   *
315   * @param conf The configuration to use for further operations
316   */
317  public HBaseTestingUtility(@Nullable Configuration conf) {
318    super(conf);
319
320    // a hbase checksum verification failure will cause unit tests to fail
321    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
322
323    // Save this for when setting default file:// breaks things
324    if (this.conf.get("fs.defaultFS") != null) {
325      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
326    }
327    if (this.conf.get(HConstants.HBASE_DIR) != null) {
328      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
329    }
330    // Every cluster is a local cluster until we start DFS
331    // Note that conf could be null, but this.conf will not be
332    String dataTestDir = getDataTestDir().toString();
333    this.conf.set("fs.defaultFS","file:///");
334    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
335    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
336    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
337    // If the value for random ports isn't set set it to true, thus making
338    // tests opt-out for random port assignment
339    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
340        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
341  }
342
343  /**
344   * Close both the region {@code r} and it's underlying WAL. For use in tests.
345   */
346  public static void closeRegionAndWAL(final Region r) throws IOException {
347    closeRegionAndWAL((HRegion)r);
348  }
349
350  /**
351   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
352   */
353  public static void closeRegionAndWAL(final HRegion r) throws IOException {
354    if (r == null) return;
355    r.close();
356    if (r.getWAL() == null) return;
357    r.getWAL().close();
358  }
359
360  /**
361   * Returns this classes's instance of {@link Configuration}.  Be careful how
362   * you use the returned Configuration since {@link Connection} instances
363   * can be shared.  The Map of Connections is keyed by the Configuration.  If
364   * say, a Connection was being used against a cluster that had been shutdown,
365   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
366   * be wholesome.  Rather than use the return direct, its usually best to
367   * make a copy and use that.  Do
368   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
369   * @return Instance of Configuration.
370   */
371  @Override
372  public Configuration getConfiguration() {
373    return super.getConfiguration();
374  }
375
376  public void setHBaseCluster(HBaseCluster hbaseCluster) {
377    this.hbaseCluster = hbaseCluster;
378  }
379
380  /**
381   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
382   * Give it a random name so can have many concurrent tests running if
383   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
384   * System property, as it's what minidfscluster bases
385   * it data dir on.  Moding a System property is not the way to do concurrent
386   * instances -- another instance could grab the temporary
387   * value unintentionally -- but not anything can do about it at moment;
388   * single instance only is how the minidfscluster works.
389   *
390   * We also create the underlying directory names for
391   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
392   *  in the conf, and as a system property for hadoop.tmp.dir (We do not create them!).
393   *
394   * @return The calculated data test build directory, if newly-created.
395   */
396  @Override
397  protected Path setupDataTestDir() {
398    Path testPath = super.setupDataTestDir();
399    if (null == testPath) {
400      return null;
401    }
402
403    createSubDirAndSystemProperty(
404      "hadoop.log.dir",
405      testPath, "hadoop-log-dir");
406
407    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
408    //  we want our own value to ensure uniqueness on the same machine
409    createSubDirAndSystemProperty(
410      "hadoop.tmp.dir",
411      testPath, "hadoop-tmp-dir");
412
413    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
414    createSubDir(
415      "mapreduce.cluster.local.dir",
416      testPath, "mapred-local-dir");
417    return testPath;
418  }
419
420  private void createSubDirAndSystemProperty(
421    String propertyName, Path parent, String subDirName){
422
423    String sysValue = System.getProperty(propertyName);
424
425    if (sysValue != null) {
426      // There is already a value set. So we do nothing but hope
427      //  that there will be no conflicts
428      LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
429        sysValue + " so I do NOT create it in " + parent);
430      String confValue = conf.get(propertyName);
431      if (confValue != null && !confValue.endsWith(sysValue)){
432       LOG.warn(
433         propertyName + " property value differs in configuration and system: "+
434         "Configuration="+confValue+" while System="+sysValue+
435         " Erasing configuration value by system value."
436       );
437      }
438      conf.set(propertyName, sysValue);
439    } else {
440      // Ok, it's not set, so we create it as a subdirectory
441      createSubDir(propertyName, parent, subDirName);
442      System.setProperty(propertyName, conf.get(propertyName));
443    }
444  }
445
446  /**
447   * @return Where to write test data on the test filesystem; Returns working directory
448   * for the test filesystem by default
449   * @see #setupDataTestDirOnTestFS()
450   * @see #getTestFileSystem()
451   */
452  private Path getBaseTestDirOnTestFS() throws IOException {
453    FileSystem fs = getTestFileSystem();
454    return new Path(fs.getWorkingDirectory(), "test-data");
455  }
456
457  /**
458   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
459   * to write temporary test data. Call this method after setting up the mini dfs cluster
460   * if the test relies on it.
461   * @return a unique path in the test filesystem
462   */
463  public Path getDataTestDirOnTestFS() throws IOException {
464    if (dataTestDirOnTestFS == null) {
465      setupDataTestDirOnTestFS();
466    }
467
468    return dataTestDirOnTestFS;
469  }
470
471  /**
472   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
473   * to write temporary test data. Call this method after setting up the mini dfs cluster
474   * if the test relies on it.
475   * @return a unique path in the test filesystem
476   * @param subdirName name of the subdir to create under the base test dir
477   */
478  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
479    return new Path(getDataTestDirOnTestFS(), subdirName);
480  }
481
482  /**
483   * Sets up a path in test filesystem to be used by tests.
484   * Creates a new directory if not already setup.
485   */
486  private void setupDataTestDirOnTestFS() throws IOException {
487    if (dataTestDirOnTestFS != null) {
488      LOG.warn("Data test on test fs dir already setup in "
489          + dataTestDirOnTestFS.toString());
490      return;
491    }
492    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
493  }
494
495  /**
496   * Sets up a new path in test filesystem to be used by tests.
497   */
498  private Path getNewDataTestDirOnTestFS() throws IOException {
499    //The file system can be either local, mini dfs, or if the configuration
500    //is supplied externally, it can be an external cluster FS. If it is a local
501    //file system, the tests should use getBaseTestDir, otherwise, we can use
502    //the working directory, and create a unique sub dir there
503    FileSystem fs = getTestFileSystem();
504    Path newDataTestDir;
505    String randomStr = getRandomUUID().toString();
506    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
507      newDataTestDir = new Path(getDataTestDir(), randomStr);
508      File dataTestDir = new File(newDataTestDir.toString());
509      if (deleteOnExit()) dataTestDir.deleteOnExit();
510    } else {
511      Path base = getBaseTestDirOnTestFS();
512      newDataTestDir = new Path(base, randomStr);
513      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
514    }
515    return newDataTestDir;
516  }
517
518  /**
519   * Cleans the test data directory on the test filesystem.
520   * @return True if we removed the test dirs
521   * @throws IOException
522   */
523  public boolean cleanupDataTestDirOnTestFS() throws IOException {
524    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
525    if (ret)
526      dataTestDirOnTestFS = null;
527    return ret;
528  }
529
530  /**
531   * Cleans a subdirectory under the test data directory on the test filesystem.
532   * @return True if we removed child
533   * @throws IOException
534   */
535  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
536    Path cpath = getDataTestDirOnTestFS(subdirName);
537    return getTestFileSystem().delete(cpath, true);
538  }
539
540  /**
541   * Start a minidfscluster.
542   * @param servers How many DNs to start.
543   * @throws Exception
544   * @see #shutdownMiniDFSCluster()
545   * @return The mini dfs cluster created.
546   */
547  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
548    return startMiniDFSCluster(servers, null);
549  }
550
551  /**
552   * Start a minidfscluster.
553   * This is useful if you want to run datanode on distinct hosts for things
554   * like HDFS block location verification.
555   * If you start MiniDFSCluster without host names, all instances of the
556   * datanodes will have the same host name.
557   * @param hosts hostnames DNs to run on.
558   * @throws Exception
559   * @see #shutdownMiniDFSCluster()
560   * @return The mini dfs cluster created.
561   */
562  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
563  throws Exception {
564    if ( hosts != null && hosts.length != 0) {
565      return startMiniDFSCluster(hosts.length, hosts);
566    } else {
567      return startMiniDFSCluster(1, null);
568    }
569  }
570
571  /**
572   * Start a minidfscluster.
573   * Can only create one.
574   * @param servers How many DNs to start.
575   * @param hosts hostnames DNs to run on.
576   * @throws Exception
577   * @see #shutdownMiniDFSCluster()
578   * @return The mini dfs cluster created.
579   */
580  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
581  throws Exception {
582    return startMiniDFSCluster(servers, null, hosts);
583  }
584
585  private void setFs() throws IOException {
586    if(this.dfsCluster == null){
587      LOG.info("Skipping setting fs because dfsCluster is null");
588      return;
589    }
590    FileSystem fs = this.dfsCluster.getFileSystem();
591    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
592
593    // re-enable this check with dfs
594    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
595  }
596
597  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
598      throws Exception {
599    createDirsAndSetProperties();
600    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
601
602    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
603    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
604    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
605      "ERROR");
606
607    TraceUtil.initTracer(conf);
608
609    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
610        true, null, racks, hosts, null);
611
612    // Set this just-started cluster as our filesystem.
613    setFs();
614
615    // Wait for the cluster to be totally up
616    this.dfsCluster.waitClusterUp();
617
618    //reset the test directory for test file system
619    dataTestDirOnTestFS = null;
620    String dataTestDir = getDataTestDir().toString();
621    conf.set(HConstants.HBASE_DIR, dataTestDir);
622    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
623
624    return this.dfsCluster;
625  }
626
627  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
628    createDirsAndSetProperties();
629    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
630    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.util.MBeans.class.getName(), "ERROR");
631    Log4jUtils.setLogLevel(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class.getName(),
632      "ERROR");
633    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
634        null, null, null);
635    return dfsCluster;
636  }
637
638  /**
639   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
640   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
641   * the conf.
642   *
643   * <pre>
644   * Configuration conf = TEST_UTIL.getConfiguration();
645   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
646   *   Map.Entry&lt;String, String&gt; e = i.next();
647   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
648   * }
649   * </pre>
650   */
651  private void createDirsAndSetProperties() throws IOException {
652    setupClusterTestDir();
653    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
654    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
655    createDirAndSetProperty("test.cache.data");
656    createDirAndSetProperty("hadoop.tmp.dir");
657    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
658    createDirAndSetProperty("mapreduce.cluster.local.dir");
659    createDirAndSetProperty("mapreduce.cluster.temp.dir");
660    enableShortCircuit();
661
662    Path root = getDataTestDirOnTestFS("hadoop");
663    conf.set(MapreduceTestingShim.getMROutputDirProp(),
664      new Path(root, "mapred-output-dir").toString());
665    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
666    conf.set("mapreduce.jobtracker.staging.root.dir",
667      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
668    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
669    conf.set("yarn.app.mapreduce.am.staging-dir",
670      new Path(root, "mapreduce-am-staging-root-dir").toString());
671
672    // Frustrate yarn's and hdfs's attempts at writing /tmp.
673    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
674    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
675    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
676    createDirAndSetProperty("yarn.nodemanager.log-dirs");
677    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
678    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
679    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
680    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
681    createDirAndSetProperty("dfs.journalnode.edits.dir");
682    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
683    createDirAndSetProperty("nfs.dump.dir");
684    createDirAndSetProperty("java.io.tmpdir");
685    createDirAndSetProperty("dfs.journalnode.edits.dir");
686    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
687    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
688  }
689
690  /**
691   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
692   *  new column families. Default to false.
693   */
694  public boolean isNewVersionBehaviorEnabled(){
695    final String propName = "hbase.tests.new.version.behavior";
696    String v = System.getProperty(propName);
697    if (v != null){
698      return Boolean.parseBoolean(v);
699    }
700    return false;
701  }
702
703  /**
704   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
705   *  This allows to specify this parameter on the command line.
706   *   If not set, default is true.
707   */
708  public boolean isReadShortCircuitOn(){
709    final String propName = "hbase.tests.use.shortcircuit.reads";
710    String readOnProp = System.getProperty(propName);
711    if (readOnProp != null){
712      return  Boolean.parseBoolean(readOnProp);
713    } else {
714      return conf.getBoolean(propName, false);
715    }
716  }
717
718  /** Enable the short circuit read, unless configured differently.
719   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
720   */
721  private void enableShortCircuit() {
722    if (isReadShortCircuitOn()) {
723      String curUser = System.getProperty("user.name");
724      LOG.info("read short circuit is ON for user " + curUser);
725      // read short circuit, for hdfs
726      conf.set("dfs.block.local-path-access.user", curUser);
727      // read short circuit, for hbase
728      conf.setBoolean("dfs.client.read.shortcircuit", true);
729      // Skip checking checksum, for the hdfs client and the datanode
730      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
731    } else {
732      LOG.info("read short circuit is OFF");
733    }
734  }
735
736  private String createDirAndSetProperty(final String property) {
737    return createDirAndSetProperty(property, property);
738  }
739
740  private String createDirAndSetProperty(final String relPath, String property) {
741    String path = getDataTestDir(relPath).toString();
742    System.setProperty(property, path);
743    conf.set(property, path);
744    new File(path).mkdirs();
745    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
746    return path;
747  }
748
749  /**
750   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
751   * or does nothing.
752   * @throws IOException
753   */
754  public void shutdownMiniDFSCluster() throws IOException {
755    if (this.dfsCluster != null) {
756      // The below throws an exception per dn, AsynchronousCloseException.
757      this.dfsCluster.shutdown();
758      dfsCluster = null;
759      dataTestDirOnTestFS = null;
760      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
761    }
762  }
763
764  /**
765   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
766   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
767   * @param createWALDir Whether to create a new WAL directory.
768   * @return The mini HBase cluster created.
769   * @see #shutdownMiniCluster()
770   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
771   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
772   * @see #startMiniCluster(StartMiniClusterOption)
773   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
774   */
775  @Deprecated
776  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
777    StartMiniClusterOption option = StartMiniClusterOption.builder()
778        .createWALDir(createWALDir).build();
779    return startMiniCluster(option);
780  }
781
782  /**
783   * Start up a minicluster of hbase, dfs, and zookeeper.
784   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
785   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
786   * @param createRootDir Whether to create a new root or data directory path.
787   * @return The mini HBase cluster created.
788   * @see #shutdownMiniCluster()
789   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
790   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
791   * @see #startMiniCluster(StartMiniClusterOption)
792   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
793   */
794  @Deprecated
795  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir)
796  throws Exception {
797    StartMiniClusterOption option = StartMiniClusterOption.builder()
798        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir).build();
799    return startMiniCluster(option);
800  }
801
802  /**
803   * Start up a minicluster of hbase, dfs, and zookeeper.
804   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
805   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
806   * @param createRootDir Whether to create a new root or data directory path.
807   * @param createWALDir Whether to create a new WAL directory.
808   * @return The mini HBase cluster created.
809   * @see #shutdownMiniCluster()
810   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
811   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
812   * @see #startMiniCluster(StartMiniClusterOption)
813   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
814   */
815  @Deprecated
816  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
817      boolean createWALDir) throws Exception {
818    StartMiniClusterOption option = StartMiniClusterOption.builder()
819        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir)
820        .createWALDir(createWALDir).build();
821    return startMiniCluster(option);
822  }
823
824  /**
825   * Start up a minicluster of hbase, dfs, and zookeeper.
826   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
827   * @param numMasters Master node number.
828   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
829   * @param createRootDir Whether to create a new root or data directory path.
830   * @return The mini HBase cluster created.
831   * @see #shutdownMiniCluster()
832   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
833   *  {@link #startMiniCluster(StartMiniClusterOption)} instead.
834   * @see #startMiniCluster(StartMiniClusterOption)
835   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
836   */
837  @Deprecated
838  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
839    throws Exception {
840    StartMiniClusterOption option = StartMiniClusterOption.builder()
841        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
842        .numDataNodes(numSlaves).build();
843    return startMiniCluster(option);
844  }
845
846  /**
847   * Start up a minicluster of hbase, dfs, and zookeeper.
848   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
849   * @param numMasters Master node number.
850   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
851   * @return The mini HBase cluster created.
852   * @see #shutdownMiniCluster()
853   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
854   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
855   * @see #startMiniCluster(StartMiniClusterOption)
856   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
857   */
858  @Deprecated
859  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
860    StartMiniClusterOption option = StartMiniClusterOption.builder()
861        .numMasters(numMasters).numRegionServers(numSlaves).numDataNodes(numSlaves).build();
862    return startMiniCluster(option);
863  }
864
865  /**
866   * Start up a minicluster of hbase, dfs, and zookeeper.
867   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
868   * @param numMasters Master node number.
869   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
870   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
871   *                      HDFS data node number.
872   * @param createRootDir Whether to create a new root or data directory path.
873   * @return The mini HBase cluster created.
874   * @see #shutdownMiniCluster()
875   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
876   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
877   * @see #startMiniCluster(StartMiniClusterOption)
878   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
879   */
880  @Deprecated
881  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
882      boolean createRootDir) throws Exception {
883    StartMiniClusterOption option = StartMiniClusterOption.builder()
884        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
885        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
886    return startMiniCluster(option);
887  }
888
889  /**
890   * Start up a minicluster of hbase, dfs, and zookeeper.
891   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
892   * @param numMasters Master node number.
893   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
894   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
895   *                      HDFS data node number.
896   * @return The mini HBase cluster created.
897   * @see #shutdownMiniCluster()
898   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
899   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
900   * @see #startMiniCluster(StartMiniClusterOption)
901   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
902   */
903  @Deprecated
904  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
905      throws Exception {
906    StartMiniClusterOption option = StartMiniClusterOption.builder()
907        .numMasters(numMasters).numRegionServers(numSlaves)
908        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
909    return startMiniCluster(option);
910  }
911
912  /**
913   * Start up a minicluster of hbase, dfs, and zookeeper.
914   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
915   * @param numMasters Master node number.
916   * @param numRegionServers Number of region servers.
917   * @param numDataNodes Number of datanodes.
918   * @return The mini HBase cluster created.
919   * @see #shutdownMiniCluster()
920   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
921   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
922   * @see #startMiniCluster(StartMiniClusterOption)
923   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
924   */
925  @Deprecated
926  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
927      throws Exception {
928    StartMiniClusterOption option = StartMiniClusterOption.builder()
929        .numMasters(numMasters).numRegionServers(numRegionServers).numDataNodes(numDataNodes)
930        .build();
931    return startMiniCluster(option);
932  }
933
934  /**
935   * Start up a minicluster of hbase, dfs, and zookeeper.
936   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
937   * @param numMasters Master node number.
938   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
939   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
940   *                      HDFS data node number.
941   * @param masterClass The class to use as HMaster, or null for default.
942   * @param rsClass The class to use as HRegionServer, or null for default.
943   * @return The mini HBase cluster created.
944   * @see #shutdownMiniCluster()
945   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
946   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
947   * @see #startMiniCluster(StartMiniClusterOption)
948   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
949   */
950  @Deprecated
951  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
952      Class<? extends HMaster> masterClass,
953      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
954      throws Exception {
955    StartMiniClusterOption option = StartMiniClusterOption.builder()
956        .numMasters(numMasters).masterClass(masterClass)
957        .numRegionServers(numSlaves).rsClass(rsClass)
958        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts)
959        .build();
960    return startMiniCluster(option);
961  }
962
963  /**
964   * Start up a minicluster of hbase, dfs, and zookeeper.
965   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
966   * @param numMasters Master node number.
967   * @param numRegionServers Number of region servers.
968   * @param numDataNodes Number of datanodes.
969   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
970   *                      HDFS data node number.
971   * @param masterClass The class to use as HMaster, or null for default.
972   * @param rsClass The class to use as HRegionServer, or null for default.
973   * @return The mini HBase cluster created.
974   * @see #shutdownMiniCluster()
975   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
976   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
977   * @see #startMiniCluster(StartMiniClusterOption)
978   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
979   */
980  @Deprecated
981  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
982      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
983      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
984    throws Exception {
985    StartMiniClusterOption option = StartMiniClusterOption.builder()
986        .numMasters(numMasters).masterClass(masterClass)
987        .numRegionServers(numRegionServers).rsClass(rsClass)
988        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
989        .build();
990    return startMiniCluster(option);
991  }
992
993  /**
994   * Start up a minicluster of hbase, dfs, and zookeeper.
995   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
996   * @param numMasters Master node number.
997   * @param numRegionServers Number of region servers.
998   * @param numDataNodes Number of datanodes.
999   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
1000   *                      HDFS data node number.
1001   * @param masterClass The class to use as HMaster, or null for default.
1002   * @param rsClass The class to use as HRegionServer, or null for default.
1003   * @param createRootDir Whether to create a new root or data directory path.
1004   * @param createWALDir Whether to create a new WAL directory.
1005   * @return The mini HBase cluster created.
1006   * @see #shutdownMiniCluster()
1007   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1008   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
1009   * @see #startMiniCluster(StartMiniClusterOption)
1010   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1011   */
1012  @Deprecated
1013  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1014      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1015      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1016      boolean createWALDir) throws Exception {
1017    StartMiniClusterOption option = StartMiniClusterOption.builder()
1018        .numMasters(numMasters).masterClass(masterClass)
1019        .numRegionServers(numRegionServers).rsClass(rsClass)
1020        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
1021        .createRootDir(createRootDir).createWALDir(createWALDir)
1022        .build();
1023    return startMiniCluster(option);
1024  }
1025
1026  /**
1027   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number.
1028   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1029   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1030   * @see #startMiniCluster(StartMiniClusterOption option)
1031   * @see #shutdownMiniDFSCluster()
1032   */
1033  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1034    StartMiniClusterOption option = StartMiniClusterOption.builder()
1035        .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1036    return startMiniCluster(option);
1037  }
1038
1039  /**
1040   * Start up a minicluster of hbase, dfs and zookeeper all using default options.
1041   * Option default value can be found in {@link StartMiniClusterOption.Builder}.
1042   * @see #startMiniCluster(StartMiniClusterOption option)
1043   * @see #shutdownMiniDFSCluster()
1044   */
1045  public MiniHBaseCluster startMiniCluster() throws Exception {
1046    return startMiniCluster(StartMiniClusterOption.builder().build());
1047  }
1048
1049  /**
1050   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed.
1051   * It modifies Configuration.  It homes the cluster data directory under a random
1052   * subdirectory in a directory under System property test.build.data, to be cleaned up on exit.
1053   * @see #shutdownMiniDFSCluster()
1054   */
1055  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1056    LOG.info("Starting up minicluster with option: {}", option);
1057
1058    // If we already put up a cluster, fail.
1059    if (miniClusterRunning) {
1060      throw new IllegalStateException("A mini-cluster is already running");
1061    }
1062    miniClusterRunning = true;
1063
1064    setupClusterTestDir();
1065    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
1066
1067    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1068    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1069    if (dfsCluster == null) {
1070      LOG.info("STARTING DFS");
1071      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1072    } else {
1073      LOG.info("NOT STARTING DFS");
1074    }
1075
1076    // Start up a zk cluster.
1077    if (getZkCluster() == null) {
1078      startMiniZKCluster(option.getNumZkServers());
1079    }
1080
1081    // Start the MiniHBaseCluster
1082    return startMiniHBaseCluster(option);
1083  }
1084
1085  /**
1086   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1087   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1088   * @return Reference to the hbase mini hbase cluster.
1089   * @see #startMiniCluster(StartMiniClusterOption)
1090   * @see #shutdownMiniHBaseCluster()
1091   */
1092  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1093    throws IOException, InterruptedException {
1094    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1095    createRootDir(option.isCreateRootDir());
1096    if (option.isCreateWALDir()) {
1097      createWALRootDir();
1098    }
1099    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1100    // for tests that do not read hbase-defaults.xml
1101    setHBaseFsTmpDir();
1102
1103    // These settings will make the server waits until this exact number of
1104    // regions servers are connected.
1105    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1106      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1107    }
1108    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1109      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1110    }
1111
1112    // Avoid log flooded with chore execution time, see HBASE-24646 for more details.
1113    Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
1114
1115    Configuration c = new Configuration(this.conf);
1116    TraceUtil.initTracer(c);
1117    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1118      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1119      option.getMasterClass(), option.getRsClass());
1120    // Populate the master address configuration from mini cluster configuration.
1121    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1122    // Don't leave here till we've done a successful scan of the hbase:meta
1123    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1124      ResultScanner s = t.getScanner(new Scan())) {
1125      for (;;) {
1126        if (s.next() == null) {
1127          break;
1128        }
1129      }
1130    }
1131
1132
1133    getAdmin(); // create immediately the hbaseAdmin
1134    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1135
1136    return (MiniHBaseCluster) hbaseCluster;
1137  }
1138
1139  /**
1140   * Starts up mini hbase cluster using default options.
1141   * Default options can be found in {@link StartMiniClusterOption.Builder}.
1142   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1143   * @see #shutdownMiniHBaseCluster()
1144   */
1145  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1146    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1147  }
1148
1149  /**
1150   * Starts up mini hbase cluster.
1151   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1152   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1153   * @param numMasters Master node number.
1154   * @param numRegionServers Number of region servers.
1155   * @return The mini HBase cluster created.
1156   * @see #shutdownMiniHBaseCluster()
1157   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1158   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1159   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1160   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1161   */
1162  @Deprecated
1163  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1164      throws IOException, InterruptedException {
1165    StartMiniClusterOption option = StartMiniClusterOption.builder()
1166        .numMasters(numMasters).numRegionServers(numRegionServers).build();
1167    return startMiniHBaseCluster(option);
1168  }
1169
1170  /**
1171   * Starts up mini hbase cluster.
1172   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1173   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1174   * @param numMasters Master node number.
1175   * @param numRegionServers Number of region servers.
1176   * @param rsPorts Ports that RegionServer should use.
1177   * @return The mini HBase cluster created.
1178   * @see #shutdownMiniHBaseCluster()
1179   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1180   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1181   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1182   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1183   */
1184  @Deprecated
1185  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1186      List<Integer> rsPorts) throws IOException, InterruptedException {
1187    StartMiniClusterOption option = StartMiniClusterOption.builder()
1188        .numMasters(numMasters).numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1189    return startMiniHBaseCluster(option);
1190  }
1191
1192  /**
1193   * Starts up mini hbase cluster.
1194   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1195   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1196   * @param numMasters Master node number.
1197   * @param numRegionServers Number of region servers.
1198   * @param rsPorts Ports that RegionServer should use.
1199   * @param masterClass The class to use as HMaster, or null for default.
1200   * @param rsClass The class to use as HRegionServer, or null for default.
1201   * @param createRootDir Whether to create a new root or data directory path.
1202   * @param createWALDir Whether to create a new WAL directory.
1203   * @return The mini HBase cluster created.
1204   * @see #shutdownMiniHBaseCluster()
1205   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1206   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1207   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1208   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1209   */
1210  @Deprecated
1211  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1212      List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1213      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1214      boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1215    StartMiniClusterOption option = StartMiniClusterOption.builder()
1216        .numMasters(numMasters).masterClass(masterClass)
1217        .numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1218        .createRootDir(createRootDir).createWALDir(createWALDir).build();
1219    return startMiniHBaseCluster(option);
1220  }
1221
1222  /**
1223   * Starts the hbase cluster up again after shutting it down previously in a
1224   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1225   * @param servers number of region servers
1226   */
1227  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1228    this.restartHBaseCluster(servers, null);
1229  }
1230
1231  public void restartHBaseCluster(int servers, List<Integer> ports)
1232      throws IOException, InterruptedException {
1233    StartMiniClusterOption option =
1234        StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1235    restartHBaseCluster(option);
1236    invalidateConnection();
1237  }
1238
1239  public void restartHBaseCluster(StartMiniClusterOption option)
1240      throws IOException, InterruptedException {
1241    closeConnection();
1242    this.hbaseCluster =
1243        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
1244            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
1245            option.getRsClass());
1246    // Don't leave here till we've done a successful scan of the hbase:meta
1247    Connection conn = ConnectionFactory.createConnection(this.conf);
1248    Table t = conn.getTable(TableName.META_TABLE_NAME);
1249    ResultScanner s = t.getScanner(new Scan());
1250    while (s.next() != null) {
1251      // do nothing
1252    }
1253    LOG.info("HBase has been restarted");
1254    s.close();
1255    t.close();
1256    conn.close();
1257  }
1258
1259  /**
1260   * @return Current mini hbase cluster. Only has something in it after a call
1261   * to {@link #startMiniCluster()}.
1262   * @see #startMiniCluster()
1263   */
1264  public MiniHBaseCluster getMiniHBaseCluster() {
1265    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1266      return (MiniHBaseCluster)this.hbaseCluster;
1267    }
1268    throw new RuntimeException(hbaseCluster + " not an instance of " +
1269                               MiniHBaseCluster.class.getName());
1270  }
1271
1272  /**
1273   * Stops mini hbase, zk, and hdfs clusters.
1274   * @see #startMiniCluster(int)
1275   */
1276  public void shutdownMiniCluster() throws IOException {
1277    LOG.info("Shutting down minicluster");
1278    shutdownMiniHBaseCluster();
1279    shutdownMiniDFSCluster();
1280    shutdownMiniZKCluster();
1281
1282    cleanupTestDir();
1283    miniClusterRunning = false;
1284    LOG.info("Minicluster is down");
1285  }
1286
1287  /**
1288   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1289   * @throws java.io.IOException in case command is unsuccessful
1290   */
1291  public void shutdownMiniHBaseCluster() throws IOException {
1292    cleanup();
1293    if (this.hbaseCluster != null) {
1294      this.hbaseCluster.shutdown();
1295      // Wait till hbase is down before going on to shutdown zk.
1296      this.hbaseCluster.waitUntilShutDown();
1297      this.hbaseCluster = null;
1298    }
1299    if (zooKeeperWatcher != null) {
1300      zooKeeperWatcher.close();
1301      zooKeeperWatcher = null;
1302    }
1303  }
1304
1305  /**
1306   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1307   * @throws java.io.IOException throws in case command is unsuccessful
1308   */
1309  public void killMiniHBaseCluster() throws IOException {
1310    cleanup();
1311    if (this.hbaseCluster != null) {
1312      getMiniHBaseCluster().killAll();
1313      this.hbaseCluster = null;
1314    }
1315    if (zooKeeperWatcher != null) {
1316      zooKeeperWatcher.close();
1317      zooKeeperWatcher = null;
1318    }
1319  }
1320
1321  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1322  private void cleanup() throws IOException {
1323    closeConnection();
1324    // unset the configuration for MIN and MAX RS to start
1325    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1326    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1327  }
1328
1329  /**
1330   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1331   * is true, a new root directory path is fetched irrespective of whether it has been fetched
1332   * before or not. If false, previous path is used.
1333   * Note: this does not cause the root dir to be created.
1334   * @return Fully qualified path for the default hbase root dir
1335   * @throws IOException
1336   */
1337  public Path getDefaultRootDirPath(boolean create) throws IOException {
1338    if (!create) {
1339      return getDataTestDirOnTestFS();
1340    } else {
1341      return getNewDataTestDirOnTestFS();
1342    }
1343  }
1344
1345  /**
1346   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
1347   * except that <code>create</code> flag is false.
1348   * Note: this does not cause the root dir to be created.
1349   * @return Fully qualified path for the default hbase root dir
1350   * @throws IOException
1351   */
1352  public Path getDefaultRootDirPath() throws IOException {
1353    return getDefaultRootDirPath(false);
1354  }
1355
1356  /**
1357   * Creates an hbase rootdir in user home directory.  Also creates hbase
1358   * version file.  Normally you won't make use of this method.  Root hbasedir
1359   * is created for you as part of mini cluster startup.  You'd only use this
1360   * method if you were doing manual operation.
1361   * @param create This flag decides whether to get a new
1362   * root or data directory path or not, if it has been fetched already.
1363   * Note : Directory will be made irrespective of whether path has been fetched or not.
1364   * If directory already exists, it will be overwritten
1365   * @return Fully qualified path to hbase root dir
1366   * @throws IOException
1367   */
1368  public Path createRootDir(boolean create) throws IOException {
1369    FileSystem fs = FileSystem.get(this.conf);
1370    Path hbaseRootdir = getDefaultRootDirPath(create);
1371    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1372    fs.mkdirs(hbaseRootdir);
1373    FSUtils.setVersion(fs, hbaseRootdir);
1374    return hbaseRootdir;
1375  }
1376
1377  /**
1378   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
1379   * except that <code>create</code> flag is false.
1380   * @return Fully qualified path to hbase root dir
1381   * @throws IOException
1382   */
1383  public Path createRootDir() throws IOException {
1384    return createRootDir(false);
1385  }
1386
1387  /**
1388   * Creates a hbase walDir in the user's home directory.
1389   * Normally you won't make use of this method. Root hbaseWALDir
1390   * is created for you as part of mini cluster startup. You'd only use this
1391   * method if you were doing manual operation.
1392   *
1393   * @return Fully qualified path to hbase root dir
1394   * @throws IOException
1395  */
1396  public Path createWALRootDir() throws IOException {
1397    FileSystem fs = FileSystem.get(this.conf);
1398    Path walDir = getNewDataTestDirOnTestFS();
1399    CommonFSUtils.setWALRootDir(this.conf, walDir);
1400    fs.mkdirs(walDir);
1401    return walDir;
1402  }
1403
1404  private void setHBaseFsTmpDir() throws IOException {
1405    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1406    if (hbaseFsTmpDirInString == null) {
1407      this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1408      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1409    } else {
1410      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1411    }
1412  }
1413
1414  /**
1415   * Flushes all caches in the mini hbase cluster
1416   * @throws IOException
1417   */
1418  public void flush() throws IOException {
1419    getMiniHBaseCluster().flushcache();
1420  }
1421
1422  /**
1423   * Flushes all caches in the mini hbase cluster
1424   * @throws IOException
1425   */
1426  public void flush(TableName tableName) throws IOException {
1427    getMiniHBaseCluster().flushcache(tableName);
1428  }
1429
1430  /**
1431   * Compact all regions in the mini hbase cluster
1432   * @throws IOException
1433   */
1434  public void compact(boolean major) throws IOException {
1435    getMiniHBaseCluster().compact(major);
1436  }
1437
1438  /**
1439   * Compact all of a table's reagion in the mini hbase cluster
1440   * @throws IOException
1441   */
1442  public void compact(TableName tableName, boolean major) throws IOException {
1443    getMiniHBaseCluster().compact(tableName, major);
1444  }
1445
1446  /**
1447   * Create a table.
1448   * @param tableName
1449   * @param family
1450   * @return A Table instance for the created table.
1451   * @throws IOException
1452   */
1453  public Table createTable(TableName tableName, String family)
1454  throws IOException{
1455    return createTable(tableName, new String[]{family});
1456  }
1457
1458  /**
1459   * Create a table.
1460   * @param tableName
1461   * @param families
1462   * @return A Table instance for the created table.
1463   * @throws IOException
1464   */
1465  public Table createTable(TableName tableName, String[] families)
1466  throws IOException {
1467    List<byte[]> fams = new ArrayList<>(families.length);
1468    for (String family : families) {
1469      fams.add(Bytes.toBytes(family));
1470    }
1471    return createTable(tableName, fams.toArray(new byte[0][]));
1472  }
1473
1474  /**
1475   * Create a table.
1476   * @param tableName
1477   * @param family
1478   * @return A Table instance for the created table.
1479   * @throws IOException
1480   */
1481  public Table createTable(TableName tableName, byte[] family)
1482  throws IOException{
1483    return createTable(tableName, new byte[][]{family});
1484  }
1485
1486  /**
1487   * Create a table with multiple regions.
1488   * @param tableName
1489   * @param family
1490   * @param numRegions
1491   * @return A Table instance for the created table.
1492   * @throws IOException
1493   */
1494  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1495      throws IOException {
1496    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1497    byte[] startKey = Bytes.toBytes("aaaaa");
1498    byte[] endKey = Bytes.toBytes("zzzzz");
1499    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1500
1501    return createTable(tableName, new byte[][] { family }, splitKeys);
1502  }
1503
1504  /**
1505   * Create a table.
1506   * @param tableName
1507   * @param families
1508   * @return A Table instance for the created table.
1509   * @throws IOException
1510   */
1511  public Table createTable(TableName tableName, byte[][] families)
1512  throws IOException {
1513    return createTable(tableName, families, (byte[][]) null);
1514  }
1515
1516  /**
1517   * Create a table with multiple regions.
1518   * @param tableName
1519   * @param families
1520   * @return A Table instance for the created table.
1521   * @throws IOException
1522   */
1523  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1524    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1525  }
1526
1527  /**
1528   * Create a table with multiple regions.
1529   * @param tableName
1530   * @param replicaCount replica count.
1531   * @param families
1532   * @return A Table instance for the created table.
1533   * @throws IOException
1534   */
1535  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1536    throws IOException {
1537    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1538  }
1539
1540  /**
1541   * Create a table.
1542   * @param tableName
1543   * @param families
1544   * @param splitKeys
1545   * @return A Table instance for the created table.
1546   * @throws IOException
1547   */
1548  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1549      throws IOException {
1550    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1551  }
1552
1553  /**
1554   * Create a table.
1555   * @param tableName the table name
1556   * @param families the families
1557   * @param splitKeys the splitkeys
1558   * @param replicaCount the region replica count
1559   * @return A Table instance for the created table.
1560   * @throws IOException throws IOException
1561   */
1562  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1563      int replicaCount) throws IOException {
1564    return createTable(tableName, families, splitKeys, replicaCount,
1565      new Configuration(getConfiguration()));
1566  }
1567
1568  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1569    byte[] endKey, int numRegions) throws IOException {
1570    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1571
1572    getAdmin().createTable(desc, startKey, endKey, numRegions);
1573    // HBaseAdmin only waits for regions to appear in hbase:meta we
1574    // should wait until they are assigned
1575    waitUntilAllRegionsAssigned(tableName);
1576    return getConnection().getTable(tableName);
1577  }
1578
1579  /**
1580   * Create a table.
1581   * @param c Configuration to use
1582   * @return A Table instance for the created table.
1583   */
1584  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1585    throws IOException {
1586    return createTable(htd, families, null, c);
1587  }
1588
1589  /**
1590   * Create a table.
1591   * @param htd table descriptor
1592   * @param families array of column families
1593   * @param splitKeys array of split keys
1594   * @param c Configuration to use
1595   * @return A Table instance for the created table.
1596   * @throws IOException if getAdmin or createTable fails
1597   */
1598  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1599      Configuration c) throws IOException {
1600    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1601    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1602    // on is interfering.
1603    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1604  }
1605
1606  /**
1607   * Create a table.
1608   * @param htd table descriptor
1609   * @param families array of column families
1610   * @param splitKeys array of split keys
1611   * @param type Bloom type
1612   * @param blockSize block size
1613   * @param c Configuration to use
1614   * @return A Table instance for the created table.
1615   * @throws IOException if getAdmin or createTable fails
1616   */
1617
1618  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1619      BloomType type, int blockSize, Configuration c) throws IOException {
1620    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1621    for (byte[] family : families) {
1622      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1623        .setBloomFilterType(type)
1624        .setBlocksize(blockSize);
1625      if (isNewVersionBehaviorEnabled()) {
1626          cfdb.setNewVersionBehavior(true);
1627      }
1628      builder.setColumnFamily(cfdb.build());
1629    }
1630    TableDescriptor td = builder.build();
1631    if (splitKeys != null) {
1632      getAdmin().createTable(td, splitKeys);
1633    } else {
1634      getAdmin().createTable(td);
1635    }
1636    // HBaseAdmin only waits for regions to appear in hbase:meta
1637    // we should wait until they are assigned
1638    waitUntilAllRegionsAssigned(td.getTableName());
1639    return getConnection().getTable(td.getTableName());
1640  }
1641
1642  /**
1643   * Create a table.
1644   * @param htd table descriptor
1645   * @param splitRows array of split keys
1646   * @return A Table instance for the created table.
1647   * @throws IOException
1648   */
1649  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1650      throws IOException {
1651    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1652    if (isNewVersionBehaviorEnabled()) {
1653      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1654         builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
1655           .setNewVersionBehavior(true).build());
1656      }
1657    }
1658    if (splitRows != null) {
1659      getAdmin().createTable(builder.build(), splitRows);
1660    } else {
1661      getAdmin().createTable(builder.build());
1662    }
1663    // HBaseAdmin only waits for regions to appear in hbase:meta
1664    // we should wait until they are assigned
1665    waitUntilAllRegionsAssigned(htd.getTableName());
1666    return getConnection().getTable(htd.getTableName());
1667  }
1668
1669  /**
1670   * Create a table.
1671   * @param tableName the table name
1672   * @param families the families
1673   * @param splitKeys the split keys
1674   * @param replicaCount the replica count
1675   * @param c Configuration to use
1676   * @return A Table instance for the created table.
1677   */
1678  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1679    int replicaCount, final Configuration c) throws IOException {
1680    TableDescriptor htd =
1681      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1682    return createTable(htd, families, splitKeys, c);
1683  }
1684
1685  /**
1686   * Create a table.
1687   * @return A Table instance for the created table.
1688   */
1689  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1690    return createTable(tableName, new byte[][] { family }, numVersions);
1691  }
1692
1693  /**
1694   * Create a table.
1695   * @return A Table instance for the created table.
1696   */
1697  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1698      throws IOException {
1699    return createTable(tableName, families, numVersions, (byte[][]) null);
1700  }
1701
1702  /**
1703   * Create a table.
1704   * @return A Table instance for the created table.
1705   */
1706  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1707      byte[][] splitKeys) throws IOException {
1708    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1709    for (byte[] family : families) {
1710      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1711        .setMaxVersions(numVersions);
1712      if (isNewVersionBehaviorEnabled()) {
1713        cfBuilder.setNewVersionBehavior(true);
1714      }
1715      builder.setColumnFamily(cfBuilder.build());
1716    }
1717    if (splitKeys != null) {
1718      getAdmin().createTable(builder.build(), splitKeys);
1719    } else {
1720      getAdmin().createTable(builder.build());
1721    }
1722    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1723    // assigned
1724    waitUntilAllRegionsAssigned(tableName);
1725    return getConnection().getTable(tableName);
1726  }
1727
1728  /**
1729   * Create a table with multiple regions.
1730   * @return A Table instance for the created table.
1731   */
1732  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1733      throws IOException {
1734    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1735  }
1736
1737  /**
1738   * Create a table.
1739   * @return A Table instance for the created table.
1740   */
1741  public Table createTable(TableName tableName, byte[][] families,
1742    int numVersions, int blockSize) throws IOException {
1743    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1744    for (byte[] family : families) {
1745      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1746        .setMaxVersions(numVersions).setBlocksize(blockSize);
1747      if (isNewVersionBehaviorEnabled()) {
1748        cfBuilder.setNewVersionBehavior(true);
1749      }
1750      builder.setColumnFamily(cfBuilder.build());
1751    }
1752    getAdmin().createTable(builder.build());
1753    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1754    // assigned
1755    waitUntilAllRegionsAssigned(tableName);
1756    return getConnection().getTable(tableName);
1757  }
1758
1759  public Table createTable(TableName tableName, byte[][] families,
1760      int numVersions, int blockSize, String cpName) throws IOException {
1761    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1762    for (byte[] family : families) {
1763      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1764        .setMaxVersions(numVersions).setBlocksize(blockSize);
1765      if (isNewVersionBehaviorEnabled()) {
1766        cfBuilder.setNewVersionBehavior(true);
1767      }
1768      builder.setColumnFamily(cfBuilder.build());
1769    }
1770    if (cpName != null) {
1771      builder.setCoprocessor(cpName);
1772    }
1773    getAdmin().createTable(builder.build());
1774    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1775    // assigned
1776    waitUntilAllRegionsAssigned(tableName);
1777    return getConnection().getTable(tableName);
1778  }
1779
1780  /**
1781   * Create a table.
1782   * @return A Table instance for the created table.
1783   */
1784  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1785    throws IOException {
1786    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1787    int i = 0;
1788    for (byte[] family : families) {
1789      ColumnFamilyDescriptorBuilder cfBuilder =
1790        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1791      if (isNewVersionBehaviorEnabled()) {
1792        cfBuilder.setNewVersionBehavior(true);
1793      }
1794      builder.setColumnFamily(cfBuilder.build());
1795      i++;
1796    }
1797    getAdmin().createTable(builder.build());
1798    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1799    // assigned
1800    waitUntilAllRegionsAssigned(tableName);
1801    return getConnection().getTable(tableName);
1802  }
1803
1804  /**
1805   * Create a table.
1806   * @return A Table instance for the created table.
1807   */
1808  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1809    throws IOException {
1810    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1811    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1812    if (isNewVersionBehaviorEnabled()) {
1813      cfBuilder.setNewVersionBehavior(true);
1814    }
1815    builder.setColumnFamily(cfBuilder.build());
1816    getAdmin().createTable(builder.build(), splitRows);
1817    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1818    // assigned
1819    waitUntilAllRegionsAssigned(tableName);
1820    return getConnection().getTable(tableName);
1821  }
1822
1823  /**
1824   * Create a table with multiple regions.
1825   * @return A Table instance for the created table.
1826   */
1827  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1828    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1829  }
1830
1831  /**
1832   * Modify a table, synchronous.
1833   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1834   *   {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1835   * @see Admin#modifyTable(TableDescriptor)
1836   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1837   */
1838  @Deprecated
1839  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1840      throws IOException, InterruptedException {
1841    admin.modifyTable(desc);
1842  }
1843
1844  /**
1845   * Set the number of Region replicas.
1846   */
1847  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1848    throws IOException, InterruptedException {
1849    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1850      .setRegionReplication(replicaCount).build();
1851    admin.modifyTable(desc);
1852  }
1853
1854  /**
1855   * Drop an existing table
1856   * @param tableName existing table
1857   */
1858  public void deleteTable(TableName tableName) throws IOException {
1859    try {
1860      getAdmin().disableTable(tableName);
1861    } catch (TableNotEnabledException e) {
1862      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1863    }
1864    getAdmin().deleteTable(tableName);
1865  }
1866
1867  /**
1868   * Drop an existing table
1869   * @param tableName existing table
1870   */
1871  public void deleteTableIfAny(TableName tableName) throws IOException {
1872    try {
1873      deleteTable(tableName);
1874    } catch (TableNotFoundException e) {
1875      // ignore
1876    }
1877  }
1878
1879  // ==========================================================================
1880  // Canned table and table descriptor creation
1881
1882  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1883  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1884  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1885  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1886  private static final int MAXVERSIONS = 3;
1887
1888  public static final char FIRST_CHAR = 'a';
1889  public static final char LAST_CHAR = 'z';
1890  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1891  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1892
1893  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1894    return createModifyableTableDescriptor(TableName.valueOf(name),
1895      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1896      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1897  }
1898
1899  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1900    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1901    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1902    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1903      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1904        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1905        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1906      if (isNewVersionBehaviorEnabled()) {
1907        cfBuilder.setNewVersionBehavior(true);
1908      }
1909      builder.setColumnFamily(cfBuilder.build());
1910    }
1911    return builder.build();
1912  }
1913
1914  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1915    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1916    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1917    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1918      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1919        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1920        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1921      if (isNewVersionBehaviorEnabled()) {
1922        cfBuilder.setNewVersionBehavior(true);
1923      }
1924      builder.setColumnFamily(cfBuilder.build());
1925    }
1926    return builder;
1927  }
1928
1929  /**
1930   * Create a table of name <code>name</code>.
1931   * @param name Name to give table.
1932   * @return Column descriptor.
1933   */
1934  public TableDescriptor createTableDescriptor(final TableName name) {
1935    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1936      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1937  }
1938
1939  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1940    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1941  }
1942
1943  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1944    int maxVersions) {
1945    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1946    for (byte[] family : families) {
1947      ColumnFamilyDescriptorBuilder cfBuilder =
1948        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1949      if (isNewVersionBehaviorEnabled()) {
1950        cfBuilder.setNewVersionBehavior(true);
1951      }
1952      builder.setColumnFamily(cfBuilder.build());
1953    }
1954    return builder.build();
1955  }
1956
1957  /**
1958   * Create an HRegion that writes to the local tmp dirs
1959   * @param desc a table descriptor indicating which table the region belongs to
1960   * @param startKey the start boundary of the region
1961   * @param endKey the end boundary of the region
1962   * @return a region that writes to local dir for testing
1963   */
1964  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1965    throws IOException {
1966    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1967      .setEndKey(endKey).build();
1968    return createLocalHRegion(hri, desc);
1969  }
1970
1971  /**
1972   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1973   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1974   */
1975  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1976    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1977  }
1978
1979  /**
1980   * Create an HRegion that writes to the local tmp dirs with specified wal
1981   * @param info regioninfo
1982   * @param conf configuration
1983   * @param desc table descriptor
1984   * @param wal wal for this region.
1985   * @return created hregion
1986   * @throws IOException
1987   */
1988  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1989      WAL wal) throws IOException {
1990    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1991  }
1992
1993  /**
1994   * @param tableName
1995   * @param startKey
1996   * @param stopKey
1997   * @param isReadOnly
1998   * @param families
1999   * @return A region on which you must call
2000   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
2001   * @throws IOException
2002   */
2003  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
2004      Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
2005      throws IOException {
2006    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
2007        durability, wal, null, families);
2008  }
2009
2010  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
2011    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
2012    boolean[] compactedMemStore, byte[]... families) throws IOException {
2013    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
2014    builder.setReadOnly(isReadOnly);
2015    int i = 0;
2016    for (byte[] family : families) {
2017      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
2018      if (compactedMemStore != null && i < compactedMemStore.length) {
2019        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
2020      } else {
2021        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
2022
2023      }
2024      i++;
2025      // Set default to be three versions.
2026      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
2027      builder.setColumnFamily(cfBuilder.build());
2028    }
2029    builder.setDurability(durability);
2030    RegionInfo info =
2031      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
2032    return createLocalHRegion(info, conf, builder.build(), wal);
2033  }
2034
2035  //
2036  // ==========================================================================
2037
2038  /**
2039   * Provide an existing table name to truncate.
2040   * Scans the table and issues a delete for each row read.
2041   * @param tableName existing table
2042   * @return HTable to that new table
2043   * @throws IOException
2044   */
2045  public Table deleteTableData(TableName tableName) throws IOException {
2046    Table table = getConnection().getTable(tableName);
2047    Scan scan = new Scan();
2048    ResultScanner resScan = table.getScanner(scan);
2049    for(Result res : resScan) {
2050      Delete del = new Delete(res.getRow());
2051      table.delete(del);
2052    }
2053    resScan = table.getScanner(scan);
2054    resScan.close();
2055    return table;
2056  }
2057
2058  /**
2059   * Truncate a table using the admin command.
2060   * Effectively disables, deletes, and recreates the table.
2061   * @param tableName table which must exist.
2062   * @param preserveRegions keep the existing split points
2063   * @return HTable for the new table
2064   */
2065  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
2066      IOException {
2067    Admin admin = getAdmin();
2068    if (!admin.isTableDisabled(tableName)) {
2069      admin.disableTable(tableName);
2070    }
2071    admin.truncateTable(tableName, preserveRegions);
2072    return getConnection().getTable(tableName);
2073  }
2074
2075  /**
2076   * Truncate a table using the admin command.
2077   * Effectively disables, deletes, and recreates the table.
2078   * For previous behavior of issuing row deletes, see
2079   * deleteTableData.
2080   * Expressly does not preserve regions of existing table.
2081   * @param tableName table which must exist.
2082   * @return HTable for the new table
2083   */
2084  public Table truncateTable(final TableName tableName) throws IOException {
2085    return truncateTable(tableName, false);
2086  }
2087
2088  /**
2089   * Load table with rows from 'aaa' to 'zzz'.
2090   * @param t Table
2091   * @param f Family
2092   * @return Count of rows loaded.
2093   * @throws IOException
2094   */
2095  public int loadTable(final Table t, final byte[] f) throws IOException {
2096    return loadTable(t, new byte[][] {f});
2097  }
2098
2099  /**
2100   * Load table with rows from 'aaa' to 'zzz'.
2101   * @param t Table
2102   * @param f Family
2103   * @return Count of rows loaded.
2104   * @throws IOException
2105   */
2106  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2107    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2108  }
2109
2110  /**
2111   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2112   * @param t Table
2113   * @param f Array of Families to load
2114   * @return Count of rows loaded.
2115   * @throws IOException
2116   */
2117  public int loadTable(final Table t, final byte[][] f) throws IOException {
2118    return loadTable(t, f, null);
2119  }
2120
2121  /**
2122   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2123   * @param t Table
2124   * @param f Array of Families to load
2125   * @param value the values of the cells. If null is passed, the row key is used as value
2126   * @return Count of rows loaded.
2127   * @throws IOException
2128   */
2129  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2130    return loadTable(t, f, value, true);
2131  }
2132
2133  /**
2134   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2135   * @param t Table
2136   * @param f Array of Families to load
2137   * @param value the values of the cells. If null is passed, the row key is used as value
2138   * @return Count of rows loaded.
2139   * @throws IOException
2140   */
2141  public int loadTable(final Table t, final byte[][] f, byte[] value,
2142      boolean writeToWAL) throws IOException {
2143    List<Put> puts = new ArrayList<>();
2144    for (byte[] row : HBaseTestingUtility.ROWS) {
2145      Put put = new Put(row);
2146      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2147      for (int i = 0; i < f.length; i++) {
2148        byte[] value1 = value != null ? value : row;
2149        put.addColumn(f[i], f[i], value1);
2150      }
2151      puts.add(put);
2152    }
2153    t.put(puts);
2154    return puts.size();
2155  }
2156
2157  /** A tracker for tracking and validating table rows
2158   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2159   */
2160  public static class SeenRowTracker {
2161    int dim = 'z' - 'a' + 1;
2162    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2163    byte[] startRow;
2164    byte[] stopRow;
2165
2166    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2167      this.startRow = startRow;
2168      this.stopRow = stopRow;
2169    }
2170
2171    void reset() {
2172      for (byte[] row : ROWS) {
2173        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2174      }
2175    }
2176
2177    int i(byte b) {
2178      return b - 'a';
2179    }
2180
2181    public void addRow(byte[] row) {
2182      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2183    }
2184
2185    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2186     * all other rows none
2187     */
2188    public void validate() {
2189      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2190        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2191          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2192            int count = seenRows[i(b1)][i(b2)][i(b3)];
2193            int expectedCount = 0;
2194            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2195                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2196              expectedCount = 1;
2197            }
2198            if (count != expectedCount) {
2199              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2200              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2201                  "instead of " + expectedCount);
2202            }
2203          }
2204        }
2205      }
2206    }
2207  }
2208
2209  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2210    return loadRegion(r, f, false);
2211  }
2212
2213  public int loadRegion(final Region r, final byte[] f) throws IOException {
2214    return loadRegion((HRegion)r, f);
2215  }
2216
2217  /**
2218   * Load region with rows from 'aaa' to 'zzz'.
2219   * @param r Region
2220   * @param f Family
2221   * @param flush flush the cache if true
2222   * @return Count of rows loaded.
2223   * @throws IOException
2224   */
2225  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2226  throws IOException {
2227    byte[] k = new byte[3];
2228    int rowCount = 0;
2229    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2230      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2231        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2232          k[0] = b1;
2233          k[1] = b2;
2234          k[2] = b3;
2235          Put put = new Put(k);
2236          put.setDurability(Durability.SKIP_WAL);
2237          put.addColumn(f, null, k);
2238          if (r.getWAL() == null) {
2239            put.setDurability(Durability.SKIP_WAL);
2240          }
2241          int preRowCount = rowCount;
2242          int pause = 10;
2243          int maxPause = 1000;
2244          while (rowCount == preRowCount) {
2245            try {
2246              r.put(put);
2247              rowCount++;
2248            } catch (RegionTooBusyException e) {
2249              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2250              Threads.sleep(pause);
2251            }
2252          }
2253        }
2254      }
2255      if (flush) {
2256        r.flush(true);
2257      }
2258    }
2259    return rowCount;
2260  }
2261
2262  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2263      throws IOException {
2264    for (int i = startRow; i < endRow; i++) {
2265      byte[] data = Bytes.toBytes(String.valueOf(i));
2266      Put put = new Put(data);
2267      put.addColumn(f, null, data);
2268      t.put(put);
2269    }
2270  }
2271
2272  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2273      throws IOException {
2274    Random r = new Random();
2275    byte[] row = new byte[rowSize];
2276    for (int i = 0; i < totalRows; i++) {
2277      r.nextBytes(row);
2278      Put put = new Put(row);
2279      put.addColumn(f, new byte[]{0}, new byte[]{0});
2280      t.put(put);
2281    }
2282  }
2283
2284  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2285      int replicaId)
2286      throws IOException {
2287    for (int i = startRow; i < endRow; i++) {
2288      String failMsg = "Failed verification of row :" + i;
2289      byte[] data = Bytes.toBytes(String.valueOf(i));
2290      Get get = new Get(data);
2291      get.setReplicaId(replicaId);
2292      get.setConsistency(Consistency.TIMELINE);
2293      Result result = table.get(get);
2294      assertTrue(failMsg, result.containsColumn(f, null));
2295      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2296      Cell cell = result.getColumnLatestCell(f, null);
2297      assertTrue(failMsg,
2298        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2299          cell.getValueLength()));
2300    }
2301  }
2302
2303  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2304      throws IOException {
2305    verifyNumericRows((HRegion)region, f, startRow, endRow);
2306  }
2307
2308  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2309      throws IOException {
2310    verifyNumericRows(region, f, startRow, endRow, true);
2311  }
2312
2313  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2314      final boolean present) throws IOException {
2315    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2316  }
2317
2318  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2319      final boolean present) throws IOException {
2320    for (int i = startRow; i < endRow; i++) {
2321      String failMsg = "Failed verification of row :" + i;
2322      byte[] data = Bytes.toBytes(String.valueOf(i));
2323      Result result = region.get(new Get(data));
2324
2325      boolean hasResult = result != null && !result.isEmpty();
2326      assertEquals(failMsg + result, present, hasResult);
2327      if (!present) continue;
2328
2329      assertTrue(failMsg, result.containsColumn(f, null));
2330      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2331      Cell cell = result.getColumnLatestCell(f, null);
2332      assertTrue(failMsg,
2333        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2334          cell.getValueLength()));
2335    }
2336  }
2337
2338  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2339      throws IOException {
2340    for (int i = startRow; i < endRow; i++) {
2341      byte[] data = Bytes.toBytes(String.valueOf(i));
2342      Delete delete = new Delete(data);
2343      delete.addFamily(f);
2344      t.delete(delete);
2345    }
2346  }
2347
2348  /**
2349   * Return the number of rows in the given table.
2350   * @param table to count rows
2351   * @return count of rows
2352   */
2353  public static int countRows(final Table table) throws IOException {
2354    return countRows(table, new Scan());
2355  }
2356
2357  public static int countRows(final Table table, final Scan scan) throws IOException {
2358    try (ResultScanner results = table.getScanner(scan)) {
2359      int count = 0;
2360      while (results.next() != null) {
2361        count++;
2362      }
2363      return count;
2364    }
2365  }
2366
2367  public int countRows(final Table table, final byte[]... families) throws IOException {
2368    Scan scan = new Scan();
2369    for (byte[] family: families) {
2370      scan.addFamily(family);
2371    }
2372    return countRows(table, scan);
2373  }
2374
2375  /**
2376   * Return the number of rows in the given table.
2377   */
2378  public int countRows(final TableName tableName) throws IOException {
2379    Table table = getConnection().getTable(tableName);
2380    try {
2381      return countRows(table);
2382    } finally {
2383      table.close();
2384    }
2385  }
2386
2387  public int countRows(final Region region) throws IOException {
2388    return countRows(region, new Scan());
2389  }
2390
2391  public int countRows(final Region region, final Scan scan) throws IOException {
2392    InternalScanner scanner = region.getScanner(scan);
2393    try {
2394      return countRows(scanner);
2395    } finally {
2396      scanner.close();
2397    }
2398  }
2399
2400  public int countRows(final InternalScanner scanner) throws IOException {
2401    int scannedCount = 0;
2402    List<Cell> results = new ArrayList<>();
2403    boolean hasMore = true;
2404    while (hasMore) {
2405      hasMore = scanner.next(results);
2406      scannedCount += results.size();
2407      results.clear();
2408    }
2409    return scannedCount;
2410  }
2411
2412  /**
2413   * Return an md5 digest of the entire contents of a table.
2414   */
2415  public String checksumRows(final Table table) throws Exception {
2416
2417    Scan scan = new Scan();
2418    ResultScanner results = table.getScanner(scan);
2419    MessageDigest digest = MessageDigest.getInstance("MD5");
2420    for (Result res : results) {
2421      digest.update(res.getRow());
2422    }
2423    results.close();
2424    return digest.toString();
2425  }
2426
2427  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2428  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2429  static {
2430    int i = 0;
2431    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2432      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2433        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2434          ROWS[i][0] = b1;
2435          ROWS[i][1] = b2;
2436          ROWS[i][2] = b3;
2437          i++;
2438        }
2439      }
2440    }
2441  }
2442
2443  public static final byte[][] KEYS = {
2444    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2445    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2446    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2447    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2448    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2449    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2450    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2451    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2452    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2453  };
2454
2455  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2456      Bytes.toBytes("bbb"),
2457      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2458      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2459      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2460      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2461      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2462      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2463      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2464      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2465  };
2466
2467  /**
2468   * Create rows in hbase:meta for regions of the specified table with the specified
2469   * start keys.  The first startKey should be a 0 length byte array if you
2470   * want to form a proper range of regions.
2471   * @param conf
2472   * @param htd
2473   * @param startKeys
2474   * @return list of region info for regions added to meta
2475   * @throws IOException
2476   */
2477  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2478      final TableDescriptor htd, byte [][] startKeys)
2479  throws IOException {
2480    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2481    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2482    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2483    MetaTableAccessor
2484        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
2485    // add custom ones
2486    for (int i = 0; i < startKeys.length; i++) {
2487      int j = (i + 1) % startKeys.length;
2488      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2489          .setStartKey(startKeys[i])
2490          .setEndKey(startKeys[j])
2491          .build();
2492      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2493      newRegions.add(hri);
2494    }
2495
2496    meta.close();
2497    return newRegions;
2498  }
2499
2500  /**
2501   * Create an unmanaged WAL. Be sure to close it when you're through.
2502   */
2503  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2504      throws IOException {
2505    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2506    // unless I pass along via the conf.
2507    Configuration confForWAL = new Configuration(conf);
2508    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2509    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2510  }
2511
2512
2513  /**
2514   * Create a region with it's own WAL. Be sure to call
2515   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2516   */
2517  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2518      final Configuration conf, final TableDescriptor htd) throws IOException {
2519    return createRegionAndWAL(info, rootDir, conf, htd, true);
2520  }
2521
2522  /**
2523   * Create a region with it's own WAL. Be sure to call
2524   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2525   */
2526  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2527      final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
2528      throws IOException {
2529    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2530    region.setBlockCache(blockCache);
2531    region.initialize();
2532    return region;
2533  }
2534  /**
2535   * Create a region with it's own WAL. Be sure to call
2536   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2537   */
2538  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2539      final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2540      throws IOException {
2541    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2542    region.setMobFileCache(mobFileCache);
2543    region.initialize();
2544    return region;
2545  }
2546
2547  /**
2548   * Create a region with it's own WAL. Be sure to call
2549   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2550   */
2551  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2552      final Configuration conf, final TableDescriptor htd, boolean initialize)
2553      throws IOException {
2554    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
2555      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2556    WAL wal = createWal(conf, rootDir, info);
2557    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2558  }
2559
2560  /**
2561   * Returns all rows from the hbase:meta table.
2562   *
2563   * @throws IOException When reading the rows fails.
2564   */
2565  public List<byte[]> getMetaTableRows() throws IOException {
2566    // TODO: Redo using MetaTableAccessor class
2567    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2568    List<byte[]> rows = new ArrayList<>();
2569    ResultScanner s = t.getScanner(new Scan());
2570    for (Result result : s) {
2571      LOG.info("getMetaTableRows: row -> " +
2572        Bytes.toStringBinary(result.getRow()));
2573      rows.add(result.getRow());
2574    }
2575    s.close();
2576    t.close();
2577    return rows;
2578  }
2579
2580  /**
2581   * Returns all rows from the hbase:meta table for a given user table
2582   *
2583   * @throws IOException When reading the rows fails.
2584   */
2585  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2586    // TODO: Redo using MetaTableAccessor.
2587    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2588    List<byte[]> rows = new ArrayList<>();
2589    ResultScanner s = t.getScanner(new Scan());
2590    for (Result result : s) {
2591      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2592      if (info == null) {
2593        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2594        // TODO figure out what to do for this new hosed case.
2595        continue;
2596      }
2597
2598      if (info.getTable().equals(tableName)) {
2599        LOG.info("getMetaTableRows: row -> " +
2600            Bytes.toStringBinary(result.getRow()) + info);
2601        rows.add(result.getRow());
2602      }
2603    }
2604    s.close();
2605    t.close();
2606    return rows;
2607  }
2608
2609  /**
2610   * Returns all regions of the specified table
2611   *
2612   * @param tableName the table name
2613   * @return all regions of the specified table
2614   * @throws IOException when getting the regions fails.
2615   */
2616  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2617    try (Admin admin = getConnection().getAdmin()) {
2618      return admin.getRegions(tableName);
2619    }
2620  }
2621
2622  /*
2623   * Find any other region server which is different from the one identified by parameter
2624   * @param rs
2625   * @return another region server
2626   */
2627  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2628    for (JVMClusterUtil.RegionServerThread rst :
2629      getMiniHBaseCluster().getRegionServerThreads()) {
2630      if (!(rst.getRegionServer() == rs)) {
2631        return rst.getRegionServer();
2632      }
2633    }
2634    return null;
2635  }
2636
2637  /**
2638   * Tool to get the reference to the region server object that holds the
2639   * region of the specified user table.
2640   * @param tableName user table to lookup in hbase:meta
2641   * @return region server that holds it, null if the row doesn't exist
2642   * @throws IOException
2643   * @throws InterruptedException
2644   */
2645  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2646      throws IOException, InterruptedException {
2647    List<RegionInfo> regions = getRegions(tableName);
2648    if (regions == null || regions.isEmpty()) {
2649      return null;
2650    }
2651    LOG.debug("Found " + regions.size() + " regions for table " +
2652        tableName);
2653
2654    byte[] firstRegionName = regions.stream()
2655        .filter(r -> !r.isOffline())
2656        .map(RegionInfo::getRegionName)
2657        .findFirst()
2658        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2659
2660    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2661    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2662      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2663    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2664      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2665    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2666    while(retrier.shouldRetry()) {
2667      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2668      if (index != -1) {
2669        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2670      }
2671      // Came back -1.  Region may not be online yet.  Sleep a while.
2672      retrier.sleepUntilNextRetry();
2673    }
2674    return null;
2675  }
2676
2677  /**
2678   * Starts a <code>MiniMRCluster</code> with a default number of
2679   * <code>TaskTracker</code>'s.
2680   *
2681   * @throws IOException When starting the cluster fails.
2682   */
2683  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2684    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2685    conf.setIfUnset(
2686        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2687        "99.0");
2688    startMiniMapReduceCluster(2);
2689    return mrCluster;
2690  }
2691
2692  /**
2693   * Tasktracker has a bug where changing the hadoop.log.dir system property
2694   * will not change its internal static LOG_DIR variable.
2695   */
2696  private void forceChangeTaskLogDir() {
2697    Field logDirField;
2698    try {
2699      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2700      logDirField.setAccessible(true);
2701
2702      Field modifiersField = Field.class.getDeclaredField("modifiers");
2703      modifiersField.setAccessible(true);
2704      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2705
2706      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2707    } catch (SecurityException e) {
2708      throw new RuntimeException(e);
2709    } catch (NoSuchFieldException e) {
2710      throw new RuntimeException(e);
2711    } catch (IllegalArgumentException e) {
2712      throw new RuntimeException(e);
2713    } catch (IllegalAccessException e) {
2714      throw new RuntimeException(e);
2715    }
2716  }
2717
2718  /**
2719   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2720   * filesystem.
2721   * @param servers  The number of <code>TaskTracker</code>'s to start.
2722   * @throws IOException When starting the cluster fails.
2723   */
2724  private void startMiniMapReduceCluster(final int servers) throws IOException {
2725    if (mrCluster != null) {
2726      throw new IllegalStateException("MiniMRCluster is already running");
2727    }
2728    LOG.info("Starting mini mapreduce cluster...");
2729    setupClusterTestDir();
2730    createDirsAndSetProperties();
2731
2732    forceChangeTaskLogDir();
2733
2734    //// hadoop2 specific settings
2735    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2736    // we up the VM usable so that processes don't get killed.
2737    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2738
2739    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2740    // this avoids the problem by disabling speculative task execution in tests.
2741    conf.setBoolean("mapreduce.map.speculative", false);
2742    conf.setBoolean("mapreduce.reduce.speculative", false);
2743    ////
2744
2745    // Allow the user to override FS URI for this map-reduce cluster to use.
2746    mrCluster = new MiniMRCluster(servers,
2747      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2748      null, null, new JobConf(this.conf));
2749    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2750    if (jobConf == null) {
2751      jobConf = mrCluster.createJobConf();
2752    }
2753
2754    jobConf.set("mapreduce.cluster.local.dir",
2755      conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2756    LOG.info("Mini mapreduce cluster started");
2757
2758    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2759    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2760    // necessary config properties here.  YARN-129 required adding a few properties.
2761    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2762    // this for mrv2 support; mr1 ignores this
2763    conf.set("mapreduce.framework.name", "yarn");
2764    conf.setBoolean("yarn.is.minicluster", true);
2765    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2766    if (rmAddress != null) {
2767      conf.set("yarn.resourcemanager.address", rmAddress);
2768    }
2769    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2770    if (historyAddress != null) {
2771      conf.set("mapreduce.jobhistory.address", historyAddress);
2772    }
2773    String schedulerAddress =
2774      jobConf.get("yarn.resourcemanager.scheduler.address");
2775    if (schedulerAddress != null) {
2776      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2777    }
2778    String mrJobHistoryWebappAddress =
2779      jobConf.get("mapreduce.jobhistory.webapp.address");
2780    if (mrJobHistoryWebappAddress != null) {
2781      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2782    }
2783    String yarnRMWebappAddress =
2784      jobConf.get("yarn.resourcemanager.webapp.address");
2785    if (yarnRMWebappAddress != null) {
2786      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2787    }
2788  }
2789
2790  /**
2791   * Stops the previously started <code>MiniMRCluster</code>.
2792   */
2793  public void shutdownMiniMapReduceCluster() {
2794    if (mrCluster != null) {
2795      LOG.info("Stopping mini mapreduce cluster...");
2796      mrCluster.shutdown();
2797      mrCluster = null;
2798      LOG.info("Mini mapreduce cluster stopped");
2799    }
2800    // Restore configuration to point to local jobtracker
2801    conf.set("mapreduce.jobtracker.address", "local");
2802  }
2803
2804  /**
2805   * Create a stubbed out RegionServerService, mainly for getting FS.
2806   */
2807  public RegionServerServices createMockRegionServerService() throws IOException {
2808    return createMockRegionServerService((ServerName)null);
2809  }
2810
2811  /**
2812   * Create a stubbed out RegionServerService, mainly for getting FS.
2813   * This version is used by TestTokenAuthentication
2814   */
2815  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2816      IOException {
2817    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2818    rss.setFileSystem(getTestFileSystem());
2819    rss.setRpcServer(rpc);
2820    return rss;
2821  }
2822
2823  /**
2824   * Create a stubbed out RegionServerService, mainly for getting FS.
2825   * This version is used by TestOpenRegionHandler
2826   */
2827  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2828    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2829    rss.setFileSystem(getTestFileSystem());
2830    return rss;
2831  }
2832
2833  /**
2834   * Switches the logger for the given class to DEBUG level.
2835   * @param clazz The class for which to switch to debug logging.
2836   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2837   *             HBase only uses log4j. You should do this by your own as it you know which log
2838   *             framework you are using then set the log level to debug is very easy.
2839   */
2840  @Deprecated
2841  public void enableDebug(Class<?> clazz) {
2842    Log4jUtils.enableDebug(clazz);
2843  }
2844
2845  /**
2846   * Expire the Master's session
2847   * @throws Exception
2848   */
2849  public void expireMasterSession() throws Exception {
2850    HMaster master = getMiniHBaseCluster().getMaster();
2851    expireSession(master.getZooKeeper(), false);
2852  }
2853
2854  /**
2855   * Expire a region server's session
2856   * @param index which RS
2857   */
2858  public void expireRegionServerSession(int index) throws Exception {
2859    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2860    expireSession(rs.getZooKeeper(), false);
2861    decrementMinRegionServerCount();
2862  }
2863
2864  private void decrementMinRegionServerCount() {
2865    // decrement the count for this.conf, for newly spwaned master
2866    // this.hbaseCluster shares this configuration too
2867    decrementMinRegionServerCount(getConfiguration());
2868
2869    // each master thread keeps a copy of configuration
2870    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2871      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2872    }
2873  }
2874
2875  private void decrementMinRegionServerCount(Configuration conf) {
2876    int currentCount = conf.getInt(
2877        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2878    if (currentCount != -1) {
2879      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2880          Math.max(currentCount - 1, 1));
2881    }
2882  }
2883
2884  public void expireSession(ZKWatcher nodeZK) throws Exception {
2885   expireSession(nodeZK, false);
2886  }
2887
2888  /**
2889   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2890   * http://hbase.apache.org/book.html#trouble.zookeeper
2891   * There are issues when doing this:
2892   * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2893   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2894   *
2895   * @param nodeZK - the ZK watcher to expire
2896   * @param checkStatus - true to check if we can create a Table with the
2897   *                    current configuration.
2898   */
2899  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
2900    throws Exception {
2901    Configuration c = new Configuration(this.conf);
2902    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2903    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2904    byte[] password = zk.getSessionPasswd();
2905    long sessionID = zk.getSessionId();
2906
2907    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2908    //  so we create a first watcher to be sure that the
2909    //  event was sent. We expect that if our watcher receives the event
2910    //  other watchers on the same machine will get is as well.
2911    // When we ask to close the connection, ZK does not close it before
2912    //  we receive all the events, so don't have to capture the event, just
2913    //  closing the connection should be enough.
2914    ZooKeeper monitor = new ZooKeeper(quorumServers,
2915      1000, new org.apache.zookeeper.Watcher(){
2916      @Override
2917      public void process(WatchedEvent watchedEvent) {
2918        LOG.info("Monitor ZKW received event="+watchedEvent);
2919      }
2920    } , sessionID, password);
2921
2922    // Making it expire
2923    ZooKeeper newZK = new ZooKeeper(quorumServers,
2924        1000, EmptyWatcher.instance, sessionID, password);
2925
2926    //ensure that we have connection to the server before closing down, otherwise
2927    //the close session event will be eaten out before we start CONNECTING state
2928    long start = System.currentTimeMillis();
2929    while (newZK.getState() != States.CONNECTED
2930         && System.currentTimeMillis() - start < 1000) {
2931       Thread.sleep(1);
2932    }
2933    newZK.close();
2934    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2935
2936    // Now closing & waiting to be sure that the clients get it.
2937    monitor.close();
2938
2939    if (checkStatus) {
2940      getConnection().getTable(TableName.META_TABLE_NAME).close();
2941    }
2942  }
2943
2944  /**
2945   * Get the Mini HBase cluster.
2946   *
2947   * @return hbase cluster
2948   * @see #getHBaseClusterInterface()
2949   */
2950  public MiniHBaseCluster getHBaseCluster() {
2951    return getMiniHBaseCluster();
2952  }
2953
2954  /**
2955   * Returns the HBaseCluster instance.
2956   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
2957   * tests referring this should not assume that the cluster is a mini cluster or a
2958   * distributed one. If the test only works on a mini cluster, then specific
2959   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
2960   * need to type-cast.
2961   */
2962  public HBaseCluster getHBaseClusterInterface() {
2963    //implementation note: we should rename this method as #getHBaseCluster(),
2964    //but this would require refactoring 90+ calls.
2965    return hbaseCluster;
2966  }
2967
2968  /**
2969   * Resets the connections so that the next time getConnection() is called, a new connection is
2970   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2971   * the connection is not valid anymore.
2972   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2973   *   written, not all start() stop() calls go through this class. Most tests directly operate on
2974   *   the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
2975   *   maintain the connection state automatically. Cleaning this is a much bigger refactor.
2976   */
2977  public void invalidateConnection() throws IOException {
2978    closeConnection();
2979    // Update the master addresses if they changed.
2980    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2981    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2982    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2983        masterConfigBefore, masterConfAfter);
2984    conf.set(HConstants.MASTER_ADDRS_KEY,
2985        getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2986  }
2987
2988  /**
2989   * Get a shared Connection to the cluster.
2990   * this method is thread safe.
2991   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2992   */
2993  public Connection getConnection() throws IOException {
2994    return getAsyncConnection().toConnection();
2995  }
2996
2997  /**
2998   * Get a assigned Connection to the cluster.
2999   * this method is thread safe.
3000   * @param user assigned user
3001   * @return A Connection with assigned user.
3002   */
3003  public Connection getConnection(User user) throws IOException {
3004    return getAsyncConnection(user).toConnection();
3005  }
3006
3007  /**
3008   * Get a shared AsyncClusterConnection to the cluster.
3009   * this method is thread safe.
3010   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
3011   */
3012  public AsyncClusterConnection getAsyncConnection() throws IOException {
3013    try {
3014      return asyncConnection.updateAndGet(connection -> {
3015        if (connection == null) {
3016          try {
3017            User user = UserProvider.instantiate(conf).getCurrent();
3018            connection = getAsyncConnection(user);
3019          } catch(IOException ioe) {
3020            throw new UncheckedIOException("Failed to create connection", ioe);
3021          }
3022        }
3023        return connection;
3024      });
3025    } catch (UncheckedIOException exception) {
3026      throw exception.getCause();
3027    }
3028  }
3029
3030  /**
3031   * Get a assigned AsyncClusterConnection to the cluster.
3032   * this method is thread safe.
3033   * @param user assigned user
3034   * @return An AsyncClusterConnection with assigned user.
3035   */
3036  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
3037    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
3038  }
3039
3040  public void closeConnection() throws IOException {
3041    if (hbaseAdmin != null) {
3042      Closeables.close(hbaseAdmin, true);
3043      hbaseAdmin = null;
3044    }
3045    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
3046    if (asyncConnection != null) {
3047      Closeables.close(asyncConnection, true);
3048    }
3049  }
3050
3051  /**
3052   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
3053   * Closing it has no effect, it will be closed automatically when the cluster shutdowns
3054   */
3055  public synchronized Admin getAdmin() throws IOException {
3056    if (hbaseAdmin == null){
3057      this.hbaseAdmin = getConnection().getAdmin();
3058    }
3059    return hbaseAdmin;
3060  }
3061
3062  private Admin hbaseAdmin = null;
3063
3064  /**
3065   * Returns an {@link Hbck} instance. Needs be closed when done.
3066   */
3067  public Hbck getHbck() throws IOException {
3068    return getConnection().getHbck();
3069  }
3070
3071  /**
3072   * Unassign the named region.
3073   *
3074   * @param regionName  The region to unassign.
3075   */
3076  public void unassignRegion(String regionName) throws IOException {
3077    unassignRegion(Bytes.toBytes(regionName));
3078  }
3079
3080  /**
3081   * Unassign the named region.
3082   *
3083   * @param regionName  The region to unassign.
3084   */
3085  public void unassignRegion(byte[] regionName) throws IOException {
3086    getAdmin().unassign(regionName, true);
3087  }
3088
3089  /**
3090   * Closes the region containing the given row.
3091   *
3092   * @param row  The row to find the containing region.
3093   * @param table  The table to find the region.
3094   */
3095  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
3096    unassignRegionByRow(Bytes.toBytes(row), table);
3097  }
3098
3099  /**
3100   * Closes the region containing the given row.
3101   *
3102   * @param row  The row to find the containing region.
3103   * @param table  The table to find the region.
3104   * @throws IOException
3105   */
3106  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3107    HRegionLocation hrl = table.getRegionLocation(row);
3108    unassignRegion(hrl.getRegion().getRegionName());
3109  }
3110
3111  /**
3112   * Retrieves a splittable region randomly from tableName
3113   * @param tableName name of table
3114   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3115   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3116   */
3117  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3118    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3119    int regCount = regions.size();
3120    Set<Integer> attempted = new HashSet<>();
3121    int idx;
3122    int attempts = 0;
3123    do {
3124      regions = getHBaseCluster().getRegions(tableName);
3125      if (regCount != regions.size()) {
3126        // if there was region movement, clear attempted Set
3127        attempted.clear();
3128      }
3129      regCount = regions.size();
3130      // There are chances that before we get the region for the table from an RS the region may
3131      // be going for CLOSE.  This may be because online schema change is enabled
3132      if (regCount > 0) {
3133        idx = random.nextInt(regCount);
3134        // if we have just tried this region, there is no need to try again
3135        if (attempted.contains(idx)) {
3136          continue;
3137        }
3138        HRegion region = regions.get(idx);
3139        if (region.checkSplit().isPresent()) {
3140          return region;
3141        }
3142        attempted.add(idx);
3143      }
3144      attempts++;
3145    } while (maxAttempts == -1 || attempts < maxAttempts);
3146    return null;
3147  }
3148
3149  public MiniDFSCluster getDFSCluster() {
3150    return dfsCluster;
3151  }
3152
3153  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3154    setDFSCluster(cluster, true);
3155  }
3156
3157  /**
3158   * Set the MiniDFSCluster
3159   * @param cluster cluster to use
3160   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before
3161   * it is set.
3162   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3163   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3164   */
3165  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3166      throws IllegalStateException, IOException {
3167    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3168      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3169    }
3170    this.dfsCluster = cluster;
3171    this.setFs();
3172  }
3173
3174  public FileSystem getTestFileSystem() throws IOException {
3175    return HFileSystem.get(conf);
3176  }
3177
3178  /**
3179   * Wait until all regions in a table have been assigned.  Waits default timeout before giving up
3180   * (30 seconds).
3181   * @param table Table to wait on.
3182   * @throws InterruptedException
3183   * @throws IOException
3184   */
3185  public void waitTableAvailable(TableName table)
3186      throws InterruptedException, IOException {
3187    waitTableAvailable(table.getName(), 30000);
3188  }
3189
3190  public void waitTableAvailable(TableName table, long timeoutMillis)
3191      throws InterruptedException, IOException {
3192    waitFor(timeoutMillis, predicateTableAvailable(table));
3193  }
3194
3195  /**
3196   * Wait until all regions in a table have been assigned
3197   * @param table Table to wait on.
3198   * @param timeoutMillis Timeout.
3199   */
3200  public void waitTableAvailable(byte[] table, long timeoutMillis)
3201      throws InterruptedException, IOException {
3202    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3203  }
3204
3205  public String explainTableAvailability(TableName tableName) throws IOException {
3206    String msg = explainTableState(tableName, TableState.State.ENABLED) + ", ";
3207    if (getHBaseCluster().getMaster().isAlive()) {
3208      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3209        .getRegionStates().getRegionAssignments();
3210      final List<Pair<RegionInfo, ServerName>> metaLocations =
3211        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3212      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3213        RegionInfo hri = metaLocation.getFirst();
3214        ServerName sn = metaLocation.getSecond();
3215        if (!assignments.containsKey(hri)) {
3216          msg += ", region " + hri + " not assigned, but found in meta, it expected to be on " + sn;
3217
3218        } else if (sn == null) {
3219          msg += ",  region " + hri + " assigned,  but has no server in meta";
3220        } else if (!sn.equals(assignments.get(hri))) {
3221          msg += ",  region " + hri + " assigned,  but has different servers in meta and AM ( " +
3222            sn + " <> " + assignments.get(hri);
3223        }
3224      }
3225    }
3226    return msg;
3227  }
3228
3229  public String explainTableState(final TableName table, TableState.State state)
3230      throws IOException {
3231    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
3232    if (tableState == null) {
3233      return "TableState in META: No table state in META for table " + table +
3234        " last state in meta (including deleted is " + findLastTableState(table) + ")";
3235    } else if (!tableState.inStates(state)) {
3236      return "TableState in META: Not " + state + " state, but " + tableState;
3237    } else {
3238      return "TableState in META: OK";
3239    }
3240  }
3241
3242  @Nullable
3243  public TableState findLastTableState(final TableName table) throws IOException {
3244    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3245    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3246      @Override
3247      public boolean visit(Result r) throws IOException {
3248        if (!Arrays.equals(r.getRow(), table.getName())) {
3249          return false;
3250        }
3251        TableState state = CatalogFamilyFormat.getTableState(r);
3252        if (state != null) {
3253          lastTableState.set(state);
3254        }
3255        return true;
3256      }
3257    };
3258    MetaTableAccessor.scanMeta(getConnection(), null, null,
3259      ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
3260    return lastTableState.get();
3261  }
3262
3263  /**
3264   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3265   * regions have been all assigned.  Will timeout after default period (30 seconds)
3266   * Tolerates nonexistent table.
3267   * @param table the table to wait on.
3268   * @throws InterruptedException if interrupted while waiting
3269   * @throws IOException if an IO problem is encountered
3270   */
3271  public void waitTableEnabled(TableName table)
3272      throws InterruptedException, IOException {
3273    waitTableEnabled(table, 30000);
3274  }
3275
3276  /**
3277   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3278   * regions have been all assigned.
3279   * @see #waitTableEnabled(TableName, long)
3280   * @param table Table to wait on.
3281   * @param timeoutMillis Time to wait on it being marked enabled.
3282   * @throws InterruptedException
3283   * @throws IOException
3284   */
3285  public void waitTableEnabled(byte[] table, long timeoutMillis)
3286  throws InterruptedException, IOException {
3287    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3288  }
3289
3290  public void waitTableEnabled(TableName table, long timeoutMillis)
3291  throws IOException {
3292    waitFor(timeoutMillis, predicateTableEnabled(table));
3293  }
3294
3295  /**
3296   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3297   * Will timeout after default period (30 seconds)
3298   * @param table Table to wait on.
3299   * @throws InterruptedException
3300   * @throws IOException
3301   */
3302  public void waitTableDisabled(byte[] table)
3303          throws InterruptedException, IOException {
3304    waitTableDisabled(table, 30000);
3305  }
3306
3307  public void waitTableDisabled(TableName table, long millisTimeout)
3308          throws InterruptedException, IOException {
3309    waitFor(millisTimeout, predicateTableDisabled(table));
3310  }
3311
3312  /**
3313   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3314   * @param table Table to wait on.
3315   * @param timeoutMillis Time to wait on it being marked disabled.
3316   * @throws InterruptedException
3317   * @throws IOException
3318   */
3319  public void waitTableDisabled(byte[] table, long timeoutMillis)
3320          throws InterruptedException, IOException {
3321    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3322  }
3323
3324  /**
3325   * Make sure that at least the specified number of region servers
3326   * are running
3327   * @param num minimum number of region servers that should be running
3328   * @return true if we started some servers
3329   * @throws IOException
3330   */
3331  public boolean ensureSomeRegionServersAvailable(final int num)
3332      throws IOException {
3333    boolean startedServer = false;
3334    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3335    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3336      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3337      startedServer = true;
3338    }
3339
3340    return startedServer;
3341  }
3342
3343
3344  /**
3345   * Make sure that at least the specified number of region servers
3346   * are running. We don't count the ones that are currently stopping or are
3347   * stopped.
3348   * @param num minimum number of region servers that should be running
3349   * @return true if we started some servers
3350   * @throws IOException
3351   */
3352  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3353    throws IOException {
3354    boolean startedServer = ensureSomeRegionServersAvailable(num);
3355
3356    int nonStoppedServers = 0;
3357    for (JVMClusterUtil.RegionServerThread rst :
3358      getMiniHBaseCluster().getRegionServerThreads()) {
3359
3360      HRegionServer hrs = rst.getRegionServer();
3361      if (hrs.isStopping() || hrs.isStopped()) {
3362        LOG.info("A region server is stopped or stopping:"+hrs);
3363      } else {
3364        nonStoppedServers++;
3365      }
3366    }
3367    for (int i=nonStoppedServers; i<num; ++i) {
3368      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3369      startedServer = true;
3370    }
3371    return startedServer;
3372  }
3373
3374
3375  /**
3376   * This method clones the passed <code>c</code> configuration setting a new
3377   * user into the clone.  Use it getting new instances of FileSystem.  Only
3378   * works for DistributedFileSystem w/o Kerberos.
3379   * @param c Initial configuration
3380   * @param differentiatingSuffix Suffix to differentiate this user from others.
3381   * @return A new configuration instance with a different user set into it.
3382   * @throws IOException
3383   */
3384  public static User getDifferentUser(final Configuration c,
3385    final String differentiatingSuffix)
3386  throws IOException {
3387    FileSystem currentfs = FileSystem.get(c);
3388    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3389      return User.getCurrent();
3390    }
3391    // Else distributed filesystem.  Make a new instance per daemon.  Below
3392    // code is taken from the AppendTestUtil over in hdfs.
3393    String username = User.getCurrent().getName() +
3394      differentiatingSuffix;
3395    User user = User.createUserForTesting(c, username,
3396        new String[]{"supergroup"});
3397    return user;
3398  }
3399
3400  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3401      throws IOException {
3402    NavigableSet<String> online = new TreeSet<>();
3403    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3404      try {
3405        for (RegionInfo region :
3406            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3407          online.add(region.getRegionNameAsString());
3408        }
3409      } catch (RegionServerStoppedException e) {
3410        // That's fine.
3411      }
3412    }
3413    for (MasterThread mt : cluster.getLiveMasterThreads()) {
3414      try {
3415        for (RegionInfo region :
3416            ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3417          online.add(region.getRegionNameAsString());
3418        }
3419      } catch (RegionServerStoppedException e) {
3420        // That's fine.
3421      } catch (ServerNotRunningYetException e) {
3422        // That's fine.
3423      }
3424    }
3425    return online;
3426  }
3427
3428  /**
3429   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
3430   * makes tests linger.  Here is the exception you'll see:
3431   * <pre>
3432   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3433   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3434   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3435   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3436   * </pre>
3437   * @param stream A DFSClient.DFSOutputStream.
3438   * @param max
3439   * @throws NoSuchFieldException
3440   * @throws SecurityException
3441   * @throws IllegalAccessException
3442   * @throws IllegalArgumentException
3443   */
3444  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3445      final int max) {
3446    try {
3447      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3448      for (Class<?> clazz: clazzes) {
3449        String className = clazz.getSimpleName();
3450        if (className.equals("DFSOutputStream")) {
3451          if (clazz.isInstance(stream)) {
3452            Field maxRecoveryErrorCountField =
3453              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3454            maxRecoveryErrorCountField.setAccessible(true);
3455            maxRecoveryErrorCountField.setInt(stream, max);
3456            break;
3457          }
3458        }
3459      }
3460    } catch (Exception e) {
3461      LOG.info("Could not set max recovery field", e);
3462    }
3463  }
3464
3465  /**
3466   * Uses directly the assignment manager to assign the region. and waits until the specified region
3467   * has completed assignment.
3468   * @return true if the region is assigned false otherwise.
3469   */
3470  public boolean assignRegion(final RegionInfo regionInfo)
3471      throws IOException, InterruptedException {
3472    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3473    am.assign(regionInfo);
3474    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3475  }
3476
3477  /**
3478   * Move region to destination server and wait till region is completely moved and online
3479   *
3480   * @param destRegion region to move
3481   * @param destServer destination server of the region
3482   * @throws InterruptedException
3483   * @throws IOException
3484   */
3485  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3486      throws InterruptedException, IOException {
3487    HMaster master = getMiniHBaseCluster().getMaster();
3488    // TODO: Here we start the move. The move can take a while.
3489    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3490    while (true) {
3491      ServerName serverName = master.getAssignmentManager().getRegionStates()
3492          .getRegionServerOfRegion(destRegion);
3493      if (serverName != null && serverName.equals(destServer)) {
3494        assertRegionOnServer(destRegion, serverName, 2000);
3495        break;
3496      }
3497      Thread.sleep(10);
3498    }
3499  }
3500
3501  /**
3502   * Wait until all regions for a table in hbase:meta have a non-empty
3503   * info:server, up to a configuable timeout value (default is 60 seconds)
3504   * This means all regions have been deployed,
3505   * master has been informed and updated hbase:meta with the regions deployed
3506   * server.
3507   * @param tableName the table name
3508   * @throws IOException
3509   */
3510  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3511    waitUntilAllRegionsAssigned(tableName,
3512      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3513  }
3514
3515  /**
3516   * Waith until all system table's regions get assigned
3517   * @throws IOException
3518   */
3519  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3520    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3521  }
3522
3523  /**
3524   * Wait until all regions for a table in hbase:meta have a non-empty
3525   * info:server, or until timeout.  This means all regions have been deployed,
3526   * master has been informed and updated hbase:meta with the regions deployed
3527   * server.
3528   * @param tableName the table name
3529   * @param timeout timeout, in milliseconds
3530   * @throws IOException
3531   */
3532  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3533      throws IOException {
3534    if (!TableName.isMetaTableName(tableName)) {
3535      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3536        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3537            timeout + "ms");
3538        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3539          @Override
3540          public String explainFailure() throws IOException {
3541            return explainTableAvailability(tableName);
3542          }
3543
3544          @Override
3545          public boolean evaluate() throws IOException {
3546            Scan scan = new Scan();
3547            scan.addFamily(HConstants.CATALOG_FAMILY);
3548            boolean tableFound = false;
3549            try (ResultScanner s = meta.getScanner(scan)) {
3550              for (Result r; (r = s.next()) != null;) {
3551                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3552                RegionInfo info = RegionInfo.parseFromOrNull(b);
3553                if (info != null && info.getTable().equals(tableName)) {
3554                  // Get server hosting this region from catalog family. Return false if no server
3555                  // hosting this region, or if the server hosting this region was recently killed
3556                  // (for fault tolerance testing).
3557                  tableFound = true;
3558                  byte[] server =
3559                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3560                  if (server == null) {
3561                    return false;
3562                  } else {
3563                    byte[] startCode =
3564                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3565                    ServerName serverName =
3566                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3567                            Bytes.toLong(startCode));
3568                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3569                        getHBaseCluster().isKilledRS(serverName)) {
3570                      return false;
3571                    }
3572                  }
3573                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3574                    return false;
3575                  }
3576                }
3577              }
3578            }
3579            if (!tableFound) {
3580              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3581            }
3582            return tableFound;
3583          }
3584        });
3585      }
3586    }
3587    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3588    // check from the master state if we are using a mini cluster
3589    if (!getHBaseClusterInterface().isDistributedCluster()) {
3590      // So, all regions are in the meta table but make sure master knows of the assignments before
3591      // returning -- sometimes this can lag.
3592      HMaster master = getHBaseCluster().getMaster();
3593      final RegionStates states = master.getAssignmentManager().getRegionStates();
3594      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3595        @Override
3596        public String explainFailure() throws IOException {
3597          return explainTableAvailability(tableName);
3598        }
3599
3600        @Override
3601        public boolean evaluate() throws IOException {
3602          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3603          return hris != null && !hris.isEmpty();
3604        }
3605      });
3606    }
3607    LOG.info("All regions for table " + tableName + " assigned.");
3608  }
3609
3610  /**
3611   * Do a small get/scan against one store. This is required because store
3612   * has no actual methods of querying itself, and relies on StoreScanner.
3613   */
3614  public static List<Cell> getFromStoreFile(HStore store,
3615                                                Get get) throws IOException {
3616    Scan scan = new Scan(get);
3617    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3618        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3619        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3620        // readpoint 0.
3621        0);
3622
3623    List<Cell> result = new ArrayList<>();
3624    scanner.next(result);
3625    if (!result.isEmpty()) {
3626      // verify that we are on the row we want:
3627      Cell kv = result.get(0);
3628      if (!CellUtil.matchingRows(kv, get.getRow())) {
3629        result.clear();
3630      }
3631    }
3632    scanner.close();
3633    return result;
3634  }
3635
3636  /**
3637   * Create region split keys between startkey and endKey
3638   *
3639   * @param startKey
3640   * @param endKey
3641   * @param numRegions the number of regions to be created. it has to be greater than 3.
3642   * @return resulting split keys
3643   */
3644  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3645    assertTrue(numRegions>3);
3646    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3647    byte [][] result = new byte[tmpSplitKeys.length+1][];
3648    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3649    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3650    return result;
3651  }
3652
3653  /**
3654   * Do a small get/scan against one store. This is required because store
3655   * has no actual methods of querying itself, and relies on StoreScanner.
3656   */
3657  public static List<Cell> getFromStoreFile(HStore store,
3658                                                byte [] row,
3659                                                NavigableSet<byte[]> columns
3660                                                ) throws IOException {
3661    Get get = new Get(row);
3662    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3663    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3664
3665    return getFromStoreFile(store,get);
3666  }
3667
3668  public static void assertKVListsEqual(String additionalMsg,
3669      final List<? extends Cell> expected,
3670      final List<? extends Cell> actual) {
3671    final int eLen = expected.size();
3672    final int aLen = actual.size();
3673    final int minLen = Math.min(eLen, aLen);
3674
3675    int i;
3676    for (i = 0; i < minLen
3677        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3678        ++i) {}
3679
3680    if (additionalMsg == null) {
3681      additionalMsg = "";
3682    }
3683    if (!additionalMsg.isEmpty()) {
3684      additionalMsg = ". " + additionalMsg;
3685    }
3686
3687    if (eLen != aLen || i != minLen) {
3688      throw new AssertionError(
3689          "Expected and actual KV arrays differ at position " + i + ": " +
3690          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3691          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3692    }
3693  }
3694
3695  public static <T> String safeGetAsStr(List<T> lst, int i) {
3696    if (0 <= i && i < lst.size()) {
3697      return lst.get(i).toString();
3698    } else {
3699      return "<out_of_range>";
3700    }
3701  }
3702
3703  public String getClusterKey() {
3704    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3705        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3706        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3707            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3708  }
3709
3710  /** Creates a random table with the given parameters */
3711  public Table createRandomTable(TableName tableName,
3712      final Collection<String> families,
3713      final int maxVersions,
3714      final int numColsPerRow,
3715      final int numFlushes,
3716      final int numRegions,
3717      final int numRowsPerFlush)
3718      throws IOException, InterruptedException {
3719
3720    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3721        " regions, " + numFlushes + " storefiles per region, " +
3722        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3723        "\n");
3724
3725    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3726    final int numCF = families.size();
3727    final byte[][] cfBytes = new byte[numCF][];
3728    {
3729      int cfIndex = 0;
3730      for (String cf : families) {
3731        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3732      }
3733    }
3734
3735    final int actualStartKey = 0;
3736    final int actualEndKey = Integer.MAX_VALUE;
3737    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3738    final int splitStartKey = actualStartKey + keysPerRegion;
3739    final int splitEndKey = actualEndKey - keysPerRegion;
3740    final String keyFormat = "%08x";
3741    final Table table = createTable(tableName, cfBytes,
3742        maxVersions,
3743        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3744        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3745        numRegions);
3746
3747    if (hbaseCluster != null) {
3748      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3749    }
3750
3751    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3752
3753    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3754      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3755        final byte[] row = Bytes.toBytes(String.format(keyFormat,
3756            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3757
3758        Put put = new Put(row);
3759        Delete del = new Delete(row);
3760        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3761          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3762          final long ts = rand.nextInt();
3763          final byte[] qual = Bytes.toBytes("col" + iCol);
3764          if (rand.nextBoolean()) {
3765            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3766                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3767                ts + "_random_" + rand.nextLong());
3768            put.addColumn(cf, qual, ts, value);
3769          } else if (rand.nextDouble() < 0.8) {
3770            del.addColumn(cf, qual, ts);
3771          } else {
3772            del.addColumns(cf, qual, ts);
3773          }
3774        }
3775
3776        if (!put.isEmpty()) {
3777          mutator.mutate(put);
3778        }
3779
3780        if (!del.isEmpty()) {
3781          mutator.mutate(del);
3782        }
3783      }
3784      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3785      mutator.flush();
3786      if (hbaseCluster != null) {
3787        getMiniHBaseCluster().flushcache(table.getName());
3788      }
3789    }
3790    mutator.close();
3791
3792    return table;
3793  }
3794
3795  public static int randomFreePort() {
3796    return HBaseCommonTestingUtility.randomFreePort();
3797  }
3798  public static String randomMultiCastAddress() {
3799    return "226.1.1." + random.nextInt(254);
3800  }
3801
3802  public static void waitForHostPort(String host, int port)
3803      throws IOException {
3804    final int maxTimeMs = 10000;
3805    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3806    IOException savedException = null;
3807    LOG.info("Waiting for server at " + host + ":" + port);
3808    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3809      try {
3810        Socket sock = new Socket(InetAddress.getByName(host), port);
3811        sock.close();
3812        savedException = null;
3813        LOG.info("Server at " + host + ":" + port + " is available");
3814        break;
3815      } catch (UnknownHostException e) {
3816        throw new IOException("Failed to look up " + host, e);
3817      } catch (IOException e) {
3818        savedException = e;
3819      }
3820      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3821    }
3822
3823    if (savedException != null) {
3824      throw savedException;
3825    }
3826  }
3827
3828  /**
3829   * Creates a pre-split table for load testing. If the table already exists,
3830   * logs a warning and continues.
3831   * @return the number of regions the table was split into
3832   */
3833  public static int createPreSplitLoadTestTable(Configuration conf,
3834      TableName tableName, byte[] columnFamily, Algorithm compression,
3835      DataBlockEncoding dataBlockEncoding) throws IOException {
3836    return createPreSplitLoadTestTable(conf, tableName,
3837      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3838      Durability.USE_DEFAULT);
3839  }
3840  /**
3841   * Creates a pre-split table for load testing. If the table already exists,
3842   * logs a warning and continues.
3843   * @return the number of regions the table was split into
3844   */
3845  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3846    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3847    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3848    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3849    builder.setDurability(durability);
3850    builder.setRegionReplication(regionReplication);
3851    ColumnFamilyDescriptorBuilder cfBuilder =
3852      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3853    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3854    cfBuilder.setCompressionType(compression);
3855    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3856      numRegionsPerServer);
3857  }
3858
3859  /**
3860   * Creates a pre-split table for load testing. If the table already exists,
3861   * logs a warning and continues.
3862   * @return the number of regions the table was split into
3863   */
3864  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3865    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3866    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3867    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3868    builder.setDurability(durability);
3869    builder.setRegionReplication(regionReplication);
3870    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3871    for (int i = 0; i < columnFamilies.length; i++) {
3872      ColumnFamilyDescriptorBuilder cfBuilder =
3873        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3874      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3875      cfBuilder.setCompressionType(compression);
3876      hcds[i] = cfBuilder.build();
3877    }
3878    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3879  }
3880
3881  /**
3882   * Creates a pre-split table for load testing. If the table already exists,
3883   * logs a warning and continues.
3884   * @return the number of regions the table was split into
3885   */
3886  public static int createPreSplitLoadTestTable(Configuration conf,
3887      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
3888    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3889  }
3890
3891  /**
3892   * Creates a pre-split table for load testing. If the table already exists,
3893   * logs a warning and continues.
3894   * @return the number of regions the table was split into
3895   */
3896  public static int createPreSplitLoadTestTable(Configuration conf,
3897      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3898    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
3899        numRegionsPerServer);
3900  }
3901
3902  /**
3903   * Creates a pre-split table for load testing. If the table already exists,
3904   * logs a warning and continues.
3905   * @return the number of regions the table was split into
3906   */
3907  public static int createPreSplitLoadTestTable(Configuration conf,
3908      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
3909      int numRegionsPerServer) throws IOException {
3910    return createPreSplitLoadTestTable(conf, desc, hcds,
3911      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
3912  }
3913
3914  /**
3915   * Creates a pre-split table for load testing. If the table already exists,
3916   * logs a warning and continues.
3917   * @return the number of regions the table was split into
3918   */
3919  public static int createPreSplitLoadTestTable(Configuration conf,
3920      TableDescriptor td, ColumnFamilyDescriptor[] cds,
3921      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
3922    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3923    for (ColumnFamilyDescriptor cd : cds) {
3924      if (!td.hasColumnFamily(cd.getName())) {
3925        builder.setColumnFamily(cd);
3926      }
3927    }
3928    td = builder.build();
3929    int totalNumberOfRegions = 0;
3930    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3931    Admin admin = unmanagedConnection.getAdmin();
3932
3933    try {
3934      // create a table a pre-splits regions.
3935      // The number of splits is set as:
3936      //    region servers * regions per region server).
3937      int numberOfServers = admin.getRegionServers().size();
3938      if (numberOfServers == 0) {
3939        throw new IllegalStateException("No live regionservers");
3940      }
3941
3942      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3943      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3944          "pre-splitting table into " + totalNumberOfRegions + " regions " +
3945          "(regions per server: " + numRegionsPerServer + ")");
3946
3947      byte[][] splits = splitter.split(
3948          totalNumberOfRegions);
3949
3950      admin.createTable(td, splits);
3951    } catch (MasterNotRunningException e) {
3952      LOG.error("Master not running", e);
3953      throw new IOException(e);
3954    } catch (TableExistsException e) {
3955      LOG.warn("Table " + td.getTableName() +
3956          " already exists, continuing");
3957    } finally {
3958      admin.close();
3959      unmanagedConnection.close();
3960    }
3961    return totalNumberOfRegions;
3962  }
3963
3964  public static int getMetaRSPort(Connection connection) throws IOException {
3965    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3966      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3967    }
3968  }
3969
3970  /**
3971   *  Due to async racing issue, a region may not be in
3972   *  the online region list of a region server yet, after
3973   *  the assignment znode is deleted and the new assignment
3974   *  is recorded in master.
3975   */
3976  public void assertRegionOnServer(
3977      final RegionInfo hri, final ServerName server,
3978      final long timeout) throws IOException, InterruptedException {
3979    long timeoutTime = System.currentTimeMillis() + timeout;
3980    while (true) {
3981      List<RegionInfo> regions = getAdmin().getRegions(server);
3982      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3983      long now = System.currentTimeMillis();
3984      if (now > timeoutTime) break;
3985      Thread.sleep(10);
3986    }
3987    fail("Could not find region " + hri.getRegionNameAsString()
3988      + " on server " + server);
3989  }
3990
3991  /**
3992   * Check to make sure the region is open on the specified
3993   * region server, but not on any other one.
3994   */
3995  public void assertRegionOnlyOnServer(
3996      final RegionInfo hri, final ServerName server,
3997      final long timeout) throws IOException, InterruptedException {
3998    long timeoutTime = System.currentTimeMillis() + timeout;
3999    while (true) {
4000      List<RegionInfo> regions = getAdmin().getRegions(server);
4001      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
4002        List<JVMClusterUtil.RegionServerThread> rsThreads =
4003          getHBaseCluster().getLiveRegionServerThreads();
4004        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
4005          HRegionServer rs = rsThread.getRegionServer();
4006          if (server.equals(rs.getServerName())) {
4007            continue;
4008          }
4009          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
4010          for (HRegion r: hrs) {
4011            assertTrue("Region should not be double assigned",
4012              r.getRegionInfo().getRegionId() != hri.getRegionId());
4013          }
4014        }
4015        return; // good, we are happy
4016      }
4017      long now = System.currentTimeMillis();
4018      if (now > timeoutTime) break;
4019      Thread.sleep(10);
4020    }
4021    fail("Could not find region " + hri.getRegionNameAsString()
4022      + " on server " + server);
4023  }
4024
4025  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
4026    TableDescriptor td =
4027        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4028    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4029    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
4030  }
4031
4032  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
4033      BlockCache blockCache) throws IOException {
4034    TableDescriptor td =
4035        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4036    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4037    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
4038  }
4039
4040  public void setFileSystemURI(String fsURI) {
4041    FS_URI = fsURI;
4042  }
4043
4044  /**
4045   * Returns a {@link Predicate} for checking that there are no regions in transition in master
4046   */
4047  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
4048    return new ExplainingPredicate<IOException>() {
4049      @Override
4050      public String explainFailure() throws IOException {
4051        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4052            .getAssignmentManager().getRegionStates();
4053        return "found in transition: " + regionStates.getRegionsInTransition().toString();
4054      }
4055
4056      @Override
4057      public boolean evaluate() throws IOException {
4058        HMaster master = getMiniHBaseCluster().getMaster();
4059        if (master == null) return false;
4060        AssignmentManager am = master.getAssignmentManager();
4061        if (am == null) return false;
4062        return !am.hasRegionsInTransition();
4063      }
4064    };
4065  }
4066
4067  /**
4068   * Returns a {@link Predicate} for checking that table is enabled
4069   */
4070  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4071    return new ExplainingPredicate<IOException>() {
4072      @Override
4073      public String explainFailure() throws IOException {
4074        return explainTableState(tableName, TableState.State.ENABLED);
4075      }
4076
4077      @Override
4078      public boolean evaluate() throws IOException {
4079        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4080      }
4081    };
4082  }
4083
4084  /**
4085   * Returns a {@link Predicate} for checking that table is enabled
4086   */
4087  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4088    return new ExplainingPredicate<IOException>() {
4089      @Override
4090      public String explainFailure() throws IOException {
4091        return explainTableState(tableName, TableState.State.DISABLED);
4092      }
4093
4094      @Override
4095      public boolean evaluate() throws IOException {
4096        return getAdmin().isTableDisabled(tableName);
4097      }
4098    };
4099  }
4100
4101  /**
4102   * Returns a {@link Predicate} for checking that table is enabled
4103   */
4104  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4105    return new ExplainingPredicate<IOException>() {
4106      @Override
4107      public String explainFailure() throws IOException {
4108        return explainTableAvailability(tableName);
4109      }
4110
4111      @Override
4112      public boolean evaluate() throws IOException {
4113        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4114        if (tableAvailable) {
4115          try (Table table = getConnection().getTable(tableName)) {
4116            TableDescriptor htd = table.getDescriptor();
4117            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4118                .getAllRegionLocations()) {
4119              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
4120                  .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
4121                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4122              for (byte[] family : htd.getColumnFamilyNames()) {
4123                scan.addFamily(family);
4124              }
4125              try (ResultScanner scanner = table.getScanner(scan)) {
4126                scanner.next();
4127              }
4128            }
4129          }
4130        }
4131        return tableAvailable;
4132      }
4133    };
4134  }
4135
4136  /**
4137   * Wait until no regions in transition.
4138   * @param timeout How long to wait.
4139   * @throws IOException
4140   */
4141  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4142    waitFor(timeout, predicateNoRegionsInTransition());
4143  }
4144
4145  /**
4146   * Wait until no regions in transition. (time limit 15min)
4147   * @throws IOException
4148   */
4149  public void waitUntilNoRegionsInTransition() throws IOException {
4150    waitUntilNoRegionsInTransition(15 * 60000);
4151  }
4152
4153  /**
4154   * Wait until labels is ready in VisibilityLabelsCache.
4155   * @param timeoutMillis
4156   * @param labels
4157   */
4158  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4159    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4160    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4161
4162      @Override
4163      public boolean evaluate() {
4164        for (String label : labels) {
4165          if (labelsCache.getLabelOrdinal(label) == 0) {
4166            return false;
4167          }
4168        }
4169        return true;
4170      }
4171
4172      @Override
4173      public String explainFailure() {
4174        for (String label : labels) {
4175          if (labelsCache.getLabelOrdinal(label) == 0) {
4176            return label + " is not available yet";
4177          }
4178        }
4179        return "";
4180      }
4181    });
4182  }
4183
4184  /**
4185   * Create a set of column descriptors with the combination of compression,
4186   * encoding, bloom codecs available.
4187   * @return the list of column descriptors
4188   */
4189  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4190    return generateColumnDescriptors("");
4191  }
4192
4193  /**
4194   * Create a set of column descriptors with the combination of compression,
4195   * encoding, bloom codecs available.
4196   * @param prefix family names prefix
4197   * @return the list of column descriptors
4198   */
4199  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4200    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4201    long familyId = 0;
4202    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4203      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4204        for (BloomType bloomType: BloomType.values()) {
4205          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4206          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4207            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4208          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4209          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4210          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4211          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4212          familyId++;
4213        }
4214      }
4215    }
4216    return columnFamilyDescriptors;
4217  }
4218
4219  /**
4220   * Get supported compression algorithms.
4221   * @return supported compression algorithms.
4222   */
4223  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4224    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4225    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4226    for (String algoName : allAlgos) {
4227      try {
4228        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4229        algo.getCompressor();
4230        supportedAlgos.add(algo);
4231      } catch (Throwable t) {
4232        // this algo is not available
4233      }
4234    }
4235    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4236  }
4237
4238  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4239    Scan scan = new Scan().withStartRow(row);
4240    scan.setSmall(true);
4241    scan.setCaching(1);
4242    scan.setReversed(true);
4243    scan.addFamily(family);
4244    try (RegionScanner scanner = r.getScanner(scan)) {
4245      List<Cell> cells = new ArrayList<>(1);
4246      scanner.next(cells);
4247      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4248        return null;
4249      }
4250      return Result.create(cells);
4251    }
4252  }
4253
4254  private boolean isTargetTable(final byte[] inRow, Cell c) {
4255    String inputRowString = Bytes.toString(inRow);
4256    int i = inputRowString.indexOf(HConstants.DELIMITER);
4257    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4258    int o = outputRowString.indexOf(HConstants.DELIMITER);
4259    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4260  }
4261
4262  /**
4263   * Sets up {@link MiniKdc} for testing security.
4264   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4265   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4266   * FYI, there is also the easier-to-use kerby KDC server and utility for using it,
4267   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4268   * less baggage. It came in in HBASE-5291.
4269   */
4270  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4271    Properties conf = MiniKdc.createConf();
4272    conf.put(MiniKdc.DEBUG, true);
4273    MiniKdc kdc = null;
4274    File dir = null;
4275    // There is time lag between selecting a port and trying to bind with it. It's possible that
4276    // another service captures the port in between which'll result in BindException.
4277    boolean bindException;
4278    int numTries = 0;
4279    do {
4280      try {
4281        bindException = false;
4282        dir = new File(getDataTestDir("kdc").toUri().getPath());
4283        kdc = new MiniKdc(conf, dir);
4284        kdc.start();
4285      } catch (BindException e) {
4286        FileUtils.deleteDirectory(dir);  // clean directory
4287        numTries++;
4288        if (numTries == 3) {
4289          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4290          throw e;
4291        }
4292        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4293        bindException = true;
4294      }
4295    } while (bindException);
4296    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4297    return kdc;
4298  }
4299
4300  public int getNumHFiles(final TableName tableName, final byte[] family) {
4301    int numHFiles = 0;
4302    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4303      numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4304                                    family);
4305    }
4306    return numHFiles;
4307  }
4308
4309  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4310                               final byte[] family) {
4311    int numHFiles = 0;
4312    for (Region region : rs.getRegions(tableName)) {
4313      numHFiles += region.getStore(family).getStorefilesCount();
4314    }
4315    return numHFiles;
4316  }
4317
4318  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4319    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
4320    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4321    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4322    assertEquals(ltdFamilies.size(), rtdFamilies.size());
4323    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), it2 =
4324         rtdFamilies.iterator(); it.hasNext();) {
4325      assertEquals(0,
4326          ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4327    }
4328  }
4329
4330  /**
4331   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4332   * invocations.
4333   */
4334  public static void await(final long sleepMillis, final BooleanSupplier condition)
4335    throws InterruptedException {
4336    try {
4337      while (!condition.getAsBoolean()) {
4338        Thread.sleep(sleepMillis);
4339      }
4340    } catch (RuntimeException e) {
4341      if (e.getCause() instanceof AssertionError) {
4342        throw (AssertionError) e.getCause();
4343      }
4344      throw e;
4345    }
4346  }
4347}