001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.lang.reflect.Field;
029import java.lang.reflect.Modifier;
030import java.net.BindException;
031import java.net.DatagramSocket;
032import java.net.InetAddress;
033import java.net.ServerSocket;
034import java.net.Socket;
035import java.net.UnknownHostException;
036import java.nio.charset.StandardCharsets;
037import java.security.MessageDigest;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.HashSet;
043import java.util.Iterator;
044import java.util.List;
045import java.util.Map;
046import java.util.NavigableSet;
047import java.util.Properties;
048import java.util.Random;
049import java.util.Set;
050import java.util.TreeSet;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.atomic.AtomicReference;
053import org.apache.commons.io.FileUtils;
054import org.apache.commons.lang3.RandomStringUtils;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
059import org.apache.hadoop.hbase.Waiter.Predicate;
060import org.apache.hadoop.hbase.client.Admin;
061import org.apache.hadoop.hbase.client.AsyncClusterConnection;
062import org.apache.hadoop.hbase.client.BufferedMutator;
063import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
064import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
066import org.apache.hadoop.hbase.client.Connection;
067import org.apache.hadoop.hbase.client.ConnectionFactory;
068import org.apache.hadoop.hbase.client.Consistency;
069import org.apache.hadoop.hbase.client.Delete;
070import org.apache.hadoop.hbase.client.Durability;
071import org.apache.hadoop.hbase.client.Get;
072import org.apache.hadoop.hbase.client.Hbck;
073import org.apache.hadoop.hbase.client.MasterRegistry;
074import org.apache.hadoop.hbase.client.Put;
075import org.apache.hadoop.hbase.client.RegionInfo;
076import org.apache.hadoop.hbase.client.RegionInfoBuilder;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.Result;
079import org.apache.hadoop.hbase.client.ResultScanner;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.Table;
082import org.apache.hadoop.hbase.client.TableDescriptor;
083import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
084import org.apache.hadoop.hbase.client.TableState;
085import org.apache.hadoop.hbase.fs.HFileSystem;
086import org.apache.hadoop.hbase.io.compress.Compression;
087import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
089import org.apache.hadoop.hbase.io.hfile.BlockCache;
090import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
091import org.apache.hadoop.hbase.io.hfile.HFile;
092import org.apache.hadoop.hbase.ipc.RpcServerInterface;
093import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
094import org.apache.hadoop.hbase.logging.Log4jUtils;
095import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
096import org.apache.hadoop.hbase.master.HMaster;
097import org.apache.hadoop.hbase.master.RegionState;
098import org.apache.hadoop.hbase.master.ServerManager;
099import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
100import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
101import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
102import org.apache.hadoop.hbase.master.assignment.RegionStates;
103import org.apache.hadoop.hbase.mob.MobFileCache;
104import org.apache.hadoop.hbase.regionserver.BloomType;
105import org.apache.hadoop.hbase.regionserver.ChunkCreator;
106import org.apache.hadoop.hbase.regionserver.HRegion;
107import org.apache.hadoop.hbase.regionserver.HRegionServer;
108import org.apache.hadoop.hbase.regionserver.HStore;
109import org.apache.hadoop.hbase.regionserver.InternalScanner;
110import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
111import org.apache.hadoop.hbase.regionserver.Region;
112import org.apache.hadoop.hbase.regionserver.RegionScanner;
113import org.apache.hadoop.hbase.regionserver.RegionServerServices;
114import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
115import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
116import org.apache.hadoop.hbase.security.User;
117import org.apache.hadoop.hbase.security.UserProvider;
118import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
119import org.apache.hadoop.hbase.trace.TraceUtil;
120import org.apache.hadoop.hbase.util.Bytes;
121import org.apache.hadoop.hbase.util.CommonFSUtils;
122import org.apache.hadoop.hbase.util.FSUtils;
123import org.apache.hadoop.hbase.util.JVMClusterUtil;
124import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
125import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
126import org.apache.hadoop.hbase.util.Pair;
127import org.apache.hadoop.hbase.util.RegionSplitter;
128import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
129import org.apache.hadoop.hbase.util.RetryCounter;
130import org.apache.hadoop.hbase.util.Threads;
131import org.apache.hadoop.hbase.wal.WAL;
132import org.apache.hadoop.hbase.wal.WALFactory;
133import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
134import org.apache.hadoop.hbase.zookeeper.ZKConfig;
135import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
136import org.apache.hadoop.hdfs.DFSClient;
137import org.apache.hadoop.hdfs.DistributedFileSystem;
138import org.apache.hadoop.hdfs.MiniDFSCluster;
139import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
140import org.apache.hadoop.mapred.JobConf;
141import org.apache.hadoop.mapred.MiniMRCluster;
142import org.apache.hadoop.mapred.TaskLog;
143import org.apache.hadoop.minikdc.MiniKdc;
144import org.apache.yetus.audience.InterfaceAudience;
145import org.apache.zookeeper.WatchedEvent;
146import org.apache.zookeeper.ZooKeeper;
147import org.apache.zookeeper.ZooKeeper.States;
148
149import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
150
151import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
152
153/**
154 * Facility for testing HBase. Replacement for
155 * old HBaseTestCase and HBaseClusterTestCase functionality.
156 * Create an instance and keep it around testing HBase.  This class is
157 * meant to be your one-stop shop for anything you might need testing.  Manages
158 * one cluster at a time only. Managed cluster can be an in-process
159 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
160 * Not all methods work with the real cluster.
161 * Depends on log4j being on classpath and
162 * hbase-site.xml for logging and test-run configuration.  It does not set
163 * logging levels.
164 * In the configuration properties, default values for master-info-port and
165 * region-server-port are overridden such that a random port will be assigned (thus
166 * avoiding port contention if another local HBase instance is already running).
167 * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
168 * setting it to true.
169 */
170@InterfaceAudience.Public
171@SuppressWarnings("deprecation")
172public class HBaseTestingUtility extends HBaseZKTestingUtility {
173
174  /**
175   * System property key to get test directory value. Name is as it is because mini dfs has
176   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
177   * used in mini dfs.
178   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
179   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
180   */
181  @Deprecated
182  private static final String TEST_DIRECTORY_KEY = "test.build.data";
183
184  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
185  /**
186   * The default number of regions per regionserver when creating a pre-split
187   * table.
188   */
189  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
190
191
192  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
193  public static final boolean PRESPLIT_TEST_TABLE = true;
194
195  private MiniDFSCluster dfsCluster = null;
196
197  private volatile HBaseCluster hbaseCluster = null;
198  private MiniMRCluster mrCluster = null;
199
200  /** If there is a mini cluster running for this testing utility instance. */
201  private volatile boolean miniClusterRunning;
202
203  private String hadoopLogDir;
204
205  /** Directory on test filesystem where we put the data for this instance of
206    * HBaseTestingUtility*/
207  private Path dataTestDirOnTestFS = null;
208
209  private volatile AsyncClusterConnection asyncConnection;
210
211  /** Filesystem URI used for map-reduce mini-cluster setup */
212  private static String FS_URI;
213
214  /** This is for unit tests parameterized with a single boolean. */
215  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
216
217  /**
218   * Checks to see if a specific port is available.
219   *
220   * @param port the port number to check for availability
221   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
222   */
223  public static boolean available(int port) {
224    ServerSocket ss = null;
225    DatagramSocket ds = null;
226    try {
227      ss = new ServerSocket(port);
228      ss.setReuseAddress(true);
229      ds = new DatagramSocket(port);
230      ds.setReuseAddress(true);
231      return true;
232    } catch (IOException e) {
233      // Do nothing
234    } finally {
235      if (ds != null) {
236        ds.close();
237      }
238
239      if (ss != null) {
240        try {
241          ss.close();
242        } catch (IOException e) {
243          /* should not be thrown */
244        }
245      }
246    }
247
248    return false;
249  }
250
251  /**
252   * Create all combinations of Bloom filters and compression algorithms for
253   * testing.
254   */
255  private static List<Object[]> bloomAndCompressionCombinations() {
256    List<Object[]> configurations = new ArrayList<>();
257    for (Compression.Algorithm comprAlgo :
258         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
259      for (BloomType bloomType : BloomType.values()) {
260        configurations.add(new Object[] { comprAlgo, bloomType });
261      }
262    }
263    return Collections.unmodifiableList(configurations);
264  }
265
266  /**
267   * Create combination of memstoreTS and tags
268   */
269  private static List<Object[]> memStoreTSAndTagsCombination() {
270    List<Object[]> configurations = new ArrayList<>();
271    configurations.add(new Object[] { false, false });
272    configurations.add(new Object[] { false, true });
273    configurations.add(new Object[] { true, false });
274    configurations.add(new Object[] { true, true });
275    return Collections.unmodifiableList(configurations);
276  }
277
278  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
279    List<Object[]> configurations = new ArrayList<>();
280    configurations.add(new Object[] { false, false, true });
281    configurations.add(new Object[] { false, false, false });
282    configurations.add(new Object[] { false, true, true });
283    configurations.add(new Object[] { false, true, false });
284    configurations.add(new Object[] { true, false, true });
285    configurations.add(new Object[] { true, false, false });
286    configurations.add(new Object[] { true, true, true });
287    configurations.add(new Object[] { true, true, false });
288    return Collections.unmodifiableList(configurations);
289  }
290
291  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
292      bloomAndCompressionCombinations();
293
294
295  /**
296   * <p>Create an HBaseTestingUtility using a default configuration.
297   *
298   * <p>Initially, all tmp files are written to a local test data directory.
299   * Once {@link #startMiniDFSCluster} is called, either directly or via
300   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
301   */
302  public HBaseTestingUtility() {
303    this(HBaseConfiguration.create());
304  }
305
306  /**
307   * <p>Create an HBaseTestingUtility using a given configuration.
308   *
309   * <p>Initially, all tmp files are written to a local test data directory.
310   * Once {@link #startMiniDFSCluster} is called, either directly or via
311   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
312   *
313   * @param conf The configuration to use for further operations
314   */
315  public HBaseTestingUtility(@Nullable Configuration conf) {
316    super(conf);
317
318    // a hbase checksum verification failure will cause unit tests to fail
319    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
320
321    // Save this for when setting default file:// breaks things
322    if (this.conf.get("fs.defaultFS") != null) {
323      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
324    }
325    if (this.conf.get(HConstants.HBASE_DIR) != null) {
326      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
327    }
328    // Every cluster is a local cluster until we start DFS
329    // Note that conf could be null, but this.conf will not be
330    String dataTestDir = getDataTestDir().toString();
331    this.conf.set("fs.defaultFS","file:///");
332    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
333    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
334    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
335    // If the value for random ports isn't set set it to true, thus making
336    // tests opt-out for random port assignment
337    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
338        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
339  }
340
341  /**
342   * Close both the region {@code r} and it's underlying WAL. For use in tests.
343   */
344  public static void closeRegionAndWAL(final Region r) throws IOException {
345    closeRegionAndWAL((HRegion)r);
346  }
347
348  /**
349   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
350   */
351  public static void closeRegionAndWAL(final HRegion r) throws IOException {
352    if (r == null) return;
353    r.close();
354    if (r.getWAL() == null) return;
355    r.getWAL().close();
356  }
357
358  /**
359   * Returns this classes's instance of {@link Configuration}.  Be careful how
360   * you use the returned Configuration since {@link Connection} instances
361   * can be shared.  The Map of Connections is keyed by the Configuration.  If
362   * say, a Connection was being used against a cluster that had been shutdown,
363   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
364   * be wholesome.  Rather than use the return direct, its usually best to
365   * make a copy and use that.  Do
366   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
367   * @return Instance of Configuration.
368   */
369  @Override
370  public Configuration getConfiguration() {
371    return super.getConfiguration();
372  }
373
374  public void setHBaseCluster(HBaseCluster hbaseCluster) {
375    this.hbaseCluster = hbaseCluster;
376  }
377
378  /**
379   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
380   * Give it a random name so can have many concurrent tests running if
381   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
382   * System property, as it's what minidfscluster bases
383   * it data dir on.  Moding a System property is not the way to do concurrent
384   * instances -- another instance could grab the temporary
385   * value unintentionally -- but not anything can do about it at moment;
386   * single instance only is how the minidfscluster works.
387   *
388   * We also create the underlying directory names for
389   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
390   *  in the conf, and as a system property for hadoop.tmp.dir (We do not create them!).
391   *
392   * @return The calculated data test build directory, if newly-created.
393   */
394  @Override
395  protected Path setupDataTestDir() {
396    Path testPath = super.setupDataTestDir();
397    if (null == testPath) {
398      return null;
399    }
400
401    createSubDirAndSystemProperty(
402      "hadoop.log.dir",
403      testPath, "hadoop-log-dir");
404
405    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
406    //  we want our own value to ensure uniqueness on the same machine
407    createSubDirAndSystemProperty(
408      "hadoop.tmp.dir",
409      testPath, "hadoop-tmp-dir");
410
411    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
412    createSubDir(
413      "mapreduce.cluster.local.dir",
414      testPath, "mapred-local-dir");
415    return testPath;
416  }
417
418  private void createSubDirAndSystemProperty(
419    String propertyName, Path parent, String subDirName){
420
421    String sysValue = System.getProperty(propertyName);
422
423    if (sysValue != null) {
424      // There is already a value set. So we do nothing but hope
425      //  that there will be no conflicts
426      LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
427        sysValue + " so I do NOT create it in " + parent);
428      String confValue = conf.get(propertyName);
429      if (confValue != null && !confValue.endsWith(sysValue)){
430       LOG.warn(
431         propertyName + " property value differs in configuration and system: "+
432         "Configuration="+confValue+" while System="+sysValue+
433         " Erasing configuration value by system value."
434       );
435      }
436      conf.set(propertyName, sysValue);
437    } else {
438      // Ok, it's not set, so we create it as a subdirectory
439      createSubDir(propertyName, parent, subDirName);
440      System.setProperty(propertyName, conf.get(propertyName));
441    }
442  }
443
444  /**
445   * @return Where to write test data on the test filesystem; Returns working directory
446   * for the test filesystem by default
447   * @see #setupDataTestDirOnTestFS()
448   * @see #getTestFileSystem()
449   */
450  private Path getBaseTestDirOnTestFS() throws IOException {
451    FileSystem fs = getTestFileSystem();
452    return new Path(fs.getWorkingDirectory(), "test-data");
453  }
454
455  /**
456   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
457   * to write temporary test data. Call this method after setting up the mini dfs cluster
458   * if the test relies on it.
459   * @return a unique path in the test filesystem
460   */
461  public Path getDataTestDirOnTestFS() throws IOException {
462    if (dataTestDirOnTestFS == null) {
463      setupDataTestDirOnTestFS();
464    }
465
466    return dataTestDirOnTestFS;
467  }
468
469  /**
470   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
471   * to write temporary test data. Call this method after setting up the mini dfs cluster
472   * if the test relies on it.
473   * @return a unique path in the test filesystem
474   * @param subdirName name of the subdir to create under the base test dir
475   */
476  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
477    return new Path(getDataTestDirOnTestFS(), subdirName);
478  }
479
480  /**
481   * Sets up a path in test filesystem to be used by tests.
482   * Creates a new directory if not already setup.
483   */
484  private void setupDataTestDirOnTestFS() throws IOException {
485    if (dataTestDirOnTestFS != null) {
486      LOG.warn("Data test on test fs dir already setup in "
487          + dataTestDirOnTestFS.toString());
488      return;
489    }
490    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
491  }
492
493  /**
494   * Sets up a new path in test filesystem to be used by tests.
495   */
496  private Path getNewDataTestDirOnTestFS() throws IOException {
497    //The file system can be either local, mini dfs, or if the configuration
498    //is supplied externally, it can be an external cluster FS. If it is a local
499    //file system, the tests should use getBaseTestDir, otherwise, we can use
500    //the working directory, and create a unique sub dir there
501    FileSystem fs = getTestFileSystem();
502    Path newDataTestDir;
503    String randomStr = getRandomUUID().toString();
504    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
505      newDataTestDir = new Path(getDataTestDir(), randomStr);
506      File dataTestDir = new File(newDataTestDir.toString());
507      if (deleteOnExit()) dataTestDir.deleteOnExit();
508    } else {
509      Path base = getBaseTestDirOnTestFS();
510      newDataTestDir = new Path(base, randomStr);
511      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
512    }
513    return newDataTestDir;
514  }
515
516  /**
517   * Cleans the test data directory on the test filesystem.
518   * @return True if we removed the test dirs
519   * @throws IOException
520   */
521  public boolean cleanupDataTestDirOnTestFS() throws IOException {
522    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
523    if (ret)
524      dataTestDirOnTestFS = null;
525    return ret;
526  }
527
528  /**
529   * Cleans a subdirectory under the test data directory on the test filesystem.
530   * @return True if we removed child
531   * @throws IOException
532   */
533  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
534    Path cpath = getDataTestDirOnTestFS(subdirName);
535    return getTestFileSystem().delete(cpath, true);
536  }
537
538  /**
539   * Start a minidfscluster.
540   * @param servers How many DNs to start.
541   * @throws Exception
542   * @see #shutdownMiniDFSCluster()
543   * @return The mini dfs cluster created.
544   */
545  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
546    return startMiniDFSCluster(servers, null);
547  }
548
549  /**
550   * Start a minidfscluster.
551   * This is useful if you want to run datanode on distinct hosts for things
552   * like HDFS block location verification.
553   * If you start MiniDFSCluster without host names, all instances of the
554   * datanodes will have the same host name.
555   * @param hosts hostnames DNs to run on.
556   * @throws Exception
557   * @see #shutdownMiniDFSCluster()
558   * @return The mini dfs cluster created.
559   */
560  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
561  throws Exception {
562    if ( hosts != null && hosts.length != 0) {
563      return startMiniDFSCluster(hosts.length, hosts);
564    } else {
565      return startMiniDFSCluster(1, null);
566    }
567  }
568
569  /**
570   * Start a minidfscluster.
571   * Can only create one.
572   * @param servers How many DNs to start.
573   * @param hosts hostnames DNs to run on.
574   * @throws Exception
575   * @see #shutdownMiniDFSCluster()
576   * @return The mini dfs cluster created.
577   */
578  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
579  throws Exception {
580    return startMiniDFSCluster(servers, null, hosts);
581  }
582
583  private void setFs() throws IOException {
584    if(this.dfsCluster == null){
585      LOG.info("Skipping setting fs because dfsCluster is null");
586      return;
587    }
588    FileSystem fs = this.dfsCluster.getFileSystem();
589    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
590
591    // re-enable this check with dfs
592    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
593  }
594
595  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
596      throws Exception {
597    createDirsAndSetProperties();
598    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
599
600    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
601    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
602        setLevel(org.apache.log4j.Level.ERROR);
603    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
604        setLevel(org.apache.log4j.Level.ERROR);
605
606    TraceUtil.initTracer(conf);
607
608    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
609        true, null, racks, hosts, null);
610
611    // Set this just-started cluster as our filesystem.
612    setFs();
613
614    // Wait for the cluster to be totally up
615    this.dfsCluster.waitClusterUp();
616
617    //reset the test directory for test file system
618    dataTestDirOnTestFS = null;
619    String dataTestDir = getDataTestDir().toString();
620    conf.set(HConstants.HBASE_DIR, dataTestDir);
621    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
622
623    return this.dfsCluster;
624  }
625
626  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
627    createDirsAndSetProperties();
628    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
629        null, null, null);
630    return dfsCluster;
631  }
632
633  /** This is used before starting HDFS and map-reduce mini-clusters
634   * Run something like the below to check for the likes of '/tmp' references -- i.e.
635   * references outside of the test data dir -- in the conf.
636   *     Configuration conf = TEST_UTIL.getConfiguration();
637   *     for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) {
638   *       Map.Entry<String, String> e = i.next();
639   *       assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
640   *     }
641   */
642  private void createDirsAndSetProperties() throws IOException {
643    setupClusterTestDir();
644    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
645    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
646    createDirAndSetProperty("test.cache.data");
647    createDirAndSetProperty("hadoop.tmp.dir");
648    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
649    createDirAndSetProperty("mapreduce.cluster.local.dir");
650    createDirAndSetProperty("mapreduce.cluster.temp.dir");
651    enableShortCircuit();
652
653    Path root = getDataTestDirOnTestFS("hadoop");
654    conf.set(MapreduceTestingShim.getMROutputDirProp(),
655      new Path(root, "mapred-output-dir").toString());
656    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
657    conf.set("mapreduce.jobtracker.staging.root.dir",
658      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
659    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
660    conf.set("yarn.app.mapreduce.am.staging-dir",
661      new Path(root, "mapreduce-am-staging-root-dir").toString());
662
663    // Frustrate yarn's and hdfs's attempts at writing /tmp.
664    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
665    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
666    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
667    createDirAndSetProperty("yarn.nodemanager.log-dirs");
668    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
669    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
670    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
671    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
672    createDirAndSetProperty("dfs.journalnode.edits.dir");
673    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
674    createDirAndSetProperty("nfs.dump.dir");
675    createDirAndSetProperty("java.io.tmpdir");
676    createDirAndSetProperty("dfs.journalnode.edits.dir");
677    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
678    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
679  }
680
681  /**
682   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
683   *  new column families. Default to false.
684   */
685  public boolean isNewVersionBehaviorEnabled(){
686    final String propName = "hbase.tests.new.version.behavior";
687    String v = System.getProperty(propName);
688    if (v != null){
689      return Boolean.parseBoolean(v);
690    }
691    return false;
692  }
693
694  /**
695   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
696   *  This allows to specify this parameter on the command line.
697   *   If not set, default is true.
698   */
699  public boolean isReadShortCircuitOn(){
700    final String propName = "hbase.tests.use.shortcircuit.reads";
701    String readOnProp = System.getProperty(propName);
702    if (readOnProp != null){
703      return  Boolean.parseBoolean(readOnProp);
704    } else {
705      return conf.getBoolean(propName, false);
706    }
707  }
708
709  /** Enable the short circuit read, unless configured differently.
710   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
711   */
712  private void enableShortCircuit() {
713    if (isReadShortCircuitOn()) {
714      String curUser = System.getProperty("user.name");
715      LOG.info("read short circuit is ON for user " + curUser);
716      // read short circuit, for hdfs
717      conf.set("dfs.block.local-path-access.user", curUser);
718      // read short circuit, for hbase
719      conf.setBoolean("dfs.client.read.shortcircuit", true);
720      // Skip checking checksum, for the hdfs client and the datanode
721      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
722    } else {
723      LOG.info("read short circuit is OFF");
724    }
725  }
726
727  private String createDirAndSetProperty(final String property) {
728    return createDirAndSetProperty(property, property);
729  }
730
731  private String createDirAndSetProperty(final String relPath, String property) {
732    String path = getDataTestDir(relPath).toString();
733    System.setProperty(property, path);
734    conf.set(property, path);
735    new File(path).mkdirs();
736    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
737    return path;
738  }
739
740  /**
741   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
742   * or does nothing.
743   * @throws IOException
744   */
745  public void shutdownMiniDFSCluster() throws IOException {
746    if (this.dfsCluster != null) {
747      // The below throws an exception per dn, AsynchronousCloseException.
748      this.dfsCluster.shutdown();
749      dfsCluster = null;
750      dataTestDirOnTestFS = null;
751      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
752    }
753  }
754
755  /**
756   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
757   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
758   * @param createWALDir Whether to create a new WAL directory.
759   * @return The mini HBase cluster created.
760   * @see #shutdownMiniCluster()
761   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
762   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
763   * @see #startMiniCluster(StartMiniClusterOption)
764   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
765   */
766  @Deprecated
767  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
768    StartMiniClusterOption option = StartMiniClusterOption.builder()
769        .createWALDir(createWALDir).build();
770    return startMiniCluster(option);
771  }
772
773  /**
774   * Start up a minicluster of hbase, dfs, and zookeeper.
775   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
776   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
777   * @param createRootDir Whether to create a new root or data directory path.
778   * @return The mini HBase cluster created.
779   * @see #shutdownMiniCluster()
780   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
781   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
782   * @see #startMiniCluster(StartMiniClusterOption)
783   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
784   */
785  @Deprecated
786  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir)
787  throws Exception {
788    StartMiniClusterOption option = StartMiniClusterOption.builder()
789        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir).build();
790    return startMiniCluster(option);
791  }
792
793  /**
794   * Start up a minicluster of hbase, dfs, and zookeeper.
795   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
796   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
797   * @param createRootDir Whether to create a new root or data directory path.
798   * @param createWALDir Whether to create a new WAL directory.
799   * @return The mini HBase cluster created.
800   * @see #shutdownMiniCluster()
801   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
802   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
803   * @see #startMiniCluster(StartMiniClusterOption)
804   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
805   */
806  @Deprecated
807  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
808      boolean createWALDir) throws Exception {
809    StartMiniClusterOption option = StartMiniClusterOption.builder()
810        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir)
811        .createWALDir(createWALDir).build();
812    return startMiniCluster(option);
813  }
814
815  /**
816   * Start up a minicluster of hbase, dfs, and zookeeper.
817   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
818   * @param numMasters Master node number.
819   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
820   * @param createRootDir Whether to create a new root or data directory path.
821   * @return The mini HBase cluster created.
822   * @see #shutdownMiniCluster()
823   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
824   *  {@link #startMiniCluster(StartMiniClusterOption)} instead.
825   * @see #startMiniCluster(StartMiniClusterOption)
826   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
827   */
828  @Deprecated
829  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
830    throws Exception {
831    StartMiniClusterOption option = StartMiniClusterOption.builder()
832        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
833        .numDataNodes(numSlaves).build();
834    return startMiniCluster(option);
835  }
836
837  /**
838   * Start up a minicluster of hbase, dfs, and zookeeper.
839   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
840   * @param numMasters Master node number.
841   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
842   * @return The mini HBase cluster created.
843   * @see #shutdownMiniCluster()
844   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
845   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
846   * @see #startMiniCluster(StartMiniClusterOption)
847   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
848   */
849  @Deprecated
850  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
851    StartMiniClusterOption option = StartMiniClusterOption.builder()
852        .numMasters(numMasters).numRegionServers(numSlaves).numDataNodes(numSlaves).build();
853    return startMiniCluster(option);
854  }
855
856  /**
857   * Start up a minicluster of hbase, dfs, and zookeeper.
858   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
859   * @param numMasters Master node number.
860   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
861   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
862   *                      HDFS data node number.
863   * @param createRootDir Whether to create a new root or data directory path.
864   * @return The mini HBase cluster created.
865   * @see #shutdownMiniCluster()
866   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
867   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
868   * @see #startMiniCluster(StartMiniClusterOption)
869   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
870   */
871  @Deprecated
872  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
873      boolean createRootDir) throws Exception {
874    StartMiniClusterOption option = StartMiniClusterOption.builder()
875        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
876        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
877    return startMiniCluster(option);
878  }
879
880  /**
881   * Start up a minicluster of hbase, dfs, and zookeeper.
882   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
883   * @param numMasters Master node number.
884   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
885   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
886   *                      HDFS data node number.
887   * @return The mini HBase cluster created.
888   * @see #shutdownMiniCluster()
889   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
890   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
891   * @see #startMiniCluster(StartMiniClusterOption)
892   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
893   */
894  @Deprecated
895  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
896      throws Exception {
897    StartMiniClusterOption option = StartMiniClusterOption.builder()
898        .numMasters(numMasters).numRegionServers(numSlaves)
899        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
900    return startMiniCluster(option);
901  }
902
903  /**
904   * Start up a minicluster of hbase, dfs, and zookeeper.
905   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
906   * @param numMasters Master node number.
907   * @param numRegionServers Number of region servers.
908   * @param numDataNodes Number of datanodes.
909   * @return The mini HBase cluster created.
910   * @see #shutdownMiniCluster()
911   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
912   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
913   * @see #startMiniCluster(StartMiniClusterOption)
914   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
915   */
916  @Deprecated
917  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
918      throws Exception {
919    StartMiniClusterOption option = StartMiniClusterOption.builder()
920        .numMasters(numMasters).numRegionServers(numRegionServers).numDataNodes(numDataNodes)
921        .build();
922    return startMiniCluster(option);
923  }
924
925  /**
926   * Start up a minicluster of hbase, dfs, and zookeeper.
927   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
928   * @param numMasters Master node number.
929   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
930   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
931   *                      HDFS data node number.
932   * @param masterClass The class to use as HMaster, or null for default.
933   * @param rsClass The class to use as HRegionServer, or null for default.
934   * @return The mini HBase cluster created.
935   * @see #shutdownMiniCluster()
936   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
937   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
938   * @see #startMiniCluster(StartMiniClusterOption)
939   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
940   */
941  @Deprecated
942  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
943      Class<? extends HMaster> masterClass,
944      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
945      throws Exception {
946    StartMiniClusterOption option = StartMiniClusterOption.builder()
947        .numMasters(numMasters).masterClass(masterClass)
948        .numRegionServers(numSlaves).rsClass(rsClass)
949        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts)
950        .build();
951    return startMiniCluster(option);
952  }
953
954  /**
955   * Start up a minicluster of hbase, dfs, and zookeeper.
956   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
957   * @param numMasters Master node number.
958   * @param numRegionServers Number of region servers.
959   * @param numDataNodes Number of datanodes.
960   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
961   *                      HDFS data node number.
962   * @param masterClass The class to use as HMaster, or null for default.
963   * @param rsClass The class to use as HRegionServer, or null for default.
964   * @return The mini HBase cluster created.
965   * @see #shutdownMiniCluster()
966   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
967   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
968   * @see #startMiniCluster(StartMiniClusterOption)
969   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
970   */
971  @Deprecated
972  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
973      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
974      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
975    throws Exception {
976    StartMiniClusterOption option = StartMiniClusterOption.builder()
977        .numMasters(numMasters).masterClass(masterClass)
978        .numRegionServers(numRegionServers).rsClass(rsClass)
979        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
980        .build();
981    return startMiniCluster(option);
982  }
983
984  /**
985   * Start up a minicluster of hbase, dfs, and zookeeper.
986   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
987   * @param numMasters Master node number.
988   * @param numRegionServers Number of region servers.
989   * @param numDataNodes Number of datanodes.
990   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
991   *                      HDFS data node number.
992   * @param masterClass The class to use as HMaster, or null for default.
993   * @param rsClass The class to use as HRegionServer, or null for default.
994   * @param createRootDir Whether to create a new root or data directory path.
995   * @param createWALDir Whether to create a new WAL directory.
996   * @return The mini HBase cluster created.
997   * @see #shutdownMiniCluster()
998   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
999   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
1000   * @see #startMiniCluster(StartMiniClusterOption)
1001   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1002   */
1003  @Deprecated
1004  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1005      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1006      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1007      boolean createWALDir) throws Exception {
1008    StartMiniClusterOption option = StartMiniClusterOption.builder()
1009        .numMasters(numMasters).masterClass(masterClass)
1010        .numRegionServers(numRegionServers).rsClass(rsClass)
1011        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
1012        .createRootDir(createRootDir).createWALDir(createWALDir)
1013        .build();
1014    return startMiniCluster(option);
1015  }
1016
1017  /**
1018   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number.
1019   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1020   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1021   * @see #startMiniCluster(StartMiniClusterOption option)
1022   * @see #shutdownMiniDFSCluster()
1023   */
1024  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1025    StartMiniClusterOption option = StartMiniClusterOption.builder()
1026        .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1027    return startMiniCluster(option);
1028  }
1029
1030  /**
1031   * Start up a minicluster of hbase, dfs and zookeeper all using default options.
1032   * Option default value can be found in {@link StartMiniClusterOption.Builder}.
1033   * @see #startMiniCluster(StartMiniClusterOption option)
1034   * @see #shutdownMiniDFSCluster()
1035   */
1036  public MiniHBaseCluster startMiniCluster() throws Exception {
1037    return startMiniCluster(StartMiniClusterOption.builder().build());
1038  }
1039
1040  /**
1041   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed.
1042   * It modifies Configuration.  It homes the cluster data directory under a random
1043   * subdirectory in a directory under System property test.build.data, to be cleaned up on exit.
1044   * @see #shutdownMiniDFSCluster()
1045   */
1046  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1047    LOG.info("Starting up minicluster with option: {}", option);
1048
1049    // If we already put up a cluster, fail.
1050    if (miniClusterRunning) {
1051      throw new IllegalStateException("A mini-cluster is already running");
1052    }
1053    miniClusterRunning = true;
1054
1055    setupClusterTestDir();
1056    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
1057
1058    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1059    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1060    if (dfsCluster == null) {
1061      LOG.info("STARTING DFS");
1062      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1063    } else {
1064      LOG.info("NOT STARTING DFS");
1065    }
1066
1067    // Start up a zk cluster.
1068    if (getZkCluster() == null) {
1069      startMiniZKCluster(option.getNumZkServers());
1070    }
1071
1072    // Start the MiniHBaseCluster
1073    return startMiniHBaseCluster(option);
1074  }
1075
1076  /**
1077   * Starts up mini hbase cluster.
1078   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1079   * This is useful when doing stepped startup of clusters.
1080   * @return Reference to the hbase mini hbase cluster.
1081   * @see #startMiniCluster(StartMiniClusterOption)
1082   * @see #shutdownMiniHBaseCluster()
1083   */
1084  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1085      throws IOException, InterruptedException {
1086    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
1087    createRootDir(option.isCreateRootDir());
1088    if (option.isCreateWALDir()) {
1089      createWALRootDir();
1090    }
1091    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1092    // for tests that do not read hbase-defaults.xml
1093    setHBaseFsTmpDir();
1094
1095    // These settings will make the server waits until this exact number of
1096    // regions servers are connected.
1097    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1098      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1099    }
1100    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1101      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1102    }
1103
1104    Configuration c = new Configuration(this.conf);
1105    TraceUtil.initTracer(c);
1106    this.hbaseCluster =
1107        new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
1108            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
1109            option.getRsClass());
1110    // Populate the master address configuration from mini cluster configuration.
1111    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1112    // Don't leave here till we've done a successful scan of the hbase:meta
1113    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1114    ResultScanner s = t.getScanner(new Scan());
1115    while (s.next() != null) {
1116      continue;
1117    }
1118    s.close();
1119    t.close();
1120
1121    getAdmin(); // create immediately the hbaseAdmin
1122    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1123
1124    return (MiniHBaseCluster) hbaseCluster;
1125  }
1126
1127  /**
1128   * Starts up mini hbase cluster using default options.
1129   * Default options can be found in {@link StartMiniClusterOption.Builder}.
1130   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1131   * @see #shutdownMiniHBaseCluster()
1132   */
1133  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1134    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1135  }
1136
1137  /**
1138   * Starts up mini hbase cluster.
1139   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1140   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1141   * @param numMasters Master node number.
1142   * @param numRegionServers Number of region servers.
1143   * @return The mini HBase cluster created.
1144   * @see #shutdownMiniHBaseCluster()
1145   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1146   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1147   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1148   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1149   */
1150  @Deprecated
1151  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1152      throws IOException, InterruptedException {
1153    StartMiniClusterOption option = StartMiniClusterOption.builder()
1154        .numMasters(numMasters).numRegionServers(numRegionServers).build();
1155    return startMiniHBaseCluster(option);
1156  }
1157
1158  /**
1159   * Starts up mini hbase cluster.
1160   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1161   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1162   * @param numMasters Master node number.
1163   * @param numRegionServers Number of region servers.
1164   * @param rsPorts Ports that RegionServer should use.
1165   * @return The mini HBase cluster created.
1166   * @see #shutdownMiniHBaseCluster()
1167   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1168   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1169   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1170   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1171   */
1172  @Deprecated
1173  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1174      List<Integer> rsPorts) throws IOException, InterruptedException {
1175    StartMiniClusterOption option = StartMiniClusterOption.builder()
1176        .numMasters(numMasters).numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1177    return startMiniHBaseCluster(option);
1178  }
1179
1180  /**
1181   * Starts up mini hbase cluster.
1182   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1183   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1184   * @param numMasters Master node number.
1185   * @param numRegionServers Number of region servers.
1186   * @param rsPorts Ports that RegionServer should use.
1187   * @param masterClass The class to use as HMaster, or null for default.
1188   * @param rsClass The class to use as HRegionServer, or null for default.
1189   * @param createRootDir Whether to create a new root or data directory path.
1190   * @param createWALDir Whether to create a new WAL directory.
1191   * @return The mini HBase cluster created.
1192   * @see #shutdownMiniHBaseCluster()
1193   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1194   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1195   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1196   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1197   */
1198  @Deprecated
1199  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1200      List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1201      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1202      boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1203    StartMiniClusterOption option = StartMiniClusterOption.builder()
1204        .numMasters(numMasters).masterClass(masterClass)
1205        .numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1206        .createRootDir(createRootDir).createWALDir(createWALDir).build();
1207    return startMiniHBaseCluster(option);
1208  }
1209
1210  /**
1211   * Starts the hbase cluster up again after shutting it down previously in a
1212   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1213   * @param servers number of region servers
1214   */
1215  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1216    this.restartHBaseCluster(servers, null);
1217  }
1218
1219  public void restartHBaseCluster(int servers, List<Integer> ports)
1220      throws IOException, InterruptedException {
1221    StartMiniClusterOption option =
1222        StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1223    restartHBaseCluster(option);
1224    invalidateConnection();
1225  }
1226
1227  public void restartHBaseCluster(StartMiniClusterOption option)
1228      throws IOException, InterruptedException {
1229    if (hbaseAdmin != null) {
1230      hbaseAdmin.close();
1231      hbaseAdmin = null;
1232    }
1233    if (this.asyncConnection != null) {
1234      this.asyncConnection.close();
1235      this.asyncConnection = null;
1236    }
1237    this.hbaseCluster =
1238        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
1239            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
1240            option.getRsClass());
1241    // Don't leave here till we've done a successful scan of the hbase:meta
1242    Connection conn = ConnectionFactory.createConnection(this.conf);
1243    Table t = conn.getTable(TableName.META_TABLE_NAME);
1244    ResultScanner s = t.getScanner(new Scan());
1245    while (s.next() != null) {
1246      // do nothing
1247    }
1248    LOG.info("HBase has been restarted");
1249    s.close();
1250    t.close();
1251    conn.close();
1252  }
1253
1254  /**
1255   * @return Current mini hbase cluster. Only has something in it after a call
1256   * to {@link #startMiniCluster()}.
1257   * @see #startMiniCluster()
1258   */
1259  public MiniHBaseCluster getMiniHBaseCluster() {
1260    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1261      return (MiniHBaseCluster)this.hbaseCluster;
1262    }
1263    throw new RuntimeException(hbaseCluster + " not an instance of " +
1264                               MiniHBaseCluster.class.getName());
1265  }
1266
1267  /**
1268   * Stops mini hbase, zk, and hdfs clusters.
1269   * @see #startMiniCluster(int)
1270   */
1271  public void shutdownMiniCluster() throws IOException {
1272    LOG.info("Shutting down minicluster");
1273    shutdownMiniHBaseCluster();
1274    shutdownMiniDFSCluster();
1275    shutdownMiniZKCluster();
1276
1277    cleanupTestDir();
1278    miniClusterRunning = false;
1279    LOG.info("Minicluster is down");
1280  }
1281
1282  /**
1283   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1284   * @throws java.io.IOException in case command is unsuccessful
1285   */
1286  public void shutdownMiniHBaseCluster() throws IOException {
1287    cleanup();
1288    if (this.hbaseCluster != null) {
1289      this.hbaseCluster.shutdown();
1290      // Wait till hbase is down before going on to shutdown zk.
1291      this.hbaseCluster.waitUntilShutDown();
1292      this.hbaseCluster = null;
1293    }
1294    if (zooKeeperWatcher != null) {
1295      zooKeeperWatcher.close();
1296      zooKeeperWatcher = null;
1297    }
1298  }
1299
1300  /**
1301   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1302   * @throws java.io.IOException throws in case command is unsuccessful
1303   */
1304  public void killMiniHBaseCluster() throws IOException {
1305    cleanup();
1306    if (this.hbaseCluster != null) {
1307      getMiniHBaseCluster().killAll();
1308      this.hbaseCluster = null;
1309    }
1310    if (zooKeeperWatcher != null) {
1311      zooKeeperWatcher.close();
1312      zooKeeperWatcher = null;
1313    }
1314  }
1315
1316  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1317  private void cleanup() throws IOException {
1318    closeConnection();
1319    // unset the configuration for MIN and MAX RS to start
1320    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1321    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1322  }
1323
1324  /**
1325   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1326   * is true, a new root directory path is fetched irrespective of whether it has been fetched
1327   * before or not. If false, previous path is used.
1328   * Note: this does not cause the root dir to be created.
1329   * @return Fully qualified path for the default hbase root dir
1330   * @throws IOException
1331   */
1332  public Path getDefaultRootDirPath(boolean create) throws IOException {
1333    if (!create) {
1334      return getDataTestDirOnTestFS();
1335    } else {
1336      return getNewDataTestDirOnTestFS();
1337    }
1338  }
1339
1340  /**
1341   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
1342   * except that <code>create</code> flag is false.
1343   * Note: this does not cause the root dir to be created.
1344   * @return Fully qualified path for the default hbase root dir
1345   * @throws IOException
1346   */
1347  public Path getDefaultRootDirPath() throws IOException {
1348    return getDefaultRootDirPath(false);
1349  }
1350
1351  /**
1352   * Creates an hbase rootdir in user home directory.  Also creates hbase
1353   * version file.  Normally you won't make use of this method.  Root hbasedir
1354   * is created for you as part of mini cluster startup.  You'd only use this
1355   * method if you were doing manual operation.
1356   * @param create This flag decides whether to get a new
1357   * root or data directory path or not, if it has been fetched already.
1358   * Note : Directory will be made irrespective of whether path has been fetched or not.
1359   * If directory already exists, it will be overwritten
1360   * @return Fully qualified path to hbase root dir
1361   * @throws IOException
1362   */
1363  public Path createRootDir(boolean create) throws IOException {
1364    FileSystem fs = FileSystem.get(this.conf);
1365    Path hbaseRootdir = getDefaultRootDirPath(create);
1366    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1367    fs.mkdirs(hbaseRootdir);
1368    FSUtils.setVersion(fs, hbaseRootdir);
1369    return hbaseRootdir;
1370  }
1371
1372  /**
1373   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
1374   * except that <code>create</code> flag is false.
1375   * @return Fully qualified path to hbase root dir
1376   * @throws IOException
1377   */
1378  public Path createRootDir() throws IOException {
1379    return createRootDir(false);
1380  }
1381
1382  /**
1383   * Creates a hbase walDir in the user's home directory.
1384   * Normally you won't make use of this method. Root hbaseWALDir
1385   * is created for you as part of mini cluster startup. You'd only use this
1386   * method if you were doing manual operation.
1387   *
1388   * @return Fully qualified path to hbase root dir
1389   * @throws IOException
1390  */
1391  public Path createWALRootDir() throws IOException {
1392    FileSystem fs = FileSystem.get(this.conf);
1393    Path walDir = getNewDataTestDirOnTestFS();
1394    CommonFSUtils.setWALRootDir(this.conf, walDir);
1395    fs.mkdirs(walDir);
1396    return walDir;
1397  }
1398
1399  private void setHBaseFsTmpDir() throws IOException {
1400    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1401    if (hbaseFsTmpDirInString == null) {
1402      this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1403      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1404    } else {
1405      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1406    }
1407  }
1408
1409  /**
1410   * Flushes all caches in the mini hbase cluster
1411   * @throws IOException
1412   */
1413  public void flush() throws IOException {
1414    getMiniHBaseCluster().flushcache();
1415  }
1416
1417  /**
1418   * Flushes all caches in the mini hbase cluster
1419   * @throws IOException
1420   */
1421  public void flush(TableName tableName) throws IOException {
1422    getMiniHBaseCluster().flushcache(tableName);
1423  }
1424
1425  /**
1426   * Compact all regions in the mini hbase cluster
1427   * @throws IOException
1428   */
1429  public void compact(boolean major) throws IOException {
1430    getMiniHBaseCluster().compact(major);
1431  }
1432
1433  /**
1434   * Compact all of a table's reagion in the mini hbase cluster
1435   * @throws IOException
1436   */
1437  public void compact(TableName tableName, boolean major) throws IOException {
1438    getMiniHBaseCluster().compact(tableName, major);
1439  }
1440
1441  /**
1442   * Create a table.
1443   * @param tableName
1444   * @param family
1445   * @return A Table instance for the created table.
1446   * @throws IOException
1447   */
1448  public Table createTable(TableName tableName, String family)
1449  throws IOException{
1450    return createTable(tableName, new String[]{family});
1451  }
1452
1453  /**
1454   * Create a table.
1455   * @param tableName
1456   * @param families
1457   * @return A Table instance for the created table.
1458   * @throws IOException
1459   */
1460  public Table createTable(TableName tableName, String[] families)
1461  throws IOException {
1462    List<byte[]> fams = new ArrayList<>(families.length);
1463    for (String family : families) {
1464      fams.add(Bytes.toBytes(family));
1465    }
1466    return createTable(tableName, fams.toArray(new byte[0][]));
1467  }
1468
1469  /**
1470   * Create a table.
1471   * @param tableName
1472   * @param family
1473   * @return A Table instance for the created table.
1474   * @throws IOException
1475   */
1476  public Table createTable(TableName tableName, byte[] family)
1477  throws IOException{
1478    return createTable(tableName, new byte[][]{family});
1479  }
1480
1481  /**
1482   * Create a table with multiple regions.
1483   * @param tableName
1484   * @param family
1485   * @param numRegions
1486   * @return A Table instance for the created table.
1487   * @throws IOException
1488   */
1489  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1490      throws IOException {
1491    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1492    byte[] startKey = Bytes.toBytes("aaaaa");
1493    byte[] endKey = Bytes.toBytes("zzzzz");
1494    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1495
1496    return createTable(tableName, new byte[][] { family }, splitKeys);
1497  }
1498
1499  /**
1500   * Create a table.
1501   * @param tableName
1502   * @param families
1503   * @return A Table instance for the created table.
1504   * @throws IOException
1505   */
1506  public Table createTable(TableName tableName, byte[][] families)
1507  throws IOException {
1508    return createTable(tableName, families, (byte[][]) null);
1509  }
1510
1511  /**
1512   * Create a table with multiple regions.
1513   * @param tableName
1514   * @param families
1515   * @return A Table instance for the created table.
1516   * @throws IOException
1517   */
1518  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1519    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1520  }
1521
1522  /**
1523   * Create a table.
1524   * @param tableName
1525   * @param families
1526   * @param splitKeys
1527   * @return A Table instance for the created table.
1528   * @throws IOException
1529   */
1530  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1531      throws IOException {
1532    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1533  }
1534
1535  /**
1536   * Create a table.
1537   * @param tableName the table name
1538   * @param families the families
1539   * @param splitKeys the splitkeys
1540   * @param replicaCount the region replica count
1541   * @return A Table instance for the created table.
1542   * @throws IOException throws IOException
1543   */
1544  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1545      int replicaCount) throws IOException {
1546    return createTable(tableName, families, splitKeys, replicaCount,
1547      new Configuration(getConfiguration()));
1548  }
1549
1550  public Table createTable(TableName tableName, byte[][] families,
1551      int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1552  throws IOException{
1553    HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1554
1555    getAdmin().createTable(desc, startKey, endKey, numRegions);
1556    // HBaseAdmin only waits for regions to appear in hbase:meta we
1557    // should wait until they are assigned
1558    waitUntilAllRegionsAssigned(tableName);
1559    return getConnection().getTable(tableName);
1560  }
1561
1562  /**
1563   * Create a table.
1564   * @param htd
1565   * @param families
1566   * @param c Configuration to use
1567   * @return A Table instance for the created table.
1568   * @throws IOException
1569   */
1570  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1571  throws IOException {
1572    return createTable(htd, families, null, c);
1573  }
1574
1575  /**
1576   * Create a table.
1577   * @param htd table descriptor
1578   * @param families array of column families
1579   * @param splitKeys array of split keys
1580   * @param c Configuration to use
1581   * @return A Table instance for the created table.
1582   * @throws IOException if getAdmin or createTable fails
1583   */
1584  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1585      Configuration c) throws IOException {
1586    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1587    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1588    // on is interfering.
1589    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1590  }
1591
1592  /**
1593   * Create a table.
1594   * @param htd table descriptor
1595   * @param families array of column families
1596   * @param splitKeys array of split keys
1597   * @param type Bloom type
1598   * @param blockSize block size
1599   * @param c Configuration to use
1600   * @return A Table instance for the created table.
1601   * @throws IOException if getAdmin or createTable fails
1602   */
1603
1604  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1605      BloomType type, int blockSize, Configuration c) throws IOException {
1606    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1607    for (byte[] family : families) {
1608      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1609        .setBloomFilterType(type)
1610        .setBlocksize(blockSize);
1611      if (isNewVersionBehaviorEnabled()) {
1612          cfdb.setNewVersionBehavior(true);
1613      }
1614      builder.setColumnFamily(cfdb.build());
1615    }
1616    TableDescriptor td = builder.build();
1617    if (splitKeys != null) {
1618      getAdmin().createTable(td, splitKeys);
1619    } else {
1620      getAdmin().createTable(td);
1621    }
1622    // HBaseAdmin only waits for regions to appear in hbase:meta
1623    // we should wait until they are assigned
1624    waitUntilAllRegionsAssigned(td.getTableName());
1625    return getConnection().getTable(td.getTableName());
1626  }
1627
1628  /**
1629   * Create a table.
1630   * @param htd table descriptor
1631   * @param splitRows array of split keys
1632   * @return A Table instance for the created table.
1633   * @throws IOException
1634   */
1635  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1636      throws IOException {
1637    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1638    if (isNewVersionBehaviorEnabled()) {
1639      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1640         builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
1641           .setNewVersionBehavior(true).build());
1642      }
1643    }
1644    if (splitRows != null) {
1645      getAdmin().createTable(builder.build(), splitRows);
1646    } else {
1647      getAdmin().createTable(builder.build());
1648    }
1649    // HBaseAdmin only waits for regions to appear in hbase:meta
1650    // we should wait until they are assigned
1651    waitUntilAllRegionsAssigned(htd.getTableName());
1652    return getConnection().getTable(htd.getTableName());
1653  }
1654
1655  /**
1656   * Create a table.
1657   * @param tableName the table name
1658   * @param families the families
1659   * @param splitKeys the split keys
1660   * @param replicaCount the replica count
1661   * @param c Configuration to use
1662   * @return A Table instance for the created table.
1663   * @throws IOException
1664   */
1665  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1666      int replicaCount, final Configuration c) throws IOException {
1667    HTableDescriptor htd = new HTableDescriptor(tableName);
1668    htd.setRegionReplication(replicaCount);
1669    return createTable(htd, families, splitKeys, c);
1670  }
1671
1672  /**
1673   * Create a table.
1674   * @param tableName
1675   * @param family
1676   * @param numVersions
1677   * @return A Table instance for the created table.
1678   * @throws IOException
1679   */
1680  public Table createTable(TableName tableName, byte[] family, int numVersions)
1681  throws IOException {
1682    return createTable(tableName, new byte[][]{family}, numVersions);
1683  }
1684
1685  /**
1686   * Create a table.
1687   * @param tableName
1688   * @param families
1689   * @param numVersions
1690   * @return A Table instance for the created table.
1691   * @throws IOException
1692   */
1693  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1694      throws IOException {
1695    return createTable(tableName, families, numVersions, (byte[][]) null);
1696  }
1697
1698  /**
1699   * Create a table.
1700   * @param tableName
1701   * @param families
1702   * @param numVersions
1703   * @param splitKeys
1704   * @return A Table instance for the created table.
1705   * @throws IOException
1706   */
1707  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1708      byte[][] splitKeys) throws IOException {
1709    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1710      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1711    for (byte[] family : families) {
1712      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1713        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family)
1714          .setMaxVersions(numVersions);
1715      if (isNewVersionBehaviorEnabled()) {
1716        familyDescriptor.setNewVersionBehavior(true);
1717      }
1718      tableDescriptor.setColumnFamily(familyDescriptor);
1719    }
1720    if (splitKeys != null) {
1721      getAdmin().createTable(tableDescriptor, splitKeys);
1722    } else {
1723      getAdmin().createTable(tableDescriptor);
1724    }
1725    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1726    // assigned
1727    waitUntilAllRegionsAssigned(tableName);
1728    return getConnection().getTable(tableName);
1729  }
1730
1731  /**
1732   * Create a table with multiple regions.
1733   * @param tableName
1734   * @param families
1735   * @param numVersions
1736   * @return A Table instance for the created table.
1737   * @throws IOException
1738   */
1739  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1740      throws IOException {
1741    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1742  }
1743
1744  /**
1745   * Create a table.
1746   * @param tableName
1747   * @param families
1748   * @param numVersions
1749   * @param blockSize
1750   * @return A Table instance for the created table.
1751   * @throws IOException
1752   */
1753  public Table createTable(TableName tableName, byte[][] families,
1754    int numVersions, int blockSize) throws IOException {
1755    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1756      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1757    for (byte[] family : families) {
1758      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1759        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family)
1760          .setMaxVersions(numVersions)
1761          .setBlocksize(blockSize);
1762      if (isNewVersionBehaviorEnabled()) {
1763        familyDescriptor.setNewVersionBehavior(true);
1764      }
1765      tableDescriptor.setColumnFamily(familyDescriptor);
1766    }
1767    getAdmin().createTable(tableDescriptor);
1768    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1769    // assigned
1770    waitUntilAllRegionsAssigned(tableName);
1771    return getConnection().getTable(tableName);
1772  }
1773
1774  public Table createTable(TableName tableName, byte[][] families,
1775      int numVersions, int blockSize, String cpName) throws IOException {
1776    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1777      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1778    for (byte[] family : families) {
1779      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1780        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family)
1781          .setMaxVersions(numVersions)
1782          .setBlocksize(blockSize);
1783      if (isNewVersionBehaviorEnabled()) {
1784        familyDescriptor.setNewVersionBehavior(true);
1785      }
1786      tableDescriptor.setColumnFamily(familyDescriptor);
1787    }
1788    if (cpName != null) {
1789      tableDescriptor.setCoprocessor(cpName);
1790    }
1791    getAdmin().createTable(tableDescriptor);
1792    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1793    // assigned
1794    waitUntilAllRegionsAssigned(tableName);
1795    return getConnection().getTable(tableName);
1796  }
1797
1798  /**
1799   * Create a table.
1800   * @param tableName
1801   * @param families
1802   * @param numVersions
1803   * @return A Table instance for the created table.
1804   * @throws IOException
1805   */
1806  public Table createTable(TableName tableName, byte[][] families,
1807      int[] numVersions) throws IOException {
1808    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1809      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1810    int i = 0;
1811    for (byte[] family : families) {
1812      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1813        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family)
1814          .setMaxVersions(numVersions[i]);
1815      if (isNewVersionBehaviorEnabled()) {
1816        familyDescriptor.setNewVersionBehavior(true);
1817      }
1818      tableDescriptor.setColumnFamily(familyDescriptor);
1819      i++;
1820    }
1821    getAdmin().createTable(tableDescriptor);
1822    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1823    // assigned
1824    waitUntilAllRegionsAssigned(tableName);
1825    return getConnection().getTable(tableName);
1826  }
1827
1828  /**
1829   * Create a table.
1830   * @param tableName
1831   * @param family
1832   * @param splitRows
1833   * @return A Table instance for the created table.
1834   * @throws IOException
1835   */
1836  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1837      throws IOException {
1838    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1839      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1840    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1841      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family);
1842    if (isNewVersionBehaviorEnabled()) {
1843      familyDescriptor.setNewVersionBehavior(true);
1844    }
1845    tableDescriptor.setColumnFamily(familyDescriptor);
1846    getAdmin().createTable(tableDescriptor, splitRows);
1847    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1848    // assigned
1849    waitUntilAllRegionsAssigned(tableName);
1850    return getConnection().getTable(tableName);
1851  }
1852
1853  /**
1854   * Create a table with multiple regions.
1855   * @param tableName
1856   * @param family
1857   * @return A Table instance for the created table.
1858   * @throws IOException
1859   */
1860  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1861    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1862  }
1863
1864  /**
1865   * Modify a table, synchronous.
1866   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1867   *   {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1868   * @see Admin#modifyTable(TableDescriptor)
1869   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1870   */
1871  @Deprecated
1872  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1873      throws IOException, InterruptedException {
1874    admin.modifyTable(desc);
1875  }
1876
1877  /**
1878   * Set the number of Region replicas.
1879   */
1880  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1881      throws IOException, InterruptedException {
1882    admin.disableTable(table);
1883    HTableDescriptor desc = new HTableDescriptor(admin.getDescriptor(table));
1884    desc.setRegionReplication(replicaCount);
1885    admin.modifyTable(desc);
1886    admin.enableTable(table);
1887  }
1888
1889  /**
1890   * Drop an existing table
1891   * @param tableName existing table
1892   */
1893  public void deleteTable(TableName tableName) throws IOException {
1894    try {
1895      getAdmin().disableTable(tableName);
1896    } catch (TableNotEnabledException e) {
1897      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1898    }
1899    getAdmin().deleteTable(tableName);
1900  }
1901
1902  /**
1903   * Drop an existing table
1904   * @param tableName existing table
1905   */
1906  public void deleteTableIfAny(TableName tableName) throws IOException {
1907    try {
1908      deleteTable(tableName);
1909    } catch (TableNotFoundException e) {
1910      // ignore
1911    }
1912  }
1913
1914  // ==========================================================================
1915  // Canned table and table descriptor creation
1916  // TODO replace HBaseTestCase
1917
1918  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1919  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1920  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1921  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1922  private static final int MAXVERSIONS = 3;
1923
1924  public static final char FIRST_CHAR = 'a';
1925  public static final char LAST_CHAR = 'z';
1926  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1927  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1928
1929  public TableDescriptorBuilder.ModifyableTableDescriptor createModifyableTableDescriptor(
1930      final String name) {
1931    return createModifyableTableDescriptor(TableName.valueOf(name),
1932      HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1933      MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1934  }
1935
1936  public HTableDescriptor createTableDescriptor(final TableName name,
1937      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1938    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1939      new TableDescriptorBuilder.ModifyableTableDescriptor(name);
1940    for (byte[] cfName : new byte[][]{fam1, fam2, fam3}) {
1941      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1942        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(cfName)
1943          .setMinVersions(minVersions)
1944          .setMaxVersions(versions)
1945          .setKeepDeletedCells(keepDeleted)
1946          .setBlockCacheEnabled(false)
1947          .setTimeToLive(ttl);
1948      if (isNewVersionBehaviorEnabled()) {
1949        familyDescriptor.setNewVersionBehavior(true);
1950      }
1951      tableDescriptor.setColumnFamily(familyDescriptor);
1952    }
1953    return new HTableDescriptor(tableDescriptor);
1954  }
1955
1956  public TableDescriptorBuilder.ModifyableTableDescriptor createModifyableTableDescriptor(
1957      final TableName name, final int minVersions, final int versions, final int ttl,
1958      KeepDeletedCells keepDeleted) {
1959    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1960      new TableDescriptorBuilder.ModifyableTableDescriptor(name);
1961    for (byte[] cfName : new byte[][]{fam1, fam2, fam3}) {
1962      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1963        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(cfName)
1964          .setMinVersions(minVersions)
1965          .setMaxVersions(versions)
1966          .setKeepDeletedCells(keepDeleted)
1967          .setBlockCacheEnabled(false)
1968          .setTimeToLive(ttl);
1969      if (isNewVersionBehaviorEnabled()) {
1970        familyDescriptor.setNewVersionBehavior(true);
1971      }
1972      tableDescriptor.setColumnFamily(familyDescriptor);
1973    }
1974    return tableDescriptor;
1975  }
1976
1977  /**
1978   * Create a table of name <code>name</code>.
1979   * @param name Name to give table.
1980   * @return Column descriptor.
1981   */
1982  public HTableDescriptor createTableDescriptor(final TableName name) {
1983    return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1984        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1985  }
1986
1987  public HTableDescriptor createTableDescriptor(final TableName tableName,
1988      byte[] family) {
1989    return createTableDescriptor(tableName, new byte[][] {family}, 1);
1990  }
1991
1992  public HTableDescriptor createTableDescriptor(final TableName tableName,
1993      byte[][] families, int maxVersions) {
1994    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
1995      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
1996
1997    for (byte[] family : families) {
1998      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
1999        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family)
2000          .setMaxVersions(maxVersions);
2001      if (isNewVersionBehaviorEnabled()) {
2002        familyDescriptor.setNewVersionBehavior(true);
2003      }
2004      tableDescriptor.setColumnFamily(familyDescriptor);
2005    }
2006    return new HTableDescriptor(tableDescriptor);
2007  }
2008
2009  /**
2010   * Create an HRegion that writes to the local tmp dirs
2011   * @param desc a table descriptor indicating which table the region belongs to
2012   * @param startKey the start boundary of the region
2013   * @param endKey the end boundary of the region
2014   * @return a region that writes to local dir for testing
2015   * @throws IOException
2016   */
2017  public HRegion createLocalHRegion(TableDescriptor desc, byte [] startKey,
2018      byte [] endKey)
2019  throws IOException {
2020    HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
2021    return createLocalHRegion(hri, desc);
2022  }
2023
2024  /**
2025   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
2026   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
2027   */
2028  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
2029    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
2030  }
2031
2032  /**
2033   * Create an HRegion that writes to the local tmp dirs with specified wal
2034   * @param info regioninfo
2035   * @param desc table descriptor
2036   * @param wal wal for this region.
2037   * @return created hregion
2038   * @throws IOException
2039   */
2040  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
2041      throws IOException {
2042    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
2043  }
2044
2045  /**
2046   * @param tableName
2047   * @param startKey
2048   * @param stopKey
2049   * @param isReadOnly
2050   * @param families
2051   * @return A region on which you must call
2052   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
2053   * @throws IOException
2054   */
2055  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
2056      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
2057    return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
2058        durability, wal, null, families);
2059  }
2060
2061  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
2062      byte[] stopKey,
2063      boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
2064      byte[]... families)
2065      throws IOException {
2066    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
2067      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
2068    tableDescriptor.setReadOnly(isReadOnly);
2069    int i = 0;
2070    for (byte[] family : families) {
2071      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
2072        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family);
2073      if (compactedMemStore != null && i < compactedMemStore.length) {
2074        familyDescriptor.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
2075      } else {
2076        familyDescriptor.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
2077
2078      }
2079      i++;
2080      // Set default to be three versions.
2081      familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
2082      tableDescriptor.setColumnFamily(familyDescriptor);
2083    }
2084    tableDescriptor.setDurability(durability);
2085    HRegionInfo info = new HRegionInfo(tableDescriptor.getTableName(), startKey, stopKey, false);
2086    return createLocalHRegion(info, tableDescriptor, wal);
2087  }
2088
2089  //
2090  // ==========================================================================
2091
2092  /**
2093   * Provide an existing table name to truncate.
2094   * Scans the table and issues a delete for each row read.
2095   * @param tableName existing table
2096   * @return HTable to that new table
2097   * @throws IOException
2098   */
2099  public Table deleteTableData(TableName tableName) throws IOException {
2100    Table table = getConnection().getTable(tableName);
2101    Scan scan = new Scan();
2102    ResultScanner resScan = table.getScanner(scan);
2103    for(Result res : resScan) {
2104      Delete del = new Delete(res.getRow());
2105      table.delete(del);
2106    }
2107    resScan = table.getScanner(scan);
2108    resScan.close();
2109    return table;
2110  }
2111
2112  /**
2113   * Truncate a table using the admin command.
2114   * Effectively disables, deletes, and recreates the table.
2115   * @param tableName table which must exist.
2116   * @param preserveRegions keep the existing split points
2117   * @return HTable for the new table
2118   */
2119  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
2120      IOException {
2121    Admin admin = getAdmin();
2122    if (!admin.isTableDisabled(tableName)) {
2123      admin.disableTable(tableName);
2124    }
2125    admin.truncateTable(tableName, preserveRegions);
2126    return getConnection().getTable(tableName);
2127  }
2128
2129  /**
2130   * Truncate a table using the admin command.
2131   * Effectively disables, deletes, and recreates the table.
2132   * For previous behavior of issuing row deletes, see
2133   * deleteTableData.
2134   * Expressly does not preserve regions of existing table.
2135   * @param tableName table which must exist.
2136   * @return HTable for the new table
2137   */
2138  public Table truncateTable(final TableName tableName) throws IOException {
2139    return truncateTable(tableName, false);
2140  }
2141
2142  /**
2143   * Load table with rows from 'aaa' to 'zzz'.
2144   * @param t Table
2145   * @param f Family
2146   * @return Count of rows loaded.
2147   * @throws IOException
2148   */
2149  public int loadTable(final Table t, final byte[] f) throws IOException {
2150    return loadTable(t, new byte[][] {f});
2151  }
2152
2153  /**
2154   * Load table with rows from 'aaa' to 'zzz'.
2155   * @param t Table
2156   * @param f Family
2157   * @return Count of rows loaded.
2158   * @throws IOException
2159   */
2160  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2161    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2162  }
2163
2164  /**
2165   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2166   * @param t Table
2167   * @param f Array of Families to load
2168   * @return Count of rows loaded.
2169   * @throws IOException
2170   */
2171  public int loadTable(final Table t, final byte[][] f) throws IOException {
2172    return loadTable(t, f, null);
2173  }
2174
2175  /**
2176   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2177   * @param t Table
2178   * @param f Array of Families to load
2179   * @param value the values of the cells. If null is passed, the row key is used as value
2180   * @return Count of rows loaded.
2181   * @throws IOException
2182   */
2183  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2184    return loadTable(t, f, value, true);
2185  }
2186
2187  /**
2188   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2189   * @param t Table
2190   * @param f Array of Families to load
2191   * @param value the values of the cells. If null is passed, the row key is used as value
2192   * @return Count of rows loaded.
2193   * @throws IOException
2194   */
2195  public int loadTable(final Table t, final byte[][] f, byte[] value,
2196      boolean writeToWAL) throws IOException {
2197    List<Put> puts = new ArrayList<>();
2198    for (byte[] row : HBaseTestingUtility.ROWS) {
2199      Put put = new Put(row);
2200      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2201      for (int i = 0; i < f.length; i++) {
2202        byte[] value1 = value != null ? value : row;
2203        put.addColumn(f[i], f[i], value1);
2204      }
2205      puts.add(put);
2206    }
2207    t.put(puts);
2208    return puts.size();
2209  }
2210
2211  /** A tracker for tracking and validating table rows
2212   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2213   */
2214  public static class SeenRowTracker {
2215    int dim = 'z' - 'a' + 1;
2216    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2217    byte[] startRow;
2218    byte[] stopRow;
2219
2220    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2221      this.startRow = startRow;
2222      this.stopRow = stopRow;
2223    }
2224
2225    void reset() {
2226      for (byte[] row : ROWS) {
2227        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2228      }
2229    }
2230
2231    int i(byte b) {
2232      return b - 'a';
2233    }
2234
2235    public void addRow(byte[] row) {
2236      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2237    }
2238
2239    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2240     * all other rows none
2241     */
2242    public void validate() {
2243      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2244        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2245          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2246            int count = seenRows[i(b1)][i(b2)][i(b3)];
2247            int expectedCount = 0;
2248            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2249                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2250              expectedCount = 1;
2251            }
2252            if (count != expectedCount) {
2253              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2254              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2255                  "instead of " + expectedCount);
2256            }
2257          }
2258        }
2259      }
2260    }
2261  }
2262
2263  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2264    return loadRegion(r, f, false);
2265  }
2266
2267  public int loadRegion(final Region r, final byte[] f) throws IOException {
2268    return loadRegion((HRegion)r, f);
2269  }
2270
2271  /**
2272   * Load region with rows from 'aaa' to 'zzz'.
2273   * @param r Region
2274   * @param f Family
2275   * @param flush flush the cache if true
2276   * @return Count of rows loaded.
2277   * @throws IOException
2278   */
2279  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2280  throws IOException {
2281    byte[] k = new byte[3];
2282    int rowCount = 0;
2283    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2284      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2285        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2286          k[0] = b1;
2287          k[1] = b2;
2288          k[2] = b3;
2289          Put put = new Put(k);
2290          put.setDurability(Durability.SKIP_WAL);
2291          put.addColumn(f, null, k);
2292          if (r.getWAL() == null) {
2293            put.setDurability(Durability.SKIP_WAL);
2294          }
2295          int preRowCount = rowCount;
2296          int pause = 10;
2297          int maxPause = 1000;
2298          while (rowCount == preRowCount) {
2299            try {
2300              r.put(put);
2301              rowCount++;
2302            } catch (RegionTooBusyException e) {
2303              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2304              Threads.sleep(pause);
2305            }
2306          }
2307        }
2308      }
2309      if (flush) {
2310        r.flush(true);
2311      }
2312    }
2313    return rowCount;
2314  }
2315
2316  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2317      throws IOException {
2318    for (int i = startRow; i < endRow; i++) {
2319      byte[] data = Bytes.toBytes(String.valueOf(i));
2320      Put put = new Put(data);
2321      put.addColumn(f, null, data);
2322      t.put(put);
2323    }
2324  }
2325
2326  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2327      throws IOException {
2328    Random r = new Random();
2329    byte[] row = new byte[rowSize];
2330    for (int i = 0; i < totalRows; i++) {
2331      r.nextBytes(row);
2332      Put put = new Put(row);
2333      put.addColumn(f, new byte[]{0}, new byte[]{0});
2334      t.put(put);
2335    }
2336  }
2337
2338  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2339      int replicaId)
2340      throws IOException {
2341    for (int i = startRow; i < endRow; i++) {
2342      String failMsg = "Failed verification of row :" + i;
2343      byte[] data = Bytes.toBytes(String.valueOf(i));
2344      Get get = new Get(data);
2345      get.setReplicaId(replicaId);
2346      get.setConsistency(Consistency.TIMELINE);
2347      Result result = table.get(get);
2348      assertTrue(failMsg, result.containsColumn(f, null));
2349      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2350      Cell cell = result.getColumnLatestCell(f, null);
2351      assertTrue(failMsg,
2352        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2353          cell.getValueLength()));
2354    }
2355  }
2356
2357  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2358      throws IOException {
2359    verifyNumericRows((HRegion)region, f, startRow, endRow);
2360  }
2361
2362  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2363      throws IOException {
2364    verifyNumericRows(region, f, startRow, endRow, true);
2365  }
2366
2367  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2368      final boolean present) throws IOException {
2369    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2370  }
2371
2372  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2373      final boolean present) throws IOException {
2374    for (int i = startRow; i < endRow; i++) {
2375      String failMsg = "Failed verification of row :" + i;
2376      byte[] data = Bytes.toBytes(String.valueOf(i));
2377      Result result = region.get(new Get(data));
2378
2379      boolean hasResult = result != null && !result.isEmpty();
2380      assertEquals(failMsg + result, present, hasResult);
2381      if (!present) continue;
2382
2383      assertTrue(failMsg, result.containsColumn(f, null));
2384      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2385      Cell cell = result.getColumnLatestCell(f, null);
2386      assertTrue(failMsg,
2387        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2388          cell.getValueLength()));
2389    }
2390  }
2391
2392  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2393      throws IOException {
2394    for (int i = startRow; i < endRow; i++) {
2395      byte[] data = Bytes.toBytes(String.valueOf(i));
2396      Delete delete = new Delete(data);
2397      delete.addFamily(f);
2398      t.delete(delete);
2399    }
2400  }
2401
2402  /**
2403   * Return the number of rows in the given table.
2404   * @param table to count rows
2405   * @return count of rows
2406   */
2407  public static int countRows(final Table table) throws IOException {
2408    return countRows(table, new Scan());
2409  }
2410
2411  public static int countRows(final Table table, final Scan scan) throws IOException {
2412    try (ResultScanner results = table.getScanner(scan)) {
2413      int count = 0;
2414      while (results.next() != null) {
2415        count++;
2416      }
2417      return count;
2418    }
2419  }
2420
2421  public int countRows(final Table table, final byte[]... families) throws IOException {
2422    Scan scan = new Scan();
2423    for (byte[] family: families) {
2424      scan.addFamily(family);
2425    }
2426    return countRows(table, scan);
2427  }
2428
2429  /**
2430   * Return the number of rows in the given table.
2431   */
2432  public int countRows(final TableName tableName) throws IOException {
2433    Table table = getConnection().getTable(tableName);
2434    try {
2435      return countRows(table);
2436    } finally {
2437      table.close();
2438    }
2439  }
2440
2441  public int countRows(final Region region) throws IOException {
2442    return countRows(region, new Scan());
2443  }
2444
2445  public int countRows(final Region region, final Scan scan) throws IOException {
2446    InternalScanner scanner = region.getScanner(scan);
2447    try {
2448      return countRows(scanner);
2449    } finally {
2450      scanner.close();
2451    }
2452  }
2453
2454  public int countRows(final InternalScanner scanner) throws IOException {
2455    int scannedCount = 0;
2456    List<Cell> results = new ArrayList<>();
2457    boolean hasMore = true;
2458    while (hasMore) {
2459      hasMore = scanner.next(results);
2460      scannedCount += results.size();
2461      results.clear();
2462    }
2463    return scannedCount;
2464  }
2465
2466  /**
2467   * Return an md5 digest of the entire contents of a table.
2468   */
2469  public String checksumRows(final Table table) throws Exception {
2470
2471    Scan scan = new Scan();
2472    ResultScanner results = table.getScanner(scan);
2473    MessageDigest digest = MessageDigest.getInstance("MD5");
2474    for (Result res : results) {
2475      digest.update(res.getRow());
2476    }
2477    results.close();
2478    return digest.toString();
2479  }
2480
2481  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2482  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2483  static {
2484    int i = 0;
2485    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2486      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2487        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2488          ROWS[i][0] = b1;
2489          ROWS[i][1] = b2;
2490          ROWS[i][2] = b3;
2491          i++;
2492        }
2493      }
2494    }
2495  }
2496
2497  public static final byte[][] KEYS = {
2498    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2499    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2500    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2501    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2502    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2503    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2504    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2505    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2506    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2507  };
2508
2509  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2510      Bytes.toBytes("bbb"),
2511      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2512      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2513      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2514      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2515      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2516      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2517      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2518      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2519  };
2520
2521  /**
2522   * Create rows in hbase:meta for regions of the specified table with the specified
2523   * start keys.  The first startKey should be a 0 length byte array if you
2524   * want to form a proper range of regions.
2525   * @param conf
2526   * @param htd
2527   * @param startKeys
2528   * @return list of region info for regions added to meta
2529   * @throws IOException
2530   */
2531  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2532      final TableDescriptor htd, byte [][] startKeys)
2533  throws IOException {
2534    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2535    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2536    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2537    MetaTableAccessor
2538        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
2539    // add custom ones
2540    for (int i = 0; i < startKeys.length; i++) {
2541      int j = (i + 1) % startKeys.length;
2542      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2543          .setStartKey(startKeys[i])
2544          .setEndKey(startKeys[j])
2545          .build();
2546      MetaTableAccessor.addRegionToMeta(getConnection(), hri);
2547      newRegions.add(hri);
2548    }
2549
2550    meta.close();
2551    return newRegions;
2552  }
2553
2554  /**
2555   * Create an unmanaged WAL. Be sure to close it when you're through.
2556   */
2557  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2558      throws IOException {
2559    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2560    // unless I pass along via the conf.
2561    Configuration confForWAL = new Configuration(conf);
2562    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2563    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2564  }
2565
2566
2567  /**
2568   * Create a region with it's own WAL. Be sure to call
2569   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2570   */
2571  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2572      final Configuration conf, final TableDescriptor htd) throws IOException {
2573    return createRegionAndWAL(info, rootDir, conf, htd, true);
2574  }
2575
2576  /**
2577   * Create a region with it's own WAL. Be sure to call
2578   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2579   */
2580  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2581      final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
2582      throws IOException {
2583    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2584    region.setBlockCache(blockCache);
2585    region.initialize();
2586    return region;
2587  }
2588  /**
2589   * Create a region with it's own WAL. Be sure to call
2590   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2591   */
2592  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2593      final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2594      throws IOException {
2595    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2596    region.setMobFileCache(mobFileCache);
2597    region.initialize();
2598    return region;
2599  }
2600
2601  /**
2602   * Create a region with it's own WAL. Be sure to call
2603   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2604   */
2605  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2606      final Configuration conf, final TableDescriptor htd, boolean initialize)
2607      throws IOException {
2608    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2609    WAL wal = createWal(conf, rootDir, info);
2610    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2611  }
2612
2613  /**
2614   * Returns all rows from the hbase:meta table.
2615   *
2616   * @throws IOException When reading the rows fails.
2617   */
2618  public List<byte[]> getMetaTableRows() throws IOException {
2619    // TODO: Redo using MetaTableAccessor class
2620    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2621    List<byte[]> rows = new ArrayList<>();
2622    ResultScanner s = t.getScanner(new Scan());
2623    for (Result result : s) {
2624      LOG.info("getMetaTableRows: row -> " +
2625        Bytes.toStringBinary(result.getRow()));
2626      rows.add(result.getRow());
2627    }
2628    s.close();
2629    t.close();
2630    return rows;
2631  }
2632
2633  /**
2634   * Returns all rows from the hbase:meta table for a given user table
2635   *
2636   * @throws IOException When reading the rows fails.
2637   */
2638  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2639    // TODO: Redo using MetaTableAccessor.
2640    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2641    List<byte[]> rows = new ArrayList<>();
2642    ResultScanner s = t.getScanner(new Scan());
2643    for (Result result : s) {
2644      RegionInfo info = MetaTableAccessor.getRegionInfo(result);
2645      if (info == null) {
2646        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2647        // TODO figure out what to do for this new hosed case.
2648        continue;
2649      }
2650
2651      if (info.getTable().equals(tableName)) {
2652        LOG.info("getMetaTableRows: row -> " +
2653            Bytes.toStringBinary(result.getRow()) + info);
2654        rows.add(result.getRow());
2655      }
2656    }
2657    s.close();
2658    t.close();
2659    return rows;
2660  }
2661
2662  /**
2663   * Returns all regions of the specified table
2664   *
2665   * @param tableName the table name
2666   * @return all regions of the specified table
2667   * @throws IOException when getting the regions fails.
2668   */
2669  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2670    try (Admin admin = getConnection().getAdmin()) {
2671      return admin.getRegions(tableName);
2672    }
2673  }
2674
2675  /*
2676   * Find any other region server which is different from the one identified by parameter
2677   * @param rs
2678   * @return another region server
2679   */
2680  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2681    for (JVMClusterUtil.RegionServerThread rst :
2682      getMiniHBaseCluster().getRegionServerThreads()) {
2683      if (!(rst.getRegionServer() == rs)) {
2684        return rst.getRegionServer();
2685      }
2686    }
2687    return null;
2688  }
2689
2690  /**
2691   * Tool to get the reference to the region server object that holds the
2692   * region of the specified user table.
2693   * @param tableName user table to lookup in hbase:meta
2694   * @return region server that holds it, null if the row doesn't exist
2695   * @throws IOException
2696   * @throws InterruptedException
2697   */
2698  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2699      throws IOException, InterruptedException {
2700    List<RegionInfo> regions = getRegions(tableName);
2701    if (regions == null || regions.isEmpty()) {
2702      return null;
2703    }
2704    LOG.debug("Found " + regions.size() + " regions for table " +
2705        tableName);
2706
2707    byte[] firstRegionName = regions.stream()
2708        .filter(r -> !r.isOffline())
2709        .map(RegionInfo::getRegionName)
2710        .findFirst()
2711        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2712
2713    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2714    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2715      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2716    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2717      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2718    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2719    while(retrier.shouldRetry()) {
2720      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2721      if (index != -1) {
2722        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2723      }
2724      // Came back -1.  Region may not be online yet.  Sleep a while.
2725      retrier.sleepUntilNextRetry();
2726    }
2727    return null;
2728  }
2729
2730  /**
2731   * Starts a <code>MiniMRCluster</code> with a default number of
2732   * <code>TaskTracker</code>'s.
2733   *
2734   * @throws IOException When starting the cluster fails.
2735   */
2736  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2737    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2738    conf.setIfUnset(
2739        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2740        "99.0");
2741    startMiniMapReduceCluster(2);
2742    return mrCluster;
2743  }
2744
2745  /**
2746   * Tasktracker has a bug where changing the hadoop.log.dir system property
2747   * will not change its internal static LOG_DIR variable.
2748   */
2749  private void forceChangeTaskLogDir() {
2750    Field logDirField;
2751    try {
2752      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2753      logDirField.setAccessible(true);
2754
2755      Field modifiersField = Field.class.getDeclaredField("modifiers");
2756      modifiersField.setAccessible(true);
2757      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2758
2759      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2760    } catch (SecurityException e) {
2761      throw new RuntimeException(e);
2762    } catch (NoSuchFieldException e) {
2763      // TODO Auto-generated catch block
2764      throw new RuntimeException(e);
2765    } catch (IllegalArgumentException e) {
2766      throw new RuntimeException(e);
2767    } catch (IllegalAccessException e) {
2768      throw new RuntimeException(e);
2769    }
2770  }
2771
2772  /**
2773   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2774   * filesystem.
2775   * @param servers  The number of <code>TaskTracker</code>'s to start.
2776   * @throws IOException When starting the cluster fails.
2777   */
2778  private void startMiniMapReduceCluster(final int servers) throws IOException {
2779    if (mrCluster != null) {
2780      throw new IllegalStateException("MiniMRCluster is already running");
2781    }
2782    LOG.info("Starting mini mapreduce cluster...");
2783    setupClusterTestDir();
2784    createDirsAndSetProperties();
2785
2786    forceChangeTaskLogDir();
2787
2788    //// hadoop2 specific settings
2789    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2790    // we up the VM usable so that processes don't get killed.
2791    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2792
2793    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2794    // this avoids the problem by disabling speculative task execution in tests.
2795    conf.setBoolean("mapreduce.map.speculative", false);
2796    conf.setBoolean("mapreduce.reduce.speculative", false);
2797    ////
2798
2799    // Allow the user to override FS URI for this map-reduce cluster to use.
2800    mrCluster = new MiniMRCluster(servers,
2801      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2802      null, null, new JobConf(this.conf));
2803    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2804    if (jobConf == null) {
2805      jobConf = mrCluster.createJobConf();
2806    }
2807
2808    jobConf.set("mapreduce.cluster.local.dir",
2809      conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2810    LOG.info("Mini mapreduce cluster started");
2811
2812    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2813    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2814    // necessary config properties here.  YARN-129 required adding a few properties.
2815    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2816    // this for mrv2 support; mr1 ignores this
2817    conf.set("mapreduce.framework.name", "yarn");
2818    conf.setBoolean("yarn.is.minicluster", true);
2819    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2820    if (rmAddress != null) {
2821      conf.set("yarn.resourcemanager.address", rmAddress);
2822    }
2823    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2824    if (historyAddress != null) {
2825      conf.set("mapreduce.jobhistory.address", historyAddress);
2826    }
2827    String schedulerAddress =
2828      jobConf.get("yarn.resourcemanager.scheduler.address");
2829    if (schedulerAddress != null) {
2830      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2831    }
2832    String mrJobHistoryWebappAddress =
2833      jobConf.get("mapreduce.jobhistory.webapp.address");
2834    if (mrJobHistoryWebappAddress != null) {
2835      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2836    }
2837    String yarnRMWebappAddress =
2838      jobConf.get("yarn.resourcemanager.webapp.address");
2839    if (yarnRMWebappAddress != null) {
2840      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2841    }
2842  }
2843
2844  /**
2845   * Stops the previously started <code>MiniMRCluster</code>.
2846   */
2847  public void shutdownMiniMapReduceCluster() {
2848    if (mrCluster != null) {
2849      LOG.info("Stopping mini mapreduce cluster...");
2850      mrCluster.shutdown();
2851      mrCluster = null;
2852      LOG.info("Mini mapreduce cluster stopped");
2853    }
2854    // Restore configuration to point to local jobtracker
2855    conf.set("mapreduce.jobtracker.address", "local");
2856  }
2857
2858  /**
2859   * Create a stubbed out RegionServerService, mainly for getting FS.
2860   */
2861  public RegionServerServices createMockRegionServerService() throws IOException {
2862    return createMockRegionServerService((ServerName)null);
2863  }
2864
2865  /**
2866   * Create a stubbed out RegionServerService, mainly for getting FS.
2867   * This version is used by TestTokenAuthentication
2868   */
2869  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2870      IOException {
2871    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2872    rss.setFileSystem(getTestFileSystem());
2873    rss.setRpcServer(rpc);
2874    return rss;
2875  }
2876
2877  /**
2878   * Create a stubbed out RegionServerService, mainly for getting FS.
2879   * This version is used by TestOpenRegionHandler
2880   */
2881  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2882    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2883    rss.setFileSystem(getTestFileSystem());
2884    return rss;
2885  }
2886
2887  /**
2888   * Switches the logger for the given class to DEBUG level.
2889   * @param clazz The class for which to switch to debug logging.
2890   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
2891   *             HBase only uses log4j. You should do this by your own as it you know which log
2892   *             framework you are using then set the log level to debug is very easy.
2893   */
2894  @Deprecated
2895  public void enableDebug(Class<?> clazz) {
2896    Log4jUtils.enableDebug(clazz);
2897  }
2898
2899  /**
2900   * Expire the Master's session
2901   * @throws Exception
2902   */
2903  public void expireMasterSession() throws Exception {
2904    HMaster master = getMiniHBaseCluster().getMaster();
2905    expireSession(master.getZooKeeper(), false);
2906  }
2907
2908  /**
2909   * Expire a region server's session
2910   * @param index which RS
2911   */
2912  public void expireRegionServerSession(int index) throws Exception {
2913    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2914    expireSession(rs.getZooKeeper(), false);
2915    decrementMinRegionServerCount();
2916  }
2917
2918  private void decrementMinRegionServerCount() {
2919    // decrement the count for this.conf, for newly spwaned master
2920    // this.hbaseCluster shares this configuration too
2921    decrementMinRegionServerCount(getConfiguration());
2922
2923    // each master thread keeps a copy of configuration
2924    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2925      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2926    }
2927  }
2928
2929  private void decrementMinRegionServerCount(Configuration conf) {
2930    int currentCount = conf.getInt(
2931        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2932    if (currentCount != -1) {
2933      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2934          Math.max(currentCount - 1, 1));
2935    }
2936  }
2937
2938  public void expireSession(ZKWatcher nodeZK) throws Exception {
2939   expireSession(nodeZK, false);
2940  }
2941
2942  /**
2943   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2944   * http://hbase.apache.org/book.html#trouble.zookeeper
2945   * There are issues when doing this:
2946   * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2947   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2948   *
2949   * @param nodeZK - the ZK watcher to expire
2950   * @param checkStatus - true to check if we can create a Table with the
2951   *                    current configuration.
2952   */
2953  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
2954    throws Exception {
2955    Configuration c = new Configuration(this.conf);
2956    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2957    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2958    byte[] password = zk.getSessionPasswd();
2959    long sessionID = zk.getSessionId();
2960
2961    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2962    //  so we create a first watcher to be sure that the
2963    //  event was sent. We expect that if our watcher receives the event
2964    //  other watchers on the same machine will get is as well.
2965    // When we ask to close the connection, ZK does not close it before
2966    //  we receive all the events, so don't have to capture the event, just
2967    //  closing the connection should be enough.
2968    ZooKeeper monitor = new ZooKeeper(quorumServers,
2969      1000, new org.apache.zookeeper.Watcher(){
2970      @Override
2971      public void process(WatchedEvent watchedEvent) {
2972        LOG.info("Monitor ZKW received event="+watchedEvent);
2973      }
2974    } , sessionID, password);
2975
2976    // Making it expire
2977    ZooKeeper newZK = new ZooKeeper(quorumServers,
2978        1000, EmptyWatcher.instance, sessionID, password);
2979
2980    //ensure that we have connection to the server before closing down, otherwise
2981    //the close session event will be eaten out before we start CONNECTING state
2982    long start = System.currentTimeMillis();
2983    while (newZK.getState() != States.CONNECTED
2984         && System.currentTimeMillis() - start < 1000) {
2985       Thread.sleep(1);
2986    }
2987    newZK.close();
2988    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2989
2990    // Now closing & waiting to be sure that the clients get it.
2991    monitor.close();
2992
2993    if (checkStatus) {
2994      getConnection().getTable(TableName.META_TABLE_NAME).close();
2995    }
2996  }
2997
2998  /**
2999   * Get the Mini HBase cluster.
3000   *
3001   * @return hbase cluster
3002   * @see #getHBaseClusterInterface()
3003   */
3004  public MiniHBaseCluster getHBaseCluster() {
3005    return getMiniHBaseCluster();
3006  }
3007
3008  /**
3009   * Returns the HBaseCluster instance.
3010   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
3011   * tests referring this should not assume that the cluster is a mini cluster or a
3012   * distributed one. If the test only works on a mini cluster, then specific
3013   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
3014   * need to type-cast.
3015   */
3016  public HBaseCluster getHBaseClusterInterface() {
3017    //implementation note: we should rename this method as #getHBaseCluster(),
3018    //but this would require refactoring 90+ calls.
3019    return hbaseCluster;
3020  }
3021
3022  private void initConnection() throws IOException {
3023    User user = UserProvider.instantiate(conf).getCurrent();
3024    this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
3025  }
3026
3027  /**
3028   * Resets the connections so that the next time getConnection() is called, a new connection is
3029   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
3030   * the connection is not valid anymore.
3031   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
3032   *   written, not all start() stop() calls go through this class. Most tests directly operate on
3033   *   the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
3034   *   maintain the connection state automatically. Cleaning this is a much bigger refactor.
3035   */
3036  public void invalidateConnection() throws IOException {
3037    closeConnection();
3038    // Update the master addresses if they changed.
3039    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
3040    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
3041    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
3042        masterConfigBefore, masterConfAfter);
3043    conf.set(HConstants.MASTER_ADDRS_KEY,
3044        getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
3045  }
3046
3047  /**
3048   * Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
3049   * thread-safe).
3050   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
3051   */
3052  public Connection getConnection() throws IOException {
3053    if (this.asyncConnection == null) {
3054      initConnection();
3055    }
3056    return this.asyncConnection.toConnection();
3057  }
3058
3059  public AsyncClusterConnection getAsyncConnection() throws IOException {
3060    if (this.asyncConnection == null) {
3061      initConnection();
3062    }
3063    return this.asyncConnection;
3064  }
3065
3066  public void closeConnection() throws IOException {
3067    Closeables.close(hbaseAdmin, true);
3068    Closeables.close(asyncConnection, true);
3069    this.hbaseAdmin = null;
3070    this.asyncConnection = null;
3071  }
3072
3073  /**
3074   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
3075   * Closing it has no effect, it will be closed automatically when the cluster shutdowns
3076   */
3077  public synchronized Admin getAdmin() throws IOException {
3078    if (hbaseAdmin == null){
3079      this.hbaseAdmin = getConnection().getAdmin();
3080    }
3081    return hbaseAdmin;
3082  }
3083
3084  private Admin hbaseAdmin = null;
3085
3086  /**
3087   * Returns an {@link Hbck} instance. Needs be closed when done.
3088   */
3089  public Hbck getHbck() throws IOException {
3090    return getConnection().getHbck();
3091  }
3092
3093  /**
3094   * Unassign the named region.
3095   *
3096   * @param regionName  The region to unassign.
3097   */
3098  public void unassignRegion(String regionName) throws IOException {
3099    unassignRegion(Bytes.toBytes(regionName));
3100  }
3101
3102  /**
3103   * Unassign the named region.
3104   *
3105   * @param regionName  The region to unassign.
3106   */
3107  public void unassignRegion(byte[] regionName) throws IOException {
3108    getAdmin().unassign(regionName, true);
3109  }
3110
3111  /**
3112   * Closes the region containing the given row.
3113   *
3114   * @param row  The row to find the containing region.
3115   * @param table  The table to find the region.
3116   */
3117  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
3118    unassignRegionByRow(Bytes.toBytes(row), table);
3119  }
3120
3121  /**
3122   * Closes the region containing the given row.
3123   *
3124   * @param row  The row to find the containing region.
3125   * @param table  The table to find the region.
3126   * @throws IOException
3127   */
3128  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3129    HRegionLocation hrl = table.getRegionLocation(row);
3130    unassignRegion(hrl.getRegion().getRegionName());
3131  }
3132
3133  /*
3134   * Retrieves a splittable region randomly from tableName
3135   *
3136   * @param tableName name of table
3137   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3138   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3139   */
3140  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3141    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3142    int regCount = regions.size();
3143    Set<Integer> attempted = new HashSet<>();
3144    int idx;
3145    int attempts = 0;
3146    do {
3147      regions = getHBaseCluster().getRegions(tableName);
3148      if (regCount != regions.size()) {
3149        // if there was region movement, clear attempted Set
3150        attempted.clear();
3151      }
3152      regCount = regions.size();
3153      // There are chances that before we get the region for the table from an RS the region may
3154      // be going for CLOSE.  This may be because online schema change is enabled
3155      if (regCount > 0) {
3156        idx = random.nextInt(regCount);
3157        // if we have just tried this region, there is no need to try again
3158        if (attempted.contains(idx))
3159          continue;
3160        try {
3161          regions.get(idx).checkSplit();
3162          return regions.get(idx);
3163        } catch (Exception ex) {
3164          LOG.warn("Caught exception", ex);
3165          attempted.add(idx);
3166        }
3167      }
3168      attempts++;
3169    } while (maxAttempts == -1 || attempts < maxAttempts);
3170    return null;
3171  }
3172
3173  public MiniDFSCluster getDFSCluster() {
3174    return dfsCluster;
3175  }
3176
3177  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3178    setDFSCluster(cluster, true);
3179  }
3180
3181  /**
3182   * Set the MiniDFSCluster
3183   * @param cluster cluster to use
3184   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before
3185   * it is set.
3186   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3187   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3188   */
3189  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3190      throws IllegalStateException, IOException {
3191    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3192      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3193    }
3194    this.dfsCluster = cluster;
3195    this.setFs();
3196  }
3197
3198  public FileSystem getTestFileSystem() throws IOException {
3199    return HFileSystem.get(conf);
3200  }
3201
3202  /**
3203   * Wait until all regions in a table have been assigned.  Waits default timeout before giving up
3204   * (30 seconds).
3205   * @param table Table to wait on.
3206   * @throws InterruptedException
3207   * @throws IOException
3208   */
3209  public void waitTableAvailable(TableName table)
3210      throws InterruptedException, IOException {
3211    waitTableAvailable(table.getName(), 30000);
3212  }
3213
3214  public void waitTableAvailable(TableName table, long timeoutMillis)
3215      throws InterruptedException, IOException {
3216    waitFor(timeoutMillis, predicateTableAvailable(table));
3217  }
3218
3219  /**
3220   * Wait until all regions in a table have been assigned
3221   * @param table Table to wait on.
3222   * @param timeoutMillis Timeout.
3223   */
3224  public void waitTableAvailable(byte[] table, long timeoutMillis)
3225      throws InterruptedException, IOException {
3226    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3227  }
3228
3229  public String explainTableAvailability(TableName tableName) throws IOException {
3230    String msg = explainTableState(tableName, TableState.State.ENABLED) + ", ";
3231    if (getHBaseCluster().getMaster().isAlive()) {
3232      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3233        .getRegionStates().getRegionAssignments();
3234      final List<Pair<RegionInfo, ServerName>> metaLocations =
3235        MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName);
3236      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3237        RegionInfo hri = metaLocation.getFirst();
3238        ServerName sn = metaLocation.getSecond();
3239        if (!assignments.containsKey(hri)) {
3240          msg += ", region " + hri + " not assigned, but found in meta, it expected to be on " + sn;
3241
3242        } else if (sn == null) {
3243          msg += ",  region " + hri + " assigned,  but has no server in meta";
3244        } else if (!sn.equals(assignments.get(hri))) {
3245          msg += ",  region " + hri + " assigned,  but has different servers in meta and AM ( " +
3246            sn + " <> " + assignments.get(hri);
3247        }
3248      }
3249    }
3250    return msg;
3251  }
3252
3253  public String explainTableState(final TableName table, TableState.State state)
3254      throws IOException {
3255    TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table);
3256    if (tableState == null) {
3257      return "TableState in META: No table state in META for table " + table +
3258        " last state in meta (including deleted is " + findLastTableState(table) + ")";
3259    } else if (!tableState.inStates(state)) {
3260      return "TableState in META: Not " + state + " state, but " + tableState;
3261    } else {
3262      return "TableState in META: OK";
3263    }
3264  }
3265
3266  @Nullable
3267  public TableState findLastTableState(final TableName table) throws IOException {
3268    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3269    MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3270      @Override
3271      public boolean visit(Result r) throws IOException {
3272        if (!Arrays.equals(r.getRow(), table.getName())) {
3273          return false;
3274        }
3275        TableState state = MetaTableAccessor.getTableState(r);
3276        if (state != null) {
3277          lastTableState.set(state);
3278        }
3279        return true;
3280      }
3281    };
3282    MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null,
3283      MetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
3284    return lastTableState.get();
3285  }
3286
3287  /**
3288   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3289   * regions have been all assigned.  Will timeout after default period (30 seconds)
3290   * Tolerates nonexistent table.
3291   * @param table the table to wait on.
3292   * @throws InterruptedException if interrupted while waiting
3293   * @throws IOException if an IO problem is encountered
3294   */
3295  public void waitTableEnabled(TableName table)
3296      throws InterruptedException, IOException {
3297    waitTableEnabled(table, 30000);
3298  }
3299
3300  /**
3301   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3302   * regions have been all assigned.
3303   * @see #waitTableEnabled(TableName, long)
3304   * @param table Table to wait on.
3305   * @param timeoutMillis Time to wait on it being marked enabled.
3306   * @throws InterruptedException
3307   * @throws IOException
3308   */
3309  public void waitTableEnabled(byte[] table, long timeoutMillis)
3310  throws InterruptedException, IOException {
3311    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3312  }
3313
3314  public void waitTableEnabled(TableName table, long timeoutMillis)
3315  throws IOException {
3316    waitFor(timeoutMillis, predicateTableEnabled(table));
3317  }
3318
3319  /**
3320   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3321   * Will timeout after default period (30 seconds)
3322   * @param table Table to wait on.
3323   * @throws InterruptedException
3324   * @throws IOException
3325   */
3326  public void waitTableDisabled(byte[] table)
3327          throws InterruptedException, IOException {
3328    waitTableDisabled(table, 30000);
3329  }
3330
3331  public void waitTableDisabled(TableName table, long millisTimeout)
3332          throws InterruptedException, IOException {
3333    waitFor(millisTimeout, predicateTableDisabled(table));
3334  }
3335
3336  /**
3337   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3338   * @param table Table to wait on.
3339   * @param timeoutMillis Time to wait on it being marked disabled.
3340   * @throws InterruptedException
3341   * @throws IOException
3342   */
3343  public void waitTableDisabled(byte[] table, long timeoutMillis)
3344          throws InterruptedException, IOException {
3345    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3346  }
3347
3348  /**
3349   * Make sure that at least the specified number of region servers
3350   * are running
3351   * @param num minimum number of region servers that should be running
3352   * @return true if we started some servers
3353   * @throws IOException
3354   */
3355  public boolean ensureSomeRegionServersAvailable(final int num)
3356      throws IOException {
3357    boolean startedServer = false;
3358    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3359    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3360      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3361      startedServer = true;
3362    }
3363
3364    return startedServer;
3365  }
3366
3367
3368  /**
3369   * Make sure that at least the specified number of region servers
3370   * are running. We don't count the ones that are currently stopping or are
3371   * stopped.
3372   * @param num minimum number of region servers that should be running
3373   * @return true if we started some servers
3374   * @throws IOException
3375   */
3376  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3377    throws IOException {
3378    boolean startedServer = ensureSomeRegionServersAvailable(num);
3379
3380    int nonStoppedServers = 0;
3381    for (JVMClusterUtil.RegionServerThread rst :
3382      getMiniHBaseCluster().getRegionServerThreads()) {
3383
3384      HRegionServer hrs = rst.getRegionServer();
3385      if (hrs.isStopping() || hrs.isStopped()) {
3386        LOG.info("A region server is stopped or stopping:"+hrs);
3387      } else {
3388        nonStoppedServers++;
3389      }
3390    }
3391    for (int i=nonStoppedServers; i<num; ++i) {
3392      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3393      startedServer = true;
3394    }
3395    return startedServer;
3396  }
3397
3398
3399  /**
3400   * This method clones the passed <code>c</code> configuration setting a new
3401   * user into the clone.  Use it getting new instances of FileSystem.  Only
3402   * works for DistributedFileSystem w/o Kerberos.
3403   * @param c Initial configuration
3404   * @param differentiatingSuffix Suffix to differentiate this user from others.
3405   * @return A new configuration instance with a different user set into it.
3406   * @throws IOException
3407   */
3408  public static User getDifferentUser(final Configuration c,
3409    final String differentiatingSuffix)
3410  throws IOException {
3411    FileSystem currentfs = FileSystem.get(c);
3412    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3413      return User.getCurrent();
3414    }
3415    // Else distributed filesystem.  Make a new instance per daemon.  Below
3416    // code is taken from the AppendTestUtil over in hdfs.
3417    String username = User.getCurrent().getName() +
3418      differentiatingSuffix;
3419    User user = User.createUserForTesting(c, username,
3420        new String[]{"supergroup"});
3421    return user;
3422  }
3423
3424  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3425      throws IOException {
3426    NavigableSet<String> online = new TreeSet<>();
3427    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3428      try {
3429        for (RegionInfo region :
3430            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3431          online.add(region.getRegionNameAsString());
3432        }
3433      } catch (RegionServerStoppedException e) {
3434        // That's fine.
3435      }
3436    }
3437    for (MasterThread mt : cluster.getLiveMasterThreads()) {
3438      try {
3439        for (RegionInfo region :
3440            ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3441          online.add(region.getRegionNameAsString());
3442        }
3443      } catch (RegionServerStoppedException e) {
3444        // That's fine.
3445      } catch (ServerNotRunningYetException e) {
3446        // That's fine.
3447      }
3448    }
3449    return online;
3450  }
3451
3452  /**
3453   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
3454   * makes tests linger.  Here is the exception you'll see:
3455   * <pre>
3456   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3457   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3458   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3459   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3460   * </pre>
3461   * @param stream A DFSClient.DFSOutputStream.
3462   * @param max
3463   * @throws NoSuchFieldException
3464   * @throws SecurityException
3465   * @throws IllegalAccessException
3466   * @throws IllegalArgumentException
3467   */
3468  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3469      final int max) {
3470    try {
3471      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3472      for (Class<?> clazz: clazzes) {
3473        String className = clazz.getSimpleName();
3474        if (className.equals("DFSOutputStream")) {
3475          if (clazz.isInstance(stream)) {
3476            Field maxRecoveryErrorCountField =
3477              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3478            maxRecoveryErrorCountField.setAccessible(true);
3479            maxRecoveryErrorCountField.setInt(stream, max);
3480            break;
3481          }
3482        }
3483      }
3484    } catch (Exception e) {
3485      LOG.info("Could not set max recovery field", e);
3486    }
3487  }
3488
3489  /**
3490   * Uses directly the assignment manager to assign the region. and waits until the specified region
3491   * has completed assignment.
3492   * @return true if the region is assigned false otherwise.
3493   */
3494  public boolean assignRegion(final RegionInfo regionInfo)
3495      throws IOException, InterruptedException {
3496    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3497    am.assign(regionInfo);
3498    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3499  }
3500
3501  /**
3502   * Move region to destination server and wait till region is completely moved and online
3503   *
3504   * @param destRegion region to move
3505   * @param destServer destination server of the region
3506   * @throws InterruptedException
3507   * @throws IOException
3508   */
3509  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3510      throws InterruptedException, IOException {
3511    HMaster master = getMiniHBaseCluster().getMaster();
3512    // TODO: Here we start the move. The move can take a while.
3513    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3514    while (true) {
3515      ServerName serverName = master.getAssignmentManager().getRegionStates()
3516          .getRegionServerOfRegion(destRegion);
3517      if (serverName != null && serverName.equals(destServer)) {
3518        assertRegionOnServer(destRegion, serverName, 2000);
3519        break;
3520      }
3521      Thread.sleep(10);
3522    }
3523  }
3524
3525  /**
3526   * Wait until all regions for a table in hbase:meta have a non-empty
3527   * info:server, up to a configuable timeout value (default is 60 seconds)
3528   * This means all regions have been deployed,
3529   * master has been informed and updated hbase:meta with the regions deployed
3530   * server.
3531   * @param tableName the table name
3532   * @throws IOException
3533   */
3534  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3535    waitUntilAllRegionsAssigned(tableName,
3536      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3537  }
3538
3539  /**
3540   * Waith until all system table's regions get assigned
3541   * @throws IOException
3542   */
3543  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3544    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3545  }
3546
3547  /**
3548   * Wait until all regions for a table in hbase:meta have a non-empty
3549   * info:server, or until timeout.  This means all regions have been deployed,
3550   * master has been informed and updated hbase:meta with the regions deployed
3551   * server.
3552   * @param tableName the table name
3553   * @param timeout timeout, in milliseconds
3554   * @throws IOException
3555   */
3556  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3557      throws IOException {
3558    if (!TableName.isMetaTableName(tableName)) {
3559      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3560        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3561            timeout + "ms");
3562        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3563          @Override
3564          public String explainFailure() throws IOException {
3565            return explainTableAvailability(tableName);
3566          }
3567
3568          @Override
3569          public boolean evaluate() throws IOException {
3570            Scan scan = new Scan();
3571            scan.addFamily(HConstants.CATALOG_FAMILY);
3572            boolean tableFound = false;
3573            try (ResultScanner s = meta.getScanner(scan)) {
3574              for (Result r; (r = s.next()) != null;) {
3575                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3576                HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3577                if (info != null && info.getTable().equals(tableName)) {
3578                  // Get server hosting this region from catalog family. Return false if no server
3579                  // hosting this region, or if the server hosting this region was recently killed
3580                  // (for fault tolerance testing).
3581                  tableFound = true;
3582                  byte[] server =
3583                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3584                  if (server == null) {
3585                    return false;
3586                  } else {
3587                    byte[] startCode =
3588                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3589                    ServerName serverName =
3590                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3591                            Bytes.toLong(startCode));
3592                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3593                        getHBaseCluster().isKilledRS(serverName)) {
3594                      return false;
3595                    }
3596                  }
3597                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3598                    return false;
3599                  }
3600                }
3601              }
3602            }
3603            if (!tableFound) {
3604              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3605            }
3606            return tableFound;
3607          }
3608        });
3609      }
3610    }
3611    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3612    // check from the master state if we are using a mini cluster
3613    if (!getHBaseClusterInterface().isDistributedCluster()) {
3614      // So, all regions are in the meta table but make sure master knows of the assignments before
3615      // returning -- sometimes this can lag.
3616      HMaster master = getHBaseCluster().getMaster();
3617      final RegionStates states = master.getAssignmentManager().getRegionStates();
3618      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3619        @Override
3620        public String explainFailure() throws IOException {
3621          return explainTableAvailability(tableName);
3622        }
3623
3624        @Override
3625        public boolean evaluate() throws IOException {
3626          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3627          return hris != null && !hris.isEmpty();
3628        }
3629      });
3630    }
3631    LOG.info("All regions for table " + tableName + " assigned.");
3632  }
3633
3634  /**
3635   * Do a small get/scan against one store. This is required because store
3636   * has no actual methods of querying itself, and relies on StoreScanner.
3637   */
3638  public static List<Cell> getFromStoreFile(HStore store,
3639                                                Get get) throws IOException {
3640    Scan scan = new Scan(get);
3641    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3642        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3643        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3644        // readpoint 0.
3645        0);
3646
3647    List<Cell> result = new ArrayList<>();
3648    scanner.next(result);
3649    if (!result.isEmpty()) {
3650      // verify that we are on the row we want:
3651      Cell kv = result.get(0);
3652      if (!CellUtil.matchingRows(kv, get.getRow())) {
3653        result.clear();
3654      }
3655    }
3656    scanner.close();
3657    return result;
3658  }
3659
3660  /**
3661   * Create region split keys between startkey and endKey
3662   *
3663   * @param startKey
3664   * @param endKey
3665   * @param numRegions the number of regions to be created. it has to be greater than 3.
3666   * @return resulting split keys
3667   */
3668  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3669    assertTrue(numRegions>3);
3670    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3671    byte [][] result = new byte[tmpSplitKeys.length+1][];
3672    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3673    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3674    return result;
3675  }
3676
3677  /**
3678   * Do a small get/scan against one store. This is required because store
3679   * has no actual methods of querying itself, and relies on StoreScanner.
3680   */
3681  public static List<Cell> getFromStoreFile(HStore store,
3682                                                byte [] row,
3683                                                NavigableSet<byte[]> columns
3684                                                ) throws IOException {
3685    Get get = new Get(row);
3686    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3687    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3688
3689    return getFromStoreFile(store,get);
3690  }
3691
3692  public static void assertKVListsEqual(String additionalMsg,
3693      final List<? extends Cell> expected,
3694      final List<? extends Cell> actual) {
3695    final int eLen = expected.size();
3696    final int aLen = actual.size();
3697    final int minLen = Math.min(eLen, aLen);
3698
3699    int i;
3700    for (i = 0; i < minLen
3701        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3702        ++i) {}
3703
3704    if (additionalMsg == null) {
3705      additionalMsg = "";
3706    }
3707    if (!additionalMsg.isEmpty()) {
3708      additionalMsg = ". " + additionalMsg;
3709    }
3710
3711    if (eLen != aLen || i != minLen) {
3712      throw new AssertionError(
3713          "Expected and actual KV arrays differ at position " + i + ": " +
3714          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3715          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3716    }
3717  }
3718
3719  public static <T> String safeGetAsStr(List<T> lst, int i) {
3720    if (0 <= i && i < lst.size()) {
3721      return lst.get(i).toString();
3722    } else {
3723      return "<out_of_range>";
3724    }
3725  }
3726
3727  public String getClusterKey() {
3728    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3729        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3730        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3731            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3732  }
3733
3734  /** Creates a random table with the given parameters */
3735  public Table createRandomTable(TableName tableName,
3736      final Collection<String> families,
3737      final int maxVersions,
3738      final int numColsPerRow,
3739      final int numFlushes,
3740      final int numRegions,
3741      final int numRowsPerFlush)
3742      throws IOException, InterruptedException {
3743
3744    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3745        " regions, " + numFlushes + " storefiles per region, " +
3746        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3747        "\n");
3748
3749    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3750    final int numCF = families.size();
3751    final byte[][] cfBytes = new byte[numCF][];
3752    {
3753      int cfIndex = 0;
3754      for (String cf : families) {
3755        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3756      }
3757    }
3758
3759    final int actualStartKey = 0;
3760    final int actualEndKey = Integer.MAX_VALUE;
3761    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3762    final int splitStartKey = actualStartKey + keysPerRegion;
3763    final int splitEndKey = actualEndKey - keysPerRegion;
3764    final String keyFormat = "%08x";
3765    final Table table = createTable(tableName, cfBytes,
3766        maxVersions,
3767        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3768        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3769        numRegions);
3770
3771    if (hbaseCluster != null) {
3772      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3773    }
3774
3775    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3776
3777    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3778      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3779        final byte[] row = Bytes.toBytes(String.format(keyFormat,
3780            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3781
3782        Put put = new Put(row);
3783        Delete del = new Delete(row);
3784        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3785          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3786          final long ts = rand.nextInt();
3787          final byte[] qual = Bytes.toBytes("col" + iCol);
3788          if (rand.nextBoolean()) {
3789            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3790                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3791                ts + "_random_" + rand.nextLong());
3792            put.addColumn(cf, qual, ts, value);
3793          } else if (rand.nextDouble() < 0.8) {
3794            del.addColumn(cf, qual, ts);
3795          } else {
3796            del.addColumns(cf, qual, ts);
3797          }
3798        }
3799
3800        if (!put.isEmpty()) {
3801          mutator.mutate(put);
3802        }
3803
3804        if (!del.isEmpty()) {
3805          mutator.mutate(del);
3806        }
3807      }
3808      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3809      mutator.flush();
3810      if (hbaseCluster != null) {
3811        getMiniHBaseCluster().flushcache(table.getName());
3812      }
3813    }
3814    mutator.close();
3815
3816    return table;
3817  }
3818
3819  public static int randomFreePort() {
3820    return HBaseCommonTestingUtility.randomFreePort();
3821  }
3822  public static String randomMultiCastAddress() {
3823    return "226.1.1." + random.nextInt(254);
3824  }
3825
3826  public static void waitForHostPort(String host, int port)
3827      throws IOException {
3828    final int maxTimeMs = 10000;
3829    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3830    IOException savedException = null;
3831    LOG.info("Waiting for server at " + host + ":" + port);
3832    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3833      try {
3834        Socket sock = new Socket(InetAddress.getByName(host), port);
3835        sock.close();
3836        savedException = null;
3837        LOG.info("Server at " + host + ":" + port + " is available");
3838        break;
3839      } catch (UnknownHostException e) {
3840        throw new IOException("Failed to look up " + host, e);
3841      } catch (IOException e) {
3842        savedException = e;
3843      }
3844      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3845    }
3846
3847    if (savedException != null) {
3848      throw savedException;
3849    }
3850  }
3851
3852  /**
3853   * Creates a pre-split table for load testing. If the table already exists,
3854   * logs a warning and continues.
3855   * @return the number of regions the table was split into
3856   */
3857  public static int createPreSplitLoadTestTable(Configuration conf,
3858      TableName tableName, byte[] columnFamily, Algorithm compression,
3859      DataBlockEncoding dataBlockEncoding) throws IOException {
3860    return createPreSplitLoadTestTable(conf, tableName,
3861      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3862      Durability.USE_DEFAULT);
3863  }
3864  /**
3865   * Creates a pre-split table for load testing. If the table already exists,
3866   * logs a warning and continues.
3867   * @return the number of regions the table was split into
3868   */
3869  public static int createPreSplitLoadTestTable(Configuration conf,
3870      TableName tableName, byte[] columnFamily, Algorithm compression,
3871      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3872      Durability durability)
3873          throws IOException {
3874    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
3875      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
3876    tableDescriptor.setDurability(durability);
3877    tableDescriptor.setRegionReplication(regionReplication);
3878    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
3879      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(columnFamily);
3880    familyDescriptor.setDataBlockEncoding(dataBlockEncoding);
3881    familyDescriptor.setCompressionType(compression);
3882    return createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor,
3883      numRegionsPerServer);
3884  }
3885
3886  /**
3887   * Creates a pre-split table for load testing. If the table already exists,
3888   * logs a warning and continues.
3889   * @return the number of regions the table was split into
3890   */
3891  public static int createPreSplitLoadTestTable(Configuration conf,
3892      TableName tableName, byte[][] columnFamilies, Algorithm compression,
3893      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3894      Durability durability)
3895          throws IOException {
3896    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
3897      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
3898    tableDescriptor.setDurability(durability);
3899    tableDescriptor.setRegionReplication(regionReplication);
3900    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3901    for (int i = 0; i < columnFamilies.length; i++) {
3902      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
3903        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(columnFamilies[i]);
3904      familyDescriptor.setDataBlockEncoding(dataBlockEncoding);
3905      familyDescriptor.setCompressionType(compression);
3906      hcds[i] = familyDescriptor;
3907    }
3908    return createPreSplitLoadTestTable(conf, tableDescriptor, hcds, numRegionsPerServer);
3909  }
3910
3911  /**
3912   * Creates a pre-split table for load testing. If the table already exists,
3913   * logs a warning and continues.
3914   * @return the number of regions the table was split into
3915   */
3916  public static int createPreSplitLoadTestTable(Configuration conf,
3917      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
3918    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3919  }
3920
3921  /**
3922   * Creates a pre-split table for load testing. If the table already exists,
3923   * logs a warning and continues.
3924   * @return the number of regions the table was split into
3925   */
3926  public static int createPreSplitLoadTestTable(Configuration conf,
3927      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3928    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
3929        numRegionsPerServer);
3930  }
3931
3932  /**
3933   * Creates a pre-split table for load testing. If the table already exists,
3934   * logs a warning and continues.
3935   * @return the number of regions the table was split into
3936   */
3937  public static int createPreSplitLoadTestTable(Configuration conf,
3938      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
3939      int numRegionsPerServer) throws IOException {
3940    return createPreSplitLoadTestTable(conf, desc, hcds,
3941      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
3942  }
3943
3944  /**
3945   * Creates a pre-split table for load testing. If the table already exists,
3946   * logs a warning and continues.
3947   * @return the number of regions the table was split into
3948   */
3949  public static int createPreSplitLoadTestTable(Configuration conf,
3950      TableDescriptor td, ColumnFamilyDescriptor[] cds,
3951      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
3952    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3953    for (ColumnFamilyDescriptor cd : cds) {
3954      if (!td.hasColumnFamily(cd.getName())) {
3955        builder.setColumnFamily(cd);
3956      }
3957    }
3958    td = builder.build();
3959    int totalNumberOfRegions = 0;
3960    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3961    Admin admin = unmanagedConnection.getAdmin();
3962
3963    try {
3964      // create a table a pre-splits regions.
3965      // The number of splits is set as:
3966      //    region servers * regions per region server).
3967      int numberOfServers = admin.getRegionServers().size();
3968      if (numberOfServers == 0) {
3969        throw new IllegalStateException("No live regionservers");
3970      }
3971
3972      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3973      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3974          "pre-splitting table into " + totalNumberOfRegions + " regions " +
3975          "(regions per server: " + numRegionsPerServer + ")");
3976
3977      byte[][] splits = splitter.split(
3978          totalNumberOfRegions);
3979
3980      admin.createTable(td, splits);
3981    } catch (MasterNotRunningException e) {
3982      LOG.error("Master not running", e);
3983      throw new IOException(e);
3984    } catch (TableExistsException e) {
3985      LOG.warn("Table " + td.getTableName() +
3986          " already exists, continuing");
3987    } finally {
3988      admin.close();
3989      unmanagedConnection.close();
3990    }
3991    return totalNumberOfRegions;
3992  }
3993
3994  public static int getMetaRSPort(Connection connection) throws IOException {
3995    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3996      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3997    }
3998  }
3999
4000  /**
4001   *  Due to async racing issue, a region may not be in
4002   *  the online region list of a region server yet, after
4003   *  the assignment znode is deleted and the new assignment
4004   *  is recorded in master.
4005   */
4006  public void assertRegionOnServer(
4007      final RegionInfo hri, final ServerName server,
4008      final long timeout) throws IOException, InterruptedException {
4009    long timeoutTime = System.currentTimeMillis() + timeout;
4010    while (true) {
4011      List<RegionInfo> regions = getAdmin().getRegions(server);
4012      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
4013      long now = System.currentTimeMillis();
4014      if (now > timeoutTime) break;
4015      Thread.sleep(10);
4016    }
4017    fail("Could not find region " + hri.getRegionNameAsString()
4018      + " on server " + server);
4019  }
4020
4021  /**
4022   * Check to make sure the region is open on the specified
4023   * region server, but not on any other one.
4024   */
4025  public void assertRegionOnlyOnServer(
4026      final RegionInfo hri, final ServerName server,
4027      final long timeout) throws IOException, InterruptedException {
4028    long timeoutTime = System.currentTimeMillis() + timeout;
4029    while (true) {
4030      List<RegionInfo> regions = getAdmin().getRegions(server);
4031      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
4032        List<JVMClusterUtil.RegionServerThread> rsThreads =
4033          getHBaseCluster().getLiveRegionServerThreads();
4034        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
4035          HRegionServer rs = rsThread.getRegionServer();
4036          if (server.equals(rs.getServerName())) {
4037            continue;
4038          }
4039          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
4040          for (HRegion r: hrs) {
4041            assertTrue("Region should not be double assigned",
4042              r.getRegionInfo().getRegionId() != hri.getRegionId());
4043          }
4044        }
4045        return; // good, we are happy
4046      }
4047      long now = System.currentTimeMillis();
4048      if (now > timeoutTime) break;
4049      Thread.sleep(10);
4050    }
4051    fail("Could not find region " + hri.getRegionNameAsString()
4052      + " on server " + server);
4053  }
4054
4055  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
4056    TableDescriptor td =
4057        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4058    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4059    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
4060  }
4061
4062  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
4063      BlockCache blockCache) throws IOException {
4064    TableDescriptor td =
4065        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4066    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4067    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
4068  }
4069
4070  public void setFileSystemURI(String fsURI) {
4071    FS_URI = fsURI;
4072  }
4073
4074  /**
4075   * Returns a {@link Predicate} for checking that there are no regions in transition in master
4076   */
4077  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
4078    return new ExplainingPredicate<IOException>() {
4079      @Override
4080      public String explainFailure() throws IOException {
4081        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4082            .getAssignmentManager().getRegionStates();
4083        return "found in transition: " + regionStates.getRegionsInTransition().toString();
4084      }
4085
4086      @Override
4087      public boolean evaluate() throws IOException {
4088        HMaster master = getMiniHBaseCluster().getMaster();
4089        if (master == null) return false;
4090        AssignmentManager am = master.getAssignmentManager();
4091        if (am == null) return false;
4092        return !am.hasRegionsInTransition();
4093      }
4094    };
4095  }
4096
4097  /**
4098   * Returns a {@link Predicate} for checking that table is enabled
4099   */
4100  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4101    return new ExplainingPredicate<IOException>() {
4102      @Override
4103      public String explainFailure() throws IOException {
4104        return explainTableState(tableName, TableState.State.ENABLED);
4105      }
4106
4107      @Override
4108      public boolean evaluate() throws IOException {
4109        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4110      }
4111    };
4112  }
4113
4114  /**
4115   * Returns a {@link Predicate} for checking that table is enabled
4116   */
4117  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4118    return new ExplainingPredicate<IOException>() {
4119      @Override
4120      public String explainFailure() throws IOException {
4121        return explainTableState(tableName, TableState.State.DISABLED);
4122      }
4123
4124      @Override
4125      public boolean evaluate() throws IOException {
4126        return getAdmin().isTableDisabled(tableName);
4127      }
4128    };
4129  }
4130
4131  /**
4132   * Returns a {@link Predicate} for checking that table is enabled
4133   */
4134  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4135    return new ExplainingPredicate<IOException>() {
4136      @Override
4137      public String explainFailure() throws IOException {
4138        return explainTableAvailability(tableName);
4139      }
4140
4141      @Override
4142      public boolean evaluate() throws IOException {
4143        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4144        if (tableAvailable) {
4145          try (Table table = getConnection().getTable(tableName)) {
4146            TableDescriptor htd = table.getDescriptor();
4147            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4148                .getAllRegionLocations()) {
4149              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
4150                  .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
4151                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4152              for (byte[] family : htd.getColumnFamilyNames()) {
4153                scan.addFamily(family);
4154              }
4155              try (ResultScanner scanner = table.getScanner(scan)) {
4156                scanner.next();
4157              }
4158            }
4159          }
4160        }
4161        return tableAvailable;
4162      }
4163    };
4164  }
4165
4166  /**
4167   * Wait until no regions in transition.
4168   * @param timeout How long to wait.
4169   * @throws IOException
4170   */
4171  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4172    waitFor(timeout, predicateNoRegionsInTransition());
4173  }
4174
4175  /**
4176   * Wait until no regions in transition. (time limit 15min)
4177   * @throws IOException
4178   */
4179  public void waitUntilNoRegionsInTransition() throws IOException {
4180    waitUntilNoRegionsInTransition(15 * 60000);
4181  }
4182
4183  /**
4184   * Wait until labels is ready in VisibilityLabelsCache.
4185   * @param timeoutMillis
4186   * @param labels
4187   */
4188  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4189    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4190    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4191
4192      @Override
4193      public boolean evaluate() {
4194        for (String label : labels) {
4195          if (labelsCache.getLabelOrdinal(label) == 0) {
4196            return false;
4197          }
4198        }
4199        return true;
4200      }
4201
4202      @Override
4203      public String explainFailure() {
4204        for (String label : labels) {
4205          if (labelsCache.getLabelOrdinal(label) == 0) {
4206            return label + " is not available yet";
4207          }
4208        }
4209        return "";
4210      }
4211    });
4212  }
4213
4214  /**
4215   * Create a set of column descriptors with the combination of compression,
4216   * encoding, bloom codecs available.
4217   * @return the list of column descriptors
4218   */
4219  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4220    return generateColumnDescriptors("");
4221  }
4222
4223  /**
4224   * Create a set of column descriptors with the combination of compression,
4225   * encoding, bloom codecs available.
4226   * @param prefix family names prefix
4227   * @return the list of column descriptors
4228   */
4229  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4230    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4231    long familyId = 0;
4232    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4233      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4234        for (BloomType bloomType: BloomType.values()) {
4235          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4236          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4237            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4238          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4239          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4240          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4241          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4242          familyId++;
4243        }
4244      }
4245    }
4246    return columnFamilyDescriptors;
4247  }
4248
4249  /**
4250   * Get supported compression algorithms.
4251   * @return supported compression algorithms.
4252   */
4253  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4254    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4255    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4256    for (String algoName : allAlgos) {
4257      try {
4258        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4259        algo.getCompressor();
4260        supportedAlgos.add(algo);
4261      } catch (Throwable t) {
4262        // this algo is not available
4263      }
4264    }
4265    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4266  }
4267
4268  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4269    Scan scan = new Scan().withStartRow(row);
4270    scan.setSmall(true);
4271    scan.setCaching(1);
4272    scan.setReversed(true);
4273    scan.addFamily(family);
4274    try (RegionScanner scanner = r.getScanner(scan)) {
4275      List<Cell> cells = new ArrayList<>(1);
4276      scanner.next(cells);
4277      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4278        return null;
4279      }
4280      return Result.create(cells);
4281    }
4282  }
4283
4284  private boolean isTargetTable(final byte[] inRow, Cell c) {
4285    String inputRowString = Bytes.toString(inRow);
4286    int i = inputRowString.indexOf(HConstants.DELIMITER);
4287    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4288    int o = outputRowString.indexOf(HConstants.DELIMITER);
4289    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4290  }
4291
4292  /**
4293   * Sets up {@link MiniKdc} for testing security.
4294   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4295   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4296   * FYI, there is also the easier-to-use kerby KDC server and utility for using it,
4297   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4298   * less baggage. It came in in HBASE-5291.
4299   */
4300  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4301    Properties conf = MiniKdc.createConf();
4302    conf.put(MiniKdc.DEBUG, true);
4303    MiniKdc kdc = null;
4304    File dir = null;
4305    // There is time lag between selecting a port and trying to bind with it. It's possible that
4306    // another service captures the port in between which'll result in BindException.
4307    boolean bindException;
4308    int numTries = 0;
4309    do {
4310      try {
4311        bindException = false;
4312        dir = new File(getDataTestDir("kdc").toUri().getPath());
4313        kdc = new MiniKdc(conf, dir);
4314        kdc.start();
4315      } catch (BindException e) {
4316        FileUtils.deleteDirectory(dir);  // clean directory
4317        numTries++;
4318        if (numTries == 3) {
4319          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4320          throw e;
4321        }
4322        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4323        bindException = true;
4324      }
4325    } while (bindException);
4326    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4327    return kdc;
4328  }
4329
4330  public int getNumHFiles(final TableName tableName, final byte[] family) {
4331    int numHFiles = 0;
4332    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4333      numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4334                                    family);
4335    }
4336    return numHFiles;
4337  }
4338
4339  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4340                               final byte[] family) {
4341    int numHFiles = 0;
4342    for (Region region : rs.getRegions(tableName)) {
4343      numHFiles += region.getStore(family).getStorefilesCount();
4344    }
4345    return numHFiles;
4346  }
4347
4348  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4349    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
4350    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4351    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4352    assertEquals(ltdFamilies.size(), rtdFamilies.size());
4353    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), it2 =
4354         rtdFamilies.iterator(); it.hasNext();) {
4355      assertEquals(0,
4356          ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4357    }
4358  }
4359}