001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.File;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.io.UncheckedIOException;
024import java.lang.reflect.Field;
025import java.lang.reflect.Modifier;
026import java.net.BindException;
027import java.net.DatagramSocket;
028import java.net.InetAddress;
029import java.net.ServerSocket;
030import java.net.Socket;
031import java.net.UnknownHostException;
032import java.nio.charset.StandardCharsets;
033import java.security.MessageDigest;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Properties;
044import java.util.Random;
045import java.util.Set;
046import java.util.TreeSet;
047import java.util.concurrent.ThreadLocalRandom;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicReference;
050import java.util.function.BooleanSupplier;
051import org.apache.commons.io.FileUtils;
052import org.apache.commons.lang3.RandomStringUtils;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.fs.FileSystem;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
057import org.apache.hadoop.hbase.Waiter.Predicate;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.AsyncClusterConnection;
060import org.apache.hadoop.hbase.client.BufferedMutator;
061import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
064import org.apache.hadoop.hbase.client.Connection;
065import org.apache.hadoop.hbase.client.ConnectionFactory;
066import org.apache.hadoop.hbase.client.Consistency;
067import org.apache.hadoop.hbase.client.Delete;
068import org.apache.hadoop.hbase.client.Durability;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Hbck;
071import org.apache.hadoop.hbase.client.MasterRegistry;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.client.RegionLocator;
076import org.apache.hadoop.hbase.client.Result;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.Scan.ReadType;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.client.TableState;
084import org.apache.hadoop.hbase.fs.HFileSystem;
085import org.apache.hadoop.hbase.io.compress.Compression;
086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
088import org.apache.hadoop.hbase.io.hfile.BlockCache;
089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
090import org.apache.hadoop.hbase.io.hfile.HFile;
091import org.apache.hadoop.hbase.ipc.RpcServerInterface;
092import org.apache.hadoop.hbase.keymeta.KeymetaAdminClient;
093import org.apache.hadoop.hbase.logging.Log4jUtils;
094import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
095import org.apache.hadoop.hbase.master.HMaster;
096import org.apache.hadoop.hbase.master.RegionState;
097import org.apache.hadoop.hbase.master.ServerManager;
098import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
099import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
100import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
101import org.apache.hadoop.hbase.master.assignment.RegionStates;
102import org.apache.hadoop.hbase.mob.MobFileCache;
103import org.apache.hadoop.hbase.regionserver.BloomType;
104import org.apache.hadoop.hbase.regionserver.ChunkCreator;
105import org.apache.hadoop.hbase.regionserver.HRegion;
106import org.apache.hadoop.hbase.regionserver.HRegionServer;
107import org.apache.hadoop.hbase.regionserver.HStore;
108import org.apache.hadoop.hbase.regionserver.InternalScanner;
109import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
110import org.apache.hadoop.hbase.regionserver.Region;
111import org.apache.hadoop.hbase.regionserver.RegionScanner;
112import org.apache.hadoop.hbase.regionserver.RegionServerServices;
113import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
114import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
115import org.apache.hadoop.hbase.security.User;
116import org.apache.hadoop.hbase.security.UserProvider;
117import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
118import org.apache.hadoop.hbase.util.Bytes;
119import org.apache.hadoop.hbase.util.CommonFSUtils;
120import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
121import org.apache.hadoop.hbase.util.FSUtils;
122import org.apache.hadoop.hbase.util.JVMClusterUtil;
123import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
124import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
125import org.apache.hadoop.hbase.util.Pair;
126import org.apache.hadoop.hbase.util.ReflectionUtils;
127import org.apache.hadoop.hbase.util.RegionSplitter;
128import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
129import org.apache.hadoop.hbase.util.RetryCounter;
130import org.apache.hadoop.hbase.util.Threads;
131import org.apache.hadoop.hbase.wal.WAL;
132import org.apache.hadoop.hbase.wal.WALFactory;
133import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
134import org.apache.hadoop.hbase.zookeeper.ZKConfig;
135import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
136import org.apache.hadoop.hdfs.DFSClient;
137import org.apache.hadoop.hdfs.DFSConfigKeys;
138import org.apache.hadoop.hdfs.DistributedFileSystem;
139import org.apache.hadoop.hdfs.MiniDFSCluster;
140import org.apache.hadoop.hdfs.server.datanode.DataNode;
141import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
142import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
143import org.apache.hadoop.mapred.JobConf;
144import org.apache.hadoop.mapred.MiniMRCluster;
145import org.apache.hadoop.mapred.TaskLog;
146import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
147import org.apache.hadoop.minikdc.MiniKdc;
148import org.apache.yetus.audience.InterfaceAudience;
149import org.apache.zookeeper.WatchedEvent;
150import org.apache.zookeeper.ZooKeeper;
151import org.apache.zookeeper.ZooKeeper.States;
152
153import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
154
155import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
156
157/**
158 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
159 * functionality. Create an instance and keep it around testing HBase. This class is meant to be
160 * your one-stop shop for anything you might need testing. Manages one cluster at a time only.
161 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type
162 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j
163 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set
164 * logging levels. In the configuration properties, default values for master-info-port and
165 * region-server-port are overridden such that a random port will be assigned (thus avoiding port
166 * contention if another local HBase instance is already running).
167 * <p>
168 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
169 * setting it to true.
170 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
171 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
172 */
173@InterfaceAudience.Public
174@Deprecated
175public class HBaseTestingUtility extends HBaseZKTestingUtility {
176
  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  // Mini HDFS cluster managed by this utility; null until a startMiniDFSCluster* method runs.
  private MiniDFSCluster dfsCluster = null;
  // Background thread working around FsDatasetAsyncDiskService daemon-thread issues
  // (see HBASE-27148); started and stopped alongside dfsCluster.
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  // The managed HBase cluster (in-process mini cluster or a distributed one); volatile because
  // it is read/written from multiple test threads.
  private volatile HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  // Value of "hadoop.log.dir" as set up by createDirsAndSetProperties().
  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  // Lazily-set shared async cluster connection for this utility instance.
  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  private Admin hbaseAdmin = null;
208
  static {
    // Runs once at class-load time, before any mini cluster is started.
    // JmxCacheBuster may cause dead lock in test environment. As on master side, the table/region
    // related metrics updating will finally lead to a meta access, so if meta is not online yet, we
    // will block when updating while holding the metrics lock. But when we assign meta, there are
    // bunch of places where we need to register a new metrics thus need to get the metrics lock,
    // and then lead to a dead lock and cause the test to hang forever.
    // The code is in hadoop so there is no easy way for us to fix, so here we just stop
    // JmxCacheBuster to stabilize our tests first. See HBASE-30118 for more details and future
    // plans.
    JmxCacheBuster.stop();
  }
220
221  /**
222   * Checks to see if a specific port is available.
223   * @param port the port number to check for availability
224   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
225   */
226  public static boolean available(int port) {
227    ServerSocket ss = null;
228    DatagramSocket ds = null;
229    try {
230      ss = new ServerSocket(port);
231      ss.setReuseAddress(true);
232      ds = new DatagramSocket(port);
233      ds.setReuseAddress(true);
234      return true;
235    } catch (IOException e) {
236      // Do nothing
237    } finally {
238      if (ds != null) {
239        ds.close();
240      }
241
242      if (ss != null) {
243        try {
244          ss.close();
245        } catch (IOException e) {
246          /* should not be thrown */
247        }
248      }
249    }
250
251    return false;
252  }
253
254  /**
255   * Create all combinations of Bloom filters and compression algorithms for testing.
256   */
257  private static List<Object[]> bloomAndCompressionCombinations() {
258    List<Object[]> configurations = new ArrayList<>();
259    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
260      for (BloomType bloomType : BloomType.values()) {
261        configurations.add(new Object[] { comprAlgo, bloomType });
262      }
263    }
264    return Collections.unmodifiableList(configurations);
265  }
266
267  /**
268   * Create combination of memstoreTS and tags
269   */
270  private static List<Object[]> memStoreTSAndTagsCombination() {
271    List<Object[]> configurations = new ArrayList<>();
272    configurations.add(new Object[] { false, false });
273    configurations.add(new Object[] { false, true });
274    configurations.add(new Object[] { true, false });
275    configurations.add(new Object[] { true, true });
276    return Collections.unmodifiableList(configurations);
277  }
278
279  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
280    List<Object[]> configurations = new ArrayList<>();
281    configurations.add(new Object[] { false, false, true });
282    configurations.add(new Object[] { false, false, false });
283    configurations.add(new Object[] { false, true, true });
284    configurations.add(new Object[] { false, true, false });
285    configurations.add(new Object[] { true, false, true });
286    configurations.add(new Object[] { true, false, false });
287    configurations.add(new Object[] { true, true, true });
288    configurations.add(new Object[] { true, true, false });
289    return Collections.unmodifiableList(configurations);
290  }
291
  /** All combinations of Bloom filter type and compression algorithm, for parameterized tests. */
  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();
294
295  /**
296   * <p>
297   * Create an HBaseTestingUtility using a default configuration.
298   * <p>
299   * Initially, all tmp files are written to a local test data directory. Once
300   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
301   * data will be written to the DFS directory instead.
302   */
303  public HBaseTestingUtility() {
304    this(HBaseConfiguration.create());
305  }
306
307  /**
308   * <p>
309   * Create an HBaseTestingUtility using a given configuration.
310   * <p>
311   * Initially, all tmp files are written to a local test data directory. Once
312   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
313   * data will be written to the DFS directory instead.
314   * @param conf The configuration to use for further operations
315   */
316  public HBaseTestingUtility(Configuration conf) {
317    super(conf);
318
319    // a hbase checksum verification failure will cause unit tests to fail
320    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
321
322    // Save this for when setting default file:// breaks things
323    if (this.conf.get("fs.defaultFS") != null) {
324      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
325    }
326    if (this.conf.get(HConstants.HBASE_DIR) != null) {
327      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
328    }
329    // Every cluster is a local cluster until we start DFS
330    // Note that conf could be null, but this.conf will not be
331    String dataTestDir = getDataTestDir().toString();
332    this.conf.set("fs.defaultFS", "file:///");
333    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
334    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
335    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
336    // If the value for random ports isn't set set it to true, thus making
337    // tests opt-out for random port assignment
338    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
339      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
340  }
341
342  /**
343   * Close both the region {@code r} and it's underlying WAL. For use in tests.
344   */
345  public static void closeRegionAndWAL(final Region r) throws IOException {
346    closeRegionAndWAL((HRegion) r);
347  }
348
349  /**
350   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
351   */
352  public static void closeRegionAndWAL(final HRegion r) throws IOException {
353    if (r == null) return;
354    r.close();
355    if (r.getWAL() == null) return;
356    r.getWAL().close();
357  }
358
359  /**
360   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
361   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
362   * by the Configuration. If say, a Connection was being used against a cluster that had been
363   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
364   * Rather than use the return direct, its usually best to make a copy and use that. Do
365   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
366   * @return Instance of Configuration.
367   */
368  @Override
369  public Configuration getConfiguration() {
370    return super.getConfiguration();
371  }
372
  /**
   * Set the cluster this utility operates against (an in-process mini cluster or a deployed
   * distributed cluster -- see the class javadoc).
   * @param hbaseCluster the cluster instance to use
   */
  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }
376
377  /**
378   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
379   * have many concurrent tests running if we need to. Moding a System property is not the way to do
380   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
381   * not anything can do about it at moment; single instance only is how the minidfscluster works.
382   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
383   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
384   * (We do not create them!).
385   * @return The calculated data test build directory, if newly-created.
386   */
387  @Override
388  protected Path setupDataTestDir() {
389    Path testPath = super.setupDataTestDir();
390    if (null == testPath) {
391      return null;
392    }
393
394    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
395
396    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
397    // we want our own value to ensure uniqueness on the same machine
398    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
399
400    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
401    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
402    return testPath;
403  }
404
405  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
406
407    String sysValue = System.getProperty(propertyName);
408
409    if (sysValue != null) {
410      // There is already a value set. So we do nothing but hope
411      // that there will be no conflicts
412      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
413        + " so I do NOT create it in " + parent);
414      String confValue = conf.get(propertyName);
415      if (confValue != null && !confValue.endsWith(sysValue)) {
416        LOG.warn(propertyName + " property value differs in configuration and system: "
417          + "Configuration=" + confValue + " while System=" + sysValue
418          + " Erasing configuration value by system value.");
419      }
420      conf.set(propertyName, sysValue);
421    } else {
422      // Ok, it's not set, so we create it as a subdirectory
423      createSubDir(propertyName, parent, subDirName);
424      System.setProperty(propertyName, conf.get(propertyName));
425    }
426  }
427
428  /**
429   * @return Where to write test data on the test filesystem; Returns working directory for the test
430   *         filesystem by default
431   * @see #setupDataTestDirOnTestFS()
432   * @see #getTestFileSystem()
433   */
434  private Path getBaseTestDirOnTestFS() throws IOException {
435    FileSystem fs = getTestFileSystem();
436    return new Path(fs.getWorkingDirectory(), "test-data");
437  }
438
439  /**
440   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
441   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
442   * on it.
443   * @return a unique path in the test filesystem
444   */
445  public Path getDataTestDirOnTestFS() throws IOException {
446    if (dataTestDirOnTestFS == null) {
447      setupDataTestDirOnTestFS();
448    }
449
450    return dataTestDirOnTestFS;
451  }
452
453  /**
454   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
455   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
456   * on it.
457   * @return a unique path in the test filesystem
458   * @param subdirName name of the subdir to create under the base test dir
459   */
460  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
461    return new Path(getDataTestDirOnTestFS(), subdirName);
462  }
463
464  /**
465   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
466   * setup.
467   */
468  private void setupDataTestDirOnTestFS() throws IOException {
469    if (dataTestDirOnTestFS != null) {
470      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
471      return;
472    }
473    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
474  }
475
476  /**
477   * Sets up a new path in test filesystem to be used by tests.
478   */
479  private Path getNewDataTestDirOnTestFS() throws IOException {
480    // The file system can be either local, mini dfs, or if the configuration
481    // is supplied externally, it can be an external cluster FS. If it is a local
482    // file system, the tests should use getBaseTestDir, otherwise, we can use
483    // the working directory, and create a unique sub dir there
484    FileSystem fs = getTestFileSystem();
485    Path newDataTestDir;
486    String randomStr = getRandomUUID().toString();
487    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
488      newDataTestDir = new Path(getDataTestDir(), randomStr);
489      File dataTestDir = new File(newDataTestDir.toString());
490      if (deleteOnExit()) dataTestDir.deleteOnExit();
491    } else {
492      Path base = getBaseTestDirOnTestFS();
493      newDataTestDir = new Path(base, randomStr);
494      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
495    }
496    return newDataTestDir;
497  }
498
499  /**
500   * Cleans the test data directory on the test filesystem.
501   * @return True if we removed the test dirs
502   */
503  public boolean cleanupDataTestDirOnTestFS() throws IOException {
504    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
505    if (ret) dataTestDirOnTestFS = null;
506    return ret;
507  }
508
509  /**
510   * Cleans a subdirectory under the test data directory on the test filesystem.
511   * @return True if we removed child
512   */
513  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
514    Path cpath = getDataTestDirOnTestFS(subdirName);
515    return getTestFileSystem().delete(cpath, true);
516  }
517
  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    // Set by shutdown() to end the polling loop.
    private volatile boolean stopped = false;

    // The DFS cluster whose datanodes we patch via reflection.
    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    /**
     * Poll every 30 seconds and, via reflection, clear the daemon flag on each datanode's
     * FsDatasetAsyncDiskService thread group. Polling handles datanodes added mid-test.
     */
    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          // Restore interrupt status and re-check the stopped flag.
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          // Newer Hadoop versions renamed/removed the private fields we poke; log and move on.
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          // Best-effort fixer: never let a reflection failure kill the watchdog thread.
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    /** Stop the polling loop and wake the thread if it is sleeping. */
    void shutdown() {
      stopped = true;
      interrupt();
    }
  }
