001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.File;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.io.UncheckedIOException;
030import java.lang.reflect.Field;
031import java.lang.reflect.Modifier;
032import java.net.BindException;
033import java.net.DatagramSocket;
034import java.net.InetAddress;
035import java.net.ServerSocket;
036import java.net.Socket;
037import java.net.UnknownHostException;
038import java.nio.charset.StandardCharsets;
039import java.security.MessageDigest;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.Collections;
044import java.util.HashSet;
045import java.util.Iterator;
046import java.util.List;
047import java.util.Map;
048import java.util.NavigableSet;
049import java.util.Properties;
050import java.util.Random;
051import java.util.Set;
052import java.util.TreeSet;
053import java.util.concurrent.ExecutionException;
054import java.util.concurrent.ThreadLocalRandom;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicReference;
057import java.util.function.BooleanSupplier;
058import org.apache.commons.io.FileUtils;
059import org.apache.commons.lang3.RandomStringUtils;
060import org.apache.hadoop.conf.Configuration;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
064import org.apache.hadoop.hbase.Waiter.Predicate;
065import org.apache.hadoop.hbase.client.Admin;
066import org.apache.hadoop.hbase.client.AsyncAdmin;
067import org.apache.hadoop.hbase.client.AsyncClusterConnection;
068import org.apache.hadoop.hbase.client.BufferedMutator;
069import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Consistency;
075import org.apache.hadoop.hbase.client.Delete;
076import org.apache.hadoop.hbase.client.Durability;
077import org.apache.hadoop.hbase.client.Get;
078import org.apache.hadoop.hbase.client.Hbck;
079import org.apache.hadoop.hbase.client.MasterRegistry;
080import org.apache.hadoop.hbase.client.Put;
081import org.apache.hadoop.hbase.client.RegionInfo;
082import org.apache.hadoop.hbase.client.RegionInfoBuilder;
083import org.apache.hadoop.hbase.client.RegionLocator;
084import org.apache.hadoop.hbase.client.Result;
085import org.apache.hadoop.hbase.client.ResultScanner;
086import org.apache.hadoop.hbase.client.Scan;
087import org.apache.hadoop.hbase.client.Scan.ReadType;
088import org.apache.hadoop.hbase.client.Table;
089import org.apache.hadoop.hbase.client.TableDescriptor;
090import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
091import org.apache.hadoop.hbase.client.TableState;
092import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
093import org.apache.hadoop.hbase.fs.HFileSystem;
094import org.apache.hadoop.hbase.io.compress.Compression;
095import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
096import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
097import org.apache.hadoop.hbase.io.hfile.BlockCache;
098import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
099import org.apache.hadoop.hbase.io.hfile.HFile;
100import org.apache.hadoop.hbase.ipc.RpcServerInterface;
101import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
102import org.apache.hadoop.hbase.master.HMaster;
103import org.apache.hadoop.hbase.master.RegionState;
104import org.apache.hadoop.hbase.master.ServerManager;
105import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
107import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
108import org.apache.hadoop.hbase.master.assignment.RegionStates;
109import org.apache.hadoop.hbase.mob.MobFileCache;
110import org.apache.hadoop.hbase.regionserver.BloomType;
111import org.apache.hadoop.hbase.regionserver.ChunkCreator;
112import org.apache.hadoop.hbase.regionserver.HRegion;
113import org.apache.hadoop.hbase.regionserver.HRegionServer;
114import org.apache.hadoop.hbase.regionserver.HStore;
115import org.apache.hadoop.hbase.regionserver.InternalScanner;
116import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
117import org.apache.hadoop.hbase.regionserver.Region;
118import org.apache.hadoop.hbase.regionserver.RegionScanner;
119import org.apache.hadoop.hbase.regionserver.RegionServerServices;
120import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
121import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
122import org.apache.hadoop.hbase.security.User;
123import org.apache.hadoop.hbase.security.UserProvider;
124import org.apache.hadoop.hbase.security.access.AccessController;
125import org.apache.hadoop.hbase.security.access.PermissionStorage;
126import org.apache.hadoop.hbase.security.access.SecureTestUtil;
127import org.apache.hadoop.hbase.security.token.TokenProvider;
128import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
129import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
130import org.apache.hadoop.hbase.util.Bytes;
131import org.apache.hadoop.hbase.util.CommonFSUtils;
132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133import org.apache.hadoop.hbase.util.FSUtils;
134import org.apache.hadoop.hbase.util.JVM;
135import org.apache.hadoop.hbase.util.JVMClusterUtil;
136import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
137import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
138import org.apache.hadoop.hbase.util.Pair;
139import org.apache.hadoop.hbase.util.ReflectionUtils;
140import org.apache.hadoop.hbase.util.RetryCounter;
141import org.apache.hadoop.hbase.util.Threads;
142import org.apache.hadoop.hbase.wal.WAL;
143import org.apache.hadoop.hbase.wal.WALFactory;
144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
145import org.apache.hadoop.hbase.zookeeper.ZKConfig;
146import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
147import org.apache.hadoop.hdfs.DFSClient;
148import org.apache.hadoop.hdfs.DistributedFileSystem;
149import org.apache.hadoop.hdfs.MiniDFSCluster;
150import org.apache.hadoop.hdfs.server.datanode.DataNode;
151import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
152import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
153import org.apache.hadoop.mapred.JobConf;
154import org.apache.hadoop.mapred.MiniMRCluster;
155import org.apache.hadoop.mapred.TaskLog;
156import org.apache.hadoop.minikdc.MiniKdc;
157import org.apache.yetus.audience.InterfaceAudience;
158import org.apache.yetus.audience.InterfaceStability;
159import org.apache.zookeeper.WatchedEvent;
160import org.apache.zookeeper.ZooKeeper;
161import org.apache.zookeeper.ZooKeeper.States;
162
163import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
164
165import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
166
167/**
168 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
169 * functionality. Create an instance and keep it around testing HBase.
170 * <p/>
171 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
172 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
173 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
174 * cluster.
175 * <p/>
176 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
177 * <p/>
178 * It does not set logging levels.
179 * <p/>
180 * In the configuration properties, default values for master-info-port and region-server-port are
181 * overridden such that a random port will be assigned (thus avoiding port contention if another
182 * local HBase instance is already running).
183 * <p/>
184 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
185 * setting it to true.
186 */
187@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
188@InterfaceStability.Evolving
189public class HBaseTestingUtil extends HBaseZKTestingUtil {
190
191  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
192
193  private MiniDFSCluster dfsCluster = null;
194  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;
195
196  private volatile HBaseClusterInterface hbaseCluster = null;
197  private MiniMRCluster mrCluster = null;
198
199  /** If there is a mini cluster running for this testing utility instance. */
200  private volatile boolean miniClusterRunning;
201
202  private String hadoopLogDir;
203
204  /**
205   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
206   */
207  private Path dataTestDirOnTestFS = null;
208
209  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
210
211  /** Filesystem URI used for map-reduce mini-cluster setup */
212  private static String FS_URI;
213
214  /** This is for unit tests parameterized with a single boolean. */
215  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
216
217  /**
218   * Checks to see if a specific port is available.
219   * @param port the port number to check for availability
220   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
221   */
222  public static boolean available(int port) {
223    ServerSocket ss = null;
224    DatagramSocket ds = null;
225    try {
226      ss = new ServerSocket(port);
227      ss.setReuseAddress(true);
228      ds = new DatagramSocket(port);
229      ds.setReuseAddress(true);
230      return true;
231    } catch (IOException e) {
232      // Do nothing
233    } finally {
234      if (ds != null) {
235        ds.close();
236      }
237
238      if (ss != null) {
239        try {
240          ss.close();
241        } catch (IOException e) {
242          /* should not be thrown */
243        }
244      }
245    }
246
247    return false;
248  }
249
250  /**
251   * Create all combinations of Bloom filters and compression algorithms for testing.
252   */
253  private static List<Object[]> bloomAndCompressionCombinations() {
254    List<Object[]> configurations = new ArrayList<>();
255    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
256      for (BloomType bloomType : BloomType.values()) {
257        configurations.add(new Object[] { comprAlgo, bloomType });
258      }
259    }
260    return Collections.unmodifiableList(configurations);
261  }
262
263  /**
264   * Create combination of memstoreTS and tags
265   */
266  private static List<Object[]> memStoreTSAndTagsCombination() {
267    List<Object[]> configurations = new ArrayList<>();
268    configurations.add(new Object[] { false, false });
269    configurations.add(new Object[] { false, true });
270    configurations.add(new Object[] { true, false });
271    configurations.add(new Object[] { true, true });
272    return Collections.unmodifiableList(configurations);
273  }
274
275  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
276    List<Object[]> configurations = new ArrayList<>();
277    configurations.add(new Object[] { false, false, true });
278    configurations.add(new Object[] { false, false, false });
279    configurations.add(new Object[] { false, true, true });
280    configurations.add(new Object[] { false, true, false });
281    configurations.add(new Object[] { true, false, true });
282    configurations.add(new Object[] { true, false, false });
283    configurations.add(new Object[] { true, true, true });
284    configurations.add(new Object[] { true, true, false });
285    return Collections.unmodifiableList(configurations);
286  }
287
288  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
289    bloomAndCompressionCombinations();
290
291  /**
292   * <p>
293   * Create an HBaseTestingUtility using a default configuration.
294   * <p>
295   * Initially, all tmp files are written to a local test data directory. Once
296   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
297   * data will be written to the DFS directory instead.
298   */
299  public HBaseTestingUtil() {
300    this(HBaseConfiguration.create());
301  }
302
303  /**
304   * <p>
305   * Create an HBaseTestingUtility using a given configuration.
306   * <p>
307   * Initially, all tmp files are written to a local test data directory. Once
308   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
309   * data will be written to the DFS directory instead.
310   * @param conf The configuration to use for further operations
311   */
312  public HBaseTestingUtil(@Nullable Configuration conf) {
313    super(conf);
314
315    // a hbase checksum verification failure will cause unit tests to fail
316    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
317
318    // Save this for when setting default file:// breaks things
319    if (this.conf.get("fs.defaultFS") != null) {
320      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
321    }
322    if (this.conf.get(HConstants.HBASE_DIR) != null) {
323      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
324    }
325    // Every cluster is a local cluster until we start DFS
326    // Note that conf could be null, but this.conf will not be
327    String dataTestDir = getDataTestDir().toString();
328    this.conf.set("fs.defaultFS", "file:///");
329    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
330    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
331    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
332    // If the value for random ports isn't set set it to true, thus making
333    // tests opt-out for random port assignment
334    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
335      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
336  }
337
338  /**
339   * Close both the region {@code r} and it's underlying WAL. For use in tests.
340   */
341  public static void closeRegionAndWAL(final Region r) throws IOException {
342    closeRegionAndWAL((HRegion) r);
343  }
344
345  /**
346   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
347   */
348  public static void closeRegionAndWAL(final HRegion r) throws IOException {
349    if (r == null) return;
350    r.close();
351    if (r.getWAL() == null) return;
352    r.getWAL().close();
353  }
354
355  /**
356   * Start mini secure cluster with given kdc and principals.
357   * @param kdc              Mini kdc server
358   * @param servicePrincipal Service principal without realm.
359   * @param spnegoPrincipal  Spnego principal without realm.
360   * @return Handler to shutdown the cluster
361   */
362  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
363    String spnegoPrincipal) throws Exception {
364    Configuration conf = getConfiguration();
365
366    SecureTestUtil.enableSecurity(conf);
367    VisibilityTestUtil.enableVisiblityLabels(conf);
368    SecureTestUtil.verifyConfiguration(conf);
369
370    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
371      AccessController.class.getName() + ',' + TokenProvider.class.getName());
372
373    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
374      spnegoPrincipal + '@' + kdc.getRealm());
375
376    startMiniCluster();
377    try {
378      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
379    } catch (Exception e) {
380      shutdownMiniCluster();
381      throw e;
382    }
383
384    return this::shutdownMiniCluster;
385  }
386
387  /**
388   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
389   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
390   * by the Configuration. If say, a Connection was being used against a cluster that had been
391   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
392   * Rather than use the return direct, its usually best to make a copy and use that. Do
393   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
394   * @return Instance of Configuration.
395   */
396  @Override
397  public Configuration getConfiguration() {
398    return super.getConfiguration();
399  }
400
401  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
402    this.hbaseCluster = hbaseCluster;
403  }
404
405  /**
406   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
407   * have many concurrent tests running if we need to. Moding a System property is not the way to do
408   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
409   * not anything can do about it at moment; single instance only is how the minidfscluster works.
410   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
411   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
412   * (We do not create them!).
413   * @return The calculated data test build directory, if newly-created.
414   */
415  @Override
416  protected Path setupDataTestDir() {
417    Path testPath = super.setupDataTestDir();
418    if (null == testPath) {
419      return null;
420    }
421
422    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");
423
424    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
425    // we want our own value to ensure uniqueness on the same machine
426    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");
427
428    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
429    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
430    return testPath;
431  }
432
433  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
434
435    String sysValue = System.getProperty(propertyName);
436
437    if (sysValue != null) {
438      // There is already a value set. So we do nothing but hope
439      // that there will be no conflicts
440      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
441        + " so I do NOT create it in " + parent);
442      String confValue = conf.get(propertyName);
443      if (confValue != null && !confValue.endsWith(sysValue)) {
444        LOG.warn(propertyName + " property value differs in configuration and system: "
445          + "Configuration=" + confValue + " while System=" + sysValue
446          + " Erasing configuration value by system value.");
447      }
448      conf.set(propertyName, sysValue);
449    } else {
450      // Ok, it's not set, so we create it as a subdirectory
451      createSubDir(propertyName, parent, subDirName);
452      System.setProperty(propertyName, conf.get(propertyName));
453    }
454  }
455
456  /**
457   * @return Where to write test data on the test filesystem; Returns working directory for the test
458   *         filesystem by default
459   * @see #setupDataTestDirOnTestFS()
460   * @see #getTestFileSystem()
461   */
462  private Path getBaseTestDirOnTestFS() throws IOException {
463    FileSystem fs = getTestFileSystem();
464    return new Path(fs.getWorkingDirectory(), "test-data");
465  }
466
467  /**
468   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
469   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
470   * on it.
471   * @return a unique path in the test filesystem
472   */
473  public Path getDataTestDirOnTestFS() throws IOException {
474    if (dataTestDirOnTestFS == null) {
475      setupDataTestDirOnTestFS();
476    }
477
478    return dataTestDirOnTestFS;
479  }
480
481  /**
482   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
483   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
484   * on it.
485   * @return a unique path in the test filesystem
486   * @param subdirName name of the subdir to create under the base test dir
487   */
488  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
489    return new Path(getDataTestDirOnTestFS(), subdirName);
490  }
491
492  /**
493   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
494   * setup.
495   */
496  private void setupDataTestDirOnTestFS() throws IOException {
497    if (dataTestDirOnTestFS != null) {
498      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
499      return;
500    }
501    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
502  }
503
504  /**
505   * Sets up a new path in test filesystem to be used by tests.
506   */
507  private Path getNewDataTestDirOnTestFS() throws IOException {
508    // The file system can be either local, mini dfs, or if the configuration
509    // is supplied externally, it can be an external cluster FS. If it is a local
510    // file system, the tests should use getBaseTestDir, otherwise, we can use
511    // the working directory, and create a unique sub dir there
512    FileSystem fs = getTestFileSystem();
513    Path newDataTestDir;
514    String randomStr = getRandomUUID().toString();
515    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
516      newDataTestDir = new Path(getDataTestDir(), randomStr);
517      File dataTestDir = new File(newDataTestDir.toString());
518      if (deleteOnExit()) dataTestDir.deleteOnExit();
519    } else {
520      Path base = getBaseTestDirOnTestFS();
521      newDataTestDir = new Path(base, randomStr);
522      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
523    }
524    return newDataTestDir;
525  }
526
527  /**
528   * Cleans the test data directory on the test filesystem.
529   * @return True if we removed the test dirs
530   */
531  public boolean cleanupDataTestDirOnTestFS() throws IOException {
532    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
533    if (ret) {
534      dataTestDirOnTestFS = null;
535    }
536    return ret;
537  }
538
539  /**
540   * Cleans a subdirectory under the test data directory on the test filesystem.
541   * @return True if we removed child
542   */
543  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
544    Path cpath = getDataTestDirOnTestFS(subdirName);
545    return getTestFileSystem().delete(cpath, true);
546  }
547
548  /**
549   * Start a minidfscluster.
550   * @param servers How many DNs to start.
551   * @see #shutdownMiniDFSCluster()
552   * @return The mini dfs cluster created.
553   */
554  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
555    return startMiniDFSCluster(servers, null);
556  }
557
558  /**
559   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
560   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
561   * instances of the datanodes will have the same host name.
562   * @param hosts hostnames DNs to run on.
563   * @see #shutdownMiniDFSCluster()
564   * @return The mini dfs cluster created.
565   */
566  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
567    if (hosts != null && hosts.length != 0) {
568      return startMiniDFSCluster(hosts.length, hosts);
569    } else {
570      return startMiniDFSCluster(1, null);
571    }
572  }
573
574  /**
575   * Start a minidfscluster. Can only create one.
576   * @param servers How many DNs to start.
577   * @param hosts   hostnames DNs to run on.
578   * @see #shutdownMiniDFSCluster()
579   * @return The mini dfs cluster created.
580   */
581  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
582    return startMiniDFSCluster(servers, null, hosts);
583  }
584
585  private void setFs() throws IOException {
586    if (this.dfsCluster == null) {
587      LOG.info("Skipping setting fs because dfsCluster is null");
588      return;
589    }
590    FileSystem fs = this.dfsCluster.getFileSystem();
591    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
592
593    // re-enable this check with dfs
594    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
595  }
596
597  // Workaround to avoid IllegalThreadStateException
598  // See HBASE-27148 for more details
599  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
600
601    private volatile boolean stopped = false;
602
603    private final MiniDFSCluster cluster;
604
605    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
606      super("FsDatasetAsyncDiskServiceFixer");
607      setDaemon(true);
608      this.cluster = cluster;
609    }
610
611    @Override
612    public void run() {
613      while (!stopped) {
614        try {
615          Thread.sleep(30000);
616        } catch (InterruptedException e) {
617          Thread.currentThread().interrupt();
618          continue;
619        }
620        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
621        // timeout of the thread pool executor is 60 seconds by default.
622        try {
623          for (DataNode dn : cluster.getDataNodes()) {
624            FsDatasetSpi<?> dataset = dn.getFSDataset();
625            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
626            service.setAccessible(true);
627            Object asyncDiskService = service.get(dataset);
628            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
629            group.setAccessible(true);
630            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
631            if (threadGroup.isDaemon()) {
632              threadGroup.setDaemon(false);
633            }
634          }
635        } catch (NoSuchFieldException e) {
636          LOG.debug("NoSuchFieldException: " + e.getMessage()
637            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
638            + "See HBASE-27595 for details.");
639        } catch (Exception e) {
640          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
641        }
642      }
643    }
644
645    void shutdown() {
646      stopped = true;
647      interrupt();
648    }
649  }
650
651  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
652    throws Exception {
653    createDirsAndSetProperties();
654    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
655
656    this.dfsCluster =
657      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
658    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
659    this.dfsClusterFixer.start();
660    // Set this just-started cluster as our filesystem.
661    setFs();
662
663    // Wait for the cluster to be totally up
664    this.dfsCluster.waitClusterUp();
665
666    // reset the test directory for test file system
667    dataTestDirOnTestFS = null;
668    String dataTestDir = getDataTestDir().toString();
669    conf.set(HConstants.HBASE_DIR, dataTestDir);
670    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
671
672    return this.dfsCluster;
673  }
674
675  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
676    createDirsAndSetProperties();
677    dfsCluster =
678      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
679    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
680    this.dfsClusterFixer.start();
681    return dfsCluster;
682  }
683
684  /**
685   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
686   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
687   * the conf.
688   *
689   * <pre>
690   * Configuration conf = TEST_UTIL.getConfiguration();
691   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
692   *   Map.Entry&lt;String, String&gt; e = i.next();
693   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
694   * }
695   * </pre>
696   */
697  private void createDirsAndSetProperties() throws IOException {
698    setupClusterTestDir();
699    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
700    createDirAndSetProperty("test.cache.data");
701    createDirAndSetProperty("hadoop.tmp.dir");
702    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
703    createDirAndSetProperty("mapreduce.cluster.local.dir");
704    createDirAndSetProperty("mapreduce.cluster.temp.dir");
705    enableShortCircuit();
706
707    Path root = getDataTestDirOnTestFS("hadoop");
708    conf.set(MapreduceTestingShim.getMROutputDirProp(),
709      new Path(root, "mapred-output-dir").toString());
710    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
711    conf.set("mapreduce.jobtracker.staging.root.dir",
712      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
713    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
714    conf.set("yarn.app.mapreduce.am.staging-dir",
715      new Path(root, "mapreduce-am-staging-root-dir").toString());
716
717    // Frustrate yarn's and hdfs's attempts at writing /tmp.
718    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
719    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
720    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
721    createDirAndSetProperty("yarn.nodemanager.log-dirs");
722    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
723    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
724    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
725    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
726    createDirAndSetProperty("dfs.journalnode.edits.dir");
727    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
728    createDirAndSetProperty("nfs.dump.dir");
729    createDirAndSetProperty("java.io.tmpdir");
730    createDirAndSetProperty("dfs.journalnode.edits.dir");
731    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
732    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
733  }
734
735  /**
736   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
737   * Default to false.
738   */
739  public boolean isNewVersionBehaviorEnabled() {
740    final String propName = "hbase.tests.new.version.behavior";
741    String v = System.getProperty(propName);
742    if (v != null) {
743      return Boolean.parseBoolean(v);
744    }
745    return false;
746  }
747
748  /**
749   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
750   * allows to specify this parameter on the command line. If not set, default is true.
751   */
752  public boolean isReadShortCircuitOn() {
753    final String propName = "hbase.tests.use.shortcircuit.reads";
754    String readOnProp = System.getProperty(propName);
755    if (readOnProp != null) {
756      return Boolean.parseBoolean(readOnProp);
757    } else {
758      return conf.getBoolean(propName, false);
759    }
760  }
761
762  /**
763   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
764   * including skipping the hdfs checksum checks.
765   */
766  private void enableShortCircuit() {
767    if (isReadShortCircuitOn()) {
768      String curUser = System.getProperty("user.name");
769      LOG.info("read short circuit is ON for user " + curUser);
770      // read short circuit, for hdfs
771      conf.set("dfs.block.local-path-access.user", curUser);
772      // read short circuit, for hbase
773      conf.setBoolean("dfs.client.read.shortcircuit", true);
774      // Skip checking checksum, for the hdfs client and the datanode
775      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
776    } else {
777      LOG.info("read short circuit is OFF");
778    }
779  }
780
781  private String createDirAndSetProperty(final String property) {
782    return createDirAndSetProperty(property, property);
783  }
784
785  private String createDirAndSetProperty(final String relPath, String property) {
786    String path = getDataTestDir(relPath).toString();
787    System.setProperty(property, path);
788    conf.set(property, path);
789    new File(path).mkdirs();
790    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
791    return path;
792  }
793
794  /**
795   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
796   */
797  public void shutdownMiniDFSCluster() throws IOException {
798    if (this.dfsCluster != null) {
799      // The below throws an exception per dn, AsynchronousCloseException.
800      this.dfsCluster.shutdown();
801      dfsCluster = null;
802      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
803      // have a fixer
804      if (dfsClusterFixer != null) {
805        this.dfsClusterFixer.shutdown();
806        dfsClusterFixer = null;
807      }
808      dataTestDirOnTestFS = null;
809      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
810    }
811  }
812
813  /**
814   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
815   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
816   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
817   * @see #startMiniCluster(StartTestingClusterOption option)
818   * @see #shutdownMiniDFSCluster()
819   */
820  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
821    StartTestingClusterOption option = StartTestingClusterOption.builder()
822      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
823    return startMiniCluster(option);
824  }
825
826  /**
827   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
828   * value can be found in {@link StartTestingClusterOption.Builder}.
829   * @see #startMiniCluster(StartTestingClusterOption option)
830   * @see #shutdownMiniDFSCluster()
831   */
832  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
833    return startMiniCluster(StartTestingClusterOption.builder().build());
834  }
835
836  /**
837   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
838   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
839   * under System property test.build.data, to be cleaned up on exit.
840   * @see #shutdownMiniDFSCluster()
841   */
842  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
843    throws Exception {
844    LOG.info("Starting up minicluster with option: {}", option);
845
846    // If we already put up a cluster, fail.
847    if (miniClusterRunning) {
848      throw new IllegalStateException("A mini-cluster is already running");
849    }
850    miniClusterRunning = true;
851
852    setupClusterTestDir();
853
854    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
855    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
856    if (dfsCluster == null) {
857      LOG.info("STARTING DFS");
858      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
859    } else {
860      LOG.info("NOT STARTING DFS");
861    }
862
863    // Start up a zk cluster.
864    if (getZkCluster() == null) {
865      startMiniZKCluster(option.getNumZkServers());
866    }
867
868    // Start the MiniHBaseCluster
869    return startMiniHBaseCluster(option);
870  }
871
872  /**
873   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
874   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
875   * @return Reference to the hbase mini hbase cluster.
876   * @see #startMiniCluster(StartTestingClusterOption)
877   * @see #shutdownMiniHBaseCluster()
878   */
879  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
880    throws IOException, InterruptedException {
881    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
882    createRootDir(option.isCreateRootDir());
883    if (option.isCreateWALDir()) {
884      createWALRootDir();
885    }
886    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
887    // for tests that do not read hbase-defaults.xml
888    setHBaseFsTmpDir();
889
890    // These settings will make the server waits until this exact number of
891    // regions servers are connected.
892    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
893      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
894    }
895    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
896      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
897    }
898
899    Configuration c = new Configuration(this.conf);
900    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
901      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
902      option.getMasterClass(), option.getRsClass());
903    // Populate the master address configuration from mini cluster configuration.
904    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
905    // Don't leave here till we've done a successful scan of the hbase:meta
906    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
907      ResultScanner s = t.getScanner(new Scan())) {
908      for (;;) {
909        if (s.next() == null) {
910          break;
911        }
912      }
913    }
914
915    getAdmin(); // create immediately the hbaseAdmin
916    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
917
918    return (SingleProcessHBaseCluster) hbaseCluster;
919  }
920
921  /**
922   * Starts up mini hbase cluster using default options. Default options can be found in
923   * {@link StartTestingClusterOption.Builder}.
924   * @see #startMiniHBaseCluster(StartTestingClusterOption)
925   * @see #shutdownMiniHBaseCluster()
926   */
927  public SingleProcessHBaseCluster startMiniHBaseCluster()
928    throws IOException, InterruptedException {
929    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
930  }
931
932  /**
933   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
934   * {@link #startMiniCluster()}. All other options will use default values, defined in
935   * {@link StartTestingClusterOption.Builder}.
936   * @param numMasters       Master node number.
937   * @param numRegionServers Number of region servers.
938   * @return The mini HBase cluster created.
939   * @see #shutdownMiniHBaseCluster()
940   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
941   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
942   * @see #startMiniHBaseCluster(StartTestingClusterOption)
943   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
944   */
945  @Deprecated
946  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
947    throws IOException, InterruptedException {
948    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
949      .numRegionServers(numRegionServers).build();
950    return startMiniHBaseCluster(option);
951  }
952
953  /**
954   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
955   * {@link #startMiniCluster()}. All other options will use default values, defined in
956   * {@link StartTestingClusterOption.Builder}.
957   * @param numMasters       Master node number.
958   * @param numRegionServers Number of region servers.
959   * @param rsPorts          Ports that RegionServer should use.
960   * @return The mini HBase cluster created.
961   * @see #shutdownMiniHBaseCluster()
962   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
963   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
964   * @see #startMiniHBaseCluster(StartTestingClusterOption)
965   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
966   */
967  @Deprecated
968  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
969    List<Integer> rsPorts) throws IOException, InterruptedException {
970    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
971      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
972    return startMiniHBaseCluster(option);
973  }
974
975  /**
976   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
977   * {@link #startMiniCluster()}. All other options will use default values, defined in
978   * {@link StartTestingClusterOption.Builder}.
979   * @param numMasters       Master node number.
980   * @param numRegionServers Number of region servers.
981   * @param rsPorts          Ports that RegionServer should use.
982   * @param masterClass      The class to use as HMaster, or null for default.
983   * @param rsClass          The class to use as HRegionServer, or null for default.
984   * @param createRootDir    Whether to create a new root or data directory path.
985   * @param createWALDir     Whether to create a new WAL directory.
986   * @return The mini HBase cluster created.
987   * @see #shutdownMiniHBaseCluster()
988   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
989   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
990   * @see #startMiniHBaseCluster(StartTestingClusterOption)
991   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
992   */
993  @Deprecated
994  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
995    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
996    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
997    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
998    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
999      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
1000      .createRootDir(createRootDir).createWALDir(createWALDir).build();
1001    return startMiniHBaseCluster(option);
1002  }
1003
1004  /**
1005   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
1006   * want to keep dfs/zk up and just stop/start hbase.
1007   * @param servers number of region servers
1008   */
1009  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1010    this.restartHBaseCluster(servers, null);
1011  }
1012
1013  public void restartHBaseCluster(int servers, List<Integer> ports)
1014    throws IOException, InterruptedException {
1015    StartTestingClusterOption option =
1016      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
1017    restartHBaseCluster(option);
1018    invalidateConnection();
1019  }
1020
1021  public void restartHBaseCluster(StartTestingClusterOption option)
1022    throws IOException, InterruptedException {
1023    closeConnection();
1024    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
1025      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
1026      option.getMasterClass(), option.getRsClass());
1027    // Don't leave here till we've done a successful scan of the hbase:meta
1028    Connection conn = ConnectionFactory.createConnection(this.conf);
1029    Table t = conn.getTable(TableName.META_TABLE_NAME);
1030    ResultScanner s = t.getScanner(new Scan());
1031    while (s.next() != null) {
1032      // do nothing
1033    }
1034    LOG.info("HBase has been restarted");
1035    s.close();
1036    t.close();
1037    conn.close();
1038  }
1039
1040  /**
1041   * Returns current mini hbase cluster. Only has something in it after a call to
1042   * {@link #startMiniCluster()}.
1043   * @see #startMiniCluster()
1044   */
1045  public SingleProcessHBaseCluster getMiniHBaseCluster() {
1046    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
1047      return (SingleProcessHBaseCluster) this.hbaseCluster;
1048    }
1049    throw new RuntimeException(
1050      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
1051  }
1052
1053  /**
1054   * Stops mini hbase, zk, and hdfs clusters.
1055   * @see #startMiniCluster(int)
1056   */
1057  public void shutdownMiniCluster() throws IOException {
1058    LOG.info("Shutting down minicluster");
1059    shutdownMiniHBaseCluster();
1060    shutdownMiniDFSCluster();
1061    shutdownMiniZKCluster();
1062
1063    cleanupTestDir();
1064    miniClusterRunning = false;
1065    LOG.info("Minicluster is down");
1066  }
1067
1068  /**
1069   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1070   * @throws java.io.IOException in case command is unsuccessful
1071   */
1072  public void shutdownMiniHBaseCluster() throws IOException {
1073    cleanup();
1074    if (this.hbaseCluster != null) {
1075      this.hbaseCluster.shutdown();
1076      // Wait till hbase is down before going on to shutdown zk.
1077      this.hbaseCluster.waitUntilShutDown();
1078      this.hbaseCluster = null;
1079    }
1080    if (zooKeeperWatcher != null) {
1081      zooKeeperWatcher.close();
1082      zooKeeperWatcher = null;
1083    }
1084  }
1085
1086  /**
1087   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1088   * @throws java.io.IOException throws in case command is unsuccessful
1089   */
1090  public void killMiniHBaseCluster() throws IOException {
1091    cleanup();
1092    if (this.hbaseCluster != null) {
1093      getMiniHBaseCluster().killAll();
1094      this.hbaseCluster = null;
1095    }
1096    if (zooKeeperWatcher != null) {
1097      zooKeeperWatcher.close();
1098      zooKeeperWatcher = null;
1099    }
1100  }
1101
1102  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1103  private void cleanup() throws IOException {
1104    closeConnection();
1105    // unset the configuration for MIN and MAX RS to start
1106    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1107    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1108  }
1109
1110  /**
1111   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
1112   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
1113   * If false, previous path is used. Note: this does not cause the root dir to be created.
1114   * @return Fully qualified path for the default hbase root dir
1115   */
1116  public Path getDefaultRootDirPath(boolean create) throws IOException {
1117    if (!create) {
1118      return getDataTestDirOnTestFS();
1119    } else {
1120      return getNewDataTestDirOnTestFS();
1121    }
1122  }
1123
1124  /**
1125   * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
1126   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
1127   * @return Fully qualified path for the default hbase root dir
1128   */
1129  public Path getDefaultRootDirPath() throws IOException {
1130    return getDefaultRootDirPath(false);
1131  }
1132
1133  /**
1134   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
1135   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
1136   * startup. You'd only use this method if you were doing manual operation.
1137   * @param create This flag decides whether to get a new root or data directory path or not, if it
1138   *               has been fetched already. Note : Directory will be made irrespective of whether
1139   *               path has been fetched or not. If directory already exists, it will be overwritten
1140   * @return Fully qualified path to hbase root dir
1141   */
1142  public Path createRootDir(boolean create) throws IOException {
1143    FileSystem fs = FileSystem.get(this.conf);
1144    Path hbaseRootdir = getDefaultRootDirPath(create);
1145    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
1146    fs.mkdirs(hbaseRootdir);
1147    FSUtils.setVersion(fs, hbaseRootdir);
1148    return hbaseRootdir;
1149  }
1150
1151  /**
1152   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
1153   * flag is false.
1154   * @return Fully qualified path to hbase root dir
1155   */
1156  public Path createRootDir() throws IOException {
1157    return createRootDir(false);
1158  }
1159
1160  /**
1161   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
1162   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
1163   * this method if you were doing manual operation.
1164   * @return Fully qualified path to hbase root dir
1165   */
1166  public Path createWALRootDir() throws IOException {
1167    FileSystem fs = FileSystem.get(this.conf);
1168    Path walDir = getNewDataTestDirOnTestFS();
1169    CommonFSUtils.setWALRootDir(this.conf, walDir);
1170    fs.mkdirs(walDir);
1171    return walDir;
1172  }
1173
1174  private void setHBaseFsTmpDir() throws IOException {
1175    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1176    if (hbaseFsTmpDirInString == null) {
1177      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
1178      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1179    } else {
1180      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1181    }
1182  }
1183
1184  /**
1185   * Flushes all caches in the mini hbase cluster
1186   */
1187  public void flush() throws IOException {
1188    getMiniHBaseCluster().flushcache();
1189  }
1190
1191  /**
1192   * Flushes all caches in the mini hbase cluster
1193   */
1194  public void flush(TableName tableName) throws IOException {
1195    getMiniHBaseCluster().flushcache(tableName);
1196  }
1197
1198  /**
1199   * Compact all regions in the mini hbase cluster
1200   */
1201  public void compact(boolean major) throws IOException {
1202    getMiniHBaseCluster().compact(major);
1203  }
1204
1205  /**
1206   * Compact all of a table's reagion in the mini hbase cluster
1207   */
1208  public void compact(TableName tableName, boolean major) throws IOException {
1209    getMiniHBaseCluster().compact(tableName, major);
1210  }
1211
1212  /**
1213   * Create a table.
1214   * @return A Table instance for the created table.
1215   */
1216  public Table createTable(TableName tableName, String family) throws IOException {
1217    return createTable(tableName, new String[] { family });
1218  }
1219
1220  /**
1221   * Create a table.
1222   * @return A Table instance for the created table.
1223   */
1224  public Table createTable(TableName tableName, String[] families) throws IOException {
1225    List<byte[]> fams = new ArrayList<>(families.length);
1226    for (String family : families) {
1227      fams.add(Bytes.toBytes(family));
1228    }
1229    return createTable(tableName, fams.toArray(new byte[0][]));
1230  }
1231
1232  /**
1233   * Create a table.
1234   * @return A Table instance for the created table.
1235   */
1236  public Table createTable(TableName tableName, byte[] family) throws IOException {
1237    return createTable(tableName, new byte[][] { family });
1238  }
1239
1240  /**
1241   * Create a table with multiple regions.
1242   * @return A Table instance for the created table.
1243   */
1244  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1245    throws IOException {
1246    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1247    byte[] startKey = Bytes.toBytes("aaaaa");
1248    byte[] endKey = Bytes.toBytes("zzzzz");
1249    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1250
1251    return createTable(tableName, new byte[][] { family }, splitKeys);
1252  }
1253
1254  /**
1255   * Create a table.
1256   * @return A Table instance for the created table.
1257   */
1258  public Table createTable(TableName tableName, byte[][] families) throws IOException {
1259    return createTable(tableName, families, (byte[][]) null);
1260  }
1261
1262  /**
1263   * Create a table with multiple regions.
1264   * @return A Table instance for the created table.
1265   */
1266  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1267    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1268  }
1269
1270  /**
1271   * Create a table with multiple regions.
1272   * @param replicaCount replica count.
1273   * @return A Table instance for the created table.
1274   */
1275  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
1276    throws IOException {
1277    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
1278  }
1279
1280  /**
1281   * Create a table.
1282   * @return A Table instance for the created table.
1283   */
1284  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1285    throws IOException {
1286    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1287  }
1288
1289  /**
1290   * Create a table.
1291   * @param tableName    the table name
1292   * @param families     the families
1293   * @param splitKeys    the splitkeys
1294   * @param replicaCount the region replica count
1295   * @return A Table instance for the created table.
1296   * @throws IOException throws IOException
1297   */
1298  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1299    int replicaCount) throws IOException {
1300    return createTable(tableName, families, splitKeys, replicaCount,
1301      new Configuration(getConfiguration()));
1302  }
1303
1304  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
1305    byte[] endKey, int numRegions) throws IOException {
1306    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1307
1308    getAdmin().createTable(desc, startKey, endKey, numRegions);
1309    // HBaseAdmin only waits for regions to appear in hbase:meta we
1310    // should wait until they are assigned
1311    waitUntilAllRegionsAssigned(tableName);
1312    return getConnection().getTable(tableName);
1313  }
1314
1315  /**
1316   * Create a table.
1317   * @param c Configuration to use
1318   * @return A Table instance for the created table.
1319   */
1320  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1321    throws IOException {
1322    return createTable(htd, families, null, c);
1323  }
1324
1325  /**
1326   * Create a table.
1327   * @param htd       table descriptor
1328   * @param families  array of column families
1329   * @param splitKeys array of split keys
1330   * @param c         Configuration to use
1331   * @return A Table instance for the created table.
1332   * @throws IOException if getAdmin or createTable fails
1333   */
1334  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1335    Configuration c) throws IOException {
1336    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1337    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1338    // on is interfering.
1339    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1340  }
1341
1342  /**
1343   * Create a table.
1344   * @param htd       table descriptor
1345   * @param families  array of column families
1346   * @param splitKeys array of split keys
1347   * @param type      Bloom type
1348   * @param blockSize block size
1349   * @param c         Configuration to use
1350   * @return A Table instance for the created table.
1351   * @throws IOException if getAdmin or createTable fails
1352   */
1353
1354  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1355    BloomType type, int blockSize, Configuration c) throws IOException {
1356    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1357    for (byte[] family : families) {
1358      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1359        .setBloomFilterType(type).setBlocksize(blockSize);
1360      if (isNewVersionBehaviorEnabled()) {
1361        cfdb.setNewVersionBehavior(true);
1362      }
1363      builder.setColumnFamily(cfdb.build());
1364    }
1365    TableDescriptor td = builder.build();
1366    if (splitKeys != null) {
1367      getAdmin().createTable(td, splitKeys);
1368    } else {
1369      getAdmin().createTable(td);
1370    }
1371    // HBaseAdmin only waits for regions to appear in hbase:meta
1372    // we should wait until they are assigned
1373    waitUntilAllRegionsAssigned(td.getTableName());
1374    return getConnection().getTable(td.getTableName());
1375  }
1376
1377  /**
1378   * Create a table.
1379   * @param htd       table descriptor
1380   * @param splitRows array of split keys
1381   * @return A Table instance for the created table.
1382   */
1383  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1384    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1385    if (isNewVersionBehaviorEnabled()) {
1386      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1387        builder.setColumnFamily(
1388          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1389      }
1390    }
1391    if (splitRows != null) {
1392      getAdmin().createTable(builder.build(), splitRows);
1393    } else {
1394      getAdmin().createTable(builder.build());
1395    }
1396    // HBaseAdmin only waits for regions to appear in hbase:meta
1397    // we should wait until they are assigned
1398    waitUntilAllRegionsAssigned(htd.getTableName());
1399    return getConnection().getTable(htd.getTableName());
1400  }
1401
1402  /**
1403   * Create a table.
1404   * @param tableName    the table name
1405   * @param families     the families
1406   * @param splitKeys    the split keys
1407   * @param replicaCount the replica count
1408   * @param c            Configuration to use
1409   * @return A Table instance for the created table.
1410   */
1411  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1412    int replicaCount, final Configuration c) throws IOException {
1413    TableDescriptor htd =
1414      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1415    return createTable(htd, families, splitKeys, c);
1416  }
1417
1418  /**
1419   * Create a table.
1420   * @return A Table instance for the created table.
1421   */
1422  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1423    return createTable(tableName, new byte[][] { family }, numVersions);
1424  }
1425
1426  /**
1427   * Create a table.
1428   * @return A Table instance for the created table.
1429   */
1430  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1431    throws IOException {
1432    return createTable(tableName, families, numVersions, (byte[][]) null);
1433  }
1434
1435  /**
1436   * Create a table.
1437   * @return A Table instance for the created table.
1438   */
1439  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1440    byte[][] splitKeys) throws IOException {
1441    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1442    for (byte[] family : families) {
1443      ColumnFamilyDescriptorBuilder cfBuilder =
1444        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1445      if (isNewVersionBehaviorEnabled()) {
1446        cfBuilder.setNewVersionBehavior(true);
1447      }
1448      builder.setColumnFamily(cfBuilder.build());
1449    }
1450    if (splitKeys != null) {
1451      getAdmin().createTable(builder.build(), splitKeys);
1452    } else {
1453      getAdmin().createTable(builder.build());
1454    }
1455    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1456    // assigned
1457    waitUntilAllRegionsAssigned(tableName);
1458    return getConnection().getTable(tableName);
1459  }
1460
1461  /**
1462   * Create a table with multiple regions.
1463   * @return A Table instance for the created table.
1464   */
1465  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1466    throws IOException {
1467    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1468  }
1469
1470  /**
1471   * Create a table.
1472   * @return A Table instance for the created table.
1473   */
1474  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1475    throws IOException {
1476    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1477    for (byte[] family : families) {
1478      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1479        .setMaxVersions(numVersions).setBlocksize(blockSize);
1480      if (isNewVersionBehaviorEnabled()) {
1481        cfBuilder.setNewVersionBehavior(true);
1482      }
1483      builder.setColumnFamily(cfBuilder.build());
1484    }
1485    getAdmin().createTable(builder.build());
1486    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1487    // assigned
1488    waitUntilAllRegionsAssigned(tableName);
1489    return getConnection().getTable(tableName);
1490  }
1491
1492  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1493    String cpName) throws IOException {
1494    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1495    for (byte[] family : families) {
1496      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1497        .setMaxVersions(numVersions).setBlocksize(blockSize);
1498      if (isNewVersionBehaviorEnabled()) {
1499        cfBuilder.setNewVersionBehavior(true);
1500      }
1501      builder.setColumnFamily(cfBuilder.build());
1502    }
1503    if (cpName != null) {
1504      builder.setCoprocessor(cpName);
1505    }
1506    getAdmin().createTable(builder.build());
1507    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1508    // assigned
1509    waitUntilAllRegionsAssigned(tableName);
1510    return getConnection().getTable(tableName);
1511  }
1512
1513  /**
1514   * Create a table.
1515   * @return A Table instance for the created table.
1516   */
1517  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1518    throws IOException {
1519    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1520    int i = 0;
1521    for (byte[] family : families) {
1522      ColumnFamilyDescriptorBuilder cfBuilder =
1523        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1524      if (isNewVersionBehaviorEnabled()) {
1525        cfBuilder.setNewVersionBehavior(true);
1526      }
1527      builder.setColumnFamily(cfBuilder.build());
1528      i++;
1529    }
1530    getAdmin().createTable(builder.build());
1531    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1532    // assigned
1533    waitUntilAllRegionsAssigned(tableName);
1534    return getConnection().getTable(tableName);
1535  }
1536
1537  /**
1538   * Create a table.
1539   * @return A Table instance for the created table.
1540   */
1541  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1542    throws IOException {
1543    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1544    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1545    if (isNewVersionBehaviorEnabled()) {
1546      cfBuilder.setNewVersionBehavior(true);
1547    }
1548    builder.setColumnFamily(cfBuilder.build());
1549    getAdmin().createTable(builder.build(), splitRows);
1550    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1551    // assigned
1552    waitUntilAllRegionsAssigned(tableName);
1553    return getConnection().getTable(tableName);
1554  }
1555
1556  /**
1557   * Create a table with multiple regions.
1558   * @return A Table instance for the created table.
1559   */
1560  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1561    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1562  }
1563
1564  /**
1565   * Set the number of Region replicas.
1566   */
1567  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1568    throws IOException, InterruptedException {
1569    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1570      .setRegionReplication(replicaCount).build();
1571    admin.modifyTable(desc);
1572  }
1573
1574  /**
1575   * Set the number of Region replicas.
1576   */
1577  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1578    throws ExecutionException, IOException, InterruptedException {
1579    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1580      .setRegionReplication(replicaCount).build();
1581    admin.modifyTable(desc).get();
1582  }
1583
1584  /**
1585   * Drop an existing table
1586   * @param tableName existing table
1587   */
1588  public void deleteTable(TableName tableName) throws IOException {
1589    try {
1590      getAdmin().disableTable(tableName);
1591    } catch (TableNotEnabledException e) {
1592      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1593    }
1594    getAdmin().deleteTable(tableName);
1595  }
1596
1597  /**
1598   * Drop an existing table
1599   * @param tableName existing table
1600   */
1601  public void deleteTableIfAny(TableName tableName) throws IOException {
1602    try {
1603      deleteTable(tableName);
1604    } catch (TableNotFoundException e) {
1605      // ignore
1606    }
1607  }
1608
1609  // ==========================================================================
1610  // Canned table and table descriptor creation
1611
1612  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1613  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1614  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1615  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1616  private static final int MAXVERSIONS = 3;
1617
1618  public static final char FIRST_CHAR = 'a';
1619  public static final char LAST_CHAR = 'z';
1620  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1621  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1622
1623  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1624    return createModifyableTableDescriptor(TableName.valueOf(name),
1625      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1626      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1627  }
1628
1629  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
1630    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1631    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1632    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1633      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1634        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1635        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1636      if (isNewVersionBehaviorEnabled()) {
1637        cfBuilder.setNewVersionBehavior(true);
1638      }
1639      builder.setColumnFamily(cfBuilder.build());
1640    }
1641    return builder.build();
1642  }
1643
1644  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1645    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1646    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1647    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1648      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1649        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1650        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1651      if (isNewVersionBehaviorEnabled()) {
1652        cfBuilder.setNewVersionBehavior(true);
1653      }
1654      builder.setColumnFamily(cfBuilder.build());
1655    }
1656    return builder;
1657  }
1658
1659  /**
1660   * Create a table of name <code>name</code>.
1661   * @param name Name to give table.
1662   * @return Column descriptor.
1663   */
1664  public TableDescriptor createTableDescriptor(final TableName name) {
1665    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1666      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1667  }
1668
1669  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1670    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1671  }
1672
1673  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1674    int maxVersions) {
1675    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1676    for (byte[] family : families) {
1677      ColumnFamilyDescriptorBuilder cfBuilder =
1678        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1679      if (isNewVersionBehaviorEnabled()) {
1680        cfBuilder.setNewVersionBehavior(true);
1681      }
1682      builder.setColumnFamily(cfBuilder.build());
1683    }
1684    return builder.build();
1685  }
1686
1687  /**
1688   * Create an HRegion that writes to the local tmp dirs
1689   * @param desc     a table descriptor indicating which table the region belongs to
1690   * @param startKey the start boundary of the region
1691   * @param endKey   the end boundary of the region
1692   * @return a region that writes to local dir for testing
1693   */
1694  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1695    throws IOException {
1696    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1697      .setEndKey(endKey).build();
1698    return createLocalHRegion(hri, desc);
1699  }
1700
1701  /**
1702   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1703   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1704   */
1705  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1706    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1707  }
1708
1709  /**
1710   * Create an HRegion that writes to the local tmp dirs with specified wal
1711   * @param info regioninfo
1712   * @param conf configuration
1713   * @param desc table descriptor
1714   * @param wal  wal for this region.
1715   * @return created hregion
1716   */
1717  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1718    WAL wal) throws IOException {
1719    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1720      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1721    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1722  }
1723
1724  /**
1725   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1726   *         when done.
1727   */
1728  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1729    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1730    throws IOException {
1731    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1732      durability, wal, null, families);
1733  }
1734
1735  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1736    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1737    boolean[] compactedMemStore, byte[]... families) throws IOException {
1738    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1739    builder.setReadOnly(isReadOnly);
1740    int i = 0;
1741    for (byte[] family : families) {
1742      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1743      if (compactedMemStore != null && i < compactedMemStore.length) {
1744        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1745      } else {
1746        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1747
1748      }
1749      i++;
1750      // Set default to be three versions.
1751      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1752      builder.setColumnFamily(cfBuilder.build());
1753    }
1754    builder.setDurability(durability);
1755    RegionInfo info =
1756      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1757    return createLocalHRegion(info, conf, builder.build(), wal);
1758  }
1759
1760  //
1761  // ==========================================================================
1762
1763  /**
1764   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1765   * read.
1766   * @param tableName existing table
1767   * @return HTable to that new table
1768   */
1769  public Table deleteTableData(TableName tableName) throws IOException {
1770    Table table = getConnection().getTable(tableName);
1771    Scan scan = new Scan();
1772    ResultScanner resScan = table.getScanner(scan);
1773    for (Result res : resScan) {
1774      Delete del = new Delete(res.getRow());
1775      table.delete(del);
1776    }
1777    resScan = table.getScanner(scan);
1778    resScan.close();
1779    return table;
1780  }
1781
1782  /**
1783   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1784   * table.
1785   * @param tableName       table which must exist.
1786   * @param preserveRegions keep the existing split points
1787   * @return HTable for the new table
1788   */
1789  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1790    throws IOException {
1791    Admin admin = getAdmin();
1792    if (!admin.isTableDisabled(tableName)) {
1793      admin.disableTable(tableName);
1794    }
1795    admin.truncateTable(tableName, preserveRegions);
1796    return getConnection().getTable(tableName);
1797  }
1798
1799  /**
1800   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1801   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1802   * preserve regions of existing table.
1803   * @param tableName table which must exist.
1804   * @return HTable for the new table
1805   */
1806  public Table truncateTable(final TableName tableName) throws IOException {
1807    return truncateTable(tableName, false);
1808  }
1809
1810  /**
1811   * Load table with rows from 'aaa' to 'zzz'.
1812   * @param t Table
1813   * @param f Family
1814   * @return Count of rows loaded.
1815   */
1816  public int loadTable(final Table t, final byte[] f) throws IOException {
1817    return loadTable(t, new byte[][] { f });
1818  }
1819
1820  /**
1821   * Load table with rows from 'aaa' to 'zzz'.
1822   * @param t Table
1823   * @param f Family
1824   * @return Count of rows loaded.
1825   */
1826  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1827    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1828  }
1829
1830  /**
1831   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1832   * @param t Table
1833   * @param f Array of Families to load
1834   * @return Count of rows loaded.
1835   */
1836  public int loadTable(final Table t, final byte[][] f) throws IOException {
1837    return loadTable(t, f, null);
1838  }
1839
1840  /**
1841   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1842   * @param t     Table
1843   * @param f     Array of Families to load
1844   * @param value the values of the cells. If null is passed, the row key is used as value
1845   * @return Count of rows loaded.
1846   */
1847  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1848    return loadTable(t, f, value, true);
1849  }
1850
1851  /**
1852   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1853   * @param t     Table
1854   * @param f     Array of Families to load
1855   * @param value the values of the cells. If null is passed, the row key is used as value
1856   * @return Count of rows loaded.
1857   */
1858  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1859    throws IOException {
1860    List<Put> puts = new ArrayList<>();
1861    for (byte[] row : HBaseTestingUtil.ROWS) {
1862      Put put = new Put(row);
1863      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1864      for (int i = 0; i < f.length; i++) {
1865        byte[] value1 = value != null ? value : row;
1866        put.addColumn(f[i], f[i], value1);
1867      }
1868      puts.add(put);
1869    }
1870    t.put(puts);
1871    return puts.size();
1872  }
1873
1874  /**
1875   * A tracker for tracking and validating table rows generated with
1876   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
1877   */
1878  public static class SeenRowTracker {
1879    int dim = 'z' - 'a' + 1;
1880    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1881    byte[] startRow;
1882    byte[] stopRow;
1883
1884    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1885      this.startRow = startRow;
1886      this.stopRow = stopRow;
1887    }
1888
1889    void reset() {
1890      for (byte[] row : ROWS) {
1891        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1892      }
1893    }
1894
1895    int i(byte b) {
1896      return b - 'a';
1897    }
1898
1899    public void addRow(byte[] row) {
1900      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1901    }
1902
1903    /**
1904     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
1905     * rows none
1906     */
1907    public void validate() {
1908      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1909        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1910          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1911            int count = seenRows[i(b1)][i(b2)][i(b3)];
1912            int expectedCount = 0;
1913            if (
1914              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1915                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1916            ) {
1917              expectedCount = 1;
1918            }
1919            if (count != expectedCount) {
1920              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1921              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1922                + "instead of " + expectedCount);
1923            }
1924          }
1925        }
1926      }
1927    }
1928  }
1929
1930  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1931    return loadRegion(r, f, false);
1932  }
1933
1934  public int loadRegion(final Region r, final byte[] f) throws IOException {
1935    return loadRegion((HRegion) r, f);
1936  }
1937
1938  /**
1939   * Load region with rows from 'aaa' to 'zzz'.
1940   * @param r     Region
1941   * @param f     Family
1942   * @param flush flush the cache if true
1943   * @return Count of rows loaded.
1944   */
1945  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1946    byte[] k = new byte[3];
1947    int rowCount = 0;
1948    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1949      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1950        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1951          k[0] = b1;
1952          k[1] = b2;
1953          k[2] = b3;
1954          Put put = new Put(k);
1955          put.setDurability(Durability.SKIP_WAL);
1956          put.addColumn(f, null, k);
1957          if (r.getWAL() == null) {
1958            put.setDurability(Durability.SKIP_WAL);
1959          }
1960          int preRowCount = rowCount;
1961          int pause = 10;
1962          int maxPause = 1000;
1963          while (rowCount == preRowCount) {
1964            try {
1965              r.put(put);
1966              rowCount++;
1967            } catch (RegionTooBusyException e) {
1968              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1969              Threads.sleep(pause);
1970            }
1971          }
1972        }
1973      }
1974      if (flush) {
1975        r.flush(true);
1976      }
1977    }
1978    return rowCount;
1979  }
1980
1981  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1982    throws IOException {
1983    for (int i = startRow; i < endRow; i++) {
1984      byte[] data = Bytes.toBytes(String.valueOf(i));
1985      Put put = new Put(data);
1986      put.addColumn(f, null, data);
1987      t.put(put);
1988    }
1989  }
1990
1991  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1992    throws IOException {
1993    for (int i = 0; i < totalRows; i++) {
1994      byte[] row = new byte[rowSize];
1995      Bytes.random(row);
1996      Put put = new Put(row);
1997      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1998      t.put(put);
1999    }
2000  }
2001
2002  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2003    int replicaId) throws IOException {
2004    for (int i = startRow; i < endRow; i++) {
2005      String failMsg = "Failed verification of row :" + i;
2006      byte[] data = Bytes.toBytes(String.valueOf(i));
2007      Get get = new Get(data);
2008      get.setReplicaId(replicaId);
2009      get.setConsistency(Consistency.TIMELINE);
2010      Result result = table.get(get);
2011      assertTrue(failMsg, result.containsColumn(f, null));
2012      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2013      Cell cell = result.getColumnLatestCell(f, null);
2014      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2015        cell.getValueOffset(), cell.getValueLength()));
2016    }
2017  }
2018
2019  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2020    throws IOException {
2021    verifyNumericRows((HRegion) region, f, startRow, endRow);
2022  }
2023
2024  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2025    throws IOException {
2026    verifyNumericRows(region, f, startRow, endRow, true);
2027  }
2028
2029  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2030    final boolean present) throws IOException {
2031    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2032  }
2033
2034  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2035    final boolean present) throws IOException {
2036    for (int i = startRow; i < endRow; i++) {
2037      String failMsg = "Failed verification of row :" + i;
2038      byte[] data = Bytes.toBytes(String.valueOf(i));
2039      Result result = region.get(new Get(data));
2040
2041      boolean hasResult = result != null && !result.isEmpty();
2042      assertEquals(failMsg + result, present, hasResult);
2043      if (!present) continue;
2044
2045      assertTrue(failMsg, result.containsColumn(f, null));
2046      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2047      Cell cell = result.getColumnLatestCell(f, null);
2048      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2049        cell.getValueOffset(), cell.getValueLength()));
2050    }
2051  }
2052
2053  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2054    throws IOException {
2055    for (int i = startRow; i < endRow; i++) {
2056      byte[] data = Bytes.toBytes(String.valueOf(i));
2057      Delete delete = new Delete(data);
2058      delete.addFamily(f);
2059      t.delete(delete);
2060    }
2061  }
2062
2063  /**
2064   * Return the number of rows in the given table.
2065   * @param table to count rows
2066   * @return count of rows
2067   */
2068  public static int countRows(final Table table) throws IOException {
2069    return countRows(table, new Scan());
2070  }
2071
2072  public static int countRows(final Table table, final Scan scan) throws IOException {
2073    try (ResultScanner results = table.getScanner(scan)) {
2074      int count = 0;
2075      while (results.next() != null) {
2076        count++;
2077      }
2078      return count;
2079    }
2080  }
2081
2082  public static int countRows(final Table table, final byte[]... families) throws IOException {
2083    Scan scan = new Scan();
2084    for (byte[] family : families) {
2085      scan.addFamily(family);
2086    }
2087    return countRows(table, scan);
2088  }
2089
2090  /**
2091   * Return the number of rows in the given table.
2092   */
2093  public int countRows(final TableName tableName) throws IOException {
2094    try (Table table = getConnection().getTable(tableName)) {
2095      return countRows(table);
2096    }
2097  }
2098
2099  public static int countRows(final Region region) throws IOException {
2100    return countRows(region, new Scan());
2101  }
2102
2103  public static int countRows(final Region region, final Scan scan) throws IOException {
2104    try (InternalScanner scanner = region.getScanner(scan)) {
2105      return countRows(scanner);
2106    }
2107  }
2108
2109  public static int countRows(final InternalScanner scanner) throws IOException {
2110    int scannedCount = 0;
2111    List<Cell> results = new ArrayList<>();
2112    boolean hasMore = true;
2113    while (hasMore) {
2114      hasMore = scanner.next(results);
2115      scannedCount += results.size();
2116      results.clear();
2117    }
2118    return scannedCount;
2119  }
2120
2121  /**
2122   * Return an md5 digest of the entire contents of a table.
2123   */
2124  public String checksumRows(final Table table) throws Exception {
2125    MessageDigest digest = MessageDigest.getInstance("MD5");
2126    try (ResultScanner results = table.getScanner(new Scan())) {
2127      for (Result res : results) {
2128        digest.update(res.getRow());
2129      }
2130    }
2131    return digest.toString();
2132  }
2133
2134  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2135  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2136  static {
2137    int i = 0;
2138    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2139      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2140        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2141          ROWS[i][0] = b1;
2142          ROWS[i][1] = b2;
2143          ROWS[i][2] = b3;
2144          i++;
2145        }
2146      }
2147    }
2148  }
2149
2150  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2151    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2152    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2153    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2154    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2155    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2156    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2157
2158  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2159    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2160    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2161    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2162    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2163    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2164    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2165
2166  /**
2167   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2168   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2169   * @return list of region info for regions added to meta
2170   */
2171  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2172    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2173    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2174      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2175      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2176      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2177        TableState.State.ENABLED);
2178      // add custom ones
2179      for (int i = 0; i < startKeys.length; i++) {
2180        int j = (i + 1) % startKeys.length;
2181        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2182          .setEndKey(startKeys[j]).build();
2183        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2184        newRegions.add(hri);
2185      }
2186      return newRegions;
2187    }
2188  }
2189
2190  /**
2191   * Create an unmanaged WAL. Be sure to close it when you're through.
2192   */
2193  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2194    throws IOException {
2195    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2196    // unless I pass along via the conf.
2197    Configuration confForWAL = new Configuration(conf);
2198    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2199    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2200  }
2201
2202  /**
2203   * Create a region with it's own WAL. Be sure to call
2204   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2205   */
2206  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2207    final Configuration conf, final TableDescriptor htd) throws IOException {
2208    return createRegionAndWAL(info, rootDir, conf, htd, true);
2209  }
2210
2211  /**
2212   * Create a region with it's own WAL. Be sure to call
2213   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2214   */
2215  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2216    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2217    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2218    region.setBlockCache(blockCache);
2219    region.initialize();
2220    return region;
2221  }
2222
2223  /**
2224   * Create a region with it's own WAL. Be sure to call
2225   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2226   */
2227  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2228    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2229    throws IOException {
2230    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2231    region.setMobFileCache(mobFileCache);
2232    region.initialize();
2233    return region;
2234  }
2235
2236  /**
2237   * Create a region with it's own WAL. Be sure to call
2238   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2239   */
2240  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2241    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2242    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2243      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2244    WAL wal = createWal(conf, rootDir, info);
2245    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2246  }
2247
2248  /**
2249   * Find any other region server which is different from the one identified by parameter
2250   * @return another region server
2251   */
2252  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2253    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2254      if (!(rst.getRegionServer() == rs)) {
2255        return rst.getRegionServer();
2256      }
2257    }
2258    return null;
2259  }
2260
2261  /**
2262   * Tool to get the reference to the region server object that holds the region of the specified
2263   * user table.
2264   * @param tableName user table to lookup in hbase:meta
2265   * @return region server that holds it, null if the row doesn't exist
2266   */
2267  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2268    throws IOException, InterruptedException {
2269    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2270    if (regions == null || regions.isEmpty()) {
2271      return null;
2272    }
2273    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2274
2275    byte[] firstRegionName =
2276      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2277        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2278
2279    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2280    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2281      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2282    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2283      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2284    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
2285    while (retrier.shouldRetry()) {
2286      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2287      if (index != -1) {
2288        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2289      }
2290      // Came back -1. Region may not be online yet. Sleep a while.
2291      retrier.sleepUntilNextRetry();
2292    }
2293    return null;
2294  }
2295
2296  /**
2297   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
2298   * @throws IOException When starting the cluster fails.
2299   */
2300  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2301    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2302    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2303      "99.0");
2304    startMiniMapReduceCluster(2);
2305    return mrCluster;
2306  }
2307
2308  /**
2309   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2310   * internal static LOG_DIR variable.
2311   */
2312  private void forceChangeTaskLogDir() {
2313    Field logDirField;
2314    try {
2315      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2316      logDirField.setAccessible(true);
2317
2318      Field modifiersField = ReflectionUtils.getModifiersField();
2319      modifiersField.setAccessible(true);
2320      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2321
2322      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2323    } catch (SecurityException e) {
2324      throw new RuntimeException(e);
2325    } catch (NoSuchFieldException e) {
2326      throw new RuntimeException(e);
2327    } catch (IllegalArgumentException e) {
2328      throw new RuntimeException(e);
2329    } catch (IllegalAccessException e) {
2330      throw new RuntimeException(e);
2331    }
2332  }
2333
2334  /**
2335   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2336   * filesystem.
2337   * @param servers The number of <code>TaskTracker</code>'s to start.
2338   * @throws IOException When starting the cluster fails.
2339   */
2340  private void startMiniMapReduceCluster(final int servers) throws IOException {
2341    if (mrCluster != null) {
2342      throw new IllegalStateException("MiniMRCluster is already running");
2343    }
2344    LOG.info("Starting mini mapreduce cluster...");
2345    setupClusterTestDir();
2346    createDirsAndSetProperties();
2347
2348    forceChangeTaskLogDir();
2349
2350    //// hadoop2 specific settings
2351    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2352    // we up the VM usable so that processes don't get killed.
2353    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2354
2355    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2356    // this avoids the problem by disabling speculative task execution in tests.
2357    conf.setBoolean("mapreduce.map.speculative", false);
2358    conf.setBoolean("mapreduce.reduce.speculative", false);
2359    ////
2360
2361    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2362    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2363    if (JVM.getJVMSpecVersion() >= 17) {
2364      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2365      conf.set("yarn.app.mapreduce.am.command-opts",
2366        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2367    }
2368
2369    // Allow the user to override FS URI for this map-reduce cluster to use.
2370    mrCluster =
2371      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2372        1, null, null, new JobConf(this.conf));
2373    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2374    if (jobConf == null) {
2375      jobConf = mrCluster.createJobConf();
2376    }
2377
2378    // Hadoop MiniMR overwrites this while it should not
2379    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2380    LOG.info("Mini mapreduce cluster started");
2381
2382    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2383    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2384    // necessary config properties here. YARN-129 required adding a few properties.
2385    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2386    // this for mrv2 support; mr1 ignores this
2387    conf.set("mapreduce.framework.name", "yarn");
2388    conf.setBoolean("yarn.is.minicluster", true);
2389    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2390    if (rmAddress != null) {
2391      conf.set("yarn.resourcemanager.address", rmAddress);
2392    }
2393    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2394    if (historyAddress != null) {
2395      conf.set("mapreduce.jobhistory.address", historyAddress);
2396    }
2397    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2398    if (schedulerAddress != null) {
2399      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2400    }
2401    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2402    if (mrJobHistoryWebappAddress != null) {
2403      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2404    }
2405    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2406    if (yarnRMWebappAddress != null) {
2407      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2408    }
2409  }
2410
2411  /**
2412   * Stops the previously started <code>MiniMRCluster</code>.
2413   */
2414  public void shutdownMiniMapReduceCluster() {
2415    if (mrCluster != null) {
2416      LOG.info("Stopping mini mapreduce cluster...");
2417      mrCluster.shutdown();
2418      mrCluster = null;
2419      LOG.info("Mini mapreduce cluster stopped");
2420    }
2421    // Restore configuration to point to local jobtracker
2422    conf.set("mapreduce.jobtracker.address", "local");
2423  }
2424
2425  /**
2426   * Create a stubbed out RegionServerService, mainly for getting FS.
2427   */
2428  public RegionServerServices createMockRegionServerService() throws IOException {
2429    return createMockRegionServerService((ServerName) null);
2430  }
2431
2432  /**
2433   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2434   * TestTokenAuthentication
2435   */
2436  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2437    throws IOException {
2438    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2439    rss.setFileSystem(getTestFileSystem());
2440    rss.setRpcServer(rpc);
2441    return rss;
2442  }
2443
2444  /**
2445   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2446   * TestOpenRegionHandler
2447   */
2448  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2449    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2450    rss.setFileSystem(getTestFileSystem());
2451    return rss;
2452  }
2453
2454  /**
2455   * Expire the Master's session
2456   */
2457  public void expireMasterSession() throws Exception {
2458    HMaster master = getMiniHBaseCluster().getMaster();
2459    expireSession(master.getZooKeeper(), false);
2460  }
2461
2462  /**
2463   * Expire a region server's session
2464   * @param index which RS
2465   */
2466  public void expireRegionServerSession(int index) throws Exception {
2467    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2468    expireSession(rs.getZooKeeper(), false);
2469    decrementMinRegionServerCount();
2470  }
2471
2472  private void decrementMinRegionServerCount() {
2473    // decrement the count for this.conf, for newly spwaned master
2474    // this.hbaseCluster shares this configuration too
2475    decrementMinRegionServerCount(getConfiguration());
2476
2477    // each master thread keeps a copy of configuration
2478    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2479      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2480    }
2481  }
2482
2483  private void decrementMinRegionServerCount(Configuration conf) {
2484    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2485    if (currentCount != -1) {
2486      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2487    }
2488  }
2489
2490  public void expireSession(ZKWatcher nodeZK) throws Exception {
2491    expireSession(nodeZK, false);
2492  }
2493
2494  /**
2495   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2496   * http://hbase.apache.org/book.html#trouble.zookeeper
2497   * <p/>
2498   * There are issues when doing this:
2499   * <ol>
2500   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2501   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2502   * </ol>
2503   * @param nodeZK      - the ZK watcher to expire
2504   * @param checkStatus - true to check if we can create a Table with the current configuration.
2505   */
2506  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2507    Configuration c = new Configuration(this.conf);
2508    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2509    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2510    byte[] password = zk.getSessionPasswd();
2511    long sessionID = zk.getSessionId();
2512
2513    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2514    // so we create a first watcher to be sure that the
2515    // event was sent. We expect that if our watcher receives the event
2516    // other watchers on the same machine will get is as well.
2517    // When we ask to close the connection, ZK does not close it before
2518    // we receive all the events, so don't have to capture the event, just
2519    // closing the connection should be enough.
2520    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2521      @Override
2522      public void process(WatchedEvent watchedEvent) {
2523        LOG.info("Monitor ZKW received event=" + watchedEvent);
2524      }
2525    }, sessionID, password);
2526
2527    // Making it expire
2528    ZooKeeper newZK =
2529      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2530
2531    // ensure that we have connection to the server before closing down, otherwise
2532    // the close session event will be eaten out before we start CONNECTING state
2533    long start = EnvironmentEdgeManager.currentTime();
2534    while (
2535      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2536    ) {
2537      Thread.sleep(1);
2538    }
2539    newZK.close();
2540    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2541
2542    // Now closing & waiting to be sure that the clients get it.
2543    monitor.close();
2544
2545    if (checkStatus) {
2546      getConnection().getTable(TableName.META_TABLE_NAME).close();
2547    }
2548  }
2549
2550  /**
2551   * Get the Mini HBase cluster.
2552   * @return hbase cluster
2553   * @see #getHBaseClusterInterface()
2554   */
2555  public SingleProcessHBaseCluster getHBaseCluster() {
2556    return getMiniHBaseCluster();
2557  }
2558
2559  /**
2560   * Returns the HBaseCluster instance.
2561   * <p>
2562   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2563   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2564   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
2565   * instead w/o the need to type-cast.
2566   */
2567  public HBaseClusterInterface getHBaseClusterInterface() {
2568    // implementation note: we should rename this method as #getHBaseCluster(),
2569    // but this would require refactoring 90+ calls.
2570    return hbaseCluster;
2571  }
2572
2573  /**
2574   * Resets the connections so that the next time getConnection() is called, a new connection is
2575   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2576   * the connection is not valid anymore.
2577   * <p/>
2578   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2579   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2580   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2581   * the connection state automatically. Cleaning this is a much bigger refactor.
2582   */
2583  public void invalidateConnection() throws IOException {
2584    closeConnection();
2585    // Update the master addresses if they changed.
2586    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2587    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2588    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2589      masterConfigBefore, masterConfAfter);
2590    conf.set(HConstants.MASTER_ADDRS_KEY,
2591      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2592  }
2593
2594  /**
2595   * Get a shared Connection to the cluster. this method is thread safe.
2596   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2597   */
2598  public Connection getConnection() throws IOException {
2599    return getAsyncConnection().toConnection();
2600  }
2601
2602  /**
2603   * Get a assigned Connection to the cluster. this method is thread safe.
2604   * @param user assigned user
2605   * @return A Connection with assigned user.
2606   */
2607  public Connection getConnection(User user) throws IOException {
2608    return getAsyncConnection(user).toConnection();
2609  }
2610
2611  /**
2612   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
2613   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2614   *         of cluster.
2615   */
2616  public AsyncClusterConnection getAsyncConnection() throws IOException {
2617    try {
2618      return asyncConnection.updateAndGet(connection -> {
2619        if (connection == null) {
2620          try {
2621            User user = UserProvider.instantiate(conf).getCurrent();
2622            connection = getAsyncConnection(user);
2623          } catch (IOException ioe) {
2624            throw new UncheckedIOException("Failed to create connection", ioe);
2625          }
2626        }
2627        return connection;
2628      });
2629    } catch (UncheckedIOException exception) {
2630      throw exception.getCause();
2631    }
2632  }
2633
2634  /**
2635   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
2636   * @param user assigned user
2637   * @return An AsyncClusterConnection with assigned user.
2638   */
2639  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2640    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2641  }
2642
2643  public void closeConnection() throws IOException {
2644    if (hbaseAdmin != null) {
2645      Closeables.close(hbaseAdmin, true);
2646      hbaseAdmin = null;
2647    }
2648    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2649    if (asyncConnection != null) {
2650      Closeables.close(asyncConnection, true);
2651    }
2652  }
2653
2654  /**
2655   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
2656   * it has no effect, it will be closed automatically when the cluster shutdowns
2657   */
2658  public Admin getAdmin() throws IOException {
2659    if (hbaseAdmin == null) {
2660      this.hbaseAdmin = getConnection().getAdmin();
2661    }
2662    return hbaseAdmin;
2663  }
2664
2665  private Admin hbaseAdmin = null;
2666
2667  /**
2668   * Returns an {@link Hbck} instance. Needs be closed when done.
2669   */
2670  public Hbck getHbck() throws IOException {
2671    return getConnection().getHbck();
2672  }
2673
2674  /**
2675   * Unassign the named region.
2676   * @param regionName The region to unassign.
2677   */
2678  public void unassignRegion(String regionName) throws IOException {
2679    unassignRegion(Bytes.toBytes(regionName));
2680  }
2681
2682  /**
2683   * Unassign the named region.
2684   * @param regionName The region to unassign.
2685   */
2686  public void unassignRegion(byte[] regionName) throws IOException {
2687    getAdmin().unassign(regionName);
2688  }
2689
2690  /**
2691   * Closes the region containing the given row.
2692   * @param row   The row to find the containing region.
2693   * @param table The table to find the region.
2694   */
2695  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2696    unassignRegionByRow(Bytes.toBytes(row), table);
2697  }
2698
2699  /**
2700   * Closes the region containing the given row.
2701   * @param row   The row to find the containing region.
2702   * @param table The table to find the region.
2703   */
2704  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2705    HRegionLocation hrl = table.getRegionLocation(row);
2706    unassignRegion(hrl.getRegion().getRegionName());
2707  }
2708
2709  /**
2710   * Retrieves a splittable region randomly from tableName
2711   * @param tableName   name of table
2712   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2713   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2714   */
2715  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2716    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2717    int regCount = regions.size();
2718    Set<Integer> attempted = new HashSet<>();
2719    int idx;
2720    int attempts = 0;
2721    do {
2722      regions = getHBaseCluster().getRegions(tableName);
2723      if (regCount != regions.size()) {
2724        // if there was region movement, clear attempted Set
2725        attempted.clear();
2726      }
2727      regCount = regions.size();
2728      // There are chances that before we get the region for the table from an RS the region may
2729      // be going for CLOSE. This may be because online schema change is enabled
2730      if (regCount > 0) {
2731        idx = ThreadLocalRandom.current().nextInt(regCount);
2732        // if we have just tried this region, there is no need to try again
2733        if (attempted.contains(idx)) {
2734          continue;
2735        }
2736        HRegion region = regions.get(idx);
2737        if (region.checkSplit().isPresent()) {
2738          return region;
2739        }
2740        attempted.add(idx);
2741      }
2742      attempts++;
2743    } while (maxAttempts == -1 || attempts < maxAttempts);
2744    return null;
2745  }
2746
2747  public MiniDFSCluster getDFSCluster() {
2748    return dfsCluster;
2749  }
2750
2751  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2752    setDFSCluster(cluster, true);
2753  }
2754
2755  /**
2756   * Set the MiniDFSCluster
2757   * @param cluster     cluster to use
2758   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2759   *                    is set.
2760   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2761   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2762   */
2763  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2764    throws IllegalStateException, IOException {
2765    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2766      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2767    }
2768    this.dfsCluster = cluster;
2769    this.setFs();
2770  }
2771
2772  public FileSystem getTestFileSystem() throws IOException {
2773    return HFileSystem.get(conf);
2774  }
2775
2776  /**
2777   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
2778   * (30 seconds).
2779   * @param table Table to wait on.
2780   */
2781  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2782    waitTableAvailable(table.getName(), 30000);
2783  }
2784
2785  public void waitTableAvailable(TableName table, long timeoutMillis)
2786    throws InterruptedException, IOException {
2787    waitFor(timeoutMillis, predicateTableAvailable(table));
2788  }
2789
2790  /**
2791   * Wait until all regions in a table have been assigned
2792   * @param table         Table to wait on.
2793   * @param timeoutMillis Timeout.
2794   */
2795  public void waitTableAvailable(byte[] table, long timeoutMillis)
2796    throws InterruptedException, IOException {
2797    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2798  }
2799
2800  public String explainTableAvailability(TableName tableName) throws IOException {
2801    StringBuilder msg =
2802      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2803    if (getHBaseCluster().getMaster().isAlive()) {
2804      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2805        .getRegionStates().getRegionAssignments();
2806      final List<Pair<RegionInfo, ServerName>> metaLocations =
2807        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2808      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2809        RegionInfo hri = metaLocation.getFirst();
2810        ServerName sn = metaLocation.getSecond();
2811        if (!assignments.containsKey(hri)) {
2812          msg.append(", region ").append(hri)
2813            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
2814        } else if (sn == null) {
2815          msg.append(",  region ").append(hri).append(" assigned,  but has no server in meta");
2816        } else if (!sn.equals(assignments.get(hri))) {
2817          msg.append(",  region ").append(hri)
2818            .append(" assigned,  but has different servers in meta and AM ( ").append(sn)
2819            .append(" <> ").append(assignments.get(hri));
2820        }
2821      }
2822    }
2823    return msg.toString();
2824  }
2825
2826  public String explainTableState(final TableName table, TableState.State state)
2827    throws IOException {
2828    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2829    if (tableState == null) {
2830      return "TableState in META: No table state in META for table " + table
2831        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2832    } else if (!tableState.inStates(state)) {
2833      return "TableState in META: Not " + state + " state, but " + tableState;
2834    } else {
2835      return "TableState in META: OK";
2836    }
2837  }
2838
2839  @Nullable
2840  public TableState findLastTableState(final TableName table) throws IOException {
2841    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2842    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2843      @Override
2844      public boolean visit(Result r) throws IOException {
2845        if (!Arrays.equals(r.getRow(), table.getName())) {
2846          return false;
2847        }
2848        TableState state = CatalogFamilyFormat.getTableState(r);
2849        if (state != null) {
2850          lastTableState.set(state);
2851        }
2852        return true;
2853      }
2854    };
2855    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2856      Integer.MAX_VALUE, visitor);
2857    return lastTableState.get();
2858  }
2859
2860  /**
2861   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2862   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
2863   * table.
2864   * @param table the table to wait on.
2865   * @throws InterruptedException if interrupted while waiting
2866   * @throws IOException          if an IO problem is encountered
2867   */
2868  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2869    waitTableEnabled(table, 30000);
2870  }
2871
2872  /**
2873   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
2874   * have been all assigned.
2875   * @see #waitTableEnabled(TableName, long)
2876   * @param table         Table to wait on.
2877   * @param timeoutMillis Time to wait on it being marked enabled.
2878   */
2879  public void waitTableEnabled(byte[] table, long timeoutMillis)
2880    throws InterruptedException, IOException {
2881    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2882  }
2883
2884  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2885    waitFor(timeoutMillis, predicateTableEnabled(table));
2886  }
2887
2888  /**
2889   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
2890   * after default period (30 seconds)
2891   * @param table Table to wait on.
2892   */
2893  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2894    waitTableDisabled(table, 30000);
2895  }
2896
2897  public void waitTableDisabled(TableName table, long millisTimeout)
2898    throws InterruptedException, IOException {
2899    waitFor(millisTimeout, predicateTableDisabled(table));
2900  }
2901
2902  /**
2903   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
2904   * @param table         Table to wait on.
2905   * @param timeoutMillis Time to wait on it being marked disabled.
2906   */
2907  public void waitTableDisabled(byte[] table, long timeoutMillis)
2908    throws InterruptedException, IOException {
2909    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2910  }
2911
2912  /**
2913   * Make sure that at least the specified number of region servers are running
2914   * @param num minimum number of region servers that should be running
2915   * @return true if we started some servers
2916   */
2917  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2918    boolean startedServer = false;
2919    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2920    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2921      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2922      startedServer = true;
2923    }
2924
2925    return startedServer;
2926  }
2927
2928  /**
2929   * Make sure that at least the specified number of region servers are running. We don't count the
2930   * ones that are currently stopping or are stopped.
2931   * @param num minimum number of region servers that should be running
2932   * @return true if we started some servers
2933   */
2934  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2935    boolean startedServer = ensureSomeRegionServersAvailable(num);
2936
2937    int nonStoppedServers = 0;
2938    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2939
2940      HRegionServer hrs = rst.getRegionServer();
2941      if (hrs.isStopping() || hrs.isStopped()) {
2942        LOG.info("A region server is stopped or stopping:" + hrs);
2943      } else {
2944        nonStoppedServers++;
2945      }
2946    }
2947    for (int i = nonStoppedServers; i < num; ++i) {
2948      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2949      startedServer = true;
2950    }
2951    return startedServer;
2952  }
2953
2954  /**
2955   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
2956   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
2957   * @param c                     Initial configuration
2958   * @param differentiatingSuffix Suffix to differentiate this user from others.
2959   * @return A new configuration instance with a different user set into it.
2960   */
2961  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2962    throws IOException {
2963    FileSystem currentfs = FileSystem.get(c);
2964    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2965      return User.getCurrent();
2966    }
2967    // Else distributed filesystem. Make a new instance per daemon. Below
2968    // code is taken from the AppendTestUtil over in hdfs.
2969    String username = User.getCurrent().getName() + differentiatingSuffix;
2970    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2971    return user;
2972  }
2973
2974  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2975    throws IOException {
2976    NavigableSet<String> online = new TreeSet<>();
2977    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2978      try {
2979        for (RegionInfo region : ProtobufUtil
2980          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2981          online.add(region.getRegionNameAsString());
2982        }
2983      } catch (RegionServerStoppedException e) {
2984        // That's fine.
2985      }
2986    }
2987    return online;
2988  }
2989
2990  /**
2991   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
2992   * linger. Here is the exception you'll see:
2993   *
2994   * <pre>
2995   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2996   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2997   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2998   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2999   * </pre>
3000   *
3001   * @param stream A DFSClient.DFSOutputStream.
3002   */
3003  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
3004    try {
3005      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
3006      for (Class<?> clazz : clazzes) {
3007        String className = clazz.getSimpleName();
3008        if (className.equals("DFSOutputStream")) {
3009          if (clazz.isInstance(stream)) {
3010            Field maxRecoveryErrorCountField =
3011              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3012            maxRecoveryErrorCountField.setAccessible(true);
3013            maxRecoveryErrorCountField.setInt(stream, max);
3014            break;
3015          }
3016        }
3017      }
3018    } catch (Exception e) {
3019      LOG.info("Could not set max recovery field", e);
3020    }
3021  }
3022
3023  /**
3024   * Uses directly the assignment manager to assign the region. and waits until the specified region
3025   * has completed assignment.
3026   * @return true if the region is assigned false otherwise.
3027   */
3028  public boolean assignRegion(final RegionInfo regionInfo)
3029    throws IOException, InterruptedException {
3030    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3031    am.assign(regionInfo);
3032    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3033  }
3034
3035  /**
3036   * Move region to destination server and wait till region is completely moved and online
3037   * @param destRegion region to move
3038   * @param destServer destination server of the region
3039   */
3040  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3041    throws InterruptedException, IOException {
3042    HMaster master = getMiniHBaseCluster().getMaster();
3043    // TODO: Here we start the move. The move can take a while.
3044    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3045    while (true) {
3046      ServerName serverName =
3047        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3048      if (serverName != null && serverName.equals(destServer)) {
3049        assertRegionOnServer(destRegion, serverName, 2000);
3050        break;
3051      }
3052      Thread.sleep(10);
3053    }
3054  }
3055
3056  /**
3057   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
3058   * configuable timeout value (default is 60 seconds) This means all regions have been deployed,
3059   * master has been informed and updated hbase:meta with the regions deployed server.
3060   * @param tableName the table name
3061   */
3062  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3063    waitUntilAllRegionsAssigned(tableName,
3064      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3065  }
3066
3067  /**
3068   * Waith until all system table's regions get assigned
3069   */
3070  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3071    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3072  }
3073
3074  /**
3075   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
3076   * timeout. This means all regions have been deployed, master has been informed and updated
3077   * hbase:meta with the regions deployed server.
3078   * @param tableName the table name
3079   * @param timeout   timeout, in milliseconds
3080   */
3081  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3082    throws IOException {
3083    if (!TableName.isMetaTableName(tableName)) {
3084      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3085        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3086          + timeout + "ms");
3087        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3088          @Override
3089          public String explainFailure() throws IOException {
3090            return explainTableAvailability(tableName);
3091          }
3092
3093          @Override
3094          public boolean evaluate() throws IOException {
3095            Scan scan = new Scan();
3096            scan.addFamily(HConstants.CATALOG_FAMILY);
3097            boolean tableFound = false;
3098            try (ResultScanner s = meta.getScanner(scan)) {
3099              for (Result r; (r = s.next()) != null;) {
3100                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3101                RegionInfo info = RegionInfo.parseFromOrNull(b);
3102                if (info != null && info.getTable().equals(tableName)) {
3103                  // Get server hosting this region from catalog family. Return false if no server
3104                  // hosting this region, or if the server hosting this region was recently killed
3105                  // (for fault tolerance testing).
3106                  tableFound = true;
3107                  byte[] server =
3108                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3109                  if (server == null) {
3110                    return false;
3111                  } else {
3112                    byte[] startCode =
3113                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3114                    ServerName serverName =
3115                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3116                        + Bytes.toLong(startCode));
3117                    if (
3118                      !getHBaseClusterInterface().isDistributedCluster()
3119                        && getHBaseCluster().isKilledRS(serverName)
3120                    ) {
3121                      return false;
3122                    }
3123                  }
3124                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3125                    return false;
3126                  }
3127                }
3128              }
3129            }
3130            if (!tableFound) {
3131              LOG.warn(
3132                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3133            }
3134            return tableFound;
3135          }
3136        });
3137      }
3138    }
3139    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3140    // check from the master state if we are using a mini cluster
3141    if (!getHBaseClusterInterface().isDistributedCluster()) {
3142      // So, all regions are in the meta table but make sure master knows of the assignments before
3143      // returning -- sometimes this can lag.
3144      HMaster master = getHBaseCluster().getMaster();
3145      final RegionStates states = master.getAssignmentManager().getRegionStates();
3146      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3147        @Override
3148        public String explainFailure() throws IOException {
3149          return explainTableAvailability(tableName);
3150        }
3151
3152        @Override
3153        public boolean evaluate() throws IOException {
3154          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3155          return hris != null && !hris.isEmpty();
3156        }
3157      });
3158    }
3159    LOG.info("All regions for table " + tableName + " assigned.");
3160  }
3161
3162  /**
3163   * Do a small get/scan against one store. This is required because store has no actual methods of
3164   * querying itself, and relies on StoreScanner.
3165   */
3166  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3167    Scan scan = new Scan(get);
3168    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3169      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3170      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3171      // readpoint 0.
3172      0);
3173
3174    List<Cell> result = new ArrayList<>();
3175    scanner.next(result);
3176    if (!result.isEmpty()) {
3177      // verify that we are on the row we want:
3178      Cell kv = result.get(0);
3179      if (!CellUtil.matchingRows(kv, get.getRow())) {
3180        result.clear();
3181      }
3182    }
3183    scanner.close();
3184    return result;
3185  }
3186
3187  /**
3188   * Create region split keys between startkey and endKey
3189   * @param numRegions the number of regions to be created. it has to be greater than 3.
3190   * @return resulting split keys
3191   */
3192  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3193    assertTrue(numRegions > 3);
3194    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3195    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3196    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3197    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3198    return result;
3199  }
3200
3201  /**
3202   * Do a small get/scan against one store. This is required because store has no actual methods of
3203   * querying itself, and relies on StoreScanner.
3204   */
3205  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3206    throws IOException {
3207    Get get = new Get(row);
3208    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3209    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3210
3211    return getFromStoreFile(store, get);
3212  }
3213
3214  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3215    final List<? extends Cell> actual) {
3216    final int eLen = expected.size();
3217    final int aLen = actual.size();
3218    final int minLen = Math.min(eLen, aLen);
3219
3220    int i = 0;
3221    while (
3222      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3223    ) {
3224      i++;
3225    }
3226
3227    if (additionalMsg == null) {
3228      additionalMsg = "";
3229    }
3230    if (!additionalMsg.isEmpty()) {
3231      additionalMsg = ". " + additionalMsg;
3232    }
3233
3234    if (eLen != aLen || i != minLen) {
3235      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3236        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3237        + " (length " + aLen + ")" + additionalMsg);
3238    }
3239  }
3240
3241  public static <T> String safeGetAsStr(List<T> lst, int i) {
3242    if (0 <= i && i < lst.size()) {
3243      return lst.get(i).toString();
3244    } else {
3245      return "<out_of_range>";
3246    }
3247  }
3248
3249  public String getRpcConnnectionURI() throws UnknownHostException {
3250    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3251  }
3252
3253  public String getZkConnectionURI() {
3254    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3255      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3256      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3257  }
3258
3259  /**
3260   * Get the zk based cluster key for this cluster.
3261   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3262   *             connection info of a cluster. Keep here only for compatibility.
3263   * @see #getRpcConnnectionURI()
3264   * @see #getZkConnectionURI()
3265   */
3266  @Deprecated
3267  public String getClusterKey() {
3268    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3269      + ":"
3270      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3271  }
3272
3273  /**
3274   * Creates a random table with the given parameters
3275   */
3276  public Table createRandomTable(TableName tableName, final Collection<String> families,
3277    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3278    final int numRowsPerFlush) throws IOException, InterruptedException {
3279    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3280      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3281      + maxVersions + "\n");
3282
3283    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3284    final int numCF = families.size();
3285    final byte[][] cfBytes = new byte[numCF][];
3286    {
3287      int cfIndex = 0;
3288      for (String cf : families) {
3289        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3290      }
3291    }
3292
3293    final int actualStartKey = 0;
3294    final int actualEndKey = Integer.MAX_VALUE;
3295    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3296    final int splitStartKey = actualStartKey + keysPerRegion;
3297    final int splitEndKey = actualEndKey - keysPerRegion;
3298    final String keyFormat = "%08x";
3299    final Table table = createTable(tableName, cfBytes, maxVersions,
3300      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3301      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3302
3303    if (hbaseCluster != null) {
3304      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3305    }
3306
3307    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3308
3309    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3310      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3311        final byte[] row = Bytes.toBytes(
3312          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3313
3314        Put put = new Put(row);
3315        Delete del = new Delete(row);
3316        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3317          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3318          final long ts = rand.nextInt();
3319          final byte[] qual = Bytes.toBytes("col" + iCol);
3320          if (rand.nextBoolean()) {
3321            final byte[] value =
3322              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3323                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3324            put.addColumn(cf, qual, ts, value);
3325          } else if (rand.nextDouble() < 0.8) {
3326            del.addColumn(cf, qual, ts);
3327          } else {
3328            del.addColumns(cf, qual, ts);
3329          }
3330        }
3331
3332        if (!put.isEmpty()) {
3333          mutator.mutate(put);
3334        }
3335
3336        if (!del.isEmpty()) {
3337          mutator.mutate(del);
3338        }
3339      }
3340      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3341      mutator.flush();
3342      if (hbaseCluster != null) {
3343        getMiniHBaseCluster().flushcache(table.getName());
3344      }
3345    }
3346    mutator.close();
3347
3348    return table;
3349  }
3350
3351  public static int randomFreePort() {
3352    return HBaseCommonTestingUtil.randomFreePort();
3353  }
3354
3355  public static String randomMultiCastAddress() {
3356    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3357  }
3358
3359  public static void waitForHostPort(String host, int port) throws IOException {
3360    final int maxTimeMs = 10000;
3361    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3362    IOException savedException = null;
3363    LOG.info("Waiting for server at " + host + ":" + port);
3364    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3365      try {
3366        Socket sock = new Socket(InetAddress.getByName(host), port);
3367        sock.close();
3368        savedException = null;
3369        LOG.info("Server at " + host + ":" + port + " is available");
3370        break;
3371      } catch (UnknownHostException e) {
3372        throw new IOException("Failed to look up " + host, e);
3373      } catch (IOException e) {
3374        savedException = e;
3375      }
3376      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3377    }
3378
3379    if (savedException != null) {
3380      throw savedException;
3381    }
3382  }
3383
3384  public static int getMetaRSPort(Connection connection) throws IOException {
3385    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3386      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3387    }
3388  }
3389
3390  /**
3391   * Due to async racing issue, a region may not be in the online region list of a region server
3392   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3393   */
3394  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3395    final long timeout) throws IOException, InterruptedException {
3396    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3397    while (true) {
3398      List<RegionInfo> regions = getAdmin().getRegions(server);
3399      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3400      long now = EnvironmentEdgeManager.currentTime();
3401      if (now > timeoutTime) break;
3402      Thread.sleep(10);
3403    }
3404    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3405  }
3406
3407  /**
3408   * Check to make sure the region is open on the specified region server, but not on any other one.
3409   */
3410  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3411    final long timeout) throws IOException, InterruptedException {
3412    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3413    while (true) {
3414      List<RegionInfo> regions = getAdmin().getRegions(server);
3415      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3416        List<JVMClusterUtil.RegionServerThread> rsThreads =
3417          getHBaseCluster().getLiveRegionServerThreads();
3418        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3419          HRegionServer rs = rsThread.getRegionServer();
3420          if (server.equals(rs.getServerName())) {
3421            continue;
3422          }
3423          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3424          for (HRegion r : hrs) {
3425            assertTrue("Region should not be double assigned",
3426              r.getRegionInfo().getRegionId() != hri.getRegionId());
3427          }
3428        }
3429        return; // good, we are happy
3430      }
3431      long now = EnvironmentEdgeManager.currentTime();
3432      if (now > timeoutTime) break;
3433      Thread.sleep(10);
3434    }
3435    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3436  }
3437
3438  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3439    TableDescriptor td =
3440      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3441    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3442    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3443  }
3444
3445  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3446    BlockCache blockCache) throws IOException {
3447    TableDescriptor td =
3448      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3449    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3450    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3451  }
3452
3453  public static void setFileSystemURI(String fsURI) {
3454    FS_URI = fsURI;
3455  }
3456
3457  /**
3458   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3459   */
3460  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3461    return new ExplainingPredicate<IOException>() {
3462      @Override
3463      public String explainFailure() throws IOException {
3464        final RegionStates regionStates =
3465          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3466        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3467      }
3468
3469      @Override
3470      public boolean evaluate() throws IOException {
3471        HMaster master = getMiniHBaseCluster().getMaster();
3472        if (master == null) return false;
3473        AssignmentManager am = master.getAssignmentManager();
3474        if (am == null) return false;
3475        return !am.hasRegionsInTransition();
3476      }
3477    };
3478  }
3479
3480  /**
3481   * Returns a {@link Predicate} for checking that table is enabled
3482   */
3483  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3484    return new ExplainingPredicate<IOException>() {
3485      @Override
3486      public String explainFailure() throws IOException {
3487        return explainTableState(tableName, TableState.State.ENABLED);
3488      }
3489
3490      @Override
3491      public boolean evaluate() throws IOException {
3492        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3493      }
3494    };
3495  }
3496
3497  /**
3498   * Returns a {@link Predicate} for checking that table is enabled
3499   */
3500  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3501    return new ExplainingPredicate<IOException>() {
3502      @Override
3503      public String explainFailure() throws IOException {
3504        return explainTableState(tableName, TableState.State.DISABLED);
3505      }
3506
3507      @Override
3508      public boolean evaluate() throws IOException {
3509        return getAdmin().isTableDisabled(tableName);
3510      }
3511    };
3512  }
3513
3514  /**
3515   * Returns a {@link Predicate} for checking that table is enabled
3516   */
3517  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3518    return new ExplainingPredicate<IOException>() {
3519      @Override
3520      public String explainFailure() throws IOException {
3521        return explainTableAvailability(tableName);
3522      }
3523
3524      @Override
3525      public boolean evaluate() throws IOException {
3526        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3527        if (tableAvailable) {
3528          try (Table table = getConnection().getTable(tableName)) {
3529            TableDescriptor htd = table.getDescriptor();
3530            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3531              .getAllRegionLocations()) {
3532              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3533                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3534                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3535              for (byte[] family : htd.getColumnFamilyNames()) {
3536                scan.addFamily(family);
3537              }
3538              try (ResultScanner scanner = table.getScanner(scan)) {
3539                scanner.next();
3540              }
3541            }
3542          }
3543        }
3544        return tableAvailable;
3545      }
3546    };
3547  }
3548
3549  /**
3550   * Wait until no regions in transition.
3551   * @param timeout How long to wait.
3552   */
3553  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3554    waitFor(timeout, predicateNoRegionsInTransition());
3555  }
3556
3557  /**
3558   * Wait until no regions in transition. (time limit 15min)
3559   */
3560  public void waitUntilNoRegionsInTransition() throws IOException {
3561    waitUntilNoRegionsInTransition(15 * 60000);
3562  }
3563
3564  /**
3565   * Wait until labels is ready in VisibilityLabelsCache.
3566   */
3567  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3568    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3569    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3570
3571      @Override
3572      public boolean evaluate() {
3573        for (String label : labels) {
3574          if (labelsCache.getLabelOrdinal(label) == 0) {
3575            return false;
3576          }
3577        }
3578        return true;
3579      }
3580
3581      @Override
3582      public String explainFailure() {
3583        for (String label : labels) {
3584          if (labelsCache.getLabelOrdinal(label) == 0) {
3585            return label + " is not available yet";
3586          }
3587        }
3588        return "";
3589      }
3590    });
3591  }
3592
3593  /**
3594   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3595   * available.
3596   * @return the list of column descriptors
3597   */
3598  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3599    return generateColumnDescriptors("");
3600  }
3601
3602  /**
3603   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
3604   * available.
3605   * @param prefix family names prefix
3606   * @return the list of column descriptors
3607   */
3608  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3609    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3610    long familyId = 0;
3611    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3612      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3613        for (BloomType bloomType : BloomType.values()) {
3614          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3615          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3616            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3617          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3618          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3619          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3620          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3621          familyId++;
3622        }
3623      }
3624    }
3625    return columnFamilyDescriptors;
3626  }
3627
3628  /**
3629   * Get supported compression algorithms.
3630   * @return supported compression algorithms.
3631   */
3632  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3633    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3634    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3635    for (String algoName : allAlgos) {
3636      try {
3637        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3638        algo.getCompressor();
3639        supportedAlgos.add(algo);
3640      } catch (Throwable t) {
3641        // this algo is not available
3642      }
3643    }
3644    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3645  }
3646
3647  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3648    Scan scan = new Scan().withStartRow(row);
3649    scan.setReadType(ReadType.PREAD);
3650    scan.setCaching(1);
3651    scan.setReversed(true);
3652    scan.addFamily(family);
3653    try (RegionScanner scanner = r.getScanner(scan)) {
3654      List<Cell> cells = new ArrayList<>(1);
3655      scanner.next(cells);
3656      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
3657        return null;
3658      }
3659      return Result.create(cells);
3660    }
3661  }
3662
3663  private boolean isTargetTable(final byte[] inRow, Cell c) {
3664    String inputRowString = Bytes.toString(inRow);
3665    int i = inputRowString.indexOf(HConstants.DELIMITER);
3666    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3667    int o = outputRowString.indexOf(HConstants.DELIMITER);
3668    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3669  }
3670
3671  /**
3672   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3673   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3674   * kerby KDC server and utility for using it,
3675   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
3676   * less baggage. It came in in HBASE-5291.
3677   */
3678  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3679    Properties conf = MiniKdc.createConf();
3680    conf.put(MiniKdc.DEBUG, true);
3681    MiniKdc kdc = null;
3682    File dir = null;
3683    // There is time lag between selecting a port and trying to bind with it. It's possible that
3684    // another service captures the port in between which'll result in BindException.
3685    boolean bindException;
3686    int numTries = 0;
3687    do {
3688      try {
3689        bindException = false;
3690        dir = new File(getDataTestDir("kdc").toUri().getPath());
3691        kdc = new MiniKdc(conf, dir);
3692        kdc.start();
3693      } catch (BindException e) {
3694        FileUtils.deleteDirectory(dir); // clean directory
3695        numTries++;
3696        if (numTries == 3) {
3697          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3698          throw e;
3699        }
3700        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3701        bindException = true;
3702      }
3703    } while (bindException);
3704    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3705    return kdc;
3706  }
3707
3708  public int getNumHFiles(final TableName tableName, final byte[] family) {
3709    int numHFiles = 0;
3710    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3711      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3712    }
3713    return numHFiles;
3714  }
3715
3716  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3717    final byte[] family) {
3718    int numHFiles = 0;
3719    for (Region region : rs.getRegions(tableName)) {
3720      numHFiles += region.getStore(family).getStorefilesCount();
3721    }
3722    return numHFiles;
3723  }
3724
3725  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3726    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3727    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3728    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3729    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3730    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3731        it2 = rtdFamilies.iterator(); it.hasNext();) {
3732      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3733    }
3734  }
3735
3736  /**
3737   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3738   * invocations.
3739   */
3740  public static void await(final long sleepMillis, final BooleanSupplier condition)
3741    throws InterruptedException {
3742    try {
3743      while (!condition.getAsBoolean()) {
3744        Thread.sleep(sleepMillis);
3745      }
3746    } catch (RuntimeException e) {
3747      if (e.getCause() instanceof AssertionError) {
3748        throw (AssertionError) e.getCause();
3749      }
3750      throw e;
3751    }
3752  }
3753}