001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.lang.reflect.Field;
029import java.lang.reflect.Modifier;
030import java.net.BindException;
031import java.net.DatagramSocket;
032import java.net.InetAddress;
033import java.net.ServerSocket;
034import java.net.Socket;
035import java.net.UnknownHostException;
036import java.nio.charset.StandardCharsets;
037import java.security.MessageDigest;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.HashSet;
043import java.util.Iterator;
044import java.util.List;
045import java.util.Map;
046import java.util.NavigableSet;
047import java.util.Properties;
048import java.util.Random;
049import java.util.Set;
050import java.util.TreeSet;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.atomic.AtomicReference;
053import java.util.stream.Collectors;
054import org.apache.commons.io.FileUtils;
055import org.apache.commons.lang3.RandomStringUtils;
056import org.apache.commons.logging.impl.Jdk14Logger;
057import org.apache.commons.logging.impl.Log4JLogger;
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.FileSystem;
060import org.apache.hadoop.fs.Path;
061import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
062import org.apache.hadoop.hbase.Waiter.Predicate;
063import org.apache.hadoop.hbase.client.Admin;
064import org.apache.hadoop.hbase.client.BufferedMutator;
065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
067import org.apache.hadoop.hbase.client.Connection;
068import org.apache.hadoop.hbase.client.ConnectionFactory;
069import org.apache.hadoop.hbase.client.Consistency;
070import org.apache.hadoop.hbase.client.Delete;
071import org.apache.hadoop.hbase.client.Durability;
072import org.apache.hadoop.hbase.client.Get;
073import org.apache.hadoop.hbase.client.HBaseAdmin;
074import org.apache.hadoop.hbase.client.Hbck;
075import org.apache.hadoop.hbase.client.ImmutableHRegionInfo;
076import org.apache.hadoop.hbase.client.ImmutableHTableDescriptor;
077import org.apache.hadoop.hbase.client.Put;
078import org.apache.hadoop.hbase.client.RegionInfo;
079import org.apache.hadoop.hbase.client.RegionInfoBuilder;
080import org.apache.hadoop.hbase.client.RegionLocator;
081import org.apache.hadoop.hbase.client.Result;
082import org.apache.hadoop.hbase.client.ResultScanner;
083import org.apache.hadoop.hbase.client.Scan;
084import org.apache.hadoop.hbase.client.Table;
085import org.apache.hadoop.hbase.client.TableDescriptor;
086import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
087import org.apache.hadoop.hbase.client.TableState;
088import org.apache.hadoop.hbase.fs.HFileSystem;
089import org.apache.hadoop.hbase.io.compress.Compression;
090import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
091import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
092import org.apache.hadoop.hbase.io.hfile.BlockCache;
093import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
094import org.apache.hadoop.hbase.io.hfile.HFile;
095import org.apache.hadoop.hbase.ipc.RpcServerInterface;
096import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
097import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
098import org.apache.hadoop.hbase.master.HMaster;
099import org.apache.hadoop.hbase.master.RegionState;
100import org.apache.hadoop.hbase.master.ServerManager;
101import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
102import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
103import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
104import org.apache.hadoop.hbase.master.assignment.RegionStates;
105import org.apache.hadoop.hbase.mob.MobFileCache;
106import org.apache.hadoop.hbase.regionserver.BloomType;
107import org.apache.hadoop.hbase.regionserver.ChunkCreator;
108import org.apache.hadoop.hbase.regionserver.HRegion;
109import org.apache.hadoop.hbase.regionserver.HRegionServer;
110import org.apache.hadoop.hbase.regionserver.HStore;
111import org.apache.hadoop.hbase.regionserver.InternalScanner;
112import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
113import org.apache.hadoop.hbase.regionserver.Region;
114import org.apache.hadoop.hbase.regionserver.RegionScanner;
115import org.apache.hadoop.hbase.regionserver.RegionServerServices;
116import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
117import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
118import org.apache.hadoop.hbase.security.User;
119import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
120import org.apache.hadoop.hbase.trace.TraceUtil;
121import org.apache.hadoop.hbase.util.Bytes;
122import org.apache.hadoop.hbase.util.CommonFSUtils;
123import org.apache.hadoop.hbase.util.FSTableDescriptors;
124import org.apache.hadoop.hbase.util.FSUtils;
125import org.apache.hadoop.hbase.util.JVMClusterUtil;
126import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
127import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
128import org.apache.hadoop.hbase.util.Pair;
129import org.apache.hadoop.hbase.util.RegionSplitter;
130import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
131import org.apache.hadoop.hbase.util.RetryCounter;
132import org.apache.hadoop.hbase.util.Threads;
133import org.apache.hadoop.hbase.wal.WAL;
134import org.apache.hadoop.hbase.wal.WALFactory;
135import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
136import org.apache.hadoop.hbase.zookeeper.ZKConfig;
137import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
138import org.apache.hadoop.hdfs.DFSClient;
139import org.apache.hadoop.hdfs.DistributedFileSystem;
140import org.apache.hadoop.hdfs.MiniDFSCluster;
141import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
142import org.apache.hadoop.mapred.JobConf;
143import org.apache.hadoop.mapred.MiniMRCluster;
144import org.apache.hadoop.mapred.TaskLog;
145import org.apache.hadoop.minikdc.MiniKdc;
146import org.apache.log4j.LogManager;
147import org.apache.yetus.audience.InterfaceAudience;
148import org.apache.zookeeper.WatchedEvent;
149import org.apache.zookeeper.ZooKeeper;
150import org.apache.zookeeper.ZooKeeper.States;
151import org.slf4j.Logger;
152import org.slf4j.LoggerFactory;
153import org.slf4j.impl.Log4jLoggerAdapter;
154
155import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
156
157/**
158 * Facility for testing HBase. Replacement for
159 * old HBaseTestCase and HBaseClusterTestCase functionality.
160 * Create an instance and keep it around testing HBase.  This class is
161 * meant to be your one-stop shop for anything you might need testing.  Manages
162 * one cluster at a time only. Managed cluster can be an in-process
163 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
164 * Not all methods work with the real cluster.
165 * Depends on log4j being on classpath and
166 * hbase-site.xml for logging and test-run configuration.  It does not set
167 * logging levels.
168 * In the configuration properties, default values for master-info-port and
169 * region-server-port are overridden such that a random port will be assigned (thus
170 * avoiding port contention if another local HBase instance is already running).
171 * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
172 * setting it to true.
173 */
174@InterfaceAudience.Public
175@SuppressWarnings("deprecation")
176public class HBaseTestingUtility extends HBaseZKTestingUtility {
177
178  /**
179   * System property key to get test directory value. Name is as it is because mini dfs has
180   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
181   * used in mini dfs.
182   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
183   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
184   */
185  @Deprecated
186  private static final String TEST_DIRECTORY_KEY = "test.build.data";
187
188  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
189  /**
190   * The default number of regions per regionserver when creating a pre-split
191   * table.
192   */
193  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
194
195
196  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
197  public static final boolean PRESPLIT_TEST_TABLE = true;
198
199  private MiniDFSCluster dfsCluster = null;
200
201  private volatile HBaseCluster hbaseCluster = null;
202  private MiniMRCluster mrCluster = null;
203
204  /** If there is a mini cluster running for this testing utility instance. */
205  private volatile boolean miniClusterRunning;
206
207  private String hadoopLogDir;
208
209  /** Directory on test filesystem where we put the data for this instance of
210    * HBaseTestingUtility*/
211  private Path dataTestDirOnTestFS = null;
212
213  /**
214   * Shared cluster connection.
215   */
216  private volatile Connection connection;
217
218  /** Filesystem URI used for map-reduce mini-cluster setup */
219  private static String FS_URI;
220
221  /** This is for unit tests parameterized with a single boolean. */
222  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
223
224  /**
225   * Checks to see if a specific port is available.
226   *
227   * @param port the port number to check for availability
228   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
229   */
230  public static boolean available(int port) {
231    ServerSocket ss = null;
232    DatagramSocket ds = null;
233    try {
234      ss = new ServerSocket(port);
235      ss.setReuseAddress(true);
236      ds = new DatagramSocket(port);
237      ds.setReuseAddress(true);
238      return true;
239    } catch (IOException e) {
240      // Do nothing
241    } finally {
242      if (ds != null) {
243        ds.close();
244      }
245
246      if (ss != null) {
247        try {
248          ss.close();
249        } catch (IOException e) {
250          /* should not be thrown */
251        }
252      }
253    }
254
255    return false;
256  }
257
258  /**
259   * Create all combinations of Bloom filters and compression algorithms for
260   * testing.
261   */
262  private static List<Object[]> bloomAndCompressionCombinations() {
263    List<Object[]> configurations = new ArrayList<>();
264    for (Compression.Algorithm comprAlgo :
265         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
266      for (BloomType bloomType : BloomType.values()) {
267        configurations.add(new Object[] { comprAlgo, bloomType });
268      }
269    }
270    return Collections.unmodifiableList(configurations);
271  }
272
273  /**
274   * Create combination of memstoreTS and tags
275   */
276  private static List<Object[]> memStoreTSAndTagsCombination() {
277    List<Object[]> configurations = new ArrayList<>();
278    configurations.add(new Object[] { false, false });
279    configurations.add(new Object[] { false, true });
280    configurations.add(new Object[] { true, false });
281    configurations.add(new Object[] { true, true });
282    return Collections.unmodifiableList(configurations);
283  }
284
285  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
286    List<Object[]> configurations = new ArrayList<>();
287    configurations.add(new Object[] { false, false, true });
288    configurations.add(new Object[] { false, false, false });
289    configurations.add(new Object[] { false, true, true });
290    configurations.add(new Object[] { false, true, false });
291    configurations.add(new Object[] { true, false, true });
292    configurations.add(new Object[] { true, false, false });
293    configurations.add(new Object[] { true, true, true });
294    configurations.add(new Object[] { true, true, false });
295    return Collections.unmodifiableList(configurations);
296  }
297
298  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
299      bloomAndCompressionCombinations();
300
301
302  /**
303   * <p>Create an HBaseTestingUtility using a default configuration.
304   *
305   * <p>Initially, all tmp files are written to a local test data directory.
306   * Once {@link #startMiniDFSCluster} is called, either directly or via
307   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
308   *
309   * <p>Previously, there was a distinction between the type of utility returned by
310   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
311   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
312   * at which point they will switch to using mini DFS for storage.
313   */
314  public HBaseTestingUtility() {
315    this(HBaseConfiguration.create());
316  }
317
318  /**
319   * <p>Create an HBaseTestingUtility using a given configuration.
320   *
321   * <p>Initially, all tmp files are written to a local test data directory.
322   * Once {@link #startMiniDFSCluster} is called, either directly or via
323   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
324   *
325   * <p>Previously, there was a distinction between the type of utility returned by
326   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
327   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
328   * at which point they will switch to using mini DFS for storage.
329   *
330   * @param conf The configuration to use for further operations
331   */
332  public HBaseTestingUtility(@Nullable Configuration conf) {
333    super(conf);
334
335    // a hbase checksum verification failure will cause unit tests to fail
336    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
337
338    // Save this for when setting default file:// breaks things
339    if (this.conf.get("fs.defaultFS") != null) {
340      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
341    }
342    if (this.conf.get(HConstants.HBASE_DIR) != null) {
343      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
344    }
345    // Every cluster is a local cluster until we start DFS
346    // Note that conf could be null, but this.conf will not be
347    String dataTestDir = getDataTestDir().toString();
348    this.conf.set("fs.defaultFS","file:///");
349    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
350    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
351    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
352    // If the value for random ports isn't set set it to true, thus making
353    // tests opt-out for random port assignment
354    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
355        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
356  }
357
358  /**
359   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #HBaseTestingUtility()}
360   *   instead.
361   * @return a normal HBaseTestingUtility
362   * @see #HBaseTestingUtility()
363   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19841">HBASE-19841</a>
364   */
365  @Deprecated
366  public static HBaseTestingUtility createLocalHTU() {
367    return new HBaseTestingUtility();
368  }
369
370  /**
371   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
372   *   {@link #HBaseTestingUtility(Configuration)} instead.
373   * @return a normal HBaseTestingUtility
374   * @see #HBaseTestingUtility(Configuration)
375   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19841">HBASE-19841</a>
376   */
377  @Deprecated
378  public static HBaseTestingUtility createLocalHTU(Configuration c) {
379    return new HBaseTestingUtility(c);
380  }
381
382  /**
383   * Close both the region {@code r} and it's underlying WAL. For use in tests.
384   */
385  public static void closeRegionAndWAL(final Region r) throws IOException {
386    closeRegionAndWAL((HRegion)r);
387  }
388
389  /**
390   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
391   */
392  public static void closeRegionAndWAL(final HRegion r) throws IOException {
393    if (r == null) return;
394    r.close();
395    if (r.getWAL() == null) return;
396    r.getWAL().close();
397  }
398
399  /**
400   * Returns this classes's instance of {@link Configuration}.  Be careful how
401   * you use the returned Configuration since {@link Connection} instances
402   * can be shared.  The Map of Connections is keyed by the Configuration.  If
403   * say, a Connection was being used against a cluster that had been shutdown,
404   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
405   * be wholesome.  Rather than use the return direct, its usually best to
406   * make a copy and use that.  Do
407   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
408   * @return Instance of Configuration.
409   */
410  @Override
411  public Configuration getConfiguration() {
412    return super.getConfiguration();
413  }
414
415  public void setHBaseCluster(HBaseCluster hbaseCluster) {
416    this.hbaseCluster = hbaseCluster;
417  }
418
419  /**
420   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
421   * Give it a random name so can have many concurrent tests running if
422   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
423   * System property, as it's what minidfscluster bases
424   * it data dir on.  Moding a System property is not the way to do concurrent
425   * instances -- another instance could grab the temporary
426   * value unintentionally -- but not anything can do about it at moment;
427   * single instance only is how the minidfscluster works.
428   *
429   * We also create the underlying directory for
430   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
431   *  in the conf, and as a system property for hadoop.tmp.dir
432   *
433   * @return The calculated data test build directory, if newly-created.
434   */
435  @Override
436  protected Path setupDataTestDir() {
437    Path testPath = super.setupDataTestDir();
438    if (null == testPath) {
439      return null;
440    }
441
442    createSubDirAndSystemProperty(
443      "hadoop.log.dir",
444      testPath, "hadoop-log-dir");
445
446    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
447    //  we want our own value to ensure uniqueness on the same machine
448    createSubDirAndSystemProperty(
449      "hadoop.tmp.dir",
450      testPath, "hadoop-tmp-dir");
451
452    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
453    createSubDir(
454      "mapreduce.cluster.local.dir",
455      testPath, "mapred-local-dir");
456
457    return testPath;
458  }
459
460  private void createSubDirAndSystemProperty(
461    String propertyName, Path parent, String subDirName){
462
463    String sysValue = System.getProperty(propertyName);
464
465    if (sysValue != null) {
466      // There is already a value set. So we do nothing but hope
467      //  that there will be no conflicts
468      LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
469        sysValue + " so I do NOT create it in " + parent);
470      String confValue = conf.get(propertyName);
471      if (confValue != null && !confValue.endsWith(sysValue)){
472       LOG.warn(
473         propertyName + " property value differs in configuration and system: "+
474         "Configuration="+confValue+" while System="+sysValue+
475         " Erasing configuration value by system value."
476       );
477      }
478      conf.set(propertyName, sysValue);
479    } else {
480      // Ok, it's not set, so we create it as a subdirectory
481      createSubDir(propertyName, parent, subDirName);
482      System.setProperty(propertyName, conf.get(propertyName));
483    }
484  }
485
486  /**
487   * @return Where to write test data on the test filesystem; Returns working directory
488   * for the test filesystem by default
489   * @see #setupDataTestDirOnTestFS()
490   * @see #getTestFileSystem()
491   */
492  private Path getBaseTestDirOnTestFS() throws IOException {
493    FileSystem fs = getTestFileSystem();
494    return new Path(fs.getWorkingDirectory(), "test-data");
495  }
496
497  /**
498   * @return META table descriptor
499   * @deprecated since 2.0 version and will be removed in 3.0 version.
500   *             use {@link #getMetaTableDescriptorBuilder()}
501   */
502  @Deprecated
503  public HTableDescriptor getMetaTableDescriptor() {
504    return new ImmutableHTableDescriptor(getMetaTableDescriptorBuilder().build());
505  }
506
507  /**
508   * @return META table descriptor
509   */
510  public TableDescriptorBuilder getMetaTableDescriptorBuilder() {
511    try {
512      return FSTableDescriptors.createMetaTableDescriptorBuilder(conf);
513    } catch (IOException e) {
514      throw new RuntimeException("Unable to create META table descriptor", e);
515    }
516  }
517
518  /**
519   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
520   * to write temporary test data. Call this method after setting up the mini dfs cluster
521   * if the test relies on it.
522   * @return a unique path in the test filesystem
523   */
524  public Path getDataTestDirOnTestFS() throws IOException {
525    if (dataTestDirOnTestFS == null) {
526      setupDataTestDirOnTestFS();
527    }
528
529    return dataTestDirOnTestFS;
530  }
531
532  /**
533   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
534   * to write temporary test data. Call this method after setting up the mini dfs cluster
535   * if the test relies on it.
536   * @return a unique path in the test filesystem
537   * @param subdirName name of the subdir to create under the base test dir
538   */
539  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
540    return new Path(getDataTestDirOnTestFS(), subdirName);
541  }
542
543  /**
544   * Sets up a path in test filesystem to be used by tests.
545   * Creates a new directory if not already setup.
546   */
547  private void setupDataTestDirOnTestFS() throws IOException {
548    if (dataTestDirOnTestFS != null) {
549      LOG.warn("Data test on test fs dir already setup in "
550          + dataTestDirOnTestFS.toString());
551      return;
552    }
553    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
554  }
555
556  /**
557   * Sets up a new path in test filesystem to be used by tests.
558   */
559  private Path getNewDataTestDirOnTestFS() throws IOException {
560    //The file system can be either local, mini dfs, or if the configuration
561    //is supplied externally, it can be an external cluster FS. If it is a local
562    //file system, the tests should use getBaseTestDir, otherwise, we can use
563    //the working directory, and create a unique sub dir there
564    FileSystem fs = getTestFileSystem();
565    Path newDataTestDir;
566    String randomStr = getRandomUUID().toString();
567    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
568      newDataTestDir = new Path(getDataTestDir(), randomStr);
569      File dataTestDir = new File(newDataTestDir.toString());
570      if (deleteOnExit()) dataTestDir.deleteOnExit();
571    } else {
572      Path base = getBaseTestDirOnTestFS();
573      newDataTestDir = new Path(base, randomStr);
574      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
575    }
576    return newDataTestDir;
577  }
578
579  /**
580   * Cleans the test data directory on the test filesystem.
581   * @return True if we removed the test dirs
582   * @throws IOException
583   */
584  public boolean cleanupDataTestDirOnTestFS() throws IOException {
585    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
586    if (ret)
587      dataTestDirOnTestFS = null;
588    return ret;
589  }
590
591  /**
592   * Cleans a subdirectory under the test data directory on the test filesystem.
593   * @return True if we removed child
594   * @throws IOException
595   */
596  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
597    Path cpath = getDataTestDirOnTestFS(subdirName);
598    return getTestFileSystem().delete(cpath, true);
599  }
600
601  /**
602   * Start a minidfscluster.
603   * @param servers How many DNs to start.
604   * @throws Exception
605   * @see #shutdownMiniDFSCluster()
606   * @return The mini dfs cluster created.
607   */
608  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
609    return startMiniDFSCluster(servers, null);
610  }
611
612  /**
613   * Start a minidfscluster.
614   * This is useful if you want to run datanode on distinct hosts for things
615   * like HDFS block location verification.
616   * If you start MiniDFSCluster without host names, all instances of the
617   * datanodes will have the same host name.
618   * @param hosts hostnames DNs to run on.
619   * @throws Exception
620   * @see #shutdownMiniDFSCluster()
621   * @return The mini dfs cluster created.
622   */
623  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
624  throws Exception {
625    if ( hosts != null && hosts.length != 0) {
626      return startMiniDFSCluster(hosts.length, hosts);
627    } else {
628      return startMiniDFSCluster(1, null);
629    }
630  }
631
632  /**
633   * Start a minidfscluster.
634   * Can only create one.
635   * @param servers How many DNs to start.
636   * @param hosts hostnames DNs to run on.
637   * @throws Exception
638   * @see #shutdownMiniDFSCluster()
639   * @return The mini dfs cluster created.
640   */
641  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
642  throws Exception {
643    return startMiniDFSCluster(servers, null, hosts);
644  }
645
646  private void setFs() throws IOException {
647    if(this.dfsCluster == null){
648      LOG.info("Skipping setting fs because dfsCluster is null");
649      return;
650    }
651    FileSystem fs = this.dfsCluster.getFileSystem();
652    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
653
654    // re-enable this check with dfs
655    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
656  }
657
658  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
659      throws Exception {
660    createDirsAndSetProperties();
661    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
662
663    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
664    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
665        setLevel(org.apache.log4j.Level.ERROR);
666    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
667        setLevel(org.apache.log4j.Level.ERROR);
668
669    TraceUtil.initTracer(conf);
670
671    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
672        true, null, racks, hosts, null);
673
674    // Set this just-started cluster as our filesystem.
675    setFs();
676
677    // Wait for the cluster to be totally up
678    this.dfsCluster.waitClusterUp();
679
680    //reset the test directory for test file system
681    dataTestDirOnTestFS = null;
682    String dataTestDir = getDataTestDir().toString();
683    conf.set(HConstants.HBASE_DIR, dataTestDir);
684    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
685
686    return this.dfsCluster;
687  }
688
689  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
690    createDirsAndSetProperties();
691    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
692        null, null, null);
693    return dfsCluster;
694  }
695
696  /** This is used before starting HDFS and map-reduce mini-clusters */
697  private void createDirsAndSetProperties() throws IOException {
698    setupClusterTestDir();
699    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
700    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
701    createDirAndSetProperty("cache_data", "test.cache.data");
702    createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
703    hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
704    createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
705    createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
706    enableShortCircuit();
707
708    Path root = getDataTestDirOnTestFS("hadoop");
709    conf.set(MapreduceTestingShim.getMROutputDirProp(),
710      new Path(root, "mapred-output-dir").toString());
711    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
712    conf.set("mapreduce.jobtracker.staging.root.dir",
713      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
714    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
715    conf.set("yarn.app.mapreduce.am.staging-dir",
716      new Path(root, "mapreduce-am-staging-root-dir").toString());
717  }
718
719  /**
720   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
721   *  new column families. Default to false.
722   */
723  public boolean isNewVersionBehaviorEnabled(){
724    final String propName = "hbase.tests.new.version.behavior";
725    String v = System.getProperty(propName);
726    if (v != null){
727      return Boolean.parseBoolean(v);
728    }
729    return false;
730  }
731
732  /**
733   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
734   *  This allows to specify this parameter on the command line.
735   *   If not set, default is true.
736   */
737  public boolean isReadShortCircuitOn(){
738    final String propName = "hbase.tests.use.shortcircuit.reads";
739    String readOnProp = System.getProperty(propName);
740    if (readOnProp != null){
741      return  Boolean.parseBoolean(readOnProp);
742    } else {
743      return conf.getBoolean(propName, false);
744    }
745  }
746
747  /** Enable the short circuit read, unless configured differently.
748   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
749   */
750  private void enableShortCircuit() {
751    if (isReadShortCircuitOn()) {
752      String curUser = System.getProperty("user.name");
753      LOG.info("read short circuit is ON for user " + curUser);
754      // read short circuit, for hdfs
755      conf.set("dfs.block.local-path-access.user", curUser);
756      // read short circuit, for hbase
757      conf.setBoolean("dfs.client.read.shortcircuit", true);
758      // Skip checking checksum, for the hdfs client and the datanode
759      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
760    } else {
761      LOG.info("read short circuit is OFF");
762    }
763  }
764
765  private String createDirAndSetProperty(final String relPath, String property) {
766    String path = getDataTestDir(relPath).toString();
767    System.setProperty(property, path);
768    conf.set(property, path);
769    new File(path).mkdirs();
770    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
771    return path;
772  }
773
774  /**
775   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
776   * or does nothing.
777   * @throws IOException
778   */
779  public void shutdownMiniDFSCluster() throws IOException {
780    if (this.dfsCluster != null) {
781      // The below throws an exception per dn, AsynchronousCloseException.
782      this.dfsCluster.shutdown();
783      dfsCluster = null;
784      dataTestDirOnTestFS = null;
785      FSUtils.setFsDefault(this.conf, new Path("file:///"));
786    }
787  }
788
789  /**
790   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
791   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
792   * @param createWALDir Whether to create a new WAL directory.
793   * @return The mini HBase cluster created.
794   * @see #shutdownMiniCluster()
795   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
796   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
797   * @see #startMiniCluster(StartMiniClusterOption)
798   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
799   */
800  @Deprecated
801  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
802    StartMiniClusterOption option = StartMiniClusterOption.builder()
803        .createWALDir(createWALDir).build();
804    return startMiniCluster(option);
805  }
806
807  /**
808   * Start up a minicluster of hbase, dfs, and zookeeper.
809   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
810   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
811   * @param createRootDir Whether to create a new root or data directory path.
812   * @return The mini HBase cluster created.
813   * @see #shutdownMiniCluster()
814   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
815   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
816   * @see #startMiniCluster(StartMiniClusterOption)
817   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
818   */
819  @Deprecated
820  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir)
821  throws Exception {
822    StartMiniClusterOption option = StartMiniClusterOption.builder()
823        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir).build();
824    return startMiniCluster(option);
825  }
826
827  /**
828   * Start up a minicluster of hbase, dfs, and zookeeper.
829   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
830   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
831   * @param createRootDir Whether to create a new root or data directory path.
832   * @param createWALDir Whether to create a new WAL directory.
833   * @return The mini HBase cluster created.
834   * @see #shutdownMiniCluster()
835   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
836   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
837   * @see #startMiniCluster(StartMiniClusterOption)
838   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
839   */
840  @Deprecated
841  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
842      boolean createWALDir) throws Exception {
843    StartMiniClusterOption option = StartMiniClusterOption.builder()
844        .numRegionServers(numSlaves).numDataNodes(numSlaves).createRootDir(createRootDir)
845        .createWALDir(createWALDir).build();
846    return startMiniCluster(option);
847  }
848
849  /**
850   * Start up a minicluster of hbase, dfs, and zookeeper.
851   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
852   * @param numMasters Master node number.
853   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
854   * @param createRootDir Whether to create a new root or data directory path.
855   * @return The mini HBase cluster created.
856   * @see #shutdownMiniCluster()
857   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
858   *  {@link #startMiniCluster(StartMiniClusterOption)} instead.
859   * @see #startMiniCluster(StartMiniClusterOption)
860   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
861   */
862  @Deprecated
863  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
864    throws Exception {
865    StartMiniClusterOption option = StartMiniClusterOption.builder()
866        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
867        .numDataNodes(numSlaves).build();
868    return startMiniCluster(option);
869  }
870
871  /**
872   * Start up a minicluster of hbase, dfs, and zookeeper.
873   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
874   * @param numMasters Master node number.
875   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
876   * @return The mini HBase cluster created.
877   * @see #shutdownMiniCluster()
878   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
879   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
880   * @see #startMiniCluster(StartMiniClusterOption)
881   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
882   */
883  @Deprecated
884  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
885    StartMiniClusterOption option = StartMiniClusterOption.builder()
886        .numMasters(numMasters).numRegionServers(numSlaves).numDataNodes(numSlaves).build();
887    return startMiniCluster(option);
888  }
889
890  /**
891   * Start up a minicluster of hbase, dfs, and zookeeper.
892   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
893   * @param numMasters Master node number.
894   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
895   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
896   *                      HDFS data node number.
897   * @param createRootDir Whether to create a new root or data directory path.
898   * @return The mini HBase cluster created.
899   * @see #shutdownMiniCluster()
900   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
901   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
902   * @see #startMiniCluster(StartMiniClusterOption)
903   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
904   */
905  @Deprecated
906  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
907      boolean createRootDir) throws Exception {
908    StartMiniClusterOption option = StartMiniClusterOption.builder()
909        .numMasters(numMasters).numRegionServers(numSlaves).createRootDir(createRootDir)
910        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
911    return startMiniCluster(option);
912  }
913
914  /**
915   * Start up a minicluster of hbase, dfs, and zookeeper.
916   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
917   * @param numMasters Master node number.
918   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
919   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
920   *                      HDFS data node number.
921   * @return The mini HBase cluster created.
922   * @see #shutdownMiniCluster()
923   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
924   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
925   * @see #startMiniCluster(StartMiniClusterOption)
926   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
927   */
928  @Deprecated
929  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
930      throws Exception {
931    StartMiniClusterOption option = StartMiniClusterOption.builder()
932        .numMasters(numMasters).numRegionServers(numSlaves)
933        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
934    return startMiniCluster(option);
935  }
936
937  /**
938   * Start up a minicluster of hbase, dfs, and zookeeper.
939   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
940   * @param numMasters Master node number.
941   * @param numRegionServers Number of region servers.
942   * @param numDataNodes Number of datanodes.
943   * @return The mini HBase cluster created.
944   * @see #shutdownMiniCluster()
945   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
946   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
947   * @see #startMiniCluster(StartMiniClusterOption)
948   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
949   */
950  @Deprecated
951  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
952      throws Exception {
953    StartMiniClusterOption option = StartMiniClusterOption.builder()
954        .numMasters(numMasters).numRegionServers(numRegionServers).numDataNodes(numDataNodes)
955        .build();
956    return startMiniCluster(option);
957  }
958
959  /**
960   * Start up a minicluster of hbase, dfs, and zookeeper.
961   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
962   * @param numMasters Master node number.
963   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
964   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
965   *                      HDFS data node number.
966   * @param masterClass The class to use as HMaster, or null for default.
967   * @param rsClass The class to use as HRegionServer, or null for default.
968   * @return The mini HBase cluster created.
969   * @see #shutdownMiniCluster()
970   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
971   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
972   * @see #startMiniCluster(StartMiniClusterOption)
973   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
974   */
975  @Deprecated
976  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
977      Class<? extends HMaster> masterClass,
978      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
979      throws Exception {
980    StartMiniClusterOption option = StartMiniClusterOption.builder()
981        .numMasters(numMasters).masterClass(masterClass)
982        .numRegionServers(numSlaves).rsClass(rsClass)
983        .numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts)
984        .build();
985    return startMiniCluster(option);
986  }
987
988  /**
989   * Start up a minicluster of hbase, dfs, and zookeeper.
990   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
991   * @param numMasters Master node number.
992   * @param numRegionServers Number of region servers.
993   * @param numDataNodes Number of datanodes.
994   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
995   *                      HDFS data node number.
996   * @param masterClass The class to use as HMaster, or null for default.
997   * @param rsClass The class to use as HRegionServer, or null for default.
998   * @return The mini HBase cluster created.
999   * @see #shutdownMiniCluster()
1000   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1001   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
1002   * @see #startMiniCluster(StartMiniClusterOption)
1003   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1004   */
1005  @Deprecated
1006  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1007      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1008      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass)
1009    throws Exception {
1010    StartMiniClusterOption option = StartMiniClusterOption.builder()
1011        .numMasters(numMasters).masterClass(masterClass)
1012        .numRegionServers(numRegionServers).rsClass(rsClass)
1013        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
1014        .build();
1015    return startMiniCluster(option);
1016  }
1017
1018  /**
1019   * Start up a minicluster of hbase, dfs, and zookeeper.
1020   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1021   * @param numMasters Master node number.
1022   * @param numRegionServers Number of region servers.
1023   * @param numDataNodes Number of datanodes.
1024   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
1025   *                      HDFS data node number.
1026   * @param masterClass The class to use as HMaster, or null for default.
1027   * @param rsClass The class to use as HRegionServer, or null for default.
1028   * @param createRootDir Whether to create a new root or data directory path.
1029   * @param createWALDir Whether to create a new WAL directory.
1030   * @return The mini HBase cluster created.
1031   * @see #shutdownMiniCluster()
1032   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1033   *   {@link #startMiniCluster(StartMiniClusterOption)} instead.
1034   * @see #startMiniCluster(StartMiniClusterOption)
1035   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1036   */
1037  @Deprecated
1038  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1039      String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1040      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1041      boolean createWALDir) throws Exception {
1042    StartMiniClusterOption option = StartMiniClusterOption.builder()
1043        .numMasters(numMasters).masterClass(masterClass)
1044        .numRegionServers(numRegionServers).rsClass(rsClass)
1045        .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts)
1046        .createRootDir(createRootDir).createWALDir(createWALDir)
1047        .build();
1048    return startMiniCluster(option);
1049  }
1050
1051  /**
1052   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number.
1053   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1054   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1055   * @see #startMiniCluster(StartMiniClusterOption option)
1056   * @see #shutdownMiniDFSCluster()
1057   */
1058  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1059    StartMiniClusterOption option = StartMiniClusterOption.builder()
1060        .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1061    return startMiniCluster(option);
1062  }
1063
1064  /**
1065   * Start up a minicluster of hbase, dfs and zookeeper all using default options.
1066   * Option default value can be found in {@link StartMiniClusterOption.Builder}.
1067   * @see #startMiniCluster(StartMiniClusterOption option)
1068   * @see #shutdownMiniDFSCluster()
1069   */
1070  public MiniHBaseCluster startMiniCluster() throws Exception {
1071    return startMiniCluster(StartMiniClusterOption.builder().build());
1072  }
1073
1074  /**
1075   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed.
1076   * It modifies Configuration.  It homes the cluster data directory under a random
1077   * subdirectory in a directory under System property test.build.data, to be cleaned up on exit.
1078   * @see #shutdownMiniDFSCluster()
1079   */
1080  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1081    LOG.info("Starting up minicluster with option: {}", option);
1082
1083    // If we already put up a cluster, fail.
1084    if (miniClusterRunning) {
1085      throw new IllegalStateException("A mini-cluster is already running");
1086    }
1087    miniClusterRunning = true;
1088
1089    setupClusterTestDir();
1090    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
1091
1092    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1093    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1094    if (dfsCluster == null) {
1095      LOG.info("STARTING DFS");
1096      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1097    } else {
1098      LOG.info("NOT STARTING DFS");
1099    }
1100
1101    // Start up a zk cluster.
1102    if (getZkCluster() == null) {
1103      startMiniZKCluster(option.getNumZkServers());
1104    }
1105
1106    // Start the MiniHBaseCluster
1107    return startMiniHBaseCluster(option);
1108  }
1109
1110  /**
1111   * Starts up mini hbase cluster.
1112   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1113   * This is useful when doing stepped startup of clusters.
1114   * @return Reference to the hbase mini hbase cluster.
1115   * @see #startMiniCluster(StartMiniClusterOption)
1116   * @see #shutdownMiniHBaseCluster()
1117   */
1118  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1119      throws IOException, InterruptedException {
1120    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
1121    createRootDir(option.isCreateRootDir());
1122    if (option.isCreateWALDir()) {
1123      createWALRootDir();
1124    }
1125    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1126    // for tests that do not read hbase-defaults.xml
1127    setHBaseFsTmpDir();
1128
1129    // These settings will make the server waits until this exact number of
1130    // regions servers are connected.
1131    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1132      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1133    }
1134    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1135      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1136    }
1137
1138    Configuration c = new Configuration(this.conf);
1139    TraceUtil.initTracer(c);
1140    this.hbaseCluster =
1141        new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
1142            option.getRsPorts(), option.getMasterClass(), option.getRsClass());
1143    // Don't leave here till we've done a successful scan of the hbase:meta
1144    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1145    ResultScanner s = t.getScanner(new Scan());
1146    while (s.next() != null) {
1147      continue;
1148    }
1149    s.close();
1150    t.close();
1151
1152    getAdmin(); // create immediately the hbaseAdmin
1153    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1154
1155    return (MiniHBaseCluster) hbaseCluster;
1156  }
1157
1158  /**
1159   * Starts up mini hbase cluster using default options.
1160   * Default options can be found in {@link StartMiniClusterOption.Builder}.
1161   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1162   * @see #shutdownMiniHBaseCluster()
1163   */
1164  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1165    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1166  }
1167
1168  /**
1169   * Starts up mini hbase cluster.
1170   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1171   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1172   * @param numMasters Master node number.
1173   * @param numRegionServers Number of region servers.
1174   * @return The mini HBase cluster created.
1175   * @see #shutdownMiniHBaseCluster()
1176   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1177   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1178   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1179   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1180   */
1181  @Deprecated
1182  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1183      throws IOException, InterruptedException {
1184    StartMiniClusterOption option = StartMiniClusterOption.builder()
1185        .numMasters(numMasters).numRegionServers(numRegionServers).build();
1186    return startMiniHBaseCluster(option);
1187  }
1188
1189  /**
1190   * Starts up mini hbase cluster.
1191   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1192   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1193   * @param numMasters Master node number.
1194   * @param numRegionServers Number of region servers.
1195   * @param rsPorts Ports that RegionServer should use.
1196   * @return The mini HBase cluster created.
1197   * @see #shutdownMiniHBaseCluster()
1198   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1199   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1200   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1201   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1202   */
1203  @Deprecated
1204  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1205      List<Integer> rsPorts) throws IOException, InterruptedException {
1206    StartMiniClusterOption option = StartMiniClusterOption.builder()
1207        .numMasters(numMasters).numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1208    return startMiniHBaseCluster(option);
1209  }
1210
1211  /**
1212   * Starts up mini hbase cluster.
1213   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1214   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1215   * @param numMasters Master node number.
1216   * @param numRegionServers Number of region servers.
1217   * @param rsPorts Ports that RegionServer should use.
1218   * @param masterClass The class to use as HMaster, or null for default.
1219   * @param rsClass The class to use as HRegionServer, or null for default.
1220   * @param createRootDir Whether to create a new root or data directory path.
1221   * @param createWALDir Whether to create a new WAL directory.
1222   * @return The mini HBase cluster created.
1223   * @see #shutdownMiniHBaseCluster()
1224   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1225   *   {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1226   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1227   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1228   */
1229  @Deprecated
1230  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1231      List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1232      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
1233      boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
1234    StartMiniClusterOption option = StartMiniClusterOption.builder()
1235        .numMasters(numMasters).masterClass(masterClass)
1236        .numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1237        .createRootDir(createRootDir).createWALDir(createWALDir).build();
1238    return startMiniHBaseCluster(option);
1239  }
1240
1241  /**
1242   * Starts the hbase cluster up again after shutting it down previously in a
1243   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1244   * @param servers number of region servers
1245   */
1246  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1247    this.restartHBaseCluster(servers, null);
1248  }
1249
1250  public void restartHBaseCluster(int servers, List<Integer> ports)
1251      throws IOException, InterruptedException {
1252    StartMiniClusterOption option =
1253        StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1254    restartHBaseCluster(option);
1255  }
1256
1257  public void restartHBaseCluster(StartMiniClusterOption option)
1258      throws IOException, InterruptedException {
1259    if (hbaseAdmin != null) {
1260      hbaseAdmin.close();
1261      hbaseAdmin = null;
1262    }
1263    if (this.connection != null) {
1264      this.connection.close();
1265      this.connection = null;
1266    }
1267    this.hbaseCluster =
1268        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
1269            option.getRsPorts(), option.getMasterClass(), option.getRsClass());
1270    // Don't leave here till we've done a successful scan of the hbase:meta
1271    Connection conn = ConnectionFactory.createConnection(this.conf);
1272    Table t = conn.getTable(TableName.META_TABLE_NAME);
1273    ResultScanner s = t.getScanner(new Scan());
1274    while (s.next() != null) {
1275      // do nothing
1276    }
1277    LOG.info("HBase has been restarted");
1278    s.close();
1279    t.close();
1280    conn.close();
1281  }
1282
1283  /**
1284   * @return Current mini hbase cluster. Only has something in it after a call
1285   * to {@link #startMiniCluster()}.
1286   * @see #startMiniCluster()
1287   */
1288  public MiniHBaseCluster getMiniHBaseCluster() {
1289    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1290      return (MiniHBaseCluster)this.hbaseCluster;
1291    }
1292    throw new RuntimeException(hbaseCluster + " not an instance of " +
1293                               MiniHBaseCluster.class.getName());
1294  }
1295
1296  /**
1297   * Stops mini hbase, zk, and hdfs clusters.
1298   * @throws IOException
1299   * @see #startMiniCluster(int)
1300   */
1301  public void shutdownMiniCluster() throws Exception {
1302    LOG.info("Shutting down minicluster");
1303    shutdownMiniHBaseCluster();
1304    shutdownMiniDFSCluster();
1305    shutdownMiniZKCluster();
1306
1307    cleanupTestDir();
1308    miniClusterRunning = false;
1309    LOG.info("Minicluster is down");
1310  }
1311
1312  /**
1313   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1314   * @throws java.io.IOException in case command is unsuccessful
1315   */
1316  public void shutdownMiniHBaseCluster() throws IOException {
1317    cleanup();
1318    if (this.hbaseCluster != null) {
1319      this.hbaseCluster.shutdown();
1320      // Wait till hbase is down before going on to shutdown zk.
1321      this.hbaseCluster.waitUntilShutDown();
1322      this.hbaseCluster = null;
1323    }
1324    if (zooKeeperWatcher != null) {
1325      zooKeeperWatcher.close();
1326      zooKeeperWatcher = null;
1327    }
1328  }
1329
1330  /**
1331   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1332   * @throws java.io.IOException throws in case command is unsuccessful
1333   */
1334  public void killMiniHBaseCluster() throws IOException {
1335    cleanup();
1336    if (this.hbaseCluster != null) {
1337      getMiniHBaseCluster().killAll();
1338      this.hbaseCluster = null;
1339    }
1340    if (zooKeeperWatcher != null) {
1341      zooKeeperWatcher.close();
1342      zooKeeperWatcher = null;
1343    }
1344  }
1345
1346  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1347  private void cleanup() throws IOException {
1348    if (hbaseAdmin != null) {
1349      hbaseAdmin.close();
1350      hbaseAdmin = null;
1351    }
1352    if (this.connection != null) {
1353      this.connection.close();
1354      this.connection = null;
1355    }
1356    // unset the configuration for MIN and MAX RS to start
1357    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1358    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1359  }
1360
1361  /**
1362   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1363   * is true, a new root directory path is fetched irrespective of whether it has been fetched
1364   * before or not. If false, previous path is used.
1365   * Note: this does not cause the root dir to be created.
1366   * @return Fully qualified path for the default hbase root dir
1367   * @throws IOException
1368   */
1369  public Path getDefaultRootDirPath(boolean create) throws IOException {
1370    if (!create) {
1371      return getDataTestDirOnTestFS();
1372    } else {
1373      return getNewDataTestDirOnTestFS();
1374    }
1375  }
1376
1377  /**
1378   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
1379   * except that <code>create</code> flag is false.
1380   * Note: this does not cause the root dir to be created.
1381   * @return Fully qualified path for the default hbase root dir
1382   * @throws IOException
1383   */
1384  public Path getDefaultRootDirPath() throws IOException {
1385    return getDefaultRootDirPath(false);
1386  }
1387
1388  /**
1389   * Creates an hbase rootdir in user home directory.  Also creates hbase
1390   * version file.  Normally you won't make use of this method.  Root hbasedir
1391   * is created for you as part of mini cluster startup.  You'd only use this
1392   * method if you were doing manual operation.
1393   * @param create This flag decides whether to get a new
1394   * root or data directory path or not, if it has been fetched already.
1395   * Note : Directory will be made irrespective of whether path has been fetched or not.
1396   * If directory already exists, it will be overwritten
1397   * @return Fully qualified path to hbase root dir
1398   * @throws IOException
1399   */
1400  public Path createRootDir(boolean create) throws IOException {
1401    FileSystem fs = FileSystem.get(this.conf);
1402    Path hbaseRootdir = getDefaultRootDirPath(create);
1403    FSUtils.setRootDir(this.conf, hbaseRootdir);
1404    fs.mkdirs(hbaseRootdir);
1405    FSUtils.setVersion(fs, hbaseRootdir);
1406    return hbaseRootdir;
1407  }
1408
1409  /**
1410   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
1411   * except that <code>create</code> flag is false.
1412   * @return Fully qualified path to hbase root dir
1413   * @throws IOException
1414   */
1415  public Path createRootDir() throws IOException {
1416    return createRootDir(false);
1417  }
1418
1419  /**
1420   * Creates a hbase walDir in the user's home directory.
1421   * Normally you won't make use of this method. Root hbaseWALDir
1422   * is created for you as part of mini cluster startup. You'd only use this
1423   * method if you were doing manual operation.
1424   *
1425   * @return Fully qualified path to hbase root dir
1426   * @throws IOException
1427  */
1428  public Path createWALRootDir() throws IOException {
1429    FileSystem fs = FileSystem.get(this.conf);
1430    Path walDir = getNewDataTestDirOnTestFS();
1431    FSUtils.setWALRootDir(this.conf, walDir);
1432    fs.mkdirs(walDir);
1433    return walDir;
1434  }
1435
1436  private void setHBaseFsTmpDir() throws IOException {
1437    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1438    if (hbaseFsTmpDirInString == null) {
1439      this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1440      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1441    } else {
1442      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1443    }
1444  }
1445
1446  /**
1447   * Flushes all caches in the mini hbase cluster
1448   * @throws IOException
1449   */
1450  public void flush() throws IOException {
1451    getMiniHBaseCluster().flushcache();
1452  }
1453
1454  /**
1455   * Flushes all caches in the mini hbase cluster
1456   * @throws IOException
1457   */
1458  public void flush(TableName tableName) throws IOException {
1459    getMiniHBaseCluster().flushcache(tableName);
1460  }
1461
1462  /**
1463   * Compact all regions in the mini hbase cluster
1464   * @throws IOException
1465   */
1466  public void compact(boolean major) throws IOException {
1467    getMiniHBaseCluster().compact(major);
1468  }
1469
1470  /**
1471   * Compact all of a table's reagion in the mini hbase cluster
1472   * @throws IOException
1473   */
1474  public void compact(TableName tableName, boolean major) throws IOException {
1475    getMiniHBaseCluster().compact(tableName, major);
1476  }
1477
1478  /**
1479   * Create a table.
1480   * @param tableName
1481   * @param family
1482   * @return A Table instance for the created table.
1483   * @throws IOException
1484   */
1485  public Table createTable(TableName tableName, String family)
1486  throws IOException{
1487    return createTable(tableName, new String[]{family});
1488  }
1489
1490  /**
1491   * Create a table.
1492   * @param tableName
1493   * @param families
1494   * @return A Table instance for the created table.
1495   * @throws IOException
1496   */
1497  public Table createTable(TableName tableName, String[] families)
1498  throws IOException {
1499    List<byte[]> fams = new ArrayList<>(families.length);
1500    for (String family : families) {
1501      fams.add(Bytes.toBytes(family));
1502    }
1503    return createTable(tableName, fams.toArray(new byte[0][]));
1504  }
1505
1506  /**
1507   * Create a table.
1508   * @param tableName
1509   * @param family
1510   * @return A Table instance for the created table.
1511   * @throws IOException
1512   */
1513  public Table createTable(TableName tableName, byte[] family)
1514  throws IOException{
1515    return createTable(tableName, new byte[][]{family});
1516  }
1517
1518  /**
1519   * Create a table with multiple regions.
1520   * @param tableName
1521   * @param family
1522   * @param numRegions
1523   * @return A Table instance for the created table.
1524   * @throws IOException
1525   */
1526  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1527      throws IOException {
1528    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1529    byte[] startKey = Bytes.toBytes("aaaaa");
1530    byte[] endKey = Bytes.toBytes("zzzzz");
1531    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1532
1533    return createTable(tableName, new byte[][] { family }, splitKeys);
1534  }
1535
1536  /**
1537   * Create a table.
1538   * @param tableName
1539   * @param families
1540   * @return A Table instance for the created table.
1541   * @throws IOException
1542   */
1543  public Table createTable(TableName tableName, byte[][] families)
1544  throws IOException {
1545    return createTable(tableName, families, (byte[][]) null);
1546  }
1547
1548  /**
1549   * Create a table with multiple regions.
1550   * @param tableName
1551   * @param families
1552   * @return A Table instance for the created table.
1553   * @throws IOException
1554   */
1555  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1556    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1557  }
1558
1559  /**
1560   * Create a table.
1561   * @param tableName
1562   * @param families
1563   * @param splitKeys
1564   * @return A Table instance for the created table.
1565   * @throws IOException
1566   */
1567  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1568      throws IOException {
1569    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1570  }
1571
1572  /**
1573   * Create a table.
1574   * @param tableName the table name
1575   * @param families the families
1576   * @param splitKeys the splitkeys
1577   * @param replicaCount the region replica count
1578   * @return A Table instance for the created table.
1579   * @throws IOException throws IOException
1580   */
1581  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1582      int replicaCount) throws IOException {
1583    return createTable(tableName, families, splitKeys, replicaCount,
1584      new Configuration(getConfiguration()));
1585  }
1586
1587  public Table createTable(TableName tableName, byte[][] families,
1588      int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1589  throws IOException{
1590    HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1591
1592    getAdmin().createTable(desc, startKey, endKey, numRegions);
1593    // HBaseAdmin only waits for regions to appear in hbase:meta we
1594    // should wait until they are assigned
1595    waitUntilAllRegionsAssigned(tableName);
1596    return getConnection().getTable(tableName);
1597  }
1598
1599  /**
1600   * Create a table.
1601   * @param htd
1602   * @param families
1603   * @param c Configuration to use
1604   * @return A Table instance for the created table.
1605   * @throws IOException
1606   */
1607  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1608  throws IOException {
1609    return createTable(htd, families, null, c);
1610  }
1611
1612  /**
1613   * Create a table.
1614   * @param htd table descriptor
1615   * @param families array of column families
1616   * @param splitKeys array of split keys
1617   * @param c Configuration to use
1618   * @return A Table instance for the created table.
1619   * @throws IOException if getAdmin or createTable fails
1620   */
1621  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1622      Configuration c) throws IOException {
1623    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1624    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1625    // on is interfering.
1626    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1627  }
1628
1629  /**
1630   * Create a table.
1631   * @param htd table descriptor
1632   * @param families array of column families
1633   * @param splitKeys array of split keys
1634   * @param type Bloom type
1635   * @param blockSize block size
1636   * @param c Configuration to use
1637   * @return A Table instance for the created table.
1638   * @throws IOException if getAdmin or createTable fails
1639   */
1640
1641  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1642      BloomType type, int blockSize, Configuration c) throws IOException {
1643    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1644    for (byte[] family : families) {
1645      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1646        .setBloomFilterType(type)
1647        .setBlocksize(blockSize);
1648      if (isNewVersionBehaviorEnabled()) {
1649          cfdb.setNewVersionBehavior(true);
1650      }
1651      builder.setColumnFamily(cfdb.build());
1652    }
1653    TableDescriptor td = builder.build();
1654    getAdmin().createTable(td, splitKeys);
1655    // HBaseAdmin only waits for regions to appear in hbase:meta
1656    // we should wait until they are assigned
1657    waitUntilAllRegionsAssigned(td.getTableName());
1658    return getConnection().getTable(td.getTableName());
1659  }
1660
1661  /**
1662   * Create a table.
1663   * @param htd table descriptor
1664   * @param splitRows array of split keys
1665   * @return A Table instance for the created table.
1666   * @throws IOException
1667   */
1668  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1669      throws IOException {
1670    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1671    if (isNewVersionBehaviorEnabled()) {
1672      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1673         builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
1674           .setNewVersionBehavior(true).build());
1675      }
1676    }
1677    getAdmin().createTable(builder.build(), splitRows);
1678    // HBaseAdmin only waits for regions to appear in hbase:meta
1679    // we should wait until they are assigned
1680    waitUntilAllRegionsAssigned(htd.getTableName());
1681    return getConnection().getTable(htd.getTableName());
1682  }
1683
1684  /**
1685   * Create a table.
1686   * @param tableName the table name
1687   * @param families the families
1688   * @param splitKeys the split keys
1689   * @param replicaCount the replica count
1690   * @param c Configuration to use
1691   * @return A Table instance for the created table.
1692   * @throws IOException
1693   */
1694  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1695      int replicaCount, final Configuration c) throws IOException {
1696    HTableDescriptor htd = new HTableDescriptor(tableName);
1697    htd.setRegionReplication(replicaCount);
1698    return createTable(htd, families, splitKeys, c);
1699  }
1700
1701  /**
1702   * Create a table.
1703   * @param tableName
1704   * @param family
1705   * @param numVersions
1706   * @return A Table instance for the created table.
1707   * @throws IOException
1708   */
1709  public Table createTable(TableName tableName, byte[] family, int numVersions)
1710  throws IOException {
1711    return createTable(tableName, new byte[][]{family}, numVersions);
1712  }
1713
1714  /**
1715   * Create a table.
1716   * @param tableName
1717   * @param families
1718   * @param numVersions
1719   * @return A Table instance for the created table.
1720   * @throws IOException
1721   */
1722  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1723      throws IOException {
1724    return createTable(tableName, families, numVersions, (byte[][]) null);
1725  }
1726
1727  /**
1728   * Create a table.
1729   * @param tableName
1730   * @param families
1731   * @param numVersions
1732   * @param splitKeys
1733   * @return A Table instance for the created table.
1734   * @throws IOException
1735   */
1736  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1737      byte[][] splitKeys) throws IOException {
1738    HTableDescriptor desc = new HTableDescriptor(tableName);
1739    for (byte[] family : families) {
1740      HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1741      if (isNewVersionBehaviorEnabled()) {
1742        hcd.setNewVersionBehavior(true);
1743      }
1744      desc.addFamily(hcd);
1745    }
1746    getAdmin().createTable(desc, splitKeys);
1747    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1748    // assigned
1749    waitUntilAllRegionsAssigned(tableName);
1750    return getConnection().getTable(tableName);
1751  }
1752
1753  /**
1754   * Create a table with multiple regions.
1755   * @param tableName
1756   * @param families
1757   * @param numVersions
1758   * @return A Table instance for the created table.
1759   * @throws IOException
1760   */
1761  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1762      throws IOException {
1763    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1764  }
1765
1766  /**
1767   * Create a table.
1768   * @param tableName
1769   * @param families
1770   * @param numVersions
1771   * @param blockSize
1772   * @return A Table instance for the created table.
1773   * @throws IOException
1774   */
1775  public Table createTable(TableName tableName, byte[][] families,
1776    int numVersions, int blockSize) throws IOException {
1777    HTableDescriptor desc = new HTableDescriptor(tableName);
1778    for (byte[] family : families) {
1779      HColumnDescriptor hcd = new HColumnDescriptor(family)
1780          .setMaxVersions(numVersions)
1781          .setBlocksize(blockSize);
1782      if (isNewVersionBehaviorEnabled()) {
1783        hcd.setNewVersionBehavior(true);
1784      }
1785      desc.addFamily(hcd);
1786    }
1787    getAdmin().createTable(desc);
1788    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1789    // assigned
1790    waitUntilAllRegionsAssigned(tableName);
1791    return getConnection().getTable(tableName);
1792  }
1793
1794  public Table createTable(TableName tableName, byte[][] families,
1795      int numVersions, int blockSize, String cpName) throws IOException {
1796      HTableDescriptor desc = new HTableDescriptor(tableName);
1797      for (byte[] family : families) {
1798        HColumnDescriptor hcd = new HColumnDescriptor(family)
1799            .setMaxVersions(numVersions)
1800            .setBlocksize(blockSize);
1801        if (isNewVersionBehaviorEnabled()) {
1802          hcd.setNewVersionBehavior(true);
1803        }
1804        desc.addFamily(hcd);
1805      }
1806      if(cpName != null) {
1807        desc.addCoprocessor(cpName);
1808      }
1809      getAdmin().createTable(desc);
1810      // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1811      // assigned
1812      waitUntilAllRegionsAssigned(tableName);
1813      return getConnection().getTable(tableName);
1814    }
1815
1816  /**
1817   * Create a table.
1818   * @param tableName
1819   * @param families
1820   * @param numVersions
1821   * @return A Table instance for the created table.
1822   * @throws IOException
1823   */
1824  public Table createTable(TableName tableName, byte[][] families,
1825      int[] numVersions)
1826  throws IOException {
1827    HTableDescriptor desc = new HTableDescriptor(tableName);
1828    int i = 0;
1829    for (byte[] family : families) {
1830      HColumnDescriptor hcd = new HColumnDescriptor(family)
1831          .setMaxVersions(numVersions[i]);
1832      if (isNewVersionBehaviorEnabled()) {
1833        hcd.setNewVersionBehavior(true);
1834      }
1835      desc.addFamily(hcd);
1836      i++;
1837    }
1838    getAdmin().createTable(desc);
1839    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1840    // assigned
1841    waitUntilAllRegionsAssigned(tableName);
1842    return getConnection().getTable(tableName);
1843  }
1844
1845  /**
1846   * Create a table.
1847   * @param tableName
1848   * @param family
1849   * @param splitRows
1850   * @return A Table instance for the created table.
1851   * @throws IOException
1852   */
1853  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1854      throws IOException {
1855    HTableDescriptor desc = new HTableDescriptor(tableName);
1856    HColumnDescriptor hcd = new HColumnDescriptor(family);
1857    if (isNewVersionBehaviorEnabled()) {
1858      hcd.setNewVersionBehavior(true);
1859    }
1860    desc.addFamily(hcd);
1861    getAdmin().createTable(desc, splitRows);
1862    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1863    // assigned
1864    waitUntilAllRegionsAssigned(tableName);
1865    return getConnection().getTable(tableName);
1866  }
1867
1868  /**
1869   * Create a table with multiple regions.
1870   * @param tableName
1871   * @param family
1872   * @return A Table instance for the created table.
1873   * @throws IOException
1874   */
1875  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1876    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1877  }
1878
1879  /**
1880   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
1881   */
1882  @SuppressWarnings("serial")
1883  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1884      throws IOException, InterruptedException {
1885    admin.modifyTable(desc);
1886    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1887      setFirst(0);
1888      setSecond(0);
1889    }};
1890    int i = 0;
1891    do {
1892      status = admin.getAlterStatus(desc.getTableName());
1893      if (status.getSecond() != 0) {
1894        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1895          + " regions updated.");
1896        Thread.sleep(1 * 1000L);
1897      } else {
1898        LOG.debug("All regions updated.");
1899        break;
1900      }
1901    } while (status.getFirst() != 0 && i++ < 500);
1902    if (status.getFirst() != 0) {
1903      throw new IOException("Failed to update all regions even after 500 seconds.");
1904    }
1905  }
1906
1907  /**
1908   * Set the number of Region replicas.
1909   */
1910  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1911      throws IOException, InterruptedException {
1912    admin.disableTable(table);
1913    HTableDescriptor desc = new HTableDescriptor(admin.getTableDescriptor(table));
1914    desc.setRegionReplication(replicaCount);
1915    admin.modifyTable(desc.getTableName(), desc);
1916    admin.enableTable(table);
1917  }
1918
1919  /**
1920   * Drop an existing table
1921   * @param tableName existing table
1922   */
1923  public void deleteTable(TableName tableName) throws IOException {
1924    try {
1925      getAdmin().disableTable(tableName);
1926    } catch (TableNotEnabledException e) {
1927      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1928    }
1929    getAdmin().deleteTable(tableName);
1930  }
1931
1932  /**
1933   * Drop an existing table
1934   * @param tableName existing table
1935   */
1936  public void deleteTableIfAny(TableName tableName) throws IOException {
1937    try {
1938      deleteTable(tableName);
1939    } catch (TableNotFoundException e) {
1940      // ignore
1941    }
1942  }
1943
1944  // ==========================================================================
1945  // Canned table and table descriptor creation
1946  // TODO replace HBaseTestCase
1947
1948  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1949  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1950  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1951  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1952  private static final int MAXVERSIONS = 3;
1953
1954  public static final char FIRST_CHAR = 'a';
1955  public static final char LAST_CHAR = 'z';
1956  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1957  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1958
1959  /**
1960   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
1961   *   {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} instead.
1962   * @see #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)
1963   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
1964   */
1965  @Deprecated
1966  public HTableDescriptor createTableDescriptor(final String name,
1967      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1968    return this.createTableDescriptor(TableName.valueOf(name), minVersions, versions, ttl,
1969        keepDeleted);
1970  }
1971
1972  /**
1973   * Create a table of name <code>name</code>.
1974   * @param name Name to give table.
1975   * @return Column descriptor.
1976   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
1977   *   {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} instead.
1978   * @see #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)
1979   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
1980   */
1981  @Deprecated
1982  public HTableDescriptor createTableDescriptor(final String name) {
1983    return createTableDescriptor(TableName.valueOf(name),  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1984        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1985  }
1986
1987  public HTableDescriptor createTableDescriptor(final TableName name,
1988      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1989    HTableDescriptor htd = new HTableDescriptor(name);
1990    for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1991      HColumnDescriptor hcd = new HColumnDescriptor(cfName)
1992          .setMinVersions(minVersions)
1993          .setMaxVersions(versions)
1994          .setKeepDeletedCells(keepDeleted)
1995          .setBlockCacheEnabled(false)
1996          .setTimeToLive(ttl);
1997      if (isNewVersionBehaviorEnabled()) {
1998          hcd.setNewVersionBehavior(true);
1999      }
2000      htd.addFamily(hcd);
2001    }
2002    return htd;
2003  }
2004
2005  /**
2006   * Create a table of name <code>name</code>.
2007   * @param name Name to give table.
2008   * @return Column descriptor.
2009   */
2010  public HTableDescriptor createTableDescriptor(final TableName name) {
2011    return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
2012        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
2013  }
2014
2015  public HTableDescriptor createTableDescriptor(final TableName tableName,
2016      byte[] family) {
2017    return createTableDescriptor(tableName, new byte[][] {family}, 1);
2018  }
2019
2020  public HTableDescriptor createTableDescriptor(final TableName tableName,
2021      byte[][] families, int maxVersions) {
2022    HTableDescriptor desc = new HTableDescriptor(tableName);
2023    for (byte[] family : families) {
2024      HColumnDescriptor hcd = new HColumnDescriptor(family)
2025          .setMaxVersions(maxVersions);
2026      if (isNewVersionBehaviorEnabled()) {
2027          hcd.setNewVersionBehavior(true);
2028      }
2029      desc.addFamily(hcd);
2030    }
2031    return desc;
2032  }
2033
2034  /**
2035   * Create an HRegion that writes to the local tmp dirs
2036   * @param desc a table descriptor indicating which table the region belongs to
2037   * @param startKey the start boundary of the region
2038   * @param endKey the end boundary of the region
2039   * @return a region that writes to local dir for testing
2040   * @throws IOException
2041   */
2042  public HRegion createLocalHRegion(TableDescriptor desc, byte [] startKey,
2043      byte [] endKey)
2044  throws IOException {
2045    HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
2046    return createLocalHRegion(hri, desc);
2047  }
2048
2049  /**
2050   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
2051   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
2052   */
2053  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
2054    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
2055  }
2056
2057  /**
2058   * Create an HRegion that writes to the local tmp dirs with specified wal
2059   * @param info regioninfo
2060   * @param desc table descriptor
2061   * @param wal wal for this region.
2062   * @return created hregion
2063   * @throws IOException
2064   */
2065  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
2066      throws IOException {
2067    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
2068  }
2069
2070  /**
2071   * @param tableName the name of the table
2072   * @param startKey the start key of the region
2073   * @param stopKey the stop key of the region
2074   * @param callingMethod the name of the calling method probably a test method
2075   * @param conf the configuration to use
2076   * @param isReadOnly {@code true} if the table is read only, {@code false} otherwise
2077   * @param families the column families to use
2078   * @throws IOException if an IO problem is encountered
2079   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
2080   *         when done.
2081   * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
2082   *   {@link #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)}
2083   *   instead.
2084   * @see #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)
2085   * @see <a href="https://issues.apache.org/jira/browse/HBASE-13893">HBASE-13893</a>
2086   */
2087  @Deprecated
2088  public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
2089      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
2090      WAL wal, byte[]... families) throws IOException {
2091    return this
2092        .createLocalHRegion(TableName.valueOf(tableName), startKey, stopKey, isReadOnly, durability,
2093            wal, families);
2094  }
2095
2096  /**
2097   * @param tableName
2098   * @param startKey
2099   * @param stopKey
2100   * @param isReadOnly
2101   * @param families
2102   * @return A region on which you must call
2103   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
2104   * @throws IOException
2105   */
2106  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
2107      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
2108    return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
2109        durability, wal, null, families);
2110  }
2111
2112  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
2113      byte[] stopKey,
2114      boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
2115      byte[]... families)
2116      throws IOException {
2117    HTableDescriptor htd = new HTableDescriptor(tableName);
2118    htd.setReadOnly(isReadOnly);
2119    int i=0;
2120    for (byte[] family : families) {
2121      HColumnDescriptor hcd = new HColumnDescriptor(family);
2122      if(compactedMemStore != null && i < compactedMemStore.length) {
2123        hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
2124      } else {
2125        hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
2126
2127      }
2128      i++;
2129      // Set default to be three versions.
2130      hcd.setMaxVersions(Integer.MAX_VALUE);
2131      htd.addFamily(hcd);
2132    }
2133    htd.setDurability(durability);
2134    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
2135    return createLocalHRegion(info, htd, wal);
2136  }
2137
2138  //
2139  // ==========================================================================
2140
2141  /**
2142   * Provide an existing table name to truncate.
2143   * Scans the table and issues a delete for each row read.
2144   * @param tableName existing table
2145   * @return HTable to that new table
2146   * @throws IOException
2147   */
2148  public Table deleteTableData(TableName tableName) throws IOException {
2149    Table table = getConnection().getTable(tableName);
2150    Scan scan = new Scan();
2151    ResultScanner resScan = table.getScanner(scan);
2152    for(Result res : resScan) {
2153      Delete del = new Delete(res.getRow());
2154      table.delete(del);
2155    }
2156    resScan = table.getScanner(scan);
2157    resScan.close();
2158    return table;
2159  }
2160
2161  /**
2162   * Truncate a table using the admin command.
2163   * Effectively disables, deletes, and recreates the table.
2164   * @param tableName table which must exist.
2165   * @param preserveRegions keep the existing split points
2166   * @return HTable for the new table
2167   */
2168  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
2169      IOException {
2170    Admin admin = getAdmin();
2171    if (!admin.isTableDisabled(tableName)) {
2172      admin.disableTable(tableName);
2173    }
2174    admin.truncateTable(tableName, preserveRegions);
2175    return getConnection().getTable(tableName);
2176  }
2177
2178  /**
2179   * Truncate a table using the admin command.
2180   * Effectively disables, deletes, and recreates the table.
2181   * For previous behavior of issuing row deletes, see
2182   * deleteTableData.
2183   * Expressly does not preserve regions of existing table.
2184   * @param tableName table which must exist.
2185   * @return HTable for the new table
2186   */
2187  public Table truncateTable(final TableName tableName) throws IOException {
2188    return truncateTable(tableName, false);
2189  }
2190
2191  /**
2192   * Load table with rows from 'aaa' to 'zzz'.
2193   * @param t Table
2194   * @param f Family
2195   * @return Count of rows loaded.
2196   * @throws IOException
2197   */
2198  public int loadTable(final Table t, final byte[] f) throws IOException {
2199    return loadTable(t, new byte[][] {f});
2200  }
2201
2202  /**
2203   * Load table with rows from 'aaa' to 'zzz'.
2204   * @param t Table
2205   * @param f Family
2206   * @return Count of rows loaded.
2207   * @throws IOException
2208   */
2209  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2210    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2211  }
2212
2213  /**
2214   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2215   * @param t Table
2216   * @param f Array of Families to load
2217   * @return Count of rows loaded.
2218   * @throws IOException
2219   */
2220  public int loadTable(final Table t, final byte[][] f) throws IOException {
2221    return loadTable(t, f, null);
2222  }
2223
2224  /**
2225   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2226   * @param t Table
2227   * @param f Array of Families to load
2228   * @param value the values of the cells. If null is passed, the row key is used as value
2229   * @return Count of rows loaded.
2230   * @throws IOException
2231   */
2232  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2233    return loadTable(t, f, value, true);
2234  }
2235
2236  /**
2237   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2238   * @param t Table
2239   * @param f Array of Families to load
2240   * @param value the values of the cells. If null is passed, the row key is used as value
2241   * @return Count of rows loaded.
2242   * @throws IOException
2243   */
2244  public int loadTable(final Table t, final byte[][] f, byte[] value,
2245      boolean writeToWAL) throws IOException {
2246    List<Put> puts = new ArrayList<>();
2247    for (byte[] row : HBaseTestingUtility.ROWS) {
2248      Put put = new Put(row);
2249      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2250      for (int i = 0; i < f.length; i++) {
2251        byte[] value1 = value != null ? value : row;
2252        put.addColumn(f[i], f[i], value1);
2253      }
2254      puts.add(put);
2255    }
2256    t.put(puts);
2257    return puts.size();
2258  }
2259
2260  /** A tracker for tracking and validating table rows
2261   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2262   */
2263  public static class SeenRowTracker {
2264    int dim = 'z' - 'a' + 1;
2265    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2266    byte[] startRow;
2267    byte[] stopRow;
2268
2269    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2270      this.startRow = startRow;
2271      this.stopRow = stopRow;
2272    }
2273
2274    void reset() {
2275      for (byte[] row : ROWS) {
2276        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2277      }
2278    }
2279
2280    int i(byte b) {
2281      return b - 'a';
2282    }
2283
2284    public void addRow(byte[] row) {
2285      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2286    }
2287
2288    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2289     * all other rows none
2290     */
2291    public void validate() {
2292      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2293        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2294          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2295            int count = seenRows[i(b1)][i(b2)][i(b3)];
2296            int expectedCount = 0;
2297            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2298                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2299              expectedCount = 1;
2300            }
2301            if (count != expectedCount) {
2302              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2303              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2304                  "instead of " + expectedCount);
2305            }
2306          }
2307        }
2308      }
2309    }
2310  }
2311
2312  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2313    return loadRegion(r, f, false);
2314  }
2315
2316  public int loadRegion(final Region r, final byte[] f) throws IOException {
2317    return loadRegion((HRegion)r, f);
2318  }
2319
2320  /**
2321   * Load region with rows from 'aaa' to 'zzz'.
2322   * @param r Region
2323   * @param f Family
2324   * @param flush flush the cache if true
2325   * @return Count of rows loaded.
2326   * @throws IOException
2327   */
2328  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2329  throws IOException {
2330    byte[] k = new byte[3];
2331    int rowCount = 0;
2332    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2333      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2334        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2335          k[0] = b1;
2336          k[1] = b2;
2337          k[2] = b3;
2338          Put put = new Put(k);
2339          put.setDurability(Durability.SKIP_WAL);
2340          put.addColumn(f, null, k);
2341          if (r.getWAL() == null) {
2342            put.setDurability(Durability.SKIP_WAL);
2343          }
2344          int preRowCount = rowCount;
2345          int pause = 10;
2346          int maxPause = 1000;
2347          while (rowCount == preRowCount) {
2348            try {
2349              r.put(put);
2350              rowCount++;
2351            } catch (RegionTooBusyException e) {
2352              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2353              Threads.sleep(pause);
2354            }
2355          }
2356        }
2357      }
2358      if (flush) {
2359        r.flush(true);
2360      }
2361    }
2362    return rowCount;
2363  }
2364
2365  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2366      throws IOException {
2367    for (int i = startRow; i < endRow; i++) {
2368      byte[] data = Bytes.toBytes(String.valueOf(i));
2369      Put put = new Put(data);
2370      put.addColumn(f, null, data);
2371      t.put(put);
2372    }
2373  }
2374
2375  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2376      throws IOException {
2377    Random r = new Random();
2378    byte[] row = new byte[rowSize];
2379    for (int i = 0; i < totalRows; i++) {
2380      r.nextBytes(row);
2381      Put put = new Put(row);
2382      put.addColumn(f, new byte[]{0}, new byte[]{0});
2383      t.put(put);
2384    }
2385  }
2386
2387  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2388      int replicaId)
2389      throws IOException {
2390    for (int i = startRow; i < endRow; i++) {
2391      String failMsg = "Failed verification of row :" + i;
2392      byte[] data = Bytes.toBytes(String.valueOf(i));
2393      Get get = new Get(data);
2394      get.setReplicaId(replicaId);
2395      get.setConsistency(Consistency.TIMELINE);
2396      Result result = table.get(get);
2397      assertTrue(failMsg, result.containsColumn(f, null));
2398      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2399      Cell cell = result.getColumnLatestCell(f, null);
2400      assertTrue(failMsg,
2401        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2402          cell.getValueLength()));
2403    }
2404  }
2405
2406  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2407      throws IOException {
2408    verifyNumericRows((HRegion)region, f, startRow, endRow);
2409  }
2410
2411  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2412      throws IOException {
2413    verifyNumericRows(region, f, startRow, endRow, true);
2414  }
2415
2416  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2417      final boolean present) throws IOException {
2418    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2419  }
2420
2421  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2422      final boolean present) throws IOException {
2423    for (int i = startRow; i < endRow; i++) {
2424      String failMsg = "Failed verification of row :" + i;
2425      byte[] data = Bytes.toBytes(String.valueOf(i));
2426      Result result = region.get(new Get(data));
2427
2428      boolean hasResult = result != null && !result.isEmpty();
2429      assertEquals(failMsg + result, present, hasResult);
2430      if (!present) continue;
2431
2432      assertTrue(failMsg, result.containsColumn(f, null));
2433      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2434      Cell cell = result.getColumnLatestCell(f, null);
2435      assertTrue(failMsg,
2436        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2437          cell.getValueLength()));
2438    }
2439  }
2440
2441  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2442      throws IOException {
2443    for (int i = startRow; i < endRow; i++) {
2444      byte[] data = Bytes.toBytes(String.valueOf(i));
2445      Delete delete = new Delete(data);
2446      delete.addFamily(f);
2447      t.delete(delete);
2448    }
2449  }
2450
2451  /**
2452   * Return the number of rows in the given table.
2453   */
2454  public int countRows(final Table table) throws IOException {
2455    return countRows(table, new Scan());
2456  }
2457
2458  public int countRows(final Table table, final Scan scan) throws IOException {
2459    try (ResultScanner results = table.getScanner(scan)) {
2460      int count = 0;
2461      while (results.next() != null) {
2462        count++;
2463      }
2464      return count;
2465    }
2466  }
2467
2468  public int countRows(final Table table, final byte[]... families) throws IOException {
2469    Scan scan = new Scan();
2470    for (byte[] family: families) {
2471      scan.addFamily(family);
2472    }
2473    return countRows(table, scan);
2474  }
2475
2476  /**
2477   * Return the number of rows in the given table.
2478   */
2479  public int countRows(final TableName tableName) throws IOException {
2480    Table table = getConnection().getTable(tableName);
2481    try {
2482      return countRows(table);
2483    } finally {
2484      table.close();
2485    }
2486  }
2487
2488  public int countRows(final Region region) throws IOException {
2489    return countRows(region, new Scan());
2490  }
2491
2492  public int countRows(final Region region, final Scan scan) throws IOException {
2493    InternalScanner scanner = region.getScanner(scan);
2494    try {
2495      return countRows(scanner);
2496    } finally {
2497      scanner.close();
2498    }
2499  }
2500
2501  public int countRows(final InternalScanner scanner) throws IOException {
2502    int scannedCount = 0;
2503    List<Cell> results = new ArrayList<>();
2504    boolean hasMore = true;
2505    while (hasMore) {
2506      hasMore = scanner.next(results);
2507      scannedCount += results.size();
2508      results.clear();
2509    }
2510    return scannedCount;
2511  }
2512
2513  /**
2514   * Return an md5 digest of the entire contents of a table.
2515   */
2516  public String checksumRows(final Table table) throws Exception {
2517
2518    Scan scan = new Scan();
2519    ResultScanner results = table.getScanner(scan);
2520    MessageDigest digest = MessageDigest.getInstance("MD5");
2521    for (Result res : results) {
2522      digest.update(res.getRow());
2523    }
2524    results.close();
2525    return digest.toString();
2526  }
2527
2528  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2529  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2530  static {
2531    int i = 0;
2532    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2533      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2534        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2535          ROWS[i][0] = b1;
2536          ROWS[i][1] = b2;
2537          ROWS[i][2] = b3;
2538          i++;
2539        }
2540      }
2541    }
2542  }
2543
2544  public static final byte[][] KEYS = {
2545    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2546    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2547    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2548    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2549    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2550    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2551    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2552    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2553    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2554  };
2555
2556  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2557      Bytes.toBytes("bbb"),
2558      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2559      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2560      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2561      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2562      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2563      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2564      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2565      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2566  };
2567
2568  /**
2569   * Create rows in hbase:meta for regions of the specified table with the specified
2570   * start keys.  The first startKey should be a 0 length byte array if you
2571   * want to form a proper range of regions.
2572   * @param conf
2573   * @param htd
2574   * @param startKeys
2575   * @return list of region info for regions added to meta
2576   * @throws IOException
2577   * @deprecated since 2.0 version and will be removed in 3.0 version.
2578   *             use {@link #createMultiRegionsInMeta(Configuration, TableDescriptor, byte[][])}
2579   */
2580  @Deprecated
2581  public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2582      final HTableDescriptor htd, byte [][] startKeys) throws IOException {
2583    return createMultiRegionsInMeta(conf, (TableDescriptor) htd, startKeys)
2584        .stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList());
2585  }
2586  /**
2587   * Create rows in hbase:meta for regions of the specified table with the specified
2588   * start keys.  The first startKey should be a 0 length byte array if you
2589   * want to form a proper range of regions.
2590   * @param conf
2591   * @param htd
2592   * @param startKeys
2593   * @return list of region info for regions added to meta
2594   * @throws IOException
2595   */
2596  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2597      final TableDescriptor htd, byte [][] startKeys)
2598  throws IOException {
2599    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2600    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2601    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2602    MetaTableAccessor
2603        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
2604    // add custom ones
2605    for (int i = 0; i < startKeys.length; i++) {
2606      int j = (i + 1) % startKeys.length;
2607      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2608          .setStartKey(startKeys[i])
2609          .setEndKey(startKeys[j])
2610          .build();
2611      MetaTableAccessor.addRegionToMeta(getConnection(), hri);
2612      newRegions.add(hri);
2613    }
2614
2615    meta.close();
2616    return newRegions;
2617  }
2618
2619  /**
2620   * Create an unmanaged WAL. Be sure to close it when you're through.
2621   */
2622  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2623      throws IOException {
2624    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2625    // unless I pass along via the conf.
2626    Configuration confForWAL = new Configuration(conf);
2627    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2628    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2629  }
2630
2631
2632  /**
2633   * Create a region with it's own WAL. Be sure to call
2634   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2635   */
2636  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2637      final Configuration conf, final TableDescriptor htd) throws IOException {
2638    return createRegionAndWAL(info, rootDir, conf, htd, true);
2639  }
2640
2641  /**
2642   * Create a region with it's own WAL. Be sure to call
2643   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2644   */
2645  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2646      final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
2647      throws IOException {
2648    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2649    region.setBlockCache(blockCache);
2650    region.initialize();
2651    return region;
2652  }
2653  /**
2654   * Create a region with it's own WAL. Be sure to call
2655   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2656   */
2657  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2658      final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2659      throws IOException {
2660    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2661    region.setMobFileCache(mobFileCache);
2662    region.initialize();
2663    return region;
2664  }
2665
2666  /**
2667   * Create a region with it's own WAL. Be sure to call
2668   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2669   */
2670  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2671      final Configuration conf, final TableDescriptor htd, boolean initialize)
2672      throws IOException {
2673    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2674    WAL wal = createWal(conf, rootDir, info);
2675    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2676  }
2677
2678  /**
2679   * Returns all rows from the hbase:meta table.
2680   *
2681   * @throws IOException When reading the rows fails.
2682   */
2683  public List<byte[]> getMetaTableRows() throws IOException {
2684    // TODO: Redo using MetaTableAccessor class
2685    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2686    List<byte[]> rows = new ArrayList<>();
2687    ResultScanner s = t.getScanner(new Scan());
2688    for (Result result : s) {
2689      LOG.info("getMetaTableRows: row -> " +
2690        Bytes.toStringBinary(result.getRow()));
2691      rows.add(result.getRow());
2692    }
2693    s.close();
2694    t.close();
2695    return rows;
2696  }
2697
2698  /**
2699   * Returns all rows from the hbase:meta table for a given user table
2700   *
2701   * @throws IOException When reading the rows fails.
2702   */
2703  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2704    // TODO: Redo using MetaTableAccessor.
2705    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2706    List<byte[]> rows = new ArrayList<>();
2707    ResultScanner s = t.getScanner(new Scan());
2708    for (Result result : s) {
2709      RegionInfo info = MetaTableAccessor.getRegionInfo(result);
2710      if (info == null) {
2711        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2712        // TODO figure out what to do for this new hosed case.
2713        continue;
2714      }
2715
2716      if (info.getTable().equals(tableName)) {
2717        LOG.info("getMetaTableRows: row -> " +
2718            Bytes.toStringBinary(result.getRow()) + info);
2719        rows.add(result.getRow());
2720      }
2721    }
2722    s.close();
2723    t.close();
2724    return rows;
2725  }
2726
2727  /**
2728   * Returns all regions of the specified table
2729   *
2730   * @param tableName the table name
2731   * @return all regions of the specified table
2732   * @throws IOException when getting the regions fails.
2733   */
2734  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2735    try (Admin admin = getConnection().getAdmin()) {
2736      return admin.getRegions(tableName);
2737    }
2738  }
2739
2740  /*
2741   * Find any other region server which is different from the one identified by parameter
2742   * @param rs
2743   * @return another region server
2744   */
2745  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2746    for (JVMClusterUtil.RegionServerThread rst :
2747      getMiniHBaseCluster().getRegionServerThreads()) {
2748      if (!(rst.getRegionServer() == rs)) {
2749        return rst.getRegionServer();
2750      }
2751    }
2752    return null;
2753  }
2754
2755  /**
2756   * Tool to get the reference to the region server object that holds the
2757   * region of the specified user table.
2758   * @param tableName user table to lookup in hbase:meta
2759   * @return region server that holds it, null if the row doesn't exist
2760   * @throws IOException
2761   * @throws InterruptedException
2762   */
2763  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2764      throws IOException, InterruptedException {
2765    List<RegionInfo> regions = getRegions(tableName);
2766    if (regions == null || regions.isEmpty()) {
2767      return null;
2768    }
2769    LOG.debug("Found " + regions.size() + " regions for table " +
2770        tableName);
2771
2772    byte[] firstRegionName = regions.stream()
2773        .filter(r -> !r.isOffline())
2774        .map(RegionInfo::getRegionName)
2775        .findFirst()
2776        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2777
2778    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2779    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2780      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2781    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2782      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2783    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2784    while(retrier.shouldRetry()) {
2785      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2786      if (index != -1) {
2787        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2788      }
2789      // Came back -1.  Region may not be online yet.  Sleep a while.
2790      retrier.sleepUntilNextRetry();
2791    }
2792    return null;
2793  }
2794
2795  /**
2796   * Starts a <code>MiniMRCluster</code> with a default number of
2797   * <code>TaskTracker</code>'s.
2798   *
2799   * @throws IOException When starting the cluster fails.
2800   */
2801  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2802    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2803    conf.setIfUnset(
2804        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2805        "99.0");
2806    startMiniMapReduceCluster(2);
2807    return mrCluster;
2808  }
2809
2810  /**
2811   * Tasktracker has a bug where changing the hadoop.log.dir system property
2812   * will not change its internal static LOG_DIR variable.
2813   */
2814  private void forceChangeTaskLogDir() {
2815    Field logDirField;
2816    try {
2817      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2818      logDirField.setAccessible(true);
2819
2820      Field modifiersField = Field.class.getDeclaredField("modifiers");
2821      modifiersField.setAccessible(true);
2822      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2823
2824      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2825    } catch (SecurityException e) {
2826      throw new RuntimeException(e);
2827    } catch (NoSuchFieldException e) {
2828      // TODO Auto-generated catch block
2829      throw new RuntimeException(e);
2830    } catch (IllegalArgumentException e) {
2831      throw new RuntimeException(e);
2832    } catch (IllegalAccessException e) {
2833      throw new RuntimeException(e);
2834    }
2835  }
2836
2837  /**
2838   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2839   * filesystem.
2840   * @param servers  The number of <code>TaskTracker</code>'s to start.
2841   * @throws IOException When starting the cluster fails.
2842   */
2843  private void startMiniMapReduceCluster(final int servers) throws IOException {
2844    if (mrCluster != null) {
2845      throw new IllegalStateException("MiniMRCluster is already running");
2846    }
2847    LOG.info("Starting mini mapreduce cluster...");
2848    setupClusterTestDir();
2849    createDirsAndSetProperties();
2850
2851    forceChangeTaskLogDir();
2852
2853    //// hadoop2 specific settings
2854    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2855    // we up the VM usable so that processes don't get killed.
2856    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2857
2858    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2859    // this avoids the problem by disabling speculative task execution in tests.
2860    conf.setBoolean("mapreduce.map.speculative", false);
2861    conf.setBoolean("mapreduce.reduce.speculative", false);
2862    ////
2863
2864    // Allow the user to override FS URI for this map-reduce cluster to use.
2865    mrCluster = new MiniMRCluster(servers,
2866      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2867      null, null, new JobConf(this.conf));
2868    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2869    if (jobConf == null) {
2870      jobConf = mrCluster.createJobConf();
2871    }
2872
2873    jobConf.set("mapreduce.cluster.local.dir",
2874      conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2875    LOG.info("Mini mapreduce cluster started");
2876
2877    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2878    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2879    // necessary config properties here.  YARN-129 required adding a few properties.
2880    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2881    // this for mrv2 support; mr1 ignores this
2882    conf.set("mapreduce.framework.name", "yarn");
2883    conf.setBoolean("yarn.is.minicluster", true);
2884    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2885    if (rmAddress != null) {
2886      conf.set("yarn.resourcemanager.address", rmAddress);
2887    }
2888    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2889    if (historyAddress != null) {
2890      conf.set("mapreduce.jobhistory.address", historyAddress);
2891    }
2892    String schedulerAddress =
2893      jobConf.get("yarn.resourcemanager.scheduler.address");
2894    if (schedulerAddress != null) {
2895      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2896    }
2897    String mrJobHistoryWebappAddress =
2898      jobConf.get("mapreduce.jobhistory.webapp.address");
2899    if (mrJobHistoryWebappAddress != null) {
2900      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2901    }
2902    String yarnRMWebappAddress =
2903      jobConf.get("yarn.resourcemanager.webapp.address");
2904    if (yarnRMWebappAddress != null) {
2905      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2906    }
2907  }
2908
2909  /**
2910   * Stops the previously started <code>MiniMRCluster</code>.
2911   */
2912  public void shutdownMiniMapReduceCluster() {
2913    if (mrCluster != null) {
2914      LOG.info("Stopping mini mapreduce cluster...");
2915      mrCluster.shutdown();
2916      mrCluster = null;
2917      LOG.info("Mini mapreduce cluster stopped");
2918    }
2919    // Restore configuration to point to local jobtracker
2920    conf.set("mapreduce.jobtracker.address", "local");
2921  }
2922
2923  /**
2924   * Create a stubbed out RegionServerService, mainly for getting FS.
2925   */
2926  public RegionServerServices createMockRegionServerService() throws IOException {
2927    return createMockRegionServerService((ServerName)null);
2928  }
2929
2930  /**
2931   * Create a stubbed out RegionServerService, mainly for getting FS.
2932   * This version is used by TestTokenAuthentication
2933   */
2934  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2935      IOException {
2936    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2937    rss.setFileSystem(getTestFileSystem());
2938    rss.setRpcServer(rpc);
2939    return rss;
2940  }
2941
2942  /**
2943   * Create a stubbed out RegionServerService, mainly for getting FS.
2944   * This version is used by TestOpenRegionHandler
2945   */
2946  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2947    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2948    rss.setFileSystem(getTestFileSystem());
2949    return rss;
2950  }
2951
2952  /**
2953   * Switches the logger for the given class to DEBUG level.
2954   *
2955   * @param clazz  The class for which to switch to debug logging.
2956   */
2957  public void enableDebug(Class<?> clazz) {
2958    Logger l = LoggerFactory.getLogger(clazz);
2959    if (l instanceof Log4JLogger) {
2960      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2961    } else if (l instanceof Log4jLoggerAdapter) {
2962      LogManager.getLogger(clazz).setLevel(org.apache.log4j.Level.DEBUG);
2963    } else if (l instanceof Jdk14Logger) {
2964      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2965    }
2966  }
2967
2968  /**
2969   * Expire the Master's session
2970   * @throws Exception
2971   */
2972  public void expireMasterSession() throws Exception {
2973    HMaster master = getMiniHBaseCluster().getMaster();
2974    expireSession(master.getZooKeeper(), false);
2975  }
2976
2977  /**
2978   * Expire a region server's session
2979   * @param index which RS
2980   */
2981  public void expireRegionServerSession(int index) throws Exception {
2982    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2983    expireSession(rs.getZooKeeper(), false);
2984    decrementMinRegionServerCount();
2985  }
2986
2987  private void decrementMinRegionServerCount() {
2988    // decrement the count for this.conf, for newly spwaned master
2989    // this.hbaseCluster shares this configuration too
2990    decrementMinRegionServerCount(getConfiguration());
2991
2992    // each master thread keeps a copy of configuration
2993    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2994      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2995    }
2996  }
2997
2998  private void decrementMinRegionServerCount(Configuration conf) {
2999    int currentCount = conf.getInt(
3000        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
3001    if (currentCount != -1) {
3002      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
3003          Math.max(currentCount - 1, 1));
3004    }
3005  }
3006
3007  public void expireSession(ZKWatcher nodeZK) throws Exception {
3008   expireSession(nodeZK, false);
3009  }
3010
3011  /**
3012   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
3013   * http://hbase.apache.org/book.html#trouble.zookeeper
3014   * There are issues when doing this:
3015   * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
3016   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
3017   *
3018   * @param nodeZK - the ZK watcher to expire
3019   * @param checkStatus - true to check if we can create a Table with the
3020   *                    current configuration.
3021   */
3022  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
3023    throws Exception {
3024    Configuration c = new Configuration(this.conf);
3025    String quorumServers = ZKConfig.getZKQuorumServersString(c);
3026    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
3027    byte[] password = zk.getSessionPasswd();
3028    long sessionID = zk.getSessionId();
3029
3030    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
3031    //  so we create a first watcher to be sure that the
3032    //  event was sent. We expect that if our watcher receives the event
3033    //  other watchers on the same machine will get is as well.
3034    // When we ask to close the connection, ZK does not close it before
3035    //  we receive all the events, so don't have to capture the event, just
3036    //  closing the connection should be enough.
3037    ZooKeeper monitor = new ZooKeeper(quorumServers,
3038      1000, new org.apache.zookeeper.Watcher(){
3039      @Override
3040      public void process(WatchedEvent watchedEvent) {
3041        LOG.info("Monitor ZKW received event="+watchedEvent);
3042      }
3043    } , sessionID, password);
3044
3045    // Making it expire
3046    ZooKeeper newZK = new ZooKeeper(quorumServers,
3047        1000, EmptyWatcher.instance, sessionID, password);
3048
3049    //ensure that we have connection to the server before closing down, otherwise
3050    //the close session event will be eaten out before we start CONNECTING state
3051    long start = System.currentTimeMillis();
3052    while (newZK.getState() != States.CONNECTED
3053         && System.currentTimeMillis() - start < 1000) {
3054       Thread.sleep(1);
3055    }
3056    newZK.close();
3057    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
3058
3059    // Now closing & waiting to be sure that the clients get it.
3060    monitor.close();
3061
3062    if (checkStatus) {
3063      getConnection().getTable(TableName.META_TABLE_NAME).close();
3064    }
3065  }
3066
3067  /**
3068   * Get the Mini HBase cluster.
3069   *
3070   * @return hbase cluster
3071   * @see #getHBaseClusterInterface()
3072   */
3073  public MiniHBaseCluster getHBaseCluster() {
3074    return getMiniHBaseCluster();
3075  }
3076
3077  /**
3078   * Returns the HBaseCluster instance.
3079   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
3080   * tests referring this should not assume that the cluster is a mini cluster or a
3081   * distributed one. If the test only works on a mini cluster, then specific
3082   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
3083   * need to type-cast.
3084   */
3085  public HBaseCluster getHBaseClusterInterface() {
3086    //implementation note: we should rename this method as #getHBaseCluster(),
3087    //but this would require refactoring 90+ calls.
3088    return hbaseCluster;
3089  }
3090
3091  /**
3092   * Get a Connection to the cluster.
3093   * Not thread-safe (This class needs a lot of work to make it thread-safe).
3094   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
3095   * @throws IOException
3096   */
3097  public Connection getConnection() throws IOException {
3098    if (this.connection == null) {
3099      this.connection = ConnectionFactory.createConnection(this.conf);
3100    }
3101    return this.connection;
3102  }
3103
3104  /**
3105   * Returns a Admin instance.
3106   * This instance is shared between HBaseTestingUtility instance users. Closing it has no effect,
3107   * it will be closed automatically when the cluster shutdowns
3108   *
3109   * @return HBaseAdmin instance which is guaranteed to support only {@link Admin} interface.
3110   *   Functions in HBaseAdmin not provided by {@link Admin} interface can be changed/deleted
3111   *   anytime.
3112   * @deprecated Since 2.0. Will be removed in 3.0. Use {@link #getAdmin()} instead.
3113   */
3114  @Deprecated
3115  public synchronized HBaseAdmin getHBaseAdmin()
3116  throws IOException {
3117    if (hbaseAdmin == null){
3118      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
3119    }
3120    return hbaseAdmin;
3121  }
3122
3123  /**
3124   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
3125   * Closing it has no effect, it will be closed automatically when the cluster shutdowns
3126   */
3127  public synchronized Admin getAdmin() throws IOException {
3128    if (hbaseAdmin == null){
3129      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
3130    }
3131    return hbaseAdmin;
3132  }
3133
3134  private HBaseAdmin hbaseAdmin = null;
3135
3136  /**
3137   * Returns an {@link Hbck} instance. Needs be closed when done.
3138   */
3139  public Hbck getHbck() throws IOException {
3140    return getConnection().getHbck();
3141  }
3142
3143  /**
3144   * Unassign the named region.
3145   *
3146   * @param regionName  The region to unassign.
3147   */
3148  public void unassignRegion(String regionName) throws IOException {
3149    unassignRegion(Bytes.toBytes(regionName));
3150  }
3151
3152  /**
3153   * Unassign the named region.
3154   *
3155   * @param regionName  The region to unassign.
3156   */
3157  public void unassignRegion(byte[] regionName) throws IOException {
3158    getAdmin().unassign(regionName, true);
3159  }
3160
3161  /**
3162   * Closes the region containing the given row.
3163   *
3164   * @param row  The row to find the containing region.
3165   * @param table  The table to find the region.
3166   */
3167  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
3168    unassignRegionByRow(Bytes.toBytes(row), table);
3169  }
3170
3171  /**
3172   * Closes the region containing the given row.
3173   *
3174   * @param row  The row to find the containing region.
3175   * @param table  The table to find the region.
3176   * @throws IOException
3177   */
3178  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3179    HRegionLocation hrl = table.getRegionLocation(row);
3180    unassignRegion(hrl.getRegionInfo().getRegionName());
3181  }
3182
3183  /*
3184   * Retrieves a splittable region randomly from tableName
3185   *
3186   * @param tableName name of table
3187   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3188   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3189   */
3190  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3191    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3192    int regCount = regions.size();
3193    Set<Integer> attempted = new HashSet<>();
3194    int idx;
3195    int attempts = 0;
3196    do {
3197      regions = getHBaseCluster().getRegions(tableName);
3198      if (regCount != regions.size()) {
3199        // if there was region movement, clear attempted Set
3200        attempted.clear();
3201      }
3202      regCount = regions.size();
3203      // There are chances that before we get the region for the table from an RS the region may
3204      // be going for CLOSE.  This may be because online schema change is enabled
3205      if (regCount > 0) {
3206        idx = random.nextInt(regCount);
3207        // if we have just tried this region, there is no need to try again
3208        if (attempted.contains(idx))
3209          continue;
3210        try {
3211          regions.get(idx).checkSplit();
3212          return regions.get(idx);
3213        } catch (Exception ex) {
3214          LOG.warn("Caught exception", ex);
3215          attempted.add(idx);
3216        }
3217      }
3218      attempts++;
3219    } while (maxAttempts == -1 || attempts < maxAttempts);
3220    return null;
3221  }
3222
3223  public MiniDFSCluster getDFSCluster() {
3224    return dfsCluster;
3225  }
3226
3227  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3228    setDFSCluster(cluster, true);
3229  }
3230
3231  /**
3232   * Set the MiniDFSCluster
3233   * @param cluster cluster to use
3234   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before
3235   * it is set.
3236   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3237   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3238   */
3239  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3240      throws IllegalStateException, IOException {
3241    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3242      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3243    }
3244    this.dfsCluster = cluster;
3245    this.setFs();
3246  }
3247
3248  public FileSystem getTestFileSystem() throws IOException {
3249    return HFileSystem.get(conf);
3250  }
3251
3252  /**
3253   * Wait until all regions in a table have been assigned.  Waits default timeout before giving up
3254   * (30 seconds).
3255   * @param table Table to wait on.
3256   * @throws InterruptedException
3257   * @throws IOException
3258   */
3259  public void waitTableAvailable(TableName table)
3260      throws InterruptedException, IOException {
3261    waitTableAvailable(table.getName(), 30000);
3262  }
3263
3264  public void waitTableAvailable(TableName table, long timeoutMillis)
3265      throws InterruptedException, IOException {
3266    waitFor(timeoutMillis, predicateTableAvailable(table));
3267  }
3268
3269  /**
3270   * Wait until all regions in a table have been assigned
3271   * @param table Table to wait on.
3272   * @param timeoutMillis Timeout.
3273   * @throws InterruptedException
3274   * @throws IOException
3275   */
3276  public void waitTableAvailable(byte[] table, long timeoutMillis)
3277  throws InterruptedException, IOException {
3278    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3279  }
3280
3281  public String explainTableAvailability(TableName tableName) throws IOException {
3282    String msg = explainTableState(tableName, TableState.State.ENABLED) + ", ";
3283    if (getHBaseCluster().getMaster().isAlive()) {
3284      Map<RegionInfo, ServerName> assignments =
3285          getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3286              .getRegionAssignments();
3287      final List<Pair<RegionInfo, ServerName>> metaLocations =
3288          MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
3289      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3290        RegionInfo hri = metaLocation.getFirst();
3291        ServerName sn = metaLocation.getSecond();
3292        if (!assignments.containsKey(hri)) {
3293          msg += ", region " + hri
3294              + " not assigned, but found in meta, it expected to be on " + sn;
3295
3296        } else if (sn == null) {
3297          msg += ",  region " + hri
3298              + " assigned,  but has no server in meta";
3299        } else if (!sn.equals(assignments.get(hri))) {
3300          msg += ",  region " + hri
3301              + " assigned,  but has different servers in meta and AM ( " +
3302              sn + " <> " + assignments.get(hri);
3303        }
3304      }
3305    }
3306    return msg;
3307  }
3308
3309  public String explainTableState(final TableName table, TableState.State state)
3310      throws IOException {
3311    TableState tableState = MetaTableAccessor.getTableState(connection, table);
3312    if (tableState == null) {
3313      return "TableState in META: No table state in META for table " + table
3314          + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3315    } else if (!tableState.inStates(state)) {
3316      return "TableState in META: Not " + state + " state, but " + tableState;
3317    } else {
3318      return "TableState in META: OK";
3319    }
3320  }
3321
3322  @Nullable
3323  public TableState findLastTableState(final TableName table) throws IOException {
3324    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3325    MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3326      @Override
3327      public boolean visit(Result r) throws IOException {
3328        if (!Arrays.equals(r.getRow(), table.getName()))
3329          return false;
3330        TableState state = MetaTableAccessor.getTableState(r);
3331        if (state != null)
3332          lastTableState.set(state);
3333        return true;
3334      }
3335    };
3336    MetaTableAccessor
3337        .scanMeta(connection, null, null,
3338            MetaTableAccessor.QueryType.TABLE,
3339            Integer.MAX_VALUE, visitor);
3340    return lastTableState.get();
3341  }
3342
3343  /**
3344   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3345   * regions have been all assigned.  Will timeout after default period (30 seconds)
3346   * Tolerates nonexistent table.
3347   * @param table the table to wait on.
3348   * @throws InterruptedException if interrupted while waiting
3349   * @throws IOException if an IO problem is encountered
3350   */
3351  public void waitTableEnabled(TableName table)
3352      throws InterruptedException, IOException {
3353    waitTableEnabled(table, 30000);
3354  }
3355
3356  /**
3357   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3358   * regions have been all assigned.
3359   * @see #waitTableEnabled(TableName, long)
3360   * @param table Table to wait on.
3361   * @param timeoutMillis Time to wait on it being marked enabled.
3362   * @throws InterruptedException
3363   * @throws IOException
3364   */
3365  public void waitTableEnabled(byte[] table, long timeoutMillis)
3366  throws InterruptedException, IOException {
3367    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3368  }
3369
3370  public void waitTableEnabled(TableName table, long timeoutMillis)
3371  throws IOException {
3372    waitFor(timeoutMillis, predicateTableEnabled(table));
3373  }
3374
3375  /**
3376   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3377   * Will timeout after default period (30 seconds)
3378   * @param table Table to wait on.
3379   * @throws InterruptedException
3380   * @throws IOException
3381   */
3382  public void waitTableDisabled(byte[] table)
3383          throws InterruptedException, IOException {
3384    waitTableDisabled(table, 30000);
3385  }
3386
3387  public void waitTableDisabled(TableName table, long millisTimeout)
3388          throws InterruptedException, IOException {
3389    waitFor(millisTimeout, predicateTableDisabled(table));
3390  }
3391
3392  /**
3393   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3394   * @param table Table to wait on.
3395   * @param timeoutMillis Time to wait on it being marked disabled.
3396   * @throws InterruptedException
3397   * @throws IOException
3398   */
3399  public void waitTableDisabled(byte[] table, long timeoutMillis)
3400          throws InterruptedException, IOException {
3401    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3402  }
3403
3404  /**
3405   * Make sure that at least the specified number of region servers
3406   * are running
3407   * @param num minimum number of region servers that should be running
3408   * @return true if we started some servers
3409   * @throws IOException
3410   */
3411  public boolean ensureSomeRegionServersAvailable(final int num)
3412      throws IOException {
3413    boolean startedServer = false;
3414    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3415    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3416      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3417      startedServer = true;
3418    }
3419
3420    return startedServer;
3421  }
3422
3423
3424  /**
3425   * Make sure that at least the specified number of region servers
3426   * are running. We don't count the ones that are currently stopping or are
3427   * stopped.
3428   * @param num minimum number of region servers that should be running
3429   * @return true if we started some servers
3430   * @throws IOException
3431   */
3432  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3433    throws IOException {
3434    boolean startedServer = ensureSomeRegionServersAvailable(num);
3435
3436    int nonStoppedServers = 0;
3437    for (JVMClusterUtil.RegionServerThread rst :
3438      getMiniHBaseCluster().getRegionServerThreads()) {
3439
3440      HRegionServer hrs = rst.getRegionServer();
3441      if (hrs.isStopping() || hrs.isStopped()) {
3442        LOG.info("A region server is stopped or stopping:"+hrs);
3443      } else {
3444        nonStoppedServers++;
3445      }
3446    }
3447    for (int i=nonStoppedServers; i<num; ++i) {
3448      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3449      startedServer = true;
3450    }
3451    return startedServer;
3452  }
3453
3454
3455  /**
3456   * This method clones the passed <code>c</code> configuration setting a new
3457   * user into the clone.  Use it getting new instances of FileSystem.  Only
3458   * works for DistributedFileSystem w/o Kerberos.
3459   * @param c Initial configuration
3460   * @param differentiatingSuffix Suffix to differentiate this user from others.
3461   * @return A new configuration instance with a different user set into it.
3462   * @throws IOException
3463   */
3464  public static User getDifferentUser(final Configuration c,
3465    final String differentiatingSuffix)
3466  throws IOException {
3467    FileSystem currentfs = FileSystem.get(c);
3468    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3469      return User.getCurrent();
3470    }
3471    // Else distributed filesystem.  Make a new instance per daemon.  Below
3472    // code is taken from the AppendTestUtil over in hdfs.
3473    String username = User.getCurrent().getName() +
3474      differentiatingSuffix;
3475    User user = User.createUserForTesting(c, username,
3476        new String[]{"supergroup"});
3477    return user;
3478  }
3479
3480  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3481      throws IOException {
3482    NavigableSet<String> online = new TreeSet<>();
3483    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3484      try {
3485        for (RegionInfo region :
3486            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3487          online.add(region.getRegionNameAsString());
3488        }
3489      } catch (RegionServerStoppedException e) {
3490        // That's fine.
3491      }
3492    }
3493    for (MasterThread mt : cluster.getLiveMasterThreads()) {
3494      try {
3495        for (RegionInfo region :
3496            ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3497          online.add(region.getRegionNameAsString());
3498        }
3499      } catch (RegionServerStoppedException e) {
3500        // That's fine.
3501      } catch (ServerNotRunningYetException e) {
3502        // That's fine.
3503      }
3504    }
3505    return online;
3506  }
3507
3508  /**
3509   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
3510   * makes tests linger.  Here is the exception you'll see:
3511   * <pre>
3512   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3513   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3514   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3515   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3516   * </pre>
3517   * @param stream A DFSClient.DFSOutputStream.
3518   * @param max
3519   * @throws NoSuchFieldException
3520   * @throws SecurityException
3521   * @throws IllegalAccessException
3522   * @throws IllegalArgumentException
3523   */
3524  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3525      final int max) {
3526    try {
3527      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3528      for (Class<?> clazz: clazzes) {
3529        String className = clazz.getSimpleName();
3530        if (className.equals("DFSOutputStream")) {
3531          if (clazz.isInstance(stream)) {
3532            Field maxRecoveryErrorCountField =
3533              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3534            maxRecoveryErrorCountField.setAccessible(true);
3535            maxRecoveryErrorCountField.setInt(stream, max);
3536            break;
3537          }
3538        }
3539      }
3540    } catch (Exception e) {
3541      LOG.info("Could not set max recovery field", e);
3542    }
3543  }
3544
3545  /**
3546   * Uses directly the assignment manager to assign the region. and waits until the specified region
3547   * has completed assignment.
3548   * @return true if the region is assigned false otherwise.
3549   */
3550  public boolean assignRegion(final RegionInfo regionInfo)
3551      throws IOException, InterruptedException {
3552    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3553    am.assign(regionInfo);
3554    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3555  }
3556
3557  /**
3558   * Move region to destination server and wait till region is completely moved and online
3559   *
3560   * @param destRegion region to move
3561   * @param destServer destination server of the region
3562   * @throws InterruptedException
3563   * @throws IOException
3564   */
3565  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3566      throws InterruptedException, IOException {
3567    HMaster master = getMiniHBaseCluster().getMaster();
3568    // TODO: Here we start the move. The move can take a while.
3569    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3570    while (true) {
3571      ServerName serverName = master.getAssignmentManager().getRegionStates()
3572          .getRegionServerOfRegion(destRegion);
3573      if (serverName != null && serverName.equals(destServer)) {
3574        assertRegionOnServer(destRegion, serverName, 2000);
3575        break;
3576      }
3577      Thread.sleep(10);
3578    }
3579  }
3580
3581  /**
3582   * Wait until all regions for a table in hbase:meta have a non-empty
3583   * info:server, up to a configuable timeout value (default is 60 seconds)
3584   * This means all regions have been deployed,
3585   * master has been informed and updated hbase:meta with the regions deployed
3586   * server.
3587   * @param tableName the table name
3588   * @throws IOException
3589   */
3590  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3591    waitUntilAllRegionsAssigned(tableName,
3592      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3593  }
3594
3595  /**
3596   * Waith until all system table's regions get assigned
3597   * @throws IOException
3598   */
3599  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3600    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3601    waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
3602  }
3603
3604  /**
3605   * Wait until all regions for a table in hbase:meta have a non-empty
3606   * info:server, or until timeout.  This means all regions have been deployed,
3607   * master has been informed and updated hbase:meta with the regions deployed
3608   * server.
3609   * @param tableName the table name
3610   * @param timeout timeout, in milliseconds
3611   * @throws IOException
3612   */
3613  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3614      throws IOException {
3615    if (!TableName.isMetaTableName(tableName)) {
3616      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3617        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3618            timeout + "ms");
3619        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3620          @Override
3621          public String explainFailure() throws IOException {
3622            return explainTableAvailability(tableName);
3623          }
3624
3625          @Override
3626          public boolean evaluate() throws IOException {
3627            Scan scan = new Scan();
3628            scan.addFamily(HConstants.CATALOG_FAMILY);
3629            boolean tableFound = false;
3630            try (ResultScanner s = meta.getScanner(scan)) {
3631              for (Result r; (r = s.next()) != null;) {
3632                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3633                HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3634                if (info != null && info.getTable().equals(tableName)) {
3635                  // Get server hosting this region from catalog family. Return false if no server
3636                  // hosting this region, or if the server hosting this region was recently killed
3637                  // (for fault tolerance testing).
3638                  tableFound = true;
3639                  byte[] server =
3640                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3641                  if (server == null) {
3642                    return false;
3643                  } else {
3644                    byte[] startCode =
3645                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3646                    ServerName serverName =
3647                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3648                            Bytes.toLong(startCode));
3649                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3650                        getHBaseCluster().isKilledRS(serverName)) {
3651                      return false;
3652                    }
3653                  }
3654                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3655                    return false;
3656                  }
3657                }
3658              }
3659            }
3660            if (!tableFound) {
3661              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3662            }
3663            return tableFound;
3664          }
3665        });
3666      }
3667    }
3668    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3669    // check from the master state if we are using a mini cluster
3670    if (!getHBaseClusterInterface().isDistributedCluster()) {
3671      // So, all regions are in the meta table but make sure master knows of the assignments before
3672      // returning -- sometimes this can lag.
3673      HMaster master = getHBaseCluster().getMaster();
3674      final RegionStates states = master.getAssignmentManager().getRegionStates();
3675      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3676        @Override
3677        public String explainFailure() throws IOException {
3678          return explainTableAvailability(tableName);
3679        }
3680
3681        @Override
3682        public boolean evaluate() throws IOException {
3683          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3684          return hris != null && !hris.isEmpty();
3685        }
3686      });
3687    }
3688    LOG.info("All regions for table " + tableName + " assigned.");
3689  }
3690
3691  /**
3692   * Do a small get/scan against one store. This is required because store
3693   * has no actual methods of querying itself, and relies on StoreScanner.
3694   */
3695  public static List<Cell> getFromStoreFile(HStore store,
3696                                                Get get) throws IOException {
3697    Scan scan = new Scan(get);
3698    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3699        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3700        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3701        // readpoint 0.
3702        0);
3703
3704    List<Cell> result = new ArrayList<>();
3705    scanner.next(result);
3706    if (!result.isEmpty()) {
3707      // verify that we are on the row we want:
3708      Cell kv = result.get(0);
3709      if (!CellUtil.matchingRows(kv, get.getRow())) {
3710        result.clear();
3711      }
3712    }
3713    scanner.close();
3714    return result;
3715  }
3716
3717  /**
3718   * Create region split keys between startkey and endKey
3719   *
3720   * @param startKey
3721   * @param endKey
3722   * @param numRegions the number of regions to be created. it has to be greater than 3.
3723   * @return resulting split keys
3724   */
3725  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3726    assertTrue(numRegions>3);
3727    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3728    byte [][] result = new byte[tmpSplitKeys.length+1][];
3729    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3730    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3731    return result;
3732  }
3733
3734  /**
3735   * Do a small get/scan against one store. This is required because store
3736   * has no actual methods of querying itself, and relies on StoreScanner.
3737   */
3738  public static List<Cell> getFromStoreFile(HStore store,
3739                                                byte [] row,
3740                                                NavigableSet<byte[]> columns
3741                                                ) throws IOException {
3742    Get get = new Get(row);
3743    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3744    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3745
3746    return getFromStoreFile(store,get);
3747  }
3748
3749  public static void assertKVListsEqual(String additionalMsg,
3750      final List<? extends Cell> expected,
3751      final List<? extends Cell> actual) {
3752    final int eLen = expected.size();
3753    final int aLen = actual.size();
3754    final int minLen = Math.min(eLen, aLen);
3755
3756    int i;
3757    for (i = 0; i < minLen
3758        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3759        ++i) {}
3760
3761    if (additionalMsg == null) {
3762      additionalMsg = "";
3763    }
3764    if (!additionalMsg.isEmpty()) {
3765      additionalMsg = ". " + additionalMsg;
3766    }
3767
3768    if (eLen != aLen || i != minLen) {
3769      throw new AssertionError(
3770          "Expected and actual KV arrays differ at position " + i + ": " +
3771          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3772          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3773    }
3774  }
3775
3776  public static <T> String safeGetAsStr(List<T> lst, int i) {
3777    if (0 <= i && i < lst.size()) {
3778      return lst.get(i).toString();
3779    } else {
3780      return "<out_of_range>";
3781    }
3782  }
3783
3784  public String getClusterKey() {
3785    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3786        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3787        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3788            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3789  }
3790
3791  /** Creates a random table with the given parameters */
3792  public Table createRandomTable(TableName tableName,
3793      final Collection<String> families,
3794      final int maxVersions,
3795      final int numColsPerRow,
3796      final int numFlushes,
3797      final int numRegions,
3798      final int numRowsPerFlush)
3799      throws IOException, InterruptedException {
3800
3801    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3802        " regions, " + numFlushes + " storefiles per region, " +
3803        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3804        "\n");
3805
3806    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3807    final int numCF = families.size();
3808    final byte[][] cfBytes = new byte[numCF][];
3809    {
3810      int cfIndex = 0;
3811      for (String cf : families) {
3812        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3813      }
3814    }
3815
3816    final int actualStartKey = 0;
3817    final int actualEndKey = Integer.MAX_VALUE;
3818    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3819    final int splitStartKey = actualStartKey + keysPerRegion;
3820    final int splitEndKey = actualEndKey - keysPerRegion;
3821    final String keyFormat = "%08x";
3822    final Table table = createTable(tableName, cfBytes,
3823        maxVersions,
3824        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3825        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3826        numRegions);
3827
3828    if (hbaseCluster != null) {
3829      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3830    }
3831
3832    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3833
3834    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3835      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3836        final byte[] row = Bytes.toBytes(String.format(keyFormat,
3837            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3838
3839        Put put = new Put(row);
3840        Delete del = new Delete(row);
3841        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3842          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3843          final long ts = rand.nextInt();
3844          final byte[] qual = Bytes.toBytes("col" + iCol);
3845          if (rand.nextBoolean()) {
3846            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3847                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3848                ts + "_random_" + rand.nextLong());
3849            put.addColumn(cf, qual, ts, value);
3850          } else if (rand.nextDouble() < 0.8) {
3851            del.addColumn(cf, qual, ts);
3852          } else {
3853            del.addColumns(cf, qual, ts);
3854          }
3855        }
3856
3857        if (!put.isEmpty()) {
3858          mutator.mutate(put);
3859        }
3860
3861        if (!del.isEmpty()) {
3862          mutator.mutate(del);
3863        }
3864      }
3865      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3866      mutator.flush();
3867      if (hbaseCluster != null) {
3868        getMiniHBaseCluster().flushcache(table.getName());
3869      }
3870    }
3871    mutator.close();
3872
3873    return table;
3874  }
3875
3876  private static Random random = new Random();
3877
3878  private static final PortAllocator portAllocator = new PortAllocator(random);
3879
3880  public static int randomFreePort() {
3881    return portAllocator.randomFreePort();
3882  }
3883
3884  static class PortAllocator {
3885    private static final int MIN_RANDOM_PORT = 0xc000;
3886    private static final int MAX_RANDOM_PORT = 0xfffe;
3887
3888    /** A set of ports that have been claimed using {@link #randomFreePort()}. */
3889    private final Set<Integer> takenRandomPorts = new HashSet<>();
3890
3891    private final Random random;
3892    private final AvailablePortChecker portChecker;
3893
3894    public PortAllocator(Random random) {
3895      this.random = random;
3896      this.portChecker = new AvailablePortChecker() {
3897        @Override
3898        public boolean available(int port) {
3899          try {
3900            ServerSocket sock = new ServerSocket(port);
3901            sock.close();
3902            return true;
3903          } catch (IOException ex) {
3904            return false;
3905          }
3906        }
3907      };
3908    }
3909
3910    public PortAllocator(Random random, AvailablePortChecker portChecker) {
3911      this.random = random;
3912      this.portChecker = portChecker;
3913    }
3914
3915    /**
3916     * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3917     * called from single-threaded test setup code/
3918     */
3919    public int randomFreePort() {
3920      int port = 0;
3921      do {
3922        port = randomPort();
3923        if (takenRandomPorts.contains(port)) {
3924          port = 0;
3925          continue;
3926        }
3927        takenRandomPorts.add(port);
3928
3929        if (!portChecker.available(port)) {
3930          port = 0;
3931        }
3932      } while (port == 0);
3933      return port;
3934    }
3935
3936    /**
3937     * Returns a random port. These ports cannot be registered with IANA and are
3938     * intended for dynamic allocation (see http://bit.ly/dynports).
3939     */
3940    private int randomPort() {
3941      return MIN_RANDOM_PORT
3942          + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3943    }
3944
3945    interface AvailablePortChecker {
3946      boolean available(int port);
3947    }
3948  }
3949
3950  public static String randomMultiCastAddress() {
3951    return "226.1.1." + random.nextInt(254);
3952  }
3953
3954  public static void waitForHostPort(String host, int port)
3955      throws IOException {
3956    final int maxTimeMs = 10000;
3957    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3958    IOException savedException = null;
3959    LOG.info("Waiting for server at " + host + ":" + port);
3960    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3961      try {
3962        Socket sock = new Socket(InetAddress.getByName(host), port);
3963        sock.close();
3964        savedException = null;
3965        LOG.info("Server at " + host + ":" + port + " is available");
3966        break;
3967      } catch (UnknownHostException e) {
3968        throw new IOException("Failed to look up " + host, e);
3969      } catch (IOException e) {
3970        savedException = e;
3971      }
3972      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3973    }
3974
3975    if (savedException != null) {
3976      throw savedException;
3977    }
3978  }
3979
3980  /**
3981   * Creates a pre-split table for load testing. If the table already exists,
3982   * logs a warning and continues.
3983   * @return the number of regions the table was split into
3984   */
3985  public static int createPreSplitLoadTestTable(Configuration conf,
3986      TableName tableName, byte[] columnFamily, Algorithm compression,
3987      DataBlockEncoding dataBlockEncoding) throws IOException {
3988    return createPreSplitLoadTestTable(conf, tableName,
3989      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3990      Durability.USE_DEFAULT);
3991  }
3992  /**
3993   * Creates a pre-split table for load testing. If the table already exists,
3994   * logs a warning and continues.
3995   * @return the number of regions the table was split into
3996   */
3997  public static int createPreSplitLoadTestTable(Configuration conf,
3998      TableName tableName, byte[] columnFamily, Algorithm compression,
3999      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
4000      Durability durability)
4001          throws IOException {
4002    HTableDescriptor desc = new HTableDescriptor(tableName);
4003    desc.setDurability(durability);
4004    desc.setRegionReplication(regionReplication);
4005    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
4006    hcd.setDataBlockEncoding(dataBlockEncoding);
4007    hcd.setCompressionType(compression);
4008    return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
4009  }
4010
4011  /**
4012   * Creates a pre-split table for load testing. If the table already exists,
4013   * logs a warning and continues.
4014   * @return the number of regions the table was split into
4015   */
4016  public static int createPreSplitLoadTestTable(Configuration conf,
4017      TableName tableName, byte[][] columnFamilies, Algorithm compression,
4018      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
4019      Durability durability)
4020          throws IOException {
4021    HTableDescriptor desc = new HTableDescriptor(tableName);
4022    desc.setDurability(durability);
4023    desc.setRegionReplication(regionReplication);
4024    HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
4025    for (int i = 0; i < columnFamilies.length; i++) {
4026      HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
4027      hcd.setDataBlockEncoding(dataBlockEncoding);
4028      hcd.setCompressionType(compression);
4029      hcds[i] = hcd;
4030    }
4031    return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
4032  }
4033
4034  /**
4035   * Creates a pre-split table for load testing. If the table already exists,
4036   * logs a warning and continues.
4037   * @return the number of regions the table was split into
4038   */
4039  public static int createPreSplitLoadTestTable(Configuration conf,
4040      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
4041    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
4042  }
4043
4044  /**
4045   * Creates a pre-split table for load testing. If the table already exists,
4046   * logs a warning and continues.
4047   * @return the number of regions the table was split into
4048   */
4049  public static int createPreSplitLoadTestTable(Configuration conf,
4050      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
4051    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
4052        numRegionsPerServer);
4053  }
4054
4055  /**
4056   * Creates a pre-split table for load testing. If the table already exists,
4057   * logs a warning and continues.
4058   * @return the number of regions the table was split into
4059   */
4060  public static int createPreSplitLoadTestTable(Configuration conf,
4061      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
4062      int numRegionsPerServer) throws IOException {
4063    return createPreSplitLoadTestTable(conf, desc, hcds,
4064      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
4065  }
4066
4067  /**
4068   * Creates a pre-split table for load testing. If the table already exists,
4069   * logs a warning and continues.
4070   * @return the number of regions the table was split into
4071   */
4072  public static int createPreSplitLoadTestTable(Configuration conf,
4073      TableDescriptor td, ColumnFamilyDescriptor[] cds,
4074      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
4075    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
4076    for (ColumnFamilyDescriptor cd : cds) {
4077      if (!td.hasColumnFamily(cd.getName())) {
4078        builder.setColumnFamily(cd);
4079      }
4080    }
4081    td = builder.build();
4082    int totalNumberOfRegions = 0;
4083    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
4084    Admin admin = unmanagedConnection.getAdmin();
4085
4086    try {
4087      // create a table a pre-splits regions.
4088      // The number of splits is set as:
4089      //    region servers * regions per region server).
4090      int numberOfServers = admin.getRegionServers().size();
4091      if (numberOfServers == 0) {
4092        throw new IllegalStateException("No live regionservers");
4093      }
4094
4095      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
4096      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
4097          "pre-splitting table into " + totalNumberOfRegions + " regions " +
4098          "(regions per server: " + numRegionsPerServer + ")");
4099
4100      byte[][] splits = splitter.split(
4101          totalNumberOfRegions);
4102
4103      admin.createTable(td, splits);
4104    } catch (MasterNotRunningException e) {
4105      LOG.error("Master not running", e);
4106      throw new IOException(e);
4107    } catch (TableExistsException e) {
4108      LOG.warn("Table " + td.getTableName() +
4109          " already exists, continuing");
4110    } finally {
4111      admin.close();
4112      unmanagedConnection.close();
4113    }
4114    return totalNumberOfRegions;
4115  }
4116
4117  public static int getMetaRSPort(Connection connection) throws IOException {
4118    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
4119      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
4120    }
4121  }
4122
4123  /**
4124   *  Due to async racing issue, a region may not be in
4125   *  the online region list of a region server yet, after
4126   *  the assignment znode is deleted and the new assignment
4127   *  is recorded in master.
4128   */
4129  public void assertRegionOnServer(
4130      final RegionInfo hri, final ServerName server,
4131      final long timeout) throws IOException, InterruptedException {
4132    long timeoutTime = System.currentTimeMillis() + timeout;
4133    while (true) {
4134      List<RegionInfo> regions = getAdmin().getRegions(server);
4135      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
4136      long now = System.currentTimeMillis();
4137      if (now > timeoutTime) break;
4138      Thread.sleep(10);
4139    }
4140    fail("Could not find region " + hri.getRegionNameAsString()
4141      + " on server " + server);
4142  }
4143
4144  /**
4145   * Check to make sure the region is open on the specified
4146   * region server, but not on any other one.
4147   */
4148  public void assertRegionOnlyOnServer(
4149      final RegionInfo hri, final ServerName server,
4150      final long timeout) throws IOException, InterruptedException {
4151    long timeoutTime = System.currentTimeMillis() + timeout;
4152    while (true) {
4153      List<RegionInfo> regions = getAdmin().getRegions(server);
4154      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
4155        List<JVMClusterUtil.RegionServerThread> rsThreads =
4156          getHBaseCluster().getLiveRegionServerThreads();
4157        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
4158          HRegionServer rs = rsThread.getRegionServer();
4159          if (server.equals(rs.getServerName())) {
4160            continue;
4161          }
4162          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
4163          for (HRegion r: hrs) {
4164            assertTrue("Region should not be double assigned",
4165              r.getRegionInfo().getRegionId() != hri.getRegionId());
4166          }
4167        }
4168        return; // good, we are happy
4169      }
4170      long now = System.currentTimeMillis();
4171      if (now > timeoutTime) break;
4172      Thread.sleep(10);
4173    }
4174    fail("Could not find region " + hri.getRegionNameAsString()
4175      + " on server " + server);
4176  }
4177
4178  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
4179    TableDescriptor td =
4180        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4181    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4182    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
4183  }
4184
4185  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
4186      BlockCache blockCache) throws IOException {
4187    TableDescriptor td =
4188        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
4189    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
4190    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
4191  }
4192
4193  public void setFileSystemURI(String fsURI) {
4194    FS_URI = fsURI;
4195  }
4196
4197  /**
4198   * Returns a {@link Predicate} for checking that there are no regions in transition in master
4199   */
4200  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
4201    return new ExplainingPredicate<IOException>() {
4202      @Override
4203      public String explainFailure() throws IOException {
4204        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4205            .getAssignmentManager().getRegionStates();
4206        return "found in transition: " + regionStates.getRegionsInTransition().toString();
4207      }
4208
4209      @Override
4210      public boolean evaluate() throws IOException {
4211        HMaster master = getMiniHBaseCluster().getMaster();
4212        if (master == null) return false;
4213        AssignmentManager am = master.getAssignmentManager();
4214        if (am == null) return false;
4215        return !am.hasRegionsInTransition();
4216      }
4217    };
4218  }
4219
4220  /**
4221   * Returns a {@link Predicate} for checking that table is enabled
4222   */
4223  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4224    return new ExplainingPredicate<IOException>() {
4225      @Override
4226      public String explainFailure() throws IOException {
4227        return explainTableState(tableName, TableState.State.ENABLED);
4228      }
4229
4230      @Override
4231      public boolean evaluate() throws IOException {
4232        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4233      }
4234    };
4235  }
4236
4237  /**
4238   * Returns a {@link Predicate} for checking that table is enabled
4239   */
4240  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4241    return new ExplainingPredicate<IOException>() {
4242      @Override
4243      public String explainFailure() throws IOException {
4244        return explainTableState(tableName, TableState.State.DISABLED);
4245      }
4246
4247      @Override
4248      public boolean evaluate() throws IOException {
4249        return getAdmin().isTableDisabled(tableName);
4250      }
4251    };
4252  }
4253
4254  /**
4255   * Returns a {@link Predicate} for checking that table is enabled
4256   */
4257  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4258    return new ExplainingPredicate<IOException>() {
4259      @Override
4260      public String explainFailure() throws IOException {
4261        return explainTableAvailability(tableName);
4262      }
4263
4264      @Override
4265      public boolean evaluate() throws IOException {
4266        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4267        if (tableAvailable) {
4268          try (Table table = getConnection().getTable(tableName)) {
4269            TableDescriptor htd = table.getDescriptor();
4270            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4271                .getAllRegionLocations()) {
4272              Scan scan = new Scan().withStartRow(loc.getRegionInfo().getStartKey())
4273                  .withStopRow(loc.getRegionInfo().getEndKey()).setOneRowLimit()
4274                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4275              for (byte[] family : htd.getColumnFamilyNames()) {
4276                scan.addFamily(family);
4277              }
4278              try (ResultScanner scanner = table.getScanner(scan)) {
4279                scanner.next();
4280              }
4281            }
4282          }
4283        }
4284        return tableAvailable;
4285      }
4286    };
4287  }
4288
4289  /**
4290   * Wait until no regions in transition.
4291   * @param timeout How long to wait.
4292   * @throws IOException
4293   */
4294  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4295    waitFor(timeout, predicateNoRegionsInTransition());
4296  }
4297
4298  /**
4299   * Wait until no regions in transition. (time limit 15min)
4300   * @throws IOException
4301   */
4302  public void waitUntilNoRegionsInTransition() throws IOException {
4303    waitUntilNoRegionsInTransition(15 * 60000);
4304  }
4305
4306  /**
4307   * Wait until labels is ready in VisibilityLabelsCache.
4308   * @param timeoutMillis
4309   * @param labels
4310   */
4311  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4312    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4313    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4314
4315      @Override
4316      public boolean evaluate() {
4317        for (String label : labels) {
4318          if (labelsCache.getLabelOrdinal(label) == 0) {
4319            return false;
4320          }
4321        }
4322        return true;
4323      }
4324
4325      @Override
4326      public String explainFailure() {
4327        for (String label : labels) {
4328          if (labelsCache.getLabelOrdinal(label) == 0) {
4329            return label + " is not available yet";
4330          }
4331        }
4332        return "";
4333      }
4334    });
4335  }
4336
4337  /**
4338   * Create a set of column descriptors with the combination of compression,
4339   * encoding, bloom codecs available.
4340   * @return the list of column descriptors
4341   */
4342  public static List<HColumnDescriptor> generateColumnDescriptors() {
4343    return generateColumnDescriptors("");
4344  }
4345
4346  /**
4347   * Create a set of column descriptors with the combination of compression,
4348   * encoding, bloom codecs available.
4349   * @param prefix family names prefix
4350   * @return the list of column descriptors
4351   */
4352  public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4353    List<HColumnDescriptor> htds = new ArrayList<>();
4354    long familyId = 0;
4355    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4356      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4357        for (BloomType bloomType: BloomType.values()) {
4358          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4359          HColumnDescriptor htd = new HColumnDescriptor(name);
4360          htd.setCompressionType(compressionType);
4361          htd.setDataBlockEncoding(encodingType);
4362          htd.setBloomFilterType(bloomType);
4363          htds.add(htd);
4364          familyId++;
4365        }
4366      }
4367    }
4368    return htds;
4369  }
4370
4371  /**
4372   * Get supported compression algorithms.
4373   * @return supported compression algorithms.
4374   */
4375  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4376    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4377    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4378    for (String algoName : allAlgos) {
4379      try {
4380        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4381        algo.getCompressor();
4382        supportedAlgos.add(algo);
4383      } catch (Throwable t) {
4384        // this algo is not available
4385      }
4386    }
4387    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4388  }
4389
4390  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4391    Scan scan = new Scan(row);
4392    scan.setSmall(true);
4393    scan.setCaching(1);
4394    scan.setReversed(true);
4395    scan.addFamily(family);
4396    try (RegionScanner scanner = r.getScanner(scan)) {
4397      List<Cell> cells = new ArrayList<>(1);
4398      scanner.next(cells);
4399      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4400        return null;
4401      }
4402      return Result.create(cells);
4403    }
4404  }
4405
4406  private boolean isTargetTable(final byte[] inRow, Cell c) {
4407    String inputRowString = Bytes.toString(inRow);
4408    int i = inputRowString.indexOf(HConstants.DELIMITER);
4409    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4410    int o = outputRowString.indexOf(HConstants.DELIMITER);
4411    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4412  }
4413
4414  /**
4415   * Sets up {@link MiniKdc} for testing security.
4416   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4417   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4418   */
4419  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4420    Properties conf = MiniKdc.createConf();
4421    conf.put(MiniKdc.DEBUG, true);
4422    MiniKdc kdc = null;
4423    File dir = null;
4424    // There is time lag between selecting a port and trying to bind with it. It's possible that
4425    // another service captures the port in between which'll result in BindException.
4426    boolean bindException;
4427    int numTries = 0;
4428    do {
4429      try {
4430        bindException = false;
4431        dir = new File(getDataTestDir("kdc").toUri().getPath());
4432        kdc = new MiniKdc(conf, dir);
4433        kdc.start();
4434      } catch (BindException e) {
4435        FileUtils.deleteDirectory(dir);  // clean directory
4436        numTries++;
4437        if (numTries == 3) {
4438          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4439          throw e;
4440        }
4441        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4442        bindException = true;
4443      }
4444    } while (bindException);
4445    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4446    return kdc;
4447  }
4448
4449  public int getNumHFiles(final TableName tableName, final byte[] family) {
4450    int numHFiles = 0;
4451    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4452      numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4453                                    family);
4454    }
4455    return numHFiles;
4456  }
4457
4458  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4459                               final byte[] family) {
4460    int numHFiles = 0;
4461    for (Region region : rs.getRegions(tableName)) {
4462      numHFiles += region.getStore(family).getStorefilesCount();
4463    }
4464    return numHFiles;
4465  }
4466
4467  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4468    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
4469    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4470    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4471    assertEquals(ltdFamilies.size(), rtdFamilies.size());
4472    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), it2 =
4473         rtdFamilies.iterator(); it.hasNext();) {
4474      assertEquals(0,
4475          ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4476    }
4477  }
4478}