571
572  /**
573   * Start a minidfscluster.
574   * @param servers How many DNs to start.
575   * @see #shutdownMiniDFSCluster()
576   * @return The mini dfs cluster created.
577   */
578  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
579    return startMiniDFSCluster(servers, null);
580  }
581
582  /**
583   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
584   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
585   * instances of the datanodes will have the same host name.
586   * @param hosts hostnames DNs to run on.
587   * @see #shutdownMiniDFSCluster()
588   * @return The mini dfs cluster created.
589   */
590  public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception {
591    if (hosts != null && hosts.length != 0) {
592      return startMiniDFSCluster(hosts.length, hosts);
593    } else {
594      return startMiniDFSCluster(1, null);
595    }
596  }
597
598  /**
599   * Start a minidfscluster. Can only create one.
600   * @param servers How many DNs to start.
601   * @param hosts   hostnames DNs to run on.
602   * @see #shutdownMiniDFSCluster()
603   * @return The mini dfs cluster created.
604   */
605  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception {
606    return startMiniDFSCluster(servers, null, hosts);
607  }
608
  /**
   * Point the configuration's default filesystem at the running mini DFS cluster. No-op when no
   * DFS cluster has been started.
   * @throws IOException if the cluster filesystem cannot be obtained
   */
  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs (it was disabled for file:// in the constructor)
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }
620
  /**
   * Start a minidfscluster with the given number of datanodes, rack mapping and hostnames, wire
   * it in as this utility's default filesystem, and reset the test-data dir so it is recreated on
   * the new DFS.
   * @param servers how many DNs to start
   * @param racks   rack names for the DNs; may be null
   * @param hosts   hostnames for the DNs; may be null
   * @return the mini dfs cluster created
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
    throws Exception {
    createDirsAndSetProperties();
    // Skip edit-log fsync for speed; acceptable in tests only.
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }
644
  /**
   * Start a five-datanode minidfscluster for WAL tests on the given namenode port. Unlike
   * {@link #startMiniDFSCluster(int, String[], String[])} this does not reset the default
   * filesystem or the HBase root dir in the configuration.
   * @param namenodePort port to use for the namenode (presumably 0 picks an ephemeral port --
   *                     TODO confirm against the MiniDFSCluster constructor)
   * @return the mini dfs cluster created
   */
  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }
653
654  /**
655   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
656   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
657   * the conf.
658   *
659   * <pre>
660   * Configuration conf = TEST_UTIL.getConfiguration();
661   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
662   *   Map.Entry&lt;String, String&gt; e = i.next();
663   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
664   * }
665   * </pre>
666   */
667  private void createDirsAndSetProperties() throws IOException {
668    setupClusterTestDir();
669    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
670    createDirAndSetProperty("test.cache.data");
671    createDirAndSetProperty("hadoop.tmp.dir");
672    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
673    createDirAndSetProperty("mapreduce.cluster.local.dir");
674    createDirAndSetProperty("mapreduce.cluster.temp.dir");
675    enableShortCircuit();
676
677    Path root = getDataTestDirOnTestFS("hadoop");
678    conf.set(MapreduceTestingShim.getMROutputDirProp(),
679      new Path(root, "mapred-output-dir").toString());
680    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
681    conf.set("mapreduce.jobtracker.staging.root.dir",
682      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
683    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
684    conf.set("yarn.app.mapreduce.am.staging-dir",
685      new Path(root, "mapreduce-am-staging-root-dir").toString());
686
687    // Frustrate yarn's and hdfs's attempts at writing /tmp.
688    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
689    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
690    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
691    createDirAndSetProperty("yarn.nodemanager.log-dirs");
692    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
693    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
694    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
695    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
696    createDirAndSetProperty("dfs.journalnode.edits.dir");
697    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
698    createDirAndSetProperty("nfs.dump.dir");
699    createDirAndSetProperty("java.io.tmpdir");
700    createDirAndSetProperty("dfs.journalnode.edits.dir");
701    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
702    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
703
704    // disable metrics logger since it depend on commons-logging internal classes and we do not want
705    // commons-logging on our classpath
706    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
707    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
708  }
709
710  /**
711   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
712   * Default to false.
713   */
714  public boolean isNewVersionBehaviorEnabled() {
715    final String propName = "hbase.tests.new.version.behavior";
716    String v = System.getProperty(propName);
717    if (v != null) {
718      return Boolean.parseBoolean(v);
719    }
720    return false;
721  }
722
723  /**
724   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
725   * allows to specify this parameter on the command line. If not set, default is true.
726   */
727  public boolean isReadShortCircuitOn() {
728    final String propName = "hbase.tests.use.shortcircuit.reads";
729    String readOnProp = System.getProperty(propName);
730    if (readOnProp != null) {
731      return Boolean.parseBoolean(readOnProp);
732    } else {
733      return conf.getBoolean(propName, false);
734    }
735  }
736
737  /**
738   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
739   * including skipping the hdfs checksum checks.
740   */
741  private void enableShortCircuit() {
742    if (isReadShortCircuitOn()) {
743      String curUser = System.getProperty("user.name");
744      LOG.info("read short circuit is ON for user " + curUser);
745      // read short circuit, for hdfs
746      conf.set("dfs.block.local-path-access.user", curUser);
747      // read short circuit, for hbase
748      conf.setBoolean("dfs.client.read.shortcircuit", true);
749      // Skip checking checksum, for the hdfs client and the datanode
750      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
751    } else {
752      LOG.info("read short circuit is OFF");
753    }
754  }
755
756  private String createDirAndSetProperty(final String property) {
757    return createDirAndSetProperty(property, property);
758  }
759
760  private String createDirAndSetProperty(final String relPath, String property) {
761    String path = getDataTestDir(relPath).toString();
762    System.setProperty(property, path);
763    conf.set(property, path);
764    new File(path).mkdirs();
765    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
766    return path;
767  }
768
769  /**
770   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
771   */
772  public void shutdownMiniDFSCluster() throws IOException {
773    if (this.dfsCluster != null) {
774      // The below throws an exception per dn, AsynchronousCloseException.
775      this.dfsCluster.shutdown();
776      dfsCluster = null;
777      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
778      // have a fixer
779      if (dfsClusterFixer != null) {
780        this.dfsClusterFixer.shutdown();
781        dfsClusterFixer = null;
782      }
783      dataTestDirOnTestFS = null;
784      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
785    }
786  }
787
788  /**
789   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
790   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
791   * @param createWALDir Whether to create a new WAL directory.
792   * @return The mini HBase cluster created.
793   * @see #shutdownMiniCluster()
794   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
795   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
796   * @see #startMiniCluster(StartMiniClusterOption)
797   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
798   */
799  @Deprecated
800  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
801    StartMiniClusterOption option =
802      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
803    return startMiniCluster(option);
804  }
805
806  /**
807   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
808   * defined in {@link StartMiniClusterOption.Builder}.
809   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
810   * @param createRootDir Whether to create a new root or data directory path.
811   * @return The mini HBase cluster created.
812   * @see #shutdownMiniCluster()
813   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
814   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
815   * @see #startMiniCluster(StartMiniClusterOption)
816   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
817   */
818  @Deprecated
819  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception {
820    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
821      .numDataNodes(numSlaves).createRootDir(createRootDir).build();
822    return startMiniCluster(option);
823  }
824
825  /**
826   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
827   * defined in {@link StartMiniClusterOption.Builder}.
828   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
829   * @param createRootDir Whether to create a new root or data directory path.
830   * @param createWALDir  Whether to create a new WAL directory.
831   * @return The mini HBase cluster created.
832   * @see #shutdownMiniCluster()
833   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
834   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
835   * @see #startMiniCluster(StartMiniClusterOption)
836   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
837   */
838  @Deprecated
839  public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir,
840    boolean createWALDir) throws Exception {
841    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves)
842      .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build();
843    return startMiniCluster(option);
844  }
845
846  /**
847   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
848   * defined in {@link StartMiniClusterOption.Builder}.
849   * @param numMasters    Master node number.
850   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
851   * @param createRootDir Whether to create a new root or data directory path.
852   * @return The mini HBase cluster created.
853   * @see #shutdownMiniCluster()
854   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
855   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
856   * @see #startMiniCluster(StartMiniClusterOption)
857   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
858   */
859  @Deprecated
860  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir)
861    throws Exception {
862    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
863      .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build();
864    return startMiniCluster(option);
865  }
866
867  /**
868   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
869   * defined in {@link StartMiniClusterOption.Builder}.
870   * @param numMasters Master node number.
871   * @param numSlaves  Slave node number, for both HBase region server and HDFS data node.
872   * @return The mini HBase cluster created.
873   * @see #shutdownMiniCluster()
874   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
875   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
876   * @see #startMiniCluster(StartMiniClusterOption)
877   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
878   */
879  @Deprecated
880  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception {
881    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
882      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
883    return startMiniCluster(option);
884  }
885
886  /**
887   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
888   * defined in {@link StartMiniClusterOption.Builder}.
889   * @param numMasters    Master node number.
890   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
891   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
892   *                      HDFS data node number.
893   * @param createRootDir Whether to create a new root or data directory path.
894   * @return The mini HBase cluster created.
895   * @see #shutdownMiniCluster()
896   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
897   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
898   * @see #startMiniCluster(StartMiniClusterOption)
899   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
900   */
901  @Deprecated
902  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
903    boolean createRootDir) throws Exception {
904    StartMiniClusterOption option =
905      StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves)
906        .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
907    return startMiniCluster(option);
908  }
909
910  /**
911   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
912   * defined in {@link StartMiniClusterOption.Builder}.
913   * @param numMasters    Master node number.
914   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
915   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
916   *                      HDFS data node number.
917   * @return The mini HBase cluster created.
918   * @see #shutdownMiniCluster()
919   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
920   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
921   * @see #startMiniCluster(StartMiniClusterOption)
922   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
923   */
924  @Deprecated
925  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts)
926    throws Exception {
927    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
928      .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build();
929    return startMiniCluster(option);
930  }
931
932  /**
933   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
934   * defined in {@link StartMiniClusterOption.Builder}.
935   * @param numMasters       Master node number.
936   * @param numRegionServers Number of region servers.
937   * @param numDataNodes     Number of datanodes.
938   * @return The mini HBase cluster created.
939   * @see #shutdownMiniCluster()
940   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
941   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
942   * @see #startMiniCluster(StartMiniClusterOption)
943   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
944   */
945  @Deprecated
946  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes)
947    throws Exception {
948    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
949      .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build();
950    return startMiniCluster(option);
951  }
952
953  /**
954   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
955   * defined in {@link StartMiniClusterOption.Builder}.
956   * @param numMasters    Master node number.
957   * @param numSlaves     Slave node number, for both HBase region server and HDFS data node.
958   * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite
959   *                      HDFS data node number.
960   * @param masterClass   The class to use as HMaster, or null for default.
961   * @param rsClass       The class to use as HRegionServer, or null for default.
962   * @return The mini HBase cluster created.
963   * @see #shutdownMiniCluster()
964   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
965   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
966   * @see #startMiniCluster(StartMiniClusterOption)
967   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
968   */
969  @Deprecated
970  public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts,
971    Class<? extends HMaster> masterClass,
972    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
973    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
974      .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves)
975      .dataNodeHosts(dataNodeHosts).build();
976    return startMiniCluster(option);
977  }
978
979  /**
980   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
981   * defined in {@link StartMiniClusterOption.Builder}.
982   * @param numMasters       Master node number.
983   * @param numRegionServers Number of region servers.
984   * @param numDataNodes     Number of datanodes.
985   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
986   *                         overwrite HDFS data node number.
987   * @param masterClass      The class to use as HMaster, or null for default.
988   * @param rsClass          The class to use as HRegionServer, or null for default.
989   * @return The mini HBase cluster created.
990   * @see #shutdownMiniCluster()
991   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
992   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
993   * @see #startMiniCluster(StartMiniClusterOption)
994   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
995   */
996  @Deprecated
997  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
998    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
999    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception {
1000    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1001      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
1002      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build();
1003    return startMiniCluster(option);
1004  }
1005
1006  /**
1007   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
1008   * defined in {@link StartMiniClusterOption.Builder}.
1009   * @param numMasters       Master node number.
1010   * @param numRegionServers Number of region servers.
1011   * @param numDataNodes     Number of datanodes.
1012   * @param dataNodeHosts    The hostnames of DataNodes to run on. If not null, its size will
1013   *                         overwrite HDFS data node number.
1014   * @param masterClass      The class to use as HMaster, or null for default.
1015   * @param rsClass          The class to use as HRegionServer, or null for default.
1016   * @param createRootDir    Whether to create a new root or data directory path.
1017   * @param createWALDir     Whether to create a new WAL directory.
1018   * @return The mini HBase cluster created.
1019   * @see #shutdownMiniCluster()
1020   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1021   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
1022   * @see #startMiniCluster(StartMiniClusterOption)
1023   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1024   */
1025  @Deprecated
1026  public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes,
1027    String[] dataNodeHosts, Class<? extends HMaster> masterClass,
1028    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1029    boolean createWALDir) throws Exception {
1030    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1031      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
1032      .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir)
1033      .createWALDir(createWALDir).build();
1034    return startMiniCluster(option);
1035  }
1036
1037  /**
1038   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
1039   * other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
1040   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
1041   * @see #startMiniCluster(StartMiniClusterOption option)
1042   * @see #shutdownMiniDFSCluster()
1043   */
1044  public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception {
1045    StartMiniClusterOption option =
1046      StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build();
1047    return startMiniCluster(option);
1048  }
1049
1050  /**
1051   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
1052   * value can be found in {@link StartMiniClusterOption.Builder}.
1053   * @see #startMiniCluster(StartMiniClusterOption option)
1054   * @see #shutdownMiniDFSCluster()
1055   */
1056  public MiniHBaseCluster startMiniCluster() throws Exception {
1057    return startMiniCluster(StartMiniClusterOption.builder().build());
1058  }
1059
1060  /**
1061   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
1062   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
1063   * under System property test.build.data, to be cleaned up on exit.
1064   * @see #shutdownMiniDFSCluster()
1065   */
1066  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
1067    LOG.info("Starting up minicluster with option: {}", option);
1068
1069    // If we already put up a cluster, fail.
1070    if (miniClusterRunning) {
1071      throw new IllegalStateException("A mini-cluster is already running");
1072    }
1073    miniClusterRunning = true;
1074
1075    setupClusterTestDir();
1076
1077    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
1078    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
1079    if (dfsCluster == null) {
1080      LOG.info("STARTING DFS");
1081      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
1082    } else {
1083      LOG.info("NOT STARTING DFS");
1084    }
1085
1086    // Start up a zk cluster.
1087    if (getZkCluster() == null) {
1088      startMiniZKCluster(option.getNumZkServers());
1089    }
1090
1091    // Start the MiniHBaseCluster
1092    return startMiniHBaseCluster(option);
1093  }
1094
1095  /**
1096   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1097   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
1098   * @return Reference to the hbase mini hbase cluster.
1099   * @see #startMiniCluster(StartMiniClusterOption)
1100   * @see #shutdownMiniHBaseCluster()
1101   */
1102  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
1103    throws IOException, InterruptedException {
1104    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
1105    createRootDir(option.isCreateRootDir());
1106    if (option.isCreateWALDir()) {
1107      createWALRootDir();
1108    }
1109    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1110    // for tests that do not read hbase-defaults.xml
1111    setHBaseFsTmpDir();
1112
1113    // These settings will make the server waits until this exact number of
1114    // regions servers are connected.
1115    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1116      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
1117    }
1118    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1119      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
1120    }
1121
1122    Configuration c = new Configuration(this.conf);
1123    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
1124      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1125      option.getMasterClass(), option.getRsClass());
1126    // Populate the master address configuration from mini cluster configuration.
1127    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
1128    // Don't leave here till we've done a successful scan of the hbase:meta
1129    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1130      ResultScanner s = t.getScanner(new Scan())) {
1131      for (;;) {
1132        if (s.next() == null) {
1133          break;
1134        }
1135      }
1136    }
1137
1138    getAdmin(); // create immediately the hbaseAdmin
1139    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
1140
1141    return (MiniHBaseCluster) hbaseCluster;
1142  }
1143
1144  /**
1145   * Starts up mini hbase cluster using default options. Default options can be found in
1146   * {@link StartMiniClusterOption.Builder}.
1147   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1148   * @see #shutdownMiniHBaseCluster()
1149   */
1150  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
1151    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
1152  }
1153
1154  /**
1155   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1156   * {@link #startMiniCluster()}. All other options will use default values, defined in
1157   * {@link StartMiniClusterOption.Builder}.
1158   * @param numMasters       Master node number.
1159   * @param numRegionServers Number of region servers.
1160   * @return The mini HBase cluster created.
1161   * @see #shutdownMiniHBaseCluster()
1162   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1163   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1164   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1165   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1166   */
1167  @Deprecated
1168  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
1169    throws IOException, InterruptedException {
1170    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1171      .numRegionServers(numRegionServers).build();
1172    return startMiniHBaseCluster(option);
1173  }
1174
1175  /**
1176   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1177   * {@link #startMiniCluster()}. All other options will use default values, defined in
1178   * {@link StartMiniClusterOption.Builder}.
1179   * @param numMasters       Master node number.
1180   * @param numRegionServers Number of region servers.
1181   * @param rsPorts          Ports that RegionServer should use.
1182   * @return The mini HBase cluster created.
1183   * @see #shutdownMiniHBaseCluster()
1184   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1185   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1186   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1187   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1188   */
1189  @Deprecated
1190  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1191    List<Integer> rsPorts) throws IOException, InterruptedException {
1192    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1193      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
1194    return startMiniHBaseCluster(option);
1195  }
1196
1197  /**
1198   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
1199   * {@link #startMiniCluster()}. All other options will use default values, defined in
1200   * {@link StartMiniClusterOption.Builder}.
1201   * @param numMasters       Master node number.
1202   * @param numRegionServers Number of region servers.
1203   * @param rsPorts          Ports that RegionServer should use.
1204   * @param masterClass      The class to use as HMaster, or null for default.
1205   * @param rsClass          The class to use as HRegionServer, or null for default.
1206   * @param createRootDir    Whether to create a new root or data directory path.
1207   * @param createWALDir     Whether to create a new WAL directory.
1208   * @return The mini HBase cluster created.
1209   * @see #shutdownMiniHBaseCluster()
1210   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1211   *             {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead.
1212   * @see #startMiniHBaseCluster(StartMiniClusterOption)
1213   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
1214   */
1215  @Deprecated
1216  public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
1217    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1218    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir,
1219    boolean createWALDir) throws IOException, InterruptedException {
1220    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters)
1221      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1222      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1223    return startMiniHBaseCluster(option);
1224  }
1225
1226  /**
1227   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1228   * want to keep dfs/zk up and just stop/start hbase.
1229   * @param servers number of region servers
1230   */
1231  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1232    this.restartHBaseCluster(servers, null);
1233  }
1234
1235  public void restartHBaseCluster(int servers, List<Integer> ports)
1236    throws IOException, InterruptedException {
1237    StartMiniClusterOption option =
1238      StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1239    restartHBaseCluster(option);
1240    invalidateConnection();
1241  }
1242
1243  public void restartHBaseCluster(StartMiniClusterOption option)
1244    throws IOException, InterruptedException {
1245    closeConnection();
1246    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
1247      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1248      option.getMasterClass(), option.getRsClass());
1249    // Don't leave here till we've done a successful scan of the hbase:meta
1250    Connection conn = ConnectionFactory.createConnection(this.conf);
1251    Table t = conn.getTable(TableName.META_TABLE_NAME);
1252    ResultScanner s = t.getScanner(new Scan());
1253    while (s.next() != null) {
1254      // do nothing
1255    }
1256    LOG.info("HBase has been restarted");
1257    s.close();
1258    t.close();
1259    conn.close();
1260  }
1261
1262  /**
1263   * @return Current mini hbase cluster. Only has something in it after a call to
1264   *         {@link #startMiniCluster()}.
1265   * @see #startMiniCluster()
1266   */
1267  public MiniHBaseCluster getMiniHBaseCluster() {
1268    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1269      return (MiniHBaseCluster) this.hbaseCluster;
1270    }
1271    throw new RuntimeException(
1272      hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName());
1273  }
1274
1275  /**
1276   * Stops mini hbase, zk, and hdfs clusters.
1277   * @see #startMiniCluster(int)
1278   */
1279  public void shutdownMiniCluster() throws IOException {
1280    LOG.info("Shutting down minicluster");
1281    shutdownMiniHBaseCluster();
1282    shutdownMiniDFSCluster();
1283    shutdownMiniZKCluster();
1284
1285    cleanupTestDir();
1286    miniClusterRunning = false;
1287    LOG.info("Minicluster is down");
1288  }
1289
1290  /**
1291   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1292   * @throws java.io.IOException in case command is unsuccessful
1293   */
1294  public void shutdownMiniHBaseCluster() throws IOException {
1295    cleanup();
1296    if (this.hbaseCluster != null) {
1297      this.hbaseCluster.shutdown();
1298      // Wait till hbase is down before going on to shutdown zk.
1299      this.hbaseCluster.waitUntilShutDown();
1300      this.hbaseCluster = null;
1301    }
1302    if (zooKeeperWatcher != null) {
1303      zooKeeperWatcher.close();
1304      zooKeeperWatcher = null;
1305    }
1306  }
1307
1308  /**
1309   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1310   * @throws java.io.IOException throws in case command is unsuccessful
1311   */
1312  public void killMiniHBaseCluster() throws IOException {
1313    cleanup();
1314    if (this.hbaseCluster != null) {
1315      getMiniHBaseCluster().killAll();
1316      this.hbaseCluster = null;
1317    }
1318    if (zooKeeperWatcher != null) {
1319      zooKeeperWatcher.close();
1320      zooKeeperWatcher = null;
1321    }
1322  }
1323
  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start, so the next
    // startMiniHBaseCluster() call can install its own values (-1 means "not set")
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }
1331
1332  /**
1333   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1334   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1335   * If false, previous path is used. Note: this does not cause the root dir to be created.
1336   * @return Fully qualified path for the default hbase root dir
1337   */
1338  public Path getDefaultRootDirPath(boolean create) throws IOException {
1339    if (!create) {
1340      return getDataTestDirOnTestFS();
1341    } else {
1342      return getNewDataTestDirOnTestFS();
1343    }
1344  }
1345
1346  /**
1347   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that
1348   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1349   * @return Fully qualified path for the default hbase root dir
1350   */
1351  public Path getDefaultRootDirPath() throws IOException {
1352    return getDefaultRootDirPath(false);
1353  }
1354
1355  /**
1356   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1357   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1358   * startup. You'd only use this method if you were doing manual operation.
1359   * @param create This flag decides whether to get a new root or data directory path or not, if it
1360   *               has been fetched already. Note : Directory will be made irrespective of whether
1361   *               path has been fetched or not. If directory already exists, it will be overwritten
1362   * @return Fully qualified path to hbase root dir
1363   */
1364  public Path createRootDir(boolean create) throws IOException {
1365    FileSystem fs = FileSystem.get(this.conf);
1366    Path hbaseRootdir = getDefaultRootDirPath(create);
1367    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1368    fs.mkdirs(hbaseRootdir);
1369    FSUtils.setVersion(fs, hbaseRootdir);
1370    return hbaseRootdir;
1371  }
1372
1373  /**
1374   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that
1375   * <code>create</code> flag is false.
1376   * @return Fully qualified path to hbase root dir
1377   */
1378  public Path createRootDir() throws IOException {
1379    return createRootDir(false);
1380  }
1381
1382  /**
1383   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1384   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1385   * this method if you were doing manual operation.
1386   * @return Fully qualified path to hbase root dir
1387   */
1388  public Path createWALRootDir() throws IOException {
1389    FileSystem fs = FileSystem.get(this.conf);
1390    Path walDir = getNewDataTestDirOnTestFS();
1391    CommonFSUtils.setWALRootDir(this.conf, walDir);
1392    fs.mkdirs(walDir);
1393    return walDir;
1394  }
1395
1396  private void setHBaseFsTmpDir() throws IOException {
1397    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1398    if (hbaseFsTmpDirInString == null) {
1399      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1400      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1401    } else {
1402      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1403    }
1404  }
1405
1406  /**
1407   * Flushes all caches in the mini hbase cluster
1408   */
1409  public void flush() throws IOException {
1410    getMiniHBaseCluster().flushcache();
1411  }
1412
1413  /**
1414   * Flushes all caches in the mini hbase cluster
1415   */
1416  public void flush(TableName tableName) throws IOException {
1417    getMiniHBaseCluster().flushcache(tableName);
1418  }
1419
1420  /**
1421   * Compact all regions in the mini hbase cluster
1422   */
1423  public void compact(boolean major) throws IOException {
1424    getMiniHBaseCluster().compact(major);
1425  }
1426
1427  /**
1428   * Compact all of a table's reagion in the mini hbase cluster
1429   */
1430  public void compact(TableName tableName, boolean major) throws IOException {
1431    getMiniHBaseCluster().compact(tableName, major);
1432  }
1433
1434  /**
1435   * Create a table.
1436   * @return A Table instance for the created table.
1437   */
1438  public Table createTable(TableName tableName, String family) throws IOException {
1439    return createTable(tableName, new String[] { family });
1440  }
1441
1442  /**
1443   * Create a table.
1444   * @return A Table instance for the created table.
1445   */
1446  public Table createTable(TableName tableName, String[] families) throws IOException {
1447    List<byte[]> fams = new ArrayList<>(families.length);
1448    for (String family : families) {
1449      fams.add(Bytes.toBytes(family));
1450    }
1451    return createTable(tableName, fams.toArray(new byte[0][]));
1452  }
1453
1454  /**
1455   * Create a table.
1456   * @return A Table instance for the created table.
1457   */
1458  public Table createTable(TableName tableName, byte[] family) throws IOException {
1459    return createTable(tableName, new byte[][] { family });
1460  }
1461
1462  /**
1463   * Create a table with multiple regions.
1464   * @return A Table instance for the created table.
1465   */
1466  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1467    throws IOException {
1468    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1469    byte[] startKey = Bytes.toBytes("aaaaa");
1470    byte[] endKey = Bytes.toBytes("zzzzz");
1471    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1472
1473    return createTable(tableName, new byte[][] { family }, splitKeys);
1474  }
1475
1476  /**
1477   * Create a table.
1478   * @return A Table instance for the created table.
1479   */
1480  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1481    return createTable(tableName, families, (byte[][]) null);
1482  }
1483
1484  /**
1485   * Create a table with multiple regions.
1486   * @return A Table instance for the created table.
1487   */
1488  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1489    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1490  }
1491
1492  /**
1493   * Create a table with multiple regions.
1494   * @param replicaCount replica count.
1495   * @return A Table instance for the created table.
1496   */
1497  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1498    throws IOException {
1499    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1500  }
1501
1502  /**
1503   * Create a table.
1504   * @return A Table instance for the created table.
1505   */
1506  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1507    throws IOException {
1508    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1509  }
1510
1511  /**
1512   * Create a table.
1513   * @param tableName    the table name
1514   * @param families     the families
1515   * @param splitKeys    the splitkeys
1516   * @param replicaCount the region replica count
1517   * @return A Table instance for the created table.
1518   * @throws IOException throws IOException
1519   */
1520  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1521    int replicaCount) throws IOException {
1522    return createTable(tableName, families, splitKeys, replicaCount,
1523      new Configuration(getConfiguration()));
1524  }
1525
1526  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1527    byte[] endKey, int numRegions) throws IOException {
1528    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1529
1530    getAdmin().createTable(desc, startKey, endKey, numRegions);
1531    // HBaseAdmin only waits for regions to appear in hbase:meta we
1532    // should wait until they are assigned
1533    waitUntilAllRegionsAssigned(tableName);
1534    return getConnection().getTable(tableName);
1535  }
1536
1537  /**
1538   * Create a table.
1539   * @param c Configuration to use
1540   * @return A Table instance for the created table.
1541   */
1542  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1543    throws IOException {
1544    return createTable(htd, families, null, c);
1545  }
1546
1547  /**
1548   * Create a table.
1549   * @param htd       table descriptor
1550   * @param families  array of column families
1551   * @param splitKeys array of split keys
1552   * @param c         Configuration to use
1553   * @return A Table instance for the created table.
1554   * @throws IOException if getAdmin or createTable fails
1555   */
1556  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1557    Configuration c) throws IOException {
1558    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1559    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1560    // on is interfering.
1561    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1562  }
1563
1564  /**
1565   * Create a table.
1566   * @param htd       table descriptor
1567   * @param families  array of column families
1568   * @param splitKeys array of split keys
1569   * @param type      Bloom type
1570   * @param blockSize block size
1571   * @param c         Configuration to use
1572   * @return A Table instance for the created table.
1573   * @throws IOException if getAdmin or createTable fails
1574   */
1575
1576  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1577    BloomType type, int blockSize, Configuration c) throws IOException {
1578    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1579    for (byte[] family : families) {
1580      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1581        .setBloomFilterType(type).setBlocksize(blockSize);
1582      if (isNewVersionBehaviorEnabled()) {
1583        cfdb.setNewVersionBehavior(true);
1584      }
1585      builder.setColumnFamily(cfdb.build());
1586    }
1587    TableDescriptor td = builder.build();
1588    if (splitKeys != null) {
1589      getAdmin().createTable(td, splitKeys);
1590    } else {
1591      getAdmin().createTable(td);
1592    }
1593    // HBaseAdmin only waits for regions to appear in hbase:meta
1594    // we should wait until they are assigned
1595    waitUntilAllRegionsAssigned(td.getTableName());
1596    return getConnection().getTable(td.getTableName());
1597  }
1598
1599  /**
1600   * Create a table.
1601   * @param htd       table descriptor
1602   * @param splitRows array of split keys
1603   * @return A Table instance for the created table.
1604   */
1605  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1606    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1607    if (isNewVersionBehaviorEnabled()) {
1608      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1609        builder.setColumnFamily(
1610          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1611      }
1612    }
1613    if (splitRows != null) {
1614      getAdmin().createTable(builder.build(), splitRows);
1615    } else {
1616      getAdmin().createTable(builder.build());
1617    }
1618    // HBaseAdmin only waits for regions to appear in hbase:meta
1619    // we should wait until they are assigned
1620    waitUntilAllRegionsAssigned(htd.getTableName());
1621    return getConnection().getTable(htd.getTableName());
1622  }
1623
1624  /**
1625   * Create a table.
1626   * @param tableName    the table name
1627   * @param families     the families
1628   * @param splitKeys    the split keys
1629   * @param replicaCount the replica count
1630   * @param c            Configuration to use
1631   * @return A Table instance for the created table.
1632   */
1633  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1634    int replicaCount, final Configuration c) throws IOException {
1635    TableDescriptor htd =
1636      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1637    return createTable(htd, families, splitKeys, c);
1638  }
1639
1640  /**
1641   * Create a table.
1642   * @return A Table instance for the created table.
1643   */
1644  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1645    return createTable(tableName, new byte[][] { family }, numVersions);
1646  }
1647
1648  /**
1649   * Create a table.
1650   * @return A Table instance for the created table.
1651   */
1652  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1653    throws IOException {
1654    return createTable(tableName, families, numVersions, (byte[][]) null);
1655  }
1656
1657  /**
1658   * Create a table.
1659   * @return A Table instance for the created table.
1660   */
1661  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1662    byte[][] splitKeys) throws IOException {
1663    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1664    for (byte[] family : families) {
1665      ColumnFamilyDescriptorBuilder cfBuilder =
1666        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1667      if (isNewVersionBehaviorEnabled()) {
1668        cfBuilder.setNewVersionBehavior(true);
1669      }
1670      builder.setColumnFamily(cfBuilder.build());
1671    }
1672    if (splitKeys != null) {
1673      getAdmin().createTable(builder.build(), splitKeys);
1674    } else {
1675      getAdmin().createTable(builder.build());
1676    }
1677    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1678    // assigned
1679    waitUntilAllRegionsAssigned(tableName);
1680    return getConnection().getTable(tableName);
1681  }
1682
1683  /**
1684   * Create a table with multiple regions.
1685   * @return A Table instance for the created table.
1686   */
1687  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1688    throws IOException {
1689    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1690  }
1691
1692  /**
1693   * Create a table.
1694   * @return A Table instance for the created table.
1695   */
1696  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1697    throws IOException {
1698    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1699    for (byte[] family : families) {
1700      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1701        .setMaxVersions(numVersions).setBlocksize(blockSize);
1702      if (isNewVersionBehaviorEnabled()) {
1703        cfBuilder.setNewVersionBehavior(true);
1704      }
1705      builder.setColumnFamily(cfBuilder.build());
1706    }
1707    getAdmin().createTable(builder.build());
1708    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1709    // assigned
1710    waitUntilAllRegionsAssigned(tableName);
1711    return getConnection().getTable(tableName);
1712  }
1713
1714  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1715    String cpName) throws IOException {
1716    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1717    for (byte[] family : families) {
1718      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1719        .setMaxVersions(numVersions).setBlocksize(blockSize);
1720      if (isNewVersionBehaviorEnabled()) {
1721        cfBuilder.setNewVersionBehavior(true);
1722      }
1723      builder.setColumnFamily(cfBuilder.build());
1724    }
1725    if (cpName != null) {
1726      builder.setCoprocessor(cpName);
1727    }
1728    getAdmin().createTable(builder.build());
1729    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1730    // assigned
1731    waitUntilAllRegionsAssigned(tableName);
1732    return getConnection().getTable(tableName);
1733  }
1734
1735  /**
1736   * Create a table.
1737   * @return A Table instance for the created table.
1738   */
1739  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1740    throws IOException {
1741    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1742    int i = 0;
1743    for (byte[] family : families) {
1744      ColumnFamilyDescriptorBuilder cfBuilder =
1745        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1746      if (isNewVersionBehaviorEnabled()) {
1747        cfBuilder.setNewVersionBehavior(true);
1748      }
1749      builder.setColumnFamily(cfBuilder.build());
1750      i++;
1751    }
1752    getAdmin().createTable(builder.build());
1753    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1754    // assigned
1755    waitUntilAllRegionsAssigned(tableName);
1756    return getConnection().getTable(tableName);
1757  }
1758
1759  /**
1760   * Create a table.
1761   * @return A Table instance for the created table.
1762   */
1763  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1764    throws IOException {
1765    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1766    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1767    if (isNewVersionBehaviorEnabled()) {
1768      cfBuilder.setNewVersionBehavior(true);
1769    }
1770    builder.setColumnFamily(cfBuilder.build());
1771    getAdmin().createTable(builder.build(), splitRows);
1772    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1773    // assigned
1774    waitUntilAllRegionsAssigned(tableName);
1775    return getConnection().getTable(tableName);
1776  }
1777
1778  /**
1779   * Create a table with multiple regions.
1780   * @return A Table instance for the created table.
1781   */
1782  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1783    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1784  }
1785
1786  /**
1787   * Modify a table, synchronous.
1788   * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
1789   *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
1790   * @see Admin#modifyTable(TableDescriptor)
1791   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
1792   */
1793  @Deprecated
1794  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1795    throws IOException, InterruptedException {
1796    admin.modifyTable(desc);
1797  }
1798
1799  /**
1800   * Set the number of Region replicas.
1801   */
1802  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1803    throws IOException, InterruptedException {
1804    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1805      .setRegionReplication(replicaCount).build();
1806    admin.modifyTable(desc);
1807  }
1808
1809  /**
1810   * Drop an existing table
1811   * @param tableName existing table
1812   */
1813  public void deleteTable(TableName tableName) throws IOException {
1814    try {
1815      getAdmin().disableTable(tableName);
1816    } catch (TableNotEnabledException e) {
1817      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1818    }
1819    getAdmin().deleteTable(tableName);
1820  }
1821
1822  /**
1823   * Drop an existing table
1824   * @param tableName existing table
1825   */
1826  public void deleteTableIfAny(TableName tableName) throws IOException {
1827    try {
1828      deleteTable(tableName);
1829    } catch (TableNotFoundException e) {
1830      // ignore
1831    }
1832  }
1833
1834  // ==========================================================================
1835  // Canned table and table descriptor creation
1836
1837  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1838  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1839  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1840  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1841  private static final int MAXVERSIONS = 3;
1842
1843  public static final char FIRST_CHAR = 'a';
1844  public static final char LAST_CHAR = 'z';
1845  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1846  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1847
1848  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1849    return createModifyableTableDescriptor(TableName.valueOf(name),
1850      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1851      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1852  }
1853
1854  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1855    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1856    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1857    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1858      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1859        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1860        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1861      if (isNewVersionBehaviorEnabled()) {
1862        cfBuilder.setNewVersionBehavior(true);
1863      }
1864      builder.setColumnFamily(cfBuilder.build());
1865    }
1866    return builder.build();
1867  }
1868
1869  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1870    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1871    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1872    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1873      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1874        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1875        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1876      if (isNewVersionBehaviorEnabled()) {
1877        cfBuilder.setNewVersionBehavior(true);
1878      }
1879      builder.setColumnFamily(cfBuilder.build());
1880    }
1881    return builder;
1882  }
1883
1884  /**
1885   * Create a table of name <code>name</code>.
1886   * @param name Name to give table.
1887   * @return Column descriptor.
1888   */
1889  public TableDescriptor createTableDescriptor(final TableName name) {
1890    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1891      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1892  }
1893
1894  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1895    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1896  }
1897
1898  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1899    int maxVersions) {
1900    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1901    for (byte[] family : families) {
1902      ColumnFamilyDescriptorBuilder cfBuilder =
1903        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1904      if (isNewVersionBehaviorEnabled()) {
1905        cfBuilder.setNewVersionBehavior(true);
1906      }
1907      builder.setColumnFamily(cfBuilder.build());
1908    }
1909    return builder.build();
1910  }
1911
1912  /**
1913   * Create an HRegion that writes to the local tmp dirs
1914   * @param desc     a table descriptor indicating which table the region belongs to
1915   * @param startKey the start boundary of the region
1916   * @param endKey   the end boundary of the region
1917   * @return a region that writes to local dir for testing
1918   */
1919  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1920    throws IOException {
1921    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1922      .setEndKey(endKey).build();
1923    return createLocalHRegion(hri, desc);
1924  }
1925
1926  /**
1927   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1928   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1929   */
1930  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1931    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1932  }
1933
1934  /**
1935   * Create an HRegion that writes to the local tmp dirs with specified wal
1936   * @param info regioninfo
1937   * @param conf configuration
1938   * @param desc table descriptor
1939   * @param wal  wal for this region.
1940   * @return created hregion
1941   */
1942  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1943    WAL wal) throws IOException {
1944    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1945  }
1946
1947  /**
1948   * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
1949   * when done.
1950   */
1951  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1952    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1953    throws IOException {
1954    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1955      durability, wal, null, families);
1956  }
1957
  /**
   * Create a local HRegion whose families optionally use in-memory (BASIC) compaction.
   * @param compactedMemStore per-family flags; may be null or shorter than families
   * @return a region backed by the local test dir; close with closeRegionAndWAL
   */
  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
    boolean[] compactedMemStore, byte[]... families) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setReadOnly(isReadOnly);
    int i = 0;
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      // NOTE(review): only the presence of an entry at index i is checked; the boolean value
      // compactedMemStore[i] itself is never read. Confirm whether this is intended.
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);

      }
      i++;
      // Set default to be three versions.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
      builder.setColumnFamily(cfBuilder.build());
    }
    builder.setDurability(durability);
    RegionInfo info =
      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
    return createLocalHRegion(info, conf, builder.build(), wal);
  }
1982
1983  //
1984  // ==========================================================================
1985
1986  /**
1987   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1988   * read.
1989   * @param tableName existing table
1990   * @return HTable to that new table
1991   */
1992  public Table deleteTableData(TableName tableName) throws IOException {
1993    Table table = getConnection().getTable(tableName);
1994    Scan scan = new Scan();
1995    ResultScanner resScan = table.getScanner(scan);
1996    for (Result res : resScan) {
1997      Delete del = new Delete(res.getRow());
1998      table.delete(del);
1999    }
2000    resScan = table.getScanner(scan);
2001    resScan.close();
2002    return table;
2003  }
2004
2005  /**
2006   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2007   * table.
2008   * @param tableName       table which must exist.
2009   * @param preserveRegions keep the existing split points
2010   * @return HTable for the new table
2011   */
2012  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
2013    throws IOException {
2014    Admin admin = getAdmin();
2015    if (!admin.isTableDisabled(tableName)) {
2016      admin.disableTable(tableName);
2017    }
2018    admin.truncateTable(tableName, preserveRegions);
2019    return getConnection().getTable(tableName);
2020  }
2021
2022  /**
2023   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
2024   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
2025   * preserve regions of existing table.
2026   * @param tableName table which must exist.
2027   * @return HTable for the new table
2028   */
2029  public Table truncateTable(final TableName tableName) throws IOException {
2030    return truncateTable(tableName, false);
2031  }
2032
2033  /**
2034   * Load table with rows from 'aaa' to 'zzz'.
2035   * @param t Table
2036   * @param f Family
2037   * @return Count of rows loaded.
2038   */
2039  public int loadTable(final Table t, final byte[] f) throws IOException {
2040    return loadTable(t, new byte[][] { f });
2041  }
2042
2043  /**
2044   * Load table with rows from 'aaa' to 'zzz'.
2045   * @param t Table
2046   * @param f Family
2047   * @return Count of rows loaded.
2048   */
2049  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2050    return loadTable(t, new byte[][] { f }, null, writeToWAL);
2051  }
2052
2053  /**
2054   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2055   * @param t Table
2056   * @param f Array of Families to load
2057   * @return Count of rows loaded.
2058   */
2059  public int loadTable(final Table t, final byte[][] f) throws IOException {
2060    return loadTable(t, f, null);
2061  }
2062
2063  /**
2064   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2065   * @param t     Table
2066   * @param f     Array of Families to load
2067   * @param value the values of the cells. If null is passed, the row key is used as value
2068   * @return Count of rows loaded.
2069   */
2070  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2071    return loadTable(t, f, value, true);
2072  }
2073
2074  /**
2075   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2076   * @param t     Table
2077   * @param f     Array of Families to load
2078   * @param value the values of the cells. If null is passed, the row key is used as value
2079   * @return Count of rows loaded.
2080   */
2081  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
2082    throws IOException {
2083    List<Put> puts = new ArrayList<>();
2084    for (byte[] row : HBaseTestingUtility.ROWS) {
2085      Put put = new Put(row);
2086      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2087      for (int i = 0; i < f.length; i++) {
2088        byte[] value1 = value != null ? value : row;
2089        put.addColumn(f[i], f[i], value1);
2090      }
2091      puts.add(put);
2092    }
2093    t.put(puts);
2094    return puts.size();
2095  }
2096
2097  /**
2098   * A tracker for tracking and validating table rows generated with
2099   * {@link HBaseTestingUtility#loadTable(Table, byte[])}
2100   */
2101  public static class SeenRowTracker {
2102    int dim = 'z' - 'a' + 1;
2103    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
2104    byte[] startRow;
2105    byte[] stopRow;
2106
2107    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2108      this.startRow = startRow;
2109      this.stopRow = stopRow;
2110    }
2111
2112    void reset() {
2113      for (byte[] row : ROWS) {
2114        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2115      }
2116    }
2117
2118    int i(byte b) {
2119      return b - 'a';
2120    }
2121
2122    public void addRow(byte[] row) {
2123      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2124    }
2125
2126    /**
2127     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
2128     * rows none
2129     */
2130    public void validate() {
2131      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2132        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2133          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2134            int count = seenRows[i(b1)][i(b2)][i(b3)];
2135            int expectedCount = 0;
2136            if (
2137              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
2138                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
2139            ) {
2140              expectedCount = 1;
2141            }
2142            if (count != expectedCount) {
2143              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
2144              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
2145                + "instead of " + expectedCount);
2146            }
2147          }
2148        }
2149      }
2150    }
2151  }
2152
  /** Convenience overload of {@link #loadRegion(HRegion, byte[], boolean)} that never flushes. */
  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
    return loadRegion(r, f, false);
  }
2156
  /** Variant of {@link #loadRegion(HRegion, byte[])} accepting the {@link Region} interface. */
  public int loadRegion(final Region r, final byte[] f) throws IOException {
    return loadRegion((HRegion) r, f);
  }
2160
2161  /**
2162   * Load region with rows from 'aaa' to 'zzz'.
2163   * @param r     Region
2164   * @param f     Family
2165   * @param flush flush the cache if true
2166   * @return Count of rows loaded.
2167   */
2168  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
2169    byte[] k = new byte[3];
2170    int rowCount = 0;
2171    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2172      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2173        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2174          k[0] = b1;
2175          k[1] = b2;
2176          k[2] = b3;
2177          Put put = new Put(k);
2178          put.setDurability(Durability.SKIP_WAL);
2179          put.addColumn(f, null, k);
2180          if (r.getWAL() == null) {
2181            put.setDurability(Durability.SKIP_WAL);
2182          }
2183          int preRowCount = rowCount;
2184          int pause = 10;
2185          int maxPause = 1000;
2186          while (rowCount == preRowCount) {
2187            try {
2188              r.put(put);
2189              rowCount++;
2190            } catch (RegionTooBusyException e) {
2191              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2192              Threads.sleep(pause);
2193            }
2194          }
2195        }
2196      }
2197      if (flush) {
2198        r.flush(true);
2199      }
2200    }
2201    return rowCount;
2202  }
2203
2204  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2205    throws IOException {
2206    for (int i = startRow; i < endRow; i++) {
2207      byte[] data = Bytes.toBytes(String.valueOf(i));
2208      Put put = new Put(data);
2209      put.addColumn(f, null, data);
2210      t.put(put);
2211    }
2212  }
2213
2214  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2215    throws IOException {
2216    byte[] row = new byte[rowSize];
2217    for (int i = 0; i < totalRows; i++) {
2218      Bytes.random(row);
2219      Put put = new Put(row);
2220      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
2221      t.put(put);
2222    }
2223  }
2224
2225  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2226    int replicaId) throws IOException {
2227    for (int i = startRow; i < endRow; i++) {
2228      String failMsg = "Failed verification of row :" + i;
2229      byte[] data = Bytes.toBytes(String.valueOf(i));
2230      Get get = new Get(data);
2231      get.setReplicaId(replicaId);
2232      get.setConsistency(Consistency.TIMELINE);
2233      Result result = table.get(get);
2234      if (!result.containsColumn(f, null)) {
2235        throw new AssertionError(failMsg);
2236      }
2237      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2238      Cell cell = result.getColumnLatestCell(f, null);
2239      if (
2240        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2241          cell.getValueLength())
2242      ) {
2243        throw new AssertionError(failMsg);
2244      }
2245    }
2246  }
2247
  /** Variant of {@link #verifyNumericRows(HRegion, byte[], int, int)} taking a {@link Region}. */
  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
    throws IOException {
    verifyNumericRows((HRegion) region, f, startRow, endRow);
  }
2252
  /** Verify that all rows in the range are present; delegates with {@code present=true}. */
  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
    throws IOException {
    verifyNumericRows(region, f, startRow, endRow, true);
  }
2257
  /**
   * Variant of {@link #verifyNumericRows(HRegion, byte[], int, int, boolean)} taking a
   * {@link Region}.
   */
  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
    final boolean present) throws IOException {
    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
  }
2262
2263  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2264    final boolean present) throws IOException {
2265    for (int i = startRow; i < endRow; i++) {
2266      String failMsg = "Failed verification of row :" + i;
2267      byte[] data = Bytes.toBytes(String.valueOf(i));
2268      Result result = region.get(new Get(data));
2269
2270      boolean hasResult = result != null && !result.isEmpty();
2271      if (present != hasResult) {
2272        throw new AssertionError(
2273          failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
2274      }
2275      if (!present) continue;
2276
2277      if (!result.containsColumn(f, null)) {
2278        throw new AssertionError(failMsg);
2279      }
2280      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2281      Cell cell = result.getColumnLatestCell(f, null);
2282      if (
2283        !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2284          cell.getValueLength())
2285      ) {
2286        throw new AssertionError(failMsg);
2287      }
2288    }
2289  }
2290
2291  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2292    throws IOException {
2293    for (int i = startRow; i < endRow; i++) {
2294      byte[] data = Bytes.toBytes(String.valueOf(i));
2295      Delete delete = new Delete(data);
2296      delete.addFamily(f);
2297      t.delete(delete);
2298    }
2299  }
2300
  /**
   * Return the number of rows in the given table.
   * @param table to count rows
   * @return count of rows
   */
  public static int countRows(final Table table) throws IOException {
    // Full-table scan with a default Scan (all families, all rows).
    return countRows(table, new Scan());
  }
2309
2310  public static int countRows(final Table table, final Scan scan) throws IOException {
2311    try (ResultScanner results = table.getScanner(scan)) {
2312      int count = 0;
2313      while (results.next() != null) {
2314        count++;
2315      }
2316      return count;
2317    }
2318  }
2319
2320  public int countRows(final Table table, final byte[]... families) throws IOException {
2321    Scan scan = new Scan();
2322    for (byte[] family : families) {
2323      scan.addFamily(family);
2324    }
2325    return countRows(table, scan);
2326  }
2327
2328  /**
2329   * Return the number of rows in the given table.
2330   */
2331  public int countRows(final TableName tableName) throws IOException {
2332    Table table = getConnection().getTable(tableName);
2333    try {
2334      return countRows(table);
2335    } finally {
2336      table.close();
2337    }
2338  }
2339
  /** Return the number of rows in the given region, counted via a full default scan. */
  public int countRows(final Region region) throws IOException {
    return countRows(region, new Scan());
  }
2343
2344  public int countRows(final Region region, final Scan scan) throws IOException {
2345    InternalScanner scanner = region.getScanner(scan);
2346    try {
2347      return countRows(scanner);
2348    } finally {
2349      scanner.close();
2350    }
2351  }
2352
2353  public int countRows(final InternalScanner scanner) throws IOException {
2354    int scannedCount = 0;
2355    List<Cell> results = new ArrayList<>();
2356    boolean hasMore = true;
2357    while (hasMore) {
2358      hasMore = scanner.next(results);
2359      scannedCount += results.size();
2360      results.clear();
2361    }
2362    return scannedCount;
2363  }
2364
2365  /**
2366   * Return an md5 digest of the entire contents of a table.
2367   */
2368  public String checksumRows(final Table table) throws Exception {
2369
2370    Scan scan = new Scan();
2371    ResultScanner results = table.getScanner(scan);
2372    MessageDigest digest = MessageDigest.getInstance("MD5");
2373    for (Result res : results) {
2374      digest.update(res.getRow());
2375    }
2376    results.close();
2377    return digest.toString();
2378  }
2379
2380  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2381  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2382  static {
2383    int i = 0;
2384    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2385      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2386        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2387          ROWS[i][0] = b1;
2388          ROWS[i][1] = b2;
2389          ROWS[i][2] = b3;
2390          i++;
2391        }
2392      }
2393    }
2394  }
2395
  /** Common split keys: the empty start key followed by "bbb" through "yyy". */
  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2403
  // Split keys "bbb" through "zzz" with no leading empty key — presumably shaped for the
  // splitKeys argument of Admin#createTable (which rejects an empty first key); verify at callers.
  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2411
2412  /**
2413   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2414   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2415   * @return list of region info for regions added to meta
2416   */
2417  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2418    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2419    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2420    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2421    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2422    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2423      TableState.State.ENABLED);
2424    // add custom ones
2425    for (int i = 0; i < startKeys.length; i++) {
2426      int j = (i + 1) % startKeys.length;
2427      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2428        .setEndKey(startKeys[j]).build();
2429      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2430      newRegions.add(hri);
2431    }
2432
2433    meta.close();
2434    return newRegions;
2435  }
2436
2437  /**
2438   * Create an unmanaged WAL. Be sure to close it when you're through.
2439   */
2440  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2441    throws IOException {
2442    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2443    // unless I pass along via the conf.
2444    Configuration confForWAL = new Configuration(conf);
2445    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2446    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2447  }
2448
  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd) throws IOException {
    // Delegates with initialize=true so the returned region is ready for use.
    return createRegionAndWAL(info, rootDir, conf, htd, true);
  }
2457
  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. The given
   * block cache is installed on the region before it is initialized.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
    // Create the region uninitialized so the block cache can be injected before initialization.
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setBlockCache(blockCache);
    region.initialize();
    return region;
  }
2469
  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. The given
   * MOB file cache is installed on the region before it is initialized.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
    throws IOException {
    // Create the region uninitialized so the MOB file cache can be injected before init.
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setMobFileCache(mobFileCache);
    region.initialize();
    return region;
  }
2482
  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
   * @param initialize whether to initialize the region before returning it
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
    // The memstore chunk creator must be set up before any region is created in this JVM.
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    WAL wal = createWal(conf, rootDir, info);
    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
  }
2494
2495  /**
2496   * Returns all rows from the hbase:meta table.
2497   * @throws IOException When reading the rows fails.
2498   */
2499  public List<byte[]> getMetaTableRows() throws IOException {
2500    // TODO: Redo using MetaTableAccessor class
2501    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2502    List<byte[]> rows = new ArrayList<>();
2503    ResultScanner s = t.getScanner(new Scan());
2504    for (Result result : s) {
2505      LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()));
2506      rows.add(result.getRow());
2507    }
2508    s.close();
2509    t.close();
2510    return rows;
2511  }
2512
2513  /**
2514   * Returns all rows from the hbase:meta table for a given user table
2515   * @throws IOException When reading the rows fails.
2516   */
2517  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2518    // TODO: Redo using MetaTableAccessor.
2519    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2520    List<byte[]> rows = new ArrayList<>();
2521    ResultScanner s = t.getScanner(new Scan());
2522    for (Result result : s) {
2523      RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
2524      if (info == null) {
2525        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2526        // TODO figure out what to do for this new hosed case.
2527        continue;
2528      }
2529
2530      if (info.getTable().equals(tableName)) {
2531        LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info);
2532        rows.add(result.getRow());
2533      }
2534    }
2535    s.close();
2536    t.close();
2537    return rows;
2538  }
2539
  /**
   * Returns all regions of the specified table
   * @param tableName the table name
   * @return all regions of the specified table
   * @throws IOException when getting the regions fails.
   */
  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
    // Admin is AutoCloseable; try-with-resources releases it even if getRegions throws.
    try (Admin admin = getConnection().getAdmin()) {
      return admin.getRegions(tableName);
    }
  }
2551
2552  /**
2553   * Find any other region server which is different from the one identified by parameter
2554   * @return another region server
2555   */
2556  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2557    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2558      if (!(rst.getRegionServer() == rs)) {
2559        return rst.getRegionServer();
2560      }
2561    }
2562    return null;
2563  }
2564
2565  /**
2566   * Tool to get the reference to the region server object that holds the region of the specified
2567   * user table.
2568   * @param tableName user table to lookup in hbase:meta
2569   * @return region server that holds it, null if the row doesn't exist
2570   */
2571  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2572    throws IOException, InterruptedException {
2573    List<RegionInfo> regions = getRegions(tableName);
2574    if (regions == null || regions.isEmpty()) {
2575      return null;
2576    }
2577    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2578
2579    byte[] firstRegionName =
2580      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2581        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2582
2583    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2584    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2585      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2586    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2587      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2588    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2589    while (retrier.shouldRetry()) {
2590      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2591      if (index != -1) {
2592        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2593      }
2594      // Came back -1. Region may not be online yet. Sleep a while.
2595      retrier.sleepUntilNextRetry();
2596    }
2597    return null;
2598  }
2599
  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
   * @return the started mini MR cluster
   * @throws IOException When starting the cluster fails.
   */
  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
      "99.0");
    startMiniMapReduceCluster(2);
    return mrCluster;
  }
2611
2612  /**
2613   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2614   * internal static LOG_DIR variable.
2615   */
2616  private void forceChangeTaskLogDir() {
2617    Field logDirField;
2618    try {
2619      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2620      logDirField.setAccessible(true);
2621
2622      Field modifiersField = ReflectionUtils.getModifiersField();
2623      modifiersField.setAccessible(true);
2624      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2625
2626      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2627    } catch (SecurityException e) {
2628      throw new RuntimeException(e);
2629    } catch (NoSuchFieldException e) {
2630      throw new RuntimeException(e);
2631    } catch (IllegalArgumentException e) {
2632      throw new RuntimeException(e);
2633    } catch (IllegalAccessException e) {
2634      throw new RuntimeException(e);
2635    }
2636  }
2637
2638  /**
2639   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2640   * filesystem.
2641   * @param servers The number of <code>TaskTracker</code>'s to start.
2642   * @throws IOException When starting the cluster fails.
2643   */
2644  private void startMiniMapReduceCluster(final int servers) throws IOException {
2645    if (mrCluster != null) {
2646      throw new IllegalStateException("MiniMRCluster is already running");
2647    }
2648    LOG.info("Starting mini mapreduce cluster...");
2649    setupClusterTestDir();
2650    createDirsAndSetProperties();
2651
2652    forceChangeTaskLogDir();
2653
2654    //// hadoop2 specific settings
2655    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2656    // we up the VM usable so that processes don't get killed.
2657    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2658
2659    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2660    // this avoids the problem by disabling speculative task execution in tests.
2661    conf.setBoolean("mapreduce.map.speculative", false);
2662    conf.setBoolean("mapreduce.reduce.speculative", false);
2663    ////
2664
2665    // Allow the user to override FS URI for this map-reduce cluster to use.
2666    mrCluster =
2667      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2668        1, null, null, new JobConf(this.conf));
2669    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2670    if (jobConf == null) {
2671      jobConf = mrCluster.createJobConf();
2672    }
2673    // Hadoop MiniMR overwrites this while it should not
2674    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2675    LOG.info("Mini mapreduce cluster started");
2676
2677    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2678    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2679    // necessary config properties here. YARN-129 required adding a few properties.
2680    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2681    // this for mrv2 support; mr1 ignores this
2682    conf.set("mapreduce.framework.name", "yarn");
2683    conf.setBoolean("yarn.is.minicluster", true);
2684    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2685    if (rmAddress != null) {
2686      conf.set("yarn.resourcemanager.address", rmAddress);
2687    }
2688    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2689    if (historyAddress != null) {
2690      conf.set("mapreduce.jobhistory.address", historyAddress);
2691    }
2692    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2693    if (schedulerAddress != null) {
2694      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2695    }
2696    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2697    if (mrJobHistoryWebappAddress != null) {
2698      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2699    }
2700    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2701    if (yarnRMWebappAddress != null) {
2702      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2703    }
2704  }
2705
  /**
   * Stops the previously started <code>MiniMRCluster</code>. Safe to call when no cluster is
   * running; the jobtracker address is restored to "local" either way.
   */
  public void shutdownMiniMapReduceCluster() {
    if (mrCluster != null) {
      LOG.info("Stopping mini mapreduce cluster...");
      mrCluster.shutdown();
      mrCluster = null;
      LOG.info("Mini mapreduce cluster stopped");
    }
    // Restore configuration to point to local jobtracker
    conf.set("mapreduce.jobtracker.address", "local");
  }
2719
  /**
   * Create a stubbed out RegionServerService, mainly for getting FS.
   */
  public RegionServerServices createMockRegionServerService() throws IOException {
    // Cast disambiguates between the ServerName and RpcServerInterface overloads.
    return createMockRegionServerService((ServerName) null);
  }
2726
2727  /**
2728   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2729   * TestTokenAuthentication
2730   */
2731  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2732    throws IOException {
2733    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2734    rss.setFileSystem(getTestFileSystem());
2735    rss.setRpcServer(rpc);
2736    return rss;
2737  }
2738
  /**
   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
   * TestOpenRegionHandler
   * @param name server name for the mock, may be null
   */
  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
    rss.setFileSystem(getTestFileSystem());
    return rss;
  }
2748
  /**
   * Switches the logger for the given class to DEBUG level.
   * @param clazz The class for which to switch to debug logging.
   * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as
   *             HBase only uses log4j. You should do this by your own as it you know which log
   *             framework you are using then set the log level to debug is very easy.
   */
  @Deprecated
  public void enableDebug(Class<?> clazz) {
    // Thin delegation; kept only for source compatibility of existing tests.
    Log4jUtils.enableDebug(clazz);
  }
2760
  /**
   * Expire the Master's session
   */
  public void expireMasterSession() throws Exception {
    HMaster master = getMiniHBaseCluster().getMaster();
    // checkStatus=false: do not verify cluster availability afterwards.
    expireSession(master.getZooKeeper(), false);
  }
2768
  /**
   * Expire a region server's session
   * @param index which RS
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
    expireSession(rs.getZooKeeper(), false);
    // One fewer live RS: lower the minimum the master waits for so restarts don't hang.
    decrementMinRegionServerCount();
  }
2778
  /**
   * Lower the minimum region-server-count setting in every configuration copy the cluster holds,
   * after a region server has been expired.
   */
  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for newly spawned master
    // this.hbaseCluster shares this configuration too
    decrementMinRegionServerCount(getConfiguration());

    // each master thread keeps a copy of configuration
    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
      decrementMinRegionServerCount(master.getMaster().getConfiguration());
    }
  }
2789
2790  private void decrementMinRegionServerCount(Configuration conf) {
2791    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2792    if (currentCount != -1) {
2793      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2794    }
2795  }
2796
  /** Expire the given ZooKeeper session without checking cluster status afterwards. */
  public void expireSession(ZKWatcher nodeZK) throws Exception {
    expireSession(nodeZK, false);
  }
2800
2801  /**
2802   * Expire a ZooKeeper session as recommended in ZooKeeper documentation <a href=
2803   * "https://hbase.apache.org/docs/troubleshooting#troubleshooting-zookeeper">Troubleshooting
2804   * ZooKeeper</a>
2805   * <p/>
2806   * There are issues when doing this:
2807   * <ol>
2808   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2809   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2810   * </ol>
2811   * @param nodeZK      - the ZK watcher to expire
2812   * @param checkStatus - true to check if we can create a Table with the current configuration.
2813   */
2814  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2815    Configuration c = new Configuration(this.conf);
2816    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2817    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2818    byte[] password = zk.getSessionPasswd();
2819    long sessionID = zk.getSessionId();
2820
2821    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2822    // so we create a first watcher to be sure that the
2823    // event was sent. We expect that if our watcher receives the event
2824    // other watchers on the same machine will get is as well.
2825    // When we ask to close the connection, ZK does not close it before
2826    // we receive all the events, so don't have to capture the event, just
2827    // closing the connection should be enough.
2828    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2829      @Override
2830      public void process(WatchedEvent watchedEvent) {
2831        LOG.info("Monitor ZKW received event=" + watchedEvent);
2832      }
2833    }, sessionID, password);
2834
2835    // Making it expire
2836    ZooKeeper newZK =
2837      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2838
2839    // ensure that we have connection to the server before closing down, otherwise
2840    // the close session event will be eaten out before we start CONNECTING state
2841    long start = EnvironmentEdgeManager.currentTime();
2842    while (
2843      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2844    ) {
2845      Thread.sleep(1);
2846    }
2847    newZK.close();
2848    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2849
2850    // Now closing & waiting to be sure that the clients get it.
2851    monitor.close();
2852
2853    if (checkStatus) {
2854      getConnection().getTable(TableName.META_TABLE_NAME).close();
2855    }
2856  }
2857
  /**
   * Get the Mini HBase cluster.
   * @return hbase cluster
   * @see #getHBaseClusterInterface()
   */
  public MiniHBaseCluster getHBaseCluster() {
    // Alias for getMiniHBaseCluster(), kept for the many existing callers.
    return getMiniHBaseCluster();
  }
2866
  /**
   * Returns the HBaseCluster instance.
   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
   * instead w/o the need to type-cast.
   * @return the cluster as the generic interface type
   */
  public HBaseCluster getHBaseClusterInterface() {
    // implementation note: we should rename this method as #getHBaseCluster(),
    // but this would require refactoring 90+ calls.
    return hbaseCluster;
  }
2880
2881  /**
2882   * Resets the connections so that the next time getConnection() is called, a new connection is
2883   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2884   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
2885   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
2886   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
2887   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
2888   * bigger refactor.
2889   */
2890  public void invalidateConnection() throws IOException {
2891    closeConnection();
2892    // Update the master addresses if they changed.
2893    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2894    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
2895    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2896      masterConfigBefore, masterConfAfter);
2897    conf.set(HConstants.MASTER_ADDRS_KEY,
2898      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
2899  }
2900
2901  /**
2902   * Get a shared Connection to the cluster. this method is thread safe.
2903   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2904   */
2905  public Connection getConnection() throws IOException {
2906    return getAsyncConnection().toConnection();
2907  }
2908
2909  /**
2910   * Get a assigned Connection to the cluster. this method is thread safe.
2911   * @param user assigned user
2912   * @return A Connection with assigned user.
2913   */
2914  public Connection getConnection(User user) throws IOException {
2915    return getAsyncConnection(user).toConnection();
2916  }
2917
2918  /**
2919   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2920   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2921   *         of cluster.
2922   */
2923  public AsyncClusterConnection getAsyncConnection() throws IOException {
2924    try {
2925      return asyncConnection.updateAndGet(connection -> {
2926        if (connection == null) {
2927          try {
2928            User user = UserProvider.instantiate(conf).getCurrent();
2929            connection = getAsyncConnection(user);
2930          } catch (IOException ioe) {
2931            throw new UncheckedIOException("Failed to create connection", ioe);
2932          }
2933        }
2934        return connection;
2935      });
2936    } catch (UncheckedIOException exception) {
2937      throw exception.getCause();
2938    }
2939  }
2940
2941  /**
2942   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2943   * @param user assigned user
2944   * @return An AsyncClusterConnection with assigned user.
2945   */
2946  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2947    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2948  }
2949
  /**
   * Closes the cached Admin and the shared async connection, if present. Safe to call repeatedly;
   * IOExceptions raised while closing are suppressed by Closeables.close(..., true).
   */
  public void closeConnection() throws IOException {
    if (hbaseAdmin != null) {
      Closeables.close(hbaseAdmin, true);
      hbaseAdmin = null;
    }
    // Atomically take ownership of the shared connection so it is closed at most once.
    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
    if (asyncConnection != null) {
      Closeables.close(asyncConnection, true);
    }
  }
2960
2961  /**
2962   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2963   * it has no effect, it will be closed automatically when the cluster shutdowns
2964   */
2965  public Admin getAdmin() throws IOException {
2966    if (hbaseAdmin == null) {
2967      this.hbaseAdmin = getConnection().getAdmin();
2968    }
2969    return hbaseAdmin;
2970  }
2971
  /**
   * Returns a new KeymetaAdminClient backed by the shared cluster connection.
   * @return a freshly constructed client; caller owns its lifecycle
   */
  public KeymetaAdminClient getKeymetaAdmin() throws IOException {
    return new KeymetaAdminClient(getConnection());
  }
2975
2976  /**
2977   * Returns an {@link Hbck} instance. Needs be closed when done.
2978   */
2979  public Hbck getHbck() throws IOException {
2980    return getConnection().getHbck();
2981  }
2982
2983  /**
2984   * Unassign the named region.
2985   * @param regionName The region to unassign.
2986   */
2987  public void unassignRegion(String regionName) throws IOException {
2988    unassignRegion(Bytes.toBytes(regionName));
2989  }
2990
2991  /**
2992   * Unassign the named region.
2993   * @param regionName The region to unassign.
2994   */
2995  public void unassignRegion(byte[] regionName) throws IOException {
2996    getAdmin().unassign(regionName, true);
2997  }
2998
2999  /**
3000   * Closes the region containing the given row.
3001   * @param row   The row to find the containing region.
3002   * @param table The table to find the region.
3003   */
3004  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
3005    unassignRegionByRow(Bytes.toBytes(row), table);
3006  }
3007
3008  /**
3009   * Closes the region containing the given row.
3010   * @param row   The row to find the containing region.
3011   * @param table The table to find the region.
3012   */
3013  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
3014    HRegionLocation hrl = table.getRegionLocation(row);
3015    unassignRegion(hrl.getRegion().getRegionName());
3016  }
3017
3018  /**
3019   * Retrieves a splittable region randomly from tableName
3020   * @param tableName   name of table
3021   * @param maxAttempts maximum number of attempts, unlimited for value of -1
3022   * @return the HRegion chosen, null if none was found within limit of maxAttempts
3023   */
3024  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3025    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3026    int regCount = regions.size();
3027    Set<Integer> attempted = new HashSet<>();
3028    int idx;
3029    int attempts = 0;
3030    do {
3031      regions = getHBaseCluster().getRegions(tableName);
3032      if (regCount != regions.size()) {
3033        // if there was region movement, clear attempted Set
3034        attempted.clear();
3035      }
3036      regCount = regions.size();
3037      // There are chances that before we get the region for the table from an RS the region may
3038      // be going for CLOSE. This may be because online schema change is enabled
3039      if (regCount > 0) {
3040        idx = ThreadLocalRandom.current().nextInt(regCount);
3041        // if we have just tried this region, there is no need to try again
3042        if (attempted.contains(idx)) {
3043          continue;
3044        }
3045        HRegion region = regions.get(idx);
3046        if (region.checkSplit().isPresent()) {
3047          return region;
3048        }
3049        attempted.add(idx);
3050      }
3051      attempts++;
3052    } while (maxAttempts == -1 || attempts < maxAttempts);
3053    return null;
3054  }
3055
  /** Returns the MiniDFSCluster backing this utility, or null if none was started/set. */
  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }
3059
  /**
   * Set the MiniDFSCluster, requiring that any previously-set cluster is down.
   * @see #setDFSCluster(MiniDFSCluster, boolean)
   */
  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
    setDFSCluster(cluster, true);
  }
3063
3064  /**
3065   * Set the MiniDFSCluster
3066   * @param cluster     cluster to use
3067   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
3068   *                    is set.
3069   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3070   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
3071   */
3072  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3073    throws IllegalStateException, IOException {
3074    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3075      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3076    }
3077    this.dfsCluster = cluster;
3078    this.setFs();
3079  }
3080
  /** Returns a FileSystem for the configured cluster, obtained through HFileSystem. */
  public FileSystem getTestFileSystem() throws IOException {
    return HFileSystem.get(conf);
  }
3084
3085  /**
3086   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
3087   * (30 seconds).
3088   * @param table Table to wait on.
3089   */
3090  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
3091    waitTableAvailable(table.getName(), 30000);
3092  }
3093
  /**
   * Wait until all regions in a table have been assigned, or the timeout elapses.
   * @param table         Table to wait on.
   * @param timeoutMillis Timeout in milliseconds.
   */
  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }
3098
3099  /**
3100   * Wait until all regions in a table have been assigned
3101   * @param table         Table to wait on.
3102   * @param timeoutMillis Timeout.
3103   */
3104  public void waitTableAvailable(byte[] table, long timeoutMillis)
3105    throws InterruptedException, IOException {
3106    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3107  }
3108
3109  public String explainTableAvailability(TableName tableName) throws IOException {
3110    StringBuilder msg =
3111      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
3112    if (getHBaseCluster().getMaster().isAlive()) {
3113      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
3114        .getRegionStates().getRegionAssignments();
3115      final List<Pair<RegionInfo, ServerName>> metaLocations =
3116        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
3117      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3118        RegionInfo hri = metaLocation.getFirst();
3119        ServerName sn = metaLocation.getSecond();
3120        if (!assignments.containsKey(hri)) {
3121          msg.append(", region ").append(hri)
3122            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
3123        } else if (sn == null) {
3124          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
3125        } else if (!sn.equals(assignments.get(hri))) {
3126          msg.append(",  region ").append(hri)
3127            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
3128            .append(" <> ").append(assignments.get(hri));
3129        }
3130      }
3131    }
3132    return msg.toString();
3133  }
3134
  /**
   * Describes the table's state as recorded in META relative to an expected state.
   * @param table the table to look up
   * @param state the state the caller expects the table to be in
   * @return "OK" message when the META state matches, otherwise a diagnostic string
   */
  public String explainTableState(final TableName table, TableState.State state)
    throws IOException {
    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
    if (tableState == null) {
      // No current state row; report the last state seen, which may include a deleted table's.
      return "TableState in META: No table state in META for table " + table
        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
    } else if (!tableState.inStates(state)) {
      return "TableState in META: Not " + state + " state, but " + tableState;
    } else {
      return "TableState in META: OK";
    }
  }
3147
3148  public TableState findLastTableState(final TableName table) throws IOException {
3149    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3150    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
3151      @Override
3152      public boolean visit(Result r) throws IOException {
3153        if (!Arrays.equals(r.getRow(), table.getName())) {
3154          return false;
3155        }
3156        TableState state = CatalogFamilyFormat.getTableState(r);
3157        if (state != null) {
3158          lastTableState.set(state);
3159        }
3160        return true;
3161      }
3162    };
3163    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
3164      Integer.MAX_VALUE, visitor);
3165    return lastTableState.get();
3166  }
3167
3168  /**
3169   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3170   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
3171   * table.
3172   * @param table the table to wait on.
3173   * @throws InterruptedException if interrupted while waiting
3174   * @throws IOException          if an IO problem is encountered
3175   */
3176  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
3177    waitTableEnabled(table, 30000);
3178  }
3179
3180  /**
3181   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
3182   * have been all assigned.
3183   * @see #waitTableEnabled(TableName, long)
3184   * @param table         Table to wait on.
3185   * @param timeoutMillis Time to wait on it being marked enabled.
3186   */
3187  public void waitTableEnabled(byte[] table, long timeoutMillis)
3188    throws InterruptedException, IOException {
3189    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3190  }
3191
  /**
   * Waits for a table to be 'enabled' (state set and all regions assigned), up to the given
   * timeout.
   * @param table         Table to wait on.
   * @param timeoutMillis Time to wait on it being marked enabled.
   */
  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
    waitFor(timeoutMillis, predicateTableEnabled(table));
  }
3195
3196  /**
3197   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
3198   * after default period (30 seconds)
3199   * @param table Table to wait on.
3200   */
3201  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
3202    waitTableDisabled(table, 30000);
3203  }
3204
  /**
   * Waits for a table to be 'disabled', up to the given timeout.
   * @param table         Table to wait on.
   * @param millisTimeout Time to wait on it being marked disabled.
   */
  public void waitTableDisabled(TableName table, long millisTimeout)
    throws InterruptedException, IOException {
    waitFor(millisTimeout, predicateTableDisabled(table));
  }
3209
3210  /**
3211   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
3212   * @param table         Table to wait on.
3213   * @param timeoutMillis Time to wait on it being marked disabled.
3214   */
3215  public void waitTableDisabled(byte[] table, long timeoutMillis)
3216    throws InterruptedException, IOException {
3217    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3218  }
3219
3220  /**
3221   * Make sure that at least the specified number of region servers are running
3222   * @param num minimum number of region servers that should be running
3223   * @return true if we started some servers
3224   */
3225  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
3226    boolean startedServer = false;
3227    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3228    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
3229      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3230      startedServer = true;
3231    }
3232
3233    return startedServer;
3234  }
3235
3236  /**
3237   * Make sure that at least the specified number of region servers are running. We don't count the
3238   * ones that are currently stopping or are stopped.
3239   * @param num minimum number of region servers that should be running
3240   * @return true if we started some servers
3241   */
3242  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
3243    boolean startedServer = ensureSomeRegionServersAvailable(num);
3244
3245    int nonStoppedServers = 0;
3246    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
3247
3248      HRegionServer hrs = rst.getRegionServer();
3249      if (hrs.isStopping() || hrs.isStopped()) {
3250        LOG.info("A region server is stopped or stopping:" + hrs);
3251      } else {
3252        nonStoppedServers++;
3253      }
3254    }
3255    for (int i = nonStoppedServers; i < num; ++i) {
3256      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3257      startedServer = true;
3258    }
3259    return startedServer;
3260  }
3261
3262  /**
3263   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
3264   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
3265   * @param c                     Initial configuration
3266   * @param differentiatingSuffix Suffix to differentiate this user from others.
3267   * @return A new configuration instance with a different user set into it.
3268   */
3269  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
3270    throws IOException {
3271    FileSystem currentfs = FileSystem.get(c);
3272    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3273      return User.getCurrent();
3274    }
3275    // Else distributed filesystem. Make a new instance per daemon. Below
3276    // code is taken from the AppendTestUtil over in hdfs.
3277    String username = User.getCurrent().getName() + differentiatingSuffix;
3278    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
3279    return user;
3280  }
3281
  /**
   * Collects the names of all regions currently online across the cluster's live region servers.
   * @param cluster the mini cluster to query
   * @return sorted set of region names as strings
   */
  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      try {
        for (RegionInfo region : ProtobufUtil
          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
          online.add(region.getRegionNameAsString());
        }
      } catch (RegionServerStoppedException e) {
        // That's fine. A server may stop between listing the threads and querying it.
      }
    }
    return online;
  }
3297
3298  /**
3299   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
3300   * linger. Here is the exception you'll see:
3301   *
3302   * <pre>
3303   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3304   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3305   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3306   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3307   * </pre>
3308   *
3309   * @param stream A DFSClient.DFSOutputStream.
3310   */
3311  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3312    try {
3313      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3314      for (Class<?> clazz : clazzes) {
3315        String className = clazz.getSimpleName();
3316        if (className.equals("DFSOutputStream")) {
3317          if (clazz.isInstance(stream)) {
3318            Field maxRecoveryErrorCountField =
3319              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3320            maxRecoveryErrorCountField.setAccessible(true);
3321            maxRecoveryErrorCountField.setInt(stream, max);
3322            break;
3323          }
3324        }
3325      }
3326    } catch (Exception e) {
3327      LOG.info("Could not set max recovery field", e);
3328    }
3329  }
3330
3331  /**
3332   * Uses directly the assignment manager to assign the region. and waits until the specified region
3333   * has completed assignment.
3334   * @return true if the region is assigned false otherwise.
3335   */
3336  public boolean assignRegion(final RegionInfo regionInfo)
3337    throws IOException, InterruptedException {
3338    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3339    am.assign(regionInfo);
3340    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3341  }
3342
3343  /**
3344   * Move region to destination server and wait till region is completely moved and online
3345   * @param destRegion region to move
3346   * @param destServer destination server of the region
3347   */
3348  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3349    throws InterruptedException, IOException {
3350    HMaster master = getMiniHBaseCluster().getMaster();
3351    // TODO: Here we start the move. The move can take a while.
3352    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3353    while (true) {
3354      ServerName serverName =
3355        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3356      if (serverName != null && serverName.equals(destServer)) {
3357        assertRegionOnServer(destRegion, serverName, 2000);
3358        break;
3359      }
3360      Thread.sleep(10);
3361    }
3362  }
3363
3364  /**
3365   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3366   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3367   * master has been informed and updated hbase:meta with the regions deployed server.
3368   * @param tableName the table name
3369   */
3370  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3371    waitUntilAllRegionsAssigned(tableName,
3372      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3373  }
3374
3375  /**
3376   * Waith until all system table's regions get assigned
3377   */
3378  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3379    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3380  }
3381
3382  /**
3383   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3384   * timeout. This means all regions have been deployed, master has been informed and updated
3385   * hbase:meta with the regions deployed server.
3386   * @param tableName the table name
3387   * @param timeout   timeout, in milliseconds
3388   */
3389  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3390    throws IOException {
3391    if (!TableName.isMetaTableName(tableName)) {
3392      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3393        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3394          + timeout + "ms");
3395        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3396          @Override
3397          public String explainFailure() throws IOException {
3398            return explainTableAvailability(tableName);
3399          }
3400
3401          @Override
3402          public boolean evaluate() throws IOException {
3403            Scan scan = new Scan();
3404            scan.addFamily(HConstants.CATALOG_FAMILY);
3405            boolean tableFound = false;
3406            try (ResultScanner s = meta.getScanner(scan)) {
3407              for (Result r; (r = s.next()) != null;) {
3408                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3409                RegionInfo info = RegionInfo.parseFromOrNull(b);
3410                if (info != null && info.getTable().equals(tableName)) {
3411                  // Get server hosting this region from catalog family. Return false if no server
3412                  // hosting this region, or if the server hosting this region was recently killed
3413                  // (for fault tolerance testing).
3414                  tableFound = true;
3415                  byte[] server =
3416                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3417                  if (server == null) {
3418                    return false;
3419                  } else {
3420                    byte[] startCode =
3421                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3422                    ServerName serverName =
3423                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3424                        + Bytes.toLong(startCode));
3425                    if (
3426                      !getHBaseClusterInterface().isDistributedCluster()
3427                        && getHBaseCluster().isKilledRS(serverName)
3428                    ) {
3429                      return false;
3430                    }
3431                  }
3432                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3433                    return false;
3434                  }
3435                }
3436              }
3437            }
3438            if (!tableFound) {
3439              LOG.warn(
3440                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3441            }
3442            return tableFound;
3443          }
3444        });
3445      }
3446    }
3447    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3448    // check from the master state if we are using a mini cluster
3449    if (!getHBaseClusterInterface().isDistributedCluster()) {
3450      // So, all regions are in the meta table but make sure master knows of the assignments before
3451      // returning -- sometimes this can lag.
3452      HMaster master = getHBaseCluster().getMaster();
3453      final RegionStates states = master.getAssignmentManager().getRegionStates();
3454      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3455        @Override
3456        public String explainFailure() throws IOException {
3457          return explainTableAvailability(tableName);
3458        }
3459
3460        @Override
3461        public boolean evaluate() throws IOException {
3462          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3463          return hris != null && !hris.isEmpty();
3464        }
3465      });
3466    }
3467    LOG.info("All regions for table " + tableName + " assigned.");
3468  }
3469
3470  /**
3471   * Do a small get/scan against one store. This is required because store has no actual methods of
3472   * querying itself, and relies on StoreScanner.
3473   */
3474  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3475    Scan scan = new Scan(get);
3476    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3477      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3478      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3479      // readpoint 0.
3480      0);
3481
3482    List<Cell> result = new ArrayList<>();
3483    scanner.next(result);
3484    if (!result.isEmpty()) {
3485      // verify that we are on the row we want:
3486      Cell kv = result.get(0);
3487      if (!CellUtil.matchingRows(kv, get.getRow())) {
3488        result.clear();
3489      }
3490    }
3491    scanner.close();
3492    return result;
3493  }
3494
3495  /**
3496   * Create region split keys between startkey and endKey
3497   * @param numRegions the number of regions to be created. it has to be greater than 3.
3498   * @return resulting split keys
3499   */
3500  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3501    if (numRegions <= 3) {
3502      throw new AssertionError();
3503    }
3504    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3505    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3506    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3507    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3508    return result;
3509  }
3510
3511  /**
3512   * Do a small get/scan against one store. This is required because store has no actual methods of
3513   * querying itself, and relies on StoreScanner.
3514   */
3515  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3516    throws IOException {
3517    Get get = new Get(row);
3518    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3519    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3520
3521    return getFromStoreFile(store, get);
3522  }
3523
  /**
   * Asserts two cell lists are element-wise equal per CellComparator; on mismatch throws an
   * AssertionError naming the first differing position and both lengths.
   * @param additionalMsg extra context appended to the failure message; may be null
   * @param expected      expected cells
   * @param actual        actual cells
   */
  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
    final List<? extends Cell> actual) {
    final int eLen = expected.size();
    final int aLen = actual.size();
    final int minLen = Math.min(eLen, aLen);

    // Advance i to the first index where the lists differ (or to minLen if none differs).
    int i;
    for (i = 0; i < minLen
      && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) {
    }

    if (additionalMsg == null) {
      additionalMsg = "";
    }
    if (!additionalMsg.isEmpty()) {
      additionalMsg = ". " + additionalMsg;
    }

    // Fails on a length mismatch even when the common prefix matched.
    if (eLen != aLen || i != minLen) {
      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
        + " (length " + aLen + ")" + additionalMsg);
    }
  }
3548
3549  public static <T> String safeGetAsStr(List<T> lst, int i) {
3550    if (0 <= i && i < lst.size()) {
3551      return lst.get(i).toString();
3552    } else {
3553      return "<out_of_range>";
3554    }
3555  }
3556
3557  public String getRpcConnnectionURI() throws UnknownHostException {
3558    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3559  }
3560
  /**
   * Get the hbase+zk:// connection URI for this cluster, built from the ZooKeeper quorum, client
   * port, and znode parent in the configuration.
   * @return connection URI of the form hbase+zk://quorum:port/znodeParent
   */
  public String getZkConnectionURI() {
    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }
3566
3567  /**
3568   * Get the zk based cluster key for this cluster.
3569   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3570   *             connection info of a cluster. Keep here only for compatibility.
3571   * @see #getRpcConnnectionURI()
3572   * @see #getZkConnectionURI()
3573   */
3574  @Deprecated
3575  public String getClusterKey() {
3576    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3577      + ":"
3578      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3579  }
3580
3581  /** Creates a random table with the given parameters */
3582  public Table createRandomTable(TableName tableName, final Collection<String> families,
3583    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3584    final int numRowsPerFlush) throws IOException, InterruptedException {
3585
3586    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3587      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3588      + maxVersions + "\n");
3589
3590    final int numCF = families.size();
3591    final byte[][] cfBytes = new byte[numCF][];
3592    {
3593      int cfIndex = 0;
3594      for (String cf : families) {
3595        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3596      }
3597    }
3598
3599    final int actualStartKey = 0;
3600    final int actualEndKey = Integer.MAX_VALUE;
3601    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3602    final int splitStartKey = actualStartKey + keysPerRegion;
3603    final int splitEndKey = actualEndKey - keysPerRegion;
3604    final String keyFormat = "%08x";
3605    final Table table = createTable(tableName, cfBytes, maxVersions,
3606      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3607      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3608
3609    if (hbaseCluster != null) {
3610      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3611    }
3612
3613    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3614
3615    final Random rand = ThreadLocalRandom.current();
3616    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3617      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3618        final byte[] row = Bytes.toBytes(
3619          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3620
3621        Put put = new Put(row);
3622        Delete del = new Delete(row);
3623        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3624          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3625          final long ts = rand.nextInt();
3626          final byte[] qual = Bytes.toBytes("col" + iCol);
3627          if (rand.nextBoolean()) {
3628            final byte[] value =
3629              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3630                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3631            put.addColumn(cf, qual, ts, value);
3632          } else if (rand.nextDouble() < 0.8) {
3633            del.addColumn(cf, qual, ts);
3634          } else {
3635            del.addColumns(cf, qual, ts);
3636          }
3637        }
3638
3639        if (!put.isEmpty()) {
3640          mutator.mutate(put);
3641        }
3642
3643        if (!del.isEmpty()) {
3644          mutator.mutate(del);
3645        }
3646      }
3647      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3648      mutator.flush();
3649      if (hbaseCluster != null) {
3650        getMiniHBaseCluster().flushcache(table.getName());
3651      }
3652    }
3653    mutator.close();
3654
3655    return table;
3656  }
3657
3658  public static int randomFreePort() {
3659    return HBaseCommonTestingUtility.randomFreePort();
3660  }
3661
3662  public static String randomMultiCastAddress() {
3663    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3664  }
3665
3666  public static void waitForHostPort(String host, int port) throws IOException {
3667    final int maxTimeMs = 10000;
3668    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3669    IOException savedException = null;
3670    LOG.info("Waiting for server at " + host + ":" + port);
3671    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3672      try {
3673        Socket sock = new Socket(InetAddress.getByName(host), port);
3674        sock.close();
3675        savedException = null;
3676        LOG.info("Server at " + host + ":" + port + " is available");
3677        break;
3678      } catch (UnknownHostException e) {
3679        throw new IOException("Failed to look up " + host, e);
3680      } catch (IOException e) {
3681        savedException = e;
3682      }
3683      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3684    }
3685
3686    if (savedException != null) {
3687      throw savedException;
3688    }
3689  }
3690
3691  /**
3692   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3693   * continues.
3694   * @return the number of regions the table was split into
3695   */
3696  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3697    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
3698    throws IOException {
3699    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
3700      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
3701  }
3702
3703  /**
3704   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3705   * continues.
3706   * @return the number of regions the table was split into
3707   */
3708  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3709    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3710    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3711    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3712    builder.setDurability(durability);
3713    builder.setRegionReplication(regionReplication);
3714    ColumnFamilyDescriptorBuilder cfBuilder =
3715      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
3716    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3717    cfBuilder.setCompressionType(compression);
3718    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
3719      numRegionsPerServer);
3720  }
3721
3722  /**
3723   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3724   * continues.
3725   * @return the number of regions the table was split into
3726   */
3727  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
3728    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
3729    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
3730    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
3731    builder.setDurability(durability);
3732    builder.setRegionReplication(regionReplication);
3733    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
3734    for (int i = 0; i < columnFamilies.length; i++) {
3735      ColumnFamilyDescriptorBuilder cfBuilder =
3736        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
3737      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
3738      cfBuilder.setCompressionType(compression);
3739      hcds[i] = cfBuilder.build();
3740    }
3741    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
3742  }
3743
3744  /**
3745   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3746   * continues.
3747   * @return the number of regions the table was split into
3748   */
3749  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3750    ColumnFamilyDescriptor hcd) throws IOException {
3751    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3752  }
3753
3754  /**
3755   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3756   * continues.
3757   * @return the number of regions the table was split into
3758   */
3759  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3760    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3761    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
3762      numRegionsPerServer);
3763  }
3764
3765  /**
3766   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3767   * continues.
3768   * @return the number of regions the table was split into
3769   */
3770  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
3771    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3772    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
3773      numRegionsPerServer);
3774  }
3775
3776  /**
3777   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
3778   * continues.
3779   * @return the number of regions the table was split into
3780   */
3781  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
3782    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
3783    throws IOException {
3784    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3785    for (ColumnFamilyDescriptor cd : cds) {
3786      if (!td.hasColumnFamily(cd.getName())) {
3787        builder.setColumnFamily(cd);
3788      }
3789    }
3790    td = builder.build();
3791    int totalNumberOfRegions = 0;
3792    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3793    Admin admin = unmanagedConnection.getAdmin();
3794
3795    try {
3796      // create a table a pre-splits regions.
3797      // The number of splits is set as:
3798      // region servers * regions per region server).
3799      int numberOfServers = admin.getRegionServers().size();
3800      if (numberOfServers == 0) {
3801        throw new IllegalStateException("No live regionservers");
3802      }
3803
3804      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3805      LOG.info("Number of live regionservers: " + numberOfServers + ", "
3806        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
3807        + numRegionsPerServer + ")");
3808
3809      byte[][] splits = splitter.split(totalNumberOfRegions);
3810
3811      admin.createTable(td, splits);
3812    } catch (MasterNotRunningException e) {
3813      LOG.error("Master not running", e);
3814      throw new IOException(e);
3815    } catch (TableExistsException e) {
3816      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
3817    } finally {
3818      admin.close();
3819      unmanagedConnection.close();
3820    }
3821    return totalNumberOfRegions;
3822  }
3823
3824  public static int getMetaRSPort(Connection connection) throws IOException {
3825    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3826      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3827    }
3828  }
3829
3830  /**
3831   * Due to async racing issue, a region may not be in the online region list of a region server
3832   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3833   */
3834  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3835    final long timeout) throws IOException, InterruptedException {
3836    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3837    while (true) {
3838      List<RegionInfo> regions = getAdmin().getRegions(server);
3839      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3840      long now = EnvironmentEdgeManager.currentTime();
3841      if (now > timeoutTime) break;
3842      Thread.sleep(10);
3843    }
3844    throw new AssertionError(
3845      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3846  }
3847
3848  /**
3849   * Check to make sure the region is open on the specified region server, but not on any other one.
3850   */
3851  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3852    final long timeout) throws IOException, InterruptedException {
3853    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3854    while (true) {
3855      List<RegionInfo> regions = getAdmin().getRegions(server);
3856      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3857        List<JVMClusterUtil.RegionServerThread> rsThreads =
3858          getHBaseCluster().getLiveRegionServerThreads();
3859        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3860          HRegionServer rs = rsThread.getRegionServer();
3861          if (server.equals(rs.getServerName())) {
3862            continue;
3863          }
3864          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3865          for (HRegion r : hrs) {
3866            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
3867              throw new AssertionError("Region should not be double assigned");
3868            }
3869          }
3870        }
3871        return; // good, we are happy
3872      }
3873      long now = EnvironmentEdgeManager.currentTime();
3874      if (now > timeoutTime) break;
3875      Thread.sleep(10);
3876    }
3877    throw new AssertionError(
3878      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3879  }
3880
3881  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3882    TableDescriptor td =
3883      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3884    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3885    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3886  }
3887
3888  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3889    BlockCache blockCache) throws IOException {
3890    TableDescriptor td =
3891      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3892    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3893    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3894  }
3895
3896  public static void setFileSystemURI(String fsURI) {
3897    FS_URI = fsURI;
3898  }
3899
3900  /**
3901   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3902   */
3903  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3904    return new ExplainingPredicate<IOException>() {
3905      @Override
3906      public String explainFailure() throws IOException {
3907        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
3908        return "found in transition: " + am.getRegionsInTransition().toString();
3909      }
3910
3911      @Override
3912      public boolean evaluate() throws IOException {
3913        HMaster master = getMiniHBaseCluster().getMaster();
3914        if (master == null) return false;
3915        AssignmentManager am = master.getAssignmentManager();
3916        if (am == null) return false;
3917        return !am.hasRegionsInTransition();
3918      }
3919    };
3920  }
3921
3922  /**
3923   * Returns a {@link Predicate} for checking that table is enabled
3924   */
3925  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3926    return new ExplainingPredicate<IOException>() {
3927      @Override
3928      public String explainFailure() throws IOException {
3929        return explainTableState(tableName, TableState.State.ENABLED);
3930      }
3931
3932      @Override
3933      public boolean evaluate() throws IOException {
3934        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3935      }
3936    };
3937  }
3938
3939  /**
3940   * Returns a {@link Predicate} for checking that table is enabled
3941   */
3942  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3943    return new ExplainingPredicate<IOException>() {
3944      @Override
3945      public String explainFailure() throws IOException {
3946        return explainTableState(tableName, TableState.State.DISABLED);
3947      }
3948
3949      @Override
3950      public boolean evaluate() throws IOException {
3951        return getAdmin().isTableDisabled(tableName);
3952      }
3953    };
3954  }
3955
3956  /**
3957   * Returns a {@link Predicate} for checking that table is enabled
3958   */
3959  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3960    return new ExplainingPredicate<IOException>() {
3961      @Override
3962      public String explainFailure() throws IOException {
3963        return explainTableAvailability(tableName);
3964      }
3965
3966      @Override
3967      public boolean evaluate() throws IOException {
3968        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3969        if (tableAvailable) {
3970          try (Table table = getConnection().getTable(tableName)) {
3971            TableDescriptor htd = table.getDescriptor();
3972            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3973              .getAllRegionLocations()) {
3974              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3975                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3976                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3977              for (byte[] family : htd.getColumnFamilyNames()) {
3978                scan.addFamily(family);
3979              }
3980              try (ResultScanner scanner = table.getScanner(scan)) {
3981                scanner.next();
3982              }
3983            }
3984          }
3985        }
3986        return tableAvailable;
3987      }
3988    };
3989  }
3990
3991  /**
3992   * Wait until no regions in transition.
3993   * @param timeout How long to wait.
3994   */
3995  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3996    waitFor(timeout, predicateNoRegionsInTransition());
3997  }
3998
3999  /**
4000   * Wait until no regions in transition. (time limit 15min)
4001   */
4002  public void waitUntilNoRegionsInTransition() throws IOException {
4003    waitUntilNoRegionsInTransition(15 * 60000);
4004  }
4005
4006  /**
4007   * Wait until labels is ready in VisibilityLabelsCache.
4008   */
4009  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4010    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4011    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4012
4013      @Override
4014      public boolean evaluate() {
4015        for (String label : labels) {
4016          if (labelsCache.getLabelOrdinal(label) == 0) {
4017            return false;
4018          }
4019        }
4020        return true;
4021      }
4022
4023      @Override
4024      public String explainFailure() {
4025        for (String label : labels) {
4026          if (labelsCache.getLabelOrdinal(label) == 0) {
4027            return label + " is not available yet";
4028          }
4029        }
4030        return "";
4031      }
4032    });
4033  }
4034
4035  /**
4036   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4037   * available.
4038   * @return the list of column descriptors
4039   */
4040  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
4041    return generateColumnDescriptors("");
4042  }
4043
4044  /**
4045   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
4046   * available.
4047   * @param prefix family names prefix
4048   * @return the list of column descriptors
4049   */
4050  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
4051    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
4052    long familyId = 0;
4053    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
4054      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
4055        for (BloomType bloomType : BloomType.values()) {
4056          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4057          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
4058            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
4059          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
4060          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
4061          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
4062          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
4063          familyId++;
4064        }
4065      }
4066    }
4067    return columnFamilyDescriptors;
4068  }
4069
4070  /**
4071   * Get supported compression algorithms.
4072   * @return supported compression algorithms.
4073   */
4074  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4075    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4076    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4077    for (String algoName : allAlgos) {
4078      try {
4079        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4080        algo.getCompressor();
4081        supportedAlgos.add(algo);
4082      } catch (Throwable t) {
4083        // this algo is not available
4084      }
4085    }
4086    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4087  }
4088
4089  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4090    Scan scan = new Scan().withStartRow(row);
4091    scan.setReadType(ReadType.PREAD);
4092    scan.setCaching(1);
4093    scan.setReversed(true);
4094    scan.addFamily(family);
4095    try (RegionScanner scanner = r.getScanner(scan)) {
4096      List<Cell> cells = new ArrayList<>(1);
4097      scanner.next(cells);
4098      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4099        return null;
4100      }
4101      return Result.create(cells);
4102    }
4103  }
4104
4105  private boolean isTargetTable(final byte[] inRow, Cell c) {
4106    String inputRowString = Bytes.toString(inRow);
4107    int i = inputRowString.indexOf(HConstants.DELIMITER);
4108    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4109    int o = outputRowString.indexOf(HConstants.DELIMITER);
4110    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4111  }
4112
4113  /**
4114   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
4115   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
4116   * kerby KDC server and utility for using it,
4117   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
4118   * less baggage. It came in in HBASE-5291.
4119   */
4120  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4121    Properties conf = MiniKdc.createConf();
4122    conf.put(MiniKdc.DEBUG, true);
4123    MiniKdc kdc = null;
4124    File dir = null;
4125    // There is time lag between selecting a port and trying to bind with it. It's possible that
4126    // another service captures the port in between which'll result in BindException.
4127    boolean bindException;
4128    int numTries = 0;
4129    do {
4130      try {
4131        bindException = false;
4132        dir = new File(getDataTestDir("kdc").toUri().getPath());
4133        kdc = new MiniKdc(conf, dir);
4134        kdc.start();
4135      } catch (BindException e) {
4136        FileUtils.deleteDirectory(dir); // clean directory
4137        numTries++;
4138        if (numTries == 3) {
4139          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4140          throw e;
4141        }
4142        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4143        bindException = true;
4144      }
4145    } while (bindException);
4146    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4147    return kdc;
4148  }
4149
4150  public int getNumHFiles(final TableName tableName, final byte[] family) {
4151    int numHFiles = 0;
4152    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4153      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
4154    }
4155    return numHFiles;
4156  }
4157
4158  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4159    final byte[] family) {
4160    int numHFiles = 0;
4161    for (Region region : rs.getRegions(tableName)) {
4162      numHFiles += region.getStore(family).getStorefilesCount();
4163    }
4164    return numHFiles;
4165  }
4166
4167  private void assertEquals(String message, int expected, int actual) {
4168    if (expected == actual) {
4169      return;
4170    }
4171    String formatted = "";
4172    if (message != null && !"".equals(message)) {
4173      formatted = message + " ";
4174    }
4175    throw new AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">");
4176  }
4177
4178  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4179    if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) {
4180      throw new AssertionError();
4181    }
4182    assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode());
4183    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4184    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4185    assertEquals("", ltdFamilies.size(), rtdFamilies.size());
4186    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
4187        it2 = rtdFamilies.iterator(); it.hasNext();) {
4188      assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4189    }
4190  }
4191
4192  /**
4193   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
4194   * invocations.
4195   */
4196  public static void await(final long sleepMillis, final BooleanSupplier condition)
4197    throws InterruptedException {
4198    try {
4199      while (!condition.getAsBoolean()) {
4200        Thread.sleep(sleepMillis);
4201      }
4202    } catch (RuntimeException e) {
4203      if (e.getCause() instanceof AssertionError) {
4204        throw (AssertionError) e.getCause();
4205      }
4206      throw e;
4207    }
4208  }
4209}